simple-amqplib-rpc
v1.0.3
Published
simple amqp rpc interface
Downloads
1
Readme
Simple amqplib RPC
Simple RPC interface for amqplib
Installation
This module is installed via npm:
$ npm install simple-amqplib-rpc
Example Usage
Client side:
const amqplib = require('amqplib');
const { request } = require('simple-amqplib-rpc');
const config = {
url: 'amqp://guest:[email protected]:5672//',
exchange: 'exchange',
routingKey: 'sum'
};
const connection = await amqplib.connect(config.url);
try {
const content = [ 1, 2, 3 ];
const opts = { exchange: config.exchange, timeout: 5000 };
const resp = await request(connection, config.routingKey, content, opts);
// resp = 6
} catch (err) {
switch (err.name) {
case 'ChannelClosedError': // the connection was closed unexpectedly.
case 'NoRouteError': // the specified routing key goes nowhere (server needs to bindQueue).
case 'ResponseError': // the request returned an error.
case 'TimeoutError': // the request took more than 5 seconds.
}
}
Server side:
const amqplib = require('amqplib');
const { checkReplyQueue, error, reply } = require('simple-amqplib-rpc');
const config = {
url: 'amqp://guest:[email protected]:5672//',
exchange: 'exchange',
queue: 'sum',
routingKey: 'sum'
};
const connection = await amqplib.connect(config.url);
const consumeChannel = await connection.createChannel();
await consumeChannel.assertQueue(config.queue);
await consumeChannel.bindQueue(config.queue, config.exchange, config.routingKey);
const publishChannel = await connection.createChannel();
consumeChannel.consume(config.queue, async message => {
if (!await checkReplyQueue(connection, message)) { // the consumer doesn't exist anymore
return consumeChannel.reject(message, false); // reject and don't requeue
}
try {
const numbers = JSON.parse(message.content);
const sum = numbers.reduce((acc, n) => acc + n, 0);
reply(consumeChannel, message, sum, publishChannel);
} catch (err) {
// if something went wrong, return an error to the client
error(consumeChannel, message, err, publishChannel);
}
});
API
checkReplyQueue(connection, message) ⇒ boolean
Check if a "reply to" queue exists or not. Will create a separate channel so that it doesn't kill an existing one if the queue check fails.
Kind: global function
Returns: boolean - whether the reply queue exists or not
| Param | Type | Description | | --- | --- | --- | | connection | amqplibConnection | amqplib connection | | message | object | incomming message |
error(channel, message, error, publisherChannel)
Reply to an rpc request with an error. Will automatically nack and not requeue the message after the error response has been sent.
Kind: global function
| Param | Type | Description |
| --- | --- | --- |
| channel | AmqplibChannel | the amqplib channel on which the message was received |
| message | object | incomming message |
| error | Error | an error object. { message, code }
will be returned to the client. |
| publisherChannel | AmqplibChannel | optional separate channel to publish response on |
reply(channel, message, content, publisherChannel)
Reply to an rpc request. Will automatically ack the message after the response has been sent.
Kind: global function
| Param | Type | Description | | --- | --- | --- | | channel | AmqplibChannel | on which the message was received | | message | object | incomming message | | content | * | response message | | publisherChannel | AmqplibChannel | optional separate channel to publish response on |
request(connection, key, content, opts) ⇒ *
Make an rpc request. Each request will have its own channel.
Kind: global function Returns: * - json decoded response Throws:
- ChannelClosedError when the channel is closed
- NoRouteError if a published message has nowhere to go
- ResponseError if the request returned an error
- TimeoutError after the specified timeout period
| Param | Type | Description | | --- | --- | --- | | connection | amqplibConnection | amqplib connection | | key | string | the routing key for the rpc service | | content | * | must be json serialisable | | opts | object | | | opts.exchange | string | the amqp exchange to publish to (defaults to '') | | opts.timeout | number | optional max time to wait for a response |
Note: To regenerate this section from the jsdoc run npm run docs
and paste
the output above.
License
The BSD License
Copyright (c) 2019, Andrew Harris
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
Neither the name of the Andrew Harris nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.