CommandService
概要
「CommandService」は、コマンドの管理と同期を容易にするフレームワークのコアコンポーネントです。主に、完全なコマンドと部分的なコマンドの両方を発行する方法を提供し、それらの処理を同期または非同期で行うことができるため、システム内のコマンド処理の全体的な効率と柔軟性が向上します。
CommandModule設定

CommandModuleは、データ同期ハンドラーを登録し、テーブル名に関連付けられ たサービスを提供するために使用される動的モジュールです。このモジュールをインポートする際は、特定のオプションを指定する必要があります。
登録オプション
| プロパティ | 説明 |
|---|---|
tableName: string | テーブル名を指定 |
skipError?: boolean | 将来の使用のために予約済み。未実装です。 |
dataSyncHandlers?: Type[] | データ同期ハンドラーを登録 |
disableDefaultHandler?: boolean | trueに設定すると、デフォルトのDynamoDBデータ同期ハンドラーを無効にします |
登録例
import { CommandModule } from '@mbc-cqrs-serverless/core';
import { Module } from '@nestjs/common';
@Module({
imports: [
CommandModule.register({
tableName: 'cat',
dataSyncHandlers: [CatDataSyncRdsHandler],
}),
],
})
export class CatModule {}
ここでは、CommandModuleをcatテーブル名で登録し、CatDataSyncRdsHandlerをデータ同期ハンドラーに提供しています。
CommandServiceの使用
以下のメソッドの例では、次のように CommandModule をモジュールにインポートすると仮定します。
import { CommandModule } from "@mbc-cqrs-serverless/core";
import { Module } from "@nestjs/common";
import { CatDataSyncRdsHandler } from "./handler/cat-rds.handler";
import { CatController } from "./cat.controller";
import { CatService } from "./cat.service";
@Module({
imports: [
CommandModule.register({
tableName: "cat",
dataSyncHandlers: [CatDataSyncRdsHandler],
}),
],
controllers: [CatController],
providers: [CatService],
})
export class CatModule {}
これで、CommandService と DataService を他のサービスに挿入して使用できるようになります。
CommandServiceを使用した完全なCRUD実装パターンについては、Service実装パターンを参照してください。
メソッド
async publishAsync(input: CommandInputModel, options: ICommandOptions): Promise<CommandModel | null>
このメソッドを使用すると、コマンド データが command テーブルに挿入されるため、完全なコマンドを公開できます。
このメソッドはコマンド データをすぐに返すことによって即時フィードバックを提供するため、コマンドの処理を待たずに続行できます。その後、コマンドはバックグラウンドで非同期に処理され、処理中もアプリケーションの応答性が維持されます。
戻り値: 成功時はPromise<CommandModel>を返します。コマンドがdirtyでない場合(既存のコマンドと比較して変更が検出されない場合)はPromise<null>を返します。
たとえば、次のように新しい cat コマンドを発行できます。
import {
generateId,
getCommandSource,
VERSION_FIRST,
} from "@mbc-cqrs-serverless/core";
// class CatCommandDto extends CommandDto {}
const catCommand = new CatCommandDto({
pk: catPk,
sk: catSk,
tenantCode,
id: generateId(catPk, catSk),
code,
type: "CAT",
name: attributes.name,
version: VERSION_FIRST,
attributes,
});
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"createCatCommand"
);
const item = await this.commandService.publishAsync(catCommand, {
source: commandSource,
invokeContext,
});
async publishPartialUpdateAsync( input: CommandPartialInputModel, options: ICommandOptions): Promise<CommandModel>
この方法を使用すると、同じ pk および sk (主キー) 値を持つ前のコマンドに基づいて新しいコマンド データを作成できます。
publishAsync メソッドと同様に、このメソッドはコマンドの処理を待たずに、更新されたコマンド データをすぐに返します。
たとえば、猫の名前を更新したいとします。
import { generateId, getCommandSource } from "@mbc-cqrs-serverless/core";
// ...
const catCommand: CommandPartialInputModel = {
pk: catPk,
sk: catSk,
version: storedItem.version,
name: attributes.name,
};
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"updateCatCommand"
);
const item = await this.commandService.publishPartialUpdateAsync(catCommand, {
source: commandSource,
invokeContext,
});
async publishSync( input: CommandInputModel, options: ICommandOptions): Promise<CommandModel>
このメソッドは、publishAsync メソッドに相当する同期メソッドとして機能します。つまり、コマンドが完全に処理されるまでコードの実行を停止します。これにより、コード内で以降の操作を続行する前にコマンドの結果を確実に受け取ることができます。
例えば
import {
generateId,
getCommandSource,
VERSION_FIRST,
} from "@mbc-cqrs-serverless/core";
// class CatCommandDto extends CommandDto {}
const catCommand = new CatCommandDto({
pk: catPk,
sk: catSk,
tenantCode,
id: generateId(catPk, catSk),
code,
type: "CAT",
name: attributes.name,
version: VERSION_FIRST,
attributes,
});
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"createCatCommandSync"
);
const item = await this.commandService.publishSync(catCommand, {
source: commandSource,
invokeContext,
});
async publishPartialUpdateSync( input: CommandPartialInputModel, options: ICommandOptions): Promise<CommandModel>
このメソッドは、publishPartialUpdateAsync メソッドの同期バージョンです。コマンドが処理されるまでコードの実行がブロックされます。
このメソッドでは、入力の version フィールドが既存アイテムの現在のバージョンと一致する必要があります。アイテムが見つからないかバージョンが一致しない場合、「The input is not a valid, item not found or version not match」というメッセージで BadRequestException がスローされます。
たとえば、猫の名前を更新したいとします。
import { generateId, getCommandSource } from "@mbc-cqrs-serverless/core";
// ...
const catCommand: CommandPartialInputModel = {
pk: catPk,
sk: catSk,
version: storedItem.version,
name: attributes.name,
};
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"updateCatCommandSync"
);
const item = await this.commandService.publishPartialUpdateSync(catCommand, {
source: commandSource,
invokeContext,
});
async publish(input: CommandInputModel, options: ICommandOptions): Promise<CommandModel | null> 非推奨
非推奨、削除予定: この API 要素は将来のバージョンで削除される可能性があります。代わりに publishAsync メソッド を使用してください。
たとえば、次のように新しい cat コマンドを発行できます。
import {
generateId,
getCommandSource,
VERSION_FIRST,
} from "@mbc-cqrs-serverless/core";
// class CatCommandDto extends CommandDto {}
const catCommand = new CatCommandDto({
pk: catPk,
sk: catSk,
tenantCode,
id: generateId(catPk, catSk),
code,
type: "CAT",
name: attributes.name,
version: VERSION_FIRST,
attributes,
});
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"createCatCommand"
);
const item = await this.commandService.publish(catCommand, {
source: commandSource,
invokeContext,
});
このメソッドはコマンド データを返します。
async publishPartialUpdate( input: CommandPartialInputModel, options: ICommandOptions): Promise<CommandModel | null> 非推奨
非推奨、削除予定: この API 要素は将来のバージョンで削除される可能性があります。代わりに publishPartialUpdateAsync メソッド を使用してください。
この方法では、以前のコマンドに基づいて新しいコマンド データを作成できます。
たとえば、猫の名前を更新したいとします。
import { generateId, getCommandSource } from "@mbc-cqrs-serverless/core";
// ...
const catCommand: CommandPartialInputModel = {
pk: catPk,
sk: catSk,
version: storedItem.version,
name: attributes.name,
};
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"updateCatCommand"
);
const item = await this.commandService.publishPartialUpdate(catCommand, {
source: commandSource,
invokeContext,
});
このメソッドは更新されたコマンド データを返します。
async reSyncData(): Promise<void>
データ同期ハンドラーを再適用する場合は、このメソッドを使用できるように設計されています。次のように関数を呼び出すだけです。
await this.commandService.reSyncData();
async getItem(key: DetailKey): Promise<CommandModel>
プライマリキーでコマンドアイテムを取得します。ソートキーにバージョン区切り文字が含まれていない場合、自動的にgetLatestItemを呼び出して最新バージョンを取得します。
import { DetailKey } from "@mbc-cqrs-serverless/core";
// Get a specific version of a command (特定バージョンのコマンドを取得)
const command = await this.commandService.getItem({
pk: "CAT#tenant1",
sk: "CAT#cat001@2", // Includes version number (バージョン番号を含む)
});
// If no version in sk, automatically gets latest version (skにバージョンがない場合、自動的に最新バージョンを取得)
const latestCommand = await this.commandService.getItem({
pk: "CAT#tenant1",
sk: "CAT#cat001",
});
async getLatestItem(key: DetailKey): Promise<CommandModel>
プライマリキーでコマンドアイテムの最新バージョンを取得します。このメソッドは、データテーブルのバージョンから開始し、最新のコマンドバージョンを見つけるために上下に検索するルックアップアルゴリズムを使用します。
import { DetailKey } from "@mbc-cqrs-serverless/core";
const latestCommand = await this.commandService.getLatestItem({
pk: "CAT#tenant1",
sk: "CAT#cat001", // Sort key without version (バージョンなしのソートキー)
});
if (latestCommand) {
console.log(`Latest version: ${latestCommand.version}`);
}
async getNextCommand(currentKey: DetailKey): Promise<CommandModel>
現在のコマンドのキーに基づいて、次のバージョンのコマンドを取得します。これは、コマンドチェーンの処理やリトライロジックの実装に役立ちます。
import { DetailKey } from "@mbc-cqrs-serverless/core";
const currentKey: DetailKey = {
pk: "CAT#tenant1",
sk: "CAT#cat001@2",
};
const nextCommand = await this.commandService.getNextCommand(currentKey);
// Returns command with sk: "CAT#cat001@3" if exists (存在する場合、sk: "CAT#cat001@3"のコマンドを返す)
async updateStatus(key: DetailKey, status: string, notifyId?: string): Promise<void>
コマンドのステータスを更新し、SNS通知を送信します。これは、タスクやプロセスのステータスを更新し、変更をサブスクライバーに通知するために一般的に使用されます。
import { DetailKey } from "@mbc-cqrs-serverless/core";
const key: DetailKey = {
pk: "CAT#tenant1",
sk: "CAT#cat001@1",
};
// Update status and send SNS notification (ステータスを更新してSNS通知を送信)
await this.commandService.updateStatus(key, "COMPLETED");
// With custom notification ID (カスタム通知IDを指定)
await this.commandService.updateStatus(key, "FAILED", "custom-notify-id");
SNS通知ペイロードには以下が含まれます:
action:"command-status"pk,sk: コマンドキーtable: コマンドテーブル名id: 通知ID(カスタムまたは自動生成)tenantCode: pkから抽出content:statusとsourceを含むオブジェクト
async duplicate(key: DetailKey, options: ICommandOptions): Promise<CommandModel>
既存のコマンドをバージョン番号を増加させて複製します。複製されたコマンドはsourceが"duplicated"に設定され、メタデータ(タイムスタンプ、ユーザー、IP)が更新されます。
import { DetailKey, getCommandSource } from "@mbc-cqrs-serverless/core";
import { basename } from "path";
const key: DetailKey = {
pk: "CAT#tenant1",
sk: "CAT#cat001@1",
};
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"duplicateCatCommand"
);
const duplicatedCommand = await this.commandService.duplicate(key, {
source: commandSource,
invokeContext,
});
// The duplicated command has: (複製されたコマンドの内容:)
// - version incremented by 1 (- バージョンが1増加)
// - source set to "duplicated" (- sourceが"duplicated"に設定)
// - updated timestamps and user info (- タイムスタンプとユーザー情報が更新)
async updateTaskToken(key: DetailKey, token: string): Promise<CommandModel>
コマンドアイテムにAWS Step Functionsタスクトークンを保存します。これは、Step Functionsと統合してコールバックパターンを有効にする際に使用されます。
import { DetailKey } from "@mbc-cqrs-serverless/core";
const key: DetailKey = {
pk: "CAT#tenant1",
sk: "CAT#cat001@1",
};
// Store the Step Functions task token (Step Functionsタスクトークンを保存)
await this.commandService.updateTaskToken(key, event.taskToken);
// Later, use the token to send task success/failure (後でトークンを使用してタスクの成功/失敗を送信)
// via SendTaskSuccessCommand or SendTaskFailureCommand (SendTaskSuccessCommandまたはSendTaskFailureCommandを使用)
async updateTtl(key: DetailKey): Promise<any | null>
コマンドの前のバージョンのTTL(Time To Live)を更新します。これは通常、コマンド履歴の保持を管理するために内部的に使用されます。バージョンが低すぎるか、前のコマンドが存在しない場合はnullを返します。
import { DetailKey } from "@mbc-cqrs-serverless/core";
const key: DetailKey = {
pk: "CAT#tenant1",
sk: "CAT#cat001@3", // Version 3 (バージョン3)
};
// Updates TTL of version 2 (previous version) (バージョン2(前のバージョン)のTTLを更新)
const result = await this.commandService.updateTtl(key);
このメソッドは主にフレームワークがコマンド履歴管理のために内部的に使用します。アプリケーションコードでの直接使用はほとんど必要ありません。
dataSyncHandlers (getter): IDataSyncHandler[]
このCommandServiceインスタンスに登録されているデータ同期ハンドラーの配列を返します。ハンドラーをプログラムで検査または反復処理する必要がある場合に便利です。
// Get all registered data sync handlers (登録済みのすべてのデータ同期ハンドラーを取得)
const handlers = this.commandService.dataSyncHandlers;
handlers.forEach((handler) => {
console.log(`Handler: ${handler.constructor.name}, Type: ${handler.type}`);
});
getDataSyncHandler(name: string): IDataSyncHandler | undefined
クラス名で特定のデータ同期ハンドラーを取得します。指定された名前のハンドラーが見つからない場合はundefinedを返します。
// Get a specific handler by name (名前で特定のハンドラーを取得)
const rdsHandler = this.commandService.getDataSyncHandler('CatDataSyncRdsHandler');
if (rdsHandler) {
// Use the handler directly (ハンドラーを直接使用)
await rdsHandler.up(commandModel);
}
isNotCommandDirty(item: CommandModel, input: CommandInputModel): boolean
既存のコマンドアイテムと新しい入力を比較して、実際に変更があるかどうかを判定します。コマンドがdirtyでない場合(変更なし)はtrueを返し、変更がある場合はfalseを返します。
このメソッドは、変更が検出されない場合に不要な書き込みをスキップするためにpublishメソッド内部で使用されます。publishを呼び出す前に更新で変更が発生するかどうかを直接確認することもできます。
// Check if an update would result in changes (更新で変更が発生するか確認)
const existingCommand = await this.commandService.getItem({ pk, sk });
if (existingCommand && this.commandService.isNotCommandDirty(existingCommand, newInput)) {
// No changes detected, skip the update (変更が検出されないため、更新をスキップ)
console.log('Command has no changes, skipping update');
return existingCommand;
}
// Proceed with the update (更新を続行)
const result = await this.commandService.publishAsync(newInput, options);
tableName (getter/setter): string
このCommandServiceインスタンスのDynamoDBテーブル名を取得または設定します。テーブル名はCommandModuleを登録する際に設定されますが、必要に応じて実行時に変更することもできます。
// Get the current table name (現在のテーブル名を取得)
const currentTable = this.commandService.tableName;
console.log(`Operating on table: ${currentTable}`);
// Set a different table name (別のテーブル名を設定)
this.commandService.tableName = 'another-table';
実行時にテーブル名を変更することは高度なユースケースです。ほとんどのアプリケーションでは、CommandModule.register()でテーブル名を設定し、その後は変更しないでください。