メインコンテンツまでスキップ

タスクモジュール

Taskパッケージは、MBC CQRS Serverlessフレームワークにおいて包括的なタスク管理機能を提供します。これにより、以下のことが可能になります:

  • 非同期タスク実行
  • タスクステータスの追跡
  • 進捗の監視
  • エラーハンドリングと再試行
  • タスキュー管理
  • タスク履歴とロギング

アーキテクチャ

インストール

npm install @mbc-cqrs-serverless/task

使用方法

タスク処理には2つのタイプがあります:

  • 単一タスク処理
  • Step Functionを使用したタスク処理

単一タスク処理

  1. タスクイベントの定義
import { TaskQueueEvent } from "@mbc-cqrs-serverless/task";

export class TaskEvent extends TaskQueueEvent {}
  1. タスクイベントハンドラーの定義
import { EventHandler, IEventHandler } from "@mbc-cqrs-serverless/core";
import { Logger } from "@nestjs/common";

import { TaskEvent } from "./task.event";

@EventHandler(TaskEvent)
export class TaskEventHandler implements IEventHandler<TaskEvent> {
private readonly logger = new Logger(TaskEventHandler.name);

constructor() {}

async execute(event: TaskEvent): Promise<any> {
this.logger.debug("executing task event::", event);

//

this.logger.debug(`Process task completed: ${event.taskEvent.eventID}`);
return "Result after process";
}
}
  1. ITaskQueueEventFactoryの実装
import {
ITaskQueueEventFactory,
TaskQueueEvent,
} from "@mbc-cqrs-serverless/task";
import { TaskEvent } from "src/sample/handler/task.event";

export class TaskQueueEventFactory implements ITaskQueueEventFactory {
async transformTask(event: TaskQueueEvent): Promise<any[]> {
return [new TaskEvent().fromSqsRecord(event)];
}
}
  1. カスタムTaskModule
import { TaskModule } from "@mbc-cqrs-serverless/task";
import { Module } from "@nestjs/common";
import { TaskEventHandler } from "src/sample/handler/task.handler";

import { TaskQueueEventFactory } from "./task-queue-event-factory";

@Module({
imports: [
TaskModule.register({
taskQueueEventFactory: TaskQueueEventFactory,
enableController: true, // Optional: enable REST endpoints for task management
}),
],
providers: [TaskEventHandler],
exports: [TaskModule],
})
export class CustomTaskModule {}
  1. カスタムEventFactoryAddedTask
import { EventFactory, IEvent } from "@mbc-cqrs-serverless/core";
import { EventFactoryAddedTask, TaskEvent } from "@mbc-cqrs-serverless/task";
import { Logger } from "@nestjs/common";
import { DynamoDBStreamEvent } from "aws-lambda";

@EventFactory()
export class CustomEventFactory extends EventFactoryAddedTask {
private readonly logger = new Logger(CustomEventFactory.name);
async transformDynamodbStream(event: DynamoDBStreamEvent): Promise<IEvent[]> {
const curEvents = await super.transformDynamodbStream(event);
const taskEvents = event.Records.map((record) => {
if (
record.eventSourceARN.endsWith("tasks") ||
record.eventSourceARN.includes("tasks" + "/stream/")
) {
if (record.eventName === "INSERT") {
return new TaskEvent().fromDynamoDBRecord(record);
}
}
return undefined;
})
.filter((event) => !!event)
.filter((event) => event.taskEntity.sk.split("#").length < 3);

return [...curEvents, ...taskEvents];
}
}
  1. タスクの作成

Step Functionを使用したタスク処理

  1. Step Functionタスクイベントの定義
import { StepFunctionTaskEvent } from "@mbc-cqrs-serverless/task";

export class SfnTaskEvent extends StepFunctionTaskEvent {}
  1. Step Functionタスクイベントハンドラーの定義
import {
EventHandler,
IEventHandler,
StepFunctionService,
} from "@mbc-cqrs-serverless/core";
import { TaskService } from "@mbc-cqrs-serverless/task";
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";

