akx-mq
v0.1.1
This is a small library to handle rabbitmq messages. Mostly a wrapper.
AkxMQ
================================
A wrapper around `amqplib` that handles connection retries, bulk consumer registration, and connect-style middleware.
#Installation
npm install akx-mq --save
Usage
file: akx-mq-config.js
var mqConfig = require( './config.json' );
var akxMq = require( 'akx-mq' )( mqConfig );
var logger = require( './logger' );
akxMq.addErrorHandling( function( err ){
logger.log( 'mq', err );
} );
exports.akxMqMiddleware = function(){
var getMessage = function( req, res ){
return res._body.wire.dataValues;
};
var callback = function( err, req, res, next ){
if( err ){
logger.log( 'mq', err, { req: req } );
}
return next();
};
return akxMq.publishMiddleware( { queue: 'wiresQ', getMessage: getMessage, callback: callback } );
};
exports.akxMqAddConsumer = akxMq.addConsumer;
Then, in your routes file, require akx-mq-config.js, create the publish middleware to publish messages to the queues, and add consumers to consume messages from their respective queues.
file: routes.js
var akxMqConfig = require( './akx-mq-config' );
var users = require('../controllers/users');
server.post('/users', authMiddleware, users.create, akxMqConfig.akxMqMiddleware() );
akxMqConfig.akxMqAddConsumer( {
queueNameHere: [ fn1, fn2, fn3, ... ],
anotherQueueName: [ fn1, fn4, ... ]
} );
Note: addConsumer uses the object keys as queue names, so make sure each queue is declared in the config file
before adding it here. The functions in each array are called in order, one at a time (connect-style middleware).
Note that the publisher and the consumers share the same connection and channel; this is by design. In the example above, calling
akxMqConfig.akxMqMiddleware()
returns the middleware that will be used. The API reference and options are below.
API Reference
#publishMiddleware
instance.publishMiddleware({ queue, getMessage, callback })
Returns a middleware function with the regular req, res, and next arguments.
Takes a POJO with three properties:
- queue: String name of the queue you want to publish to.
- getMessage: Function that is given the req and res objects and returns the message to publish.
- callback: Function that is given the err, req, res, and next arguments, so you can log any error before calling next().
example: instance.publishMiddleware({queue:'users', getMessage: getMessageFunc, callback: callbackFunc})
Note: All three are required.
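As a rough illustration of the last two properties, here is a minimal sketch of a getMessage and callback pair that can be exercised without a RabbitMQ server. The in-memory logger and the use of res.body are assumptions made for this example only; they are not part of akx-mq.

```javascript
// Hypothetical in-memory logger, standing in for the ./logger module used in the Usage section.
var logged = [];
var logger = {
    log: function( tag, err, meta ){
        logged.push( { tag: tag, err: err, meta: meta } );
    }
};

// Extracts the message to publish.
// Assumes an earlier handler stored the payload on res.body (illustrative choice).
var getMessage = function( req, res ){
    return res.body;
};

// Logs the error if there is one, then always passes control to the next handler.
var callback = function( err, req, res, next ){
    if( err ){
        logger.log( 'mq', err, { url: req.url } );
    }
    return next();
};

// With akx-mq installed, these would be passed as:
// instance.publishMiddleware( { queue: 'users', getMessage: getMessage, callback: callback } );
```

Keeping both functions as plain named values makes them easy to call directly with fake req and res objects in a unit test.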
#addConsumer
instance.addConsumer({[queue, [array of functions]]})
Adds consumers to their respective queues.
Takes a POJO. Uses the keys as the queue name and the values as the consumer.
- queue
{queueName:[ fn1, fn2, ...]}
example: instance.addConsumer({queueName:[ fn1, fn2, ...]})
Note: The functions are called in order with the arguments queueName, parsedMsg, and next. The next callback works like
connect-style middleware, except that calling next( true ) raises a flag that skips the remaining functions and
calls the last function in the list. Otherwise, call next() and the functions run in order, one at a time. There is no
limit on how many consumers you can add. The message is not acknowledged until the last function has finished.
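The ordering rules in the note can be sketched with a small standalone runner. This is not akx-mq's actual implementation; runChain and its done argument (which stands in for acknowledging the message) are hypothetical names used only to illustrate how next() and next( true ) might behave.

```javascript
// Runs `fns` in order for one message. Each function receives
// ( queueName, parsedMsg, next ). Calling next() moves to the following
// function; calling next( true ) skips ahead to the last function.
// `done` is a hypothetical hook standing in for the message acknowledgement.
function runChain( queueName, parsedMsg, fns, done ){
    var last = fns.length - 1;
    var step = function( index ){
        if( index > last ){
            return done();
        }
        fns[ index ]( queueName, parsedMsg, function( skip ){
            // Jump to the last function when skip is true, unless already there.
            var nextIndex = ( skip === true && index < last ) ? last : index + 1;
            step( nextIndex );
        } );
    };
    step( 0 );
}

// Example consumers (hypothetical): validate skips ahead when the message is invalid.
var calls = [];
var validate = function( queueName, msg, next ){ calls.push( 'validate' ); next( msg.valid !== true ); };
var save = function( queueName, msg, next ){ calls.push( 'save' ); next(); };
var finish = function( queueName, msg, next ){ calls.push( 'finish' ); next(); };

var acked = false;
runChain( 'wiresQ', { valid: false }, [ validate, save, finish ], function(){ acked = true; } );
// calls is now [ 'validate', 'finish' ] and acked is true
```

Putting a cleanup or logging function last in the array means it still runs when an earlier function bails out with next( true ).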
#addErrorHandling
instance.addErrorHandling([function])
Adds a catch-all error handling function.
Takes a function to use as the general error handler.
- function: Just a regular function, named or anonymous.
example: instance.addErrorHandling(function(){ logger.log('message')})
Note: This is optional.
Options
| Property | DataType | Default | Description |
|----------|----------|---------|-------------|
| retry | Number | 60000 | How many milliseconds to wait before retrying to connect to the server |
| host | String | 'amqp://localhost' | The host for the server |
| persistent | Boolean | false | Whether messages should persist if the server goes down |
| prefetch | Number | 0 | How many unacknowledged messages to allow before more are sent to the consumer |
| noAck | Boolean | false | Whether to consume without acknowledgement |
| queues | Array | [ ] | An array of objects with 'name' and 'durable' properties |
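
For reference, a configuration object built from the options above might look like the following sketch. It is written as a JavaScript object rather than a config.json file; the queue name 'wiresQ' is borrowed from the Usage section and durable: true is an illustrative choice, not a documented default.

```javascript
// Config using the default values from the options table, plus one queue.
// 'wiresQ' comes from the usage example; durable: true is an assumption.
var mqConfig = {
    retry: 60000,
    host: 'amqp://localhost',
    persistent: false,
    prefetch: 0,
    noAck: false,
    queues: [
        { name: 'wiresQ', durable: true }
    ]
};

// With akx-mq installed, this would be passed as:
// var akxMq = require( 'akx-mq' )( mqConfig );
```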