AWS Athenaで結果を強引にJSONで受け取る方法
AWS Athenaで結果を強引にJSONで受け取る方法:
AWS AthenaはマネージドPrestoとも言えるサービスでクラスタを管理しなくても手軽にS3のデータにクエリを投げれる良いサービスです。
一方でAthenaはINSERT文に対応していなかったり、クエリの出力形式がCSVかつ無圧縮しか選択できなかったりと、なんらかETLをするには不向きなサービスです(2018/10/15時点)。本格的なETLにはGlueを使うのがいいんですが、整形されたデータをチョロチョロっとサマって他のプログラムの入力にしたいみたいケースでは手軽にAthenaで済ませたくなります。そういった際に出力をプログラムで扱いやすいJSON形式でできるとなお嬉しいです。
ということで以下のようなクエリを書いてみました。
こんな感じで出力結果をMAP経由でJSONにキャストすると結果を一つのJSON形式にできます。(サブクエリの中は適当なサンプルデータです。)
こんな感じに結果が受け取れます。
厳密には出力はJSON形式では無く、1カラムのCSVの文字列の中にJSONが入ってる感じになります。
利用する際は文字列からデコードする必要があります。
直接csvをロードする場合はこんな感じです。
AWS APIのget_query_resultsから利用する際も同じ感じです。
以上、Athenaの結果をJSONで受け取る方法でした。とりあえず現状、データをサマって抽出した上で何らかプログラムの入力にする、とかそういったケースで使えなくもないかな?と思います。
一方で、この方法で出力されるファイルは無圧縮のJSONになるので、大量のレコードを出力したい場合には使わない方が良いでしょう。そういったケースでは素直にGlueを使うかEMRでクラスタを立てましょう。
Athenaが正式にJSON形式での結果出力をサポートする日を待ってます、あるいはINSERT文対応。あとは結果を圧縮して格納するオプションも出ると嬉しいなぁ。。。
tl;dr
- Athenaの実行結果をJSONで受け取りたかった
- SQLですべてのカラムを一つのJSONにキャストすればJSONで受け取れる
- 本格的にETLするならGlueジョブを使いましょう
はじめに
AWS AthenaはマネージドPrestoとも言えるサービスでクラスタを管理しなくても手軽にS3のデータにクエリを投げれる良いサービスです。一方でAthenaはINSERT文に対応していなかったり、クエリの出力形式がCSVかつ無圧縮しか選択できなかったりと、なんらかETLをするには不向きなサービスです(2018/10/15時点)。本格的なETLにはGlueを使うのがいいんですが、整形されたデータをチョロチョロっとサマって他のプログラムの入力にしたいみたいケースでは手軽にAthenaで済ませたくなります。そういった際に出力をプログラムで扱いやすいJSON形式でできるとなお嬉しいです。
ということで以下のようなクエリを書いてみました。
クエリ
こんな感じで出力結果をMAP経由でJSONにキャストすると結果を一つのJSON形式にできます。(サブクエリの中は適当なサンプルデータです。)SELECT CAST( MAP( ARRAY[ 'name', 'age', 'skils', 'account' ], ARRAY[ CAST(name as JSON), CAST(age as JSON), CAST(skils as JSON), CAST(account as JSON) ] ) AS JSON) AS json_data FROM ( SELECT 'kanga' as name, 100 as age, ARRAY['sql','aws'] as skils, MAP( ARRAY['qiita', 'twitter'], ARRAY['kanga', 'kanga333'] ) as account );
json.csv
"json_data" "{""account"":{""qiita"":""kanga"",""twitter"":""kanga333""},""age"":100,""name"":""kanga"",""skils"":[""sql"",""aws""]}"
利用する際は文字列からデコードする必要があります。
直接csvをロードする場合はこんな感じです。
load.py
import csv import json with open("json.csv") as f: csv_records = csv.reader(f) # Skip header next(csv_records, None) for csv_record in csv_records: json_record = json.loads(csv_record[0]) # Output: kanga print(json_record['name']) # Output: {'qiita': 'kanga', 'twitter': 'kanga333'} print(json_record['account'])
get_query_results.py
import json import boto3 athena = boto3.client('athena') response = athena.get_query_results( QueryExecutionId='15299086-c4e6-425a-9b61-d360a875225d' ) rows = response['ResultSet']['Rows'] for row in rows[1:]: data = row['Data'][0]['VarCharValue'] json_record = json.loads(data) # Output: kanga print(json_record['name']) # Output: kanga print(json_record['account'])
おわりに
以上、Athenaの結果をJSONで受け取る方法でした。とりあえず現状、データをサマって抽出した上で何らかプログラムの入力にする、とかそういったケースで使えなくもないかな?と思います。一方で、この方法で出力されるファイルは無圧縮のJSONになるので、大量のレコードを出力したい場合には使わない方が良いでしょう。そういったケースでは素直にGlueを使うかEMRでクラスタを立てましょう。
Athenaが正式にJSON形式での結果出力をサポートする日を待ってます、あるいはINSERT文対応。あとは結果を圧縮して格納するオプションも出ると嬉しいなぁ。。。
コメント
コメントを投稿