メインコンテンツまでスキップ

キューモジュール

QueueModuleはSNS(パブ/サブ)とSQS(キュー)操作のメッセージングサービスを提供します。グローバルモジュールとして登録されているため、QueueModuleを明示的にインポートしなくても、SnsServiceSqsServiceをアプリケーション全体でインジェクションできます。

SnsService

SnsServiceはSNSのパブリッシュ操作を提供します。

publish<T extends SnsEvent>(msg: T, topicArn?: string)

SNSトピックへメッセージをパブリッシュします。

// SnsServiceをインジェクト
constructor(private readonly snsService: SnsService) {}

// デフォルトトピックへ送信(SNS_TOPIC_ARN環境変数)
await this.snsService.publish({ action: 'my-action', ...payload })

// 特定のトピックへ送信
await this.snsService.publish({ action: 'my-action', ...payload }, 'arn:aws:sns:...')

環境変数:

変数説明
SNS_TOPIC_ARNtopicArn未指定時に使用されるデフォルトのトピックARN
SNS_ENDPOINTカスタムエンドポイント(例: LocalStack用http://localhost:4566
SNS_REGIONAWSリージョン

SqsService

バージョンノート

SqsServiceバージョン1.2.1で追加されました。

SqsServiceはSQSの送信・受信・削除操作を提供します。

環境変数:

変数説明
SQS_ENDPOINTカスタムエンドポイント(例: LocalStack用http://localhost:4566
SQS_REGIONAWSリージョン

sendMessage(queueUrl, body, opts?)

SQSキューへ単一メッセージを送信します。

await this.sqsService.sendMessage(
'https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-queue',
JSON.stringify({ key: 'value' }),
)

// オプションフィールドを指定する場合
await this.sqsService.sendMessage(queueUrl, body, {
DelaySeconds: 5,
MessageGroupId: 'group-1', // FIFOキューのみ
MessageDeduplicationId: 'dedup-1', // FIFOキューのみ
MessageAttributes: { key: { DataType: 'String', StringValue: 'value' } }, // {{Custom message attributes}}
})

sendMessageBatch(queueUrl, entries)

1回のAPIコールで最大10件のメッセージを送信します。entries.length <= 10であることは呼び出し側の責任です。

await this.sqsService.sendMessageBatch(queueUrl, [
{ Id: '1', MessageBody: JSON.stringify({ key: 'value1' }) },
{ Id: '2', MessageBody: JSON.stringify({ key: 'value2' }) },
])

receiveMessages(queueUrl, opts?)

SQSキューからメッセージを受信します。デフォルトはMaxNumberOfMessages: 10WaitTimeSeconds: 0です。

// デフォルト: MaxNumberOfMessages=10, WaitTimeSeconds=0
const output = await this.sqsService.receiveMessages(queueUrl)
const messages = output.Messages ?? []

// オプションを指定する場合
const outputWithOpts = await this.sqsService.receiveMessages(queueUrl, {
MaxNumberOfMessages: 5,
WaitTimeSeconds: 20, // ロングポーリング
VisibilityTimeout: 30,
MessageSystemAttributeNames: ['All'],
})

deleteMessage(queueUrl, receiptHandle)

処理後にキューから単一メッセージを削除します。

for (const message of messages) {
// メッセージを処理...
await this.sqsService.deleteMessage(queueUrl, message.ReceiptHandle!)
}

deleteMessageBatch(queueUrl, entries)

1回のAPIコールで最大10件のメッセージを削除します。

await this.sqsService.deleteMessageBatch(queueUrl, [
{ Id: '1', ReceiptHandle: messages[0].ReceiptHandle! },
{ Id: '2', ReceiptHandle: messages[1].ReceiptHandle! },
])