インポート/エクスポートパターン
このガイドでは、CSVの処理、Excelファイルの処理、Step Functionsを使用したバッチデータ操作など、データのインポートおよびエクスポート操作のパターンについて説明します。
このガイドを使用するタイミング
以下の場合にこのガイドを使用してください:
- CSVまたはExcelファイルから一括データをインポートする
- 様々な形式にデータをエクスポートする
- Step Functionsで大規模なデータセットを処理する
- S3署名付きURLでファイルアップロードを実装する
- 外部形式と内部形式の間でデータを変換する
インポートアーキテクチャの概要
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │────>│ S3 │────>│Step Function│────>│ Lambda │
│ (Upload) │ │ (Storage) │ │(Orchestrate)│ │ (Process) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│
┌───────────────────────────┘
▼
┌─────────────┐ ┌─────────────┐
│ DynamoDB │<────│ Import │
│ (Command) │ │ Handler │
└─────────────┘ └─────────────┘
ファイルアップロードパターン
ストレージサービス
安全なファイルアップロードのための署名付きURLを生成します:
// storage/storage.service.ts
import { Injectable } from '@nestjs/common';
import { S3Service } from '@mbc-cqrs-serverless/core';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3';
@Injectable()
export class StorageService {
  /** Lifetime, in seconds, of every presigned URL issued by this service (1 hour). */
  private static readonly URL_TTL_SECONDS = 3600;

  constructor(private readonly s3Service: S3Service) {}

  /**
   * Generate a presigned PUT URL for uploading an import file.
   *
   * The object key is `imports/<epoch-millis>/<filename>` in the private bucket.
   *
   * @param filename - Name used as the last segment of the object key.
   * @param contentType - Content type set on the signed PutObject command; defaults to `text/csv`.
   * @returns The target bucket, the generated object key and the presigned URL.
   */
  async genUploadUrl(
    filename: string,
    contentType = 'text/csv',
  ): Promise<{ bucket: string; key: string; url: string }> {
    const bucket = this.s3Service.privateBucket;
    const key = `imports/${Date.now()}/${filename}`;
    const command = new PutObjectCommand({
      Bucket: bucket,
      Key: key,
      ContentType: contentType,
      ACL: 'private',
    });
    const url = await getSignedUrl(this.s3Service.client, command, {
      expiresIn: StorageService.URL_TTL_SECONDS,
    });
    return { bucket, key, url };
  }

  /**
   * Generate a presigned GET URL for downloading an exported file.
   *
   * @param key - Object key inside the private bucket.
   * @param filename - Optional download name; when given, the response is served
   *   with an `attachment` Content-Disposition carrying that name.
   * @returns The presigned URL.
   */
  async genDownloadUrl(
    key: string,
    filename?: string,
  ): Promise<{ url: string }> {
    const command = new GetObjectCommand({
      Bucket: this.s3Service.privateBucket,
      Key: key,
      ResponseContentDisposition: filename
        ? this.buildContentDisposition(filename)
        : undefined,
    });
    const url = await getSignedUrl(this.s3Service.client, command, {
      expiresIn: StorageService.URL_TTL_SECONDS,
    });
    return { url };
  }

