Extend amqp-connection-manager to support RPC.
Extends amqp-connection-manager (connection management for amqplib) to support remote procedure calls (RPC).
- Time to live for RPC requests.
- Exceptions transmitted from RPC server to RPC client.
- Simple async function API design
npm install --save amqplib amqp-connection-manager amqp-connection-manager-rpc
The basic idea is described in the RabbitMQ RPC tutorial. node-cache is used to manage responses from the RPC server.
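As a rough illustration of that idea, the sketch below tracks in-flight requests by correlation ID and expires them after a TTL. It is not this package's code: the `PendingReplies` class and its method names are hypothetical, and a plain `Map` with `setTimeout` stands in for node-cache.

```javascript
const crypto = require('crypto');

// Hypothetical helper (not part of the package): tracks in-flight RPC
// requests by correlation ID. A Map plus setTimeout stands in for node-cache.
class PendingReplies {
  constructor() {
    this.pending = new Map(); // correlationId -> { resolve, reject, timer }
  }

  // Register a request. ttlSeconds === 0 means "wait forever".
  register(ttlSeconds) {
    const correlationId = crypto.randomUUID();
    const promise = new Promise((resolve, reject) => {
      let timer = null;
      if (ttlSeconds > 0) {
        timer = setTimeout(() => {
          this.pending.delete(correlationId);
          reject(new Error('RPC request timed out'));
        }, ttlSeconds * 1000);
      }
      this.pending.set(correlationId, { resolve, reject, timer });
    });
    return { correlationId, promise };
  }

  // Call this when a message arrives on the reply queue.
  settle(correlationId, reply) {
    const entry = this.pending.get(correlationId);
    if (!entry) return false; // unknown ID or already expired
    clearTimeout(entry.timer);
    this.pending.delete(correlationId);
    entry.resolve(reply);
    return true;
  }
}
```

The client would publish the request with the generated `correlationId` and a `replyTo` queue, then call `settle()` from its reply-queue consumer.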
Here's the RPC client example:
var amqp = require('amqp-connection-manager-rpc');
// Create a new connection manager
var connection = amqp.connect(['amqp://localhost'], {json: true});
// Set up a channel for RPC requests.
const ttl = 60; // Time to live for RPC request (seconds). 0 - infinite
var channelWrapper = connection.createRPCClient('RPC-QUEUE-test', ttl);
// Send a request to the RPC server and wait for the reply. An exception may be thrown!
(async () => {
  try {
    let req = { a: 1, b: 2 }; // request data
    let rpc_reply = await channelWrapper.sendRPC(req);
    console.log("RPC reply: ", rpc_reply);
  } catch (err) {
    console.log("RPC error: ", err);
  }
})();
Here's the RPC server example:
var amqp = require('amqp-connection-manager-rpc');
// Create a new connection manager
var connection = amqp.connect(['amqp://localhost'], {json: true});
// Set up a channel for RPC requests.
var channelWrapper = connection.createRPCServer('RPC-QUEUE-test', doRpcJob);
// Do the RPC job
async function doRpcJob(msgJson, msg) {
  if (!msgJson.b) throw new Error('B is not set'); // Exceptions are allowed! They will be sent to the RPC client.
  let reply = {
    a: msgJson.a ? msgJson.a + 1 : null
  };
  return reply;
}
See a complete example in the examples folder.
See amqp-connection-manager API.
AmqpConnectionManager#createRPCClient(queue_name[, ttl [, setup]])
Create a new RPC client ChannelWrapper.
- queue_name - Name of the queue for RPC requests.
- ttl - Time to live for an RPC request, in seconds. Set to 0 for no limit. Defaults to 0.
- setup - async function(channel) that sets up the queue and exchange. Must return the RPC queue. Default: async (channel) => { return await channel.assertQueue('', { exclusive: true }); }
Returns ChannelWrapper
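A custom setup function might look like the sketch below. The function name `setupReplyQueue` and the `autoDelete` option are illustrative choices, not package defaults. Like the default, it returns the result of `channel.assertQueue()`.

```javascript
// Hypothetical custom setup for createRPCClient: declares an exclusive,
// auto-deleted reply queue and returns it.
async function setupReplyQueue(channel) {
  // An empty name asks the broker to generate a unique queue name.
  const replyQueue = await channel.assertQueue('', {
    exclusive: true,
    autoDelete: true
  });
  return replyQueue; // amqplib resolves with { queue, messageCount, consumerCount }
}

// Usage (needs a running broker, so it is left commented out):
// var channelWrapper = connection.createRPCClient('RPC-QUEUE-test', 30, setupReplyQueue);
```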
AmqpConnectionManager#createRPCServer(queue_name, callback[, options] )
Create a new RPC server ChannelWrapper.
- queue_name - Name of the queue for RPC requests.
- callback - A callback function that returns a Promise resolving to the RPC server's JSON reply. It receives two arguments: the JSON message from the RPC client and the full message from the RPC client.
- if true, the error stack is sent to the client. Default: false.
- options.setup - async function(channel) that sets up the channel and exchange. Must return the RPC queue name. Default: async (channel) => { channel.prefetch(1); await channel.assertQueue(queue_name, { durable: false }); return queue_name; }
Returns ChannelWrapper
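To illustrate how an exception thrown in the callback could reach the client, here is a minimal sketch. The envelope shape and the names `runHandler`, `unwrap`, and `includeStack` are assumptions made for this example. The package's actual wire format may differ.

```javascript
// Hypothetical server side: run the callback and wrap its result or error
// in a JSON-serializable envelope.
async function runHandler(handler, msgJson, msg, includeStack = false) {
  try {
    return { ok: true, reply: await handler(msgJson, msg) };
  } catch (err) {
    const error = { name: err.name, message: err.message };
    if (includeStack) error.stack = err.stack; // only when explicitly enabled
    return { ok: false, error };
  }
}

// Hypothetical client side: turn the envelope back into a return value
// or a thrown Error.
function unwrap(envelope) {
  if (envelope.ok) return envelope.reply;
  const err = new Error(envelope.error.message);
  err.name = envelope.error.name;
  if (envelope.error.stack) err.stack = envelope.error.stack;
  throw err;
}
```

Leaving the stack out by default keeps server internals out of client-visible replies.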
ChannelWrapper#sendRPC(msg [,ttl [, exchangeName [, routingKey]]])
Send an RPC request to the RPC server. Call it on the client only.
- msg - Request Object to send to the RPC server.
- ttl - Time to live for the RPC request, in seconds. Set to 0 for no limit. If not defined, the value from createRPCClient() is used.
- exchangeName - Name of the exchange for the RPC request.
- routingKey - Routing key for the RPC request.
Returns an Object with the RPC job reply, or throws an exception.
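One way to use the per-call ttl is sketched below: a small wrapper that sends a request with a short timeout and returns a fallback value if the call fails. The helper name `sendWithFallback` is hypothetical. It only assumes a client object with the `sendRPC(msg, ttl)` method described above.

```javascript
// Hypothetical helper: send a request with a per-call TTL and return
// `fallback` if the server throws or the request fails.
async function sendWithFallback(client, request, ttlSeconds, fallback) {
  try {
    return await client.sendRPC(request, ttlSeconds);
  } catch (err) {
    console.log('RPC error: ', err.message);
    return fallback;
  }
}

// Usage (needs a running broker and RPC server, so it is left commented out):
// const reply = await sendWithFallback(channelWrapper, { a: 1, b: 2 }, 5, null);
```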
Fork it!
Pull requests, issues, and feedback are welcome.