AWS Athenaで結果を強引にJSONで受け取る方法

AWS Athenaで結果を強引にJSONで受け取る方法:


tl;dr

  • Athenaの実行結果をJSONで受け取りたかった
  • SQLですべてのカラムを一つのJSONにキャストすればJSONで受け取れる
  • 本格的にETLするならGlueジョブを使いましょう


はじめに

AWS AthenaはマネージドPrestoとも言えるサービスでクラスタを管理しなくても手軽にS3のデータにクエリを投げれる良いサービスです。

一方でAthenaはINSERT文に対応していなかったり、クエリの出力形式がCSVかつ無圧縮しか選択できなかったりと、なんらかETLをするには不向きなサービスです(2018/10/15時点)。本格的なETLにはGlueを使うのがいいんですが、整形されたデータをチョロチョロっとサマって他のプログラムの入力にしたいみたいケースでは手軽にAthenaで済ませたくなります。そういった際に出力をプログラムで扱いやすいJSON形式でできるとなお嬉しいです。

ということで以下のようなクエリを書いてみました。


クエリ

こんな感じで出力結果をMAP経由でJSONにキャストすると結果を一つのJSON形式にできます。(サブクエリの中は適当なサンプルデータです。)

SELECT 
  CAST( MAP( 
    ARRAY[ 
      'name', 
      'age', 
      'skils', 
      'account' 
    ], 
    ARRAY[ 
      CAST(name as JSON), 
      CAST(age as JSON), 
      CAST(skils as JSON), 
      CAST(account as JSON) 
    ] 
  ) AS JSON) AS json_data 
FROM ( 
  SELECT 
    'kanga' as name, 
    100 as age, 
    ARRAY['sql','aws'] as skils, 
    MAP( 
      ARRAY['qiita', 'twitter'], 
      ARRAY['kanga', 'kanga333'] 
    ) as account 
); 
こんな感じに結果が受け取れます。

json.csv
"json_data" 
"{""account"":{""qiita"":""kanga"",""twitter"":""kanga333""},""age"":100,""name"":""kanga"",""skils"":[""sql"",""aws""]}" 
厳密には出力はJSON形式では無く、1カラムのCSVの文字列の中にJSONが入ってる感じになります。

利用する際は文字列からデコードする必要があります。

直接csvをロードする場合はこんな感じです。

load.py
import csv 
import json 
 
with open("json.csv") as f: 
    csv_records = csv.reader(f) 
    # Skip header 
    next(csv_records, None) 
    for csv_record in csv_records: 
        json_record = json.loads(csv_record[0]) 
        # Output: kanga 
        print(json_record['name']) 
        # Output: {'qiita': 'kanga', 'twitter': 'kanga333'} 
        print(json_record['account']) 
AWS APIのget_query_resultsから利用する際も同じ感じです。

get_query_results.py
import json 
import boto3 
 
athena = boto3.client('athena') 
response = athena.get_query_results( 
    QueryExecutionId='15299086-c4e6-425a-9b61-d360a875225d' 
) 
rows = response['ResultSet']['Rows'] 
 
for row in rows[1:]: 
    data = row['Data'][0]['VarCharValue'] 
    json_record = json.loads(data) 
    # Output: kanga 
    print(json_record['name']) 
    # Output: kanga 
    print(json_record['account']) 


おわりに

以上、Athenaの結果をJSONで受け取る方法でした。とりあえず現状、データをサマって抽出した上で何らかプログラムの入力にする、とかそういったケースで使えなくもないかな?と思います。

一方で、この方法で出力されるファイルは無圧縮のJSONになるので、大量のレコードを出力したい場合には使わない方が良いでしょう。そういったケースでは素直にGlueを使うかEMRでクラスタを立てましょう。

Athenaが正式にJSON形式での結果出力をサポートする日を待ってます、あるいはINSERT文対応。あとは結果を圧縮して格納するオプションも出ると嬉しいなぁ。。。

コメント

このブログの人気の投稿

投稿時間:2021-06-17 22:08:45 RSSフィード2021-06-17 22:00 分まとめ(2089件)

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2021-06-17 05:05:34 RSSフィード2021-06-17 05:00 分まとめ(1274件)