CommandService
概要
「CommandService」は、コマンドの管理と同期を容易にするフレームワークのコアコンポーネントです。主に、完全なコマンドと部分的なコマンドの両方を発行する方法を提供し、それらの処理を同期または非同期で行うことができるため、システム内のコマンド処理の全体的な効率と柔軟性が向上します。
CommandModule設定

CommandModuleは、データ同期ハンドラーを登録し、テーブル名に関連付けられたサービスを提供するために使用される動的モジュールです。このモジュールをインポートする際は、特定のオプションを指定する必要があります。
登録オプション
| プロパティ | 説明 |
|---|---|
tableName: string | テーブル名を指定 |
skipError?: boolean | 将来の使用のために予約済み。未実装です。 |
dataSyncHandlers?: Type[] | データ同期ハンドラーを登録 |
disableDefaultHandler?: boolean | trueに設定すると、デフォルトのDynamoDBデータ同期ハンドラーを無効にします |
登録例
import { CommandModule } from '@mbc-cqrs-serverless/core';
import { Module } from '@nestjs/common';
@Module({
imports: [
CommandModule.register({
tableName: 'cat',
dataSyncHandlers: [CatDataSyncRdsHandler],
}),
],
})
export class CatModule {}
ここでは、CommandModuleをcatテーブル名で登録し、CatDataSyncRdsHandlerをデータ同期ハンドラーに提供しています。
CommandServiceの使用
以下のメソッドの例では、次のように CommandModule をモジュールにインポートすると仮定します。
import { CommandModule } from "@mbc-cqrs-serverless/core";
import { Module } from "@nestjs/common";
import { CatDataSyncRdsHandler } from "./handler/cat-rds.handler";
import { CatController } from "./cat.controller";
import { CatService } from "./cat.service";
@Module({
imports: [
CommandModule.register({
tableName: "cat",
dataSyncHandlers: [CatDataSyncRdsHandler],
}),
],
controllers: [CatController],
providers: [CatService],
})
export class CatModule {}
これで、CommandService と DataService を他のサービスに挿入して使用できるようになります。
CommandServiceを使用した完全なCRUD実装パターンについては、Service実装パターンを参照してください。
メソッド
async publishAsync(input: CommandInputModel, options: ICommandOptions): Promise<CommandModel | null>
このメソッドを使用すると、コマンド データが command テーブルに挿入されるため、完全なコマンドを公開できます。
このメソッドはコマンド データをすぐに返すことによって即時フィードバックを提供するため、コマンドの処理を待たずに続行できます。その後、コマンドはバックグラウンドで非同期に処理され、処理中もアプリケーションの応答性が維持されます。
戻り値: 成功時はPromise<CommandModel>を返します。コマンドがdirtyでない場合(既存のコマンドと比較して変更が検出されない場合)はPromise<null>を返します。
たとえば、次のように新しい cat コマンドを発行できます。
import {
generateId,
getCommandSource,
VERSION_FIRST,
} from "@mbc-cqrs-serverless/core";
// class CatCommandDto extends CommandDto {}
const catCommand = new CatCommandDto({
pk: catPk,
sk: catSk,
tenantCode,
id: generateId(catPk, catSk),
code,
type: "CAT",
name: attributes.name,
version: VERSION_FIRST,
attributes,
});
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"createCatCommand"
);
const item = await this.commandService.publishAsync(catCommand, {
source: commandSource,
invokeContext,
});
async publishPartialUpdateAsync( input: CommandPartialInputModel, options: ICommandOptions): Promise<CommandModel>
この方法を使用すると、同じ pk および sk (主キー) 値を持つ前のコマンドに基づいて新しいコマンド データを作成できます。
publishAsync メソッドと同様に、このメソッドはコマンドの処理を待たずに、更新されたコマンド データをすぐに返します。
たとえば、猫の名前を更新したいとします。
import { generateId, getCommandSource } from "@mbc-cqrs-serverless/core";
// ...
const catCommand: CommandPartialInputModel = {
pk: catPk,
sk: catSk,
version: storedItem.version,
name: attributes.name,
};
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"updateCatCommand"
);
const item = await this.commandService.publishPartialUpdateAsync(catCommand, {
source: commandSource,
invokeContext,
});
async publishSync( input: CommandInputModel, options: ICommandOptions): Promise<CommandModel | null>
このメソッドは、publishAsync メソッドに相当する同期メソッドとして機能します。つまり、コマンドが完全に処理されるまでコードの実行を停止します。これにより、コード内で以降の操作を続行する前にコマンドの結果を確実に受け取ることができます。
v1.2.0以降、publishSync()とpublishPartialUpdateSync()はコマンドに変更がない場合(no-op)にnullを返します。プロパティにアクセスする前に必ずnullチェックを行ってください:
const result = await this.commandService.publishSync(command, { invokeContext });
if (!result) return; // no-op: コマンドに変更なし、何も書き込まれていません
console.log(result.pk); // nullチェック後は安全
v1.1.4以降、publishSyncは非同期パイプラインと同等の完全な監査証跡を書き込みます:
syncMode: 'SYNC'マーカー付きの不変イベントがCommandテーブルに書き込まれます- Historyテーブルにデータが書き込まれ、完全なイベントソーシングの一致が実現されます
- コマンドライフサイクル:
publish_sync:STARTED→finish:FINISHED(エラー時はpublish_sync:FAILED) - コマンドに変更がない場合は
nullを返します(publishAsyncの動作と同一) - DynamoDB Streamフィルターが
syncMode=SYNCレコードを除外し、Step Functionsの二重実行を防止
v1.1.4以前は、publishSyncはStep Functionsのトリガーを避けるためCommandテーブルをバイパスしており、監査証跡の欠落とHistoryテーブルへの書き込みがない状態でした。
変更が検出されない場合はnullを返します(ダーティチェック最適化)。
例えば
import {
generateId,
getCommandSource,
VERSION_FIRST,
} from "@mbc-cqrs-serverless/core";
// class CatCommandDto extends CommandDto {}
const catCommand = new CatCommandDto({
pk: catPk,
sk: catSk,
tenantCode,
id: generateId(catPk, catSk),
code,
type: "CAT",
name: attributes.name,
version: VERSION_FIRST,
attributes,
});
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"createCatCommandSync"
);
const item = await this.commandService.publishSync(catCommand, {
source: commandSource,
invokeContext,
});
async publishPartialUpdateSync( input: CommandPartialInputModel, options: ICommandOptions): Promise<CommandModel | null>
このメソッドは、publishPartialUpdateAsync メソッドの同期バージョンです。コマンドが処理されるまでコードの実行がブロックされます。
v1.2.0以降、このメソッドはコマンドに変更がない場合(no-op)にnullを返します。結果を必ずnullチェックしてください。詳細はpublishSync null returnを参照。
このメソッドでは、入力の version フィールドが既存アイテムの現在のバージョンと一致する必要があります。アイテムが見つからないかバージョンが一致しない場合、「The input is not a valid, item not found or version not match」というメッセージで BadRequestException がスローされます。
たとえば、猫の名前を更新したいとします。
import { generateId, getCommandSource } from "@mbc-cqrs-serverless/core";
// ...
const catCommand: CommandPartialInputModel = {
pk: catPk,
sk: catSk,
version: storedItem.version,
name: attributes.name,
};
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"updateCatCommandSync"
);
const item = await this.commandService.publishPartialUpdateSync(catCommand, {
source: commandSource,
invokeContext,
});
async publish(input: CommandInputModel, options: ICommandOptions): Promise<CommandModel | null> 削除済み
このメソッドはv1.1.0で削除されました。代わりにpublishAsyncメソッドを使用してください。
たとえば、次のように新しい cat コマンドを発行できます。
import {
generateId,
getCommandSource,
VERSION_FIRST,
} from "@mbc-cqrs-serverless/core";
// class CatCommandDto extends CommandDto {}
const catCommand = new CatCommandDto({
pk: catPk,
sk: catSk,
tenantCode,
id: generateId(catPk, catSk),
code,
type: "CAT",
name: attributes.name,
version: VERSION_FIRST,
attributes,
});
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"createCatCommand"
);
const item = await this.commandService.publish(catCommand, {
source: commandSource,
invokeContext,
});
このメソッドはコマンド データを返します。
async publishPartialUpdate( input: CommandPartialInputModel, options: ICommandOptions): Promise<CommandModel | null> 削除済み
このメソッドはv1.1.0で削除されました。代わりにpublishPartialUpdateAsyncメソッドを使用してください。
この方法では、以前のコマンドに基づいて新しいコマンド データを作成できます。
たとえば、猫の名前を更新したいとします。
import { generateId, getCommandSource } from "@mbc-cqrs-serverless/core";
// ...
const catCommand: CommandPartialInputModel = {
pk: catPk,
sk: catSk,
version: storedItem.version,
name: attributes.name,
};
const commandSource = getCommandSource(
basename(__dirname),
this.constructor.name,
"updateCatCommand"
);
const item = await this.commandService.publishPartialUpdate(catCommand, {
source: commandSource,
invokeContext,
});
このメソッドは更新されたコマンド データを返します。
async reSyncData(): Promise<void>
データ同期ハンドラーを再適用する場合は、このメソッドを使用できるように設計されています。次のように関数を呼び出すだけです。
await this.commandService.reSyncData();
async getItem(key: DetailKey): Promise<CommandModel>
プライマリキーでコマンドアイテムを取得します。ソートキーにバージョン区切り文字が含まれていない場合、自動的にgetLatestItemを呼び出して最新 バージョンを取得します。
import { DetailKey } from "@mbc-cqrs-serverless/core";
// Get a specific version of a command (特定バージョンのコマンドを取得)
const command = await this.commandService.getItem({
pk: "CAT#tenant1",
sk: "CAT#cat001@2", // Includes version number (バージョン番号を含む)
});
// If no version in sk, automatically gets latest version (skにバージョンがない場合、自動的に最新バージョンを取得)
const latestCommand = await this.commandService.getItem({
pk: "CAT#tenant1",
sk: "CAT#cat001",
});