Import
ImportModuleは、MBC CQRS Serverlessフレームワークで一括データインポート機能を提供します。単一レコードのインポート、CSVファイルのインポート、複数のCSVを含むZIPファイルのインポートをサポートしています。
アーキテクチャ
インストール
npm install @mbc-cqrs-serverless/import
モジュール登録
import { ImportModule } from "@mbc-cqrs-serverless/import";
@Module({
imports: [
ImportModule.register({
enableController: true, // 組み込みRESTコントローラーを有効化
profiles: [
{
tableName: "products",
importStrategy: ProductImportStrategy,
processStrategy: ProductProcessStrategy,
},
],
imports: [ProductModule], // ストラテジーの依存関係をエクスポートするモジュール
zipFinalizationHooks: [BackupToS3Hook], // オプション: インポート後のフック
}),
],
})
export class AppModule {}
モジュールオプション
| オプション | 型 | 必須 | 説明 |
|---|---|---|---|
profiles | ImportEntityProfile[] | はい | 各エンティティタイプのインポート設定の配列 |
enableController | boolean | いいえ | 組み込みのImportControllerエンドポイントを有効化 |
imports | ModuleMetadata['imports'] | いいえ | ストラテジークラスが必要とするプロバイダをエクスポートするモジュール |
zipFinalizationHooks | Type<IZipFinalizationHook>[] | いい え | ZIPインポート完了後に実行されるフック |
コアコンセプト
インポートエンティティプロファイル
インポートしたい各エンティティタイプには、プロファイル設定が必要です:
interface ImportEntityProfile {
tableName: string; // このデータタイプの一意識別子
importStrategy: Type<IImportStrategy<any, any>>; // 変換と検証
processStrategy: Type<IProcessStrategy<any, any>>; // 比較とマッピング
}
処理モード
| モード | 説明 | ユースケース |
|---|---|---|
DIRECT | リクエスト内でCSVを即座に処理 | 小さなファイル(100行未満) |
STEP_FUNCTION | AWS Step Functions経由で非同期処理 | 大きなファイル、本番インポート |
インポートステータス
| ステータス | 説明 |
|---|---|
CREATED | インポートジョブが作成され、処理待ち |
QUEUED | 処理のためにキューに入れられたジョブ |
PROCESSING | 現在処理中 |
COMPLETED | 正常に完了 |
FAILED | 処理失敗 |
APIリファレンス
ImportServiceのメソッド
createWithApi(dto: CreateImportDto, options): Promise<ImportEntity>
APIを使用して単一のインポートレコードを作成します。データは設定されたImportStrategyを使用して変換および検証されます。
const importEntity = await this.importService.createWithApi(
{
tableName: "products",
tenantCode: "tenant001",
attributes: {
code: "PROD001",
name: "Product One",
price: 100,
},
},
{ invokeContext }
);
handleCsvImport(dto: CreateCsvImportDto, options): Promise<ImportEntity[] | ImportEntity>
CSVインポートのメインルーター。processingModeに基づいて、直接処理またはStep Functionに委譲します。
// DIRECTモード - 作成されたインポートの配列を返す
const imports = await this.importService.handleCsvImport(
{
processingMode: "DIRECT",
bucket: "my-bucket",
key: "imports/products.csv",
tableName: "products",
tenantCode: "tenant001",
},
{ invokeContext }
);
// STEP_FUNCTIONモード - マスタージョブエンティティを返す
const masterJob = await this.importService.handleCsvImport(
{
processingMode: "STEP_FUNCTION",
bucket: "my-bucket",
key: "imports/products.csv",
tableName: "products",
tenantCode: "tenant001",
},
{ invokeContext }
);
createZipJob(dto: CreateZipImportDto, options): Promise<ImportEntity>
ZIPファイルインポート用のマスタージョブを作成します。ZIPファイルには複数のCSVファイルが含まれている必要があります。
const zipJob = await this.importService.createZipJob(
{
bucket: "my-bucket",
key: "imports/bulk-data.zip",
tenantCode: "tenant001",
sortedFileKeys: ["products.csv", "categories.csv"], // オプション: 処理順序を指定
tableName: "products", // オプション: tableName検出をオーバーライド
},
{ invokeContext }
);
updateStatus(key: DetailKey, status: string, payload?, attributes?, notifyId?): Promise<void>
インポートジョブのステータスを更新し、SNS経由で通知を送信します。
await this.importService.updateStatus(
{ pk: "IMPORT#tenant001", sk: "products#01ABC" },
"COMPLETED",
{ result: { recordsProcessed: 100 } }
);
getImportByKey(key: DetailKey): Promise<ImportEntity>
キーによってインポートエンティティを取得します。
const importJob = await this.importService.getImportByKey({
pk: "IMPORT#tenant001",
sk: "products#01ABC",
});
REST APIエンドポイント
enableController: trueの場合、以下のエンドポイントが利用可能です:
POST /imports
単一のインポートレコードを作成します。
{
"tableName": "products",
"tenantCode": "tenant001",
"attributes": {
"code": "PROD001",
"name": "Product One"
}
}
レスポンス: 202 Accepted
POST /imports/csv
CSVファイルのインポートを開始します。
{
"processingMode": "STEP_FUNCTION",
"bucket": "my-bucket",
"key": "imports/products.csv",
"tableName": "products",
"tenantCode": "tenant001"
}
レスポンス: 200 OK (DIRECT) / 202 Accepted (STEP_FUNCTION)
POST /imports/zip
ZIPファイルのインポートを開始します。
{
"bucket": "my-bucket",
"key": "imports/bulk-data.zip",
"tenantCode": "tenant001"
}
レスポンス: 202 Accepted