import { SfnTaskEvent } from "./sfn-task.event";

@EventHandler(SfnTaskEvent)
export class SfnTaskEventHandler implements IEventHandler<SfnTaskEvent> {
private readonly logger = new Logger(SfnTaskEventHandler.name);

constructor() {}

async execute(event: SfnTaskEvent): Promise<any> {
this.logger.debug("executing task event::", event);

//

return "Result after process";
}
}
  1. ITaskQueueEventFactoryの実装
import {
ITaskQueueEventFactory,
StepFunctionTaskEvent,
} from "@mbc-cqrs-serverless/task";
import { SfnTaskEvent } from "src/sample/handler/sfn-task.event";

export class TaskQueueEventFactory implements ITaskQueueEventFactory {
async transformStepFunctionTask(event: StepFunctionTaskEvent): Promise<any[]> {
return [new SfnTaskEvent(event)];
}
}
  1. カスタムTaskModule
import { TaskModule } from "@mbc-cqrs-serverless/task";
import { Module } from "@nestjs/common";
import { TaskEventHandler } from "src/sample/handler/task.handler";

import { TaskQueueEventFactory } from "./task-queue-event-factory";

@Module({
imports: [
TaskModule.register({
taskQueueEventFactory: TaskQueueEventFactory,
enableController: true, // Optional: enable REST endpoints for task management
}),
],
providers: [TaskEventHandler],
exports: [TaskModule],
})
export class CustomTaskModule {}
  1. カスタムEventFactoryAddedTask
import { EventFactory, IEvent } from "@mbc-cqrs-serverless/core";
import { EventFactoryAddedTask, TaskEvent } from "@mbc-cqrs-serverless/task";
import { Logger } from "@nestjs/common";
import { DynamoDBStreamEvent } from "aws-lambda";

@EventFactory()
export class CustomEventFactory extends EventFactoryAddedTask {
private readonly logger = new Logger(CustomEventFactory.name);
async transformDynamodbStream(event: DynamoDBStreamEvent): Promise<IEvent[]> {
const curEvents = await super.transformDynamodbStream(event);
const taskEvents = event.Records.map((record) => {
if (
record.eventSourceARN.endsWith("tasks") ||
record.eventSourceARN.includes("tasks" + "/stream/")
) {
if (record.eventName === "INSERT") {
return new TaskEvent().fromDynamoDBRecord(record);
}
}
return undefined;
})
.filter((event) => !!event)
.filter((event) => event.taskEntity.sk.split("#").length < 3);

return [...curEvents, ...taskEvents];
}
}
  1. Step Functionタスクの作成
const item = [
{ key: "value1" },
{ key: "value2" },
{ key: "value3" },
{ key: "value4" },
{ key: "value5" },
{ key: "value6" },
];

await this.taskService.createStepFunctionTask(
{
input: item,
taskType: "cat",
tenantCode: "mbc",
},
{ invokeContext }
);

APIリファレンス

TaskServiceメソッド

createTask(dto: CreateTaskDto, options: { invokeContext: IInvoke }): Promise<TaskEntity>

単一タスク処理用の新しいタスクを作成します。

const task = await this.taskService.createTask(
{
taskType: "data-export",
tenantCode: "mbc",
name: "Export user data",
input: { userId: "123", format: "csv" },
},
{ invokeContext }
);

createStepFunctionTask(dto: CreateTaskDto, options: { invokeContext: IInvoke }): Promise<TaskEntity>

Step Functions処理用の新しいタスクを作成します。入力配列はサブタスクとして処理されます。

const task = await this.taskService.createStepFunctionTask(
{
taskType: "batch-process",
tenantCode: "mbc",
name: "Process batch items",
input: [{ id: 1 }, { id: 2 }, { id: 3 }],
},
{ invokeContext }
);

getTask(key: DetailKey): Promise<TaskEntity>

