インポート/エクスポートパターン
このガイドでは、CSVの処理、Excelファイルの処理、Step Functionsを使用したバッチデータ操作など、データのインポートおよびエクスポート操作のパターンについて説明します。
このガイドを使用するタイミング
以下の場合にこのガイドを使用してください:
- CSVまたはExcelファイルから一括データをインポートする
- 様々な形式にデータをエクスポートする
- Step Functionsで大規模なデータセットを処理する
- S3署名付きURLでファイルアップロードを実装する
- 外部形式と内部形式の間でデータを変換する
インポートアーキテクチャの概要
┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────>│     S3      │────>│Step Function│────>│   Lambda    │
│  (Upload)   │     │  (Storage)  │     │(Orchestrate)│     │  (Process)  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
                                                                   │
                                               ┌───────────────────┘
                                               ▼
                    ┌─────────────┐     ┌─────────────┐
                    │  DynamoDB   │<────│   Import    │
                    │  (Command)  │     │   Handler   │
                    └─────────────┘     └─────────────┘
ファイルアップロードパターン
ストレージサービス
安全なファイルアップロードのための署名付きURLを生成します:
// storage/storage.service.ts
import { Injectable } from '@nestjs/common';
import { S3Service } from '@mbc-cqrs-serverless/core';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3';
/**
 * Issues presigned S3 URLs for uploading import files and downloading export files
 * from the private bucket exposed by `S3Service`.
 */
@Injectable()
export class StorageService {
  constructor(private readonly s3Service: S3Service) {}

  /**
   * Generate a presigned PUT URL for uploading an import file.
   *
   * The object key is `imports/<epoch-ms>/<filename>` in the private bucket.
   * `contentType` is part of the signed request, so the uploader should send the
   * same `Content-Type` header.
   *
   * @param filename - File name appended to the generated key.
   * @param contentType - MIME type for the upload (defaults to `text/csv`).
   * @returns Bucket, object key and a presigned URL valid for 1 hour.
   */
  async genUploadUrl(
    filename: string,
    contentType = 'text/csv',
  ): Promise<{ bucket: string; key: string; url: string }> {
    const bucket = this.s3Service.privateBucket;
    const timestamp = Date.now();
    const key = `imports/${timestamp}/${filename}`;
    const command = new PutObjectCommand({
      Bucket: bucket,
      Key: key,
      ContentType: contentType,
      ACL: 'private',
    });
    const url = await getSignedUrl(this.s3Service.client, command, {
      expiresIn: 3600, // 1 hour
    });
    return { bucket, key, url };
  }

  /**
   * Generate a presigned GET URL for downloading a file.
   *
   * @param key - Object key in the private bucket.
   * @param filename - Optional download name. When given, S3 is asked to respond
   *   with an `attachment` Content-Disposition carrying this name.
   * @returns A presigned URL valid for 1 hour.
   */
  async genDownloadUrl(
    key: string,
    filename?: string,
  ): Promise<{ url: string }> {
    const command = new GetObjectCommand({
      Bucket: this.s3Service.privateBucket,
      Key: key,
      ResponseContentDisposition: filename
        ? StorageService.buildContentDisposition(filename)
        : undefined,
    });
    const url = await getSignedUrl(this.s3Service.client, command, {
      expiresIn: 3600, // 1 hour
    });
    return { url };
  }

