Service実装パターン
このガイドでは、MBC CQRS ServerlessでCRUD操作を処理するサービスクラスの実装方法を説明します。サービスはビジネスロジックの中核であり、コントローラー、コマンド、データアクセスを調整します。
このガイドを使用するタイミング
以下が必要な場合にこのガイドを使用してください:
- 新しいドメインエンティティのサービスレイヤーを構築する
- 作成、読み取り、更新、削除(CRUD)操作を実装する
- マルチテナントデータの分離を処理する
- 並行更新のための楽観的ロックを使用する
- 大量データ処理のためのバッチ操作を実装する
このパターンが解決する問題
| 問題 | 解決策 |
|---|---|
| データベースへの直接アクセスはCQRSパターンをバイパスする | 書き込みにはCommandService、読み取りにはDataServiceを使用する |
| データ変更の監査証跡がない | ユーザーとタイムスタンプを記録するためにinvokeContextを渡す |
| 並行更新が互いを上書きする | 楽観的ロックのためにversionフィールドを使用する |
| 同期処理による遅いレスポンス | 非ブロッキングコマンド発行のためにpublishAsyncを使用する |
基本的なService構造
一般的なサービスは、書き込み操作にCommandService、読み取り操作にDataServiceの両方を使用します:
import {
CommandService,
DataService,
generateId,
getUserContext,
VERSION_FIRST,
KEY_SEPARATOR,
} from "@mbc-cqrs-serverless/core";
import { Injectable } from "@nestjs/common";
import { ulid } from "ulid";
import { PrismaService } from "src/prisma";
import { ProductCommandDto } from "./dto/product-command.dto";
import { ProductDataEntity } from "./entity/product-data.entity";
import { CreateProductDto } from "./dto/create-product.dto";
import { UpdateProductDto } from "./dto/update-product.dto";
import { IInvoke } from "./interfaces";
const PRODUCT_PK_PREFIX = "PRODUCT";
@Injectable()
export class ProductService {
constructor(
private readonly commandService: CommandService,
private readonly dataService: DataService,
private readonly prismaService: PrismaService,
) {}
// CRUD methods will be implemented below
}
Create操作
ユースケース:新しい商品を作成する
シナリオ:ユーザーがカタログに新しい商品を追加するフォームを送信する。
フロー:ControllerがCreateProductDtoを受信 → Serviceがキーを生成 → CommandがDynamoDBに発行 → データがRDSに同期。
async create(
createDto: CreateProductDto,
opts: { invokeContext: IInvoke },
): Promise<ProductDataEntity> {
// Get tenant context from the invoke context
const { tenantCode } = getUserContext(opts.invokeContext);
// Generate PK and SK
const pk = `${PRODUCT_PK_PREFIX}${KEY_SEPARATOR}${tenantCode}`;
const sk = ulid(); // Use ULID for sortable unique ID
const id = generateId(pk, sk);
// Create command DTO
const command = new ProductCommandDto({
pk,
sk,
id,
tenantCode,
code: sk,
type: "PRODUCT",
name: createDto.name,
version: VERSION_FIRST,
attributes: {
description: createDto.description,
price: createDto.price,
category: createDto.category,
inStock: createDto.inStock ?? true,
},
});
// Publish command (async - returns immediately)
const item = await this.commandService.publishAsync(command, {
invokeContext: opts.invokeContext,
});
return new ProductDataEntity(item);
}
Read操作
キーによる単一取得
ユースケース:商品詳細ページを取得する
シナリオ:ユーザーが商品詳細ページに移動し、完全な商品データが必要。
使用するタイミング:pkとskがある場合の単一アイテム検索。
async findOne(
detailDto: { pk: string; sk: string },
): Promise<ProductDataEntity> {
const item = await this.dataService.getItem(detailDto);
return new ProductDataEntity(item);
}
ページネーション付き全件取得(RDSから)
ユースケース:フィルタリング付き商品リスト
シナリオ:ユーザーがカテゴリや検索でフィルタリングできるページネーション付き商品リストを表示する。
RDSを使用する理由:DynamoDBは複雑なクエリに最適化されていません。フィルタリングと全文検索にはPrisma/RDSを使用します。
async findAll(
searchDto: {
tenantCode: string;
category?: string;
inStock?: boolean;
page?: number;
limit?: number;
},
): Promise<{ items: ProductDataEntity[]; total: number }> {
const page = searchDto.page ?? 1;
const limit = searchDto.limit ?? 20;
const skip = (page - 1) * limit;
// Build where clause
const where: any = {
tenantCode: searchDto.tenantCode,
isDeleted: false,
};
if (searchDto.category) {
where.category = searchDto.category;
}
if (searchDto.inStock !== undefined) {
where.inStock = searchDto.inStock;
}
// Execute parallel queries for count and data
const [total, items] = await Promise.all([
this.prismaService.product.count({ where }),
this.prismaService.product.findMany({
where,
take: limit,
skip,
orderBy: { createdAt: "desc" },
}),
]);
return {
total,
items: items.map((item) => new ProductDataEntity(item)),
};
}
Update操作
ユースケース:商品詳細を編集する
シナリオ:ユーザーが編集フォームで商品名や価格を更新する。
重要:楽観的ロックを有効にし、並行更新の競合を防ぐためにversionフィールドを含めてください。
async update(
detailDto: { pk: string; sk: string },
updateDto: UpdateProductDto,
opts: { invokeContext: IInvoke },
): Promise<ProductDataEntity> {
// First, get the existing item
const existing = await this.dataService.getItem(detailDto);
if (!existing) {
throw new Error("Product not found");
}
// Merge existing attributes with updates
const updatedAttributes = {
...existing.attributes,
...updateDto.attributes,
};
// Create partial update command
const command: CommandPartialInputModel = {
pk: existing.pk,
sk: existing.sk,
version: existing.version, // Required for optimistic locking
name: updateDto.name ?? existing.name,
attributes: updatedAttributes,
};
// Publish partial update
const item = await this.commandService.publishPartialUpdateAsync(command, {
invokeContext: opts.invokeContext,
});
return new ProductDataEntity(item);
}
Delete操作(論理削除)
ユースケース:カタログから商品を削除する
シナリオ:管理者が廃止商品を削除する。
論理削除の理由:データは物理的に削除されるのではなく、削除済み(isDeleted=true)としてマークされ、監査履歴が保持されます。
async remove(
detailDto: { pk: string; sk: string },
opts: { invokeContext: IInvoke },
): Promise<ProductDataEntity> {
// Get existing item
const existing = await this.dataService.getItem(detailDto);
if (!existing) {
throw new Error("Product not found");
}
// Create soft delete command
const command: CommandPartialInputModel = {
pk: existing.pk,
sk: existing.sk,
version: existing.version,
isDeleted: true,
};
const item = await this.commandService.publishPartialUpdateAsync(command, {
invokeContext: opts.invokeContext,
});
return new ProductDataEntity(item);
}
完全なService例
完全なサービス実装を示します:
import {
CommandPartialInputModel,
CommandService,
DataService,
generateId,
getUserContext,
VERSION_FIRST,
KEY_SEPARATOR,
IInvoke,
} from "@mbc-cqrs-serverless/core";
import { Injectable, NotFoundException } from "@nestjs/common";
import { ulid } from "ulid";
import { PrismaService } from "src/prisma";
import { ProductCommandDto } from "./dto/product-command.dto";
import { ProductDataEntity } from "./entity/product-data.entity";
import { ProductListEntity } from "./entity/product-list.entity";
import { CreateProductDto } from "./dto/create-product.dto";
import { UpdateProductDto } from "./dto/update-product.dto";
import { SearchProductDto } from "./dto/search-product.dto";
import { DetailDto } from "./dto/detail.dto";
const PRODUCT_PK_PREFIX = "PRODUCT";
@Injectable()
export class ProductService {
constructor(
private readonly commandService: CommandService,
private readonly dataService: DataService,
private readonly prismaService: PrismaService,
) {}
/**
* Create a new product
*/
async create(
createDto: CreateProductDto,
opts: { invokeContext: IInvoke },
): Promise<ProductDataEntity> {
const { tenantCode } = getUserContext(opts.invokeContext);
const pk = `${PRODUCT_PK_PREFIX}${KEY_SEPARATOR}${tenantCode}`;
const sk = ulid();
const id = generateId(pk, sk);
const command = new ProductCommandDto({
pk,
sk,
id,
tenantCode,
code: sk,
type: "PRODUCT",
name: createDto.name,
version: VERSION_FIRST,
attributes: {
description: createDto.description,
price: createDto.price,
category: createDto.category,
inStock: createDto.inStock ?? true,
},
});
const item = await this.commandService.publishAsync(command, {
invokeContext: opts.invokeContext,
});
return new ProductDataEntity(item);
}
/**
* Find all products with filtering and pagination
*/
async findAll(searchDto: SearchProductDto): Promise<ProductListEntity> {
const page = searchDto.page ?? 1;
const limit = searchDto.limit ?? 20;
const skip = (page - 1) * limit;
const where: any = {
tenantCode: searchDto.tenantCode,
isDeleted: false,
};
if (searchDto.category) {
where.category = searchDto.category;
}
if (searchDto.inStock !== undefined) {
where.inStock = searchDto.inStock;
}
if (searchDto.search) {
where.OR = [
{ name: { contains: searchDto.search}},
{ description: { contains: searchDto.search}},
];
}
const [total, items] = await Promise.all([
this.prismaService.product.count({ where }),
this.prismaService.product.findMany({
where,
take: limit,
skip,
orderBy: { createdAt: "desc" },
}),
]);
return new ProductListEntity({
total,
items: items.map((item) => new ProductDataEntity(item)),
});
}
/**
* Find one product by key
*/
async findOne(detailDto: DetailDto): Promise<ProductDataEntity> {
const item = await this.dataService.getItem(detailDto);
if (!item) {
throw new NotFoundException("Product not found");
}
return new ProductDataEntity(item);
}
/**
* Update a product
*/
async update(
detailDto: DetailDto,
updateDto: UpdateProductDto,
opts: { invokeContext: IInvoke },
): Promise<ProductDataEntity> {
const existing = await this.dataService.getItem(detailDto);
if (!existing) {
throw new NotFoundException("Product not found");
}
const command: CommandPartialInputModel = {
pk: existing.pk,
sk: existing.sk,
version: existing.version,
name: updateDto.name ?? existing.name,
attributes: {
...existing.attributes,
...updateDto.attributes,
},
};
const item = await this.commandService.publishPartialUpdateAsync(command, {
invokeContext: opts.invokeContext,
});
return new ProductDataEntity(item);
}
/**
* Soft delete a product
*/
async remove(
detailDto: DetailDto,
opts: { invokeContext: IInvoke },
): Promise<ProductDataEntity> {
const existing = await this.dataService.getItem(detailDto);
if (!existing) {
throw new NotFoundException("Product not found");
}
const command: CommandPartialInputModel = {
pk: existing.pk,
sk: existing.sk,
version: existing.version,
isDeleted: true,
};
const item = await this.commandService.publishPartialUpdateAsync(command, {
invokeContext: opts.invokeContext,
});
return new ProductDataEntity(item);
}
}
バッチ操作
ユースケース:複数の商品をインポートする
シナリオ:管理者がインポートする複数の商品を含むCSVファイルをアップロードする。
解決策:パフォーマンス向上のためにPromise.allを使用してアイテムを並列処理する。
async createBatch(
items: CreateProductDto[],
opts: { invokeContext: IInvoke },
): Promise<ProductDataEntity[]> {
const { tenantCode } = getUserContext(opts.invokeContext);
const pk = `${PRODUCT_PK_PREFIX}${KEY_SEPARATOR}${tenantCode}`;
// Create all commands
const commands = items.map((item) => {
const sk = ulid();
return new ProductCommandDto({
pk,
sk,
id: generateId(pk, sk),
tenantCode,
code: sk,
type: "PRODUCT",
name: item.name,
version: VERSION_FIRST,
attributes: {
description: item.description,
price: item.price,
category: item.category,
inStock: item.inStock ?? true,
},
});
});
// Publish all commands in parallel
const results = await Promise.all(
commands.map((command) =>
this.commandService.publishAsync(command, {
invokeContext: opts.invokeContext,
}),
),
);
return results.map((item) => new ProductDataEntity(item));
}
チャンク化バッチ操作
ユースケース:大規模データ移行
シナリオ:レガシーシステムから数千件のレコードを移行する。
問題:一度に全て処理するとLambdaのタイムアウトやメモリの問題が発生する可能性がある。
解決策:Lambdaの制限内に収めるために100アイテムのチャンクで処理する。
async createLargeBatch(
items: CreateProductDto[],
opts: { invokeContext: IInvoke },
): Promise<ProductDataEntity[]> {
const { tenantCode } = getUserContext(opts.invokeContext);
const pk = `${PRODUCT_PK_PREFIX}${KEY_SEPARATOR}${tenantCode}`;
const chunkSize = 100;
const results: ProductDataEntity[] = [];
for (let i = 0; i < items.length; i += chunkSize) {
const chunk = items.slice(i, i + chunkSize);
const commands = chunk.map((item) => {
const sk = ulid();
return new ProductCommandDto({
pk,
sk,
id: generateId(pk, sk),
tenantCode,
code: sk,
type: "PRODUCT",
name: item.name,
version: VERSION_FIRST,
attributes: item,
});
});
const chunkResults = await Promise.all(
commands.map((command) =>
this.commandService.publishAsync(command, {
invokeContext: opts.invokeContext,
}),
),
);
results.push(...chunkResults.map((item) => new ProductDataEntity(item)));
}
return results;
}
Copy操作
ユースケース:商品を別のテナントにクローンする
シナリオ:テンプレート商品を新しいテナントにコピーする必要があるマルチテナントSaaS。
解決策:ソースエンティティを読み取り、異なるテナントのキーで新しいエンティティを作成する。
async copy(
sourceKey: { pk: string; sk: string },
targetTenantCode: string,
opts: { invokeContext: IInvoke },
): Promise<ProductDataEntity> {
// Get source item
const source = await this.dataService.getItem(sourceKey);
if (!source) {
throw new NotFoundException("Source product not found");
}
// Create new keys for target tenant
const pk = `${PRODUCT_PK_PREFIX}${KEY_SEPARATOR}${targetTenantCode}`;
const sk = ulid();
const id = generateId(pk, sk);
// Create command with source data
const command = new ProductCommandDto({
pk,
sk,
id,
tenantCode: targetTenantCode,
code: sk,
type: source.type,
name: source.name,
version: VERSION_FIRST,
attributes: source.attributes,
});
const item = await this.commandService.publishAsync(command, {
invokeContext: opts.invokeContext,
});
return new ProductDataEntity(item);
}