wtsqs
v1.3.0
Published
AWS SQS Worker Wrapper
Downloads
505
Maintainers
Readme
WTSQS
Simplified SQS Wrapper and Async Worker manager.
Features:
- Simple interface. :white_check_mark:
- Promise based. :white_check_mark:
- ES6. :white_check_mark:
- Optimized async worker. :white_check_mark:
Install
# Using npm
$ npm install wtsqs --save
# Or using yarn
$ yarn add wtsqs
Classes
Typedefs
WTSQS
A simplified sqs wrapper with interface similar to a normal queue data structure.
Kind: global class
- WTSQS
- new WTSQS(options)
- .size() ⇒ Promise.<integer>
- .enqueueOne(payload, [options], [sqsOptions]) ⇒ Promise
- .enqueueMany(payloads, [options], [sqsOptions]) ⇒ Promise
- .peekOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>
- .peekMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>
- .deleteOne(message) ⇒ Promise
- .deleteMany(messages) ⇒ Promise
- .deleteAll() ⇒ Promise
- .popOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>
- .popMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>
new WTSQS(options)
Constructs WTSQS object.
| Param | Type | Default | Description | | --- | --- | --- | --- | | options | Object | | Options object. | | options.url | String | | SQS queue url. | | [options.accessKeyId] | String | | AWS access key id. | | [options.secretAccessKey] | String | | AWS secret access key. | | [options.region] | String | us-east-1 | AWS regions where queue exists. | | [options.defaultMessageGroupId] | String | | FIFO queues only. Default tag assigned to a message that specifies it belongs to a specific message group. If not provided random uuid is assigned to each message which doesn't guarantee order but allows parallelism. | | [options.defaultVisibilityTimeout] | Integer | 60 | Default duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | | [options.defaultPollWaitTime] | Integer | 10 | Default duration (in seconds) for which read calls wait for a message to arrive in the queue before returning. | | [options.sqsOptions] | Object | | Additional options to extend/override the underlying SQS object creation. |
Example
const { WTSQS } = require('wtsqs')
// The most simple way to construct a WTSQS object
const wtsqs = new WTSQS({
url: '//queue-url',
accessKeyId: 'AWS_ACCESS_KEY_ID',
secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})
wtsqs.size() ⇒ Promise.<integer>
Get approximate total number of messages in the queue.
Kind: instance method of WTSQS
Example
const size = await wtsqs.size()
console.log(size) // output: 2
wtsqs.enqueueOne(payload, [options], [sqsOptions]) ⇒ Promise
Enqueue single payload in the queue.
Kind: instance method of WTSQS
See: SQS#sendMessage
| Param | Type | Default | Description | | --- | --- | --- | --- | | payload | Object | | JSON serializable object. | | [options] | Object | | Options. | | [options.messageGroupId] | String | | Message group id to override default id. | | [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS sendMessage request. |
Example
const myObj = { a: 1 }
await wtsqs.enqueueOne(myObj)
wtsqs.enqueueMany(payloads, [options], [sqsOptions]) ⇒ Promise
Enqueue batch of payloads in the queue.
Kind: instance method of WTSQS
See: SQS#sendMessageBatch
| Param | Type | Default | Description | | --- | --- | --- | --- | | payloads | Array.<Object> | | Array of JSON serializable objects. | | [options] | Object | | Options object. | | [options.messageGroupId] | String | | Message group id to override default id. | | [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS sendMessageBatch request. |
Example
const myObjList = [{ a: 1 }, { b: 3 }]
await wtsqs.enqueueMany(myObjList)
wtsqs.peekOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>
Retrieve single message without deleting it.
Kind: instance method of WTSQS
Returns: Promise.<(Message|null)> - Message object or null if queue is empty.
| Param | Type | Default | Description | | --- | --- | --- | --- | | [options] | Object | | Options object. | | [options.pollWaitTime] | Integer | | Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | | [options.visibilityTimeout] | Integer | | Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | | [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessage = await wtsqs.peekOne()
console.log(myMessage)
// output:
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
}
wtsqs.peekMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>
Retrieve batch of messages without deleting them.
Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>> - Array of retrieved messages.
See: SQS#receiveMessage
| Param | Type | Default | Description | | --- | --- | --- | --- | | [maxNumberOfMessages] | Number | 10 | Maximum number of messages to retrieve. Must be between 1 and 10. | | [options] | Object | | Options object. | | [options.pollWaitTime] | Integer | | Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | | [options.visibilityTimeout] | Integer | | Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | | [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessageList = await wtsqs.peekMany(2)
console.log(myMessageList)
// output:
[
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
},
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { b: 3 }
}
]
wtsqs.deleteOne(message) ⇒ Promise
Delete single message from queue.
Kind: instance method of WTSQS
See: SQS#deleteMessage
| Param | Type | Description | | --- | --- | --- | | message | Message | Message to be deleted |
Example
const myMessage = await wtsqs.peekOne()
await wtsqs.deleteOne(myMessage)
wtsqs.deleteMany(messages) ⇒ Promise
Delete batch of messages from queue.
Kind: instance method of WTSQS
See: SQS#deleteMessageBatch
| Param | Type | Description | | --- | --- | --- | | messages | Array.<Message> | Messages to be deleted |
Example
const myMessageList = await wtsqs.peekMany(2)
await wtsqs.deleteMany(myMessageList)
wtsqs.deleteAll() ⇒ Promise
Delete ALL messages in the queue.
NOTE: Can only be called once every 60 seconds.
Kind: instance method of WTSQS
See: SQS#purgeQueue
Example
await wtsqs.deleteAll()
wtsqs.popOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>
Retrieve single message and immediately delete it.
Kind: instance method of WTSQS
Returns: Promise.<(Message|null)> - Message object or null if queue is empty.
| Param | Type | Default | Description | | --- | --- | --- | --- | | [options] | Object | | Options object. | | [options.pollWaitTime] | Integer | | Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | | [options.visibilityTimeout] | Integer | | Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | | [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessage = await wtsqs.popOne()
// The message no longer exists in queue
console.log(myMessage)
// output:
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
}
wtsqs.popMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>
Retrieve batch of messages and immediately delete them.
Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>> - Array of retrieved messages.
| Param | Type | Default | Description | | --- | --- | --- | --- | | [maxNumberOfMessages] | Number | 10 | Maximum number of messages to retrieve. Must be between 1 and 10. | | [options] | Object | | Options object. | | [options.pollWaitTime] | Integer | | Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | | [options.visibilityTimeout] | Integer | | Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | | [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessageList = await wtsqs.popMany(2)
// Messages no longer exist in queue
console.log(myMessageList)
// output:
[
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
},
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { b: 3 }
}
]
WTSQSWorker
WTSQS worker job manager.
WTSQSWorker takes care of asynchronously fetching jobs from sqs while processing other jobs concurrently. It also takes care of deleting a job from the queue after successfully processing the message.
Kind: global class
- WTSQSWorker
- new WTSQSWorker(options)
- instance
- .run(handler)
- .shutdown() ⇒ Promise
- inner
- ~runHandler ⇒ Promise
new WTSQSWorker(options)
Constructs WTSQSWorker object.
| Param | Type | Default | Description | | --- | --- | --- | --- | | options | Object | | Options object. | | options.wtsqs | WTSQS | | WTSQS instance to use for connecting to sqs. | | [options.maxConcurrency] | Integer | 20 | Maximum number of concurrent jobs. | | [options.pollWaitTime] | Integer | 5 | Duration (in seconds) for which read calls wait for a job to arrive in the queue before returning. | | [options.visibilityTimeout] | Integer | 30 | Duration (in seconds) that the received jobs are hidden from subsequent retrieve requests. | | [options.logger] | Object | String | | Object with debug, info, warn, error methods to use for logging. Or a string with log level to use default internal logger. |
Example
const { WTSQS, WTSQSWorker } = require('wtsqs')
const wtsqs = new WTSQS({
url: '//queue-url',
accessKeyId: 'AWS_ACCESS_KEY_ID',
secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})
const worker = new WTSQSWorker({ wtsqs })
worker.run(async (job) => {
await someAsyncFunction(job.body)
console.log(job)
})
worker.run(handler)
Start fetching and processing jobs.
Kind: instance method of WTSQSWorker
| Param | Type | Description | | --- | --- | --- | | handler | runHandler | Async function to process a single job. |
worker.shutdown() ⇒ Promise
Shutsdown the worker and drain active jobs.
Kind: instance method of WTSQSWorker
Returns: Promise - Resolves when all active jobs have been drained.
WTSQSWorker~runHandler ⇒ Promise
Async callback function to process single job.
Kind: inner typedef of WTSQSWorker
| Param | Type | Description | | --- | --- | --- | | job | Job | A single job to process |
Message : Object
Received SQS Message
Kind: global typedef
Properties
| Name | Type | Description | | --- | --- | --- | | id | String | Message id. | | receiptHandle | String | Message receipt handle. | | md5 | String | Message body md5 hash sum. | | body | Object | Message body containing original payload. |
Job : Object
Worker Job
Kind: global typedef
Properties
| Name | Type | Description | | --- | --- | --- | | id | String | Job id. | | receiptHandle | String | Job receipt handle. | | md5 | String | Job body md5 hash sum. | | body | Object | Job body containing original payload. |