AWS GlueのPython Shellジョブを使ってGlue Crawlerを呼ぶ
AWS GlueのPython Shellジョブを使ってGlue Crawlerを呼ぶ:
つい先日、AWS GlueにPython Shellというジョブのタイプが追加がされました。
https://aws.amazon.com/jp/about-aws/whats-new/2019/01/introducing-python-shell-jobs-in-aws-glue/
本記事は、GlueのPython Shellについて簡単に説明した上で、サンプルとしてGlue Crawlerを起動するPython Shellを紹介します。
AWS Glueはサーバレスなコンピューティング環境にScalaやPythonのSparkジョブをサブミットして実行する事ができる、いわばフルマネージドSparkといえるような機能を持っています。
AWS GlueのPython ShellとはそんなGlueのコンピューティング環境でSparkではない普通のPythonスクリプトを実行できる機能です。
雑にまとめると、サーバレス環境でPythonスクリプトを実行できる機能なんですが、それ何てLambda?と思いますよね?(僕も思いました。)
出自が違うので技術的な比較はしないですが、使い勝手としてGlueのPython ShellとAWS Lambdaを比較してみます。
GlueはDPU呼ばれる計算機リソースの単位で課金されます。DPUとは4つのvCoreと16GBのメモリが割り当てられたリソースです。1DPU時間(つまり1DPUを1時間使った場合)で0.44$の料金が発生します。
Sparkを使ったGlueジョブの場合は1DPUが最小の割当単位ですが、Python Shellの場合は0.0625DPU(つまり1/16DPU)か1DPUを選択する事ができます。
0.0625DPUで一秒あたりの料金は
くらいになります。最小実行時間が1分なので、一回の実行当たり最小でも
Lambdaの場合は1GBのメモリで
それではPython Shellを使ったジョブを作ってみます。今回はGlue CrawlerをStartした後、その終了を待つスクリプトを作成します。
GlueのPython Shellとして作成することで、以下のような使い道を想定しています。
GlueのPython Shellは本当にただのPython2のスクリプトです。以下は引数
GlueのPython Shellは以下のような感じで呼ばれる形になります。
Glue実行時に任意のパラメータのキーとバリューを与える事ができますが、Python Shell実行時はその値がそのままスクリプトの引数として与えられます。上のコマンドの例では
そのためスクリプト側で
以下はGlueジョブをデプロイするためのCloudformationを一部抜粋したものです。
Glueジョブで使うスクリプトはS3にあげて置く必要があるので、上述のスクリプトをS3にアップしている前提です。
Sparkを使うGlueジョブと比べて
こういった感じのCloudFormationを使えばデプロイすることができます。
デプロイ後に動かしてみたら無事動きました。
トリガーの設定を紐づけて取り込みジョブAが動いた後にクローラーを動かしてみたり、Step Functionsから呼んで動かしてみた場合も無事に動きました。はい。
なんか味気ないですが、スクショとか撮ったりするのがめんどかったので、、、
なかなか便利そうな気がします。が、並列実行数が3なのでいろんな処理から呼んでると痛い目にあいそうな予感、、、
以上、GlueのPython Shellを使う一例の紹介でした。
今回はGlueとの連携を意識した処理を作ってみましたが、データ処理周りのバッチは時間がかかる事が多いので、タイムアウトが無いことを活かして普通にバッチ実行環境として採用するのも面白いかなと思いました。
はじめに
つい先日、AWS GlueにPython Shellというジョブのタイプが追加がされました。https://aws.amazon.com/jp/about-aws/whats-new/2019/01/introducing-python-shell-jobs-in-aws-glue/
本記事は、GlueのPython Shellについて簡単に説明した上で、サンプルとしてGlue Crawlerを起動するPython Shellを紹介します。
AWS GlueのPython Shellとは?
AWS Glueはサーバレスなコンピューティング環境にScalaやPythonのSparkジョブをサブミットして実行する事ができる、いわばフルマネージドSparkといえるような機能を持っています。AWS GlueのPython ShellとはそんなGlueのコンピューティング環境でSparkではない普通のPythonスクリプトを実行できる機能です。
雑にまとめると、サーバレス環境でPythonスクリプトを実行できる機能なんですが、それ何てLambda?と思いますよね?(僕も思いました。)
AWS Lambdaとの比較
出自が違うので技術的な比較はしないですが、使い勝手としてGlueのPython ShellとAWS Lambdaを比較してみます。
利点
- タイムアウトに制限が無い
- Glueジョブの一種なのでGlueトリガーから呼び出せる
- データ分析系のライブラリがデフォルトで使える
欠点
- ランタイムがPython2.7のみ
- ランタイムがPython2.7のみ
- ランタイムがPython2.7のみ
- 最小実行時間が1分とLambdaよりは粒度が荒い
- ただし課金は秒単位
- Lambdaほど柔軟な呼び出し方には対応してない
- Glueトリガーから呼ぶか、がStep Functionsから呼ぶか、API経由で直で呼ぶか、といった感じ
- 最大並列実行数が3と少なめ
料金感
GlueはDPU呼ばれる計算機リソースの単位で課金されます。DPUとは4つのvCoreと16GBのメモリが割り当てられたリソースです。1DPU時間(つまり1DPUを1時間使った場合)で0.44$の料金が発生します。Sparkを使ったGlueジョブの場合は1DPUが最小の割当単位ですが、Python Shellの場合は0.0625DPU(つまり1/16DPU)か1DPUを選択する事ができます。
0.0625DPUで一秒あたりの料金は
0.0625 * 0.44 * 1/3600 = 0.000007638888889 ($/sec)
0.00046$
くらいかかる感じになります。Lambdaの場合は1GBのメモリで
0.00001667($/sec)
、128MBのメモリで0.00000208($/sec)
の料金なので料金感としても特別割高という訳では無いのではないでしょうか??
使ってみる
それではPython Shellを使ったジョブを作ってみます。今回はGlue CrawlerをStartした後、その終了を待つスクリプトを作成します。GlueのPython Shellとして作成することで、以下のような使い道を想定しています。
- Step FunctionsからGlue Crawlerを起動できるようになる
- ただしLambdaでも同じ事はできる
- 強いていえばタイムアウトが無いのがPython Shellとして作る利点
- Glueトリガーと連携できるようになる
- 例えばデータを取り込むGlue Jobが成功した場合にCrawler起動するといった連携が可能になる
スクリプトの作成
GlueのPython Shellは本当にただのPython2のスクリプトです。以下は引数--crawler
で受け取った値のGlue Crawlerを起動して、その終了を待つといった動きをするスクリプトになります。# -*- coding: utf-8 -*- import argparse import logging import sys from time import sleep import boto3 # Setup logger fmt = "%(asctime)s %(levelname)s %(message)s" handler = logging.StreamHandler(sys.stdout) handler.setFormatter(logging.Formatter(fmt=fmt)) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.INFO) LOGGER.addHandler(handler) # Setup glue client glue_client = boto3.client('glue') def get_args(): parser = argparse.ArgumentParser() # crawler is glue crawler name parser.add_argument("--crawler", type=str, required=True) # parse_known_args return (known_args, unknown_args) args, _ = parser.parse_known_args() return args def start_crawler(crawler_name): glue_client.start_crawler( Name=crawler_name ) def get_crawler(crawler_name): resp = glue_client.get_crawler( Name=crawler_name ) return resp def is_ready(resp): state = resp['Crawler']['State'] if state in ['READY']: return True return False def is_succeeded(resp): last_state = resp['Crawler']['LastCrawl']['Status'] if last_state in ['SUCCEEDED']: return True return False def logging_crawler_error(resp): last_state = resp['Crawler']['LastCrawl']['Status'] message = resp['Crawler']['LastCrawl']['ErrorMessage'] LOGGER.error("Failed to crawl, Status: %s, Message: %s\n", last_state, message) def wait_for_crawler_until_ready(crawler_name): while(True): resp = get_crawler(crawler_name) if is_ready(resp): return resp sleep(10) def main(): args = get_args() crawler_name = args.crawler LOGGER.info("Wait until READY before starting crawler: %s\n", crawler_name) wait_for_crawler_until_ready(crawler_name) LOGGER.info("Start glue crawler: %s\n", crawler_name) start_crawler(crawler_name) LOGGER.info("Wait for crawler to complete: %s\n", crawler_name) resp = wait_for_crawler_until_ready(crawler_name) if not is_succeeded(resp): logging_crawler_error(resp) exit(1) LOGGER.info("Succeeded to crawl") if __name__ == '__main__': main()
引数について
GlueのPython Shellは以下のような感じで呼ばれる形になります。/tmp/glue-python-scripts-XXXXXX/script_name.py --enable-metrics --scriptLocation s3://example-bucket/script_name.py --job-bookmark-option job-bookmark-disable --param-key param-val --job-language python
--param-key param-val
が該当します。それ以外にもGlue側が設定するパラメータも引数として渡されます。そのためスクリプト側で
argparse
を使って引数を取得しています。Glueはawsglue.utils
というパッケージにgetResolvedOptions
関数があり、そちらを使っても引数を取得することができます(多分)。今回はシンプルに標準パッケージのargparse
を使いました。
CloudFormationの設定
以下はGlueジョブをデプロイするためのCloudformationを一部抜粋したものです。Glueジョブで使うスクリプトはS3にあげて置く必要があるので、上述のスクリプトをS3にアップしている前提です。
SyncExecGlueCrawler: Type: AWS::Glue::Job Properties: Name: sync_exec_glue_crawler Role: Fn::GetAtt: [ "GlueJobIamRole", "Arn" ] Command: Name: pythonshell ScriptLocation: "s3://example-bucket/sync_exec_glue_crawler/glue_script.py" ExecutionProperty: MaxConcurrentRuns: 3 MaxRetries: 2
Properties:
Command:
Name:
にpythonshell
を指定しなければならない点が異なります。(Sparkを使ったジョブの場合はglueetl
になります。)こういった感じのCloudFormationを使えばデプロイすることができます。
動かしてみた
デプロイ後に動かしてみたら無事動きました。トリガーの設定を紐づけて取り込みジョブAが動いた後にクローラーを動かしてみたり、Step Functionsから呼んで動かしてみた場合も無事に動きました。はい。
なんか味気ないですが、スクショとか撮ったりするのがめんどかったので、、、
なかなか便利そうな気がします。が、並列実行数が3なのでいろんな処理から呼んでると痛い目にあいそうな予感、、、
終わりに
以上、GlueのPython Shellを使う一例の紹介でした。今回はGlueとの連携を意識した処理を作ってみましたが、データ処理周りのバッチは時間がかかる事が多いので、タイムアウトが無いことを活かして普通にバッチ実行環境として採用するのも面白いかなと思いました。
コメント
コメントを投稿