バージョン競合ガイド
このガイドでは、MBC CQRS サーバーレスフレームワークでバージョン競合がどのように発生するかを説明し、予防と回復のための戦略を提供します。
バージョン競合の原因
バージョン競合は、2つ以上の操作が同時に同じアイテムを更新しようとしたときに発生しま す。サーバーレスアプリケーションのような分散システムでは、これは適切に処理する必要がある一般的なシナリオです。
競合シナリオ
ユーザーAがアイテムを読み取り(バージョン1)
ユーザーBがアイテムを読み取り(バージョン1)
ユーザーAがアイテムを更新(バージョン1 -> 2)- 成功
ユーザーBがアイテムを更新(バージョン1 -> 2)- 競合!
このシナリオでは、ユーザーBの更新は、ユーザーAによって既にアイテムが更新されているため失敗します。フレームワークはDynamoDBの条件付き書き込みを使用してこの状況を検出します。
楽観的ロックの仕組み
フレームワークは、各アイテムのversionフィールドを通じて楽観的ロックを実装します。このアプローチは、競合はまれであると想定し、リソースを事前にロックするのではなく、発生したときに処理します。
バージョン定数
import { VERSION_FIRST, VERSION_LATEST } from '@mbc-cqrs-serverless/core';
// VERSION_FIRST = 0: 新規アイテム作成時に使用
// VERSION_LATEST = -1: 最新バージョンに自動解決
内部動作の仕組み
コマンドを発行する際、フレームワークは:
- 入力バージョンを現在のアイテムのバージョンと照合します
- バージョン番号を1増加させます
- DynamoDBの条件式
attribute_not_exists(pk) AND attribute_not_exists(sk)を使用して一意性を確保します - 別の更新が先に発生した場合、DynamoDBは
ConditionalCheckFailedExceptionをスローします - フレームワークはこれをHTTP 409 Conflictレスポンスに変換します
// 内部実装(簡略化)
await this.dynamoDbService.putItem(
this.tableName,
command,
'attribute_not_exists(pk) AND attribute_not_exists(sk)', // 条件付き書き込み
);
予防戦略
1. 更新時に常にバージョンを含める
アイテムを更新する際は、常に現在のバージョン番号を含めてください:
import { CommandPartialInputModel } from '@mbc-cqrs-serverless/core';
// まず、現在のアイテムを取得してバージョンを確認
const currentItem = await this.dataService.getItem({ pk, sk });
const updateCommand: CommandPartialInputModel = {
pk: currentItem.pk,
sk: currentItem.sk,
version: currentItem.version, // 現在のバージョンを含める
name: 'Updated Name',
};
await this.commandService.publishPartialUpdateAsync(updateCommand, {
source: 'updateItem',
invokeContext,
});
2. 自動解決にVERSION_LATESTを使用する
正確なバージョン番号を気にせずに常に最新バージョンを更新したい場合:
import { VERSION_LATEST, CommandInputModel } from '@mbc-cqrs-serverless/core';
const command: CommandInputModel = {
pk: catPk,
sk: catSk,
id: generateId(catPk, catSk),
code,
type: 'CAT',
name: 'Updated Name',
version: VERSION_LATEST, // 最新バージョンに自動解決
attributes,
};
await this.commandService.publishAsync(command, {
source: 'updateCat',
invokeContext,
});
3. 新規アイテムにはVERSION_FIRSTを使用する
新規アイテムを作成する際は、VERSION_FIRST(0)を使用して最初のバージョンであることを示します:
import { VERSION_FIRST, CommandDto } from '@mbc-cqrs-serverless/core';
const newCatCommand = new CatCommandDto({
pk: catPk,
sk: catSk,
id: generateId(catPk, catSk),
code,
type: 'CAT',
name: 'New Cat',
version: VERSION_FIRST, // 0 - 新規アイテムを示す
attributes,
});
await this.commandService.publishAsync(newCatCommand, {
source: 'createCat',
invokeContext,
});
回復パターン
基本的なリトライロジック
一時的な競合を処理するためのリトライロジックを実装します:
import { ConditionalCheckFailedException } from '@aws-sdk/client-dynamodb';
async function updateWithRetry(
commandService: CommandService,
dataService: DataService,
pk: string,
sk: string,
updateData: Partial<CommandInputModel>,
invokeContext: IInvoke,
maxRetries = 3,
): Promise<CommandModel> {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
// 最新バージョンを取得
const currentItem = await dataService.getItem({ pk, sk });
const command: CommandPartialInputModel = {
pk,
sk,
version: currentItem?.version || VERSION_FIRST,
...updateData,
};
return await commandService.publishPartialUpdateAsync(command, {
source: 'updateWithRetry',
invokeContext,
});
} catch (error) {
if (
error instanceof ConditionalCheckFailedException ||
error.statusCode === 409
) {
if (attempt === maxRetries) {
throw new Error(
`Failed to update after ${maxRetries} attempts due to version conflicts`,
);
}
// リトライ前に待機(指数バックオフ)
await new Promise((resolve) =>
setTimeout(resolve, Math.pow(2, attempt) * 100),
);
continue;
}
throw error;
}
}
}
指数バックオフパターン
高競合シナリオでは、ジッターを伴う指数バックオフを使用します:
async function exponentialBackoff(attempt: number): Promise<void> {
const baseDelay = 100; // ミリ秒単位の基本遅延
const maxDelay = 5000; // 最大遅延
// 指数バックオフで遅延を計算
const delay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
// サンダリングハードを防ぐためにランダムジッターを追加
const jitter = Math.random() * delay * 0.1;
await new Promise((resolve) => setTimeout(resolve, delay + jitter));
}