Task
The Task package provides comprehensive task management functionality in the MBC CQRS Serverless framework. It enables:
- Asynchronous task execution
 - Task status tracking
 - Progress monitoring
 - Error handling and retries
 - Task queue management
 - Task history and logging
 
Installation
npm install @mbc-cqrs-serverless/task
Usage
There are 2 type task processing:
- Single task processing
 - Task processing with Step function
 
Single task processing
- Define task event
 
import { TaskQueueEvent } from "@mbc-cqrs-serverless/task";
export class TaskEvent extends TaskQueueEvent {}
- Define task event handler
 
import { EventHandler, IEventHandler } from "@mbc-cqrs-serverless/core";
import { Logger } from "@nestjs/common";
import { TaskEvent } from "./task.event";
@EventHandler(TaskEvent)
export class TaskEventHandler implements IEventHandler<TaskEvent> {
  private readonly logger = new Logger(TaskEventHandler.name);
  constructor() {}
  async execute(event: TaskEvent): Promise<any> {
    this.logger.debug("executing task event::", event);
    //
    this.logger.debug(`Process task completed: ${event.taskEvent.eventID}`);
    return "Result after process";
  }
}
- Implement 
ITaskQueueEventFactory 
import {
  ITaskQueueEventFactory,
  TaskQueueEvent,
} from "@mbc-cqrs-serverless/task";
import { TaskEvent } from "src/sample/handler/task.event";
export class TaskQueueEventFactory implements ITaskQueueEventFactory {
  async transformTask(event: TaskQueueEvent): Promise<any[]> {
    return [new TaskEvent().fromSqsRecord(event)];
  }
}
- Custom 
TaskModule 
import { TaskModule } from "@mbc-cqrs-serverless/task";
import { Module } from "@nestjs/common";
import { TaskEventHandler } from "src/sample/handler/task.handler";
import { TaskQueueEventFactory } from "./task-queue-event-factory";
@Module({
  imports: [
    TaskModule.register({
      taskQueueEventFactory: TaskQueueEventFactory,
    }),
  ],
  providers: [TaskEventHandler],
  exports: [TaskModule],
})
export class CustomTaskModule {}
- Custom 
EventFactoryAddedTask 
import { EventFactory, IEvent } from "@mbc-cqrs-serverless/core";
import { EventFactoryAddedTask, TaskEvent } from "@mbc-cqrs-serverless/task";
import { Logger } from "@nestjs/common";
import { DynamoDBStreamEvent } from "aws-lambda";
@EventFactory()
export class CustomEventFactory extends EventFactoryAddedTask {
  private readonly logger = new Logger(CustomEventFactory.name);
  async transformDynamodbStream(event: DynamoDBStreamEvent): Promise<IEvent[]> {
    const curEvents = await super.transformDynamodbStream(event);
    const taskEvents = event.Records.map((record) => {
      if (
        record.eventSourceARN.endsWith("tasks") ||
        record.eventSourceARN.includes("tasks" + "/stream/")
      ) {
        if (record.eventName === "INSERT") {
          return new TaskEvent().fromDynamoDBRecord(record);
        }
      }
      return undefined;
    })
      .filter((event) => !!event)
      .filter((event) => event.taskEntity.sk.split("#").length < 3);
    return [...curEvents, ...taskEvents];
  }
}
- Create a Task
 
Task processing with Step function
- Define step function task event
 
import { StepFunctionTaskEvent } from "@mbc-cqrs-serverless/task.sfn.event";
export class SfnTaskEvent extends StepFunctionTaskEvent {}
- Define step function task event handler
 
import {
  EventHandler,
  IEventHandler,
  StepFunctionService,
} from "@mbc-cqrs-serverless/core";
import { TaskService } from "@mbc-cqrs-serverless/task";
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { SfnTaskEvent } from "./sfn-task.event";
@EventHandler(SfnTaskEvent)
export class SfnTaskEventHandler implements IEventHandler<SfnTaskEvent> {
  private readonly logger = new Logger(SfnTaskEventHandler.name);
  constructor() {}
  async execute(event: TaskEvent): Promise<any> {
    this.logger.debug("executing task event::", event);
    //
    return "Result after process";
  }
}
- Implement 
ITaskQueueEventFactory 
import {
  ITaskQueueEventFactory,
  TaskQueueEvent,
} from "@mbc-cqrs-serverless/task";
import { TaskEvent } from "src/sample/handler/sfn-task.event";
export class TaskQueueEventFactory implements ITaskQueueEventFactory {
  async transformStepFunctionTask(event: TaskQueueEvent): Promise<any[]> {
    return [new SfnTaskEvent(event)];
  }
}
- Custom 
TaskModule 
import { TaskModule } from "@mbc-cqrs-serverless/task";
import { Module } from "@nestjs/common";
import { TaskEventHandler } from "src/sample/handler/task.handler";
import { TaskQueueEventFactory } from "./task-queue-event-factory";
@Module({
  imports: [
    TaskModule.register({
      taskQueueEventFactory: TaskQueueEventFactory,
    }),
  ],
  providers: [TaskEventHandler],
  exports: [TaskModule],
})
export class CustomTaskModule {}
- Custom 
EventFactoryAddedTask 
import { EventFactory, IEvent } from "@mbc-cqrs-serverless/core";
import { EventFactoryAddedTask, TaskEvent } from "@mbc-cqrs-serverless/task";
import { Logger } from "@nestjs/common";
import { DynamoDBStreamEvent } from "aws-lambda";
@EventFactory()
export class CustomEventFactory extends EventFactoryAddedTask {
  private readonly logger = new Logger(CustomEventFactory.name);
  async transformDynamodbStream(event: DynamoDBStreamEvent): Promise<IEvent[]> {
    const curEvents = await super.transformDynamodbStream(event);
    const taskEvents = event.Records.map((record) => {
      if (
        record.eventSourceARN.endsWith("tasks") ||
        record.eventSourceARN.includes("tasks" + "/stream/")
      ) {
        if (record.eventName === "INSERT") {
          return new TaskEvent().fromDynamoDBRecord(record);
        }
      }
      return undefined;
    })
      .filter((event) => !!event)
      .filter((event) => event.taskEntity.sk.split("#").length < 3);
    return [...curEvents, ...taskEvents];
  }
}
- Create a step function task
 
const item = [
  { key: "value1" },
  { key: "value2" },
  { key: "value3" },
  { key: "value4" },
  { key: "value5" },
  { key: "value6" },
];
await this.taskService.createStepFunctionTask(
  {
    input: item,
    taskType: "cat",
    tenantCode: "mbc",
  },
  { invokeContext }
);