AWS S3にアップロードされたExcelファイルをLambdaで処理
AWS S3にアップロードされたExcelファイルをLambdaで処理:
参考: チュートリアル: AWS Lambda 関数をトリガーするように Amazon SQS キューへの着信メッセージを設定する
概要
- AWSのS3にアップされたExcelファイルをLamda関数を使って処理する方法を試したので、手順を共有する。
課題
- 定型のExcelファイルを使ってデータを納品する業務をシステム化したい。
- なぜExcelか? 古い業界なので、いまでもExcelが活躍している。Excelマクロを組めるとヒーローだ。CSV/TSVのほうが機械処理には楽だが、人間にとってはExcelのほうが運用性・業務親和性が高い。
- そこで担当者がExcelファイルをS3にアップしたら、その内容を自動的にRDSやDynamoに登録するような仕組みを実現したい
課題解決の方針
- S3にExcelファイルがアップされたらEvent機能でSQSに通知して、それをLamda関数で処理するまでの範囲を本稿で検討する。S3EventからLambdaを直接キックしないのは、エラー再処理などの運用性を考慮しているからである。
- システム構成図
- 運用者がExcelファイルをS3の特定パスにアップすることは別のソリューションで行う。S3互換のファイル転送ソフトなどを用いることも可能。
手段の検討
- Lambdaは、ランタイムでPython3系を選択する1。同期処理を考えなくてもよいバッチ処理のようなコーディングには簡易だからである。
- Excelファイル読み取りでxlrdというライブラリを選択する。もしかすると古いExcelファイルも処理したくなるかもしれないので。他のライブラリは試していないので、これがベストな選択かどうかはわからない。そのあとの処理で集計するならPandasもよさそうだ。
- xlrdを使うために、対象ファイルをローカルに取得する必要がある。Lambdaでは一般的にローカル領域として/tmpを使うのだが、インスタンスを共有しているのでファイル名がぶつからないように考慮する。/tmpが満杯になるともちろん処理が失敗するので、処理が終わったらファイルは削除するべき。
参考 AWS Lambda の制限
構築手順
- AWSで 実用的なLambda関数をつくったことがあるレベルの人を前提に以降で手順を紹介する。
S3バケットの作成
- 新規バケットを作成する。バケット名、ARNをメモしておく。
SQSキューを作成
- S3バケットと同じリージョンで、マネジメントコンソールからSQSを選択。
- 新規キューを作成する。
キュー名は notify_upload_excelfile (任意)
標準キューを指定
キューの詳細設定はデフォルトのままでよい。後で実際の業務にあわせて調整すればよい - キューが作成されたら、キューのURLとARNをメモしておく。
- キューを選択して、アクセス許可でS3を追加する。
方法は以下を参考に。サンプルのポリシーをコピペして、環境にあわせて修正すればよい。
バケットを通知用に設定する (メッセージの宛先: SNS トピックおよび SQS キュー)
S3バケットにイベント通知を設定する
- 作成したバケットのプロパティからEventsをクリック
通知の追加
名前 : NotifyExcelFilePut(任意)
「すべてのオブジェクト作成イベント」にチェック
サフィックス: .xlsx
送信先 SQSで上記で作成したキューARNを指定する。
Lambda 関数の作成
- マネジメントコンソールのLambda画面で「一から作成」を選択
名前 : function_readExcelfile
ランタイム : Python 3.7 - ロールは新規作成
ロール名 : lambdaexec_readExcelfile(任意)
作成後に以下アクセス許可を追加
sqs:ChangeMessageVisibility
sqs:DeleteMessage
sqs:GetQueueAttributes
sqs:ReceiveMessage
IAMポリシーの例
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "s3:GetObject", "logs:CreateLogStream", "sqs:ChangeMessageVisibility", "sqs:ReceiveMessage", "sqs:GetQueueAttributes", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:*:*:*", "arn:aws:sqs:us-west-2:xxxxxxxxxxx:notify_upload_excelfile(キューARN)", "arn:aws:s3:::xxxxxxxxxxxxx(バケットARN)/*" ] }, { "Sid": "VisualEditor1", "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:*:*:*" } ] }
- Lambda関数の作成
ローカルPC上で適当なフォルダで以下のPythonファイルを作成する。
lambda_function.py
import logging import os import json import boto3 import urllib.parse from datetime import datetime import xlrd import random import pprint logger = logging.getLogger() logger.setLevel(logging.INFO) count_success=0 s3 = boto3.resource('s3') def read_file(bucket,key): global count_success # ローカルの一時ファイルパスを生成する local_file_path = '/tmp/tmp_file_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-') + str(random.randint(0,999999)) logger.info('local_path: ' + local_file_path) try: # S3からファイルを一時パスにダウンロード bucket = s3.Bucket(bucket) bucket.download_file(key, local_file_path) # ダウンロードしたファイルをExcelと解釈して読み込む(エラー処理はしていない) wb = xlrd.open_workbook(local_file_path) sheet = wb.sheet_by_index(0) # 特定のセルから値を取得する例 #logger.info("Cell A01 is {0}".format(sheet.cell_value(rowx=0, colx=0))) # 各行を取り出して成形して表示(実際の処理ではこの部分をカスタマイズする) for rx in range(sheet.nrows): print(sheet.row(rx)) count_success+=1 # 不要な一時ファイルを削除 if os.path.exists(local_file_path): os.remove(local_file_path) # logger.info('tmp-file removed : ' + local_file_path) except Exception as e: logger.info(str(e)) def parce_message(messages): messages_dict = json.loads(messages) if ('Records' in messages_dict): for record in messages_dict['Records']: input_backet = record['s3']['bucket']['name'] input_key = urllib.parse.unquote_plus(record['s3']['object']['key'], encoding='utf-8') # logger.info('update:' + input_backet + ':' + input_key) read_file(input_backet,input_key) else: logger.warning('ignore invalid SQS message: ' + str(messages_dict)) def lambda_handler(event, context): for record in event['Records']: parce_message(record["body"]) logger.info('Success ' + str(count_success) + ' files')
- ライブラリをダウンロードしZipファイルを作成
以下でローカルフォルダにxlrdライブラリを取得。キャッシュファイルは削除して、Zipファイルに固める
pip install xlrd -t .
- ZipファイルをマネジメントコンソールのLambda関数の関数コードの箇所で、Zipファイルをロードして保存
- Lamda関数のトリガーにSQSを指定
上記キューのARNを指定
バッチサイズは 1 - S3バケットにExcelファイルをアップして、CloudWatchログで出力を確認する。
まとめ
- S3にExcelファイルをアップして、Lambda関数で自動的に取得しExcelファイルの内容にアクセスできることを確認した。
- 日本語処理については完全性は保証できないが、Windows10環境のOffice2016で作成したExcelファイル名が日本語、データに日本語を含む場合に日本語を処理できたことは確認した。
今後の課題
- Excelファイルはxlrdの処理前にチェックしていないが、Lambda関数の実行リソースに制限があるのでExcelファイルが信用できない場合は、対策が必要である2。
- 現状のコードだと、コード中のtryで囲った範囲外でエラーした場合は、何度もリトライして処理が詰まる状態になる。キューのメッセージ数の監視と、エラーになった場合の処理フローや、キュー設定のチューニング、デッドレターキュー設定を検討すべきである。
- SQSの仕様上、標準キューはメッセージ順序、排他取得を保証していない。またLambdaの処理が開始する前に、同じExcelファイルが更新される可能性があるので、Excelの処理ロジックは疎連携を考慮した内容にすべきである3。
参考文献
- xlrdのドキュメント
https://github.com/python-excel/xlrd
- PythonでExcelファイルを扱うライブラリの比較
https://note.nkmk.me/python-excel-library/
- PythonでExcelファイルを読み込み・書き込みするxlrd, xlwtの使い方
https://note.nkmk.me/python-xlrd-xlwt-usage/
- S3からのファイル取得とローカル保存
https://recipe.kc-cloud.jp/archives/10035
- AWS Lambda を利用する上で知っておいたほうがよいこと
https://www.bokukoko.info/entry/2015/09/17/AWS_Lambda_%E3%82%92%E5%88%A9%E7%94%A8%E3%81%99%E3%82%8B%E4%B8%8A%E3%81%A7%E7%9F%A5%E3%81%A3%E3%81%A6%E3%81%8A%E3%81%84%E3%81%9F%E3%81%BB%E3%81%86%E3%81%8C%E3%82%88%E3%81%84%E3%81%93%E3%81%A8
コメント
コメントを投稿