SAM CLI + Golang でlambda s3 to s3を実現する

SAM CLI + Golang でlambda s3 to s3を実現する:

このエントリはただの集団 Advent Calendar 2018の16日目の記事です。


概要

S3にgzipがputされたのを感知し、lambdaで編集&gzip化、S3にuploadする処理です。
AWS Design.png

環境

  • Golang 1.11.2
  • SAN CLI 0.6.2
  • Docker for Mac
  • LocalStack
  • Goland
  • MacOS


開発


1. 環境構築

まずは開発環境を用意します。下記を実行するとsampleが生成されます。

$ brew tap aws/tap 
$ brew install aws-sam-cli 
$ sam init --runtime go 
その他の使い方は下記のリンクを見ていただくと良いと思います
https://github.com/awslabs/aws-sam-cli/blob/develop/docs/usage.rst#generate-sample-event-payloads


2. パッケージ構成

ざっくりとしたパッケージ構成とその説明です。

. 
├── src                          
│   ├── main.go                 <-- Lambda function code 
│   └── main_test.go            <-- Unit tests 
├── template                     
│   ├── local.yaml              <-- sam cliの定義ファイル(local) 
│   └── staging.yaml            <-- sam cliの定義ファイル(staging) 
├── testdata                     
│   └── example.json.gz         <-- test data       
├── docker-compose.yaml         <-- localでS3を再現するためのlocalstack 
├── event_file.json             <-- localでevent起動するときのrequestファイル(sam local generate-event s3 put で生成し、編集) 
├── Makefile                    <-- command実行ファイル 
├── packaged.yaml               <-- sam package時生成される。deploy時に必要 
└── README.md 


3. 実行ファイル

開発はTDDで行いましたので、まずはtestの解説を...

TestMainで前処理としてbucketの作成、及び初期データを片方のbucketに突っ込んでいます。

その後は関数ごとにtestを書いています。

testの実行方法はcd src go testです(事前にenv TMPDIR=/private$TMPDIR docker-compose up -dでdockerを起動しておいてください)

main_test.go
package main 
 
import ( 
    "bytes" 
    "compress/gzip" 
    "context" 
    "encoding/json" 
    "fmt" 
    "github.com/aws/aws-lambda-go/events" 
    "github.com/aws/aws-sdk-go/aws" 
    "github.com/aws/aws-sdk-go/aws/awserr" 
    "github.com/aws/aws-sdk-go/aws/session" 
    "github.com/aws/aws-sdk-go/service/s3" 
    "github.com/aws/aws-sdk-go/service/s3/s3manager" 
    "io/ioutil" 
    "os" 
    "testing" 
    "time" 
) 
 
func TestMain(m *testing.M) { 
    // test前処理 
    println("before all...") 
 
    os.Setenv("REGION", "ap-northeast-1") 
    os.Setenv("S3_ENDPOINT", "http://localhost:4572") 
    os.Setenv("TARGET_S3", "bucket-example-convert") 
 
    var sess = session.Must(session.NewSession(&aws.Config{ 
        S3ForcePathStyle: aws.Bool(true), 
        Region:           aws.String(os.Getenv("REGION")), 
        Endpoint:         aws.String(os.Getenv("S3_ENDPOINT")), 
    })) 
    var creater = s3.New(sess) 
    var uploader = s3manager.NewUploader(sess) 
 
    _, err := creater.CreateBucket(&s3.CreateBucketInput{ 
        Bucket: aws.String("bucket-example"), 
    }) 
 
    if err != nil { 
        if aerr, ok := err.(awserr.Error); ok { 
            switch aerr.Code() { 
            case s3.ErrCodeBucketAlreadyExists: 
                fmt.Println(s3.ErrCodeBucketAlreadyExists, aerr.Error()) 
            case s3.ErrCodeBucketAlreadyOwnedByYou: 
                fmt.Println(s3.ErrCodeBucketAlreadyOwnedByYou, aerr.Error()) 
            default: 
                fmt.Println(aerr.Error()) 
            } 
        } else { 
            fmt.Println(err.Error()) 
        } 
    } 
    _, err = creater.CreateBucket(&s3.CreateBucketInput{ 
        Bucket: aws.String("bucket-example-convert"), 
    }) 
 
    if err != nil { 
        if aerr, ok := err.(awserr.Error); ok { 
            switch aerr.Code() { 
            case s3.ErrCodeBucketAlreadyExists: 
                fmt.Println(s3.ErrCodeBucketAlreadyExists, aerr.Error()) 
            case s3.ErrCodeBucketAlreadyOwnedByYou: 
                fmt.Println(s3.ErrCodeBucketAlreadyOwnedByYou, aerr.Error()) 
            default: 
                fmt.Println(aerr.Error()) 
            } 
        } else { 
            fmt.Println(err.Error()) 
        } 
    } 
 
    up, err := os.Open("./testdata/example.json.gz") 
    if err != nil { 
        fmt.Println("failed to open file") 
        return 
    } 
 
    gzip.NewWriter(up).Flush() 
 
    _, err = uploader.Upload(&s3manager.UploadInput{ 
        Bucket: aws.String("bucket-example"), 
        Key:    aws.String("example.json.gz"), 
        Body:   up, 
    }) 
    if err != nil { 
        fmt.Println("failed to upload file") 
        return 
    } 
 
    // test実行 
    code := m.Run() 
    // test後実行 
    println("after all...") 
    os.Exit(code) 
} 
 
