Glueの使い方的な㉜(Python Shellを使う)
Glueの使い方的な㉜(Python Shellを使う):
Glueのジョブタイプは今まではSpark(PySpark,Scala)だけでしたが、新しくPython Shellというジョブタイプができました。GlueのジョブとしてPythonを実行できます。もちろん並列分散処理するわけではないので以下のようにライトなタスクでの用途を想定しています。そのため料金も秒課金になっています。
https://www.slideshare.net/AmazonWebServices/building-serverless-analytics-pipelines-with-aws-glue-ant308-aws-reinvent-2018
1つ目のPySparkジョブは以下のリンクの後半にあるse2_job3とほぼ同じ内容です
https://qiita.com/pioho07/items/523aec26ca5dc5bc9697
上記読んでいただければ1つ目のジョブの内容はわかります。簡単に概要だけ説明します。
以下のcsvデータをparquetフォーマットに変換し、'country', 'year','month','day','hour'でパーティションを切ります。一箇所追加したのはrepartition(1)で出力ファイルを1つにしています。
※①と同じデータ
se2_job15(se2_job3とほぼ同じ内容)
se2_in0
in0 (入力)
out15 (出力)
"part-00000-a0be54dc-83d1-4aeb-a167-db87d24457af.c000.snappy.parquet"という名前のファイルが出力されている
part-xxxというオブジェクト名を、パーティションのディレクトリの一部の"country=xxx"という名前にリネームする
se2_job16
out15 (入力/出力)
Glueの画面から"ジョブ"->[ジョブの追加]をクリックする
ジョブ名"se2_job16"、Typeで"Python shell"を選択し、"ユーザーが作成する新しいスクリプト"にチェックを入れる。
"セキュリティの設定"箇所をクリックしオプションを確認してみる。Maximum capacityはデフォルトが0.0625となっている。[次へ]をクリックする
次の画面で"接続"は必要ないので、そのまま[ジョブを保存してスクリプトを編集する]をクリックする
※Capacity:このジョブの実行時に割り当てることができるAWS Glueデータ処理ユニット(DPU)の最大数。 DPUは4vCPU,16GBメモリ。値は0.0625または1に設定できます。デフォルトは0.0625です
非常にシンプルな画面が出る
コードを書き[保存]をクリックする
今回はpart-xxxxxの長い名前を、パーティションで切っている国名にファイル名をリネームする(オブジェクトコピーして元オブジェクトをデリートだが).
作成されたジョブにチェックを入れ、アクションからジョブの実行を行う
ジョブが成功し"country=AUS"の名前でリネームされた。
"アクション"から"S3 Select"をクリックし
"Parquet"にチェックを入れ、[ファイルプレビューの表示]をクリックし、データが表示されることを確認できる
ジョブ1->ジョブ2の単純なフローを作る
正確にはWebで
https://aws.amazon.com/glue/pricing/
1秒あたりに課金され、PythonシェルタイプのETLジョブごとに最小1分で、1 DPU時間あたり0.44ドル
Python 2.7互換スクリプト
サポートライブラリ:
Boto3
collections
CSV
gzip
multiprocessing
NumPy
pandas
pickle
re
SciPy
sklearn
sklearn.feature_extraction
sklearn.preprocessing
xml.etree.ElementTree
zipfile
Python Shell Job in Glue
https://docs.aws.amazon.com/glue/latest/dg/add-job-python.html
Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f
re:Invent 2018での説明
https://www.slideshare.net/AmazonWebServices/building-serverless-analytics-pipelines-with-aws-glue-ant308-aws-reinvent-2018
Glue ジョブ の Python shell
Glueのジョブタイプは今まではSpark(PySpark,Scala)だけでしたが、新しくPython Shellというジョブタイプができました。GlueのジョブとしてPythonを実行できます。もちろん並列分散処理するわけではないので以下のようにライトなタスクでの用途を想定しています。そのため料金も秒課金になっています。https://www.slideshare.net/AmazonWebServices/building-serverless-analytics-pipelines-with-aws-glue-ant308-aws-reinvent-2018
Serverless Edge Node for triggering, light transformations, uncompress, tar extract, Parquet conversion
Python shellを使ったジョブを作る
内容
- 1つ目のジョブ(pyspark):csvデータをparquetフォーマットに変換し、パーティションを切り、出力ファイルを1つにする。
- 2つ目のジョブ(python shell):1つ目のジョブの出力ファイル part-xxxxx.snappy.parquetのファイル名をリネームする
流れ
- 1つ目のPySparkのジョブを実行 => 2つ目のPythonShellのジョブを実行
まず1つ目のジョブ
内容
1つ目のPySparkジョブは以下のリンクの後半にあるse2_job3とほぼ同じ内容ですhttps://qiita.com/pioho07/items/523aec26ca5dc5bc9697
上記読んでいただければ1つ目のジョブの内容はわかります。簡単に概要だけ説明します。
以下のcsvデータをparquetフォーマットに変換し、'country', 'year','month','day','hour'でパーティションを切ります。一箇所追加したのはrepartition(1)で出力ファイルを1つにしています。
ソースデータ(19件)
※①と同じデータcsvlog.csv
deviceid,uuid,appid,country,year,month,day,hour iphone,11111,001,JP,2017,12,14,12 android,11112,001,FR,2017,12,14,14 iphone,11113,009,FR,2017,12,16,21 iphone,11114,007,AUS,2017,12,17,18 other,11115,005,JP,2017,12,29,15 iphone,11116,001,JP,2017,12,15,11 pc,11118,001,FR,2017,12,01,01 pc,11117,009,FR,2017,12,02,18 iphone,11119,007,AUS,2017,11,21,14 other,11110,005,JP,2017,11,29,15 iphone,11121,001,JP,2017,11,11,12 android,11122,001,FR,2017,11,30,20 iphone,11123,009,FR,2017,11,14,14 iphone,11124,007,AUS,2017,12,17,14 iphone,11125,005,JP,2017,11,29,15 iphone,11126,001,JP,2017,12,19,08 android,11127,001,FR,2017,12,19,14 iphone,11128,009,FR,2017,12,09,04 iphone,11129,007,AUS,2017,11,30,14
ジョブ名
se2_job15(se2_job3とほぼ同じ内容)
クローラー名
se2_in0
S3
in0 (入力)out15 (出力)
ジョブの実行と確認
以下のジョブを実行する
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1") ## @type: ResolveChoice ## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"] ## @return: resolvechoice2 ## @inputs: [frame = applymapping1] resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2") ## @type: DropNullFields ## @args: [transformation_ctx = "dropnullfields3"] ## @return: dropnullfields3 ## @inputs: [frame = resolvechoice2] dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3") ###add df = dropnullfields3.toDF() partitionby=['country','year','month','day','hour'] output='s3://test-glue00/se2/out15/' codec='snappy' df.repartition(1).write.partitionBy(partitionby).mode("overwrite").parquet(output,compression=codec) #df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").format('com.databricks.spark.avro').save(output,compression=codec) ###add ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out15"}, format = "parquet", transformation_ctx = "datasink4"] ## @return: datasink4 ## @inputs: [frame = dropnullfields3] #datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out15"}, format = "parquet", transformation_ctx = "datasink4") job.commit()
S3上の出力ファイルを確認
"part-00000-a0be54dc-83d1-4aeb-a167-db87d24457af.c000.snappy.parquet"という名前のファイルが出力されている
次に2つ目のジョブ
内容
part-xxxというオブジェクト名を、パーティションのディレクトリの一部の"country=xxx"という名前にリネームする
ジョブ名
se2_job16
S3
out15 (入力/出力)
python shellのジョブの作成と実行
Glueの画面から"ジョブ"->[ジョブの追加]をクリックするジョブ名"se2_job16"、Typeで"Python shell"を選択し、"ユーザーが作成する新しいスクリプト"にチェックを入れる。
"セキュリティの設定"箇所をクリックしオプションを確認してみる。Maximum capacityはデフォルトが0.0625となっている。[次へ]をクリックする
次の画面で"接続"は必要ないので、そのまま[ジョブを保存してスクリプトを編集する]をクリックする
※Capacity:このジョブの実行時に割り当てることができるAWS Glueデータ処理ユニット(DPU)の最大数。 DPUは4vCPU,16GBメモリ。値は0.0625または1に設定できます。デフォルトは0.0625です
非常にシンプルな画面が出る
コードを書き[保存]をクリックする
今回はpart-xxxxxの長い名前を、パーティションで切っている国名にファイル名をリネームする(オブジェクトコピーして元オブジェクトをデリートだが).
# -*- coding: utf-8 -*- import boto3 import re s3 = boto3.resource('s3') bucket = s3.Bucket('test-glue00') bucket_name='test-glue00' for object in bucket.objects.filter(Prefix='se2/tmp2/country='): #print(object.key) old_file = object.key pattern1 = r'.*part.*' result1 = re.match(pattern1, old_file) if result1: Copy_from = result1.group() Copy_to = result1.group().rsplit('/', 1)[0] + '/' + result1.group().split("/")[2] s3.Object(bucket_name,Copy_to).copy_from(CopySource=bucket_name + '/' + Copy_from ) s3.Object(bucket_name,Copy_from).delete()
ジョブが成功し"country=AUS"の名前でリネームされた。
S3 selectで確認
"アクション"から"S3 Select"をクリックし"Parquet"にチェックを入れ、[ファイルプレビューの表示]をクリックし、データが表示されることを確認できる
これジョブなので
Glue のトリガとしても設定できます
StepFunctionで直接よびだせます
ジョブ1->ジョブ2の単純なフローを作る{ "Comment": "A state machine that submits a Job to Glue Batch and monitors the Job until it completes.", "StartAt": "Glue PySpark Job", "States": { "Glue PySpark Job": { "Type": "Task", "Resource": "arn:aws:states:::glue:startJobRun.sync", "Parameters": { "JobName": "se2_job15" }, "Next": "Glue PythonShell Job", "Retry": [ { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 3, "BackoffRate": 2.0 } ] }, "Glue PythonShell Job": { "Type": "Task", "Resource": "arn:aws:states:::glue:startJobRun.sync", "Parameters": { "JobName": "se2_job16" },"End": true, "Retry": [ { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 3, "BackoffRate": 2.0 } ] } } }
その他
料金
正確にはWebでhttps://aws.amazon.com/glue/pricing/
1秒あたりに課金され、PythonシェルタイプのETLジョブごとに最小1分で、1 DPU時間あたり0.44ドル
ライブラリとか
Python 2.7互換スクリプトサポートライブラリ:
Boto3
collections
CSV
gzip
multiprocessing
NumPy
pandas
pickle
re
SciPy
sklearn
sklearn.feature_extraction
sklearn.preprocessing
xml.etree.ElementTree
zipfile
こちらも是非
Python Shell Job in Gluehttps://docs.aws.amazon.com/glue/latest/dg/add-job-python.html
Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f
re:Invent 2018での説明
https://www.slideshare.net/AmazonWebServices/building-serverless-analytics-pipelines-with-aws-glue-ant308-aws-reinvent-2018
コメント
コメントを投稿