Data Sync Handler Examples

This guide explains how to implement Data Sync Handlers that automatically synchronize data from DynamoDB (command source) to RDS (query database). This is the core mechanism that enables the CQRS read model.

When to Use This Guide

Use this guide when you need to:

  • Sync entity data from DynamoDB to MySQL/PostgreSQL for complex queries
  • Transform nested JSON attributes into relational columns
  • Handle different record types within the same DynamoDB table
  • Process parent-child relationships (Order, OrderItem) separately

Problems This Pattern Solves

| Problem | Solution |
| --- | --- |
| DynamoDB cannot do JOINs or complex filters | Sync data to RDS for SQL queries |
| Version suffix in SK causes duplicate records | Use removeSortKeyVersion() before upserting |
| Different record types need different RDS tables | Filter by SK prefix in handler |
| JSON attributes need to be searchable columns | Map attributes to individual RDS columns |

Basic Structure

All Data Sync Handlers follow this basic structure:

import { CommandModel, IDataSyncHandler, removeSortKeyVersion } from "@mbc-cqrs-serverless/core";
import { Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "src/prisma";

@Injectable()
export class EntityDataSyncRdsHandler implements IDataSyncHandler {
  private readonly logger = new Logger(EntityDataSyncRdsHandler.name);

  constructor(private readonly prismaService: PrismaService) {}

  async up(cmd: CommandModel): Promise<any> {
    // Sync data to RDS
  }

  async down(cmd: CommandModel): Promise<any> {
    // Optional: Handle rollback (usually just logs)
    this.logger.debug(cmd);
  }
}

Example 1: Simple Entity Sync

Use Case: Sync Products to Enable Search and Filtering

Scenario: Products stored in DynamoDB need to be searchable by category, price range, and text.

Solution: Sync to RDS and map attributes to indexed columns for efficient queries.

import {
  CommandModel,
  IDataSyncHandler,
  removeSortKeyVersion
} from "@mbc-cqrs-serverless/core";
import { Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "src/prisma";

interface ProductAttributes {
  name: string;
  description: string;
  price: number;
  category: string;
  inStock: boolean;
}

@Injectable()
export class ProductDataSyncRdsHandler implements IDataSyncHandler {
  private readonly logger = new Logger(ProductDataSyncRdsHandler.name);

  constructor(private readonly prismaService: PrismaService) {}

  async up(cmd: CommandModel): Promise<any> {
    // Remove version suffix from sort key (e.g., "PROD001@1" -> "PROD001")
    const sk = removeSortKeyVersion(cmd.sk);
    const attrs = cmd.attributes as ProductAttributes;

    await this.prismaService.product.upsert({
      where: { id: cmd.id },
      update: {
        pk: cmd.pk,
        sk: sk,
        name: cmd.name,
        code: cmd.code,
        version: cmd.version,
        tenantCode: cmd.tenantCode,
        // Map attributes to columns
        description: attrs.description,
        price: attrs.price,
        category: attrs.category,
        inStock: attrs.inStock,
        // Audit fields
        isDeleted: cmd.isDeleted ?? false,
        createdAt: cmd.createdAt,
        createdBy: cmd.createdBy,
        updatedAt: cmd.updatedAt,
        updatedBy: cmd.updatedBy,
      },
      create: {
        id: cmd.id,
        pk: cmd.pk,
        sk: sk,
        // Also store original keys with version for reference
        cpk: cmd.pk,
        csk: cmd.sk,
        name: cmd.name,
        code: cmd.code,
        version: cmd.version,
        tenantCode: cmd.tenantCode,
        description: attrs.description,
        price: attrs.price,
        category: attrs.category,
        inStock: attrs.inStock,
        isDeleted: cmd.isDeleted ?? false,
        createdAt: cmd.createdAt,
        createdBy: cmd.createdBy,
        updatedAt: cmd.updatedAt,
        updatedBy: cmd.updatedBy,
      },
    });
  }

  async down(cmd: CommandModel): Promise<any> {
    this.logger.debug(cmd);
  }
}

Example 2: Conditional Processing with SK Prefix

Use Case: Order and OrderItem in Same DynamoDB Table

Scenario: Orders and their items share the same PK but have different SK prefixes. Each needs to go to a different RDS table.

Solution: Check SK prefix to route to appropriate sync logic.

import {
  CommandModel,
  IDataSyncHandler,
  removeSortKeyVersion
} from "@mbc-cqrs-serverless/core";
import { Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "src/prisma";

const ORDER_SK_PREFIX = "ORDER";
const ORDER_ITEM_SK_PREFIX = "ORDER_ITEM";

interface OrderAttributes {
  customerId: string;
  status: string;
  totalAmount: number;
  orderDate: string;
}

interface OrderItemAttributes {
  orderId: string;
  productId: string;
  quantity: number;
  unitPrice: number;
}

@Injectable()
export class OrderDataSyncRdsHandler implements IDataSyncHandler {
  private readonly logger = new Logger(OrderDataSyncRdsHandler.name);

  constructor(private readonly prismaService: PrismaService) {}

  async up(cmd: CommandModel): Promise<any> {
    const sk = removeSortKeyVersion(cmd.sk);

    // Check the longer prefix first: every "ORDER_ITEM..." key also starts with "ORDER"
    if (sk.startsWith(ORDER_ITEM_SK_PREFIX)) {
      await this.syncOrderItem(cmd, sk);
    } else if (sk.startsWith(ORDER_SK_PREFIX)) {
      await this.syncOrder(cmd, sk);
    }
    // Any other record type is skipped
  }

  private async syncOrder(cmd: CommandModel, sk: string): Promise<void> {
    const attrs = cmd.attributes as OrderAttributes;

    await this.prismaService.order.upsert({
      where: { id: cmd.id },
      update: {
        pk: cmd.pk,
        sk: sk,
        code: cmd.code,
        version: cmd.version,
        customerId: attrs.customerId,
        status: attrs.status,
        totalAmount: attrs.totalAmount,
        orderDate: new Date(attrs.orderDate),
        isDeleted: cmd.isDeleted ?? false,
        updatedAt: cmd.updatedAt,
        updatedBy: cmd.updatedBy,
      },
      create: {
        id: cmd.id,
        pk: cmd.pk,
        sk: sk,
        cpk: cmd.pk,
        csk: cmd.sk,
        code: cmd.code,
        version: cmd.version,
        tenantCode: cmd.tenantCode,
        customerId: attrs.customerId,
        status: attrs.status,
        totalAmount: attrs.totalAmount,
        orderDate: new Date(attrs.orderDate),
        isDeleted: cmd.isDeleted ?? false,
        createdAt: cmd.createdAt,
        createdBy: cmd.createdBy,
        updatedAt: cmd.updatedAt,
        updatedBy: cmd.updatedBy,
      },
    });
  }

  private async syncOrderItem(cmd: CommandModel, sk: string): Promise<void> {
    const attrs = cmd.attributes as OrderItemAttributes;

    await this.prismaService.orderItem.upsert({
      where: { id: cmd.id },
      update: {
        pk: cmd.pk,
        sk: sk,
        orderId: attrs.orderId,
        productId: attrs.productId,
        quantity: attrs.quantity,
        unitPrice: attrs.unitPrice,
        isDeleted: cmd.isDeleted ?? false,
        updatedAt: cmd.updatedAt,
      },
      create: {
        id: cmd.id,
        pk: cmd.pk,
        sk: sk,
        cpk: cmd.pk,
        csk: cmd.sk,
        tenantCode: cmd.tenantCode,
        orderId: attrs.orderId,
        productId: attrs.productId,
        quantity: attrs.quantity,
        unitPrice: attrs.unitPrice,
        isDeleted: cmd.isDeleted ?? false,
        createdAt: cmd.createdAt,
        updatedAt: cmd.updatedAt,
      },
    });
  }

  async down(cmd: CommandModel): Promise<any> {
    this.logger.debug(cmd);
  }
}
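
The prefix routing above can also be factored into a pure function, which makes it easy to unit test without a database. The following is a minimal sketch, not part of the framework: the function name classifyOrderRecord and the sample keys are hypothetical, and the "#" separator in the sample keys is an assumption about the key format.

```typescript
type OrderRecordKind = "order" | "orderItem" | "skip";

const ORDER_PREFIX = "ORDER";
const ORDER_ITEM_PREFIX = "ORDER_ITEM";

// Hypothetical helper: decides which RDS table a (version-stripped) SK belongs to.
function classifyOrderRecord(sk: string): OrderRecordKind {
  // The longer prefix must be tested first, because every
  // "ORDER_ITEM..." key also starts with "ORDER".
  if (sk.startsWith(ORDER_ITEM_PREFIX)) return "orderItem";
  if (sk.startsWith(ORDER_PREFIX)) return "order";
  return "skip";
}

// Sample keys are illustrative only.
console.log(classifyOrderRecord("ORDER#001"));
console.log(classifyOrderRecord("ORDER_ITEM#001#1"));
console.log(classifyOrderRecord("INVOICE#001"));
```

Reversing the two checks would route every order item to the order table, which is why the order of the conditions matters here.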

Example 3: Complex Attribute Transformation

Use Case: Notifications with Different Content Types

Scenario: Notification entity has different content structures based on type (Alert, Info, Promotion).

Solution: Extract and flatten type-specific fields into common RDS columns.

import {
  CommandModel,
  IDataSyncHandler,
  removeSortKeyVersion
} from "@mbc-cqrs-serverless/core";
import { Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "src/prisma";

enum NotificationType {
  ALERT = "ALERT",
  INFO = "INFO",
  PROMOTION = "PROMOTION",
}

interface AlertContent {
  title: string;
  message: string;
  severity: string;
}

interface InfoContent {
  headline: string;
  body: string;
}

interface PromotionContent {
  campaignName: string;
  discount: number;
  validUntil: string;
}

interface NotificationAttributes {
  type: NotificationType;
  alertContent?: AlertContent;
  infoContent?: InfoContent;
  promotionContent?: PromotionContent;
  targetUsers: string[];
  tags: string[];
  schedule: {
    startDate: string;
    endDate: string;
  };
}

@Injectable()
export class NotificationDataSyncRdsHandler implements IDataSyncHandler {
  private readonly logger = new Logger(NotificationDataSyncRdsHandler.name);

  constructor(private readonly prismaService: PrismaService) {}

  async up(cmd: CommandModel): Promise<any> {
    const sk = removeSortKeyVersion(cmd.sk);
    const attrs = cmd.attributes as NotificationAttributes;

    // Flatten the type-specific content into common title and body columns
    const title = this.getTitle(attrs);
    const body = this.getBody(attrs);

    await this.prismaService.notification.upsert({
      where: { id: cmd.id },
      update: {
        pk: cmd.pk,
        sk: sk,
        code: cmd.code,
        version: cmd.version,
        type: attrs.type,
        title: title,
        body: body,
        // Convert arrays to comma-separated strings for RDS
        targetUsers: attrs.targetUsers?.join(",") ?? null,
        tags: attrs.tags?.join(",") ?? null,
        // Handle dates
        startDate: attrs.schedule?.startDate
          ? new Date(attrs.schedule.startDate)
          : null,
        endDate: attrs.schedule?.endDate
          ? new Date(attrs.schedule.endDate)
          : null,
        isDeleted: cmd.isDeleted ?? false,
        updatedAt: cmd.updatedAt,
        updatedBy: cmd.updatedBy,
      },
      create: {
        id: cmd.id,
        pk: cmd.pk,
        sk: sk,
        cpk: cmd.pk,
        csk: cmd.sk,
        code: cmd.code,
        version: cmd.version,
        tenantCode: cmd.tenantCode,
        type: attrs.type,
        title: title,
        body: body,
        targetUsers: attrs.targetUsers?.join(",") ?? null,
        tags: attrs.tags?.join(",") ?? null,
        startDate: attrs.schedule?.startDate
          ? new Date(attrs.schedule.startDate)
          : null,
        endDate: attrs.schedule?.endDate
          ? new Date(attrs.schedule.endDate)
          : null,
        isDeleted: cmd.isDeleted ?? false,
        createdAt: cmd.createdAt,
        createdBy: cmd.createdBy,
        updatedAt: cmd.updatedAt,
        updatedBy: cmd.updatedBy,
      },
    });
  }

  /**
   * Extract title based on notification type
   */
  private getTitle(attrs: NotificationAttributes): string | null {
    switch (attrs.type) {
      case NotificationType.ALERT:
        return attrs.alertContent?.title ?? null;
      case NotificationType.INFO:
        return attrs.infoContent?.headline ?? null;
      case NotificationType.PROMOTION:
        return attrs.promotionContent?.campaignName ?? null;
      default:
        return null;
    }
  }

  /**
   * Extract body/message based on notification type
   */
  private getBody(attrs: NotificationAttributes): string | null {
    switch (attrs.type) {
      case NotificationType.ALERT:
        return attrs.alertContent?.message ?? null;
      case NotificationType.INFO:
        return attrs.infoContent?.body ?? null;
      case NotificationType.PROMOTION: {
        // Return null instead of "undefined% off until undefined" when content is missing
        const promo = attrs.promotionContent;
        return promo
          ? `${promo.discount}% off until ${promo.validUntil}`
          : null;
      }
      default:
        return null;
    }
  }

  async down(cmd: CommandModel): Promise<any> {
    this.logger.debug(cmd);
  }
}

Example 4: PK Prefix Filtering

Use Case: User Records in Shared Table

Scenario: Multiple entity types share a DynamoDB table. Handler should only process USER records.

Solution: Check PK prefix and skip non-matching records early.

import {
  CommandModel,
  IDataSyncHandler,
  KEY_SEPARATOR,
  removeSortKeyVersion
} from "@mbc-cqrs-serverless/core";
import { Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "src/prisma";

const USER_PK_PREFIX = "USER";

interface UserAttributes {
  email: string;
  userId: string;
  displayName: string;
  role: string;
  lastLoginAt?: string;
}

@Injectable()
export class UserDataSyncRdsHandler implements IDataSyncHandler {
  private readonly logger = new Logger(UserDataSyncRdsHandler.name);

  constructor(private readonly prismaService: PrismaService) {}

  async up(cmd: CommandModel): Promise<any> {
    // Only process USER records
    if (!cmd.pk.startsWith(USER_PK_PREFIX + KEY_SEPARATOR)) {
      return;
    }

    // Skip temporary or profile records
    if (cmd.sk.startsWith("temp") || cmd.sk.startsWith("profile")) {
      return;
    }

    const sk = removeSortKeyVersion(cmd.sk);
    const attrs = cmd.attributes as UserAttributes;

    await this.prismaService.user.upsert({
      where: { id: cmd.id },
      update: {
        pk: cmd.pk,
        sk: sk,
        code: cmd.code,
        version: cmd.version,
        email: attrs.email,
        userId: attrs.userId,
        displayName: attrs.displayName,
        role: attrs.role,
        lastLoginAt: attrs.lastLoginAt
          ? new Date(attrs.lastLoginAt)
          : null,
        isDeleted: cmd.isDeleted ?? false,
        updatedAt: cmd.updatedAt,
        updatedBy: cmd.updatedBy,
      },
      create: {
        id: cmd.id,
        pk: cmd.pk,
        sk: sk,
        cpk: cmd.pk,
        csk: cmd.sk,
        code: cmd.code,
        version: cmd.version,
        tenantCode: cmd.tenantCode,
        type: cmd.type,
        email: attrs.email,
        userId: attrs.userId,
        displayName: attrs.displayName,
        role: attrs.role,
        lastLoginAt: attrs.lastLoginAt
          ? new Date(attrs.lastLoginAt)
          : null,
        isDeleted: cmd.isDeleted ?? false,
        createdAt: cmd.createdAt,
        createdBy: cmd.createdBy,
        updatedAt: cmd.updatedAt,
        updatedBy: cmd.updatedBy,
      },
    });
  }

  async down(cmd: CommandModel): Promise<any> {
    this.logger.debug(cmd);
  }
}

Example 5: Parsing SK for Derived Data

Use Case: Master Data with Category Information in SK

Scenario: SK contains structured data like "SETTING#category#code" that should be stored as separate columns.

Solution: Parse SK to extract type, category, and code for querying.

import {
  CommandModel,
  IDataSyncHandler,
  KEY_SEPARATOR,
  removeSortKeyVersion
} from "@mbc-cqrs-serverless/core";
import { Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "src/prisma";

const SETTING_SK_PREFIX = "SETTING";
const DATA_SK_PREFIX = "DATA";

interface MasterAttributes {
  value: any;
  displayOrder: number;
  isActive: boolean;
}

@Injectable()
export class MasterDataSyncRdsHandler implements IDataSyncHandler {
  private readonly logger = new Logger(MasterDataSyncRdsHandler.name);

  constructor(private readonly prismaService: PrismaService) {}

  async up(cmd: CommandModel): Promise<any> {
    const sk = removeSortKeyVersion(cmd.sk);
    const attrs = cmd.attributes as MasterAttributes;

    // Determine the master type from the SK prefix
    // SK format: "SETTING#category#code" or "DATA#category#code"
    let masterType: string;
    if (sk.startsWith(SETTING_SK_PREFIX)) {
      masterType = SETTING_SK_PREFIX;
    } else if (sk.startsWith(DATA_SK_PREFIX)) {
      masterType = DATA_SK_PREFIX;
    } else {
      // Skip unknown types
      return;
    }

    // Parse the remaining SK segments; missing segments become empty strings
    const skParts = sk.split(KEY_SEPARATOR);
    const masterCategory = skParts[1] ?? "";
    const masterCode = skParts[2] ?? "";

    await this.prismaService.master.upsert({
      where: { id: cmd.id },
      update: {
        pk: cmd.pk,
        sk: sk,
        code: cmd.code,
        version: cmd.version,
        masterType: masterType,
        masterCategory: masterCategory,
        masterCode: masterCode,
        name: cmd.name,
        value: JSON.stringify(attrs.value),
        displayOrder: attrs.displayOrder,
        isActive: attrs.isActive,
        isDeleted: cmd.isDeleted ?? false,
        updatedAt: cmd.updatedAt,
        updatedBy: cmd.updatedBy,
      },
      create: {
        id: cmd.id,
        pk: cmd.pk,
        sk: sk,
        cpk: cmd.pk,
        csk: cmd.sk,
        code: cmd.code,
        version: cmd.version,
        tenantCode: cmd.tenantCode,
        masterType: masterType,
        masterCategory: masterCategory,
        masterCode: masterCode,
        name: cmd.name,
        value: JSON.stringify(attrs.value),
        displayOrder: attrs.displayOrder,
        isActive: attrs.isActive,
        isDeleted: cmd.isDeleted ?? false,
        createdAt: cmd.createdAt,
        createdBy: cmd.createdBy,
        updatedAt: cmd.updatedAt,
        updatedBy: cmd.updatedBy,
      },
    });
  }

  async down(cmd: CommandModel): Promise<any> {
    this.logger.debug(cmd);
  }
}
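
The SK parsing step can be isolated into a small function that is testable on its own. This is a sketch under stated assumptions: parseMasterSk is a hypothetical helper, and the "#" separator is an assumed value for KEY_SEPARATOR. Unlike the handler above, it compares the whole first segment rather than using startsWith(), so a key such as "SETTINGS#..." would be rejected.

```typescript
// Assumed separator between key segments (stand-in for KEY_SEPARATOR).
const SEGMENT_SEPARATOR = "#";

const MASTER_TYPES = ["SETTING", "DATA"] as const;
type MasterType = (typeof MASTER_TYPES)[number];

interface ParsedMasterSk {
  masterType: MasterType;
  masterCategory: string;
  masterCode: string;
}

// Hypothetical helper: splits "TYPE#category#code" into separate columns.
// Returns null for record types this handler should skip.
function parseMasterSk(sk: string): ParsedMasterSk | null {
  const [prefix, category = "", code = ""] = sk.split(SEGMENT_SEPARATOR);
  const masterType = MASTER_TYPES.find((t) => t === prefix);
  if (masterType === undefined) {
    return null;
  }
  return { masterType, masterCategory: category, masterCode: code };
}

console.log(parseMasterSk("SETTING#ui#theme"));
console.log(parseMasterSk("DATA#region"));
console.log(parseMasterSk("OTHER#x#y"));
```

Missing segments default to empty strings, matching the handler's behavior for short keys.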

Registering Multiple Handlers

You can register multiple handlers for the same table to handle different record types:

import { CommandModule } from "@mbc-cqrs-serverless/core";
import { Module } from "@nestjs/common";

import { OrderDataSyncRdsHandler } from "./handler/order-rds.handler";
import { OrderItemDataSyncRdsHandler } from "./handler/order-item-rds.handler";
import { OrderHistoryDataSyncRdsHandler } from "./handler/order-history-rds.handler";
import { OrderController } from "./order.controller";
import { OrderService } from "./order.service";

@Module({
  imports: [
    CommandModule.register({
      tableName: "order",
      dataSyncHandlers: [
        OrderDataSyncRdsHandler,
        OrderItemDataSyncRdsHandler,
        OrderHistoryDataSyncRdsHandler,
      ],
    }),
  ],
  controllers: [OrderController],
  providers: [OrderService],
})
export class OrderModule {}
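
When several handlers are registered for one table, it helps to confirm that their filters do not overlap, so that each record is written by at most one of them. The sketch below assumes that every registered handler is offered every record and filters for itself, as the examples in this guide do. The prefixes, the sample keys, and the "#" separator are all hypothetical.

```typescript
// Assumed separator between key segments (stand-in for KEY_SEPARATOR).
const SEP = "#";

interface HandlerFilter {
  name: string;
  prefix: string;
}

// Hypothetical SK prefixes for the three handlers registered above.
const filters: HandlerFilter[] = [
  { name: "OrderDataSyncRdsHandler", prefix: "ORDER" },
  { name: "OrderItemDataSyncRdsHandler", prefix: "ORDER_ITEM" },
  { name: "OrderHistoryDataSyncRdsHandler", prefix: "ORDER_HISTORY" },
];

// Matching on prefix + separator keeps "ORDER" from also claiming
// "ORDER_ITEM#..." and "ORDER_HISTORY#..." keys.
function claimedBy(sk: string): string[] {
  return filters
    .filter((f) => sk.startsWith(f.prefix + SEP))
    .map((f) => f.name);
}

console.log(claimedBy("ORDER#001"));
console.log(claimedBy("ORDER_ITEM#001#1"));
console.log(claimedBy("INVOICE#001"));
```

A key claimed by more than one handler would be synced twice, and a key claimed by none is silently skipped, so a check like this is a cheap unit test to keep alongside the handlers.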

Best Practices

1. Always Remove Version from SK

Use removeSortKeyVersion() to get a consistent SK for RDS storage:

const sk = removeSortKeyVersion(cmd.sk); // "ORDER001@3" -> "ORDER001"
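
To see why this matters, the following sketch shows several versions of one record collapsing to a single RDS key. stripVersionSuffix is a hypothetical stand-in written for illustration, not the library's implementation, and it assumes the version is appended after an "@" as in the example keys in this guide.

```typescript
// Assumed version separator, based on keys such as "ORDER001@3".
const VERSION_SEPARATOR = "@";

// Hypothetical stand-in for removeSortKeyVersion().
function stripVersionSuffix(sk: string): string {
  const idx = sk.lastIndexOf(VERSION_SEPARATOR);
  // Keys without a version suffix are returned unchanged.
  return idx === -1 ? sk : sk.substring(0, idx);
}

// Three versions of the same record in the command table.
const versionedKeys = ["ORDER001@1", "ORDER001@2", "ORDER001@3"];

// After stripping, all versions share one sort key, so an upsert
// updates a single RDS row instead of inserting three.
const distinctKeys = new Set(versionedKeys.map(stripVersionSuffix));
console.log(Array.from(distinctKeys)); // [ 'ORDER001' ]
```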

2. Handle undefined isDeleted

Always provide a default value for isDeleted:

isDeleted: cmd.isDeleted ?? false,

3. Store Both Original and Cleaned Keys

Store original keys (cpk, csk) in create operations for reference:

create: {
  pk: cmd.pk, // Cleaned PK
  sk: sk, // Cleaned SK (without version)
  cpk: cmd.pk, // Original PK (same as pk for most cases)
  csk: cmd.sk, // Original SK (with version)
  // ...
}

4. Type Your Attributes

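Define interfaces for attributes to get compile-time type checking. Note that the as cast is not validated at runtime, so the interface must match what is actually stored in DynamoDB: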

interface ProductAttributes {
  name: string;
  price: number;
  // ...
}

const attrs = cmd.attributes as ProductAttributes;
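
Because a cast only informs the compiler, a handler that cannot fully trust its input may want a runtime check before upserting. A minimal sketch, assuming only the two fields shown above; isProductAttributes is a hypothetical helper, not part of the framework:

```typescript
interface ProductAttributes {
  name: string;
  price: number;
}

// Hypothetical type guard: verifies the shape at runtime and
// narrows the type for the compiler at the same time.
function isProductAttributes(value: unknown): value is ProductAttributes {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.name === "string" &&
    typeof candidate.price === "number"
  );
}

// Stand-in for cmd.attributes, which arrives as untyped JSON.
const rawAttributes: unknown = { name: "Widget", price: 9.5 };

if (isProductAttributes(rawAttributes)) {
  // rawAttributes is typed as ProductAttributes inside this branch.
  console.log(rawAttributes.name, rawAttributes.price);
}
```

Records that fail the guard can be logged and skipped rather than written to RDS with missing columns.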

5. Handle Null/Undefined Gracefully

Use nullish coalescing and optional chaining:

tags: attrs.tags?.join(",") ?? null,
startDate: attrs.schedule?.startDate
  ? new Date(attrs.schedule.startDate)
  : null,
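
These conversions recur in every handler, so they can be pulled into small helpers. The sketch below uses hypothetical names (toCsvOrNull, toDateOrNull). It also goes one step further than the snippet above by returning null for strings that do not parse as dates, which is an added assumption about the desired behavior.

```typescript
// Hypothetical helper: joins an optional string array for a text column.
// Note that an empty array yields "" rather than null.
function toCsvOrNull(values?: string[] | null): string | null {
  return values?.join(",") ?? null;
}

// Hypothetical helper: parses an optional date string for a date column.
// Missing, empty, or unparseable input becomes null.
function toDateOrNull(value?: string | null): Date | null {
  if (!value) {
    return null;
  }
  const date = new Date(value);
  return Number.isNaN(date.getTime()) ? null : date;
}

console.log(toCsvOrNull(["a", "b"])); // a,b
console.log(toCsvOrNull(undefined)); // null
console.log(toDateOrNull("not a date")); // null
```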