データ移行パターン
このガイドでは、テナント間データ移行、スキーマ進化、インポートモジュールを使用した一括データ操作、ロールバック手順など、MBC CQRS Serverlessアプリケーションにおけるデータ移行戦略について説明します。
このガイドを使用するタイミング
以下の場合にこのガイドを使用してください:
- テナント間でデータを移行する
- 互換性を維持しながらデータスキーマを進化させる
- インポートモジュールで一括データ操作を実行する
- 失敗した移行のロールバック手順を実装する
- 移行プロセス中にデータを検証する
移行アーキテクチャの概要
┌───────────────────────── ────────────────────────────────────────────────┐
│ データ移行フロー │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ ソース │────>│ 変換 │────>│ ターゲット │ │
│ │ データ │ │ & 検証 │ │ データ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────┐ │ │
│ │ │ インポート │ │ │
│ │ │ モジュール │ │ │
│ └─────────>│ ストラテジー │<─────────────┘ │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ CommandSvc│────>│ DynamoDB │ │
│ │ (バージョン) │ │ (履歴) │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
テナントコード正規化マイグレーション
getUserContext()関数はtenantCodeを小文字に正規化します。既存のデータがパーティションキーに大文字のテナントコードを使用している場合、これはデータアクセスに影響する破壊 的変更です。
完全なマイグレーションチェックリストとステップバイステップの手順については、v1.1.0 マイグレーションガイドを参照してください。
影響の理解
フレームワークは大文字小文字を区別しないマッチングのためにテナントコードを小文字に正規化します:
// Before: Cognito stores uppercase (変更前:Cognitoには大文字で保存)
custom:tenant = "MY_TENANT"
// After: getUserContext() returns lowercase (変更後:getUserContext()は小文字を返す)
const userContext = getUserContext(ctx);
console.log(userContext.tenantCode); // "my_tenant"
// Partition key generation uses lowercase (パーティションキー生成は小文字を使用)
const pk = `PRODUCT#${userContext.tenantCode}`; // "PRODUCT#my_tenant"
問題: 既存のDynamoDBデータに大文字のテナントコードを持つパーティションキー(例:PRODUCT#MY_TENANT)がある場合、正規化された小文字のテナントコードを使用したクエリではそのデータを見つけることができません。
マイグレーション戦略1:DynamoDBデータの更新
既存のDynamoDBデータを移行して、パーティションキーで小文字のテナントコードを使用するようにします。
// migration/tenant-normalization-migration.service.ts
import { Injectable, Logger } from '@nestjs/common';
import {
DynamoDbService,
CommandService,
KEY_SEPARATOR,
generateId,
getTenantCode,
IInvoke,
} from '@mbc-cqrs-serverless/core';
@Injectable()
export class TenantNormalizationMigrationService {
private readonly logger = new Logger(TenantNormalizationMigrationService.name);
constructor(
private readonly dynamoDbService: DynamoDbService,
private readonly commandService: CommandService,
) {}
/**
* Migrate all entities of a type to lowercase tenant codes (タイプのすべてのエンティティを小文字のテナントコードに移行)
*/
async migrateEntityType(
tableName: string,
entityPrefix: string,
invokeContext: IInvoke,
): Promise<{ migrated: number; errors: string[] }> {
let migrated = 0;
const errors: string[] = [];
// Scan for items with uppercase tenant codes (大文字のテナントコードを持つアイテムをスキャン)
const items = await this.scanForUppercaseTenants(tableName, entityPrefix);
for (const item of items) {
try {
const oldTenantCode = getTenantCode(item.pk);
const newTenantCode = oldTenantCode?.toLowerCase();
if (!newTenantCode || oldTenantCode === newTenantCode) {
continue; // Already lowercase or no tenant code (すでに小文字またはテナントコードなし)
}
// Create new record with lowercase tenant code (小文字のテナントコードで新しいレコードを作成)
const newPk = `${entityPrefix}${KEY_SEPARATOR}${newTenantCode}`;
const newId = generateId(newPk, item.sk);
await this.commandService.publishSync({
pk: newPk,
sk: item.sk,
id: newId,
tenantCode: newTenantCode,
code: item.code,
name: item.name,
type: item.type,
version: 0, // New entity (新しいエンティティ)
attributes: {
...item.attributes,
_migratedFrom: item.id,
_migrationReason: 'tenant_code_normalization',
_migratedAt: new Date().toISOString(),
},
}, { invokeContext });
// Mark old record as migrated (soft delete) (古いレコードを移行済みとしてマーク(ソフト削除))
await this.commandService.publishPartialUpdateSync({
pk: item.pk,
sk: item.sk,
version: item.version,
isDeleted: true,
attributes: {
...item.attributes,
_migratedTo: newId,
},
}, { invokeContext });
migrated++;
this.logger.log(`Migrated ${item.id} to ${newId}`);
} catch (error) {
errors.push(`Failed to migrate ${item.id}: ${error.message}`);
this.logger.error(`Migration error for ${item.id}`, error);
}
}
return { migrated, errors };
}
private async scanForUppercaseTenants(
tableName: string,
entityPrefix: string,
): Promise<any[]> {
// Implement scanning logic to find items with uppercase tenant codes (大文字のテナントコードを持つアイテムを見つけるスキャンロジックを実装)
// Filter for PKs that start with entityPrefix and contain uppercase letters (entityPrefixで始まり大文字を含むPKをフィルタリング)
return [];
}
}
マイグレーション戦略2:Cognitoユーザー属性の更新
Cognitoユーザー属性を小文字のテナントコードを使用するように更新します。このアプローチはデータ移行を回避しますが、Cognito管理者アクセスが必要です。
// migration/cognito-tenant-migration.service.ts
import { Injectable, Logger } from '@nestjs/common';
import {
CognitoIdentityProviderClient,
ListUsersCommand,
AdminUpdateUserAttributesCommand,
} from '@aws-sdk/client-cognito-identity-provider';
@Injectable()
export class CognitoTenantMigrationService {
private readonly logger = new Logger(CognitoTenantMigrationService.name);
private readonly cognitoClient: CognitoIdentityProviderClient;
constructor() {
this.cognitoClient = new CognitoIdentityProviderClient({});
}
/**
* Migrate all users' custom:tenant to lowercase (すべてのユーザーのcustom:tenantを小文字に移行)
*/
async migrateAllUsers(userPoolId: string): Promise<{
migrated: number;
errors: string[];
}> {
let migrated = 0;
const errors: string[] = [];
let paginationToken: string | undefined;
do {
const listResponse = await this.cognitoClient.send(
new ListUsersCommand({
UserPoolId: userPoolId,
PaginationToken: paginationToken,
}),
);
for (const user of listResponse.Users ?? []) {
try {
const tenantAttr = user.Attributes?.find(
(a) => a.Name === 'custom:tenant',
);
const rolesAttr = user.Attributes?.find(
(a) => a.Name === 'custom:roles',
);
if (!tenantAttr?.Value) continue;
const oldTenant = tenantAttr.Value;
const newTenant = oldTenant.toLowerCase();
if (oldTenant === newTenant) continue; // Already lowercase (すでに小文字)
// Update tenant attribute (テナント属性を更新)
const attributes = [
{ Name: 'custom:tenant', Value: newTenant },
];
// Update roles if they contain tenant references (テナント参照が含まれている場合はロールを更新)
if (rolesAttr?.Value) {
const roles = JSON.parse(rolesAttr.Value);
const updatedRoles = roles.map((r: any) => ({
...r,
tenant: (r.tenant || '').toLowerCase(),
}));
attributes.push({
Name: 'custom:roles',
Value: JSON.stringify(updatedRoles),
});
}
await this.cognitoClient.send(
new AdminUpdateUserAttributesCommand({
UserPoolId: userPoolId,
Username: user.Username,
UserAttributes: attributes,
}),
);
migrated++;
this.logger.log(`Migrated user ${user.Username}`);
} catch (error) {
errors.push(`Failed to migrate ${user.Username}: ${error.message}`);
}
}
paginationToken = listResponse.PaginationToken;
} while (paginationToken);
return { migrated, errors };
}
}
マイグレーシ ョン戦略3:デュアルリードアプローチ
段階的な移行のために、小文字と大文字の両方のテナントコードを試すデュアルリードアプローチを実装します。
// services/tenant-compatible-data.service.ts
import { Injectable } from '@nestjs/common';
import {
DataService,
KEY_SEPARATOR,
getTenantCode,
} from '@mbc-cqrs-serverless/core';
@Injectable()
export class TenantCompatibleDataService {
constructor(private readonly dataService: DataService) {}
/**
* Get item with fallback to uppercase tenant code (大文字テナントコードへのフォールバック付きでアイテムを取得)
*/
async getItemWithFallback(
entityPrefix: string,
tenantCode: string,
sk: string,
): Promise<any | null> {
// Try lowercase first (new format) (まず小文字を試す(新形式))
const lowercasePk = `${entityPrefix}${KEY_SEPARATOR}${tenantCode.toLowerCase()}`;
let item = await this.dataService.getItem({ pk: lowercasePk, sk });
if (item) {
return item;
}
// Fallback to uppercase (legacy format) (大文字にフォールバック(レガシー形式))
const uppercasePk = `${entityPrefix}${KEY_SEPARATOR}${tenantCode.toUpperCase()}`;
item = await this.dataService.getItem({ pk: uppercasePk, sk });
if (item) {
// Log for tracking migration progress (移行進捗追跡のためログ)
console.warn(
`Found legacy uppercase data: ${uppercasePk}#${sk}. Consider migrating.`,
);
}
return item;
}
/**
* List items with fallback to uppercase tenant code (大文字テナントコードへのフォールバック付きでアイテムをリスト)
*/
async listItemsWithFallback(
entityPrefix: string,
tenantCode: string,
): Promise<any[]> {
const lowercasePk = `${entityPrefix}${KEY_SEPARATOR}${tenantCode.toLowerCase()}`;
const uppercasePk = `${entityPrefix}${KEY_SEPARATOR}${tenantCode.toUpperCase()}`;
// Query both partitions (両方のパーティションをクエリ)
const [lowercaseItems, uppercaseItems] = await Promise.all([
this.dataService.listItemsByPk(lowercasePk),
this.dataService.listItemsByPk(uppercasePk),
]);
// Merge results, preferring lowercase (newer) data (結果をマージし、小文字(新しい)データを優先)
const itemMap = new Map<string, any>();
for (const item of uppercaseItems.items) {
itemMap.set(item.sk, item);
}
for (const item of lowercaseItems.items) {
itemMap.set(item.sk, item); // Overwrites uppercase if exists (大文字が存在する場合は上書き)
}
return Array.from(itemMap.values());
}
}
マイグレーションチェックリスト
小文字のテナントコードに移行する際は、このチェックリストに従ってください:
- 移行前にすべてのデータをバックアップ
- 影響を受けるテーブルを特定 - PKの大文字テナントコードをスキャン
- マイグレーション戦略を選択:
- 戦略1:DynamoDBデータを更新(クリーンな移行に推奨)
- 戦略2:Cognito属性を更新(データがすでに小文字の場合)
- 戦略3:デュアルリード(最小限のダウンタイムで段階的に移行)
- 非本番環境で移行をテスト
- トラフィックの少ない時間帯に移行を実行
- 移行後にデータアクセスを検証
- RDS同期を使用している場合はRDSデータを更新(tenantCodeカラム)
- 移行成功を確認後、レガシーデータを削除
RDSマイグレーション
RDS同期を使用している場合は、tenantCodeカラムも更新する必要があります:
-- Update tenantCode to lowercase in RDS (RDSでtenantCodeを小文字に更新)
UPDATE your_table
SET tenant_code = LOWER(tenant_code)
WHERE tenant_code != LOWER(tenant_code);
検証スクリプト
移行後、すべてのデータがアクセス可能であることを検証します:
// scripts/verify-tenant-migration.ts
async function verifyMigration(
dataService: DataService,
entityPrefix: string,
tenantCode: string,
): Promise<{ success: boolean; issues: string[] }> {
const issues: string[] = [];
// Check lowercase data exists (小文字データが存在するか確認)
const pk = `${entityPrefix}#${tenantCode.toLowerCase()}`;
const items = await dataService.listItemsByPk(pk);
if (items.items.length === 0) {
issues.push(`No items found for ${pk}`);
}
// Check no uppercase data remains (大文字データが残っていないか確認)
const uppercasePk = `${entityPrefix}#${tenantCode.toUpperCase()}`;
const legacyItems = await dataService.listItemsByPk(uppercasePk);
if (legacyItems.items.length > 0) {
issues.push(
`Found ${legacyItems.items.length} legacy items in ${uppercasePk}`,
);
}
return {
success: issues.length === 0,
issues,
};
}
テナント間データ移行
パターン1:テナント間でデータをコピー
CommandServiceを使用してあるテナントから別のテナントにデータをコピーします。これにより、ターゲットテナントでイベントソーシング履歴が保持されます。
// migration/tenant-migration.service.ts
import { Injectable, Logger } from '@nestjs/common';
import {
CommandService,
DataService,
KEY_SEPARATOR,
generateId,
IInvoke,
} from '@mbc-cqrs-serverless/core';
@Injectable()
export class TenantMigrationService {
private readonly logger = new Logger(TenantMigrationService.name);
constructor(
private readonly commandService: CommandService,
private readonly dataService: DataService,
) {}
/**
* Copy all entities of a type from source to target tenant (ソースからターゲットテナントにすべてのエンティティをコピー)
*/
async copyEntities(
entityType: string,
sourceTenantCode: string,
targetTenantCode: string,
invokeContext: IInvoke,
): Promise<{ copied: number; errors: string[] }> {
const sourcePk = `${entityType}${KEY_SEPARATOR}${sourceTenantCode}`;
const sourceData = await this.dataService.listItemsByPk(sourcePk);
let copied = 0;
const errors: string[] = [];
for (const item of sourceData.items) {
try {
// Create new keys for target tenant (ターゲットテナント用の新しいキーを作成)
const targetPk = `${entityType}${KEY_SEPARATOR}${targetTenantCode}`;
const targetId = generateId(targetPk, item.sk);
await this.commandService.publishSync({
pk: targetPk,
sk: item.sk,
id: targetId,
tenantCode: targetTenantCode,
code: item.code,
name: item.name,
type: item.type,
attributes: item.attributes,
// Track migration source (移行ソースを追跡)
metadata: {
migratedFrom: item.id,
migratedAt: new Date().toISOString(),
sourceTenant: sourceTenantCode,
},
}, { invokeContext });
copied++;
} catch (error) {
errors.push(`Failed to copy ${item.id}: ${error.message}`);
this.logger.error(`Migration error for ${item.id}`, error);
}
}
this.logger.log(`Copied ${copied} ${entityType} items from ${sourceTenantCode} to ${targetTenantCode}`);
return { copied, errors };
}
}
パターン2:インポートモジュールを使用した一括移行
変換サポート付きの大規模なテナント間移行にインポートモジュールを使用します。
// migration/bulk-migration.strategy.ts
import { Injectable } from '@nestjs/common';
import { KEY_SEPARATOR, generateId } from '@mbc-cqrs-serverless/core';
import { BaseImportStrategy } from '@mbc-cqrs-serverless/import';
export interface MigrationInput {
sourcePk: string;
sourceSk: string;
code: string;
name: string;
attributes: Record<string, any>;
targetTenantCode: string;
}
export interface MigrationDto {
pk: string;
sk: string;
id: string;
code: string;
name: string;
tenantCode: string;
type: string;
attributes: Record<string, any>;
}
@Injectable()
export class BulkMigrationImportStrategy
extends BaseImportStrategy<MigrationInput, MigrationDto>
{
/**
* Transform source data to target tenant format (ソースデータをターゲットテナント形式に変換)
*/
async transform(input: MigrationInput): Promise<MigrationDto> {
const { targetTenantCode } = input;
// Extract entity type from source PK (ソースPKからエンティティタイプを抽出)
const pkParts = input.sourcePk.split(KEY_SEPARATOR);
const entityType = pkParts[0];
// Build target keys (ターゲットキーを構築)
const targetPk = `${entityType}${KEY_SEPARATOR}${targetTenantCode}`;
const targetId = generateId(targetPk, input.sourceSk);
return {
pk: targetPk,
sk: input.sourceSk,
id: targetId,
code: input.code,
name: input.name,
tenantCode: targetTenantCode,
type: entityType,
attributes: {
...input.attributes,
// Add migration metadata (移行メタデータを追加)
_migrated: true,
_sourceKey: `${input.sourcePk}#${input.sourceSk}`,
},
};
}
}
スキーマ進化戦略
戦略1:後方互換性のある変更
デフォルト値を持つ新しいフィールドを追加します。既存のデータは移行なしで引き続き動作します。
// Backward compatible attribute evolution (後方互換性のある属性進化)
interface ProductAttributesV1 {
name: string;
price: number;
}
interface ProductAttributesV2 extends ProductAttributesV1 {
category?: string; // New optional field (新しいオプションフィールド)
tags?: string[]; // New optional field (新しいオプションフィールド)
description?: string; // New optional field (新しいオプションフィールド)
}
// product/product.service.ts
@Injectable()
export class ProductService {
/**
* Get product with schema version handling (スキーマバージョン処理を含む製品を取得)
*/
async getProduct(key: DetailKey): Promise<ProductDataEntity> {
const product = await this.dataService.getItem(key);
// Apply defaults for missing fields (欠落フィールドにデフォルトを適用)
return {
...product,
attributes: {
...product.attributes,
category: product.attributes.category ?? 'uncategorized',
tags: product.attributes.tags ?? [],
description: product.attributes.description ?? '',
},
};
}
}