Skip to main content

Queue Module

The QueueModule provides messaging services for SNS (pub/sub) and SQS (queue) operations. It is registered as a global module, so SnsService and SqsService are available for injection throughout the application without explicitly importing QueueModule.

SnsService

SnsService provides SNS publish operations.

publish<T extends SnsEvent>(msg: T, topicArn?: string)

Publishes a message to an SNS topic.

// Inject SnsService
constructor(private readonly snsService: SnsService) {}

// Publish to default topic (SNS_TOPIC_ARN env var)
await this.snsService.publish({ action: 'my-action', ...payload })

// Publish to a specific topic
await this.snsService.publish({ action: 'my-action', ...payload }, 'arn:aws:sns:...')

Environment variables:

VariableDescription
SNS_TOPIC_ARNDefault topic ARN used when topicArn is not provided
SNS_ENDPOINTCustom endpoint (e.g. http://localhost:4566 for LocalStack)
SNS_REGIONAWS region

SqsService

Version Note

SqsService was added in version 1.2.1.

SqsService provides SQS send, receive, and delete operations.

Environment variables:

VariableDescription
SQS_ENDPOINTCustom endpoint (e.g. http://localhost:4566 for LocalStack)
SQS_REGIONAWS region

sendMessage(queueUrl, body, opts?)

Sends a single message to an SQS queue.

await this.sqsService.sendMessage(
'https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-queue',
JSON.stringify({ key: 'value' }),
)

// With optional fields
await this.sqsService.sendMessage(queueUrl, body, {
DelaySeconds: 5,
MessageGroupId: 'group-1', // FIFO queues only
MessageDeduplicationId: 'dedup-1', // FIFO queues only
MessageAttributes: { key: { DataType: 'String', StringValue: 'value' } }, // {{Custom message attributes}}
})

sendMessageBatch(queueUrl, entries)

Sends up to 10 messages in a single API call. Caller is responsible for ensuring entries.length <= 10.

await this.sqsService.sendMessageBatch(queueUrl, [
{ Id: '1', MessageBody: JSON.stringify({ key: 'value1' }) },
{ Id: '2', MessageBody: JSON.stringify({ key: 'value2' }) },
])

receiveMessages(queueUrl, opts?)

Receives messages from an SQS queue. Defaults to MaxNumberOfMessages: 10 and WaitTimeSeconds: 0.

// Default: MaxNumberOfMessages=10, WaitTimeSeconds=0
const output = await this.sqsService.receiveMessages(queueUrl)
const messages = output.Messages ?? []

// With options
const outputWithOpts = await this.sqsService.receiveMessages(queueUrl, {
MaxNumberOfMessages: 5,
WaitTimeSeconds: 20, // Long polling
VisibilityTimeout: 30,
MessageSystemAttributeNames: ['All'],
})

deleteMessage(queueUrl, receiptHandle)

Deletes a single message from the queue after processing.

for (const message of messages) {
// Process message...
await this.sqsService.deleteMessage(queueUrl, message.ReceiptHandle!)
}

deleteMessageBatch(queueUrl, entries)

Deletes up to 10 messages in a single API call.

await this.sqsService.deleteMessageBatch(queueUrl, [
{ Id: '1', ReceiptHandle: messages[0].ReceiptHandle! },
{ Id: '2', ReceiptHandle: messages[1].ReceiptHandle! },
])