メインコンテンツまでスキップ

Import

ImportModuleは、MBC CQRS Serverlessフレームワークで一括データインポート機能を提供します。単一レコードのインポート、CSVファイルのインポート、複数のCSVを含むZIPファイルのインポートをサポートしています。

アーキテクチャ

インストール

npm install @mbc-cqrs-serverless/import

モジュール登録

import { ImportModule } from "@mbc-cqrs-serverless/import";

@Module({
imports: [
ImportModule.register({
enableController: true, // 組み込みRESTコントローラーを有効化
profiles: [
{
tableName: "products",
importStrategy: ProductImportStrategy,
processStrategy: ProductProcessStrategy,
},
],
imports: [ProductModule], // ストラテジーの依存関係をエクスポートするモジュール
zipFinalizationHooks: [BackupToS3Hook], // オプション: インポート後のフック
}),
],
})
export class AppModule {}

モジュールオプション

オプション必須説明
profilesImportEntityProfile[]はい各エンティティタイプのインポート設定の配列
enableControllerbooleanいいえ組み込みのImportControllerエンドポイントを有効化
importsModuleMetadata['imports']いいえストラテジークラスが必要とするプロバイダをエクスポートするモジュール
zipFinalizationHooksType<IZipFinalizationHook>[]いいえZIPインポート完了後に実行されるフック

コアコンセプト

インポートエンティティプロファイル

インポートしたい各エンティティタイプには、プロファイル設定が必要です:

interface ImportEntityProfile {
tableName: string; // このデータタイプの一意識別子
importStrategy: Type<IImportStrategy<any, any>>; // 変換と検証
processStrategy: Type<IProcessStrategy<any, any>>; // 比較とマッピング
}

処理モード

モード説明ユースケース
DIRECTリクエスト内でCSVを即座に処理小さなファイル(100行未満)
STEP_FUNCTIONAWS Step Functions経由で非同期処理大きなファイル、本番インポート

インポートステータス

ステータス説明
CREATEDインポートジョブが作成され、処理待ち
QUEUED処理のためにキューに入れられたジョブ
PROCESSING現在処理中
COMPLETED正常に完了
FAILED処理失敗

APIリファレンス

ImportServiceのメソッド

createWithApi(dto: CreateImportDto, options): Promise<ImportEntity>

APIを使用して単一のインポートレコードを作成します。データは設定されたImportStrategyを使用して変換および検証されます。

const importEntity = await this.importService.createWithApi(
{
tableName: "products",
tenantCode: "tenant001",
attributes: {
code: "PROD001",
name: "Product One",
price: 100,
},
},
{ invokeContext }
);

handleCsvImport(dto: CreateCsvImportDto, options): Promise<ImportEntity[] | ImportEntity>

CSVインポートのメインルーター。processingModeに基づいて、直接処理またはStep Functionに委譲します。

// DIRECTモード - 作成されたインポートの配列を返す
const imports = await this.importService.handleCsvImport(
{
processingMode: "DIRECT",
bucket: "my-bucket",
key: "imports/products.csv",
tableName: "products",
tenantCode: "tenant001",
},
{ invokeContext }
);

// STEP_FUNCTIONモード - マスタージョブエンティティを返す
const masterJob = await this.importService.handleCsvImport(
{
processingMode: "STEP_FUNCTION",
bucket: "my-bucket",
key: "imports/products.csv",
tableName: "products",
tenantCode: "tenant001",
},
{ invokeContext }
);

createZipJob(dto: CreateZipImportDto, options): Promise<ImportEntity>

ZIPファイルインポート用のマスタージョブを作成します。ZIPファイルには複数のCSVファイルが含まれている必要があります。

const zipJob = await this.importService.createZipJob(
{
bucket: "my-bucket",
key: "imports/bulk-data.zip",
tenantCode: "tenant001",
sortedFileKeys: ["products.csv", "categories.csv"], // オプション: 処理順序を指定
tableName: "products", // オプション: tableName検出をオーバーライド
},
{ invokeContext }
);

