タスク
Taskパッケージは、MBC CQRS Serverlessフレームワークにおいて包 括的なタスク管理機能を提供します。これにより、以下のことが可能になります:
- 非同期タスク実行
- タスクステータスの追跡
- 進捗の監視
- エラーハンドリングと再試行
- タスキュー管理
- タスク履歴とロギング
インストール
npm install @mbc-cqrs-serverless/task
使用方法
タスク処理には2つのタイプがあります:
- 単一タスク処理
- Step Functionを使用したタスク処理
単一タスク処理
- タスクイベントの定義
import { TaskQueueEvent } from "@mbc-cqrs-serverless/task";
export class TaskEvent extends TaskQueueEvent {}
- タスクイベントハンドラーの定義
import { EventHandler, IEventHandler } from "@mbc-cqrs-serverless/core";
import { Logger } from "@nestjs/common";
import { TaskEvent } from "./task.event";
@EventHandler(TaskEvent)
export class TaskEventHandler implements IEventHandler<TaskEvent> {
private readonly logger = new Logger(TaskEventHandler.name);
constructor() {}
async execute(event: TaskEvent): Promise<any> {
this.logger.debug("executing task event::", event);
//
this.logger.debug(`Process task completed: ${event.taskEvent.eventID}`);
return "Result after process";
}
}
ITaskQueueEventFactory
の実装
import {
ITaskQueueEventFactory,
TaskQueueEvent,
} from "@mbc-cqrs-serverless/task";
import { TaskEvent } from "src/sample/handler/task.event";
export class TaskQueueEventFactory implements ITaskQueueEventFactory {
async transformTask(event: TaskQueueEvent): Promise<any[]> {
return [new TaskEvent().fromSqsRecord(event)];
}
}
- カスタム
TaskModule
import { TaskModule } from "@mbc-cqrs-serverless/task";
import { Module } from "@nestjs/common";
import { TaskEventHandler } from "src/sample/handler/task.handler";
import { TaskQueueEventFactory } from "./task-queue-event-factory";
@Module({
imports: [
TaskModule.register({
taskQueueEventFactory: TaskQueueEventFactory,
}),
],
providers: [TaskEventHandler],
exports: [TaskModule],
})
export class CustomTaskModule {}
- カスタム
EventFactoryAddedTask
import { EventFactory, IEvent } from "@mbc-cqrs-serverless/core";
import { EventFactoryAddedTask, TaskEvent } from "@mbc-cqrs-serverless/task";
import { Logger } from "@nestjs/common";
import { DynamoDBStreamEvent } from "aws-lambda";
@EventFactory()
export class CustomEventFactory extends EventFactoryAddedTask {
private readonly logger = new Logger(CustomEventFactory.name);
async transformDynamodbStream(event: DynamoDBStreamEvent): Promise<IEvent[]> {
const curEvents = await super.transformDynamodbStream(event);
const taskEvents = event.Records.map((record) => {
if (
record.eventSourceARN.endsWith("tasks") ||
record.eventSourceARN.includes("tasks" + "/stream/")
) {
if (record.eventName === "INSERT") {
return new TaskEvent().fromDynamoDBRecord(record);
}
}
return undefined;
})
.filter((event) => !!event)
.filter((event) => event.taskEntity.sk.split("#").length < 3);
return [...curEvents, ...taskEvents];
}
}
- タスクの作成
Step Functionを使用したタスク処理
- Step Functionタスクイベントの定義
import { StepFunctionTaskEvent } from "@mbc-cqrs-serverless/task.sfn.event";
export class SfnTaskEvent extends StepFunctionTaskEvent {}
- Step Functionタスクイベントハンドラーの定義
import {
EventHandler,
IEventHandler,
StepFunctionService,
} from "@mbc-cqrs-serverless/core";
import { TaskService } from "@mbc-cqrs-serverless/task";
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { SfnTaskEvent } from "./sfn-task.event";
@EventHandler(SfnTaskEvent)
export class SfnTaskEventHandler implements IEventHandler<SfnTaskEvent> {
private readonly logger = new Logger(SfnTaskEventHandler.name);
constructor() {}
async execute(event: TaskEvent): Promise<any> {
this.logger.debug("executing task event::", event);
//
return "Result after process";
}
}
ITaskQueueEventFactory
の実装
import {
ITaskQueueEventFactory,
TaskQueueEvent,
} from "@mbc-cqrs-serverless/task";
import { TaskEvent } from "src/sample/handler/sfn-task.event";
export class TaskQueueEventFactory implements ITaskQueueEventFactory {
async transformStepFunctionTask(event: TaskQueueEvent): Promise<any[]> {
return [new SfnTaskEvent(event)];
}
}
- カスタム
TaskModule
import { TaskModule } from "@mbc-cqrs-serverless/task";
import { Module } from "@nestjs/common";
import { TaskEventHandler } from "src/sample/handler/task.handler";
import { TaskQueueEventFactory } from "./task-queue-event-factory";
@Module({
imports: [
TaskModule.register({
taskQueueEventFactory: TaskQueueEventFactory,
}),
],
providers: [TaskEventHandler],
exports: [TaskModule],
})
export class CustomTaskModule {}
- カスタム
EventFactoryAddedTask
import { EventFactory, IEvent } from "@mbc-cqrs-serverless/core";
import { EventFactoryAddedTask, TaskEvent } from "@mbc-cqrs-serverless/task";
import { Logger } from "@nestjs/common";
import { DynamoDBStreamEvent } from "aws-lambda";
@EventFactory()
export class CustomEventFactory extends EventFactoryAddedTask {
private readonly logger = new Logger(CustomEventFactory.name);
async transformDynamodbStream(event: DynamoDBStreamEvent): Promise<IEvent[]> {
const curEvents = await super.transformDynamodbStream(event);
const taskEvents = event.Records.map((record) => {
if (
record.eventSourceARN.endsWith("tasks") ||
record.eventSourceARN.includes("tasks" + "/stream/")
) {
if (record.eventName === "INSERT") {
return new TaskEvent().fromDynamoDBRecord(record);
}
}
return undefined;
})
.filter((event) => !!event)
.filter((event) => event.taskEntity.sk.split("#").length < 3);
return [...curEvents, ...taskEvents];
}
}
- Step Functionタスクの作成
const item = [
{ key: "value1" },
{ key: "value2" },
{ key: "value3" },
{ key: "value4" },
{ key: "value5" },
{ key: "value6" },
];
await this.taskService.createStepFunctionTask(
{
input: item,
taskType: "cat",
tenantCode: "mbc",
},
{ invokeContext }
);