@trusk/amqp-connector
v1.6.3
Published
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/facebook/react/blob/master/LICENSE) [![CircleCI](https://circleci.com/gh/trusk-official/amqp-connector.svg?style=svg)](https://circleci.com/gh/trusk-official/amqp-co
Downloads
813
Keywords
Readme
AMQP connector
An middle level amqp.node wrapper for every day use. It requires node.js >= 6.
Features:
- Promise based
- Compatible with direct/topic/fanout/headers exchanges and send to queue
- Built-in RPC
- Based on node-amqp-connection-manager
- Automatically reconnect when your amqplib broker dies in a fire
- Round-robin connections between multiple brokers in a cluster
- If messages are sent while the broker is unavailable, queues messages in memory until we reconnect
- Queued message are persisted to disk in case of unexpected crash/reboot, and recovered in memory
- Built-in distributed tracing
- Joi message structure validation on message reception (
listen
,subscribeToMessage
) - Provide your own transport to log every microservice message
- Automatic retry with dead-letter
- Stream over amqp (alpha)
Install
npm install @trusk/amqp-connector
Run tests
# Launch RabbitMQ with docker (guest:guest)
docker run -d -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management
# Launch tests
npm test
Usage
Connection
// see http://www.squaremobius.net/amqp.node/channel_api.html#connect
const connection = amqpconnector({
urls: ["amqp://localhost:5672"],
serviceName: "default",
serviceVersion: "1.2.3",
transport: console // or any logger instance ex https://github.com/winstonjs/winston
}).connect();
Channel
const connection = amqpconnector().connect();
// see https://github.com/trusk-official/node-amqp-connection-manager#amqpconnectionmanagercreatechanneloptions
const channel = connection.buildChannelIfNotExists({
name: "default",
json: true, // use false to work with Buffers
swap_path: path.resolve("./swap/my_channel"),
swap_size: 50000,
prefetchCount: 5,
prefetchGlobal: true,
rejectTimeout: 0 // The timeout before a message is rejected (defaults to 0)
realm: "my_realm." // scopes every exchange/queue/routing key by adding the realm as prefix
});
Direct Exchange (with message validation)
const connection = amqpconnector({
serviceVersion: "1.2.3"
}).connect();
const channel = connection.buildChannelIfNotExists({ json: true });
/**
* @callback subscribeCallback
* @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
* @param {function} invoke - a contextual invoke function
* @param {function} publishMessage - a contextual publishMessage function
* Subscribes an exchange
* @param {string} qualifier - the subscription string, see Subscribe qualifier
* @param {subscribeCallback} callback - The message handler
* @param {object} options - The subscribe options
* @param {object} options.exchange - The exchange parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
* @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
* @param {object} options.headers - The subscribe headers, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue
* @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
* @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
* @param {object} options.nack - a message nack arguments object
* @param {bool} options.nack.allUpTo - defaults to false, see allUpTo https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
* @param {bool} options.nack.requeue - defaults to false, see requeue https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
* @param {number} options.retry - the ttl for retrying the message on top of the queue. Will create a dead letter exchange and no consumer queue to enable it
* @param {number} options.dlPrefix - the prefix for the dead letter exchange and no consumer queue
* @param {number} options.maxTries - The number of tries when sent to deadletter (defaults to none (unlimited retries))
* @param {string} options.dumpQueue - the queue where to send the message when the maxTries is reached (defaults to none (discards if maxTries is set))
* @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
*/
channel.subscribeToMessages(
"direct/my-direct-exchange/my.routing.key/my-queue",
async ({ message }) => {
// handle message
},
{
schema: Joi.object({
content: Joi.object(),
properties: Joi.object({
headers: Joi.object({
"x-service-version": Joi.string().regex(/^1.2.\d$/) // validates every 1.2 patches
}).unknown()
}).unknown()
}).unknown()
}
);
// or use a text schema definition
channel.subscribeToMessages(
"direct/my-direct-exchange/my.routing.key/my-queue",
async ({ message }) => {
// handle message
},
{
schema: {
type: "object",
flags: {
unknown: true
},
keys: {
content: {
type: "object"
},
properties: {
type: "object",
flags: {
unknown: true
},
keys: {
headers: {
type: "object",
flags: {
unknown: true
},
keys: {
"x-service-version": {
type: "string",
rules: [
{
name: "pattern",
args: {
regex: "/^1.2.\\d$/"
}
}
]
}
}
}
}
}
}
}
}
);
// or use a validator function
// The validator function must return an { error, value } object
// schema takes precedence over validator
const schema = Joi.object({
content: Joi.object(),
properties: Joi.object({
headers: Joi.object({
"x-service-version": Joi.string().regex(/^1.2.\d$/) // validates every 1.2 patches
}).unknown()
}).unknown()
}).unknown();
channel.subscribeToMessages(
"direct/my-direct-exchange/my.routing.key/my-queue",
async ({ message }) => {
// handle message
},
{
validator: schema.validator.bind(schema)
}
);
/**
* Publish to an exchange
* @param {string} qualifier - the publish string, see Publish qualifier
* @param {object|Buffer} message - The message
* @param {object} options - The subscribe options
* @param {object} options.headers - Additional headers for the message
* @return {Promise<Bool>} A promise that resolves Bool, see http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
*/
channel.publishMessage("direct/my-direct-exchange/my.routing.key", {
foo: "bar"
});
Topic Exchange
const connection = amqpconnector().connect();
const channel = connection.buildChannelIfNotExists({ json: true });
/**
* @callback subscribeCallback
* @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
* @param {function} invoke - a contextual invoke function
* @param {function} publishMessage - a contextual publishMessage function
* Subscribes an exchange
* @param {string} qualifier - the subscription string, see Subscribe qualifier
* @param {subscribeCallback} callback - The message handler
* @param {object} options - The subscribe options
* @param {object} options.exchange - The exchange parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
* @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
* @param {object} options.headers - The subscribe headers, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue
* @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
* @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
* @param {object} options.nack - a message nack arguments object
* @param {bool} options.nack.allUpTo - defaults to false, see allUpTo https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
* @param {bool} options.nack.requeue - defaults to false, see requeue https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
* @param {number} options.retry - the ttl for retrying the message on top of the queue. Will create a dead letter exchange and no consumer queue to enable it
* @param {number} options.dlPrefix - the prefix for the dead letter exchange and no consumer queue
* @param {number} options.maxTries - The number of tries when sent to deadletter (defaults to none (unlimited retries))
* @param {string} options.dumpQueue - the queue where to send the message when the maxTries is reached (defaults to none (discards if maxTries is set))
* @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
*/
channel.subscribeToMessages(
"topic/my-topic-exchange/my.routing.key/my-queue",
async ({ message }) => {
// handle message
}
);
/**
* Publish to an exchange
* @param {string} qualifier - the publish string, see Publish qualifier
* @param {object|Buffer} message - The message
* @param {object} options - The subscribe options
* @param {object} options.headers - Additional headers for the message
* @return {Promise<Bool>} A promise that resolves Bool, see http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
*/
channel.publishMessage("topic/my-topic-exchange/my.routing.*", {
foo: "bar"
});
Fanout Exchange
const connection = amqpconnector().connect();
const channel = connection.buildChannelIfNotExists({ json: true });
/**
* @callback subscribeCallback
* @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
* @param {function} invoke - a contextual invoke function
* @param {function} publishMessage - a contextual publishMessage function
* Subscribes an exchange
* @param {string} qualifier - the subscription string, see Subscribe qualifier
* @param {subscribeCallback} callback - The message handler
* @param {object} options - The subscribe options
* @param {object} options.exchange - The exchange parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
* @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
* @param {object} options.headers - The subscribe headers, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue
* @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
* @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
* @param {object} options.nack - a message nack arguments object
* @param {bool} options.nack.allUpTo - defaults to false, see allUpTo https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
* @param {bool} options.nack.requeue - defaults to false, see requeue https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
* @param {number} options.retry - the ttl for retrying the message on top of the queue. Will create a dead letter exchange and no consumer queue to enable it
* @param {number} options.dlPrefix - the prefix for the dead letter exchange and no consumer queue
* @param {number} options.maxTries - The number of tries when sent to deadletter (defaults to none (unlimited retries))
* @param {string} options.dumpQueue - the queue where to send the message when the maxTries is reached (defaults to none (discards if maxTries is set))
* @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
*/
channel.subscribeToMessages(
"fanout/my-fanout-exchange/my-queue",
async ({ message }) => {
// handle message
}
);
/**
* Publish to an exchange
* @param {string} qualifier - the publish string, see Publish qualifier
* @param {object|Buffer} message - The message
* @param {object} options - The subscribe options
* @param {object} options.headers - Additional headers for the message
* @return {Promise<Bool>} A promise that resolves Bool, see http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
*/
channel.publishMessage("fanout/my-fanout-exchange", {
foo: "bar"
});
Headers Exchange
const connection = amqpconnector().connect();
const channel = connection.buildChannelIfNotExists({ json: true });
/**
* @callback subscribeCallback
* @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
* @param {function} invoke - a contextual invoke function
* @param {function} publishMessage - a contextual publishMessage function
* Subscribes an exchange
* @param {string} qualifier - the subscription string, see Subscribe qualifier
* @param {subscribeCallback} callback - The message handler
* @param {object} options - The subscribe options
* @param {object} options.exchange - The exchange parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
* @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
* @param {object} options.headers - The subscribe headers, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue
* @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
* @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
* @param {object} options.nack - a message nack arguments object
* @param {bool} options.nack.allUpTo - defaults to false, see allUpTo https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
* @param {bool} options.nack.requeue - defaults to false, see requeue https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
* @param {number} options.retry - the ttl for retrying the message on top of the queue. Will create a dead letter exchange and no consumer queue to enable it
* @param {number} options.dlPrefix - the prefix for the dead letter exchange and no consumer queue
* @param {number} options.maxTries - The number of tries when sent to deadletter (defaults to none (unlimited retries))
* @param {string} options.dumpQueue - the queue where to send the message when the maxTries is reached (defaults to none (discards if maxTries is set))
* @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
*/
channel.subscribeToMessages(
"headers/my-headers-exchange/my-queue",
async ({ message }) => {
// handle message
},
{
headers: {
customheader: "my-header",
"x-match": "any"
}
}
);
/**
* Publish to an exchange
* @param {string} qualifier - the publish string, see Publish qualifier
* @param {object|Buffer} message - The message
* @param {object} options - The subscribe options
* @param {object} options.headers - Additional headers for the message
* @return {Promise<Bool>} A promise that resolves Bool, see http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
*/
channel.publishMessage(
"headers/my-headers-exchange",
{
foo: "bar"
},
{
headers: {
customheader: "my-header"
}
}
);
Send To Queue
const connection = amqpconnector().connect();
const channel = connection.buildChannelIfNotExists({ json: true });
/**
* @callback subscribeCallback
* @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
* @param {function} invoke - a contextual invoke function
* @param {function} publishMessage - a contextual publishMessage function
* Subscribes an exchange
* @param {string} qualifier - the subscription string, see Subscribe qualifier
* @param {subscribeCallback} callback - The message handler
* @param {object} options - The subscribe options
* @param {object} options.exchange - The exchange parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
* @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
* @param {object} options.headers - The subscribe headers, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue
* @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
* @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
* @param {object} options.nack - a message nack arguments object
* @param {bool} options.nack.allUpTo - defaults to false, see allUpTo https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
* @param {bool} options.nack.requeue - defaults to false, see requeue https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
* @param {number} options.retry - the ttl for retrying the message on top of the queue. Will create a dead letter exchange and no consumer queue to enable it
* @param {number} options.dlPrefix - the prefix for the dead letter exchange and no consumer queue
* @param {number} options.maxTries - The number of tries when sent to deadletter (defaults to none (unlimited retries))
* @param {string} options.dumpQueue - the queue where to send the message when the maxTries is reached (defaults to none (discards if maxTries is set))
* @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
*/
channel.subscribeToMessages(
"direct/any-exchange//my-queue",
async ({ message }) => {
// handle message
}
);
/**
* Publish to a queue
* @param {string} qualifier - the publish string, see Publish qualifier
* @param {object|Buffer} message - The message
* @param {object} options - The subscribe options
* @param {object} options.headers - Additional headers for the message
* @return {Promise<Bool>} A promise that resolves Bool, see http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
*/
channel.publishMessage("q/my-queue", {
foo: "bar"
});
RPC
- call a remote function through
amqp
(note thatinvoke
is fail fast and does not benefit of node-amqp-connection-manager's features)
const connection = amqpconnector().connect();
const channel = connection.buildChannelIfNotExists({ json: true });
/**
* @callback listenCallback
* @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
* @param {function} invoke - a contextual invoke function
* @param {function} publishMessage - a contextual publishMessage function
* Listens to an RPC queue
* @param {string} qualifier - the subscription string, see Subscribe qualifier
* @param {listenCallback} callback - The message handler
* @param {object} options - The subscribe options
* @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
* @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
* @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
* @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
*/
channel.listen("my-rpc-function", async ({ message }) => {
return { value: message.content.value * 42 };
});
/**
* Invoke an RPC function
* @param {string} qualifier - the queue string
* @param {object|Buffer} message - The message
* @param {object} options - The subscribe options
* @param {integer} options.timeout - the timeout (ms) for the call
* @param {object} options.headers - Additional headers for the message
* @return {Promise} A promise that resolves the function response
*/
const result = await channel.invoke("my-rpc-function", { value: 1337 }); // { value: 57491 }
Distributed tracing
- By using the contextual
invoke
andpublishMessage
functions you can easily trace the journey of a message
channel.listen("my-traced-rpc-function-4", async ({ message }) => {
// message.properties.headers["x-transaction-stack"] === ["qSdeF", "hYud7", "GTynl", "zQwfG"]
return { value: 5 * message.content.value };
});
channel.listen("my-traced-rpc-function-3", async ({ message, invoke }) => {
// message.properties.headers["x-transaction-stack"] === ["qSdeF", "hYud7", "GTynl"]
const mess = await invoke("my-traced-rpc-function-4", {
value: 4 * message.content.value
});
return mess.content;
});
channel.listen("my-traced-rpc-function-2", async ({ message, invoke }) => {
// message.properties.headers["x-transaction-stack"] === ["qSdeF", "hYud7"]
const mess = await invoke("my-traced-rpc-function-3", {
value: 3 * message.content.value
});
return mess.content;
});
channel.listen("my-traced-rpc-function-1", async ({ message, invoke }) => {
// message.properties.headers["x-transaction-stack"] === ["qSdeF"]
const mess = await invoke("my-traced-rpc-function-2", {
value: 2 * message.content.value
});
return mess.content;
});
const result = await channel
.invoke("my-traced-rpc-function-1", { value: 42 })
.then(response => {
// response.content.value === 42 * 2 * 3 * 4 * 5;
});
Dead letter
- this will create a dead letter exchange
my_dl_5000
and a no consumer queuemy_dl_5000
with adequate arguments to enable automatic retry on top of the queue every 5000ms. - providing a
maxTries
value will make the broker retest the messagemaxTries - 1
(otherwise it is endless retries), then it will eitherack
the message (which basically discards it) or send it to thedumpQueue
.
channel.subscribeToMessages(
"direct/my-direct-exchange/my.routing.key/my-queue",
async ({ message }) => {
// handle
},
{
retry: 5000,
dlPrefix: "my_dl_", // defaults to dl_
maxTries: 10, // defaults to none (endless)
dumpQueue: "my-dump-queue" // defaults to none (discard if maxTries is set)
}
);
Stream over AMQP (alpha)
- channels must be raw
channel.listen("my-rpc-function-stream-1", async () => {
return fs.createReadStream("/path/to/file");
});
const stream = channel.invoke(
"stream/my-rpc-function-stream-1",
Buffer.from("")
);
stream.pipe(fs.createWriteStream("/path/to/other_file"));
Qualifier structure
Publish qualifiers
direct/exchange/routingkey
direct// => type: direct, exchange: amqp.direct, routing key: ""
topic/exchange/routingkey
topic// => type: topic, exchange: amqp.topic, routing key: ""
fanout/exchange
fanout/ => type: fanout, exchange: amqp.fanout
headers/exchange
headers/ => type: headers, exchange: amqp.headers
q/queue
q/ => type: queue, queue: anonymous
Subscribe qualifiers
direct/exchange/routingkey/queue
direct/// => type: direct, exchange: amqp.direct, routing key: "", queue: anonymous
topic/exchange/routingkey/queue
topic/// => type: topic, exchange: amqp.topic, routing key: "", queue: anonymous
fanout/exchange/queue
fanout// => type: fanout, exchange: amqp.fanout, queue: anonymous
headers/exchange/queue
headers// => type: headers, exchange: amqp.headers, queue: anonymous