Step Functions
AWS Step Functionsは、分散アプリケーションを調整するためのサーバーレスワークフローオーケストレーションを提供します。MBC CQRS Serverlessフレームワークでは、Step Functionsは以下の目的で使用されます:
- 長時間実行ワークフローのオーケストレーション
- 分散トランザクションのためのSagaパターン実装
- Distributed Mapを使用した並列バッチ処理
- コールバックパターンを使用した非同期タスク調整
アーキテクチャ概要
ステートマシン
フレームワークは3つの事前設定済みステートマシンを提供します:
コマンドステートマシン
バージョン管理と並列処理を伴うデータ同期ワークフローを処理します。
主な機能:
- バージョンチェック:コマンドの順序を保証し、競合を防止
- 非同期コールバック:タスクトークンを使用して前のコマンドを待機
- 並列同期:Map状態を使用して複数のターゲットにデータを同期
- TTL管理:レコードの有効期限を自動設定
タスクステートマシン
制御された並行性で並列サブタスクを実行します。
主な機能:
- 制御された並行性:並列実行を制限(デフォルト:2)
- ステータス追跡:リアルタイムのタスクステータス更新
- エラーハンドリング:自動的な障害検出とレポート
CSVインポートステートマシン
AWS Distributed Mapを使用して大規模なCSVファイルを大規模並列処理します。
主な機能:
- S3ネイティブ統合:S3から直接CSVを読み取り
- バッチ処理:効率的な処理のために行をグルー プ化
- 高並行性:最大50の同時バッチプロセッサをサポート
- EXPRESS実行:子ステートマシンにExpressワークフローを使用
システム構成例
以下の図は、一般的な本番環境でStep FunctionsがどのようにAWSサービスと統合されるかを示しています:
データフローの例
Step Functionsを使用したコマンド実行の一般的なデータフローは以下の通りです:
CDK実装例
完全なコマンドステートマシン
以下のCDKコードは、完全なコマンドハンドラーステートマシンの作成方法を示しています:
import * as cdk from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as logs from 'aws-cdk-lib/aws-logs';
import { Construct } from 'constructs';
export class CommandStateMachineConstruct extends Construct {
public readonly stateMachine: sfn.StateMachine;
constructor(scope: Construct, id: string, props: { lambdaFunction: lambda.IFunction }) {
super(scope, id);
const { lambdaFunction } = props;
// Helper function to create Lambda invoke tasks
const createLambdaTask = (
stateName: string,
integrationPattern: sfn.IntegrationPattern = sfn.IntegrationPattern.REQUEST_RESPONSE
) => {
const payload: Record<string, any> = {
'source': 'step-function',
'context.$': '$$',
'input.$': '$',
};
// Add task token for callback pattern
if (integrationPattern === sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN) {
payload['taskToken'] = sfn.JsonPath.taskToken;
}
return new tasks.LambdaInvoke(this, stateName, {
lambdaFunction,
payload: sfn.TaskInput.fromObject(payload),
stateName,
outputPath: '$.Payload[0][0]',
integrationPattern,
retryOnServiceExceptions: true,
});
};
// Define states
const fail = new sfn.Fail(this, 'fail', {
stateName: 'fail',
causePath: '$.cause',
errorPath: '$.error',
});
const success = new sfn.Succeed(this, 'success', {
stateName: 'success',
});
// Create task states
const finish = createLambdaTask('finish').next(success);
const syncData = createLambdaTask('sync_data');
// Map state for parallel data sync
const syncDataAll = new sfn.Map(this, 'sync_data_all', {
stateName: 'sync_data_all',
maxConcurrency: 0, // Unlimited concurrency
itemsPath: sfn.JsonPath.stringAt('$'),
})
.itemProcessor(syncData)
.next(finish);
const transformData = createLambdaTask('transform_data').next(syncDataAll);
const historyCopy = createLambdaTask('history_copy').next(transformData);
const setTtlCommand = createLambdaTask('set_ttl_command').next(historyCopy);
// Callback pattern for waiting on previous command
const waitPrevCommand = createLambdaTask(
'wait_prev_command',
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN
).next(setTtlCommand);
// Choice state for version checking
const checkVersionResult = new sfn.Choice(this, 'check_version_result', {
stateName: 'check_version_result',
})
.when(sfn.Condition.numberEquals('$.result', 0), setTtlCommand)
.when(sfn.Condition.numberEquals('$.result', 1), waitPrevCommand)
.when(sfn.Condition.numberEquals('$.result', -1), fail)
.otherwise(waitPrevCommand);
const checkVersion = createLambdaTask('check_version').next(checkVersionResult);
// Create log group
const logGroup = new logs.LogGroup(this, 'StateMachineLogGroup', {
logGroupName: '/aws/vendedlogs/states/command-handler-logs',
removalPolicy: cdk.RemovalPolicy.DESTROY,
retention: logs.RetentionDays.SIX_MONTHS,
});
// Create state machine
this.stateMachine = new sfn.StateMachine(this, 'CommandHandlerStateMachine', {
stateMachineName: 'command-handler',
comment: 'Handles command stream processing with version control',
definitionBody: sfn.DefinitionBody.fromChainable(checkVersion),
tracingEnabled: true,
logs: {
destination: logGroup,
level: sfn.LogLevel.ALL,
},
});
}
}
制御された並行性を持つタスクステートマシン
export class TaskStateMachineConstruct extends Construct {
public readonly stateMachine: sfn.StateMachine;
constructor(scope: Construct, id: string, props: { lambdaFunction: lambda.IFunction }) {
super(scope, id);
const { lambdaFunction } = props;
// Iterator task for each item
const iteratorTask = new tasks.LambdaInvoke(this, 'iterator', {
lambdaFunction,
payload: sfn.TaskInput.fromObject({
'source': 'step-function',
'context.$': '$$',
'input.$': '$',
}),
stateName: 'iterator',
outputPath: '$.Payload[0][0]',
});
// Map state with concurrency limit
const mapState = new sfn.Map(this, 'TaskMapState', {
stateName: 'map_state',
maxConcurrency: 2, // Process 2 items at a time
inputPath: '$',
itemsPath: sfn.JsonPath.stringAt('$'),
}).itemProcessor(iteratorTask);
// Create log group
const logGroup = new logs.LogGroup(this, 'TaskLogGroup', {
logGroupName: '/aws/vendedlogs/states/task-handler-logs',
removalPolicy: cdk.RemovalPolicy.DESTROY,
retention: logs.RetentionDays.SIX_MONTHS,
});
// Create state machine
this.stateMachine = new sfn.StateMachine(this, 'TaskHandlerStateMachine', {
stateMachineName: 'task-handler',
comment: 'Handles parallel task execution with concurrency control',
definition: mapState,
timeout: cdk.Duration.minutes(15),
tracingEnabled: true,
logs: {
destination: logGroup,
level: sfn.LogLevel.ALL,
},
});
}
}
CSVインポート用のDistributed Map
大規模CSVファイルの処理には、ネイティブS3統合を提供するDistributed Mapを使用します:
import { Map as SfnMap, ProcessorMode, ProcessorConfig, IChainable, JsonPath } from 'aws-cdk-lib/aws-stepfunctions';
// Custom Distributed Map class for S3 CSV processing
export class DistributedMap extends SfnMap {
public itemReader?: DistributedMapItemReader;
public itemBatcher?: DistributedMapItemBatcher;
public label?: string;
public override toStateJson(): object {
const mapStateJson = super.toStateJson();
return {
...mapStateJson,
ItemReader: this.itemReader,
ItemBatcher: this.itemBatcher,
Label: this.label,
};
}
public itemProcessor(processor: IChainable, config: ProcessorConfig = {}): DistributedMap {
super.itemProcessor(processor, {
...config,
mode: ProcessorMode.DISTRIBUTED,
});
return this;
}
public setItemReader(itemReader: DistributedMapItemReader): DistributedMap {
this.itemReader = itemReader;
return this;
}
public setItemBatcher(itemBatcher: DistributedMapItemBatcher): DistributedMap {
this.itemBatcher = itemBatcher;
return this;
}
public setLabel(label: string): DistributedMap {
this.label = label;
return this;
}
}
// Usage in your stack
const csvRowsHandler = new tasks.LambdaInvoke(this, 'csv_rows_handler', {
lambdaFunction,
payload: sfn.TaskInput.fromObject({
'source': 'step-function',
'context.$': '$$',
'input.$': '$',
}),
stateName: 'csv_rows_handler',
});
const importCsvDefinition = new DistributedMap(this, 'import-csv', {
maxConcurrency: 50, // Process up to 50 batches in parallel
})
.setLabel('import-csv')
.setItemReader({
Resource: 'arn:aws:states:::s3:getObject',
ReaderConfig: {
InputType: 'CSV',
CSVHeaderLocation: 'FIRST_ROW',
},
Parameters: {
'Bucket.$': '$.bucket',
'Key.$': '$.key',
},
})
.setItemBatcher({
MaxInputBytesPerBatch: 10,
BatchInput: {
'Attributes.$': '$',
},
})
.itemProcessor(csvRowsHandler, {
executionType: sfn.ProcessorType.EXPRESS, // Use EXPRESS for child executions
});
const importCsvStateMachine = new sfn.StateMachine(this, 'ImportCsvStateMachine', {
stateMachineName: 'import-csv',
comment: 'Processes large CSV files with distributed batch processing',
definitionBody: sfn.DefinitionBody.fromChainable(importCsvDefinition),
tracingEnabled: true,
});
イベントソース設定
DynamoDB StreamsとSQSを設定してStep Functionsをトリガーします:
// DynamoDB Stream event source
const tableNames = ['tasks', 'commands', 'import_tmp'];
for (const tableName of tableNames) {
const table = dynamodb.Table.fromTableAttributes(this, `${tableName}-table`, {
tableArn: `arn:aws:dynamodb:${region}:${account}:table/${prefix}${tableName}`,
tableStreamArn: `arn:aws:dynamodb:${region}:${account}:table/${prefix}${tableName}/stream/*`,
});
lambdaFunction.addEventSource(
new lambdaEventSources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
batchSize: 1,
filters: [
lambda.FilterCriteria.filter({
eventName: lambda.FilterRule.isEqual('INSERT'),
}),
],
})
);
}
// SQS event sources
const queues = ['task-action-queue', 'notification-queue', 'import-action-queue'];
for (const queueName of queues) {
const queue = sqs.Queue.fromQueueArn(
this,
queueName,
`arn:aws:sqs:${region}:${account}:${prefix}${queueName}`
);
lambdaFunction.addEventSource(
new lambdaEventSources.SqsEventSource(queue, {
batchSize: 1,
})
);
}
実装ガイド
ステップ1:インフラストラクチャのセットアップ
フレームワークはAWS CDK を使用してStep Functionsインフラストラクチャを自動的にプロビジョニングします。主要なリソースは以下の通りです:
// State machine definition in CDK
const commandStateMachine = new sfn.StateMachine(this, 'CommandHandler', {
stateMachineName: 'command',
definitionBody: sfn.DefinitionBody.fromChainable(definition),
timeout: Duration.minutes(15),
tracingEnabled: true,
logs: {
destination: logGroup,
level: sfn.LogLevel.ALL,
},
});
ステップ2:Step Functionイベントの定義
基本のStep Functionイベントを拡張するイベントクラスを作成します:
import { IEvent } from '@mbc-cqrs-serverless/core';
import { StepFunctionsContext } from '@mbc-cqrs-serverless/core';
export class CustomWorkflowEvent implements IEvent {
source: string;
context: StepFunctionsContext;
input?: WorkflowInput;
taskToken?: string;
}