@coinify/rabbitmq
v2.0.4
Published
Coinify RabbitMQ client with support for events and tasks
Downloads
393
Readme
node-rabbitmq
Major version changes
Version 2:
- Publishing messages now defaults to using publisher confirms
Implementation details
- Everything happens on the same TCP connection to RabbitMQ. It is created first time it is needed, and cached for subsequent requests.
- Message consumption and publishing use separate channels. They are created first time they are needed, and cached for subsequent requests.
Connection/channel failure
If the channel or connection closes unexpectedly (i.e. #close()
was not called), the library will attempt to reconnect
and re-attach the existing consumers transparently.
Re-connection will be attempted using fibonacci backoff with a maximum delay of 1 minute, meaning that the following delays (seconds) will be used between re-connection failures: 1, 1, 2, 3, 5, 10, 20, 30, 50, 60, 60, 60...
CoinifyRabbit
API
JavaScript
const CoinifyRabbit = require('@coinify/rabbitmq');
TypeScript
import CoinifyRabbit from '@coinify/rabbitmq';
new CoinifyRabbit(options)
Creates a new instance of CoinifyRabbit
The options
argument overrides default configuration options that are specified in the DEFAULT_CONFIGURATION
object in src/CoinifyRabbitConfiguration.ts
.
const options = {
logger: null, // Bunyan-compatible logger
service: { // Service-specific options
name: 'my-service' // Different service names consume events from different queues
}
// For all configuration options, see getDefaultConfig() static function
};
const coinifyRabbit = new CoinifyRabbit(options);
#shutdown(timeout=null): Promise<void>
Perform graceful shutdown, optionally with a timeout.
Calling this function closes the active channel and connection, if they are open. It also stops consuming messages and waits for active consumer functions to finish within the given timeout.
The timeout
argument specifies the number of milliseconds to wait for all consumer functions to finish.
If one or more active consumer function haven't finished in this time, they are nack
'ed and returned to the queue
where they came from.
If timeout = null
, there is no timeout and shutdown()
will wait for all consumer functions to finish.
The promise resolves when connection and channel is closed, and all consumer functions are have finished or the timeout has been reached.
Events
Topology
All events are published to the topic exchange with the name defined in options.exchanges.eventsTopic
.
The name defaults to 'events.topic'
, and shouldn't really be changed unless you know what you're doing.
Emitting an event (using #emitEvent(eventName, context)
) publishes a message to this exchange, with the routing key
config.service.name + '.' + eventName
.
Subscribing to consuming an event (using #registerEventConsumer(eventKey, options, consumeFn)
) binds a queue to this exchange,
using eventKey
as the binding key.
The queue name is determined by the (eventKey, config.service.name)
pair, meaning that if multiple consumers
call the registerEventConsumer
function with the same eventKey
and having the same config.service.name
, they will consume
from the same queue.
Specifically, the queue name is 'events.' + options.service.name + '.' + eventKey
.
Event message
Emitting an event with name eventName
and context context
publishes the following JSON object:
{
eventName: fullEventName, // serviceName + '.' + eventName
context: context,
uuid: 'd51bbaed-1ee8-4bb6-a739-cee5b56ee518', // Actual UUID generated upon emit
time: 1504865878534 // Timestamp of event, in milliseconds since UNIX epoc
}
Example
const CoinifyRabbit = require('@coinify/rabbitmq');
const coinifyRabbit = new CoinifyRabbit({service: {name: 'my-service'}});
async function testEvent() {
await coinifyRabbit.registerEventConsumer('my-service.my-event', async (context, event) => {
const {eventName, uuid, time} = event;
console.log('Event consumed', {context, eventName, uuid, time});
process.exit(0);
});
await coinifyRabbit.emitEvent('my-event', {myContext: true});
}
testEvent();
#emitEvent(eventName, context, options={}): Promise<{eventName, context, uuid, time}>
Emits an event with an associated context
const result = await coinifyRabbit.emitEvent('my-event', {myContext: true});
// result is an object with the below keys
const {eventName, context, uuid, time} = result;
NOTE: If options.service.name
is defined, the actual emitted eventName
is prefixed with the service name and a dot (.
).
#registerEventConsumer(eventKey, consumeFn, options = {}): Promise<string>
Registers a function for consuming a specific event
const consumerTag = await coinifyRabbit.registerEventConsumer('my-service.my-event', async(context, event) => {
// Resolve to ACK event, removing it from the queue
// Reject and event will not be ACK'ed
// context is the same as emitted context object
// event is object of {eventName, uuid, time}
console.log({context, event});
});
// consumerTag is used to cancel consumer again at a later point. (Not yet implemented)
The following properties can be set in options
:
retry
: Configure retry mechanism for this consumer. See Retry section for more informationuniqueQueue
: Create a unique queue (true
) instead of default behaviour (false
) where each instance of the service consumes from the same queue. Setting this totrue
will cause the event to be consumed by each event consumer.consumer
: Consumer-specific options. Must be an object with the following properties:prefetch
: Prefetch value for this consumer. See Prefetch section for more information.
usePublisherConfirm
: Use publisher confirms as default for publishing messages. Defaults totrue
.service
: Service-specific options. Must be an object with the following properties:name
: Overrides theoptions.service.name
set in theCoinifyRabbit
constructor. Prefixes the consumed task name withname
and a dot (.
).
eventKey
wildcards
Given that events are emitted to a RabbitMQ topic exchange,
you can use wildcards in the eventKey
argument to consume more than one event.
Dots (.
) are used to separate words, which can be replaced with wildcards.
*
can substitute for exactly one word#
can substitute for zero or more words
Examples
Imagine that four consumers are registered, using the following eventKey
s:
- (a)
my-service.my-event.happened
(specific event, no wildcards) - (b)
my-service.my-event.*
- (c)
my-service.#
- (d)
#
Then, four events with the following event names are emitted:
my-service.my-event.happened
- Consumed by all 4 consumers (a+b+c+d)
my-service.my-event.failed
- Consumed by 3 consumers (b+c+d)
my-service.another-event.happened
- Consumed by 2 consumers (c+d)
yourService.shit.hit.the.fan
- Consumed only by consumer (d)
Tasks
Topology
All tasks are published to the topic exchange with the name defined in options.exchanges.tasksTopic
.
The name defaults to 'tasks.topic'
, and shouldn't really be changed unless you know what you're doing.
Enqueueing a task (using #enqueueTask(taskName, context)
) publishes a message to this exchange, with the routing key
taskName
.
Subscribing to consuming a task (using #registerTaskConsumer(taskName, consumeFn)
) creates a queue with the name
'tasks.' + options.service.name + '.' + taskName
, and binds it to the exchange,
using options.service.name + '.' + taskName
as the binding key.
A task can be enqueued delayed, meaning that it won't be available for consumers until a specified delay has passed. See the below "Delayed tasks" section for more information.
Task message
Enqueueing a task with name taskName
and context context
publishes the following JSON object:
{
"taskName": taskName,
"context": context,
"uuid": 'f07d33d7-f56f-4c89-9489-1bc89d3a6483', // Actual UUID generated upon emit
"time": 1506331856322, // Timestamp of task, in milliseconds since UNIX epoc
"attempt": 0, // number of current attempt. Used by retry mechanism
"origin": 'another-service', // Name of service that enqueued the task
"delayMillis": 60000 // Only present if task was enqueued as a delayed task
}
Example
const CoinifyRabbit = require('@coinify/rabbitmq');
const coinifyRabbit = new CoinifyRabbit({service: {name: 'my-service'}});
async function testTask() {
await rabbit.registerTaskConsumer('my-task', async (context, task) => {
const {taskName, uuid, time} = task;
console.log('Task consumed', {context, taskName, uuid, time});
process.exit(0);
});
await rabbit.enqueueTask('my-service.my-task', context);
}
testTask();
#enqueueTask(taskName, context, options={}): Promise<{taskName, context, uuid, time}>
Enqueues a task with an associated context
const result = await coinifyRabbit.enqueueTask('my-service.my-task', {myContext: true});
// result is true if task was enqueued correctly.
Delayed tasks
In order to enqueue a delayed task, you must specify a delayMillis
field in the options
object when
calling enqueueTask()
. The following example enqueues a task that will not be ready for consumption
until 1 minute (60 seconds) later:
await coinifyRabbit.enqueueTask('my-service.my-task', {myContext: true}, {delayMillis: 60000});
Retry functionality for tasks and events is implemented using (Dead Letter Exchanges)[https://www.rabbitmq.com/dlx.html] and (Per-Queue Message TTL)[https://www.rabbitmq.com/ttl.html] similarly to the retry functionality described in greater detail below.
#registerTaskConsumer(taskName, consumeFn, options={})
Registers a function for consuming a specific task
const consumeOptions = {
retry: {
backoff: {
type: 'exponential'
},
// Retry at most two times (initial attempt, first retry, second retry)
maxAttempts: 2
}
};
const consumerTag = await coinifyRabbit.registerTaskConsumer('my-task', async(context, task) => {
// Resolve to ACK task, removing it from the queue
// Reject and task will not be ACK'ed
// context is the same as enqueued context object
// task is object of {taskName, uuid, time}
console.log({context, task});
}, consumeOptions);
// consumerTag is used to cancel consumer again at a later point. (Not yet implemented)
The given taskName
will be prefixed with '<serviceName>.'
to produce a full task name, which is used for consumption.
<serviceName>
is defined in options.service.name
in CoinifyRabbit
constructor or directly in options
object.
The following properties can be set in options
:
retry
: Configure retry mechanism for this consumer. See Retry section for more informationuniqueQueue
: Create a unique queue (true
) instead of default behaviour (false
) where each instance of the service consumes from the same queue. Setting this totrue
will cause the event to be consumed by each event consumer.consumer
: Consumer-specific options. Must be an object with the following properties:prefetch
: Prefetch value for this consumer. See Prefetch section for more information.
service
: Service-specific options. Must be an object with the following properties:name
: Overrides theoptions.service.name
set in theCoinifyRabbit
constructor. Prefixes the consumed task name withname
and a dot (.
).
Retry
Retry functionality for tasks and events is implemented using (Dead Letter Exchanges)[https://www.rabbitmq.com/dlx.html] and (Per-Queue Message TTL)[https://www.rabbitmq.com/ttl.html]:
When consumption of a message fails, a retry mechanism (see options.retry
argument to #registerTaskConsumer()
) determines
whether the message should be retried at a later point in time, and if so, how much time to wait (the delay) until the next attempt.
If the message should not be retried (i.e. consumer is configured to not retry, or maximum number of retry attempts reached)
the message will be republished to the fanout exchange with the name defined in options.exchanges.failed
,
which defaults to '_failed'
. To this exchange is bound a single queue with the name defined in options.queues.failed
,
which also defaults to '_failed'
. Failed messages will remain here until manually removed.
If the message should be retried with a delay of t
milliseconds, it will be re-published to the direct exchange with the name
options.exchanges.retry
.
To this exchange is bound queues with the names options.queues.retryPrefix + '.' + t + 'ms'
, which are bound to the exchange.
The queues are configured to have messages expire after t
milliseconds, and republished back to the default direct,
using the name of the original queue as the routing key.
When registering a consumer, the following retry properties can be set in the options
argument:
retry
: Specify how (if at all) performing the task should be retried on failure (ifconsumeFn
rejects). If not specified, defaults no retry (false
). If specified, must be anfalse
or an object with the following properties:backoff
: Backoff definition. If specified, must be an object with the following properties:type
: Backoff type: Must be'exponential'
or'fixed'
. Defaults tofixed
- For
exponential
backoff, the delay until next retry is calculated as(delay * (base ^ n))
, wheren
is the current attempt (0-indexed). First retry is thus always afterdelay
seconds - For
fixed
backoff, the delay until next retry is alwaysdelay
- For
delay
: Delay in seconds for first retry. Defaults to 16base
: (Only forexponential
type) The base number for the exponentiation. Defaults to 2
maxAttempts
: The maximum number of retry attempts. Defaults to 12. If set to e.g. 1, the task will at most be run twice: One for the original attempt, and one retry attempt. Setting this to 0 is the same as settingretry: false
.
consumeFn
usage
If consumeFn
rejects with an Error that has noRetry: true
property set, the task will not be retried
regardless of what the options.retry
settings specify.
Prefetch
Prefetch values (limits on the number of unacknowledged messages) can be set both on a per-channel and per-consumer basis.
Per-channel value is set in the configuration value channel.prefetch
.
Per-consumer value is set by default in the configuration value consumer.prefetch
, which can be overridden for
individual consumers using the options.consumer.prefetch
argument to registerEventConsumer
and registerTaskConsumer
functions.
NOTE: When overriding default per-consumer prefetch value (using options.consumer.prefetch
argument),
you must take to register consumers serially (i.e. not in parallel) due to possible race conditions mixing
up prefetch values.
See (Consumer prefetch)[https://www.rabbitmq.com/consumer-prefetch.html] and (Confirms)[https://www.rabbitmq.com/confirms.html] for more information.
Failed Messages
As described in the section about retry, all failed messages will end up in the _failed
queue. We can then consume them and manually re-enqueue the messages to a specified queue we want to retry by using the functions below.
#registerFailedMessageConsumer(consumeFn, options={})
Registers a function for consuming a task or event from the queue of failed messages
const consumerTag = await coinifyRabbit.registerFailedMessageConsumer(async(routingKey, message) => {
// Resolve to ACK task, removing it from the queue
// Reject and task will bet NACK'ed and re-enqueued
// routingKey is the name of the queue that the message was failed from
// message is object of {eventName|taskName, context, uuid, time, attempts}
console.log({context, message});
});
#registerFailedMessageConsumer
bears resemblance to #registerTaskConsumer
and #registerEventConsumer
, however it will not need to match to any (event or task) name.
The following properties can be set in options
:
consumer
: Consumer-specific options. Must be an object with the following properties:prefetch
: Prefetch value for this consumer. See Prefetch section for more information.
#enqueueMessage(queueName, messageObject)
Enqueues a message to a specific queue. This can be of type event or task message
const messageObject = {
eventName: fullEventName, // serviceName + '.' + eventName
context: context,
uuid: 'd51bbaed-1ee8-4bb6-a739-cee5b56ee518', // Actual UUID generated upon emit
time: 1504865878534 // Timestamp of event, in milliseconds since UNIX epoc
}
const result = await coinifyRabbit.enqueueMessage('events.accounting.trade.trade-completed', messageObject);
// result is true if message was enqueued correctly.
Failed Message Handling
It is possible to setup and use #registerFailedMessageConsumer
and #enqueueMessage
for handling failed message and evaluating them for re-enqueueing:
const consumerTag = await coinifyRabbit.registerFailedMessageConsumer(async(routingKey, message) => {
// Logic for determining whether the failed message
// should be re-enqueued or if other action should be taken.
// If it wishes to re-enqueue do:
const result = await coinifyRabbit.enqueueMessage(routingKey, message);
});
Alternatively, if one is in a scenario where all failed messages should be re-enqueued right away, it can be done as:
const consumerTag = await coinifyRabbit.registerFailedMessageConsumer(async (routingKey, message) => coinifyRabbit.enqueueMessage(routingKey, message));
Suggestions for improvement
- Ping functionality for service health check
- CLI scripts to emit events / enqueue tasks
- Split main lib code into smaller files