Data Migration Patterns
This guide covers data migration strategies in MBC CQRS Serverless applications, including cross-tenant data migration, schema evolution, bulk data operations using the Import module, and rollback procedures.
When to Use This Guide
Use this guide when you need to:
- Migrate data between tenants
- Evolve data schemas while maintaining compatibility
- Perform bulk data operations with the Import module
- Implement rollback procedures for failed migrations
- Validate data during migration processes
Migration Architecture Overview
┌─────────────────────────────────────────────────────────────────────────┐
│ Data Migration Flow │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Source │────>│ Transform │────>│ Target │ │
│ │ Data │ │ & Validate │ │ Data │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────┐ │ │
│ │ │ Import │ │ │
│ │ │ Module │ │ │
│ └─────────>│ Strategy │<─────────────┘ │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ CommandSvc│────>│ DynamoDB │ │
│ │ (Version) │ │ (History) │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Cross-Tenant Data Migration
Pattern 1: Copy Data Between Tenants
Copy data from one tenant to another using the CommandService. Each copied record is published as a new command, so the target tenant builds its own event-sourcing history while the source tenant's data and history remain untouched.
// migration/tenant-migration.service.ts
import { Injectable, Logger } from '@nestjs/common';
import {
CommandService,
DataService,
KEY_SEPARATOR,
generateId,
IInvoke,
} from '@mbc-cqrs-serverless/core';
@Injectable()
export class TenantMigrationService {
  private readonly logger = new Logger(TenantMigrationService.name);

  constructor(
    private readonly commandService: CommandService,
    private readonly dataService: DataService,
  ) {}

  /**
   * Copy all entities of a type from source to target tenant.
   *
   * Each copy is published as a brand-new command (version 0), so the
   * target tenant builds its own event history. Soft-deleted source
   * records are skipped rather than resurrected in the target.
   *
   * @param entityType entity type prefix used in the partition key
   * @param sourceTenantCode tenant to read from
   * @param targetTenantCode tenant to write into
   * @param invokeContext invocation context forwarded to CommandService
   * @returns count of copied items plus per-item error messages
   */
  async copyEntities(
    entityType: string,
    sourceTenantCode: string,
    targetTenantCode: string,
    invokeContext: IInvoke,
  ): Promise<{ copied: number; errors: string[] }> {
    const sourcePk = `${entityType}${KEY_SEPARATOR}${sourceTenantCode}`;
    // Target pk is loop-invariant; build it once.
    const targetPk = `${entityType}${KEY_SEPARATOR}${targetTenantCode}`;
    const sourceData = await this.dataService.listItemsByPk(sourcePk);

    let copied = 0;
    const errors: string[] = [];

    for (const item of sourceData.items) {
      // Do not carry soft-deleted records into the target tenant.
      if (item.isDeleted) continue;

      try {
        const targetId = generateId(targetPk, item.sk);

        await this.commandService.publishSync({
          pk: targetPk,
          sk: item.sk,
          id: targetId,
          tenantCode: targetTenantCode,
          code: item.code,
          name: item.name,
          type: item.type,
          version: 0, // new record in the target tenant
          attributes: item.attributes,
          // Track migration source
          metadata: {
            migratedFrom: item.id,
            migratedAt: new Date().toISOString(),
            sourceTenant: sourceTenantCode,
          },
        }, { invokeContext });

        copied++;
      } catch (error) {
        errors.push(`Failed to copy ${item.id}: ${error.message}`);
        this.logger.error(`Migration error for ${item.id}`, error);
      }
    }

    this.logger.log(`Copied ${copied} ${entityType} items from ${sourceTenantCode} to ${targetTenantCode}`);
    return { copied, errors };
  }
}
Pattern 2: Bulk Migration with Import Module
Use the Import module for large-scale cross-tenant migrations with transformation support.
// migration/bulk-migration.strategy.ts
import { Injectable } from '@nestjs/common';
import { KEY_SEPARATOR, generateId } from '@mbc-cqrs-serverless/core';
import { BaseImportStrategy } from '@mbc-cqrs-serverless/import';
// Shape of one source-tenant record handed to the bulk-migration strategy.
export interface MigrationInput {
  sourcePk: string; // partition key in the source tenant
  sourceSk: string; // sort key (reused unchanged in the target)
  code: string;
  name: string;
  attributes: Record<string, any>;
  targetTenantCode: string; // tenant the record is being copied into
}

