librabbitmq
v0.8.0
Published
Easy to use methods implementing for common PubSub and job queue patterns with RabbitMQ.
Downloads
51
Maintainers
Readme
librabbitmq
Easy to use methods implementing common PubSub and task queue patterns with RabbitMQ.
Configuration
The plugin supports the following configuration (defaults shown):
const config = {
url: 'amqp://localhost',
preserveChannels: true,
connection: {
socket: {},
tuning: {},
retry: {
retries: 0,
factor: 2,
minTimeout: 1000,
maxTimeout: Infinity,
randomize: false
},
useExisting: false
},
retryQueue: {
suffix: '_retry',
maxCount: 10,
factor: 2,
minTimeout: 1 * 1000,
maxTimeout: 60 * 1000,
maxLength: 10000
},
doneQueue: {
suffix: '_done',
maxLength: 10000
}
};
connection.socket
options will be passed through to the underlyingamqp.connect
connection.tuning
is an object that constructs the various RabbitMQ tuning query string paramsconnection.retry
affects the connection retry attempts using the underlying retry moduleconnection.useExisting
will return an existing default connection upon invocation ofcreateConnection
, useful if you have many plugins but want to just use a single connection. Defaults tofalse
.preserveChannels
will keep around publish and push channels, minimizing request overhead, but potentially causing issues, though none I've been able to replicateretryQueue
settings for the retry queue, supporting limits and exponential backoffdoneQueue
can write finished tasks to a final queue. Defaults tofalse
, because it seems like an odd pattern. You're probably better off writing to your own db.
Additionally, all of the exposed methods take options that get passed to the underlying amqplib
calls.
Usage
Generally speaking, you only need to create a connection once, and that will be reused for all of your channel creation. You do have the option of creating multiple connections and passing those to the methods that create channels, if you need greater control.
Below are the easiest examples.
PubSub
import * as rmq from 'librabbitmq';
const subscriber = function (message) {
return new Promise(() => {
console.log(' [x] Received \'%s\'', message.payload);
});
};
const testIt = async function () {
await rmq.createConnection({
url: 'amqp://localhost'
});
await rmq.addSubscriber({
exchange: 'pubsub',
subscriber
});
return rmq.publishMessage({
exchange: 'pubsub',
topic: 'request',
payload: 'a message!'
});
}
testIt();
Task queue
import * as rmq from 'librabbitmq';
const {ACK} = rmq.constants;
const worker = function (task) {
const secs = 10;
console.log(' [x] Received payload id: %s', task.properties.correlationId);
console.log(' [x] Task takes %d seconds', secs);
return new Promise(resolve => {
setTimeout(() => {
console.log(' [x] Done');
console.log(task.payload);
resolve({code: ACK});
}, secs * 1000);
});
};
const testIt = async function () {
await rmq.createConnection({
url: 'amqp://localhost'
});
await rmq.addWorker({
queue: 'work',
worker
});
return rmq.pushTask({
queue: 'work',
type: 'foo',
correlationId: '123'
payload: request.payload
});
}
testIt();
Retry
This module implements a retry queue with exponential backoff and is enabled by default for work queues. I've found this to be very useful in my projects, and is perhaps the best justification for using this library over amqplib directly.
Implementations
Requirements
- node.js >= 6.0
- RabbitMQ 3.6.11 (only version tested)
TODO
- Handle queue assertion failures
- Add real tests
- Add fanout pattern
- Add RPC pattern