amqplib-r
v2.0.3
Published
Auto-reconnect module for Node AMQP Client on Connections and Channels.
Downloads
391
Maintainers
Readme
amqplib-r : Auto-reconnecting the Connections and Channels on failiure
Intoduction
This library is a wrapper over existing amqplib enabling auto-reconnecting mechanism for Connection
and Channels
Official Documentation and Examples
For latest documentation and examples, please visit official docs.
Below is a quick samples to get started with.
Quick Start Examples
Note
You may notice that this library just provides you connections, channels and attach worker/consumer/subscriber
(actually any JS function accepting amqp channel
) and take care of reattachment of same workers automatically
in case of connection failures.
The rest of code uses official amqplib APIs. So not required to change your existing code base too much. Just fit your model in the way we can handle connection, channels and workers for you.
Config
process.env.AMQPUrl = 'amqp://amqp_user:H@rd_t0_gue$$@localhost:5672';
Only Producer/Publisher
const amqp_r = require('amqplib-r');
const sendHello = (queueName, data) => {
// Create new Channel over Single Connection
amqp_r.newChannel((err, channel) => {
if(err)
console.error("[APP] Channel error", err);
else {
const payload = data ? ( typeof data === 'object' ? JSON.stringify(data) : data) : "";
channel.sendToQueue(queueName, Buffer.from(payload)); // amqplib -> sendToQueue()
console.log('[APP] Message Sent');
channel.close(); // close channel once done.
}
});
}
sendHello('QUEUE_NAME', 'This is not my first message.');
Only Consumer/Worker/Subscriber
process.env.AMQPUrl = 'amqp://amqp_user:H@rd_t0_gue$$@localhost:5672';
const amqp_r = require('amqplib-r');
const consumeMessage = (channel) => {
// get new channel over Single Connection
const queueName = 'QUEUE_NAME';
channel.prefetch(10); // max 10 messages can be unacked
channel.checkQueue(queueName, (err, ok) => {
if(err)
console.error("[APP] Error on queue check.. Queue don't exist.");
if(ok) {
channel.consume(queueName, (msg) => {
try {
console.log('[APP] Received', msg.content.toString());
console.log('[APP] Proccessed Successfully, Sending `ack`')
channel.ack(msg);
} catch(messageProcessingError){
console.error('[APP] Consumed message cannot be processed, Sending `nack` and requeued');
channel.nack(msg);
}
}, { noAck: false });
console.log("[APP] Consumer added successfully for ", queueName);
}
});
// do not close channel for consumers/workers as the worker will be disconnected.
}
const allMyConsumers = [ consumeMessage ]; // add all your consumer functions in an array
amqp_r.attach(allMyConsumers); // pass to our wrapper, we'll take care of connection failures and reattachment
Full Example with Publisher + Subscriber (Suitable for Copy-Paste and play)
const amqp_r = require('amqplib-r');
const sendHello = (queueName, data) => {
// Create new Channel over Single Connection
amqp_r.newChannel((err, channel) => {
if(err)
console.error("[APP] Channel error", err);
else {
const payload = data ? ( typeof data === 'object' ? JSON.stringify(data) : data) : "";
channel.sendToQueue(queueName, Buffer.from(payload)); // amqplib -> sendToQueue()
console.log('[APP] Message Sent');
channel.close(); // close channel once done.
}
});
}
const consumeMessage = (channel) => {
// get new channel over Single Connection
const queueName = 'QUEUE_NAME';
channel.prefetch(10); // max 10 messages can be unacked
channel.checkQueue(queueName, (err, ok) => {
if(err)
console.error("[APP] Error on queue check.. Queue don't exist.");
if(ok) {
channel.consume(queueName, (msg) => {
try {
console.log('[APP] Received', msg.content.toString());
console.log('[APP] Proccessed Successfully, Sending `ack`')
channel.ack(msg);
} catch(messageProcessingError){
console.error('[APP] Consumed message cannot be processed, Sending `nack` and requeued');
channel.nack(msg);
}
}, { noAck: false });
console.log("[APP] Consumer added successfully for ", queueName);
}
});
// do not close channel for consumers/workers as the worker will be disconnected.
}
sendHello('QUEUE_NAME', 'This is not my first message.');
const allMyConsumers = [ consumeMessage ]; // add all your consumer functions in an array
amqp_r.attach(allMyConsumers); // pass to our wrapper, we'll take care of connection failures and reattachment
History (skip if you are in hurry)
This library is published as a solution to a problem (probably common to all) being faced in one of my pet-projects where I was in need of the below functionality while using RabbitMQ's
nodejs client amqplib
.
- The application must recover (retry) connection automatically
- The application must recover (retry) channels automatically
- The consumers (if any, are there) must be attached on recovery automatically.
When I was looking for the possible solutions, I found that there are many implementation of AMQP in various languages, where it supports auto-recovery/reconnections as part of implementation (like the Java client of AMQP has this auto-recovery by default) but not supported in NodeJS client by default. So I created a wrapper over existing library and used in many projects and found very handy and worth using wrapper than the direct client. So far, it's working like a charm for me (your use-case may differ).
Best-practice (skip if you are in hurry)
As a best-practice, below are recommended.
- One process/app should have only ONE (
physical
) connection between each client and the server. - One (
physical
) connection should be longed-lived (active for the longest time) as creating connection and closing multiple times can put significant load on the Server and some delay in Client as well. - One (
physical
) connection should be used to create multiple channels (akavirtual connections
) - Channels (
Virtual Connections
) can beopened
andclosed
more frequently than (physical
) connections, if ever required, wihout much load. - One thread should use only one Channel (
virtual connection
) and should not be shared within mutiple threads as many implementations are not thread-safe for Channels. - Publishers/Producers should use a separate channel. Each Subscribers/Consumers should use separate channel.
The wrapper by-default supports all above recommendations.
Since node is single-threaded VM, here each
Consumer Function
will behave as asynchronous process for consuming the message, we will use separate channel per consumer. Cluster of node app is also safe using this wrapper.
Assumptions (skip if you are in hurry)
The original
amqplib
will be present in node_modules or dependency, it is neither shipped nor downloaded with this module. Ideally, your package.json should be repsponsible to download theamqplib
module of the version you want.I assume, you might want to re-register automatically the consumers when the connection and/or the channel is recovered from failure or disconnects. For the very same reason, I have exposed a function which accpets array of consumer functions. Whenever the error is recovered, it will call each function supplied in the array so that you can have your custom logic for attaching consumer written else-where but still re-registered always.
If you do not have any consumer available (i.e. only producers) then you just don't need to pass anything or pass empty array (
[]
). Simple.The connection url will be accepted by environment variable
process.env.AMQPUrl
only.
Missing Feature (expect coming in near future)
Currently it supports connection using only
url based
connection fromenv
variable and not with complex options. I'm working on it's support at my end.A version compatibility list with
amqplib
is not available at present, but expect it to be available in future documentations.This is enterly an open source project but the source code is not made public yet because of vendor selection. Soon it will be launched either on github or gitab.
Any new feature request will be allowed on github/gitlab issues once repo vendor is finalized and marked open.
Contributing Guidelines
- PRs from anyone will be considered only if code quality is maintained and self-documented.
- Try writing least documented code, the code must be self-explanatory in most cases.
- Try to adher with code structure and folder structure as much as possible.
- A new release will be made available as soon as we accept 4 PRs minimum.
- No timelined/scheduled relases will be possible.
- Pathces or Security/Performance leaks will be released as soon as fixed without waiting for minimum PR count.
- Based on the feature weightage, releases can be preponed or postponed.
- Wanted features will be chosen on issue vote counts and it's technical feasibilty.
- Any new ideas for integrtion of any other/third party library/software is also welcomed as long as it does not violate any above rules.
License
ISC - Do Whatever You Want (DWYW) License.
***Its less ISC but more DWYW license.
Happy. Free. Code.