プライマリキーでタスクを取得します。

const task = await this.taskService.getTask({
pk: "TASK#mbc",
sk: "data-export#01HXYZ123",
});

listItemsByPk(tenantCode: string, type?: string, options?: ListTaskOptions): Promise<TaskListEntity>

テナントコードとタイプでタスクを一覧表示します。

ListTaskOptions:

interface ListTaskOptions {
sk?: {
skExpession: string;
skAttributeValues: Record<string, string>;
skAttributeNames?: Record<string, string>;
};
startFromSk?: string; // For pagination (ページネーション用)
limit?: number;
order?: 'asc' | 'desc';
}
// List all tasks for a tenant (テナントのすべてのタスクを一覧表示)
const tasks = await this.taskService.listItemsByPk("mbc", "TASK", {
limit: 10,
order: "desc",
});

// List Step Function tasks (Step Functionタスクを一覧表示)
const sfnTasks = await this.taskService.listItemsByPk("mbc", "SFN_TASK");

// Paginate through results (結果をページネーション)
const nextPage = await this.taskService.listItemsByPk("mbc", "TASK", {
startFromSk: tasks.lastSk,
limit: 10,
});

createSubTask(event: TaskQueueEvent): Promise<TaskEntity[]>

親タスクの入力配列からサブタスクを作成します。入力配列の各項目が個別のサブタスクになります。

// Typically called within a TaskQueueEvent handler (通常、TaskQueueEventハンドラー内で呼び出す)
const subTasks = await this.taskService.createSubTask(event);
// Returns array of TaskEntity for each input item (各入力項目に対するTaskEntityの配列を返す)

getAllSubTask(subTask: DetailKey): Promise<TaskEntity[]>

親タスクのすべてのサブタスクを取得します。

const subTasks = await this.taskService.getAllSubTask({
pk: "SFN_TASK#mbc",
sk: "batch-process#01HXYZ123#0", // Any subtask key (任意のサブタスクキー)
});
// Returns all subtasks under the parent task (親タスク配下のすべてのサブタスクを返す)

updateStatus(key: DetailKey, status: string, attributes?: { result?: any; error?: any }, notifyId?: string): Promise<void>

タスクのステータスを更新し、SNS通知を送信します。

// Mark task as completed (タスクを完了としてマーク)
await this.taskService.updateStatus(
{ pk: "TASK#mbc", sk: "data-export#01HXYZ123" },
"COMPLETED",
{ result: { exportedRows: 100 } }
);

// Mark task as failed (タスクを失敗としてマーク)
await this.taskService.updateStatus(
{ pk: "TASK#mbc", sk: "data-export#01HXYZ123" },
"FAILED",
{ error: { message: "Export failed", code: "EXPORT_ERROR" } }
);

updateSubTaskStatus(key: DetailKey, status: string, attributes?: { result?: any; error?: any }, notifyId?: string): Promise<void>

サブタスクのステータスを更新し、アクション"sub-task-status"でSNS通知を送信します。

await this.taskService.updateSubTaskStatus(
{ pk: "SFN_TASK#mbc", sk: "batch-process#01HXYZ123#0" },
"COMPLETED",
{ result: { processedItem: { id: 1 } } }
);

updateStepFunctionTask(key: DetailKey, attributes?: Record<string, any>, status?: string, notifyId?: string): Promise<void>

属性とステータスでStep Functionタスクを更新し、SNS通知を送信します。

await this.taskService.updateStepFunctionTask(
{ pk: "SFN_TASK#mbc", sk: "batch-process#01HXYZ123" },
{ executionArn: "arn:aws:states:..." },
"PROCESSING"
);

publishAlarm(event: TaskQueueEvent | StepFunctionTaskEvent, errorDetails: any): Promise<void>

タスク処理中にエラーが発生した際にSNS経由でアラーム通知を発行します。このメソッドは通常、タスク処理ワークフローのエラーハンドラーから呼び出されます。

