message-subscriber
v1.0.55
Published
Subscribe to messages at any Cloud Service. Receive messages only by registering a EventEmitter Listener. (AWS SQS, Azure ESQ, Google Tasks)
Downloads
475
Maintainers
Readme
message-subscriber
Async message subscriber for receiving messages from queueing services available in cloud services. Gives the option to implement your own MessageAdapter to plug new services and use the core message-subscriber. Can handle hundreds of parallel processing using Node async nature, the messages are delivered by subscribing to an Event dispatched from Node EventEmitter.
Installation
npm install --save message-subscriber
MessageSubscriber
The MessageSubscriber is the main interface to manage and receive the messages from the queueing service.
const messageSubscriber = new MessageSubscriber({
messageAdapter: sqsAdapter,
parallelism: 100,
refreshInterval: 10
});
messageAdapter
- required, message adapter that will be called to send de commands to the cloud queue service (E.g.: SQSAdapter)parallelism
- required the number of parallel messages that you will receive to processrefreshInterval
- optional, when you receive one message from the queue service it becomes unavailable for a period of time, using the refresh interval the MessageSubscriber will delay the message using the interval passed (in seconds) (E.g.: Using aws sqs when you receive a message it becomes invisible for 30 seconds, if your processing takes more than 30 seconds, the message will become available and you can have duplicity, with refreshInterval the code will call delay on the message from time to time and the message will not become available when processing).
Events
The MessageSubscriber emits the following events:
message
: Comes with the message received- Message:
id: string
- The id of the messagereceipt?: string
- The receipt of the message (Generally used to delete)payload: any
- The payload of the messageattributes?: any
- The attributes of the messagereceivedTimestamp: number
- The timestamp that the message was receiveddelete: async function
- The function to delete messagedelay: async function
- The function to delay message
- Message:
empty
: When the queue is empty this event is emittederror
: If any operation errors this event will be dispatched with the errordrained
: When gracefulShutdown() is called the code will wait all the messages that are queued to be processed, when all are processed this event is calledpaused
: When pause() is called this event is emittedresumed
: When resume() is called this event is emittedstoped
: `When stop() is called this event is emitter. (Note: when the queue is stoped it CAN NOT start again, this DO NOT wait for the queued messages to be processed)
MessageAdapters
The message adapters are interfaces to communicate with the queueing services at the cloud. You can use the MessageAdapter Interface to create your own adapter and plug at the MessageSubscriber to use the queueing service that you need.
SQSAdapter
To use this package with AWS, you need to have at least one SQS queue created in your account. You'll need the queue URL to pass as a parameter to the SQSAdapter. E.g.: https://sqs.us-east-1.amazonaws.com/000000000000/your-queue
.
Authentication to AWS can be done using any methods of setting credentials in the AWS Javascript SDK. Your credential must have the following permissions in the queues that you are going to use in this package:
sqs:ReceiveMessage
sqs:DeleteMessage
sqs:ChangeMessageVisibility
const sqsAdapter = new SQSAdapter({
queueURL: 'QUEUE_URL',
maxNumberOfMessages: 10,
sqs: {
region: 'us-east-1'
}
})
queueURL
- required, the url of the AWS SQS QueuemaxNumberOfMessages
- optional the max number of messages to receive at one SQS receiveMessage call, default: 10sqs
- required, the params to configure the aws sqs queue (these are the params that are passed to aws-sdk sqs client).
Usage
This example uses que SQSAdapter to subscribe for messages using de AWS SQS.
import { Message, MessageSubscriber, SQSAdapter } from 'message-subscriber';
const sqsAdapter = new SQSAdapter({
queueURL: 'QUEUE_URL',
maxNumberOfMessages: 10,
sqs: {
region: 'us-east-1'
}
})
const messageSubscriber = new MessageSubscriber({
messageAdapter: sqsAdapter,
parallelism: 100,
refreshInterval: 10 // This will automatically refresh the delay of the message visibility at the queue.
});
// Registering event listeners
messageSubscriber.on('message', (message: Message) => {
console.log('messageReceived', message);
console.log('queue length', messageSubscriber.length); // You can see the length of the queue that the messages are being dispatched
await message.delete(); // You can call delete directly from message
});
messageSubscriber.on('empty', () => {
console.log('queue is empty');
});
messageSubscriber.on('error', (err: Error) => {
console.log('messageSubscriber error', err);
});
// Handling process termination
const handleShutdown = async () => {
try {
await messageSubscriber.gracefulShutdown(); // This will wait for queue to process all pending messages.
} catch (err) {
console.log(err);
process.exit(1);
}
process.exit(0);
};
process.on('SIGINT', handleShutdown);
process.on('SIGTERM', handleShutdown);
// Starting message-subscriber
messageSubscriber.start();
License
MIT