@sinet/lapin
v4.7.10
Published
RPC library for AMQP protocol
Downloads
2
Readme
Lapin wrapper for RabbitMQ
Currently this project uses Rabbus and Wascally. It aims to support several producer / consumer patterns. The following is a list of the planned patterns; the checked ones are currently implemented:
- [X] Send / Receive
- [X] Publish / Subscribe
- [X] Request / Response
The JSend specification is required to determine if an error has occurred in a response.
Installation and Usage
As lapin uses wascally you need to install it along with lapin:
npm install wascally
npm install lapin
Require lapin and wascally:
var rabbit = require( 'wascally' );
var lapin = require( 'lapin' )( rabbit );
// or
var options = {
'logger' : logger,
'rabbit' : wascally
};
var lapin = require( 'lapin' )( options )
The following are simple usage examples:
Send / Receive
Sender Options
exchange, messageType, routingKey, autoDelete
Please refer to Rabbus options' info
Sender
options = 'v1.logs.log';
// or
options = {
'messageType' : 'v1.logs.log',
'exchange' : 'logs'
}
lapin.send( options , message, function ( error, response ) {
// handling the response is optional
if ( !error ) {
console.log( response );
}
} );
Or use the promise style send
lapin.sendPromise( 'v1.logs.log', message )
.then( function ( response ) {
// Return for chain then and handle response
console.log( response );
} )
.catch( function ( error ) {
// Handle error
} );
Receiver Options
queue, exchange, messageType, autoDelete, limit, noBatch
Receiver
options = 'v1.logs.log';
// or
options = {
'messageType' : 'v1.logs.log',
'exchange' : 'logs'
}
lapin.receive( options, function ( message, done ) {
someDatabaseQuery( message, function ( err, body ) {
if ( err ) {
throw err;
}
done();
} );
} );
Publish / Subscribe
Publisher Options
exchange, messageType, autoDelete
Publisher
options = 'v1.users.login';
// or
options = {
'messageType' : 'v1.users.login',
'exchange' : 'users' // recommended not to prefix or suffix `exchange` lapin will do it for us
}
lapin.publish( options, message, function ( error, response ) {
// handling the response is optional
if ( !error ) {
console.log( response );
}
} );
Subscriber Options
queue, exchange, messageType, autoDelete, limit, noBatch
Subscriber
options = 'v1.users.login';
// or
options = {
'messageType' : 'v1.users.login',
'queue' : 'users', // recommended not to put `queue` suffix or prefix, lapin will do it for you
'exchange' : 'users'
}
lapin.subscribe( options, function ( message, done ) {
someDatabaseQuery( message, function ( err, body ) {
if ( err ) {
throw err;
}
done();
} );
} );
Request / Response
Request Options
exchange, messageType, autoDelete, routingKey, forceAck
Requester
options = 'v1.users.findAll'
// or
options = {
'messageType' : 'v1.users.findAll',
'exchange' : 'users'
}
lapin.request( options, message, function ( error, data ) {
if ( error ) {
return reply( error ).code( 500 );
}
return reply( data.data );
} );
Or use the promise style request
lapin.requestPromise( 'v1.users.findAll', message )
.then( function ( data ) {
// Handle data
return reply( data.data );
} )
.catch( function ( error ) {
// Handle error
} );
Responder Options
exchange, queue, autoDelete, routingKey, limit, noBatch
Responder
options = 'v1.users.findAll';
// or
options = {
'messageType' : 'v1.users.findAll',
'limit' : 1
}
lapin.respond( options, function ( message, respond ) {
if ( message.invalid ) {
return respond.fail( 'Invalid data' );
}
someDatabaseQuery().then( function ( result ) {
// JSend success with data
respond.success( result );
} ).catch( function handleError ( error ) {
// JSend error
respond.error( 'Failed query', error, 500 );
// or -- code is optional
respond.error( 'Failed query', error );
// or -- data is optional
respond.error( 'Failed query' );
} );
} );
Please refer to JSEND for standard reply attributes
Response with Validation using Joi
// Responder
lapin.respond( {
'messageType' : 'v1.users.findAll',
'validate' : Joi.object().keys( {
'username' : Joi.string().alphanum().min( 3 ).max( 30 ).required(),
'password' : Joi.string().regex( /[a-zA-Z0-9]{3,30}/ ),
'access_token' : [ Joi.string(), Joi.number() ],
'birthyear' : Joi.number().integer().min( 1900 ).max( 2013 ),
'email' : Joi.string().email()
} ).with( 'username', 'birthyear' ).without( 'password', 'access_token' ),
'validateOptions' : {} // <optional> see https://github.com/hapijs/joi for validation options
} , function ( message, respond ) {
// consumer process
} );
If validation fails, lapin will bypass the respond callback and respond with a fail status, as shown below:
respond( {
'status' : 'fail',
'data' : <Validation error message>
} );
Please refer to Joi Validation for validation examples, structure and validation options
To Consider
Make sure to use the same messageType, routingKey and exchange options.
Whenever a String option is supplied instead of the Object option, lapin will automatically create the following:
- exchange and messageType ( Producer )
- exchange, messageType and queue ( Consumer )
Contributing
All pull requests must follow coding conventions and standards.
Additional Information
RPC over RabbitMQ
In general, doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response the client needs to send a 'callback' queue address with the request.
- When the client starts up, it creates an exclusive callback queue.
- For an RPC request, the client sends a message with two required properties: reply_to, which is set to the callback queue, and correlation_id, which is set to a unique value for every request.
- The request is sent to an rpc_queue queue.
- The RPC worker (aka: server) is waiting for requests on that queue. When a message appears, it does the job and sends a message with the result back to the client, using the queue from the reply_to field.
- The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request, it returns the response to the application.
Standards/Conventions
messageType: <version>.<resource>.<action>
exchange: <pattern>.<resource>-exchange
queue: <pattern>.<resource>-queue
Where
Patterns:
req-res
pub-sub
send-rec
Version:
v1
v2
and so on.