// Command payload produced by the strategy, re-keyed for the target tenant.
export interface MigrationDto {
  pk: string;
  sk: string;
  id: string;
  code: string;
  name: string;
  tenantCode: string;
  type: string; // entity type extracted from the source pk's first segment
  attributes: Record<string, any>;
}
@Injectable()
export class BulkMigrationImportStrategy
  extends BaseImportStrategy<MigrationInput, MigrationDto>
{
  /**
   * Re-key a source-tenant record for its target tenant.
   *
   * The sort key is carried over unchanged; only the partition key (and
   * hence the id) is rebuilt under the target tenant. Migration metadata
   * is folded into the attributes so the copy is traceable.
   */
  async transform(input: MigrationInput): Promise<MigrationDto> {
    // The entity type is the first segment of the source partition key.
    const [entityType] = input.sourcePk.split(KEY_SEPARATOR);

    // Rebuild the keys under the target tenant.
    const pk = [entityType, input.targetTenantCode].join(KEY_SEPARATOR);
    const sk = input.sourceSk;

    const attributes = {
      ...input.attributes,
      // Add migration metadata
      _migrated: true,
      _sourceKey: `${input.sourcePk}#${input.sourceSk}`,
    };

    return {
      pk,
      sk,
      id: generateId(pk, sk),
      code: input.code,
      name: input.name,
      tenantCode: input.targetTenantCode,
      type: entityType,
      attributes,
    };
  }
}
Schema Evolution Strategies
Strategy 1: Backward Compatible Changes
Add new fields with default values. Existing data continues to work without migration.
// Backward compatible attribute evolution
// Original (v1) product attribute schema.
interface ProductAttributesV1 {
  name: string;
  price: number;
}

// v2 adds only OPTIONAL fields, so existing v1 records stay valid
// without any data migration (backward compatible evolution).
interface ProductAttributesV2 extends ProductAttributesV1 {
  category?: string; // New optional field
  tags?: string[]; // New optional field
  description?: string; // New optional field
}
// product/product.service.ts
@Injectable()
export class ProductService {
/**
* Get product with schema version handling
*/
async getProduct(key: DetailKey): Promise<ProductDataEntity> {
const product = await this.dataService.getItem(key);
// Apply defaults for missing fields
return {
...product,
attributes: {
...product.attributes,
category: product.attributes.category ?? 'uncategorized',
tags: product.attributes.tags ?? [],
description: product.attributes.description ?? '',
},
};
}
}
Strategy 2: Data Transformation Migration
Transform existing data to a new schema format using the Import module strategies.
// migration/schema-migration.strategy.ts
import { Injectable } from '@nestjs/common';
import {
CommandService,
DataService,
DataModel,
} from '@mbc-cqrs-serverless/core';
import {
BaseProcessStrategy,
ComparisonResult,
ComparisonStatus,
} from '@mbc-cqrs-serverless/import';
// Legacy attribute shape still present on un-migrated records.
interface OldProductAttributes {
  productName: string; // Old field name
  unitPrice: number; // Old field name
  categoryCode: string; // Renamed field
}

// Target attribute shape after migration.
interface NewProductAttributes {
  name: string; // New field name
  price: number; // New field name
  category: string; // New field name
  version: 'v2'; // Schema version marker
}

/**
 * Row fed to the strategy: the record keys plus the new attribute values.
 * compare()/map() read dto.pk and dto.sk, so the keys must be part of the
 * DTO — NewProductAttributes alone does not carry them (the original
 * example used it directly and did not type-check).
 */
interface ProductMigrationDto extends NewProductAttributes {
  pk: string;
  sk: string;
}