updateStatus(key: DetailKey, status: string, payload?, attributes?, notifyId?): Promise<void>

インポートジョブのステータスを更新し、SNS経由で通知を送信します。

await this.importService.updateStatus(
{ pk: "IMPORT#tenant001", sk: "products#01ABC" },
"COMPLETED",
{ result: { recordsProcessed: 100 } }
);

getImportByKey(key: DetailKey): Promise<ImportEntity>

キーによってインポートエンティティを取得します。

const importJob = await this.importService.getImportByKey({
pk: "IMPORT#tenant001",
sk: "products#01ABC",
});

REST APIエンドポイント

enableController: trueの場合、以下のエンドポイントが利用可能です:

POST /imports

単一のインポートレコードを作成します。

{
"tableName": "products",
"tenantCode": "tenant001",
"attributes": {
"code": "PROD001",
"name": "Product One"
}
}

レスポンス: 202 Accepted

POST /imports/csv

CSVファイルのインポートを開始します。

{
"processingMode": "STEP_FUNCTION",
"bucket": "my-bucket",
"key": "imports/products.csv",
"tableName": "products",
"tenantCode": "tenant001"
}

レスポンス: 200 OK (DIRECT) / 202 Accepted (STEP_FUNCTION)

POST /imports/zip

ZIPファイルのインポートを開始します。

{
"bucket": "my-bucket",
"key": "imports/bulk-data.zip",
"tenantCode": "tenant001"
}

レスポンス: 202 Accepted

インポートストラテジーの実装

IImportStrategyインターフェース

ImportStrategyはデータの変換と検証を処理します:

interface IImportStrategy<TInput, TAttributesDto> {
transform(input: TInput): Promise<TAttributesDto>;
validate(data: TAttributesDto): Promise<void>;
}

BaseImportStrategyの使用

共通機能のために基底クラスを拡張します:

import { BaseImportStrategy } from "@mbc-cqrs-serverless/import";
import { Injectable } from "@nestjs/common";

@Injectable()
export class ProductImportStrategy extends BaseImportStrategy<
CsvProductRow,
ProductAttributesDto
> {
async transform(input: CsvProductRow): Promise<ProductAttributesDto> {
return {
code: input.product_code?.trim(),
name: input.product_name?.trim(),
price: parseFloat(input.price),
category: input.category?.trim(),
};
}

// validate()は継承されます - class-validatorを使用
}

IProcessStrategyインターフェース

ProcessStrategyは既存データとの比較とコマンドへのマッピングを処理します:

interface IProcessStrategy<TEntity, TAttributesDto> {
compare(
importAttributes: TAttributesDto,
tenantCode: string
): Promise<ComparisonResult<TEntity>>;

map(
status: ComparisonStatus,
importAttributes: TAttributesDto,
tenantCode: string,
existingData?: TEntity
): Promise<CommandInputModel | CommandPartialInputModel>;

getCommandService(): CommandService;
}

比較ステータス

ステータス説明
NOT_EXISTエンティティが存在しない - 新規作成
CHANGEDエンティティは存在するが変更あり - 更新
EQUALエンティティが存在し同一 - スキップ

ProcessStrategyの例

import {
BaseProcessStrategy,
ComparisonResult,
ComparisonStatus,
} from "@mbc-cqrs-serverless/import";
import { CommandService, DataService } from "@mbc-cqrs-serverless/core";
import { Injectable } from "@nestjs/common";

