amqplib-binary-retry
v1.0.2
Retry failed attempts to consume a message, with increasing delays between each attempt. Delays are implemented with a binary delay message architecture.
The problem with a simple wait exchange and queue pattern
The common pattern for delayed retries is to set up a wait exchange and a wait queue, and to send messages there when they need to be retried. You set a TTL on each message and set the dead letter exchange of the wait queue to your main exchange. If you set the TTL on a message to 5 minutes, the message sits in the wait queue for 5 minutes and is then dead-lettered back to your application's exchange to be consumed again.
The problem is that messages are only removed from the head of the queue, so you cannot use a single wait queue for a back-off strategy with varying delays. If a message with a TTL of 10 minutes is at the head of the queue and a message with a TTL of 1 minute is behind it, the second message will also wait for 10 minutes.
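For reference, the single wait-queue pattern described above could be set up roughly as follows. This is a minimal sketch, not part of this library: the function names `setUpWaitQueue` and `publishForRetry` and all exchange and queue names are hypothetical, and the `channel` is assumed to be a regular amqplib channel.

```javascript
// Sketch of the single wait-queue pattern. All names here are hypothetical.
async function setUpWaitQueue(channel, { appExchange, waitExchange, waitQueue }) {
  // Exchange that receives messages scheduled for a retry.
  await channel.assertExchange(waitExchange, 'fanout', { durable: true });
  // Messages that expire in the wait queue are dead-lettered
  // back to the application's exchange.
  await channel.assertQueue(waitQueue, {
    durable: true,
    deadLetterExchange: appExchange,
  });
  await channel.bindQueue(waitQueue, waitExchange, '');
}

function publishForRetry(channel, waitExchange, routingKey, content, ttlMs) {
  // `expiration` is the per-message TTL in milliseconds, passed as a string.
  return channel.publish(waitExchange, routingKey, content, {
    expiration: String(ttlMs),
  });
}
```

With this setup, `publishForRetry(channel, 'wait', 'my.key', body, 5 * 60 * 1000)` would park a message for five minutes, provided no message with a longer TTL is ahead of it in the queue.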
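The head-of-queue behaviour can be illustrated with a small simulation that needs no broker. This is only a sketch: `simulateSingleWaitQueue` is a hypothetical helper, and it assumes all messages are enqueued at time zero and that a message can only leave the queue once it is at the head and its own TTL has elapsed.

```javascript
// Simulate a FIFO wait queue in which only the head message can expire.
// `ttls` lists the per-message TTLs (in minutes) in enqueue order.
// Returns the minute at which each message actually leaves the queue.
function simulateSingleWaitQueue(ttls) {
  let clock = 0;
  return ttls.map((ttl) => {
    // A message leaves once everything ahead of it has left
    // AND its own TTL has passed, whichever happens later.
    clock = Math.max(clock, ttl);
    return clock;
  });
}

// The 1-minute message is stuck behind the 10-minute message.
console.log(simulateSingleWaitQueue([10, 1])); // [ 10, 10 ]
```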
How this lib solves that
Create multiple wait queues and set a message TTL on the queues themselves. Bind those queues to the corresponding exchanges on each layer.
By default we create 16 delay layers. Each layer delays the message for 2^<level> seconds. We then set custom headers on each retried message to record which layers should delay it. The last delay layer forwards the message to the 'ready' queue, and the message is then republished with its original headers to the original consumer queue.
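To make the binary idea concrete, the sketch below decomposes a delay into the layers whose bits are set in its binary representation. It is an illustration under assumptions, not the library's code: `levelsForDelay` is a hypothetical helper, level i is assumed to hold a message for 2^i seconds, and the order in which the layers are visited is a guess.

```javascript
// Return the delay levels a message would have to wait in so that the
// per-level delays (2^level seconds each) add up to `delaySeconds`.
function levelsForDelay(delaySeconds, delayLevels = 16) {
  // With N levels the largest representable delay is 2^N - 1 seconds.
  const maxDelay = 2 ** delayLevels - 1;
  const clamped = Math.min(Math.max(Math.floor(delaySeconds), 0), maxDelay);
  const levels = [];
  for (let level = delayLevels - 1; level >= 0; level--) {
    // Bit `level` set means the message must wait in that layer.
    if (clamped & (1 << level)) levels.push(level);
  }
  return levels;
}

// 5 seconds is 101 in binary: wait 4 s in level 2, then 1 s in level 0.
console.log(levelsForDelay(5)); // [ 2, 0 ]
```

Because every layer has a fixed TTL, all messages in a given queue expire in arrival order, which avoids the head-of-queue problem of a single wait queue.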
Installation
npm install amqplib-binary-retry
Example
(async () => {
  const amqplib = require('amqplib');
  const { retryer } = require('amqplib-binary-retry');

  const CONSUMER_QUEUE = "example-service";

  const connection = await amqplib.connect('amqp://localhost:5672');
  const channel = await connection.createChannel();

  // Create the client queue
  await channel.assertQueue(CONSUMER_QUEUE, { durable: false, autoDelete: true });

  // Define a message handler
  const handler = function (msg) {
    // no need to 'ack' or 'nack' messages
    // messages that generate an exception (or a rejected Promise) will be retried
    console.log(msg);
  };

  // Define the optional delayFunction
  // const delayFunction = function (attempt) {
  //   // After three retries fail the message
  //   if (attempt > 3) {
  //     return -1;
  //   }
  //
  //   // Delay for 5 seconds
  //   return 5000;
  // };

  // Use retryer as a consumer
  // For more configuration options check the 'Options' section
  channel.consume(CONSUMER_QUEUE, retryer({
    channel,
    consumerQueue: CONSUMER_QUEUE,
    handler,
    // delayFunction,
  }));
})();
Options
| Parameter Name | Required | Default | Description |
|-|-|-|-|
| channel | X | | Amqplib channel |
| consumerQueue | X | | Name of the queue that holds the amqp messages that need to be processed. |
| handler | X | | Callback to be invoked with each message. |
| failureQueue | | <consumerQueue>.failure | Name of the queue that holds the amqp messages that could not be processed in spite of the retries. |
| delayFunction | | Math.pow(2, <# of attempts>) | Delay in milliseconds between retries. The function accepts the number of retry attempts. |
| exchangePrefixName | | delay_exchange_<level> | Name of the exchanges used on the delay levels. |
| queuePrefixName | | delay_queue_<level> | Name of the queues used on the delay levels. |
| delayLevels | | 16 | Number of delay layers used. The maximum available delay, in seconds, is 2^<levels>. |
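As an illustration of the delayFunction option, the sketch below builds a capped exponential back-off. It is a hedged example rather than the library's default: `makeDelayFunction` and its parameters `baseMs`, `maxMs` and `maxAttempts` are hypothetical, and returning -1 to fail a message follows the commented-out example above rather than any further documented contract.

```javascript
// Build a delayFunction with exponential back-off, a delay cap,
// and a maximum number of attempts. All option names are hypothetical.
function makeDelayFunction({ baseMs = 1000, maxMs = 60000, maxAttempts = 5 } = {}) {
  return function delayFunction(attempt) {
    // Give up after too many attempts (assumed convention: return -1).
    if (attempt > maxAttempts) {
      return -1;
    }
    // Double the delay on every attempt, but never exceed maxMs.
    return Math.min(baseMs * 2 ** attempt, maxMs);
  };
}

const delayFunction = makeDelayFunction({ maxAttempts: 3 });
console.log(delayFunction(1)); // 2000
console.log(delayFunction(4)); // -1
```

The resulting function could then be passed as the delayFunction option to retryer.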
Testing
Set up the required services.
docker-compose up
Transpile the TypeScript source.
npm run build
Run tests.
npm run test
License
MIT