@Injectable()
export class SchemaTransformationStrategy
  extends BaseProcessStrategy<DataModel, ProductMigrationDto>
{
  constructor(
    private readonly commandService: CommandService,
    private readonly dataService: DataService,
  ) {
    super();
  }

  getCommandService(): CommandService {
    return this.commandService;
  }

  /**
   * Compare to detect schema version differences.
   *
   * Returns NOT_EXIST for brand-new records, CHANGED for records still
   * on the old schema, EQUAL for records already migrated to v2.
   */
  async compare(
    dto: ProductMigrationDto,
    tenantCode: string,
  ): Promise<ComparisonResult<DataModel>> {
    // Check if data needs transformation
    const existing = await this.dataService.getItem({
      pk: dto.pk,
      sk: dto.sk,
    });

    if (!existing) {
      return { status: ComparisonStatus.NOT_EXIST };
    }

    // Check schema version
    if (existing.attributes?.version !== 'v2') {
      return { status: ComparisonStatus.CHANGED, existingData: existing };
    }

    return { status: ComparisonStatus.EQUAL };
  }

  /**
   * Map old schema to new schema.
   */
  async map(
    status: ComparisonStatus,
    dto: ProductMigrationDto,
    tenantCode: string,
    existingData?: DataModel,
  ) {
    if (status === ComparisonStatus.CHANGED && existingData) {
      const oldAttrs = existingData.attributes as OldProductAttributes;

      return {
        pk: existingData.pk,
        sk: existingData.sk,
        version: existingData.version, // optimistic lock on current version
        attributes: {
          // Transform field names
          name: oldAttrs.productName,
          price: oldAttrs.unitPrice,
          category: oldAttrs.categoryCode,
          version: 'v2',
        },
      };
    }

    // NOT_EXIST: create the record from the incoming DTO at version 0.
    return { ...dto, version: 0 };
  }
}
Strategy 3: Versioned Attributes Pattern
Store schema version in attributes for gradual migration.
// entity/versioned-entity.ts
// Attribute bag that carries its own schema version, enabling lazy,
// per-record migration instead of a big-bang rewrite.
export interface VersionedAttributes {
  _schemaVersion: number; // schema version this record's attributes conform to
  [key: string]: any;
}
// migration/attribute-migrator.service.ts
@Injectable()
export class AttributeMigratorService {
  // Maps schema version N to the function that upgrades N -> N+1.
  private readonly migrations: Map<number, (attrs: any) => any> = new Map();

  constructor() {
    // Register migration functions
    this.migrations.set(1, this.migrateV1ToV2.bind(this));
    this.migrations.set(2, this.migrateV2ToV3.bind(this));
  }

  /**
   * Apply all necessary migrations to bring attributes to current version.
   *
   * @param attributes versioned attribute bag (missing/zero version is treated as 1)
   * @param currentVersion schema version the caller expects
   * @returns a new attribute object stamped with currentVersion (input is not mutated)
   * @throws when a migration step is missing, or when the stored version
   *         is newer than currentVersion — silently stamping a lower
   *         version would fake a downgrade without transforming data.
   */
  migrate(attributes: VersionedAttributes, currentVersion: number): any {
    let version = attributes._schemaVersion || 1;

    if (version > currentVersion) {
      throw new Error(
        `Cannot downgrade attributes from version ${version} to ${currentVersion}`,
      );
    }

    let result = { ...attributes };

    while (version < currentVersion) {
      const migrationFn = this.migrations.get(version);
      if (!migrationFn) {
        throw new Error(`No migration found for version ${version}`);
      }
      result = migrationFn(result);
      version++;
    }

    result._schemaVersion = currentVersion;
    return result;
  }

  private migrateV1ToV2(attrs: any): any {
    return {
      ...attrs,
      // V1 to V2: Split fullName into firstName and lastName
      firstName: attrs.fullName?.split(' ')[0] ?? '',
      lastName: attrs.fullName?.split(' ').slice(1).join(' ') ?? '',
    };
  }

