AthenaのCTAS(CREATE TABLE AS SELECT)でETLをするTips
AthenaのCTAS(CREATE TABLE AS SELECT)でETLをするTips:
この記事はAWS Advent Calendar 2018の3日目の記事です。
今年の10月にAthenaがCTAS(
CTASサポート以前のAthenaではクエリの結果を無圧縮のCSVでしか残せなかったのですが、CTASを使うと結果を列指向やJSONなどのフォーマットにしたうえ圧縮をかけて残せるようになりました。
これによりAthenaを使ったデータ加工の芽が出たのでいくつかのデータ加工バッチをAthenaでできるか検討しました。
この記事ではAthenaのCTASを使ったバッチを作る際のTipsについて紹介します。
AthenaはETL無しでS3のデータに直接分析をかけれるよ、というコンセプトを謳っており、現状ETLをするために設計されたサービスな訳ではないように感じています。しかし、それでもAthenaでETLしたい(出来たら嬉しい)理由は以下のような感じです。
結果を残したいクエリの先頭に
例えば以下のようなクエリを実行すれば
そしてその実データは
CTASでできようになったこと、未だにできないことを以下に挙げます。
ここからはCTASでETLするための具体的なTipsに移ります。
基本的に処理は冪等にしたいので、データの更新はなるべく追記ではなく上書きで実現したいです。
以下はCTASでINSERT OVERWRITEっぽいことを実現する方法です。
例えば日別のデータを保存したい場合は以下のような日付パーティションを付与した構造になると思います。
ここではパーティションに更にrevisionというカラムを加えます。
データの更新時は以下のようにバッチ実行時のtimestampをrevisionに指定してCTASによりデータを生成します。
その後、revision=latestというailiasとして作るパーティションが常に最新のrevisionのパーティションを見るように変更します。
CTASで作ったテーブルはいらないので消しときましょう。
これにより
ただしこの方法では
Athenaでデータをサマって結果を他のDB(Dynamoとか)に入れる場合は結果をJSONなど他のプログラムで扱いやすい形式にして出力するケースが多々あると思います。そういった場合に出力するファイル数を調節する方法です。
Bucketingを使えば特定のカラムの値をハッシュキーとして出力ファイルを分散することができます。
この際、出力するファイル数も指定することができるのでこの数を1にすることでファイルを一つにまとめることができます。
データを特定のカラムをもとに分けて出力したいのであればパーティショニングが使えます。
パーティショニングは特定のカラムをもとに出力パスを分けることが可能です。
この際パーティションに指定したカラムは元データから消えてしまうので、元データに残したい場合は重複してSELECTするようにしておきましょう。
Tipsというより注意点なんですが、現状Athenaの同時実行の制限は20クエリまでなのでLimitに引っかからないようにする必要があります。なので細かいクエリをたくさん投げるのには不向きです。Dailyの処理や少数のHourlyバッチで足りるくらいなら良いかもしれません。あるいはLimitに引っかかっても良いように処理は冪等かつリトライ可能にしておいて、自動でリトライするようにしておきましょう。
LambdaからAhenaのクエリを投げるようにしてStep Functionと組み合わせるとバックオフ込みのリトライが簡単に実現できます。
とはいえ、やはりAthena単独でETLを回すのは現状では厳しいのでAWS内だと以下のサービスと併用する必要がありそうです。
以上、AthenaのCTASでETLする際のTipsでした。データの規模にもよりますが、現状だとAthenaのCTASにETLを寄せまくれる感じでは無いので、Insert対応が来たり、同時実行数の増加が来たりしてAthenaがフルマネージドなSQLでETLできるサービスになる日を待ってます。
はじめに
この記事はAWS Advent Calendar 2018の3日目の記事です。今年の10月にAthenaがCTAS(
CREATE TABLE AS SELECT
)をサポートしました。CTASサポート以前のAthenaではクエリの結果を無圧縮のCSVでしか残せなかったのですが、CTASを使うと結果を列指向やJSONなどのフォーマットにしたうえ圧縮をかけて残せるようになりました。
これによりAthenaを使ったデータ加工の芽が出たのでいくつかのデータ加工バッチをAthenaでできるか検討しました。
この記事ではAthenaのCTASを使ったバッチを作る際のTipsについて紹介します。
前提
AthenaでETLしたい理由
AthenaはETL無しでS3のデータに直接分析をかけれるよ、というコンセプトを謳っており、現状ETLをするために設計されたサービスな訳ではないように感じています。しかし、それでもAthenaでETLしたい(出来たら嬉しい)理由は以下のような感じです。- 使うのがめっちゃ楽なフルマネージドサービス
- Presto早い
- Prestoの関数に便利な機能多い
- スキャンしたデータ量に応じた課金なので小規模から始めやすいし、安く使える
- S3のファイルを直接加工するための選択肢として手頃
CTASの使い方
結果を残したいクエリの先頭にCREATE TABLE table_name WITH ( property_name = expression ) AS
をつけるだけです。例えば以下のようなクエリを実行すれば
example
データベースにsample_data
テーブルが作成されます。そしてその実データは
s3://output-bucket/
の配下にPARQUET
形式で保存されます。CREATE TABLE example.sample_data WITH ( format='PARQUET', external_location='s3://output-bucket/' ) AS SELECT * FROM ( VALUES (1, 'a'), (2, 'b'), (3, 'c') ) AS t (id, name)
CTASでできること/できないこと
CTASでできようになったこと、未だにできないことを以下に挙げます。
できるようになったこと
- クエリの結果を新規テーブルとして保存
- S3にフォーマットを指定して保存する
- Parquet/ORC/JSON
- 保存するデータの圧縮
- 保存するデータのバケッティング
- 保存するデータのパーティショニング
未だにできないこと
- データの上書き
- データの追記
- CTASで書き込む先のS3のパスは空である必要があります
- S3にデータだけ書き込む
- 必ずテーブルを作る必要があります
- Avroでデータを保存する
CTASでETLするためのTips
ここからはCTASでETLするための具体的なTipsに移ります。
INSET OVERWRITEっぽいことを実現する
基本的に処理は冪等にしたいので、データの更新はなるべく追記ではなく上書きで実現したいです。以下はCTASでINSERT OVERWRITEっぽいことを実現する方法です。
例えば日別のデータを保存したい場合は以下のような日付パーティションを付与した構造になると思います。
s3://sample-buket/day=yyyymmdd/
CREATE EXTERNAL TABLE sample_table( data STRING) PARTITIONED BY ( day STRING) STORED AS PARQUET LOCATION 's3://sample-buket/' tblproperties ("parquet.compress"="SNAPPY");
s3://sample-buket/day=yyyymmdd/revision=timestamp/
CREATE EXTERNAL TABLE sample_table ( data STRING) PARTITIONED BY ( day STRING, revision STRING) STORED AS PARQUET LOCATION 's3://sample-buket/' tblproperties ("parquet.compress"="SNAPPY");
CREATE TABLE temp_database.tmp_1543818683 WITH ( format='PARQUET', external_location='s3://sample-buket/day=20181203/revision=1543818683/' ) AS -- 何らかSQLによる加工を入れる SELECT data FROM input_table;
-- 初回はADD PARTITION ALTER TABLE sample_table ADD IF NOT EXISTS PARTITION (day='20181203', revision='latest') LOCATION 's3://sample-buket/day=20181203/revision=1543818683/';
-- 変更の際はSET LOCATION ALTER TABLE sample_table PARTITION (day='20181203', revision='latest') SET LOCATION 's3://sample-buket/day=20181203/revision=1543818683/';
DROP TABLE IF EXISTS temp_database.tmp_1543818683;
revision='latest'
のパーティションを見ればデータが常に上書きされているようなテーブルを実現できます。ただしこの方法では
MSCK REPAIR TABLE
でパーティションをリカバリできないので注意が必要です。
出力ファイルの生成数を調整する
Athenaでデータをサマって結果を他のDB(Dynamoとか)に入れる場合は結果をJSONなど他のプログラムで扱いやすい形式にして出力するケースが多々あると思います。そういった場合に出力するファイル数を調節する方法です。
データを一つにまとめる
Bucketingを使えば特定のカラムの値をハッシュキーとして出力ファイルを分散することができます。この際、出力するファイル数も指定することができるのでこの数を1にすることでファイルを一つにまとめることができます。
CREATE TABLE temp_database.tmp_1543818683 WITH ( format='JSON', external_location='s3://output-bucket/revision=1543818683/', -- Bucketingのキーで使うカラム bucketed_by = ARRAY['id'], -- 出力するファイルの数 bucket_count = 1 ) AS SELECT * FROM ( VALUES (1, 'a'), (2, 'b'), (3, 'c') ) AS t (id, name)
結果
s3://output-bucket/revision=1543818683/20181203_070050_00280_qmtgu_bucket-00000.gz
解凍結果.json
{"id":1,"name":"a"} {"id":2,"name":"b"} {"id":3,"name":"c"}
データを特定のカラム別に分けて出力する
データを特定のカラムをもとに分けて出力したいのであればパーティショニングが使えます。パーティショニングは特定のカラムをもとに出力パスを分けることが可能です。
この際パーティションに指定したカラムは元データから消えてしまうので、元データに残したい場合は重複してSELECTするようにしておきましょう。
CREATE TABLE temp_database.tmp_1543818683 WITH ( format='JSON', external_location='s3://output-bucket/revision=1543818684/', -- Bucketingのキーで使うカラム bucketed_by = ARRAY['id'], -- 出力するファイルの数 bucket_count = 1, -- partitioningに使うカラム partitioned_by = ARRAY['partition_id'] ) AS SELECT id, name , -- パーティションに指定するカラムは末尾に書く id AS partition_id, FROM ( VALUES (1, 'a'), (2, 'b'), (3, 'c') ) AS t (id, name)
結果
s3://output-bucket/revision=1543818684/partition_id=1/20181203_070526_00306_ce5f8_bucket-00000.gz s3://output-bucket/revision=1543818684/partition_id=2/20181203_070526_00306_ce5f8_bucket-00000.gz s3://output-bucket/revision=1543818684/partition_id=3/20181203_070526_00306_ce5f8_bucket-00000.gz
partition_id=1のjson
{"id":1,"name":"a"}
partition_id=2のjson
{"id":2,"name":"b"}
partition_id=3のjson
{"id":3,"name":"c"}
QueryLimitに引っかからないように使う
Tipsというより注意点なんですが、現状Athenaの同時実行の制限は20クエリまでなのでLimitに引っかからないようにする必要があります。なので細かいクエリをたくさん投げるのには不向きです。Dailyの処理や少数のHourlyバッチで足りるくらいなら良いかもしれません。あるいはLimitに引っかかっても良いように処理は冪等かつリトライ可能にしておいて、自動でリトライするようにしておきましょう。LambdaからAhenaのクエリを投げるようにしてStep Functionと組み合わせるとバックオフ込みのリトライが簡単に実現できます。
Athenaに限界を感じたら
とはいえ、やはりAthena単独でETLを回すのは現状では厳しいのでAWS内だと以下のサービスと併用する必要がありそうです。- Glue Jobを使う
- AWSのマネージドETLサービス
- 中身はSpark
- 工夫すればSQLでのETL処理が可能
- https://dev.classmethod.jp/cloud/aws/20180528-aws-glue-etl-job-with-spark-sql/
- prestoのSQL互換では無いのでクエリ書き換えは必要
- EMRを使う
- Prestoにクエリを投げることに重きを置くなら
- クラスタの管理がある程度は発生してしまうので面倒
コメント
コメントを投稿