AWS GlueのPython Shellジョブを使ってGlue Crawlerを呼ぶ

AWS GlueのPython Shellジョブを使ってGlue Crawlerを呼ぶ:


はじめに

つい先日、AWS GlueにPython Shellというジョブのタイプが追加がされました。
https://aws.amazon.com/jp/about-aws/whats-new/2019/01/introducing-python-shell-jobs-in-aws-glue/

本記事は、GlueのPython Shellについて簡単に説明した上で、サンプルとしてGlue Crawlerを起動するPython Shellを紹介します。


AWS GlueのPython Shellとは?

AWS Glueはサーバレスなコンピューティング環境にScalaやPythonのSparkジョブをサブミットして実行する事ができる、いわばフルマネージドSparkといえるような機能を持っています。

AWS GlueのPython ShellとはそんなGlueのコンピューティング環境でSparkではない普通のPythonスクリプトを実行できる機能です。

雑にまとめると、サーバレス環境でPythonスクリプトを実行できる機能なんですが、それ何てLambda?と思いますよね?(僕も思いました。)


AWS Lambdaとの比較

出自が違うので技術的な比較はしないですが、使い勝手としてGlueのPython ShellとAWS Lambdaを比較してみます。


利点


欠点

  • ランタイムがPython2.7のみ
  • ランタイムがPython2.7のみ
  • ランタイムがPython2.7のみ
  • 最小実行時間が1分とLambdaよりは粒度が荒い

    • ただし課金は秒単位
  • Lambdaほど柔軟な呼び出し方には対応してない

    • Glueトリガーから呼ぶか、がStep Functionsから呼ぶか、API経由で直で呼ぶか、といった感じ
  • 最大並列実行数が3と少なめ
あたりまえですが、普通にPythonスクリプトを実行するだけならLambdaの方が使い勝手が良いので、データ処理の文脈でGlueと絡めて使うのが良いと思います。


料金感

GlueはDPU呼ばれる計算機リソースの単位で課金されます。DPUとは4つのvCoreと16GBのメモリが割り当てられたリソースです。1DPU時間(つまり1DPUを1時間使った場合)で0.44$の料金が発生します。

Sparkを使ったGlueジョブの場合は1DPUが最小の割当単位ですが、Python Shellの場合は0.0625DPU(つまり1/16DPU)か1DPUを選択する事ができます。

0.0625DPUで一秒あたりの料金は

0.0625 * 0.44 * 1/3600 = 0.000007638888889 ($/sec) 
くらいになります。最小実行時間が1分なので、一回の実行当たり最小でも0.00046$くらいかかる感じになります。

Lambdaの場合は1GBのメモリで0.00001667($/sec)、128MBのメモリで0.00000208($/sec)の料金なので料金感としても特別割高という訳では無いのではないでしょうか??


使ってみる

それではPython Shellを使ったジョブを作ってみます。今回はGlue CrawlerをStartした後、その終了を待つスクリプトを作成します。

GlueのPython Shellとして作成することで、以下のような使い道を想定しています。

  • Step FunctionsからGlue Crawlerを起動できるようになる

    • ただしLambdaでも同じ事はできる
    • 強いていえばタイムアウトが無いのがPython Shellとして作る利点
  • Glueトリガーと連携できるようになる

    • 例えばデータを取り込むGlue Jobが成功した場合にCrawler起動するといった連携が可能になる
この使い方の場合、GlueCrawlerの終了を待ってる間、ずっとジョブが起動してる感じになるのでお金がかかってしまうのですが、まぁ仮に1時間起動しっぱなしだっとしても3円くらいなので良しとしています。


スクリプトの作成

GlueのPython Shellは本当にただのPython2のスクリプトです。以下は引数--crawlerで受け取った値のGlue Crawlerを起動して、その終了を待つといった動きをするスクリプトになります。

# -*- coding: utf-8 -*- 
import argparse 
import logging 
import sys 
from time import sleep 
import boto3 
 
# Setup logger 
fmt = "%(asctime)s %(levelname)s %(message)s" 
handler = logging.StreamHandler(sys.stdout) 
handler.setFormatter(logging.Formatter(fmt=fmt)) 
LOGGER = logging.getLogger(__name__) 
LOGGER.setLevel(logging.INFO) 
LOGGER.addHandler(handler) 
 
# Setup glue client 
glue_client = boto3.client('glue') 
 
 
def get_args(): 
    parser = argparse.ArgumentParser() 
    # crawler is glue crawler name 
    parser.add_argument("--crawler", type=str, required=True) 
 
    # parse_known_args return (known_args, unknown_args) 
    args, _ = parser.parse_known_args() 
    return args 
 
 