@Injectable()
export class ProductProcessStrategy extends BaseProcessStrategy<
ProductEntity,
ProductAttributesDto
> {
constructor(
private readonly dataService: DataService,
private readonly commandService: CommandService
) {
super();
}

async compare(
importAttributes: ProductAttributesDto,
tenantCode: string
): Promise<ComparisonResult<ProductEntity>> {
const pk = `PRODUCT#${tenantCode}`;
const sk = `PRODUCT#${importAttributes.code}`;

const existing = await this.dataService.getItem({ pk, sk });

if (!existing) {
return { status: ComparisonStatus.NOT_EXIST };
}

// 関連フィールドを比較
if (
existing.name !== importAttributes.name ||
existing.attributes?.price !== importAttributes.price
) {
return { status: ComparisonStatus.CHANGED, existingData: existing };
}

return { status: ComparisonStatus.EQUAL };
}

async map(
status: ComparisonStatus,
importAttributes: ProductAttributesDto,
tenantCode: string,
existingData?: ProductEntity
) {
const pk = `PRODUCT#${tenantCode}`;
const sk = `PRODUCT#${importAttributes.code}`;

if (status === ComparisonStatus.NOT_EXIST) {
return {
pk,
sk,
code: importAttributes.code,
name: importAttributes.name,
tenantCode,
type: "PRODUCT",
attributes: { price: importAttributes.price },
};
}

// 既存を更新
return {
pk,
sk,
name: importAttributes.name,
version: existingData.version,
attributes: {
...existingData.attributes,
price: importAttributes.price,
},
};
}

getCommandService(): CommandService {
return this.commandService;
}
}

ZIPファイナライゼーションフック

フックはZIPインポート完了後に実行されます。後処理タスクに使用します。

import { IZipFinalizationHook, ZipFinalizationContext } from "@mbc-cqrs-serverless/import";
import { S3Service } from "@mbc-cqrs-serverless/core";
import { Injectable } from "@nestjs/common";

@Injectable()
export class BackupToS3Hook implements IZipFinalizationHook {
constructor(private readonly s3Service: S3Service) {}

async execute(context: ZipFinalizationContext): Promise<void> {
const { executionInput, status, results } = context;
const { bucket, key } = executionInput.parameters;

if (status === "COMPLETED") {
// ファイルをバックアップ場所に移動
const backupKey = `backup/${new Date().toISOString()}/${key}`;
await this.s3Service.copyObject({
sourceBucket: bucket,
sourceKey: key,
destinationBucket: bucket,
destinationKey: backupKey,
});
}
}
}

ZipFinalizationContext

プロパティ説明
eventZipImportSfnEvent元のStep Functionイベント
masterJobKeyDetailKeyマスターZIPジョブのキー
resultsobject集計結果(totalRows、processedRows、failedRows)
statusImportStatusEnumジョブの最終ステータス
executionInputany元のStep Functions実行入力

DTO

CreateImportDto

プロパティ必須説明
tableNamestringはいターゲットエンティティタイプ
tenantCodestringはいテナントコード
attributesobjectはいインポートデータの属性
sourceIdstringいいえソース識別子
namestringいいえインポートの表示名

CreateCsvImportDto

プロパティ必須説明
processingMode'DIRECT' | 'STEP_FUNCTION'はいCSVの処理方法
bucketstringはいS3バケット名
keystringはいS3オブジェクトキー
tableNamestringはいターゲットエンティティタイプ
tenantCodestringはいテナントコード

CreateZipImportDto

プロパティ必須説明
bucketstringはいS3バケット名
keystringはいS3オブジェクトキー
tenantCodestringはいテナントコード
sortedFileKeysstring[]いいえこの順序でファイルを処理
tableNamestringいいえ自動検出されたtableNameをオーバーライド

ベストプラクティス

CSVファイル形式

  • UTF-8エンコーディングを使用
  • 列名を含むヘッダー行を含める
  • 列名は自動的にトリムされます
  • 値は自動的にトリムされます

大規模ファイルのインポート

  • 100行以上のファイルにはSTEP_FUNCTIONモードを使用
  • SNS通知で進行状況を監視
  • 順序どおりにインポートする必要がある関連データにはZIPインポートを使用

エラー処理

  • 無効な行はCSV処理でログに記録されスキップされます
  • failedRowsカウンターを使用して失敗を追跡
  • APIまたは通知でインポートステータスを確認

関連項目