func TestS3Upload(t *testing.T) { 
    t.Run("upload", func(t *testing.T) { 
        var buf bytes.Buffer 
        result, err := s3Upload(buf) 
        if err != nil { 
            t.Fatal("Error failed to s3upload") 
        } 
        if result.Location == "" { 
            t.Errorf("got: %v\nwant: %v", result.UploadID, "") 
        } 
        fmt.Println("Test s3upload...") 
    }) 
} 
 
func TestCompress(t *testing.T) { 
    t.Run("compress", func(t *testing.T) { 
        data := []SampleConvertData{ 
            {12345678, "abcdefgh", time.Now().String()}, 
            {23456781, "bcdefgha", time.Now().String()}} 
 
        var buf bytes.Buffer 
        err := compress(&buf, data) 
        if err != nil { 
            t.Fatal("Error failed to compress") 
        } 
        if len(buf.Bytes()) == 0 { 
            t.Fatal("Error failed to compress") 
            t.Errorf("got: %v\nwant: %v", buf.Bytes(), 0) 
        } 
        fmt.Println("Test compress...") 
    }) 
} 
 
func TestConvert(t *testing.T) { 
    t.Run("convert", func(t *testing.T) { 
        data := []SampleData{{12345678, "abcdefgh"}} 
        expected := time.Now().String() 
 
        convertData, err := convert(data, expected) 
        if err != nil { 
            t.Fatal("Error failed to convert") 
        } 
        if convertData[0].Time != expected { 
            t.Errorf("got: %v\nwant: %v", convertData[0].Time, expected) 
        } 
        fmt.Println("Test convert...") 
    }) 
} 
 
func TestExtract(t *testing.T) { 
    t.Run("extract", func(t *testing.T) { 
        file, _ := os.Open("./testdata/example.json.gz") 
        defer file.Close() 
        actual, err := extract(file) 
        if err != nil { 
            t.Fatal("Error failed to extract") 
        } 
        expected := "abcdefgh" 
        if actual[0].Value != expected { 
            t.Errorf("got: %v\nwant: %v", actual[0].Value, expected) 
        } 
        fmt.Println("Test extract...") 
    }) 
} 
 
func TestS3Download(t *testing.T) { 
    t.Run("s3 download test", func(t *testing.T) { 
        tmpFile, err := s3Download("bucket-example", "example.json.gz") 
        if err != nil { 
            t.Fatal("Error failed to s3 download") 
        } 
        if tmpFile.Name() == "" { 
            t.Errorf("got: %v\nwant: %v", "", "/tmp/srctmp_*********") 
        } 
        fmt.Println("Test s3Download...") 
    }) 
} 
 
func TestHandler(t *testing.T) { 
    t.Run("handler input test", func(t *testing.T) { 
        raw, err := ioutil.ReadFile("../event_file.json") 
        if err != nil { 
            t.Fatal("Error failed to event file load") 
        } 
        var event events.S3Event 
        json.Unmarshal(raw, &event) 
        err = handler(context.Background(), event) 
        if err != nil { 
            t.Fatal("Error failed to s3 event") 
        } 
        fmt.Println("Test handler...") 
    }) 
} 
mainです。

苦労したところはgolang特有tempFileを用意してのdownload処理、gzipへの圧縮処理です。

参考資料(gzip): http://text.baldanders.info/golang/gzip-operation/

main.go
package main 
 