try {
// Process task (タスクを処理)
await this.processTask(event);
} catch (error) {
// Send alarm notification (アラーム通知を送信)
await this.taskService.publishAlarm(event, {
message: error.message,
stack: error.stack,
});
throw error;
}

アラーム通知に含まれる情報:

  • タスクキー (pk, sk)
  • テナントコード
  • エラー詳細
  • アクションタイプ: "sfn-alarm"

formatTaskStatus(tasks: TaskEntity[]): Promise<object>

サブタスク数を計算しステータス情報を集計してタスクステータスをフォーマットします。UIでのタスク進捗表示に便利です。

// Get all subtasks for a parent task (親タスクのすべてのサブタスクを取得)
const subTasks = await this.taskService.getAllSubTask({
pk: "SFN_TASK#mbc",
sk: "batch-process#01HXYZ123#0"
});

const formattedStatus = await this.taskService.formatTaskStatus(subTasks);
// 戻り値:
// {
// subTaskCount: 10, // サブタスクの総数
// subTaskSucceedCount: 7, // 完了したサブタスクの数
// subTaskFailedCount: 1, // 失敗したサブタスクの数
// subTaskRunningCount: 2, // 進行中のサブタスクの数
// subTasks: [ // Array of subtask summaries (サブタスクサマリーの配列)
// { pk: "...", sk: "...", status: "COMPLETED" },
// ...
// ]
// }

戻り値オブジェクトの構造:

{
subTaskCount: number; // サブタスクの合計数
subTaskSucceedCount: number; // COMPLETEDサブタスク
subTaskFailedCount: number; // FAILEDサブタスク
subTaskRunningCount: number; // PROCESSINGサブタスク
subTasks: Array<{ // Subtask summary array (サブタスクサマリー配列)
pk: string;
sk: string;
status: string;
}>;
}

タスクステータス値

ステータス説明
CREATEDタスクは作成されましたが、まだ開始されていません
QUEUEDタスクは処理のためにキューに入れられました
STARTEDタスクの実行が開始されました
PROCESSINGタスクは現在処理中です
FINISHEDタスクの実行が終了しました
COMPLETEDタスクは正常に完了しました
ERROREDタスクは実行中にエラーが発生しました
FAILEDタスクはエラーで失敗しました

CreateTaskDto

CreateTaskDtoクラスは、新しいタスクを作成するための構造を定義します:

interface CreateTaskDto {
tenantCode: string; // Required: Tenant identifier (必須: テナント識別子)
taskType: string; // Required: Type/category of the task (必須: タスクのタイプ/カテゴリ)
name?: string; // Optional: Display name (defaults to taskType) (オプション: 表示名、デフォルトはtaskType)
input: Record<string, any> | any[]; // Required: Task input data (必須: タスク入力データ)
}
Step Functions用の入力形式

createStepFunctionTask() を使用する場合、input フィールドは 配列 である必要があります。配列内の各項目は、Step FunctionsのMapステートで処理される個別のサブタスクになります。createTask() による単一タスク処理の場合、入力は任意のオブジェクトで構いません。

ITaskQueueEventFactoryインターフェース

ITaskQueueEventFactoryインターフェースは、タスクイベントを変換するためのオプションメソッドを定義します。使用するケースに関連するメソッドのみを実装する必要があります:

interface ITaskQueueEventFactory<TEvent extends IEvent = any> {
transformTask?(event: TaskQueueEvent): Promise<TEvent[]>; // Optional: For single task processing (オプション: 単一タスク処理用)
transformStepFunctionTask?(event: StepFunctionTaskEvent): Promise<TEvent[]>; // Optional: For Step Function task processing (オプション: Step Functionタスク処理用)
}

注意: 両方のメソッドはオプションです。単一タスク処理にはtransformTaskを、Step Functionタスク処理にはtransformStepFunctionTaskを実装します。アプリケーションが両方のタイプを使用する場合は両方を実装してください。