Step Functions
AWS Step Functionsは、分散アプリケーションを調整するためのサーバーレスワークフローオーケストレーションを提供します。MBC CQRS Serverlessフレームワークでは、Step Functionsは以下の目的で使用されます:
- 長時間実行ワークフローのオーケストレーション
- 分散トランザクションのためのSagaパターン実装
- Distributed Mapを使用した並列バッチ処理
- コールバックパターンを使用した非同期タスク調整
アーキテクチャ概要
ステートマシン
フレームワークは3つの事前設定済みステートマシンを提供します:
コマンドステートマシン
バージョン管理と並列処理を伴うデータ同期ワークフローを処理します。
主な機能:
- バージョンチェック:コマンドの順序を保証し、競合を防止
- 非同期コールバック:タスクトークンを使用して前のコマンドを待機
- 並列同期:Map状態を使用して複数のターゲットにデータを同期
- TTL管理:レコードの有効期限を自動設定
タスクステートマシン
制御された並行性で並列サブタスクを実行します。
主な機能:
- 制御された並行性:並列実行を制限(デフォルト:2)
- ステータス追跡:リアルタイムのタスクステータス更新
- エラーハンドリング:自動的な障害検出とレポート
CSVインポートステートマシン
AWS Distributed Mapを使用して大規模なCSVファイルを大規模並列処理します。
主な機能:
- S3ネイティブ統合:S3から直接CSVを読み取り
- バッチ処理:効率的な処理のために行をグループ化
- 高並行性:最大50の同時バッチプロセッサをサポート
- EXPRESS実行:子ステートマシンにExpressワークフローを使用
システム構成例
以下の図は、一般的な本番環境でStep FunctionsがどのようにAWSサービスと統合されるかを示しています:
データフローの例
Step Functionsを使用したコマンド実行の一般的なデータフローは以下の通りです:
CDK実装例
完全なコマンドステートマシン
以下のCDKコードは、完全なコマンドハンドラーステートマシンの作成方法を示しています:
import * as cdk from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as logs from 'aws-cdk-lib/aws-logs';
import { Construct } from 'constructs';
export class CommandStateMachineConstruct extends Construct {
public readonly stateMachine: sfn.StateMachine;
constructor(scope: Construct, id: string, props: { lambdaFunction: lambda.IFunction }) {
super(scope, id);
const { lambdaFunction } = props;
// Helper function to create Lambda invoke tasks
const createLambdaTask = (
stateName: string,
integrationPattern: sfn.IntegrationPattern = sfn.IntegrationPattern.REQUEST_RESPONSE
) => {
const payload: Record<string, any> = {
'source': 'step-function',
'context.$': '$$',
'input.$': '$',
};
// Add task token for callback pattern
if (integrationPattern === sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN) {
payload['taskToken'] = sfn.JsonPath.taskToken;
}
return new tasks.LambdaInvoke(this, stateName, {
lambdaFunction,
payload: sfn.TaskInput.fromObject(payload),
stateName,
outputPath: '$.Payload[0][0]',
integrationPattern,
retryOnServiceExceptions: true,
});
};
// Define states
const fail = new sfn.Fail(this, 'fail', {
stateName: 'fail',
causePath: '$.cause',
errorPath: '$.error',
});
const success = new sfn.Succeed(this, 'success', {
stateName: 'success',
});
// Create task states
const finish = createLambdaTask('finish').next(success);
const syncData = createLambdaTask('sync_data');
// Map state for parallel data sync
const syncDataAll = new sfn.Map(this, 'sync_data_all', {
stateName: 'sync_data_all',
maxConcurrency: 0, // Unlimited concurrency
itemsPath: sfn.JsonPath.stringAt('$'),
})
.itemProcessor(syncData)
.next(finish);
const transformData = createLambdaTask('transform_data').next(syncDataAll);
const historyCopy = createLambdaTask('history_copy').next(transformData);
const setTtlCommand = createLambdaTask('set_ttl_command').next(historyCopy);
// Callback pattern for waiting on previous command
const waitPrevCommand = createLambdaTask(
'wait_prev_command',
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN
).next(setTtlCommand);
// Choice state for version checking
const checkVersionResult = new sfn.Choice(this, 'check_version_result', {
stateName: 'check_version_result',
})
.when(sfn.Condition.numberEquals('$.result', 0), setTtlCommand)
.when(sfn.Condition.numberEquals('$.result', 1), waitPrevCommand)
.when(sfn.Condition.numberEquals('$.result', -1), fail)
.otherwise(waitPrevCommand);
const checkVersion = createLambdaTask('check_version').next(checkVersionResult);
// Create log group
const logGroup = new logs.LogGroup(this, 'StateMachineLogGroup', {
logGroupName: '/aws/vendedlogs/states/command-handler-logs',
removalPolicy: cdk.RemovalPolicy.DESTROY,
retention: logs.RetentionDays.SIX_MONTHS,
});
// Create state machine
this.stateMachine = new sfn.StateMachine(this, 'CommandHandlerStateMachine', {
stateMachineName: 'command-handler',
comment: 'Handles command stream processing with version control',
definitionBody: sfn.DefinitionBody.fromChainable(checkVersion),
tracingEnabled: true,
logs: {
destination: logGroup,
level: sfn.LogLevel.ALL,
},
});
}
}