def start_crawler(crawler_name): 
    glue_client.start_crawler( 
        Name=crawler_name 
    ) 
 
 
def get_crawler(crawler_name): 
    resp = glue_client.get_crawler( 
        Name=crawler_name 
    ) 
    return resp 
 
 
def is_ready(resp): 
    state = resp['Crawler']['State'] 
    if state in ['READY']: 
        return True 
    return False 
 
 
def is_succeeded(resp): 
    last_state = resp['Crawler']['LastCrawl']['Status'] 
    if last_state in ['SUCCEEDED']: 
        return True 
    return False 
 
 
def logging_crawler_error(resp): 
    last_state = resp['Crawler']['LastCrawl']['Status'] 
    message = resp['Crawler']['LastCrawl']['ErrorMessage'] 
    LOGGER.error("Failed to crawl, Status: %s, Message: %s\n", 
                 last_state, message) 
 
 
def wait_for_crawler_until_ready(crawler_name): 
    while(True): 
        resp = get_crawler(crawler_name) 
        if is_ready(resp): 
            return resp 
        sleep(10) 
 
 
def main(): 
    args = get_args() 
    crawler_name = args.crawler 
 
    LOGGER.info("Wait until READY before starting crawler: %s\n", crawler_name) 
    wait_for_crawler_until_ready(crawler_name) 
    LOGGER.info("Start glue crawler: %s\n", crawler_name) 
    start_crawler(crawler_name) 
    LOGGER.info("Wait for crawler to complete: %s\n", crawler_name) 
    resp = wait_for_crawler_until_ready(crawler_name) 
 
    if not is_succeeded(resp): 
        logging_crawler_error(resp) 
        exit(1) 
    LOGGER.info("Succeeded to crawl") 
 
 
if __name__ == '__main__': 
    main() 
 


引数について

GlueのPython Shellは以下のような感じで呼ばれる形になります。

/tmp/glue-python-scripts-XXXXXX/script_name.py --enable-metrics --scriptLocation s3://example-bucket/script_name.py --job-bookmark-option job-bookmark-disable --param-key param-val --job-language python 
Glue実行時に任意のパラメータのキーとバリューを与える事ができますが、Python Shell実行時はその値がそのままスクリプトの引数として与えられます。上のコマンドの例では--param-key param-valが該当します。それ以外にもGlue側が設定するパラメータも引数として渡されます。

そのためスクリプト側でargparseを使って引数を取得しています。Glueはawsglue.utilsというパッケージにgetResolvedOptions関数があり、そちらを使っても引数を取得することができます(多分)。今回はシンプルに標準パッケージのargparseを使いました。


CloudFormationの設定

以下はGlueジョブをデプロイするためのCloudformationを一部抜粋したものです。

Glueジョブで使うスクリプトはS3にあげて置く必要があるので、上述のスクリプトをS3にアップしている前提です。

SyncExecGlueCrawler: 
    Type: AWS::Glue::Job 
    Properties: 
      Name: sync_exec_glue_crawler 
      Role: 
        Fn::GetAtt: [ "GlueJobIamRole", "Arn" ] 
      Command: 
        Name: pythonshell 
        ScriptLocation: "s3://example-bucket/sync_exec_glue_crawler/glue_script.py" 
      ExecutionProperty: 
        MaxConcurrentRuns: 3 
      MaxRetries: 2 
Sparkを使うGlueジョブと比べてProperties:Command:Name:pythonshellを指定しなければならない点が異なります。(Sparkを使ったジョブの場合はglueetlになります。)

こういった感じのCloudFormationを使えばデプロイすることができます。


動かしてみた

デプロイ後に動かしてみたら無事動きました。

トリガーの設定を紐づけて取り込みジョブAが動いた後にクローラーを動かしてみたり、Step Functionsから呼んで動かしてみた場合も無事に動きました。はい。

なんか味気ないですが、スクショとか撮ったりするのがめんどかったので、、、

なかなか便利そうな気がします。が、並列実行数が3なのでいろんな処理から呼んでると痛い目にあいそうな予感、、、


終わりに

以上、GlueのPython Shellを使う一例の紹介でした。

今回はGlueとの連携を意識した処理を作ってみましたが、データ処理周りのバッチは時間がかかる事が多いので、タイムアウトが無いことを活かして普通にバッチ実行環境として採用するのも面白いかなと思いました。

コメント

このブログの人気の投稿

投稿時間:2021-06-17 05:05:34 RSSフィード2021-06-17 05:00 分まとめ(1274件)

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2020-12-01 09:41:49 RSSフィード2020-12-01 09:00 分まとめ(69件)