  private migrateV2ToV3(attrs: any): any {
    return {
      ...attrs,
      // V2 to V3: Add new required fields with defaults
      createdAt: attrs.createdAt ?? new Date().toISOString(),
      status: attrs.status ?? 'active',
    };
  }
}
Using Import Module for Bulk Operations
CSV-Based Bulk Migration
Export data to CSV, transform, and re-import using the Import module.
// migration/csv-migration.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { S3Service, DataService, KEY_SEPARATOR } from '@mbc-cqrs-serverless/core';
import { ProcessingMode } from '@mbc-cqrs-serverless/import';
import { PutObjectCommand } from '@aws-sdk/client-s3';
@Injectable()
export class CsvMigrationService {
  private readonly logger = new Logger(CsvMigrationService.name);

  constructor(
    private readonly s3Service: S3Service,
    private readonly dataService: DataService,
  ) {}

  /**
   * Export entities to CSV for migration.
   *
   * The CSV carries the source keys, the raw attributes (JSON-encoded),
   * and the target tenant so an import strategy can re-key each row.
   *
   * @returns the S3 bucket and object key of the uploaded CSV
   */
  async exportToMigrationCsv(
    entityType: string,
    tenantCode: string,
    targetTenantCode: string,
  ): Promise<{ bucket: string; key: string }> {
    const pk = `${entityType}${KEY_SEPARATOR}${tenantCode}`;
    const data = await this.dataService.listItemsByPk(pk);

    // Build CSV with migration columns
    const headers = ['sourcePk', 'sourceSk', 'code', 'name', 'attributes', 'targetTenantCode'];
    const rows = data.items.map(item => [
      item.pk,
      item.sk,
      item.code,
      item.name,
      JSON.stringify(item.attributes),
      targetTenantCode,
    ]);

    // Arrow wrapper keeps `this` bound (passing the method reference
    // directly would lose it if escapeCsvValue ever touched instance state).
    const csvContent = [
      headers.join(','),
      ...rows.map(row => row.map(value => this.escapeCsvValue(value)).join(',')),
    ].join('\n');

    // Upload to S3
    const bucket = this.s3Service.privateBucket;
    const key = `migrations/${Date.now()}/migration-${entityType}-${tenantCode}-to-${targetTenantCode}.csv`;

    await this.s3Service.client.send(new PutObjectCommand({
      Bucket: bucket,
      Key: key,
      Body: csvContent,
      ContentType: 'text/csv; charset=utf-8',
    }));

    this.logger.log(`Exported ${data.items.length} items to ${key}`);
    return { bucket, key };
  }

  /**
   * Create migration import job
   */
  async startMigrationImport(
    bucket: string,
    key: string,
    tenantCode: string,
  ): Promise<{ jobId: string }> {
    // Import module handles the migration via configured strategy
    // The strategy will transform data to target tenant format
    return {
      jobId: `migration-${Date.now()}`,
      // Actual job creation would use ImportModule API
    };
  }