  /**
   * Build an `attachment` Content-Disposition value that stays well-formed for any name.
   *
   * Names made only of printable ASCII without `"` or `\` keep the simple
   * `filename="..."` form. Any other name (e.g. Japanese file names) gets an
   * ASCII-only fallback plus an RFC 5987 `filename*=UTF-8''...` parameter, which
   * modern browsers prefer.
   */
  private static buildContentDisposition(filename: string): string {
    const isPlainAscii = /^[\x20-\x7e]*$/.test(filename) && !/["\\]/.test(filename);
    if (isPlainAscii) {
      return `attachment; filename="${filename}"`;
    }
    const fallback = filename.replace(/[^\x20-\x7e]|["\\]/g, '_');
    // encodeURIComponent leaves ' ( ) * unescaped, but RFC 5987 attr-char does not allow them.
    const encoded = encodeURIComponent(filename).replace(
      /['()*]/g,
      (c) => `%${c.charCodeAt(0).toString(16).toUpperCase()}`,
    );
    return `attachment; filename="${fallback}"; filename*=UTF-8''${encoded}`;
  }
}
ストレージコントローラー
// storage/storage.controller.ts
import { Controller, Post, Get, Body, Query } from '@nestjs/common';
import { StorageService } from './storage.service';
/**
 * HTTP endpoints that hand out presigned S3 URLs via StorageService.
 */
@Controller('api/storage')
export class StorageController {
  constructor(private readonly storageService: StorageService) {}

  /**
   * POST api/storage/upload-url
   * Returns a presigned upload URL for the file name (and optional content type) in the body.
   */
  @Post('upload-url')
  async getUploadUrl(
    @Body() dto: { filename: string; contentType?: string },
  ) {
    const { filename, contentType } = dto;
    return this.storageService.genUploadUrl(filename, contentType);
  }

  /**
   * GET api/storage/download-url?key=...&filename=...
   * Returns a presigned download URL for `key`, optionally with a download file name.
   */
  @Get('download-url')
  async getDownloadUrl(
    @Query('key') key: string,
    @Query('filename') filename?: string,
  ) {
    // NOTE(review): `key` is passed through unchecked; confirm that callers are
    // authorized for whatever object key they supply.
    const result = await this.storageService.genDownloadUrl(key, filename);
    return result;
  }
}
CSVインポートパターン
CSVインポートコントローラー
// csv-import/csv-import.controller.ts
import { Controller, Post, Body } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { StepFunctionService, INVOKE_CONTEXT, IInvoke } from '@mbc-cqrs-serverless/core';
/**
 * Request body for starting a CSV import; it is spread into the Step Functions input.
 */
export class CsvImportDto {
  /** S3 bucket holding the uploaded CSV file. */
  bucket!: string;
  /** S3 object key of the uploaded CSV file. */
  key!: string;
  /** Import type identifier; also used to prefix the execution name. */
  type!: string;
}
/**
 * Starts CSV imports by launching the Step Functions state machine configured in
 * `SFN_CSV_IMPORT_ARN`.
 */
@Controller('api/csv-import')
export class CsvImportController {
  private readonly importArn: string;

  constructor(
    private readonly configService: ConfigService,
    private readonly sfnService: StepFunctionService,
  ) {
    this.importArn = this.configService.get<string>('SFN_CSV_IMPORT_ARN');
  }

  /**
   * Start a CSV import via Step Functions.
   *
   * The execution input is the request body plus the caller's invoke context.
   *
   * @param invokeContext - Invocation context of the current request.
   * @param dto - Location of the uploaded CSV and its import type.
   * @returns Whatever `StepFunctionService.startExecution` resolves to.
   */
  @Post('/')
  async startImport(
    @INVOKE_CONTEXT() invokeContext: IInvoke,
    @Body() dto: CsvImportDto,
  ) {
    const executionName = CsvImportController.buildExecutionName(dto.type);
    return this.sfnService.startExecution(
      this.importArn,
      {
        ...dto,
        invokeContext,
      },
      executionName,
    );
  }

  /**
   * Build a valid Step Functions execution name of the form `<type>-<epoch-ms>`.
   *
   * Execution names are limited to 80 characters. Restricting them to
   * A-Z, a-z, 0-9, `-` and `_` keeps them valid and CloudWatch-loggable. `type`
   * comes from the request body, so it is sanitized and truncated here; the
   * timestamp suffix is always kept intact.
   */
  private static buildExecutionName(type: string): string {
    const suffix = `-${Date.now()}`;
    const prefix = String(type)
      .replace(/[^A-Za-z0-9_-]/g, '_')
      .slice(0, 80 - suffix.length);
    return `${prefix}${suffix}`;
  }
}
CSVパーサーサービス
// csv-import/csv-parser.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { S3Service } from '@mbc-cqrs-serverless/core';
import { GetObjectCommand } from '@aws-sdk/client-s3';
import * as csvParser from 'csv-parser';
import { Readable } from 'stream';
/**
 * A single data row read from a CSV file, together with its validation messages.
 */
export interface ParsedRow {
  /** 1-based index among data rows (counted per parsed record). */
  rowNumber: number;
  /** Column values keyed by header name. */
  data: Record<string, string>;
  /** Validation messages; empty when the row is valid. */
  errors: Array<string>;
}
/**
 * Reads CSV objects from S3 and performs simple required-field validation.
 */
@Injectable()
export class CsvParserService {
  private readonly logger = new Logger(CsvParserService.name);

  constructor(private readonly s3Service: S3Service) {}

  /**
   * Parse a CSV file stored in S3 into rows keyed by header name.
   *
   * @param bucket - S3 bucket name.
   * @param key - S3 object key.
   * @param options.encoding - Source text encoding as a WHATWG label
   *   (e.g. `shift_jis`). UTF-8 (the default) is streamed. Any other encoding is
   *   read fully into memory and decoded with `TextDecoder` before parsing.
   * @param options.delimiter - Column separator (defaults to `,`).
   * @returns One entry per data record, numbered from 1, with an empty `errors` list.
   * @throws If the object has no body, the encoding label is unsupported,
   *   or the download or parse stream emits an error.
   */
  async parseFromS3(
    bucket: string,
    key: string,
    options?: { encoding?: string; delimiter?: string },
  ): Promise<ParsedRow[]> {
    const command = new GetObjectCommand({ Bucket: bucket, Key: key });
    const response = await this.s3Service.client.send(command);
    if (!response.Body) {
      throw new Error(`S3 object has no body: ${bucket}/${key}`);
    }
    const source = await this.toUtf8Stream(response.Body as Readable, options?.encoding);
    const results: ParsedRow[] = [];
    let rowNumber = 0;

    return new Promise<ParsedRow[]>((resolve, reject) => {
      const fail = (error: Error) => {
        this.logger.error(`Failed to parse CSV: ${error.message}`);
        reject(error);
      };
      // pipe() does not forward errors from the source stream. Without this
      // listener, an S3 read failure would leave the promise pending forever.
      source.on('error', fail);
      source
        .pipe(csvParser({
          separator: options?.delimiter || ',',
          skipLines: 0,
          // Excel's "CSV UTF-8" export prepends a BOM. Without stripping it, the
          // first column would be keyed "\uFEFFcode" instead of "code".
          mapHeaders: ({ header }) => header.replace(/^\uFEFF/, ''),
        }))
        .on('data', (data) => {
          rowNumber++;
          results.push({
            rowNumber,
            data,
            errors: [],
          });
        })
        .on('end', () => {
          this.logger.log(`Parsed ${results.length} rows from ${key}`);
          resolve(results);
        })
        .on('error', fail);
    });
  }

  /**
   * Validate parsed rows against a list of required fields.
   *
   * A field counts as missing when it is absent, empty, or whitespace only.
   *
   * @param rows - Rows returned by `parseFromS3`.
   * @param requiredFields - Header names that must have a value.
   * @returns New row objects whose `errors` list one message per missing field.
   */
  validateRows(
    rows: ParsedRow[],
    requiredFields: string[],
  ): ParsedRow[] {
    return rows.map(row => {
      const errors: string[] = [];
      for (const field of requiredFields) {
        if (!row.data[field] || row.data[field].trim() === '') {
          errors.push(`Missing required field: ${field}`);
        }
      }
      return { ...row, errors };
    });
  }

  /**
   * Return a stream of UTF-8 text for csv-parser.
   *
   * UTF-8 input (or no encoding given) is returned untouched. Other encodings
   * are buffered and decoded in one go, because the stream cannot be decoded
   * incrementally here.
   */
  private async toUtf8Stream(stream: Readable, encoding?: string): Promise<Readable> {
    const label = encoding?.trim().toLowerCase();
    if (!label || label === 'utf-8' || label === 'utf8') {
      return stream;
    }
    // Constructed before reading so an unsupported label fails fast (RangeError).
    const decoder = new TextDecoder(label);
    const chunks: Buffer[] = [];
    for await (const chunk of stream) {
      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
    }
    return Readable.from([decoder.decode(Buffer.concat(chunks))]);
  }
}
インポートイベントハンドラー
// csv-import/event/csv-import.event.handler.ts
import { Injectable, Logger } from '@nestjs/common';
import { EventHandler, IEventHandler, SNSService } from '@mbc-cqrs-serverless/core';
import { ConfigService } from '@nestjs/config';
import { CsvParserService } from '../csv-parser.service';
import { ProductService } from '../../product/product.service';
/**
 * Payload handled by CsvImportEventHandler.
 */
export class CsvImportEvent {
  /** S3 bucket holding the CSV to import. */
  bucket!: string;
  /** S3 object key of the CSV to import. */
  key!: string;
  /** Import type identifier. */
  type!: string;
  /** Invocation context forwarded to command publishing; its shape is not typed here. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  invokeContext!: any;
}
/**
 * Imports products from a CSV file in S3.
 *
 * Rows are validated, then valid rows are published through ProductService in
 * batches. On failure, an SNS alarm is sent and the error is rethrown.
 */
@EventHandler(CsvImportEvent)
@Injectable()
export class CsvImportEventHandler implements IEventHandler<CsvImportEvent> {
  private readonly logger = new Logger(CsvImportEventHandler.name);
  private readonly alarmTopicArn: string;

  constructor(
    private readonly csvParser: CsvParserService,
    private readonly productService: ProductService,
    private readonly snsService: SNSService,
    private readonly configService: ConfigService,
  ) {
    this.alarmTopicArn = this.configService.get<string>('SNS_ALARM_TOPIC_ARN');
  }

  /**
   * Process a CSV import event.
   *
   * Processing steps:
   * - Rows missing `code`, `name` or `price` are counted as error rows.
   * - Rows whose `price` is not numeric (after removing `,`) are also error rows.
   * - Valid rows are published 30 at a time.
   * - If any row publish fails, the import stops, an alarm is sent
   *   (best effort), and the original error is rethrown.
   *
   * @returns `{ success, totalRows, processedRows, errorRows }`.
   */
  async execute(event: CsvImportEvent): Promise<any> {
    this.logger.log(`Processing import: ${event.key}`);
    try {
      const rows = await this.csvParser.parseFromS3(event.bucket, event.key);

      const validatedRows = this.csvParser
        .validateRows(rows, ['code', 'name', 'price'])
        .map((row) => this.checkPrice(row));

      const validRows = validatedRows.filter(r => r.errors.length === 0);
      const invalidRows = validatedRows.filter(r => r.errors.length > 0);
      if (invalidRows.length > 0) {
        this.logger.warn(`${invalidRows.length} rows have validation errors`);
      }

      const batchSize = 30;
      let processedCount = 0;
      for (let i = 0; i < validRows.length; i += batchSize) {
        const batch = validRows.slice(i, i + batchSize);
        await Promise.all(
          batch.map(row => this.processRow(row.data, event.invokeContext)),
        );
        processedCount += batch.length;
        this.logger.log(`Processed ${processedCount}/${validRows.length} rows`);
      }

      return {
        success: true,
        totalRows: rows.length,
        processedRows: processedCount,
        errorRows: invalidRows.length,
      };
    } catch (error) {
      // The alarm is best effort: an SNS failure must not replace the original error.
      try {
        await this.sendAlarm(event, error);
      } catch (alarmError) {
        this.logger.error(
          `Failed to send import alarm for ${event.key}: ${CsvImportEventHandler.messageOf(alarmError)}`,
        );
      }
      throw error;
    }
  }

  /**
   * Add a validation error when a row that passed the required-field check has a
   * non-numeric price. Without this check it would be published as NaN.
   */
  private checkPrice<T extends { data: Record<string, string>; errors: string[] }>(row: T): T {
    if (row.errors.length > 0 || CsvImportEventHandler.parsePrice(row.data.price) !== undefined) {
      return row;
    }
    return { ...row, errors: [...row.errors, `Invalid price: ${row.data.price}`] };
  }

  /**
   * Parse a price cell. Thousands separators (`,`) are removed first, since
   * parseFloat would otherwise stop at the comma ("1,200" -> 1).
   * Returns undefined for non-finite results.
   */
  private static parsePrice(raw: string | undefined): number | undefined {
    if (raw === undefined) return undefined;
    const value = parseFloat(raw.replace(/,/g, ''));
    return Number.isFinite(value) ? value : undefined;
  }

  /** Publish one validated row as a product command. */
  private async processRow(data: Record<string, string>, invokeContext: any) {
    await this.productService.publishCommand({
      code: data.code,
      name: data.name,
      attributes: {
        price: CsvImportEventHandler.parsePrice(data.price),
        category: data.category,
        description: data.description,
      },
    }, invokeContext);
  }

  /** Publish an import-failure alarm to the configured SNS topic. */
  private async sendAlarm(event: CsvImportEvent, error: unknown) {
    await this.snsService.publish({
      topicArn: this.alarmTopicArn,
      subject: 'CSV Import Error',
      message: JSON.stringify({
        key: event.key,
        error: CsvImportEventHandler.messageOf(error),
        timestamp: new Date().toISOString(),
      }),
    });
  }

  /** Extract a readable message from a thrown value, which may not be an Error. */
  private static messageOf(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
Excelインポートパターン
Excelヘルパー関数
// helpers/excel.ts
import { Workbook, Worksheet, Cell } from 'exceljs';
/**
 * Read a cell as text, unwrapping ExcelJS composite values.
 *
 * - formula / shared-formula cells: their cached `result` (undefined when no result is cached)
 * - rich text: the concatenated text runs
 * - hyperlink: its display `text`
 * - error values (e.g. `#N/A`): undefined
 * - anything else: `String(value)`
 *
 * @param row - ExcelJS row, or anything exposing `getCell(column)`.
 * @param column - Column letter or key accepted by `row.getCell`.
 * @returns The cell's text, or undefined when the cell is empty or has no usable value.
 */
export function getCellValue(row: any, column: string): string | undefined {
  const cell = row.getCell(column);
  return cell ? cellValueToText(cell.value) : undefined;
}

/** Convert a raw ExcelJS cell value to text; recurses into formula results and hyperlink text. */
function cellValueToText(value: unknown): string | undefined {
  if (value === null || value === undefined) {
    return undefined;
  }
  if (typeof value !== 'object' || value instanceof Date) {
    return String(value);
  }
  const composite = value as Record<string, unknown>;
  // Formula: use the computed result (which may itself be rich text, a date, or an error).
  if ('formula' in composite || 'sharedFormula' in composite || 'result' in composite) {
    return cellValueToText(composite.result);
  }
  if (Array.isArray(composite.richText)) {
    return composite.richText.map((run: { text?: string }) => run.text ?? '').join('');
  }
  // Hyperlink: `text` is the displayed label and may itself be rich text.
  if ('hyperlink' in composite && 'text' in composite) {
    return cellValueToText(composite.text);
  }
  if ('error' in composite) {
    return undefined;
  }
  return String(value);
}
/**
 * Read a cell as a number.
 *
 * Thousands separators (`,`) are removed before parsing. Parsing uses
 * `parseFloat`, so trailing non-numeric characters after a leading number are ignored.
 *
 * @param row - ExcelJS row, or anything exposing `getCell(column)`.
 * @param column - Column letter or key accepted by `row.getCell`.
 * @returns The parsed number, or undefined for empty or non-numeric cells.
 */
export function getCellNumber(row: any, column: string): number | undefined {
  const text = getCellValue(row, column);
  if (text === undefined || text === '') {
    return undefined;
  }
  const parsed = parseFloat(text.split(',').join(''));
  return Number.isNaN(parsed) ? undefined : parsed;
}
/**
 * Read a cell as a Date.
 *
 * Date values, including a formula's cached Date `result`, are returned directly.
 * Any other value is converted with `getCellValue` and parsed with `new Date(text)`.
 *
 * @param row - ExcelJS row, or anything exposing `getCell(column)`.
 * @param column - Column letter or key accepted by `row.getCell`.
 * @returns A valid Date, or undefined when the cell is empty or unparseable.
 */
export function getCellDate(row: any, column: string): Date | undefined {
  const raw = row.getCell(column)?.value;
  // Use a formula's Date result as-is; converting it to a string and back drops
  // the milliseconds, because Date#toString omits them.
  const candidate =
    raw !== null && typeof raw === 'object' && !(raw instanceof Date) && 'result' in raw
      ? raw.result
      : raw;
  if (candidate instanceof Date) {
    return isNaN(candidate.getTime()) ? undefined : candidate;
  }
  const text = getCellValue(row, column);
  if (!text) return undefined;
  const date = new Date(text);
  return isNaN(date.getTime()) ? undefined : date;
}
/**
 * Find the header row by matching expected column headers.
 *
 * Rows 1..maxRows are scanned in order. A row matches when at least
 * `expectedHeaders.length * minMatchRatio` of the expected headers appear as a
 * substring of some cell's display text in that row.
 *
 * @param worksheet - Worksheet to scan.
 * @param expectedHeaders - Header labels to look for.
 * @param maxRows - Last row number to inspect (default 20).
 * @param minMatchRatio - Fraction of headers that must match (default 0.8).
 * @returns The 1-based number of the first matching row.
 * @throws Error('Header row not found') when no row within `maxRows` matches.
 */
export function findHeaderRow(
  worksheet: Worksheet,
  expectedHeaders: string[],
  maxRows = 20,
  minMatchRatio = 0.8,
): number {
  const requiredMatches = expectedHeaders.length * minMatchRatio;
  for (let rowNum = 1; rowNum <= maxRows; rowNum++) {
    const cellTexts: string[] = [];
    // Use each cell's display text so rich-text, formula and hyperlink headers are
    // compared as text rather than as "[object Object]".
    worksheet.getRow(rowNum).eachCell((cell) => {
      cellTexts.push(cell.text);
    });
    const matches = expectedHeaders.filter((header) =>
      cellTexts.some((text) => text.includes(header)),
    );
    if (matches.length >= requiredMatches) {
      return rowNum;
    }
  }
  throw new Error('Header row not found');
}
Excelインポートサービス
// excel-import/excel-import.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { S3Service } from '@mbc-cqrs-serverless/core';
import { GetObjectCommand } from '@aws-sdk/client-s3';
import { Workbook } from 'exceljs';
import { getCellValue, getCellNumber, findHeaderRow } from '../helpers/excel';
/**
 * Summary produced after processing one worksheet.
 */
export interface ExcelImportResult {
  /** True when no row processor threw. */
  success: boolean;
  /** Name of the processed worksheet. */
  sheetName: string;
  /** Number of rows counted from the first data row onward. */
  totalRows: number;
  /** Rows for which the processor returned a non-null value. */
  processedRows: number;
  /** Per-row failures: worksheet row number and error message. */
  errors: { row: number; message: string }[];
}
/**
 * Loads Excel workbooks from S3 and runs a caller-supplied processor over worksheet rows.
 */
@Injectable()
export class ExcelImportService {
  private readonly logger = new Logger(ExcelImportService.name);

  constructor(private readonly s3Service: S3Service) {}

  /**
   * Load an .xlsx / .xlsm workbook from S3.
   *
   * The extension is checked case-insensitively before downloading, so
   * unsupported files fail fast. Previously they produced an empty workbook.
   *
   * @param bucket - S3 bucket name.
   * @param key - S3 object key; must end in .xlsx or .xlsm.
   * @returns The loaded ExcelJS workbook.
   * @throws For .xls or any other extension, or when the object has no body.
   */
  async loadWorkbook(bucket: string, key: string): Promise<Workbook> {
    const lowerKey = key.toLowerCase();
    if (lowerKey.endsWith('.xls')) {
      // ExcelJS only reads the XLSX family.
      throw new Error('XLS format not supported. Please use XLSX.');
    }
    if (!lowerKey.endsWith('.xlsx') && !lowerKey.endsWith('.xlsm')) {
      throw new Error(`Unsupported workbook format: ${key}. Please use XLSX.`);
    }

    const command = new GetObjectCommand({ Bucket: bucket, Key: key });
    const response = await this.s3Service.client.send(command);
    if (!response.Body) {
      throw new Error(`S3 object has no body: ${bucket}/${key}`);
    }
    const chunks: Buffer[] = [];
    for await (const chunk of response.Body as any) {
      chunks.push(chunk);
    }

    const workbook = new Workbook();
    await workbook.xlsx.load(Buffer.concat(chunks));
    return workbook;
  }

  /**
   * Run `config.processor` on every non-empty row from the first data row onward.
   *
   * Header row resolution: `config.headerRow` if given; otherwise it is found via
   * `expectedHeaders`; otherwise row 1. The first data row is `config.startRow`,
   * or the row after the header. A processor that throws is recorded in `errors`;
   * processing then continues with the next row. A `null` result is skipped.
   *
   * @returns The summary plus every non-null processor result.
   */
  async processWorksheet<T>(
    worksheet: any,
    config: {
      headerRow?: number;
      expectedHeaders?: string[];
      startRow?: number;
      processor: (row: any, rowNumber: number) => Promise<T | null>;
    },
  ): Promise<ExcelImportResult & { data: T[] }> {
    const errors: Array<{ row: number; message: string }> = [];
    const data: T[] = [];

    const headerRow = config.headerRow || (
      config.expectedHeaders
        ? findHeaderRow(worksheet, config.expectedHeaders)
        : 1
    );
    const startRow = config.startRow || headerRow + 1;

    let processedRows = 0;
    let totalRows = 0;
    worksheet.eachRow((_row: any, rowNumber: number) => {
      if (rowNumber < startRow) return;
      totalRows++;
    });

    for (let rowNum = startRow; rowNum <= worksheet.rowCount; rowNum++) {
      const row = worksheet.getRow(rowNum);
      if (this.isEmptyRow(row)) continue;
      try {
        const result = await config.processor(row, rowNum);
        if (result !== null) {
          data.push(result);
          processedRows++;
        }
      } catch (error) {
        errors.push({
          row: rowNum,
          // Processors may throw non-Error values; keep a readable message either way.
          message: error instanceof Error ? error.message : String(error),
        });
      }
    }

    return {
      success: errors.length === 0,
      sheetName: worksheet.name,
      totalRows,
      processedRows,
      errors,
      data,
    };
  }

  /** True when a row has no values, or only null / undefined / empty-string values. */
  private isEmptyRow(row: any): boolean {
    const values = row.values as any[];
    return !values || values.every(v => v === null || v === undefined || v === '');
  }
}