データ移行パターン
このガイドでは、テナント間データ移行、スキーマ進化、インポートモジュールを使用した一括データ操作、ロールバック手順など、MBC CQRS Serverlessアプリケーションにおけるデータ移行戦略について説明します。
このガイドを使用するタイミング
以下の場合にこのガイドを使用してください:
- テナント間でデータを移行する
- 互換性を維持しながらデータスキーマを進化させる
- インポートモジュールで一括データ操作を実行する
- 失敗した移行のロールバック手順を実装する
- 移行プロセス中にデータを検証する
移行アーキテクチャの概要
┌─────────────────────────────────────────────────────────────────────────┐
│ データ移行フロー │
├────────────────────────────────── ───────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ ソース │────>│ 変換 │────>│ ターゲット │ │
│ │ データ │ │ & 検証 │ │ データ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────┐ │ │
│ │ │ インポート │ │ │
│ │ │ モジュール │ │ │
│ └─────────>│ ストラテジー │<─────────────┘ │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ CommandSvc│────>│ DynamoDB │ │
│ │ (バージョン) │ │ (履歴) │ │
│ └─────────────┘ └─────────────┘ │
│ │
└────────────────────── ───────────────────────────────────────────────────┘
テナント間データ移行
パターン1:テナント間でデータをコピー
CommandServiceを使用してあるテナントから別のテナントにデータをコピーします。これにより、ターゲットテナントでイベントソーシング履歴が保持されます。
// migration/tenant-migration.service.ts
import { Injectable, Logger } from '@nestjs/common';
import {
CommandService,
DataService,
KEY_SEPARATOR,
generateId,
IInvoke,
} from '@mbc-cqrs-serverless/core';
@Injectable()
export class TenantMigrationService {
private readonly logger = new Logger(TenantMigrationService.name);
constructor(
private readonly commandService: CommandService,
private readonly dataService: DataService,
) {}
/**
* Copy all entities of a type from source to target tenant (ソースからターゲットテナントにすべてのエンティティをコピー)
*/
async copyEntities(
entityType: string,
sourceTenantCode: string,
targetTenantCode: string,
invokeContext: IInvoke,
): Promise<{ copied: number; errors: string[] }> {
const sourcePk = `${entityType}${KEY_SEPARATOR}${sourceTenantCode}`;
const sourceData = await this.dataService.listItemsByPk(sourcePk);
let copied = 0;
const errors: string[] = [];
for (const item of sourceData.items) {
try {
// Create new keys for target tenant (ターゲットテナント用の新しいキーを作成)
const targetPk = `${entityType}${KEY_SEPARATOR}${targetTenantCode}`;
const targetId = generateId(targetPk, item.sk);
await this.commandService.publishSync({
pk: targetPk,
sk: item.sk,
id: targetId,
tenantCode: targetTenantCode,
code: item.code,
name: item.name,
type: item.type,
attributes: item.attributes,
// Track migration source (移行ソースを追跡)
metadata: {
migratedFrom: item.id,
migratedAt: new Date().toISOString(),
sourceTenant: sourceTenantCode,
},
}, { invokeContext });
copied++;
} catch (error) {
errors.push(`Failed to copy ${item.id}: ${error.message}`);
this.logger.error(`Migration error for ${item.id}`, error);
}
}
this.logger.log(`Copied ${copied} ${entityType} items from ${sourceTenantCode} to ${targetTenantCode}`);
return { copied, errors };
}
}
パターン2:インポートモジュールを使用した一括移行
変換サポート付きの大規模なテナント間移行にインポートモジュールを使用します。
// migration/bulk-migration.strategy.ts
import { Injectable } from '@nestjs/common';
import { KEY_SEPARATOR, generateId } from '@mbc-cqrs-serverless/core';
import { BaseImportStrategy } from '@mbc-cqrs-serverless/import';
export interface MigrationInput {
sourcePk: string;
sourceSk: string;
code: string;
name: string;
attributes: Record<string, any>;
targetTenantCode: string;
}
export interface MigrationDto {
pk: string;
sk: string;
id: string;
code: string;
name: string;
tenantCode: string;
type: string;
attributes: Record<string, any>;
}
@Injectable()
export class BulkMigrationImportStrategy
extends BaseImportStrategy<MigrationInput, MigrationDto>
{
/**
* Transform source data to target tenant format (ソースデータをターゲットテナント形式に変換)
*/
async transform(input: MigrationInput): Promise<MigrationDto> {
const { targetTenantCode } = input;
// Extract entity type from source PK (ソースPKからエンティティタイプを抽出)
const pkParts = input.sourcePk.split(KEY_SEPARATOR);
const entityType = pkParts[0];
// Build target keys (ターゲットキーを構築)
const targetPk = `${entityType}${KEY_SEPARATOR}${targetTenantCode}`;
const targetId = generateId(targetPk, input.sourceSk);
return {
pk: targetPk,
sk: input.sourceSk,
id: targetId,
code: input.code,
name: input.name,
tenantCode: targetTenantCode,
type: entityType,
attributes: {
...input.attributes,
// Add migration metadata (移行メタデータを追加)
_migrated: true,
_sourceKey: `${input.sourcePk}#${input.sourceSk}`,
},
};
}
}
スキーマ進化戦略
戦略1:後方互換性のある変更
デフォルト値 を持つ新しいフィールドを追加します。既存のデータは移行なしで引き続き動作します。
// Backward compatible attribute evolution (後方互換性のある属性進化)
interface ProductAttributesV1 {
name: string;
price: number;
}
interface ProductAttributesV2 extends ProductAttributesV1 {
category?: string; // New optional field (新しいオプションフィールド)
tags?: string[]; // New optional field (新しいオプションフィールド)
description?: string; // New optional field (新しいオプションフィールド)
}
// product/product.service.ts
@Injectable()
export class ProductService {
/**
* Get product with schema version handling (スキーマバージョン処理を含む製品を取得)
*/
async getProduct(key: DetailKey): Promise<ProductDataEntity> {
const product = await this.dataService.getItem(key);
// Apply defaults for missing fields (欠落フィールドにデフォルトを適用)
return {
...product,
attributes: {
...product.attributes,
category: product.attributes.category ?? 'uncategorized',
tags: product.attributes.tags ?? [],
description: product.attributes.description ?? '',
},
};
}
}
戦略2:データ変換移行
インポートモジュールのストラテジーを使用して既存データを新しいスキーマ形式に変換します。
// migration/schema-migration.strategy.ts
import { Injectable } from '@nestjs/common';
import {
CommandService,
DataService,
DataModel,
} from '@mbc-cqrs-serverless/core';
import {
BaseProcessStrategy,
ComparisonResult,
ComparisonStatus,
} from '@mbc-cqrs-serverless/import';
interface OldProductAttributes {
productName: string; // Old field name (旧フィールド名)
unitPrice: number; // Old field name (旧フィールド名)
categoryCode: string; // Renamed field (名前変更されたフィールド)
}
interface NewProductAttributes {
name: string; // New field name (新フィールド名)
price: number; // New field name (新フィールド名)
category: string; // New field name (新フィールド名)
version: 'v2'; // Schema version marker (スキーマバージョンマーカー)
}
@Injectable()
export class SchemaTransformationStrategy
extends BaseProcessStrategy<DataModel, NewProductAttributes>
{
constructor(
private readonly commandService: CommandService,
private readonly dataService: DataService,
) {
super();
}
getCommandService(): CommandService {
return this.commandService;
}
/**
* Compare to detect schema version differences (スキーマバージョンの違いを検出するために比較)
*/
async compare(
dto: NewProductAttributes,
tenantCode: string,
): Promise<ComparisonResult<DataModel>> {
// Check if data needs transformation (データが変換を必要とするか確認)
const existing = await this.dataService.getItem({
pk: dto.pk,
sk: dto.sk,
});
if (!existing) {
return { status: ComparisonStatus.NOT_EXIST };
}
// Check schema version (スキーマバージョンを確認)
if (existing.attributes?.version !== 'v2') {
return { status: ComparisonStatus.CHANGED, existingData: existing };
}
return { status: ComparisonStatus.EQUAL };
}
/**
* Map old schema to new schema (旧スキーマを新スキーマにマッピング)
*/
async map(
status: ComparisonStatus,
dto: NewProductAttributes,
tenantCode: string,
existingData?: DataModel,
) {
if (status === ComparisonStatus.CHANGED && existingData) {
const oldAttrs = existingData.attributes as OldProductAttributes;
return {
pk: existingData.pk,
sk: existingData.sk,
version: existingData.version,
attributes: {
// Transform field names (フィールド名を変換)
name: oldAttrs.productName,
price: oldAttrs.unitPrice,
category: oldAttrs.categoryCode,
version: 'v2',
},
};
}
return { ...dto, version: 0 };
}
}