  /**
   * Quote a CSV field per RFC 4180: fields containing a delimiter, quote,
   * or line break are wrapped in quotes, with embedded quotes doubled.
   */
  private escapeCsvValue(value: any): string {
    if (value === null || value === undefined) return '';
    const str = String(value);
    // '\r' included: a bare carriage return also breaks CSV row structure.
    if (str.includes(',') || str.includes('"') || str.includes('\n') || str.includes('\r')) {
      return `"${str.replace(/"/g, '""')}"`;
    }
    return str;
  }
}
Import Process Strategy for Migration
Implement a process strategy that handles migration-specific logic.
// migration/migration-process.strategy.ts
import { Injectable } from '@nestjs/common';
import {
CommandService,
DataService,
DataModel,
CommandInputModel,
CommandPartialInputModel,
} from '@mbc-cqrs-serverless/core';
import {
BaseProcessStrategy,
ComparisonResult,
ComparisonStatus,
IProcessStrategy,
} from '@mbc-cqrs-serverless/import';
// Row shape consumed by the migration process strategy — matches the
// output of the bulk-migration transform earlier in this guide.
interface MigrationDto {
  pk: string;
  sk: string;
  id: string;
  code: string;
  name: string;
  tenantCode: string;
  type: string;
  attributes: Record<string, any>;
}
@Injectable()
export class MigrationProcessStrategy
  extends BaseProcessStrategy<DataModel, MigrationDto>
  implements IProcessStrategy<DataModel, MigrationDto>
{
  constructor(
    private readonly commandService: CommandService,
    private readonly dataService: DataService,
  ) {
    super();
  }

  getCommandService(): CommandService {
    return this.commandService;
  }

  /**
   * Compare migrated data with existing target data.
   *
   * No try/catch here: a missing record is signalled by getItem returning
   * a falsy value. Swallowing infrastructure errors and reporting
   * NOT_EXIST (as the original example did) would cause a failed look-up
   * to be re-imported as a brand-new record.
   */
  async compare(
    dto: MigrationDto,
    tenantCode: string,
  ): Promise<ComparisonResult<DataModel>> {
    const existing = await this.dataService.getItem({
      pk: dto.pk,
      sk: dto.sk,
    });

    if (!existing) {
      return { status: ComparisonStatus.NOT_EXIST };
    }

    // Check if already migrated
    if (existing.attributes?._migrated) {
      return { status: ComparisonStatus.EQUAL };
    }

    // Data exists but not migrated - update it
    return { status: ComparisonStatus.CHANGED, existingData: existing };
  }

  /**
   * Map migration data to command payload.
   *
   * NOT_EXIST produces a full create at version 0; anything else produces
   * a partial update pinned to the existing record's version (optimistic
   * locking), merging attributes and stamping migration markers.
   */
  async map(
    status: ComparisonStatus,
    dto: MigrationDto,
    tenantCode: string,
    existingData?: DataModel,
  ): Promise<CommandInputModel | CommandPartialInputModel> {
    if (status === ComparisonStatus.NOT_EXIST) {
      // Create new record
      return {
        pk: dto.pk,
        sk: dto.sk,
        id: dto.id,
        tenantCode: dto.tenantCode,
        code: dto.code,
        name: dto.name,
        type: dto.type,
        version: 0,
        attributes: dto.attributes,
      };
    }

    // Update existing record
    return {
      pk: dto.pk,
      sk: dto.sk,
      version: existingData?.version ?? 0,
      attributes: {
        ...existingData?.attributes,
        ...dto.attributes,
        _migrated: true,
        _migratedAt: new Date().toISOString(),
      },
    };
  }
}
Rollback Procedures
Event Sourcing Based Rollback
Leverage the history table to restore previous versions.
// migration/rollback.service.ts
import { Injectable, Logger } from '@nestjs/common';
import {
  CommandService,
  DataService,
  HistoryService,
  IInvoke,
  KEY_SEPARATOR,
} from '@mbc-cqrs-serverless/core';

@Injectable()
export class RollbackService {
  private readonly logger = new Logger(RollbackService.name);

  constructor(
    private readonly commandService: CommandService,
    private readonly dataService: DataService,
    private readonly historyService: HistoryService,
  ) {}

  /**
   * Rollback entity to a specific version.
   *
   * Publishes a new command whose attributes mirror the historical
   * version — the rollback itself is recorded in the event history.
   *
   * @throws when the historical version or the current record is missing
   */
  async rollbackToVersion(
    pk: string,
    sk: string,
    targetVersion: number,
    invokeContext: IInvoke,
  ): Promise<void> {
    // Get historical version
    const historicalData = await this.historyService.getVersion(
      pk,
      sk,
      targetVersion,
    );
    if (!historicalData) {
      throw new Error(`Version ${targetVersion} not found for ${pk}#${sk}`);
    }

    // Get current version for optimistic locking; fail loudly rather
    // than dereferencing a missing record.
    const currentData = await this.dataService.getItem({ pk, sk });
    if (!currentData) {
      throw new Error(`Current record not found for ${pk}#${sk}`);
    }

    // Restore to historical state
    await this.commandService.publishSync({
      pk,
      sk,
      id: currentData.id,
      tenantCode: currentData.tenantCode,
      code: historicalData.code,
      name: historicalData.name,
      type: historicalData.type,
      version: currentData.version,
      attributes: {
        ...historicalData.attributes,
        _rolledBackFrom: currentData.version,
        _rolledBackAt: new Date().toISOString(),
      },
    }, { invokeContext });

    this.logger.log(`Rolled back ${pk}#${sk} from v${currentData.version} to v${targetVersion}`);
  }