import ( 
    "bytes" 
    "compress/gzip" 
    "context" 
    "encoding/json" 
    "fmt" 
    "github.com/aws/aws-lambda-go/events" 
    "github.com/aws/aws-lambda-go/lambda" 
    "github.com/aws/aws-sdk-go/aws" 
    "github.com/aws/aws-sdk-go/aws/session" 
    "github.com/aws/aws-sdk-go/service/s3" 
    "github.com/aws/aws-sdk-go/service/s3/s3manager" 
    "github.com/pkg/errors" 
    "io" 
    "io/ioutil" 
    "os" 
    "time" 
) 
 
type SampleData struct { 
    Id    int    `json:"id"` 
    Value string `json:"value"` 
} 
 
type SampleConvertData struct { 
    Id    int    `json:"id"` 
    Value string `json:"value"` 
    Time  string `json:"time"` 
} 
 
func createSession() *session.Session { 
    var sess = session.Must(session.NewSession(&aws.Config{ 
        S3ForcePathStyle: aws.Bool(true), 
        Region:           aws.String(os.Getenv("REGION")), 
        Endpoint:         aws.String(os.Getenv("S3_ENDPOINT")), 
    })) 
    return sess 
} 
 
func s3Upload(buf bytes.Buffer) (*s3manager.UploadOutput, error) { 
    sess := createSession() 
 
    var uploader = s3manager.NewUploader(sess) 
 
    result, err := uploader.Upload(&s3manager.UploadInput{ 
        Bucket: aws.String(os.Getenv("TARGET_S3")), 
        Key:    aws.String("example-convert.json.gz"), 
        Body:   bytes.NewReader(buf.Bytes()), 
    }) 
    if err != nil { 
        return nil, errors.Wrap(err, "failed to upload file") 
    } 
 
    return result, err 
} 
 
func compress(w io.Writer, convertData []SampleConvertData) error { 
    b, _ := json.Marshal(convertData) 
    gw, err := gzip.NewWriterLevel(w, gzip.BestCompression) 
    gw.Write(b) 
    defer gw.Close() 
    return err 
} 
 
func convert(data []SampleData, time string) ([]SampleConvertData, error) { 
    var dataConvert []SampleConvertData 
    for _, d := range data { 
        dataConvert = append(dataConvert, SampleConvertData{ 
            Id:    d.Id, 
            Value: d.Value, 
            Time:  time, 
        }) 
    } 
    return dataConvert, nil 
} 
 
func extract(file *os.File) ([]SampleData, error) { 
    gzipReader, _ := gzip.NewReader(file) 
    defer gzipReader.Close() 
 
    raw, err := ioutil.ReadAll(gzipReader) 
    if err != nil { 
        fmt.Println(err.Error()) 
    } 
 
    var data []SampleData 
    err = json.Unmarshal(raw, &data) 
    if err != nil { 
        fmt.Println(err.Error()) 
    } 
 
    return data, err 
} 
 
func s3Download(bucket string, key string) (f *os.File, err error) { 
    sess := createSession() 
 
    tmpFile, _ := ioutil.TempFile("/tmp", "srctmp_") 
    defer os.Remove(tmpFile.Name()) 
 
    var downloader = s3manager.NewDownloader(sess) 
 
    _, err = downloader.Download( 
        tmpFile, 
        &s3.GetObjectInput{ 
            Bucket: aws.String(bucket), 
            Key:    aws.String(key), 
        }) 
    if err != nil { 
        return nil, errors.Wrap(err, "file download error") 
    } 
 
    return tmpFile, err 
} 
 
func handler(ctx context.Context, req events.S3Event) error { 
    bucketName := req.Records[0].S3.Bucket.Name 
    key := req.Records[0].S3.Object.Key 
    file, err := s3Download(bucketName, key) 
    if err != nil { 
        return errors.Wrap(err, "Error failed to s3 download") 
    } 
    data, err := extract(file) 
    if err != nil { 
        return errors.Wrap(err, "Error failed to extract") 
    } 
    timeNow := time.Now().String() 
    convertData, err := convert(data, timeNow) 
    if err != nil { 
        return errors.Wrap(err, "Error failed to convert") 
    } 
    var buf bytes.Buffer 
    err = compress(&buf, convertData) 
    if err != nil { 
        return errors.Wrap(err, "Error failed compress") 
    } 
    _, err = s3Upload(buf) 
    if err != nil { 
        return errors.Wrap(err, "Error failed to s3 upload") 
    } 
 
    return nil 
} 
 
