AWSで一度Lambdaを利用すると、その利便性から徐々に利用範囲が広がっていくことがよく観測されます。一度だけならと一度手を出すと、いつの間にかLambda以外で動かすことによるイベント連携の手間や、キャパシティプランニング、CI/CDパイプライン構築と運用監視の手間など、フルマネージド及び周辺システムのエコシステムの恩恵を得られなくてイライラする事も、身体に耐性がついてつい利用量が増えていくこともしばしばです。
本記事ではAWS Lambdaでのバッチ処理について検討します。
AWS Lambdaとバッチ処理との相性
そんなAWS Lambdaですが2021.06.01時点では実行時間に制約があり15分です。過去、5分から15分にアップデートされた経緯がありますが、今後も何かしらの制約は残るでしょう。
そのため、AWS Lambdaのサービス単体でバッチ処理を行うのは難しく、他のサービスあるいは、他のサービスと組み合わせてバッチ処理を実現することが多いです。
よく見るのは以下3つでしょうか。
- ECSで処理
- AWS Batchで処理
- Step FunctionsとLambdaを組み合わせ
今回はLambda中毒者らしく、1のECSや2のAWS Batchに頼らず、3のStep Functionsでもなく、LambdaとKinesisでバッチ処理を実現しようと思います。
Scatter(Fanout)パターン
Enterprise Integration Patternで分散・集約するパターンをScatter-Gatherパターンと呼びますが、今回はこちらの分散する部分のみを利用します。GatherなしのScatterパターンとはあまり聞かないので、Fanoutパターンが正しいのかもです。タイトルでは15分の壁を超えてとありますが、個別のLambdaでその壁は超えられないので、複数のLambdaを並列で実行させることで、それら処理時間の合計で仮想的に超えることにします。

何かしらの入力ソースを元に、Lambdaで分散数だけKinesisにジョブを示すメッセージを送信。後続のLambdaで処理を行う構成にします。最初のLambdaがScatterと呼ばれる分散の開始部分です。

Gatherについては、DynamoDBのIncrement an Atomic Counterなどを利用しつつ実装するのかなと思います。同じ設計で実装するのであれば下記のような構成になるかなと思います。

