通知モジュール
NotificationModuleは、MBC CQRS Serverlessフレームワークで2種類の通知機能を提供します:
- WebSocketベースの更新用のリアルタイム通知(AWS AppSync経由)
- メール送信用のメール通知(AWS SES経由)
アーキテクチャ
リアルタイム通知
概要
DynamoDBでデータ変更が発生すると、リアルタイム通知が自動的に送信されます。システムはAWS AppSyncを使用して、購読中のWebSocketクライアントに通知を配信します。
INotificationインターフェース
通知ペイロードの構造:
interface INotification {
id: string; // Unique notification ID (一意の通知ID)
table: string; // Source DynamoDB table name (ソースDynamoDBテーブル名)
pk: string; // 変更されたアイテムのパーティションキー
sk: string; // 変更されたアイテムのソートキー
tenantCode: string; // Tenant code for filtering notifications (通知フィルタリング用のテナントコード)
action: string; // Type of change: 'INSERT', 'MODIFY', 'REMOVE' (変更タイプ)
content?: object; // Optional payload with changed data (変更データを含むオプションのペイロード)
}
AppSyncService
AppSyncServiceはリアルタイム通知をAppSyncに送信し、WebSocket経由で配信します。
メソッド: sendMessage(msg: INotification): Promise<void>
GraphQLミューテーション経由でAppSyncに通知を送信します。通知はすべての購読中のWebSocketクライアントに配信されます。
sendMessage の戻り値型はバージョン1.3.0で Promise<any> から Promise<void> に変更されました。テストでこのメソッドを mockResolvedValue(null) でモックしている場合は mockResolvedValue(undefined) に更新してください。詳細はv1.3.0移行ガイドを参照してください。
await this.appSyncService.sendMessage({
id: "unique-id",
table: "my-table",
pk: "ITEM#tenant1",
sk: "ITEM#001",
tenantCode: "tenant1",
action: "MODIFY",
content: { status: "updated" },
});
設定
以下の環境変数を設定してください:
APPSYNC_ENDPOINT=https://xxxxx.appsync-api.ap-northeast-1.amazonaws.com/graphql
APPSYNC_API_KEY=da2-xxxxxxxxxx # Optional: Use API key auth instead of IAM (オプション: IAMの代わりにAPIキー認証を使用)
使用方法
import { Injectable } from "@nestjs/common";
import { AppSyncService, INotification } from "@mbc-cqrs-serverless/core";
@Injectable()
export class MyService {
constructor(private readonly appSyncService: AppSyncService) {}
async notifyClients() {
const notification: INotification = {
id: "notification-123",
table: "my-table",
pk: "ITEM#tenant1",
sk: "ITEM#item001",
tenantCode: "tenant1",
action: "MODIFY",
content: { status: "updated" },
};
await this.appSyncService.sendMessage(notification);
}
}
認証
AppSyncServiceは2つの認証方法をサポートしています:
- API キー:
APPSYNC_API_KEY環境変数を設定 - IAM署名V4: APIキーが設定されていない場合に自動的に使用
自動通知
フレームワークはデータ変更時に以下の流れで自動的に通知を送信します:
- DynamoDBストリームが
NotificationEventHandlerをトリガー - ハンドラーが変更情報を抽出し
INotificationを作成 AppSyncService.sendMessage()がAppSyncに配信- 接続されたクライアントがWebSocket購読経由で更新を受信
NotificationEvent
NotificationEventクラスはSQSからの通知イベントを表します。IEventを実装し、通知データを含むSQSレコードをラップします。
import { NotificationEvent } from "@mbc-cqrs-serverless/core";
class NotificationEvent implements IEvent, SQSRecord {
source: string;
messageId: string;
receiptHandle: string;
body: string; // JSON string containing INotification data (INotificationデータを含むJSON文字列)
attributes: SQSRecordAttributes;
messageAttributes: SQSMessageAttributes;
md5OfBody: string;
eventSource: string;
eventSourceARN: string;
awsRegion: string;
// Creates a NotificationEvent from an SQS record (SQSレコードからNotificationEventを作成)
fromSqsRecord(record: SQSRecord): NotificationEvent;
}
NotificationEventHandler
NotificationEventHandlerはNotificationEventを処理してAppSyncに通知を送信する組み込みイベントハンドラーです。通知モジュールを使用する際に自動的に登録されます。
import { EventHandler, IEventHandler, NotificationEvent } from "@mbc-cqrs-serverless/core";
@EventHandler(NotificationEvent)
export class NotificationEventHandler implements IEventHandler<NotificationEvent> {
async execute(event: NotificationEvent): Promise<void> {
// Parses the notification from event body (イベント本文から通知をパース)
// Sends to AppSync via sendMessage() (sendMessage()経由でAppSyncに送信)
}
}
通常、このハンドラーと直接やり取りする必要はありません - SQSキューに通知が発行されると自動的に動作します。
AppSync Events API(オプトイン)
AppSyncEventsService とデュアルパブリッシュサポートは バージョン 1.3.0 で追加されました。
概要
AppSyncEventsService は AWS AppSync Events API(スキーマ不要の HTTP pub/sub サービス)をベースにした、代替(または補完)のリアルタイムトランスポートを提供します。GraphQL サブスクリプションとは異なり、GraphQL スキーマが不要で、クライアントはワイルドカードチャンネルパスでサブスクライブします。
NOTIFICATION_TRANSPORTS=appsync-event を設定してオプトインします。NOTIFICATION_TRANSPORTS=appsync-graphql,appsync-event と両方のエンドポイントを設定すると、フレームワークは両方のトランスポートにデュアルパブリッシュし、ダウンタイムなしのマイグレーションを可能にします。
メソッド: sendMessage(msg: INotification): Promise<void>
AppSync Events チャンネルに通知をパブリッシュします。通知はサブスクライブ中のすべてのクライアントに配信されます。
await this.appSyncEventsService.sendMessage({
id: "command-123",
table: "orders-table",
pk: "ORDER#tenant1",
sk: "ORDER#001",
tenantCode: "tenant1",
action: "MODIFY",
content: { status: "confirmed" },
});
使用方法
import { Injectable } from "@nestjs/common";
import { AppSyncEventsService, INotification } from "@mbc-cqrs-serverless/core";
@Injectable()
export class MyService {
constructor(private readonly appSyncEventsService: AppSyncEventsService) {}
async notifyClients() {
const notification: INotification = {
id: "notification-456",
table: "my-table",
pk: "ITEM#tenant1",
sk: "ITEM#item001",
tenantCode: "tenant1",
action: "MODIFY",
content: { status: "updated" },
};
await this.appSyncEventsService.sendMessage(notification);
}
}
チャンネル構造
すべての通知は最も具体的な単一チャンネルにパブリッシュされます。クライアントは AppSync Events のワイルドカード(/*)を使って必要な粒度でサブスクライブします:
/{namespace}/{tenantCode}/{action}/{sanitizedId}
seg 1 seg 2 seg 3 seg 4
| クライアントの目的 | サブスクライブ先 |
|---|---|
| テナントの全イベント | /{namespace}/{tenantCode}/* |
| アクションでフィルタリング | /{namespace}/{tenantCode}/{action}/* |
| 特定コマンドを追跡 | /{namespace}/{tenantCode}/{action}/{sanitizedId} |
設定
# Events API トランスポートを有効にします(Events API のみ)
NOTIFICATION_TRANSPORTS=appsync-event
APPSYNC_EVENTS_ENDPOINT=https://xxxx.appsync-api.ap-northeast-1.amazonaws.com/event
# オプション:AppSync Event API に事前作成されたネームスペースと一致する必要があります
APPSYNC_EVENTS_NAMESPACE=default
AppSync Events 環境変数 を参照してください。
GraphQL サブスクリプションからの移行
デュアルパブリッシュモードを使用して、ダウンタイムなしでクライアントを段階的に移行します:
フェーズ 1 — デュアルパブリッシュ(両方のトランスポートが有効):
NOTIFICATION_TRANSPORTS=appsync-graphql,appsync-event
APPSYNC_EVENTS_ENDPOINT=https://xxxx.appsync-api.ap-northeast-1.amazonaws.com/event
APPSYNC_EVENTS_NAMESPACE=default
APPSYNC_ENDPOINT=https://xxxx.appsync-api.ap-northeast-1.amazonaws.com/graphql # 設定済みのまま
フェーズ 2 — Events API のみ(全クライアントの移行完了後):
NOTIFICATION_TRANSPORTS=appsync-event
APPSYNC_EVENTS_ENDPOINT=https://xxxx.appsync-api.ap-northeast-1.amazonaws.com/event
APPSYNC_EVENTS_NAMESPACE=default
# APPSYNC_ENDPOINT を削除
CDK インフラストラクチャ
EventApi と ChannelNamespace を自動的にプロビジョニングするには、Config に appsyncEvents を追加します:
// infra/config/config.ts
export const config: Config = {
// ...
appsyncEvents: {
enabled: true,
namespace: 'default', // optional, defaults to 'default' (オプション、デフォルトは 'default')
apiKeyExpireDays: 365, // optional, defaults to 365 (オプション、デフォルトは 365)
},
}
CDK スタックは AppSyncEventsHttpEndpoint と AppSyncEventsNamespace を出力し、Lambda と ECS に NOTIFICATION_TRANSPORTS、APPSYNC_EVENTS_ENDPOINT、APPSYNC_EVENTS_NAMESPACE を自動で注入します。
認証
AppSyncEventsService はパブリッシュに2種類の認証方式をサポートします:
- IAM SigV4(Lambda/ECS 推奨): パブリッシュ時に自動で使用されます。CDK スタックの
grantPublish()により Lambda と ECS タスクロールにappsync:EventPublish権限が付与されます。 - API キー: ブラウザクライアントのサブスクライブに使用します。クライアント側(Amplify の
apiKey設定など)で設定します。サーバー側では不要です。
クライアントサイドサブスクリプション(ブラウザ)
ブラウザクライアントは AWS Amplify v6 の events API を使って AppSync Events チャンネルをサブスクライブします。チャンネルパスで受信する通知を決定します — /* ワイルドカードで任意の粒度でサブスクライブできます。
AWS Amplify v6 でのセットアップ
npm install aws-amplify
import { Amplify } from 'aws-amplify';
import { events } from 'aws-amplify/data';
// アプリ起動時に一度 Amplify を設定します(例: _app.tsx または main.ts)
Amplify.configure({
API: {
Events: {
endpoint: 'https://YOUR_APPSYNC_EVENTS_ENDPOINT/event',
region: 'ap-northeast-1',
defaultAuthMode: 'apiKey',
apiKey: 'YOUR_API_KEY', // CDK から AppSyncEventsApiKey として出力されます
},
},
});
通知のサブスクライブ
interface INotification {
id: string;
table: string; // 例: "dev-myapp-order-data"
pk: string; // 変更されたアイテムのパーティションキー
sk: string; // 変更されたアイテムのソートキー
tenantCode: string;
action: string; // INSERT | MODIFY | REMOVE
content?: object; // 変更内容のオプショナルペイロード
}
// テナントの全通知をサブスクライブ
const channel = await events.connect('/default/tenant001/*');
const subscription = channel.subscribe({
next: ({ data }: { data: INotification }) => {
console.log(`${data.action} on ${data.table}: pk=${data.pk}`);
// UI を更新するかローカルステートを更新
},
error: (err) => {
console.error('Subscription error:', err);
},
});
// コンポーネントのアンマウント時にサブスクライブ解除
subscription.unsubscribe();
React フックの例
import { useEffect, useRef } from 'react';
import { events } from 'aws-amplify/data';
function useOrderNotifications(tenantCode: string, orderId: string) {
const subscriptionRef = useRef<{ unsubscribe(): void } | null>(null);
useEffect(() => {
// 特定の注文をサブスクライブ
const channel = `/default/${tenantCode}/MODIFY/${orderId.replace(/#/g, '_')}`;
events.connect(channel).then((conn) => {
subscriptionRef.current = conn.subscribe({
next: ({ data }) => {
console.log('Order updated:', data);
// 再フェッチをトリガーするかローカルステートを更新
},
error: (err) => console.error(err),
});
});
return () => {
subscriptionRef.current?.unsubscribe();
};
}, [tenantCode, orderId]);
}
チャンネルパスのサニタイズされたアイテム ID は # を _ に置換します。例えば ORDER#ORD001 は ORDER_ORD001 になります。クライアントサイドでは sk.replace(/#/g, '_') を使って正確なチャンネルパスを算出してください。
カスタム通知トランスポート(オプション)
カスタムトランスポート用の @NotificationTransport デコレーターはバージョン 1.3.0で追加されました。
フレームワークのコードを変更せずに、任意の外部システム(Slack、PagerDuty、カスタム WebSocket サーバーなど)へ配信するカスタム通知トランスポートを登録できます。