Glueの使い方的な㉜(Python Shellを使う)

Glueの使い方的な㉜(Python Shellを使う):


Glue ジョブ の Python shell

Glueのジョブタイプは今まではSpark(PySpark,Scala)だけでしたが、新しくPython Shellというジョブタイプができました。GlueのジョブとしてPythonを実行できます。もちろん並列分散処理するわけではないので以下のようにライトなタスクでの用途を想定しています。そのため料金も秒課金になっています。

https://www.slideshare.net/AmazonWebServices/building-serverless-analytics-pipelines-with-aws-glue-ant308-aws-reinvent-2018

Serverless Edge Node for triggering, light transformations, uncompress, tar extract, Parquet conversion


Python shellを使ったジョブを作る


内容

  • 1つ目のジョブ(pyspark):csvデータをparquetフォーマットに変換し、パーティションを切り、出力ファイルを1つにする。
  • 2つ目のジョブ(python shell):1つ目のジョブの出力ファイル part-xxxxx.snappy.parquetのファイル名をリネームする


流れ

  • 1つ目のPySparkのジョブを実行 => 2つ目のPythonShellのジョブを実行


まず1つ目のジョブ


内容

1つ目のPySparkジョブは以下のリンクの後半にあるse2_job3とほぼ同じ内容です
https://qiita.com/pioho07/items/523aec26ca5dc5bc9697

上記読んでいただければ1つ目のジョブの内容はわかります。簡単に概要だけ説明します。

以下のcsvデータをparquetフォーマットに変換し、'country', 'year','month','day','hour'でパーティションを切ります。一箇所追加したのはrepartition(1)で出力ファイルを1つにしています。


ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour 
iphone,11111,001,JP,2017,12,14,12 
android,11112,001,FR,2017,12,14,14 
iphone,11113,009,FR,2017,12,16,21 
iphone,11114,007,AUS,2017,12,17,18 
other,11115,005,JP,2017,12,29,15 
iphone,11116,001,JP,2017,12,15,11 
pc,11118,001,FR,2017,12,01,01 
pc,11117,009,FR,2017,12,02,18 
iphone,11119,007,AUS,2017,11,21,14 
other,11110,005,JP,2017,11,29,15 
iphone,11121,001,JP,2017,11,11,12 
android,11122,001,FR,2017,11,30,20 
iphone,11123,009,FR,2017,11,14,14 
iphone,11124,007,AUS,2017,12,17,14 
iphone,11125,005,JP,2017,11,29,15 
iphone,11126,001,JP,2017,12,19,08 
android,11127,001,FR,2017,12,19,14 
iphone,11128,009,FR,2017,12,09,04 
iphone,11129,007,AUS,2017,11,30,14 


ジョブ名

se2_job15(se2_job3とほぼ同じ内容)


クローラー名

se2_in0


S3

in0 (入力)

out15 (出力)


ジョブの実行と確認


以下のジョブを実行する

import sys 
from awsglue.transforms import * 
from awsglue.utils import getResolvedOptions 
from pyspark.context import SparkContext 
from awsglue.context import GlueContext 
from awsglue.job import Job 
 
## @params: [JOB_NAME] 
args = getResolvedOptions(sys.argv, ['JOB_NAME']) 
 
sc = SparkContext() 
glueContext = GlueContext(sc) 
spark = glueContext.spark_session 
job = Job(glueContext) 
job.init(args['JOB_NAME'], args) 
## @type: DataSource 
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"] 
## @return: datasource0 
## @inputs: [] 
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0") 
## @type: ApplyMapping 
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"] 
## @return: applymapping1 
## @inputs: [frame = datasource0] 
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1") 
## @type: ResolveChoice 
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"] 
## @return: resolvechoice2 
## @inputs: [frame = applymapping1] 
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2") 
## @type: DropNullFields 
## @args: [transformation_ctx = "dropnullfields3"] 
## @return: dropnullfields3 
## @inputs: [frame = resolvechoice2] 
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3") 
 
###add 
df = dropnullfields3.toDF() 
 
partitionby=['country','year','month','day','hour'] 
output='s3://test-glue00/se2/out15/' 
codec='snappy' 
 
df.repartition(1).write.partitionBy(partitionby).mode("overwrite").parquet(output,compression=codec) 
#df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").format('com.databricks.spark.avro').save(output,compression=codec) 
###add 
 
 
## @type: DataSink 
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out15"}, format = "parquet", transformation_ctx = "datasink4"] 
## @return: datasink4 
## @inputs: [frame = dropnullfields3] 
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out15"}, format = "parquet", transformation_ctx = "datasink4") 
job.commit() 


S3上の出力ファイルを確認

"part-00000-a0be54dc-83d1-4aeb-a167-db87d24457af.c000.snappy.parquet"という名前のファイルが出力されている



スクリーンショット 0031-01-19 11.52.50.png



次に2つ目のジョブ


内容

part-xxxというオブジェクト名を、パーティションのディレクトリの一部の"country=xxx"という名前にリネームする


ジョブ名

se2_job16


S3

out15 (入力/出力)


