AWS lambdaを使ってS3の特定ファイルの存在を確認する

AWS lambdaを使ってS3の特定ファイルの存在を確認する:


本記事でやること

  • S3の特定バケットに指定ファイルが存在しているのかを確認する

    • もしファイルの存在が確認出来たら他のlambdaを起動させる
    • もし確認出来なければ一定の時間を空けて再度チェックする
イメージとしては、以下のようなアーキテクチャを想定しています。

また、S3の指定ファイルは日時で作成されている事を前提としています。



スクリーンショット 2019-01-13 14.27.54.png


本記事でやらないことは以下の通りなので、他の記事を参照してください。

  • Cloudwatch Eventsの設定
  • IAMロールの作成
  • lambda関数の作成の仕方


対象読者

  • AWSの各サービスの役割を一通り知っている方
  • AWS lambdaを動かしてみたい方
  • Python入門者


使用言語

  • Python 3.6.3


S3の特定バケット以下の構成

  • ファイルの存在を確認するバケットの階層は以下のようになっている事を想定しています。

    • 前日の日付のディレクトリにファイルが存在しているかをチェックしていきます。(今日が2019-01-02であれば前日の2019-01-01にtest.txtがあるのかを確認します。)
bucket 
├── 2019-01-01 
│   └── test.txt 
├── 2019-01-02 
│   └── test.txt 
├── 2019-01-03 
│    └── test.txt 
. 
. 
. 
  • またファイルが存在していない場合は、250秒待って再度ファイルの確認をしていきます。ここら辺は各々都合の良いように変えてください。


lambdaスクリプト

lambda_function.py
import json 
import time 
import logging 
import boto3 
from datetime import datetime, timedelta, timezone 
 
 
# データ確認用のバケット 
BUCKET = "test" 
# 存在を確認するファイル名 
FILE_NAME = "test.txt" 
 
# ファイルが存在していた場合の次に起動させるlambda関数名 
JOB_SENDER = "batch-job-sender" 
# ファイルが存在していない場合の再度起動させる自分の関数名 
DATA_CHECKER = "data_checker" 
 
# ファイルが存在していない場合の待ち時間 
WAITING_TIME = 250 
 
# loggerの作成 
LOGGER = logging.getLogger() 
LOGGER.setLevel(logging.INFO) 
 
 
def lambda_handler(event, _context): 
    LOGGER.info(event) 
 
    s3_client = boto3.client("s3") 
    lambda_client = boto3.client("lambda") 
 
    # 今日の日付を取得する 
    str_dt = get_date() 
    # 前日の日付を取得する 
    one_days_before = add_days(str_dt, -1) 
 
    # 指定ファイルが存在しているかどうかのステータス取得 
    status = check_data(s3_client, one_days_before) 
    execute(event, WAITING_TIME, lambda_client, status) 
 
 
def execute(event, waiting_time, lambda_client, status): 
 
    if status:  # 対象のバケットにファイルが存在しているとき 
        lambda_client.invoke( 
            FunctionName=JOB_SENDER, 
            InvocationType="RequestResponse", 
            Payload=json.dumps(event), 
            LogType="Tail" 
        ) 
 
    else:  # 対象のバケットにファイルが存在していないとき 
        time.sleep(waiting_time) 
        lambda_client.invoke( 
            FunctionName=DATA_CHECKER, 
            InvocationType="Event", 
            Payload=json.dumps(event), 
            LogType="None" 
        ) 
 
 
def check_data(s3_client, date): 
 
    prefix = date 
    response = s3_client.list_objects( 
        Bucket=BUCKET, 
        Prefix=prefix 
    ) 
 
    assumed_keys = [f'{date}/{FILE_NAME}'] 
    try: 
        keys = [content['Key'] for content in response['Contents']] 
        status = set(assumed_keys).issubset(keys) 
    except KeyError: 
        status = False 
 
    return status 
 
 
def get_date(): 
    jst = timezone(timedelta(hours=+9), "JST") 
    jst_now = datetime.now(jst) 
    dt = datetime.strftime(jst_now, "%Y-%m-%d") 
 
    return dt 
 
 
def datetime_to_str(date: datetime) -> str: 
    year = str(date.year) 
    month = str("{0:02d}".format(date.month)) 
    day = str("{0:02d}".format(date.day)) 
    str_date = '{0}-{1}-{2}'.format(year, month, day) 
 
    return str_date 
 
 
def str_to_datetime(str_date: str) -> datetime: 
 
    return datetime.strptime(str_date, '%Y-%m-%d') 
 
 
def add_days(str_dt: str, days: int) -> str: 
    datetime_dt = str_to_datetime(str_dt) 
    n_days_after = datetime_dt + timedelta(days=days) 
    str_n_days_after = datetime_to_str(n_days_after) 
 
    return str_n_days_after 
 


終わりに

AWS S3内に毎日何がしかのファイルを生成させている場合や前日分に生成されたファイルを使って何か処理を行う場合などにlambdaを使って確認することが出来、次の処理を行うlambdaを起動させることが出来ます。

次は、前回書いた「AWS lambdaを使ってBatchにjobを投げる」を組み合わせて、「S3に指定ファイルが存在している事を確認したら次のlambdaを起動しBatchにジョブを投げる」アーキテクチャを実現させてみたいと思います。

コメント

このブログの人気の投稿

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2021-04-30 23:37:32 RSSフィード2021-04-30 23:00 分まとめ(42件)

投稿時間:2023-02-05 02:09:04 RSSフィード2023-02-05 02:00 分まとめ(9件)