ただ、現実的にはスクラッチでGatherの作り込みを行うのではなく、Step FunctionsやManaged Workflows for Apache Airflowを利用することが大半だと思いますので、本記事では細かい設計・実装について触れていきません。
実装
バッチ処理対象のデータはDynamoDBにあることにします。あるテーブルをフルスキャンしてそのデータを駆動にして何かしら処理を加えると仮定します。
コードはこちらにあげていますので、適時参照ください。
Kinesis Data Streamに連携するペイロード
Jobという名前のStructの形式で連携します。Totalに分散トータル数、Segに今の分散番号を指定する仕様にします。
1 2 3 4 | type Job struct { Total int64 `json:”total”` Seg int64 `json:”seg”` } |
また、処理対象のDynamoDBには7000件弱のデータと登録しました。
Scatter
最初に処理対象のDynamoDBテーブルの件数を取得し、一定のしきい値単位になるように分散数を計算します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | var db = dynamodb.New(session.Must(session.NewSession())) var kc = kinesis.New(session.Must(session.NewSession())) func main() { lambda.Start(Handle) } func Handle(ctx context.Context) error { resp, err := db.ScanWithContext(ctx, &dynamodb.ScanInput{ Select: aws.String(dynamodb.SelectCount), TableName: aws.String(“TestTable”), }) if err != nil { return err } total := 1 if int(*resp.Count) > 1000 { total = int(*resp.Count) / 1000 } for i := 0; i < total; i++ { log.Printf(“i=%d\n”, i) job := Job{ Total: int64(total), Seg: int64(i), } b, err := json.Marshal(job) if err != nil { return err } if _, err = kc.PutRecordWithContext(ctx, &kinesis.PutRecordInput{ StreamName: aws.String(“scatter”), PartitionKey: aws.String(fmt.Sprintf(“partitionKey-%d”, i)), Data: b, }); err != nil { return err } } log.Println(“done”) return nil } |
db.ScanWithContext
でテーブル件数を取得し、今回はしきい値を1000で分散トータル数を導出しています。
あとは、その数だけKinesis Data StreamにJobを送信します。
動かしてみると、以下のような結果がでると、無事KinesisにメッセージをPutできています。
1 2 3 4 5 6 7 8 9 10 11 | START RequestId: 9469393a-eba9-4a21-9e98-8a3fc7f2a3f5 Version: $LATEST 2021/06/01 03:45:44 i=0 2021/06/01 03:45:44 i=1 2021/06/01 03:45:44 i=2 2021/06/01 03:45:44 i=3 2021/06/01 03:45:44 i=4 2021/06/01 03:45:44 i=5 2021/06/01 03:45:44 i=6 2021/06/01 03:45:44 done END RequestId: 9469393a-eba9-4a21-9e98-8a3fc7f2a3f5 REPORT RequestId: 9469393a-eba9-4a21-9e98-8a3fc7f2a3f5 Duration: 299.30 ms Billed Duration: 300 ms Memory Size: 512 MB Max Memory Used: 45 MB Init Duration: 118.30 ms |
並列処理部分
次は、Kinesisのメッセージトリガーで起動するLambdaです。分散処理で行うメインの処理を実装する部分です。
今回は、 executeBizLogic
の中身では単純に指定された件数のカウントを取る内容にしています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | func main() { lambda.Start(Handle) } func Handle(ctx context.Context, e events.KinesisEvent) error { for _, r := range e.Records { var job Job if err := json.Unmarshal(r.Kinesis.Data, &job); err != nil { return err } count, err := executeBizLogic(ctx, job.Total, job.Seg) if err != nil { return err } log.Printf(“count: %v”, count) } return nil } var db = dynamodb.New(session.Must(session.NewSession())) func executeBizLogic(ctx context.Context, total, seg int64) (int64, error) { out, err := db.ScanWithContext(ctx, &dynamodb.ScanInput{ TableName: aws.String(“TestTable”), TotalSegments: aws.Int64(total), Segment: aws.Int64(seg), Select: aws.String(dynamodb.SelectCount), }) if err != nil { return 0, fmt.Errorf(“db.ScanWithContext: %w”, err) } return *out.Count, nil } |
動かしてみると以下のようなログが出る想定です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | 2021-06-01T12:45:46.143+09:00 START RequestId: d44ecc4c-433c-481b-9051-435280c81900 Version: $LATEST 2021-06-01T12:45:46.338+09:00 2021/06/01 03:45:46 count: 1016 2021-06-01T12:45:46.339+09:00 END RequestId: d44ecc4c-433c-481b-9051-435280c81900 2021-06-01T12:45:46.339+09:00 REPORT RequestId: d44ecc4c-433c-481b-9051-435280c81900 Duration: 195.60 ms Billed Duration: 196 ms Memory Size: 512 MB Max Memory Used: 44 MB Init Duration: 107.35 ms 2021-06-01T12:45:46.391+09:00 START RequestId: bd6839aa-bdb0-40f5-9455-a9a445035b4d Version: $LATEST 2021-06-01T12:45:46.406+09:00 2021/06/01 03:45:46 count: 1085 2021-06-01T12:45:46.406+09:00 END RequestId: bd6839aa-bdb0-40f5-9455-a9a445035b4d 2021-06-01T12:45:46.406+09:00 REPORT RequestId: bd6839aa-bdb0-40f5-9455-a9a445035b4d Duration: 10.36 ms Billed Duration: 11 ms Memory Size: 512 MB Max Memory Used: 45 MB aa4f-30057e967514 Duration: 7.72 ms Billed Duration: 8 ms Memory Size: 512 MB Max Memory Used: 45 MB 中略 2021-06-01T12:45:46.611+09:00 START RequestId: 5f9cec98-2434-4872-8de9-df605b3e339c Version: $LATEST 2021-06-01T12:45:46.624+09:00 2021/06/01 03:45:46 count: 1072 2021-06-01T12:45:46.625+09:00 END RequestId: 5f9cec98-2434-4872-8de9-df605b3e339c 2021-06-01T12:45:46.625+09:00 REPORT RequestId: 5f9cec98-2434-4872-8de9-df605b3e339c Duration: 8.59 ms Billed Duration: 9 ms Memory Size: 512 MB Max M |
最初のLambdaでKinesisに呼び出した分だけ、後続のLambdaが起動していることがわかると思います。
この実装の使い所
ジョブの依存関係が多くない場合で、かつECSやAWS Batchを導入せずカジュアルに分散実行したい時に試せるかと思います。
また、今回の例としては分散トータル数+分散番号の処理モデルですが、処理対象のデータモデル次第では、ユーザIDの範囲をしていしたり、良い感じのカーディナリティになる業務コードを指定しても良いかなと思います。支社CDとか営業エリアCDとかです。
リランについて
後続のLambdaのある分散番号だけ障害が発生し、リカバリが必要になった場合の対応について説明します。
今回の実装では作り込んでいないんですが、例えばエラージョブキューを格納するDead Letter Queueを用意するのが一手です。ちょっとツールで利用する用途であれば、連携された連携情報を awscli で直接Kinesisにputするのも一手だと思います
処理データの一貫性
実行するLambdaが分割される関係上、利用するのがPostgreSQLのようなRDBであっても、バッチ処理全体の処理結果の一貫性を保つことはできません。そのため分散処理単位で登録してよいかが利用する上での大前提だと思います。
実用上の話
この手の分散処理を1日に複数回実行し、かつある分散処理番号のみ失敗する場合を考慮すると、システム運用上の負荷が高まります。
そのため、実用に耐えうるためにはScatterのLambdaの起動のたびに一意な実行IDを取得し、それをキーに管理テーブルを用意することが多いと思います。それぞれが成功・失敗・処理中などのステータスを持つと役立つことが多いです。それぞれの処理件数や処理時間を格納すると嬉しいこともおおいでしょう。
一方でそこまでやるのであれば、Managed Workflows for Apache Airflowといったワークフローエンジンの導入も検討したほうが良いかもしれません。
また、分散しなくてもAWS BatchやECSで15分の制約を持たないサービスを選定したほうが良い場面は多分にあるので、程々に利用しましょう。
インフラ上の注意
並列処理Lambdaですが、Kinesisトリガーのバッチサイズは 1
が推奨です。これが10などにすると今回だとタイミングによっては1つのLambdaですべてメッセージを処理してしまう可能性があるからです。
また、 シャードあたりの同時バッチ
で1シャードあたりの同時起動数を制御できます。デフォルトはおそらく1です。現状は1シャードあたり最大10までです。Kinesis Data Streamをジョブキューとして利用すると、だいたいシャードは1なので通常は同時起動数の制約がここで決まります。
通常は10並列で十分かと思いますが、もし上げたい場合はシャードを増やす対応です。多少費用が増えますが通常は誤差の範囲かと思います。