pxlt-rabbit-handler
v5.0.0
A generic class that handles RabbitMQ connection, consume and produce functionality.
Description
A generic class that handles RabbitMQ connect, consume, and produce functionality.
npm install pxlt-rabbit-handler
Overview
A library built on amqplib that provides out-of-the-box RabbitMQ functionality: connect, consume, publish, and disconnection recovery. It decouples business logic from Rabbit-related code by emitting events that notify clients of Rabbit-related occurrences.
The 'connect' logic creates a single connection based on the parameters provided at object initialization.
Upon connect failure or a connection closed event, an exponential backoff retry mechanism is applied to re-obtain a connection. The maximum number of retry attempts and other relevant parameters can be set in the constructor. If not provided, the defaults described in the Class parameters section below are used.
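To make the retry timing concrete, here is a minimal sketch of how such a backoff delay could be computed. The formula (backoffFactor multiplied by 2 to the power of the attempt number, capped at maxBackoffIntervalMs, with attempts counted from 0) is an assumption inferred from the parameter descriptions, not taken from the library's source. `backoffDelayMs` is a hypothetical helper and is not part of the package's API.

```javascript
// Hypothetical helper: not part of pxlt-rabbit-handler's API.
// Assumed formula: delay = backoffFactor * 2^attempt, capped at maxBackoffIntervalMs.
function backoffDelayMs(attempt, { backoffFactor = 50, maxBackoffIntervalMs = 60000 } = {}) {
  return Math.min(maxBackoffIntervalMs, backoffFactor * 2 ** attempt);
}

// Delays for five attempts (the documented default for maxBackoffCount).
const delays = [0, 1, 2, 3, 4].map((attempt) => backoffDelayMs(attempt));
console.log(delays); // [ 50, 100, 200, 400, 800 ]
```

Under these assumptions the cap only comes into play for long retry sequences, so with the default settings the wait between attempts stays under one second.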
The 'consume' logic creates a channel and consumes on it. It accepts a callback function that is invoked for every arriving message.
Consumption is based on a user-defined prefetch value (defaults to 1). The channel created during consume defaults to confirmation mode (using createConfirmChannel). To create the channel with createChannel instead, set the 'confirm' parameter to false when calling consume.
One channel is created per consumed queue.
Upon a connection closed event, all non-cancelled consumed queues are automatically restored by the class once the connection is successfully re-established.
If consuming fails after a reconnection, the connection is closed and a 'connectionClosed' event is emitted, so it is important to listen for this event. Exiting the app when it fires is recommended.
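As a rough sketch of the consume options described above, the snippet below starts a consumer on a plain (non-confirm) channel. It assumes that 'confirm' is passed in the same options object as 'prefetch'; the text only states that it is a parameter of consume, so verify this against the package before relying on it. `startPlainConsumer` is a hypothetical wrapper, and `rabbitInst` is expected to be an already-connected instance.

```javascript
// Hypothetical wrapper; assumes 'confirm' sits next to 'prefetch' in the
// options object passed as the third argument to consume().
function startPlainConsumer(rabbitInst, queueName, handleMessage) {
  const options = { prefetch: 5, confirm: false };
  // Ack when the handler succeeds, nack when it throws or rejects.
  const onMessage = (message) => Promise.resolve()
    .then(() => handleMessage(message))
    .then(() => rabbitInst.ack(message))
    .catch(() => rabbitInst.nack(message));
  return rabbitInst.consume(queueName, onMessage, options);
}
```

Wrapping the handler this way keeps the ack/nack decision in one place, so the business callback only has to resolve or throw.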
The 'publish' method accepts, apart from the mandatory parameters of exchange, routing key and message, an optional 'options' object that complies with amqplib's documentation. In addition, a 'timeout' option can be provided; if it is, an exception is thrown when the timeout elapses before the server sends an ack/nack.
All publish/sendMessage calls use one channel, which is separate from any consume activity.
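The following sketch shows one way to handle the 'timeout' option when publishing. It assumes that publish returns a promise that rejects when the timeout elapses, which matches the promise-style usage elsewhere in this README but is not confirmed here. `publishJson` is a hypothetical helper, the 5000 ms value is arbitrary, and `rabbitInst` is expected to be an already-connected instance.

```javascript
// Hypothetical helper; assumes publish() returns a promise that rejects
// when the timeout elapses before the server confirms the message.
async function publishJson(rabbitInst, exchange, routingKey, payload) {
  const body = Buffer.from(JSON.stringify(payload));
  try {
    // 'persistent' is a standard amqplib publish option;
    // 'timeout' (in ms) is the custom option described above.
    await rabbitInst.publish(exchange, routingKey, body, { persistent: true, timeout: 5000 });
    return true;
  } catch (err) {
    // Reached whenever publish rejects, for example on a timeout.
    console.error(`Publish to ${exchange} failed: ${err}`);
    return false;
  }
}
```

Returning a boolean lets the caller decide whether to retry, drop the message, or escalate, without the failure propagating as an unhandled rejection.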
The 'getMessage' method supports RabbitMQ's pull API, meaning it pulls single messages from a specific queue.
All getMessage calls use one channel, the same one used by publish/sendMessage.
The 'getConnectionState' method returns the connection state: 'active', 'closed', 'error', 'blocked'.
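One possible use of the connection state is a readiness check, sketched below. `isRabbitReady` is a hypothetical helper, and treating only 'active' as ready is a design choice made for this illustration rather than something the library prescribes. Because this README does not say whether the method returns the state directly or through a promise, the sketch uses `await`, which works in both cases.

```javascript
// Hypothetical helper. 'await' is used so the sketch works whether
// getConnectionState() returns the state directly or via a promise.
async function isRabbitReady(rabbitInst) {
  const state = await rabbitInst.getConnectionState();
  // Only 'active' is treated as ready; 'closed', 'error' and 'blocked' are not.
  return state === 'active';
}
```

A check like this could back an HTTP health endpoint or gate a publish attempt.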
Class parameters
- connString (Required): string containing connection information (username, password, endpoint, virtual host).
- options (Optional): an object containing any supported rabbit options, plus the following custom parameters:
  - 'maxBackoffIntervalMs': the maximum interval, in ms, of the exponential backoff retry mechanism for the connect function. Defaults to 60000 ms if not provided.
  - 'backoffFactor': the multiplication factor of the base-2 exponent. Defaults to 50 if not provided.
  - 'maxBackoffCount': the maximum number of connect retries. Defaults to 5 if not provided.
  - 'logLevel': the log level from which logging to console is enabled. Defaults to 'info' if not provided.
Emitted events
- connectionClosed: emitted when a connection cannot be achieved after maxBackoffCount retries, or when re-consuming fails after a reconnection.
Promise API example
const rabbit = require('pxlt-rabbit-handler');
// Assumption: any logger exposing info/error works here; console is used for illustration
const logger = console;
const rabbitInst = new rabbit('amqp://<USERNAME>:<PASSWORD>@<CLUSTER_ENDPOINT>/<VHOST>', { heartbeat: 10, maxBackoffIntervalMs: 60000, logLevel: 'debug' });

// Consumer
// New incoming message
function processMessage1(message) {
  logger.info(Buffer.from(message.message.content).toString('utf8'));
  return Promise.resolve() // do business logic of msgs from queue1
    .then(() => rabbitInst.ack(message))
    .catch((err) => rabbitInst.nack(message));
}

function processMessage2(message) {
  logger.info(Buffer.from(message.message.content).toString('utf8'));
  return Promise.resolve() // do business logic of msgs from queue2
    .then(() => rabbitInst.ack(message))
    .catch((err) => rabbitInst.nack(message));
}

// If connection was closed and could not be re-established after maxBackoffCount or re-consume failed, exit
rabbitInst.on('connectionClosed', (err) => {
  logger.error(`Failed to connect to rabbit: ${err}, Exiting...`);
  process.exit(1);
});

rabbitInst.connect()
  .then(() => {
    const queues = [{ q: 'queueName1', cb: processMessage1 }, { q: 'queueName2', cb: processMessage2 }];
    const options = { prefetch: 10 };
    // Start one consumer per queue; a failure on one queue is logged without rejecting the others
    return Promise.all(queues.map((q) => rabbitInst.consume(q.q, q.cb, options).catch((err) => { logger.error(err); })));
  })
  // Stop consuming from the first queue; the second consumer keeps running
  .then(() => rabbitInst.cancelConsumer('queueName1'))
  .catch((err) => {
    logger.error(err);
  });

// Publisher
rabbitInst.connect()
  .then(() => rabbitInst.checkExchange('<EXCHANGE_NAME>'))
  .then(() => rabbitInst.publish('<EXCHANGE_NAME>', '<ROUTING_KEY>', '<MESSAGE_BUFFER>', { timeout: 10000 }))
  .then((res) => {
    console.log(res);
    return rabbitInst.gracefullyDisconnect();
  })
  .catch((err) => {
    // error logic
  });
Running Tests
Unit
npm run test
Integration
npm run test:integration
Integration tests require an RMQ instance. There is a docker-compose.yaml that can launch one. Alternatively, any other instance may be used, but the needed configuration must be provided.
For minikube users: PXLT_RMQ_HOST=$(minikube ip) npm run test:integration