  /**
   * Rollback migration by timestamp.
   *
   * Scans all items of the entity type and, for each one stamped with the
   * given _migratedAt marker, restores the newest pre-migration version.
   */
  async rollbackMigration(
    entityType: string,
    tenantCode: string,
    migrationTimestamp: string,
    invokeContext: IInvoke,
  ): Promise<{ rolledBack: number; errors: string[] }> {
    // Use KEY_SEPARATOR, consistent with how keys are built elsewhere
    // (a literal '#' would silently miss every record if the framework
    // separator ever differs).
    const pk = `${entityType}${KEY_SEPARATOR}${tenantCode}`;
    const data = await this.dataService.listItemsByPk(pk);

    let rolledBack = 0;
    const errors: string[] = [];

    for (const item of data.items) {
      // Check if item was part of the migration
      if (item.attributes?._migratedAt === migrationTimestamp) {
        try {
          // Find version before migration
          const history = await this.historyService.listVersions(
            item.pk,
            item.sk,
          );
          const preVersion = history.find(h =>
            !h.attributes?._migrated ||
            new Date(h.updatedAt) < new Date(migrationTimestamp)
          );

          if (preVersion) {
            await this.rollbackToVersion(
              item.pk,
              item.sk,
              preVersion.version,
              invokeContext,
            );
            rolledBack++;
          }
        } catch (error) {
          errors.push(`Failed to rollback ${item.id}: ${error.message}`);
        }
      }
    }

    return { rolledBack, errors };
  }
}
Soft Delete Pattern for Safe Rollback
Use soft delete to enable easy recovery.
// migration/safe-migration.service.ts
@Injectable()
export class SafeMigrationService {
/**
* Migrate with soft delete for rollback capability
*/
async migrateWithSoftDelete(
sourcePk: string,
targetPk: string,
invokeContext: IInvoke,
): Promise<{ migratedIds: string[]; originalIds: string[] }> {
const sourceData = await this.dataService.listItemsByPk(sourcePk);
const migratedIds: string[] = [];
const originalIds: string[] = [];
for (const item of sourceData.items) {
// Create new record in target
const targetId = generateId(targetPk, item.sk);
await this.commandService.publishSync({
pk: targetPk,
sk: item.sk,
id: targetId,
tenantCode: this.extractTenant(targetPk),
code: item.code,
name: item.name,
type: item.type,
attributes: {
...item.attributes,
_migratedFrom: item.id,
},
}, { invokeContext });
migratedIds.push(targetId);
// Soft delete original (don't hard delete)
await this.commandService.publishPartialUpdateSync({
pk: item.pk,
sk: item.sk,
version: item.version,
isDeleted: true,
attributes: {
...item.attributes,
_migratedTo: targetId,
_deletedAt: new Date().toISOString(),
},
}, { invokeContext });
originalIds.push(item.id);
}
return { migratedIds, originalIds };
}
/**
* Rollback by restoring soft-deleted records
*/
async rollbackSoftDeleteMigration(
originalIds: string[],
migratedIds: string[],
invokeContext: IInvoke,
): Promise<void> {
// Restore original records
for (const id of originalIds) {
const item = await this.dataService.getItemById(id);
if (item?.isDeleted) {
await this.commandService.publishPartialUpdateSync({
pk: item.pk,
sk: item.sk,
version: item.version,
isDeleted: false,
attributes: {
...item.attributes,
_migratedTo: undefined,
_deletedAt: undefined,
_restoredAt: new Date().toISOString(),
},
}, { invokeContext });
}
}
// Hard delete migrated records
for (const id of migratedIds) {
const item = await this.dataService.getItemById(id);
if (item) {
await this.commandService.publishPartialUpdateSync({
pk: item.pk,
sk: item.sk,
version: item.version,
isDeleted: true,
}, { invokeContext });
}
}
}
private extractTenant(pk: string): string {
return pk.split('#')[1] ?? 'unknown';
}
}