Data Sync Handlerの実装例
このガイドでは、DynamoDB(コマンドソース)からRDS(クエリデータベース)にデータを自動的に同期するData Sync Handlerの実装方法を説明します。これはCQRS読み取りモデルを可能にする中核的なメカニズムです。
こ のガイドを使用するタイミング
以下の場合にこのガイドを使用してください:
- 複雑なクエリのためにエンティティデータをDynamoDBからMySQL/PostgreSQLに同期する
- ネストされたJSON属性をリレーショナルカラムに変換する
- 同じDynamoDBテーブル内の異なるレコードタイプを処理する
- 親子関係(Order、OrderItem)を個別に処理する
このパターンが解決する問題
| 問題 | 解決策 |
|---|---|
| DynamoDBはJOINや複雑なフィルターができない | SQLクエリのためにデータをRDSに同期 |
| SKのバージョンサフィックスが重複レコードを引き起こす | upsert前にremoveSortKeyVersion()を使用 |
| 異なるレコードタイプは異なるRDSテーブルが必要 | ハンドラーでSKプレフィックスによりフィルター |
| JSON属性を検索可能なカラムにする必要がある | 属性を個別のRDSカラムにマッピング |
基本構造
すべてのData Sync Handlerは以下の基本構造に従います:
import { CommandModel, IDataSyncHandler, removeSortKeyVersion } from "@mbc-cqrs-serverless/core";
import { Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "src/prisma";
@Injectable()
export class EntityDataSyncRdsHandler implements IDataSyncHandler {
private readonly logger = new Logger(EntityDataSyncRdsHandler.name);
constructor(private readonly prismaService: PrismaService) {}
async up(cmd: CommandModel): Promise<any> {
// Sync data to RDS
}
async down(cmd: CommandModel): Promise<any> {
// Optional: Handle rollback (usually just logs)
this.logger.debug(cmd);
}
}
例1: シンプルなエンティティ同期
ユースケース: 検索とフィルタリングを可能にする製品同期
シナリオ: DynamoDBに保存された製品をカテゴリ、価格帯、テキストで検索可能にする必要がある。
解決策: RDSに同期し、効率的なクエリのために属性をインデックス付きカラムにマッピングする。
import {
CommandModel,
IDataSyncHandler,
removeSortKeyVersion
} from "@mbc-cqrs-serverless/core";
import { Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "src/prisma";
interface ProductAttributes {
name: string;
description: string;
price: number;
category: string;
inStock: boolean;
}
@Injectable()
export class ProductDataSyncRdsHandler implements IDataSyncHandler {
private readonly logger = new Logger(ProductDataSyncRdsHandler.name);
constructor(private readonly prismaService: PrismaService) {}
async up(cmd: CommandModel): Promise<any> {
// Remove version suffix from sort key (e.g., "PROD001@1" -> "PROD001")
const sk = removeSortKeyVersion(cmd.sk);
const attrs = cmd.attributes as ProductAttributes;
await this.prismaService.product.upsert({
where: { id: cmd.id },
update: {
pk: cmd.pk,
sk: sk,
name: cmd.name,
code: cmd.code,
version: cmd.version,
tenantCode: cmd.tenantCode,
// Map attributes to columns
description: attrs.description,
price: attrs.price,
category: attrs.category,
inStock: attrs.inStock,
// Audit fields
isDeleted: cmd.isDeleted ?? false,
createdAt: cmd.createdAt,
createdBy: cmd.createdBy,
updatedAt: cmd.updatedAt,
updatedBy: cmd.updatedBy,
},
create: {
id: cmd.id,
pk: cmd.pk,
sk: sk,
// Also store original keys with version for reference
cpk: cmd.pk,
csk: cmd.sk,
name: cmd.name,
code: cmd.code,
version: cmd.version,
tenantCode: cmd.tenantCode,
description: attrs.description,
price: attrs.price,
category: attrs.category,
inStock: attrs.inStock,
isDeleted: cmd.isDeleted ?? false,
createdAt: cmd.createdAt,
createdBy: cmd.createdBy,
updatedAt: cmd.updatedAt,
updatedBy: cmd.updatedBy,
},
});
}
async down(cmd: CommandModel): Promise<any> {
this.logger.debug(cmd);
}
}