func main() { 
    lambda.Start(handler) 
} 


統合テスト,ビルド,デプロイ


1. template.yaml

SAMのYAML定義になります。

ローカルでのテスト用に、local.yamlでは環境変数を設定しています。

AWS上で動く用のstaging.yamlではPoliciesを定義すると、Roleが自動生成されて、lambdaに反映されます。

staging.yaml
AWSTemplateFormatVersion: '2010-09-09' 
Transform: AWS::Serverless-2016-10-31 
Description: > 
  sam-app 
 
  Sample SAM Template for sam-app 
 
# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst 
Globals: 
  Function: 
    Timeout: 5 
 
Resources: 
  MainFunction: 
    Type: AWS::Serverless::Function 
    Properties: 
      Handler: main 
      Runtime: go1.x 
      CodeUri: ../src 
      FunctionName: main 
      Description: >- 
        An Amazon S3 trigger that retrieves metadata for the object that has 
        been updated. 
      MemorySize: 128 
      Policies: 
      - Version: '2012-10-17' 
        Statement: 
        - Effect: Allow 
          Action: 
          - 's3:GetObject' 
          - 's3:PutObject' 
          Resource: "arn:aws:s3:::bucket-example-*" 
      Events: 
        S3Event: 
          Type: S3 
          Properties: 
            Bucket: !Ref TestBucket 
            Events: 
            - 's3:ObjectCreated:Put' 
      Environment: 
        Variables: 
          TARGET_S3: "bucket-example-convert-staging" 
 
  TestBucket: 
    Type: 'AWS::S3::Bucket' 
    Properties: 
      BucketName: "bucket-example-staging" 
  TestBucketConvert: 
    Type: 'AWS::S3::Bucket' 
    Properties: 
      BucketName: "bucket-example-convert-staging" 


2. 統合テスト

Makefileのintegration-testingで統合テストをしています。

コマンドはmake integration-testingです。

ここではdockerを一度落として起動し、その後bucketを作成し、sample dataをPUTした後にsam cliのイベントが動くようになっています。

PROJECT_NAME:= "localstack-example" 
 
.PHONY: deps clean build integration-testing deploy 
 
deps: 
    go get -u ./... 
 
clean:  
    rm -rf ./src/main 
 
build: 
    GOOS=linux GOARCH=amd64 go build -o src/main ./src 
 
integration-testing: build 
    docker-compose -p $(PROJECT_NAME) down 
    env TMPDIR=/private$TMPDIR docker-compose -p $(PROJECT_NAME) up -d 
    sleep 5s 
    aws --endpoint-url=http://localhost:4572 s3 mb s3://bucket-example 
    aws --endpoint-url=http://localhost:4572 s3 mb s3://bucket-example-convert 
    aws --endpoint-url=http://localhost:4572 s3 cp ./testdata/example.json.gz s3://bucket-example/example.json.gz 
    sam local invoke MainFunction --event event_file.json --template ./template/local.yaml \ 
    --docker-network $$(docker network ls -q -f name=$(PROJECT_NAME)) 
 
deploy: build 
    sam package --template-file ./template/staging.yaml --s3-bucket package-bucket-example --output-template-file packaged.yaml 
    sam deploy --template-file packaged.yaml --stack-name sam-cli-example --capabilities CAPABILITY_IAM 


3. ビルド

Makefileのbuildでビルドをしています。

コマンドはmake buildです。


3. デプロイ

Makefileのdeployでビルドをしています。

コマンドはmake deployです。

ここではソースをS3にあげて(S3は事前にbucketを作成)、その後AWSにデプロイをしています。


参考


まとめ

SAM CLI + Golang でlambda s3 to s3を実現してみました。

gzip圧縮したりなど、なるべく本番で使う想定で実装してみました。

本当はdeploy周りはCodeBuildを使いたかったのですが、時間が足らず...あとAWS Toolkits for IntelliJもやりたかった。

次回にしたいと思います。

今回使ったプロジェクトは下記にありますので、ご参照ください。
https://github.com/yoshihir/samcli-s3-to-s3-example

コメント

このブログの人気の投稿

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2021-04-30 23:37:32 RSSフィード2021-04-30 23:00 分まとめ(42件)

投稿時間:2023-02-05 02:09:04 RSSフィード2023-02-05 02:00 分まとめ(9件)