  /**
   * Build an `attachment` Content-Disposition value that is safe for any filename.
   *
   * The plain `filename` parameter only carries printable ASCII with quotes and
   * backslashes replaced, so the quoted string can never be broken or inject
   * header content. The exact name (including non-ASCII characters) travels in
   * the RFC 5987 `filename*` parameter.
   */
  private buildContentDisposition(filename: string): string {
    const asciiFallback = filename.replace(/[^\x20-\x7e]|["\\]/g, '_');
    // encodeURIComponent leaves ' ( ) * untouched, but they are not valid
    // attr-chars in an RFC 5987 value, so percent-encode them explicitly.
    const encoded = encodeURIComponent(filename).replace(
      /['()*]/g,
      (ch) => `%${ch.charCodeAt(0).toString(16).toUpperCase()}`,
    );
    return `attachment; filename="${asciiFallback}"; filename*=UTF-8''${encoded}`;
  }
}
ストレージコントローラー
// storage/storage.controller.ts
import { Controller, Post, Get, Body, Query } from '@nestjs/common';
import { StorageService } from './storage.service';
@Controller('api/storage')
export class StorageController {
  constructor(private readonly storageService: StorageService) {}

  /**
   * POST api/storage/upload-url
   *
   * Returns whatever `StorageService.genUploadUrl` produces for the requested
   * filename and (optional) content type.
   */
  @Post('upload-url')
  async getUploadUrl(
    @Body() dto: { filename: string; contentType?: string },
  ) {
    const { filename, contentType } = dto;
    return this.storageService.genUploadUrl(filename, contentType);
  }

  /**
   * GET api/storage/download-url?key=...&filename=...
   *
   * Returns whatever `StorageService.genDownloadUrl` produces for the given
   * object key and optional download filename.
   */
  @Get('download-url')
  async getDownloadUrl(
    @Query('key') key: string,
    @Query('filename') filename?: string,
  ) {
    const presigned = this.storageService.genDownloadUrl(key, filename);
    return presigned;
  }
}
CSVインポートパターン
CSVインポートコントローラー
// csv-import/csv-import.controller.ts
import { Controller, Post, Body } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { StepFunctionService, INVOKE_CONTEXT, IInvoke } from '@mbc-cqrs-serverless/core';
/**
 * Request body for starting a CSV import.
 */
export class CsvImportDto {
// Bucket of the file to import — presumably the S3 bucket returned by the upload-URL endpoint; confirm against caller.
bucket: string;
// Object key of the file to import.
key: string;
type: string; // Import type identifier
}
@Controller('api/csv-import')
export class CsvImportController {
  /** ARN of the import state machine, read from `SFN_CSV_IMPORT_ARN` at construction time. */
  private readonly importArn: string;

  constructor(
    private readonly configService: ConfigService,
    private readonly sfnService: StepFunctionService,
  ) {
    this.importArn = this.configService.get<string>('SFN_CSV_IMPORT_ARN');
  }

  /**
   * Start a CSV import by launching a Step Functions execution.
   *
   * The execution input is the request DTO plus the invoke context.
   *
   * @param invokeContext - Invocation context injected by the framework decorator.
   * @param dto - Location and type of the file to import.
   * @returns The result of `StepFunctionService.startExecution`.
   */
  @Post('/')
  async startImport(
    @INVOKE_CONTEXT() invokeContext: IInvoke,
    @Body() dto: CsvImportDto,
  ) {
    const executionName = this.buildExecutionName(dto.type);
    return this.sfnService.startExecution(
      this.importArn,
      {
        ...dto,
        invokeContext,
      },
      executionName,
    );
  }

  /**
   * Build a `<type>-<epoch-millis>` execution name that Step Functions accepts.
   *
   * `type` comes from the request body, so anything outside letters, digits,
   * hyphen and underscore is replaced with `_`, and the type part is truncated
   * so that the whole name stays within the 80-character limit.
   */
  private buildExecutionName(type: string): string {
    const suffix = `-${Date.now()}`;
    const safeType = String(type)
      .replace(/[^A-Za-z0-9_-]/g, '_')
      .slice(0, 80 - suffix.length);
    return `${safeType}${suffix}`;
  }
}
CSVパーサーサービス
// csv-import/csv-parser.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { S3Service } from '@mbc-cqrs-serverless/core';
import { GetObjectCommand } from '@aws-sdk/client-s3';
import * as csvParser from 'csv-parser';
import { Readable } from 'stream';
/**
 * One parsed CSV record together with its position and validation state.
 */
export interface ParsedRow {
// 1-based position of the record among the parsed data rows.
rowNumber: number;
// Column values of the record as strings.
data: Record<string, string>;
// Validation messages for this record; empty when no problem was found.
errors: string[];
}
@Injectable()
export class CsvParserService {
  private readonly logger = new Logger(CsvParserService.name);

  constructor(private readonly s3Service: S3Service) {}

  /**
   * Download a CSV object from S3 and parse it into rows.
   *
   * NOTE: `options.encoding` is accepted for interface compatibility but is not
   * applied by this implementation; the object's bytes are handed to the parser
   * as received.
   *
   * @param bucket - Bucket containing the object.
   * @param key - Object key of the CSV file.
   * @param options - Optional parsing options; `delimiter` defaults to `,`.
   * @returns Every parsed row, numbered from 1, each with an empty `errors` list.
   * @throws Rejects when the object has no body, or when reading or parsing fails.
   */
  async parseFromS3(
    bucket: string,
    key: string,
    options?: { encoding?: string; delimiter?: string },
  ): Promise<ParsedRow[]> {
    const command = new GetObjectCommand({ Bucket: bucket, Key: key });
    const response = await this.s3Service.client.send(command);
    const stream = response.Body as Readable;
    if (!stream) {
      throw new Error(`S3 object has no body: ${key}`);
    }

    const results: ParsedRow[] = [];
    let rowNumber = 0;

    return new Promise((resolve, reject) => {
      const parser = csvParser({
        separator: options?.delimiter || ',',
        skipLines: 0,
      });

      // pipe() does not forward errors from the source stream. Without this
      // handler a failed S3 read would never settle the promise. Destroying
      // the parser with the error routes it through the single 'error'
      // handler below.
      stream.on('error', (error) => {
        parser.destroy(error);
      });

      stream
        .pipe(parser)
        .on('data', (data) => {
          rowNumber++;
          results.push({
            rowNumber,
            data,
            errors: [],
          });
        })
        .on('end', () => {
          this.logger.log(`Parsed ${results.length} rows from ${key}`);
          resolve(results);
        })
        .on('error', (error) => {
          this.logger.error(`Failed to parse CSV: ${error.message}`);
          reject(error);
        });
    });
  }

  /**
   * Check every row for the presence of the required fields.
   *
   * A field counts as missing when it is absent, empty, or whitespace only.
   * The input rows are not mutated; each returned row carries a fresh `errors`
   * list that replaces the one on the input row.
   *
   * @param rows - Rows to validate.
   * @param requiredFields - Column names that must have a non-blank value.
   * @returns Copies of the rows with their validation messages.
   */
  validateRows(
    rows: ParsedRow[],
    requiredFields: string[],
  ): ParsedRow[] {
    return rows.map((row) => {
      const errors: string[] = [];
      for (const field of requiredFields) {
        if (!row.data[field] || row.data[field].trim() === '') {
          errors.push(`Missing required field: ${field}`);
        }
      }
      return { ...row, errors };
    });
  }
}
インポートイベントハンドラー
// csv-import/event/csv-import.event.handler.ts
import { Injectable, Logger } from '@nestjs/common';
import { EventHandler, IEventHandler, SNSService } from '@mbc-cqrs-serverless/core';
import { ConfigService } from '@nestjs/config';
import { CsvParserService } from '../csv-parser.service';
import { ProductService } from '../../product/product.service';
/**
 * Event payload consumed by the CSV import handler.
 */
export class CsvImportEvent {
// Bucket of the CSV object to import.
bucket: string;
// Object key of the CSV object to import.
key: string;
// Import type identifier.
type: string;
// Invocation context that is forwarded unchanged when commands are published.
invokeContext: any;
}
@EventHandler(CsvImportEvent)
@Injectable()
export class CsvImportEventHandler implements IEventHandler<CsvImportEvent> {
  private readonly logger = new Logger(CsvImportEventHandler.name);
  /** SNS topic that receives a message when an import fails (`SNS_ALARM_TOPIC_ARN`). */
  private readonly alarmTopicArn: string;

  constructor(
    private readonly csvParser: CsvParserService,
    private readonly productService: ProductService,
    private readonly snsService: SNSService,
    private readonly configService: ConfigService,
  ) {
    this.alarmTopicArn = this.configService.get<string>('SNS_ALARM_TOPIC_ARN');
  }

  /**
   * Parse, validate and import the CSV file referenced by the event.
   *
   * Rows that miss `code`, `name` or `price`, or whose price is not numeric,
   * are counted as error rows and skipped. The remaining rows are published
   * in batches of 30, each batch running concurrently.
   *
   * @param event - Location of the CSV file plus the invoke context.
   * @returns A summary with total, processed and error row counts.
   * @throws Re-throws any failure after attempting to publish an alarm.
   */
  async execute(event: CsvImportEvent): Promise<any> {
    this.logger.log(`Processing import: ${event.key}`);
    try {
      const rows = await this.csvParser.parseFromS3(event.bucket, event.key);

      // Required-field validation, followed by a numeric check on the price so
      // that NaN never reaches the published command.
      const validatedRows = this.csvParser
        .validateRows(rows, ['code', 'name', 'price'])
        .map((row) => {
          const rawPrice = row.data.price;
          const hasPrice = Boolean(rawPrice) && rawPrice.trim() !== '';
          if (hasPrice && Number.isNaN(CsvImportEventHandler.parsePrice(rawPrice))) {
            return { ...row, errors: [...row.errors, 'Invalid price format'] };
          }
          return row;
        });

      const validRows = validatedRows.filter((r) => r.errors.length === 0);
      const invalidRows = validatedRows.filter((r) => r.errors.length > 0);
      if (invalidRows.length > 0) {
        this.logger.warn(`${invalidRows.length} rows have validation errors`);
      }

      const batchSize = 30;
      let processedCount = 0;
      for (let i = 0; i < validRows.length; i += batchSize) {
        const batch = validRows.slice(i, i + batchSize);
        await Promise.all(
          batch.map((row) => this.processRow(row.data, event.invokeContext)),
        );
        processedCount += batch.length;
        this.logger.log(`Processed ${processedCount}/${validRows.length} rows`);
      }

      return {
        success: true,
        totalRows: rows.length,
        processedRows: processedCount,
        errorRows: invalidRows.length,
      };
    } catch (error) {
      const cause = error instanceof Error ? error : new Error(String(error));
      try {
        await this.sendAlarm(event, cause);
      } catch (alarmError) {
        // A failing alarm must not hide the import failure that triggered it.
        const detail =
          alarmError instanceof Error ? alarmError.message : String(alarmError);
        this.logger.error(`Failed to send import alarm: ${detail}`);
      }
      throw error;
    }
  }

  /** Parse a price column value, ignoring thousands separators; NaN when not numeric. */
  private static parsePrice(raw: string): number {
    return parseFloat(raw.replace(/,/g, ''));
  }

  /** Publish one product command built from a validated CSV row. */
  private async processRow(data: Record<string, string>, invokeContext: any) {
    await this.productService.publishCommand({
      code: data.code,
      name: data.name,
      attributes: {
        price: CsvImportEventHandler.parsePrice(data.price),
        category: data.category,
        description: data.description,
      },
    }, invokeContext);
  }

  /** Publish a failure notification (object key, error message, timestamp) to the alarm topic. */
  private async sendAlarm(event: CsvImportEvent, error: Error) {
    await this.snsService.publish({
      topicArn: this.alarmTopicArn,
      subject: 'CSV Import Error',
      message: JSON.stringify({
        key: event.key,
        error: error.message,
        timestamp: new Date().toISOString(),
      }),
    });
  }
}
Excelインポートパターン
Excelヘルパー関数
// helpers/excel.ts
import { Workbook, Worksheet, Cell } from 'exceljs';
/**
 * Convert a raw cell value into display text.
 *
 * Handles the object-shaped values a cell can hold: rich text (runs are
 * concatenated), formulas (the cached result is used; none means no value),
 * hyperlinks (the link text) and error values (the error code text).
 * Everything else goes through `String()`.
 */
function cellValueToText(value: unknown): string | undefined {
  if (value === null || value === undefined) {
    return undefined;
  }
  if (typeof value !== 'object' || value instanceof Date) {
    return String(value);
  }
  const shaped = value as Record<string, unknown>;
  if ('result' in shaped || 'formula' in shaped || 'sharedFormula' in shaped) {
    // A formula cell without a cached result has nothing to show; recursing
    // also covers results that are themselves objects (errors, rich text).
    return cellValueToText(shaped.result);
  }
  if ('richText' in shaped) {
    return (shaped.richText as any[]).map((run: any) => run.text).join('');
  }
  if ('text' in shaped) {
    // Hyperlink cell: { text, hyperlink }.
    return cellValueToText(shaped.text);
  }
  if (typeof shaped.error === 'string') {
    return shaped.error;
  }
  return String(value);
}

/**
 * Read a cell as text, resolving formulas, rich text and hyperlinks.
 *
 * @param row - Worksheet row exposing `getCell(column)`.
 * @param column - Column identifier passed to `row.getCell`.
 * @returns The cell text, or `undefined` when the cell is missing, empty, or a
 *   formula without a cached result.
 */
export function getCellValue(row: any, column: string): string | undefined {
  const cell = row.getCell(column);
  if (!cell) {
    return undefined;
  }
  return cellValueToText(cell.value);
}
/**
 * Read a cell as a number.
 *
 * Commas are removed before parsing, so `1,234.5` is read as 1234.5.
 *
 * @param row - Worksheet row exposing `getCell(column)`.
 * @param column - Column identifier of the cell.
 * @returns The parsed number, or `undefined` when the cell has no text or the
 *   text does not start with a number.
 */
export function getCellNumber(row: any, column: string): number | undefined {
  const text = getCellValue(row, column);
  if (!text) {
    return undefined;
  }
  const withoutSeparators = text.replace(/,/g, '');
  const parsed = parseFloat(withoutSeparators);
  if (isNaN(parsed)) {
    return undefined;
  }
  return parsed;
}
/**
 * Read a cell as a date.
 *
 * A cell that already holds a `Date` is returned as-is; otherwise the cell
 * text is handed to the `Date` constructor.
 *
 * @param row - Worksheet row exposing `getCell(column)`.
 * @param column - Column identifier of the cell.
 * @returns The date, or `undefined` when the cell has no text or the text is
 *   not a parseable date.
 */
export function getCellDate(row: any, column: string): Date | undefined {
  const rawValue = row.getCell(column).value;
  if (rawValue instanceof Date) {
    return rawValue;
  }
  const text = getCellValue(row, column);
  if (!text) {
    return undefined;
  }
  const parsed = new Date(text);
  if (isNaN(parsed.getTime())) {
    return undefined;
  }
  return parsed;
}
/**
 * Locate the header row of a worksheet.
 *
 * Rows 1..maxRows are scanned top-down. A row qualifies when at least 80% of
 * the expected headers each appear as a substring of some cell in that row.
 *
 * @param worksheet - Worksheet to scan.
 * @param expectedHeaders - Header labels to look for.
 * @param maxRows - Number of leading rows to inspect (default 20).
 * @returns The 1-based number of the first qualifying row.
 * @throws Error('Header row not found') when no inspected row qualifies.
 */
export function findHeaderRow(
  worksheet: Worksheet,
  expectedHeaders: string[],
  maxRows = 20,
): number {
  let candidate = 1;
  while (candidate <= maxRows) {
    const cells = worksheet.getRow(candidate).values as any[];
    let found = 0;
    for (const header of expectedHeaders) {
      const present = cells.some(cell => cell && String(cell).includes(header));
      if (present) {
        found += 1;
      }
    }
    if (found >= expectedHeaders.length * 0.8) {
      return candidate;
    }
    candidate += 1;
  }
  throw new Error('Header row not found');
}
Excelインポートサービス
// excel-import/excel-import.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { S3Service } from '@mbc-cqrs-serverless/core';
import { GetObjectCommand } from '@aws-sdk/client-s3';
import { Workbook } from 'exceljs';
import { getCellValue, getCellNumber, findHeaderRow } from '../helpers/excel';
/**
 * Summary of processing one worksheet.
 */
export interface ExcelImportResult {
// True when no row produced an error.
success: boolean;
// Name of the processed worksheet.
sheetName: string;
// Number of rows counted at or after the first data row.
totalRows: number;
// Number of rows for which the processor returned a non-null result.
processedRows: number;
// One entry per row whose processor threw, with the row number and message.
errors: Array<{ row: number; message: string }>;
}
@Injectable()
export class ExcelImportService {
  private readonly logger = new Logger(ExcelImportService.name);

  constructor(private readonly s3Service: S3Service) {}

  /**
   * Download a workbook from S3 and load it.
   *
   * The format is decided from the key's extension (case-insensitive) before
   * anything is downloaded, so unsupported files fail fast instead of yielding
   * an empty workbook.
   *
   * @param bucket - Bucket containing the object.
   * @param key - Object key; must end in `.xlsx` or `.xlsm`.
   * @returns The loaded workbook.
   * @throws Error when the key ends in `.xls` or has any other extension.
   */
  async loadWorkbook(bucket: string, key: string): Promise<Workbook> {
    const lowerKey = key.toLowerCase();
    if (lowerKey.endsWith('.xls')) {
      throw new Error('XLS format not supported. Please use XLSX.');
    }
    if (!lowerKey.endsWith('.xlsx') && !lowerKey.endsWith('.xlsm')) {
      throw new Error(`Unsupported file format: ${key}`);
    }

    const command = new GetObjectCommand({ Bucket: bucket, Key: key });
    const response = await this.s3Service.client.send(command);

    // Collect the whole body in memory; the loader below needs one buffer.
    const chunks: Buffer[] = [];
    for await (const chunk of response.Body as any) {
      chunks.push(chunk);
    }
    const buffer = Buffer.concat(chunks);

    const workbook = new Workbook();
    await workbook.xlsx.load(buffer);
    return workbook;
  }

  /**
   * Run a processor over every non-empty data row of a worksheet.
   *
   * The header row is `config.headerRow` when given, otherwise it is searched
   * via `config.expectedHeaders`, otherwise row 1. Data starts at
   * `config.startRow` or the row after the header. A row whose processor
   * throws is recorded in `errors` and processing continues; a processor
   * returning `null` skips the row without error.
   *
   * @param worksheet - Worksheet to process.
   * @param config - Header/start-row settings and the per-row processor.
   * @returns Counts, collected errors and the processor results.
   */
  async processWorksheet<T>(
    worksheet: any,
    config: {
      headerRow?: number;
      expectedHeaders?: string[];
      startRow?: number;
      processor: (row: any, rowNumber: number) => Promise<T | null>;
    },
  ): Promise<ExcelImportResult & { data: T[] }> {
    const errors: Array<{ row: number; message: string }> = [];
    const data: T[] = [];

    const headerRow = config.headerRow || (
      config.expectedHeaders
        ? findHeaderRow(worksheet, config.expectedHeaders)
        : 1
    );
    const startRow = config.startRow || headerRow + 1;

    let processedRows = 0;
    let totalRows = 0;
    // Count the rows the worksheet iterator visits from the first data row on.
    worksheet.eachRow((row: any, rowNumber: number) => {
      if (rowNumber < startRow) return;
      totalRows++;
    });

    for (let rowNum = startRow; rowNum <= worksheet.rowCount; rowNum++) {
      const row = worksheet.getRow(rowNum);
      if (this.isEmptyRow(row)) continue;
      try {
        const result = await config.processor(row, rowNum);
        if (result !== null) {
          data.push(result);
          processedRows++;
        }
      } catch (error) {
        errors.push({
          row: rowNum,
          // Processors may throw non-Error values; still record something useful.
          message: error instanceof Error ? error.message : String(error),
        });
      }
    }

    return {
      success: errors.length === 0,
      sheetName: worksheet.name,
      totalRows,
      processedRows,
      errors,
      data,
    };
  }

  /** A row is empty when it has no values or every value is null, undefined or ''. */
  private isEmptyRow(row: any): boolean {
    const values = row.values as any[];
    return !values || values.every(v => v === null || v === undefined || v === '');
  }
}
インポートストラテジーパターン
// import/base-import.strategy.ts
import { BadRequestException } from '@nestjs/common';
import { validate, ValidationError } from 'class-validator';
import { IInvoke } from '@mbc-cqrs-serverless/core';
/**
 * Contract for import strategies: turn one raw input record into a command
 * DTO and validate the result.
 * @typeParam TInput - The raw input type; must be an object.
 * @typeParam TAttributesDto - The output DTO type; must be an object.
 */
export interface IImportStrategy<TInput extends object, TAttributesDto extends object> {
/**
 * Transform raw input into the command DTO.
 */
transform(input: TInput): Promise<TAttributesDto>;
/**
 * Validate the transformed DTO; implementations signal failure by rejecting.
 */
validate(data: TAttributesDto): Promise<void>;
}
/**
 * Default import strategy that concrete strategies can extend.
 *
 * `transform` passes the input through unchanged and `validate` runs
 * class-validator on the DTO, raising a 400 with flattened messages.
 * @typeParam TInput - The raw input type; must be an object.
 * @typeParam TAttributesDto - The output DTO type; must be an object.
 */
export abstract class BaseImportStrategy<TInput extends object, TAttributesDto extends object>
  implements IImportStrategy<TInput, TAttributesDto>
{
  /**
   * Identity transform: the input object itself is returned as the DTO.
   */
  async transform(input: TInput): Promise<TAttributesDto> {
    const passthrough = input as unknown as TAttributesDto;
    return passthrough;
  }

  /**
   * Run class-validator against the DTO.
   *
   * @throws BadRequestException carrying the flattened messages when any
   *   constraint is violated.
   */
  async validate(data: TAttributesDto): Promise<void> {
    const violations = await validate(data as object);
    if (violations.length === 0) {
      return;
    }
    throw new BadRequestException({
      statusCode: 400,
      message: this.flattenValidationErrors(violations),
      error: 'Bad Request',
    });
  }

  /**
   * Turn a (possibly nested) validation error tree into a flat message list.
   *
   * For a leaf, only the first constraint message is kept, and the first
   * occurrence of the property name in it is replaced by the dotted path from
   * the root (for example `address.city`).
   */
  private flattenValidationErrors(
    errors: ValidationError[],
    parentPath = '',
  ): string[] {
    return errors.reduce<string[]>((collected, issue) => {
      const path = parentPath ? `${parentPath}.${issue.property}` : issue.property;
      if (issue.children && issue.children.length > 0) {
        collected.push(...this.flattenValidationErrors(issue.children, path));
        return collected;
      }
      if (issue.constraints) {
        const [first] = Object.values(issue.constraints);
        collected.push(first.replace(issue.property, path));
      }
      return collected;
    }, []);
  }
}
具象インポートストラテジー
// product/import/product-import.strategy.ts
import { Injectable } from '@nestjs/common';
import { KEY_SEPARATOR, generateId } from '@mbc-cqrs-serverless/core';
import { BaseImportStrategy } from '@mbc-cqrs-serverless/import';
import { ulid } from 'ulid';
import { ProductCommandDto } from '../dto/product-command.dto';
// Prefix of the partition key built for imported products.
const PRODUCT_PK_PREFIX = 'PRODUCT';
/**
 * Raw product record as it arrives from an import source; every column is
 * still a string at this point.
 */
export interface ProductImportInput {
code: string;
name: string;
category?: string;
// Price as text; thousands separators (commas) are stripped when it is parsed.
price?: string;
description?: string;
tenantCode: string; // Passed from import context
}
@Injectable()
export class ProductImportStrategy
  extends BaseImportStrategy<ProductImportInput, ProductCommandDto>
{
  /**
   * Build a product command DTO from one raw import record.
   *
   * The partition key is `PRODUCT<separator><tenantCode>` and the sort key is
   * a freshly generated ULID. Text fields are trimmed and the price is parsed
   * after removing commas.
   */
  async transform(input: ProductImportInput): Promise<ProductCommandDto> {
    const { tenantCode } = input;
    const pk = `${PRODUCT_PK_PREFIX}${KEY_SEPARATOR}${tenantCode}`;
    const sk = ulid();
    const id = generateId(pk, sk);
    return new ProductCommandDto({
      pk,
      sk,
      id,
      tenantCode,
      code: input.code?.trim(),
      name: input.name?.trim(),
      type: 'PRODUCT',
      attributes: {
        category: input.category?.trim(),
        price: input.price ? parseFloat(input.price.replace(/,/g, '')) : undefined,
        description: input.description?.trim(),
      },
    });
  }

  /**
   * Validate a product record.
   *
   * The strategy interface passes the transformed DTO to `validate`, where the
   * price lives under `attributes.price` as a number. The raw input shape
   * (top-level `price` string) is still accepted so existing direct callers
   * keep working.
   *
   * @throws Error when code or name is missing, or the price is not numeric.
   */
  async validate(data: ProductCommandDto | ProductImportInput): Promise<void> {
    if (!data.code) {
      throw new Error('Product code is required');
    }
    if (!data.name) {
      throw new Error('Product name is required');
    }

    const record = data as { attributes?: { price?: unknown }; price?: unknown };
    const rawPrice = record.attributes?.price ?? record.price;
    if (rawPrice === undefined || rawPrice === null || rawPrice === '') {
      return;
    }
    const numeric =
      typeof rawPrice === 'number'
        ? rawPrice
        : parseFloat(String(rawPrice).replace(/,/g, ''));
    if (Number.isNaN(numeric)) {
      throw new Error('Invalid price format');
    }
  }
}
エクスポートパターン
注意
以下に示すエクスポートパターンは、アプリケーション用の実装例です。インポートモジュール(@mbc-cqrs-serverless/import)とは異なり、フレームワークには専用のエクスポートパッケージはありません。これらのパターンをアプリケーションコードに直接実装できます。
エクスポートサービス
// export/export.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { S3Service } from '@mbc-cqrs-serverless/core';
import { PutObjectCommand } from '@aws-sdk/client-s3';
import { Workbook } from 'exceljs';
@Injectable()
export class ExportService {
  private readonly logger = new Logger(ExportService.name);

  constructor(private readonly s3Service: S3Service) {}

  /**
   * Serialize rows as CSV and upload the file to the private bucket.
   *
   * Header labels and values are both escaped, so a comma, quote or line
   * break in either cannot shift columns.
   *
   * @param data - Rows to export.
   * @param headers - Column order; `key` selects the row property, `label` is the header text.
   * @param filename - Last segment of the object key.
   * @returns Bucket and key of the uploaded object (`exports/<epoch-millis>/<filename>`).
   */
  async exportToCsv(
    data: Record<string, any>[],
    headers: { key: string; label: string }[],
    filename: string,
  ): Promise<{ bucket: string; key: string }> {
    const headerRow = headers.map(h => this.escapeCsvValue(h.label)).join(',');
    const dataRows = data.map(row =>
      headers.map(h => this.escapeCsvValue(row[h.key])).join(','),
    );
    const csvContent = [headerRow, ...dataRows].join('\n');

    return this.upload(filename, csvContent, 'text/csv; charset=utf-8', data.length);
  }

  /**
   * Build a single-sheet workbook from the rows and upload it to the private bucket.
   *
   * @param data - Rows to export.
   * @param headers - Column order, header text and optional width (default 15).
   * @param filename - Last segment of the object key.
   * @param sheetName - Worksheet name; defaults to `Data`.
   * @returns Bucket and key of the uploaded object (`exports/<epoch-millis>/<filename>`).
   */
  async exportToExcel(
    data: Record<string, any>[],
    headers: { key: string; label: string; width?: number }[],
    filename: string,
    sheetName = 'Data',
  ): Promise<{ bucket: string; key: string }> {
    const workbook = new Workbook();
    const worksheet = workbook.addWorksheet(sheetName);

    worksheet.columns = headers.map(h => ({
      header: h.label,
      key: h.key,
      width: h.width || 15,
    }));

    // Bold header on a light grey background.
    worksheet.getRow(1).font = { bold: true };
    worksheet.getRow(1).fill = {
      type: 'pattern',
      pattern: 'solid',
      fgColor: { argb: 'FFE0E0E0' },
    };

    // Only the declared columns are written, whatever else the row carries.
    data.forEach(row => {
      const rowData: Record<string, any> = {};
      headers.forEach(h => {
        rowData[h.key] = row[h.key];
      });
      worksheet.addRow(rowData);
    });

    const buffer = await workbook.xlsx.writeBuffer();

    return this.upload(
      filename,
      buffer as Buffer,
      'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
      data.length,
    );
  }

  /** Store the export under `exports/<epoch-millis>/<filename>` and log the row count. */
  private async upload(
    filename: string,
    body: string | Buffer,
    contentType: string,
    rowCount: number,
  ): Promise<{ bucket: string; key: string }> {
    const bucket = this.s3Service.privateBucket;
    const key = `exports/${Date.now()}/${filename}`;
    await this.s3Service.client.send(new PutObjectCommand({
      Bucket: bucket,
      Key: key,
      Body: body,
      ContentType: contentType,
    }));
    this.logger.log(`Exported ${rowCount} rows to ${key}`);
    return { bucket, key };
  }

  /**
   * Render one value as a CSV field.
   *
   * null/undefined become an empty field. A value containing a comma, double
   * quote, CR or LF is wrapped in double quotes with inner quotes doubled.
   */
  private escapeCsvValue(value: any): string {
    if (value === null || value === undefined) return '';
    const str = String(value);
    if (/[",\r\n]/.test(str)) {
      return `"${str.replace(/"/g, '""')}"`;
    }
    return str;
  }
}
Step Functions統合
インポートオーケストレーション
// Import workflow with Step Functions (Step Functionsによるインポートワークフロー)
// serverless.yml
/*
stepFunctions:
stateMachines:
csvImport:
name: ${self:custom.prefix}-csv-import
definition:
StartAt: ParseFile
States:
ParseFile:
Type: Task
Resource: !GetAtt ParseFileLambda.Arn
Next: ProcessBatches
Catch:
- ErrorEquals: ["States.ALL"]
Next: HandleError
ProcessBatches:
Type: Map
ItemsPath: $.batches
MaxConcurrency: 5
Iterator:
StartAt: ProcessBatch
States:
ProcessBatch:
Type: Task
Resource: !GetAtt ProcessBatchLambda.Arn
End: true
Next: Finalize
Finalize:
Type: Task
Resource: !GetAtt FinalizeLambda.Arn
End: true
HandleError:
Type: Task
Resource: !GetAtt HandleErrorLambda.Arn
End: true
*/
ベストプラクティス
1. バッチ処理
大きなファイルは常にバッチで処理します:
// Number of items handled concurrently in one batch.
const BATCH_SIZE = 30;
/**
 * Apply `processor` to every item, one batch at a time.
 *
 * Items inside a batch run concurrently; the next batch starts only after
 * every item of the current one has settled successfully.
 */
async processBatches<T>(
  items: T[],
  processor: (item: T) => Promise<void>,
): Promise<void> {
  let offset = 0;
  while (offset < items.length) {
    const chunk = items.slice(offset, offset + BATCH_SIZE);
    const pending = chunk.map(processor);
    await Promise.all(pending);
    offset += BATCH_SIZE;
  }
}
2. エラーハンドリング
処理を停止せずにエラーを収集し報告します:
// Collect per-row failures instead of aborting on the first one.
const errors: Array<{ row: number; error: string }> = [];
for (const [index, row] of rows.entries()) {
  try {
    await processRow(row);
  } catch (error) {
    // The thrown value is not guaranteed to be an Error; narrow before reading
    // `.message` so the report never contains `undefined`.
    const message = error instanceof Error ? error.message : String(error);
    // Rows are reported 1-based.
    errors.push({ row: index + 1, error: message });
    // Continue processing the remaining rows.
  }
}
if (errors.length > 0) {
  await this.reportErrors(errors);
}
3. 処理前のバリデーション
インポートを開始する前にすべてのデータをバリデーションします:
// Pass 1: validate every row before anything is written.
const validationErrors = await this.validateAll(rows);
const hasValidationErrors = validationErrors.length > 0;
if (hasValidationErrors) {
  // Nothing has been processed yet, so the import can be rejected as a whole.
  return { success: false, errors: validationErrors };
}
// Pass 2: all rows are valid, process them.
await this.processAll(rows);
4. 進捗報告
長時間実行されるインポートの進捗を報告します:
// Report roughly every PROGRESS_INTERVAL rows, and once more at completion.
const PROGRESS_INTERVAL = 100;
const total = rows.length;
let processed = 0;
let nextReportAt = PROGRESS_INTERVAL;
for (const batch of batches) {
  await processBatch(batch);
  processed += batch.length;
  // A threshold is used instead of `processed % 100 === 0`: with a batch size
  // that does not divide the interval (e.g. 30) the modulo test is almost
  // never true and progress would go unreported.
  if (processed >= nextReportAt || processed === total) {
    this.logger.log(`Progress: ${processed}/${total} (${Math.round(processed/total*100)}%)`);
    // Next report at the first multiple of the interval above the current count.
    nextReportAt = (Math.floor(processed / PROGRESS_INTERVAL) + 1) * PROGRESS_INTERVAL;
  }
}