rab-q
v2.1.0
A tiny (opinionated) wrapper over amqplib for RabbitMQ publish/subscribe pattern
Features
- minimal dependencies: only amqplib and uuid are needed
- promises-based: async/await support makes it easy to use
- small api: 4 methods only, there's not much to learn :wink:
- instantiation-able: the library itself is a class; create as many independent instances as needed
- events logging: choose your own log transport
- disconnections-proof: auto reconnect/resend when errors happen
Our use case at @RadioFrance
This wrapper was tailor-made for our microservices architecture.
We use RabbitMQ as the bus in the publish-subscribe pattern. A message is never sent directly to a queue; it is always sent to a configured exchange.
Every message should be acknowledged by returning one of the values ACK, NACK or REJECT (available as message.ACK, message.NACK and message.REJECT on the message object).
A non-acknowledged message will be redelivered one time to the exchange. A rejected message will be directly deleted or redirected to the dead letter exchange if configured.
Our queues and exchanges are defined by an external engine. rab-q doesn't provide an API to create, assert or delete them.
Installation
$ npm install rab-q
Usage
const RabQ = require('rab-q');

const rabQ = new RabQ({
  exchange: 'your_topic_exchange',
  queues: 'queue_bind_to_exchange'
});

rabQ.on('error', err => {
  console.error(err);
});

rabQ.on('log', log => {
  console[log.level](log.msg, log.err || '');
});

rabQ.subscribesTo(/.*/, message => {
  // Do stuff!
  // Then acknowledge message by returning a constant
  return message.ACK;
});

rabQ.start()
  .then(() => {
    rabQ.publish('yourRoutingKeyHere', {your: 'message'});
  });
API
new RabQ(options)
options
username
Type: string
Default: guest
User name used to connect to RabbitMQ.
password
Type: string
Default: guest
Password associated with the username.
protocol
Type: amqp|amqps
Default: amqp
Connection protocol.
hostname
Type: string
Default: localhost
Hostname of the RabbitMQ server to connect to.
port
Type: number
Default: 5672
Port of connection.
socketOptions
Type: object
Default: null
socketOptions for amqplib, see https://amqp-node.github.io/amqplib/channel_api.html#connect
vhost
Type: string
Default: /
Selected virtual host.
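For orientation, the connection options above correspond to the parts of a standard AMQP URI. The sketch below assembles such a URI from the documented defaults. `buildAmqpUri` and `DEFAULTS` are hypothetical names used only for illustration; how rab-q actually hands these options to amqplib is not described here.

```javascript
// Hypothetical helper, not part of rab-q: shows how the documented
// connection options map onto an AMQP URI.
const DEFAULTS = {
  protocol: 'amqp',
  username: 'guest',
  password: 'guest',
  hostname: 'localhost',
  port: 5672,
  vhost: '/'
};

function buildAmqpUri(options = {}) {
  const o = Object.assign({}, DEFAULTS, options);
  const auth = `${encodeURIComponent(o.username)}:${encodeURIComponent(o.password)}`;
  // The vhost is percent-encoded, so the default "/" becomes "%2F".
  return `${o.protocol}://${auth}@${o.hostname}:${o.port}/${encodeURIComponent(o.vhost)}`;
}
```

Calling `buildAmqpUri()` with no argument yields the URI for a local broker with the default guest account.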
exchange
Required
Type: string
Exchange to publish messages to.
queues
Type: Array<string> | string
Consumes messages from these queues.
If this option is omitted or has a falsy value, the server will create random names.
maxMessages
Type: number
Default: 10
Defines the number of messages prefetched by channel. Once the max number of messages is reached, RabbitMQ will wait for some messages to be acknowledged before proceeding with new messages.
nackDelay
Type: number
Default: 0
Defines a delay in milliseconds before a message is rejected with NACK.
reconnectInterval
Type: number (milliseconds)
Default: 0
Time in milliseconds before trying to reconnect when connection is lost.
autoAck
Type: boolean
Default: false
Enables auto acknowledgment.
autoReconnect
Type: boolean
Default: true
Enables auto reconnection if an error happens while connecting to the server.
validators
Type: Object of functions
validators.consumer
Type: boolean
Default: return true
Function run before each message is processed. If it returns a falsy value, the message is rejected.
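As an illustration, a consumer validator could reject messages whose content lacks a required field. This is a minimal sketch that assumes the validator receives the same message object as a subscriber (the source does not state its arguments); the `id` field is a made-up example.

```javascript
// Assumption: the validator is called with the message object
// described under subscribesTo. The `id` field is hypothetical.
function consumerValidator(message) {
  const content = message && message.content;
  return Boolean(content) &&
    typeof content === 'object' &&
    typeof content.id === 'string';
}

// Possible wiring (option name taken from the docs above):
// new RabQ({
//   exchange: 'your_topic_exchange',
//   validators: {consumer: consumerValidator}
// });
```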
beforeHook
Type: function
Default: () => {}
Parameters: message (see below)
Function run before each message is processed; it can modify the message.
afterHook
Type: function
Default: () => {}
Parameters: message (see below), subscriberResult (ACK/NACK/REJECT)
Function run after each message is processed.
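The two hooks can be paired, for instance to measure how long each message takes to process. The sketch below assumes that a property set on the message in beforeHook is still present in afterHook; `startedAt` and `timings` are made-up names.

```javascript
// Collected measurements; in a real service this would go to a metrics system.
const timings = [];

function beforeHook(message) {
  // beforeHook may modify the message; `startedAt` is a hypothetical field.
  message.startedAt = Date.now();
}

function afterHook(message, subscriberResult) {
  timings.push({
    rk: message.rk,
    result: subscriberResult, // ACK, NACK or REJECT
    durationMs: Date.now() - message.startedAt
  });
}

// Possible wiring:
// new RabQ({exchange: 'your_topic_exchange', beforeHook, afterHook});
```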
prePublish
Type: function
Default: null
Parameters: routingKey, content, properties (see publish method)
Function run before each publish call.
Must return an object with {routingKey, content, properties}.
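A prePublish function might, for example, namespace routing keys and stamp outgoing content. This is only a sketch of the documented contract (three parameters in, one object with the same three keys out); the `billing.` prefix and the `sentAt` field are invented for the example.

```javascript
function prePublish(routingKey, content, properties) {
  return {
    // Hypothetical convention: prefix every routing key with the service name.
    routingKey: `billing.${routingKey}`,
    // Copy the content rather than mutating the caller's object.
    content: Object.assign({}, content, {sentAt: new Date().toISOString()}),
    // Properties are passed through untouched.
    properties
  };
}

// Possible wiring:
// new RabQ({exchange: 'your_topic_exchange', prePublish});
```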
rabQ.start()
Starts a connection.
Returns a promise resolved with true for a successful connection or false if a connection already exists.
rabQ.stop()
Stops and closes a connection.
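The true/false result of start() can be used to decide who is responsible for closing the connection. The helper below is a sketch, not part of rab-q: `withConnection` is a hypothetical name, and awaiting stop() assumes only that it is safe to await its return value.

```javascript
// Hypothetical helper: run some work with an open connection and
// close it afterwards only if this call opened it.
async function withConnection(rabQ, work) {
  // true: a new connection was made; false: one already existed.
  const opened = await rabQ.start();
  try {
    return await work(rabQ);
  } finally {
    if (opened) {
      await rabQ.stop();
    }
  }
}

// Possible usage:
// await withConnection(rabQ, q => q.publish('yourRoutingKeyHere', {your: 'message'}));
```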
rabQ.publish(routingKey, content, properties)
Publishes a message on exchange rabQ.exchange.
routingKey
Required
Type: string
The routing key to publish the message with.
content
Required
Type: Object
A message object to send to the exchange.
properties
Type: Object
Default: {}
Adds RabbitMQ properties to the message. See http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish (options)
If properties does not have a headers key, properties is considered to be headers.
You can provide a property x-query-token in headers to trace the lifecycle of a request. If not provided, a new UUID will be generated.
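To keep one trace across several services, a subscriber can forward the token of the message it consumed when it publishes a follow-up. The sketch below assumes that message.token carries the same value as the x-query-token header, which the source suggests but does not state outright; `publishFollowUp` is a hypothetical helper.

```javascript
// Hypothetical helper: publish a follow-up message that reuses the
// token of the incoming message as x-query-token.
function publishFollowUp(rabQ, incoming, routingKey, content) {
  return rabQ.publish(routingKey, content, {
    headers: {'x-query-token': incoming.token}
  });
}

// Possible usage inside a subscriber:
// rabQ.subscribesTo(/^order\.created$/, message => {
//   publishFollowUp(rabQ, message, 'invoice.requested', {orderId: message.content.id});
//   return message.ACK;
// });
```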
rabQ.subscribesTo(patternMatch, action)
Adds a subscriber on consumed messages filtered by the routing key.
patternMatch
Required
Type: RegExp
A regular expression to match on the routing keys of consumed messages.
action(message)
Required
Type: Function
A function that acknowledges (or not) each message by returning either a value or a promise resolved with that value ('ACK', 'NACK' or 'REJECT').
message is an object with the following properties:
- ACK string: the constant returned for a positive acknowledgment
- NACK string: the constant returned for a negative acknowledgment. NACK will requeue the message one time.
- REJECT string: the constant returned for a rejection without redelivery
- content Object: content of the received message
- rk string: routing key of the message
- queue string: queue name where the message is consumed
- token string: a UUID to identify the message
- consumeAt number: timestamp when the message is consumed
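Putting the three constants together, a subscriber action might look like the sketch below. The order fields (`id`, `status`) and the routing-key pattern are invented for the example; only the ACK/NACK/REJECT constants and the content property come from the description above.

```javascript
function handleOrder(message) {
  const order = message.content;

  if (!order || typeof order.id !== 'string') {
    // Malformed payload: retrying will not help, so reject it.
    return message.REJECT;
  }

  if (order.status === 'pending') {
    // Not ready to be processed yet: ask for a redelivery.
    return message.NACK;
  }

  // Processed successfully.
  return message.ACK;
}

// Possible wiring:
// rabQ.subscribesTo(/^order\./, handleOrder);
```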
Events
'log'
Emits a log object with the following properties:
- level string: can be info, warn, error
- token string: the UUID to identify the message
- msg string: the message
- err Object: the original error if log level is error
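Since the log transport is left to the caller, a small formatter is often enough. The sketch below turns a log object with the properties listed above into a single line; `formatLog` is a hypothetical helper, and it assumes err, when present, has a message property like a standard Error.

```javascript
// Hypothetical helper: build one printable line from a 'log' event payload.
function formatLog(log) {
  const parts = [`[${log.level}]`];
  if (log.token) {
    parts.push(`(${log.token})`);
  }
  parts.push(log.msg);
  if (log.err) {
    parts.push(`- ${log.err.message}`);
  }
  return parts.join(' ');
}

// Possible wiring:
// rabQ.on('log', log => console[log.level](formatLog(log)));
```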
'error'
Emits an Error if something goes wrong with the connection. If you don't catch this event the process will sto