DataService
Overview
The DataService is the query side of the CQRS pattern. It handles all reads from the DynamoDB data table (the read model), which is optimized for queries.
DataService reads from the DynamoDB data table, which is updated asynchronously via DynamoDB Streams. If a user needs to see their own writes immediately (before the stream propagates), use Repository instead. It is a drop-in replacement for DataService that adds read-your-writes (RYW) session caching. See Read-Your-Writes in CommandService for setup and usage.
Before using the DataService, you need to set up the CommandModule as described in the CommandService section.
Methods
async getItem(key: DetailKey): Promise<DataModel | undefined>
The getItem method returns the item with the given primary key (pk and sk). If there is no matching item, getItem returns undefined.
Example:
import { DataService } from '@mbc-cqrs-serverless/core';
import { Injectable, NotFoundException } from '@nestjs/common';
// CatDataEntity is your application's own entity class; import it from wherever it is defined.
@Injectable()
export class CatService {
  constructor(private readonly dataService: DataService) {}
  async getCat(pk: string, sk: string): Promise<CatDataEntity> {
    const item = await this.dataService.getItem({ pk, sk });
    // getItem resolves to undefined when no item matches the key
    if (!item) {
      throw new NotFoundException('Cat not found');
    }
    return new CatDataEntity(item as CatDataEntity);
  }
}
async listItemsByPk(pk: string, opts?: ListItemsOptions): Promise<DataListEntity>
The listItemsByPk method returns the items that match the given partition key; the result can be empty. It supports sort key filtering, pagination, and sort order.
Basic Usage
List items by partition key (pk). When no limit is passed, the default limit of 10 applies:
const res = await this.dataService.listItemsByPk(pk);
return new CatDataListEntity(res);
With Sort Key Filter
List items by partition key (pk) and apply a sort key expression to the sort key (sk). For example, get items whose sort key starts with CAT# and limit the result to 100 items:
import { KEY_SEPARATOR } from '@mbc-cqrs-serverless/core';
const query = {
  sk: {
    skExpression: 'begins_with(sk, :typeCode)',
    skAttributeValues: {
      ':typeCode': `CAT${KEY_SEPARATOR}`,
    },
  },
  limit: 100,
};
const res = await this.dataService.listItemsByPk(pk, query);
return new CatDataListEntity(res);
Pagination
Implement pagination using startFromSk and limit:
async listCatsWithPagination(
  tenantCode: string,
  pageSize: number,
  lastSk?: string,
): Promise<{ items: CatDataEntity[]; lastSk?: string }> {
  const pk = `CAT#${tenantCode}`;
  const result = await this.dataService.listItemsByPk(pk, {
    limit: pageSize,
    startFromSk: lastSk,
  });
  return {
    items: result.items.map((item) => new CatDataEntity(item)),
    lastSk: result.lastSk,
  };
}
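When a caller needs every item rather than a single page, the same cursor can be followed in a loop. The following is a minimal sketch rather than framework code: `Page`, `PageFetcher`, `fetchAllPages`, and the in-memory `fakeList` are hypothetical names, and it assumes `lastSk` comes back undefined once the last page has been read.

```typescript
// Hypothetical minimal shape standing in for the framework's list result.
interface Page<T> {
  items: T[];
  lastSk?: string; // assumed to be undefined on the final page
}

type PageFetcher<T> = (opts: { limit: number; startFromSk?: string }) => Promise<Page<T>>;

// Follows the lastSk cursor until the fetcher stops returning one.
async function fetchAllPages<T>(fetchPage: PageFetcher<T>, pageSize: number): Promise<T[]> {
  const all: T[] = [];
  let cursor: string | undefined = undefined;
  do {
    const page: Page<T> = await fetchPage({ limit: pageSize, startFromSk: cursor });
    all.push(...page.items);
    cursor = page.lastSk;
  } while (cursor !== undefined);
  return all;
}

// In-memory stand-in for dataService.listItemsByPk(pk, opts), for illustration only.
const rows = ['CAT#001', 'CAT#002', 'CAT#003', 'CAT#004', 'CAT#005'].map((sk) => ({ sk }));
const fakeList: PageFetcher<{ sk: string }> = async ({ limit, startFromSk }) => {
  const start = startFromSk ? rows.findIndex((r) => r.sk === startFromSk) + 1 : 0;
  const items = rows.slice(start, start + limit);
  const hasMore = start + limit < rows.length;
  return { items, lastSk: hasMore ? items[items.length - 1].sk : undefined };
};
```

In a service, the fetcher would presumably wrap the real call, along the lines of `(opts) => this.dataService.listItemsByPk(pk, opts)`. Draining every page loads the whole partition into memory, so this suits bounded result sets only.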
Sort Key Operators
The following sort key expressions are supported:
| Operator | Expression | Description |
|---|---|---|
| Equals | sk = :value | Exact match |
| Begins With | begins_with(sk, :prefix) | Prefix match |
| Between | sk BETWEEN :start AND :end | Range query |
| Less Than | sk < :value | Less than comparison |
| Greater Than | sk > :value | Greater than comparison |
Example with range query:
const query = {
  sk: {
    skExpression: 'sk BETWEEN :start AND :end',
    skAttributeValues: {
      ':start': 'ORDER#2024-01-01',
      ':end': 'ORDER#2024-12-31',
    },
  },
};
const res = await this.dataService.listItemsByPk(pk, query);
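Writing the expression string and its placeholder values by hand makes it easy to let the two drift apart. One way to keep them together is a pair of small builder functions. This is a sketch under assumptions: `SkCondition`, `skBeginsWith`, and `skBetween` are hypothetical helpers defined here, not part of the library, and `SkCondition` only mirrors the `sk` option shape shown in this section.

```typescript
// Hypothetical local shape mirroring the `sk` option shown in this section.
interface SkCondition {
  skExpression: string;
  skAttributeValues: Record<string, string>;
}

// Prefix match: begins_with(sk, :prefix)
function skBeginsWith(prefix: string): SkCondition {
  return {
    skExpression: 'begins_with(sk, :prefix)',
    skAttributeValues: { ':prefix': prefix },
  };
}

// Range match: sk BETWEEN :start AND :end (both bounds inclusive in DynamoDB)
function skBetween(start: string, end: string): SkCondition {
  return {
    skExpression: 'sk BETWEEN :start AND :end',
    skAttributeValues: { ':start': start, ':end': end },
  };
}

// Example: the 2024 order range from the text above.
const orders2024 = { sk: skBetween('ORDER#2024-01-01', 'ORDER#2024-12-31') };
```

The resulting object could then be passed as the second argument to `listItemsByPk`. Sort keys compare as strings, so if a key carries a further segment after the date (for example `ORDER#2024-12-31#001`), it sorts after the upper bound `ORDER#2024-12-31` and falls outside the range.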
async publish(cmd: CommandModel): Promise<DataModel>
The publish method writes command data to the data table. The framework's data sync handlers call it internally, but you can also call it directly when implementing custom synchronization logic.
In most cases, use CommandService.publishSync() or CommandService.publishAsync() instead; both handle data synchronization automatically.
import {
  CommandModel,
  DataModel,
  DataService,
  DataSyncHandler,
  IDataSyncHandler,
} from '@mbc-cqrs-serverless/core';
import { Injectable } from '@nestjs/common';
// Custom data sync handler example
@DataSyncHandler('your-command-table-name')
@Injectable()
export class CustomDataSyncHandler implements IDataSyncHandler {
  constructor(private readonly dataService: DataService) {}
  async up(cmd: CommandModel): Promise<DataModel> {
    // Publish command to data table
    const dataModel = await this.dataService.publish(cmd);
    // Additional synchronization to external systems
    await this.syncToExternalSystem(dataModel);
    return dataModel;
  }
  async down(cmd: CommandModel): Promise<void> {
    // Handle rollback if needed
  }
  private async syncToExternalSystem(data: DataModel): Promise<void> {
    // Custom sync logic
  }
}
The publish method:
- Converts CommandModel to DataModel format
- Removes version suffix from sort key
- Preserves original creation metadata (createdAt, createdBy, createdIp)
- Updates modification metadata (updatedAt, updatedBy, updatedIp)
- Stores the data in the data table
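The steps above can be pictured as a pure transformation from a command row to a data row. The sketch below is one plausible reading of that list, not the framework's implementation: `Row`, `stripVersion`, and `toDataRow` are hypothetical names, the record shape is trimmed down, and it assumes the version suffix has the `@version` form that the HistoryService section describes.

```typescript
// Hypothetical, trimmed-down shape; the framework's CommandModel and
// DataModel carry more fields than shown here.
interface Row {
  pk: string;
  sk: string;
  version: number;
  attributes: Record<string, unknown>;
  createdAt: Date;
  createdBy: string;
  createdIp: string;
  updatedAt: Date;
  updatedBy: string;
  updatedIp: string;
}

// Assumption: the version suffix is "@<digits>" at the end of the sort key.
function stripVersion(sk: string): string {
  return sk.replace(/@\d+$/, '');
}

// Builds the data-table row for a command, given the row already stored (if any).
function toDataRow(cmd: Row, existing?: Row): Row {
  return {
    pk: cmd.pk,
    sk: stripVersion(cmd.sk),
    version: cmd.version,
    attributes: cmd.attributes,
    // Creation metadata: keep what the stored row already has.
    createdAt: existing?.createdAt ?? cmd.createdAt,
    createdBy: existing?.createdBy ?? cmd.createdBy,
    createdIp: existing?.createdIp ?? cmd.createdIp,
    // Modification metadata: always taken from the incoming command.
    updatedAt: cmd.updatedAt,
    updatedBy: cmd.updatedBy,
    updatedIp: cmd.updatedIp,
  };
}
```

The last step in the list, storing the row in the data table, is left out so that the sketch stays free of I/O.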
Common Patterns
Search by Code
Find an item by its unique code within a tenant:
async findByCode(tenantCode: string, code: string): Promise<CatDataEntity | undefined> {
  const pk = `CAT#${tenantCode}`;
  const sk = `CAT#${code}`;
  const item = await this.dataService.getItem({ pk, sk });
  return item ? new CatDataEntity(item) : undefined;
}
List with Type Filter
List items filtered by type:
async listByType(tenantCode: string, type: string): Promise<CatDataEntity[]> {
  const pk = `CAT#${tenantCode}`;
  const result = await this.dataService.listItemsByPk(pk, {
    sk: {
      skExpression: 'begins_with(sk, :type)',
      skAttributeValues: {
        ':type': `${type}#`,
      },
    },
  });
  return result.items.map((item) => new CatDataEntity(item));
}
Error Handling
Handle common query errors gracefully:
import {
  Injectable,
  InternalServerErrorException,
  Logger,
  NotFoundException,
} from '@nestjs/common';
import { DataService } from '@mbc-cqrs-serverless/core';
@Injectable()
export class CatService {
  private readonly logger = new Logger(CatService.name);
  constructor(private readonly dataService: DataService) {}
  async getItemSafely(pk: string, sk: string): Promise<CatDataEntity> {
    try {
      const item = await this.dataService.getItem({ pk, sk });
      if (!item) {
        throw new NotFoundException(`Item not found: ${pk}/${sk}`);
      }
      return new CatDataEntity(item);
    } catch (error) {
      if (error instanceof NotFoundException) {
        throw error;
      }
      // Log and rethrow unexpected errors
      this.logger.error('Unexpected error querying item:', error);
      throw new InternalServerErrorException('Failed to retrieve item');
    }
  }
}
Type Definitions
DetailKey
interface DetailKey {
  pk: string; // Partition key
  sk: string; // Sort key
}
ListItemsOptions
ListItemsOptions is an inline type definition in the method signature, not a separately exported interface. The type structure is as follows:
{
  sk?: {
    skExpression: string;
    skAttributeValues: Record<string, string>;
    skAttributeNames?: Record<string, string>;
  };
  startFromSk?: string;
  limit?: number; // Default: 10
  order?: 'asc' | 'desc';
}
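Because the options type is not exported, code that wants to name it can derive it from the method signature with TypeScript's built-in `Parameters` utility type. The sketch below uses a hypothetical `DataServiceLike` interface as a stand-in so that it is self-contained. Indexing into the real `DataService` class in the same way is an assumption about how that class is typed.

```typescript
// Hypothetical stand-in with the method shape described in this section.
interface DataServiceLike {
  listItemsByPk(
    pk: string,
    opts?: {
      sk?: {
        skExpression: string;
        skAttributeValues: Record<string, string>;
        skAttributeNames?: Record<string, string>;
      };
      startFromSk?: string;
      limit?: number;
      order?: 'asc' | 'desc';
    },
  ): Promise<unknown>;
}

// Second parameter of listItemsByPk, with `undefined` stripped from the optional type.
type ListItemsOptions = NonNullable<Parameters<DataServiceLike['listItemsByPk']>[1]>;

// A typed options object: 25 items in descending sort key order.
const descendingPage: ListItemsOptions = {
  sk: {
    skExpression: 'begins_with(sk, :prefix)',
    skAttributeValues: { ':prefix': 'CAT#' },
  },
  limit: 25,
  order: 'desc',
};
```

With the alias in place, helper functions can accept or return `ListItemsOptions` and the compiler checks the field names and the `order` literal.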
DataListEntity
DataListEntity is a class (not an interface) that wraps list query results. It provides a constructor for easy instantiation:
class DataListEntity {
  items: DataEntity[]; // Array of data entities
  lastSk?: string; // Sort key for pagination cursor
  total?: number; // Total count (if available)
  constructor(data: Partial<DataListEntity>);
}
The constructor accepts a partial object, allowing you to create instances from query results:
const result = await this.dataService.listItemsByPk(pk);
const listEntity = new DataListEntity(result);
HistoryService
The HistoryService provides access to the history table, which stores all previous versions of commands. Use it when you need to retrieve a specific historical version of an entity for audit trails or rollback scenarios.
import {
  HistoryService,
  addSortKeyVersion,
} from '@mbc-cqrs-serverless/core';
import { Injectable, NotFoundException } from '@nestjs/common';
@Injectable()
export class ProductService {
  constructor(private readonly historyService: HistoryService) {}
  async findVersion(pk: string, sk: string, version: number) {
    const skWithVersion = addSortKeyVersion(sk, version);
    const item = await this.historyService.getItem({ pk, sk: skWithVersion });
    if (!item) {
      throw new NotFoundException(`Version ${version} not found`);
    }
    return item;
  }
}
async getItem(key: DetailKey): Promise<DataModel | undefined>
Retrieves a specific versioned item from the history table. Returns undefined if the requested version does not exist.
The history table uses the same key structure as the data table, except that the sort key carries a version suffix: sk@version. Always build the key with addSortKeyVersion(sk, version) rather than concatenating it by hand.
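For orientation, the `sk@version` format can be illustrated with two small local helpers. These are hypothetical stand-ins written for this example (`withVersion`, `splitVersion`), not the library's implementation of `addSortKeyVersion`. They assume the separator is `@` and the version is a non-negative integer.

```typescript
const VERSION_SEPARATOR = '@'; // assumed from the sk@version format described above

// Hypothetical illustration of the versioned key layout.
function withVersion(sk: string, version: number): string {
  return `${sk}${VERSION_SEPARATOR}${version}`;
}

// Splits a key at its last separator; keys without a numeric suffix are returned unchanged.
function splitVersion(key: string): { sk: string; version?: number } {
  const i = key.lastIndexOf(VERSION_SEPARATOR);
  const suffix = i === -1 ? '' : key.slice(i + 1);
  if (!/^\d+$/.test(suffix)) {
    return { sk: key };
  }
  return { sk: key.slice(0, i), version: Number(suffix) };
}

const historyKey = withVersion('PRODUCT#001', 3);
const parts = splitVersion(historyKey);
```

Application code should keep calling `addSortKeyVersion`; the sketch only shows what the resulting key looks like under those assumptions.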
Best Practices
- Use projection expressions: Only retrieve the attributes you need to reduce data transfer
- Implement pagination: Always paginate large result sets to avoid memory issues
- Cache frequently accessed data: Consider caching static or slowly changing data
- Use appropriate key design: Design your keys to support your query patterns efficiently
- Handle not found cases: Always check if the item exists before using it
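The caching practice can be sketched as a wrapper around any `getItem`-style function. This is a minimal in-process example under assumptions, not a framework feature: `withTtlCache`, `Key`, and `Fetcher` are hypothetical names, and the cache lives only in the memory of the current process.

```typescript
type Key = { pk: string; sk: string };
type Fetcher<T> = (key: Key) => Promise<T | undefined>;

// Wraps a fetcher with a per-key cache; entries expire ttlMs after they are stored.
// `now` is injectable so the expiry logic can be exercised without real timers.
function withTtlCache<T>(
  fetchItem: Fetcher<T>,
  ttlMs: number,
  now: () => number = Date.now,
): Fetcher<T> {
  const cache = new Map<string, { value: T; expiresAt: number }>();
  return async (key: Key): Promise<T | undefined> => {
    const id = JSON.stringify([key.pk, key.sk]);
    const hit = cache.get(id);
    if (hit && hit.expiresAt > now()) {
      return hit.value; // still fresh: skip the table read
    }
    const value = await fetchItem(key);
    if (value !== undefined) {
      cache.set(id, { value, expiresAt: now() + ttlMs }); // misses are not cached
    }
    return value;
  };
}
```

A service might wrap its read once, for example `withTtlCache((key) => this.dataService.getItem(key), 60_000)`, and call the wrapped function instead. Cached reads can be stale for up to the TTL, on top of the stream propagation delay, so this fits static or slowly changing data only.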
Related Documentation
- Command Service - Writing commands with CommandService
- Unit Testing - Unit testing DataService with mocks
- Service Patterns - Complete service layer patterns
- Key Patterns - DynamoDB key design for efficient queries
- Entity Patterns - Entity definition with DataEntity
- Serialization Helpers - Converting DynamoDB structures to API response format
- Data Sync Handler Examples - Using DataService.publish() in sync handlers