python shellのジョブの作成と実行

Glueの画面から"ジョブ"->[ジョブの追加]をクリックする



スクリーンショット 0031-01-19 11.57.24.png


ジョブ名"se2_job16"、Typeで"Python shell"を選択し、"ユーザーが作成する新しいスクリプト"にチェックを入れる。



スクリーンショット 0031-01-19 12.00.44.png


"セキュリティの設定"箇所をクリックしオプションを確認してみる。Maximum capacityはデフォルトが0.0625となっている。[次へ]をクリックする

次の画面で"接続"は必要ないので、そのまま[ジョブを保存してスクリプトを編集する]をクリックする

※Capacity:このジョブの実行時に割り当てることができるAWS Glueデータ処理ユニット(DPU)の最大数。 DPUは4vCPU,16GBメモリ。値は0.0625または1に設定できます。デフォルトは0.0625です



スクリーンショット 0031-01-19 12.01.53.png


非常にシンプルな画面が出る



スクリーンショット 0031-01-19 12.06.49.png


コードを書き[保存]をクリックする

今回はpart-xxxxxの長い名前を、パーティションで切っている国名にファイル名をリネームする(オブジェクトコピーして元オブジェクトをデリートだが).

# -*- coding: utf-8 -*- 
import boto3 
import re 
 
s3 = boto3.resource('s3') 
bucket = s3.Bucket('test-glue00') 
bucket_name='test-glue00' 
for object in bucket.objects.filter(Prefix='se2/tmp2/country='): 
    #print(object.key) 
    old_file = object.key 
 
    pattern1 = r'.*part.*' 
    result1 = re.match(pattern1, old_file) 
    if result1: 
        Copy_from = result1.group() 
        Copy_to = result1.group().rsplit('/', 1)[0] + '/' + result1.group().split("/")[2] 
        s3.Object(bucket_name,Copy_to).copy_from(CopySource=bucket_name + '/' + Copy_from ) 
        s3.Object(bucket_name,Copy_from).delete() 
作成されたジョブにチェックを入れ、アクションからジョブの実行を行う



スクリーンショット 0031-01-19 12.10.42.png


ジョブが成功し"country=AUS"の名前でリネームされた。



スクリーンショット 0031-01-19 12.21.59.png



S3 selectで確認

"アクション"から"S3 Select"をクリックし



スクリーンショット 0031-01-19 12.26.26.png


"Parquet"にチェックを入れ、[ファイルプレビューの表示]をクリックし、データが表示されることを確認できる



スクリーンショット 0031-01-19 12.26.39.png



これジョブなので


Glue のトリガとしても設定できます



スクリーンショット 0031-01-19 12.25.28.png



StepFunctionで直接よびだせます

ジョブ1->ジョブ2の単純なフローを作る

{ 
  "Comment": "A state machine that submits a Job to Glue Batch and monitors the Job until it completes.", 
  "StartAt": "Glue PySpark Job", 
  "States": { 
      "Glue PySpark Job": { 
      "Type": "Task", 
      "Resource": "arn:aws:states:::glue:startJobRun.sync", 
      "Parameters": { 
        "JobName": "se2_job15" 
      }, 
      "Next": "Glue PythonShell Job", 
      "Retry": [ 
        { 
          "ErrorEquals": ["States.ALL"], 
          "IntervalSeconds": 1, 
          "MaxAttempts": 3, 
          "BackoffRate": 2.0 
        } 
      ] 
    }, 
    "Glue PythonShell Job": { 
    "Type": "Task", 
    "Resource": "arn:aws:states:::glue:startJobRun.sync", 
    "Parameters": { 
      "JobName": "se2_job16" 
    },"End": true, 
    "Retry": [ 
      { 
        "ErrorEquals": ["States.ALL"], 
        "IntervalSeconds": 1, 
        "MaxAttempts": 3, 
        "BackoffRate": 2.0 
      } 
    ] 
  } 
  } 
} 
 


スクリーンショット 0031-01-19 12.50.21.png



その他


料金

正確にはWebで

https://aws.amazon.com/glue/pricing/

1秒あたりに課金され、PythonシェルタイプのETLジョブごとに最小1分で、1 DPU時間あたり0.44ドル


ライブラリとか

Python 2.7互換スクリプト

サポートライブラリ:

Boto3

collections

CSV

gzip

multiprocessing

NumPy

pandas

pickle

re

SciPy

sklearn

sklearn.feature_extraction

sklearn.preprocessing

xml.etree.ElementTree

zipfile


こちらも是非

Python Shell Job in Glue
https://docs.aws.amazon.com/glue/latest/dg/add-job-python.html

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f

re:Invent 2018での説明
https://www.slideshare.net/AmazonWebServices/building-serverless-analytics-pipelines-with-aws-glue-ant308-aws-reinvent-2018

コメント

このブログの人気の投稿

投稿時間:2021-06-17 22:08:45 RSSフィード2021-06-17 22:00 分まとめ(2089件)

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2021-06-17 05:05:34 RSSフィード2021-06-17 05:00 分まとめ(1274件)