@leisurelink/event-messenger
v1.0.0
disclaimer: this library is new, so the interface may see some tweaks after we use it in practice
Event Messenger
Features
Security
this library biases towards implementing a security model under RabbitMQ where each application has its own silo of messaging, prefixed by its name. e.g. widget-api will log in as widget, and only have access to resources with the prefix /^widget\./. any routing of messages from other application domains must be configured administratively (i.e. allowed) using appropriate amqp tactics (routing keys, etc).
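to make the prefix rule concrete, here is a minimal sketch of the check it implies. ownsResource and escapeRegExp are hypothetical helpers written for this illustration; they are not part of event-messenger, and in practice the rule is enforced by RabbitMQ permissions, not by application code.

```javascript
// hypothetical helpers, for illustration only (not part of event-messenger)

// escape characters that have a special meaning inside a RegExp
function escapeRegExp(text) {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// true when resourceName sits inside the silo of appName,
// i.e. it starts with "<appName>."
function ownsResource(appName, resourceName) {
  var prefix = new RegExp('^' + escapeRegExp(appName) + '\\.');
  return prefix.test(resourceName);
}

console.log(ownsResource('widget', 'widget.default'));  // true
console.log(ownsResource('widget', 'gadget.default'));  // false
console.log(ownsResource('widget', 'widgets.default')); // false
```

note that the trailing dot matters: without it, widget would also match resources belonging to an app called widgets.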
Retry/Delay/Fail
this library can retry failed messages after a variable delay and, after a maximum number of failures, deliver them to a .failed queue.
Resilient Connections
by default this library will retry connections to the amqp server, once every 10 seconds. it will also re-establish the underlying primary channel. future versions may implement/provide a channel pool to provide better multi-process transaction isolation.
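the retry behaviour can be pictured with the sketch below. it is not the library's implementation: connectWithRetry, connectFn and the injectable schedule argument are assumptions made for this example, and it only covers retrying a failed connection attempt (not re-establishing a channel after a drop).

```javascript
// hypothetical sketch, not event-messenger's code.
// connectFn(callback) stands in for whatever opens the connection and
// calls back with (err, conn). schedule defaults to setTimeout and is
// injectable only so the sketch can be exercised without real timers.
function connectWithRetry(connectFn, onReady, intervalMs, schedule) {
  intervalMs = intervalMs === undefined ? 10000 : intervalMs; // 10 seconds
  schedule = schedule || setTimeout;

  function attempt() {
    connectFn(function(err, conn) {
      if (err) {
        // connection failed: try again after the interval
        schedule(attempt, intervalMs);
        return;
      }
      onReady(conn);
    });
  }

  attempt();
}
```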
Encoding/Decoding
amqp by default requires low-level Buffer data to send messages, and returns a mix of Strings or Buffers as content. in our first version, you can simply send Objects: they will be JSON encoded on the way out, and decoded as message.parsedContent on the way in.
future versions will implement a set of extended content plugins to encode/decode "attached" data with the message, such as authentication context or transaction identifiers.
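as a rough illustration of the round trip described above, the sketch below encodes an object to a Buffer and decodes it back onto parsedContent. encodeContent and decodeContent are hypothetical names, and the fallback to raw text for non-JSON content is an assumption; the library's own handling may differ.

```javascript
// hypothetical helpers, for illustration only

// turn whatever the caller passed into the Buffer that amqp needs
function encodeContent(content) {
  if (Buffer.isBuffer(content)) { return content; }
  if (typeof content === 'string') { return Buffer.from(content); }
  return Buffer.from(JSON.stringify(content));
}

// attach a decoded copy of msg.content as msg.parsedContent
function decodeContent(msg) {
  var text = Buffer.isBuffer(msg.content)
    ? msg.content.toString('utf8')
    : String(msg.content);
  try {
    msg.parsedContent = JSON.parse(text);
  } catch (e) {
    // assumption: content that is not valid JSON is passed through as text
    msg.parsedContent = text;
  }
  return msg;
}

var msg = decodeContent({ content: encodeContent({ bookingId: 42 }) });
console.log(msg.parsedContent.bookingId); // 42
```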
amqplib
this library is built around amqplib's callback api. many of our apis are identical to, or inspired by, amqplib's interface. you also have access to queue.channel in case you need to handle more advanced messaging.
Getting Started
example:
var EventMessenger = require('@leisurelink/event-messenger');
EventMessenger.connect('amqp://vrware-booking:password@localhost', function(err, conn, channel) {
  // i will be called every time the connection is retried!
  var queue = EventMessenger.queue(conn.username, channel);
  queue.init(function(err) {
    if (err) { throw err; }
    // ready to go!
    // access the amqplib channel directly
    queue.channel.bindQueue(
      queue.queueName,
      queue.exchangeName('topic'),
      'vrware-booking.booking.cancelled'
    );
    // send message
    queue.sendMessage(...)
  });
});
Publishing Messages
queue.sendMessage(exchangeType, routingKey, content, options, callback)
- exchangeType can be direct/topic/headers (the domain prefix will be auto-added)
- routingKey and options are the same as those accepted by amqplib
- content is the data you are sending. by default objects will be json stringified.
- callback is optional, but recommended (especially if sending many messages; amqplib does not actually send data until the next process tick, so a callback will yield)
note: future versions will allow attaching other contextual data (like security context) as a substructure of options; plus plugins that can actively encode/decode that data.
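when sending many messages, one way to make use of the callback is to send them one at a time, as sketched below. publishAll is a hypothetical helper, and it assumes the sendMessage callback is node-style (first argument is an error or null), which this readme does not spell out.

```javascript
// hypothetical helper, for illustration only.
// sends messages one after another, waiting for each sendMessage
// callback before sending the next, then calls done(err, sentCount).
// assumption: the sendMessage callback receives (err) as first argument.
function publishAll(queue, exchangeType, messages, done) {
  var index = 0;

  function next(err) {
    if (err) { return done(err); }
    if (index >= messages.length) { return done(null, index); }
    var m = messages[index];
    index += 1;
    queue.sendMessage(exchangeType, m.routingKey, m.content, m.options || {}, next);
  }

  next();
}

// usage (queue comes from EventMessenger.queue):
// publishAll(queue, 'topic', [
//   { routingKey: 'vrware-booking.booking.cancelled', content: { bookingId: 1 } },
//   { routingKey: 'vrware-booking.booking.cancelled', content: { bookingId: 2 } }
// ], function(err, count) { /* all sent, or err */ });
```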
Receiving Messages
- all event messenger messages require acknowledgement (call queue.ack or queue.reject)
- be sure to revisit the "retry" section below, you'll most likely want to pass retry options to the queue() constructor
// see api doc below for selector options
queue.addConsumer(selector, function(msg, queue) {
  // do your work, but be sure to *always* ack/reject
  // otherwise your consumer process will eventually stall once
  // [prefetch] messages are unresolved. (see queue options)
  // (`err` below stands for a failure raised by your own work)
  if (err) {
    // pick ONE of the following three ways to resolve the message:

    // 1. if retry enabled, reject will retry in a delayed fashion
    //    if retry not enabled, puts back on queue.
    //    WARNING: rejecting something that will always fail without
    //    retry+fail enabled will thrash the queue!
    return queue.reject(msg);

    // 2. or if retry+fail enabled, reject w/o requeue will send to the .failed queue
    return queue.reject(msg, false);

    // 3. or log/ack
    console.error('ERROR bad message! move along...');
    return queue.ack(msg);
  }
  console.log("received: ", msg.parsedContent);
  // msg.parsedContent is parsed json (or other)
  // msg.content for raw Buffer/String from amqplib
  return queue.ack(msg);
});
just like amqplib, queue.reject(msg, false) will not requeue the message, and if you have Retry enabled, it will deliver to the .failed queue.
for multiple queues or per-process queues, see EventMessenger.queue options and the Concepts section.
lastly you should start consumption:
queue.startConsuming(function(err) {
  if (err) { throw err; }
});
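since a forgotten ack/reject eventually stalls the consumer, one option is to wrap handlers so that every message is resolved exactly once. the wrapper below is a sketch: alwaysResolve and the handler(msg, done) shape are assumptions made for this example, and only queue.ack and queue.reject come from the api described here.

```javascript
// hypothetical wrapper, for illustration only.
// handler(msg, done) does the work and calls done(err) when finished.
// the wrapper acks on success, rejects on failure (including a thrown
// exception), and ignores any second call to done.
function alwaysResolve(handler) {
  return function(msg, queue) {
    var settled = false;

    function done(err) {
      if (settled) { return; }
      settled = true;
      if (err) { return queue.reject(msg); }
      return queue.ack(msg);
    }

    try {
      handler(msg, done);
    } catch (e) {
      done(e);
    }
  };
}

// usage:
// queue.addConsumer(selector, alwaysResolve(function(msg, done) {
//   console.log('received: ', msg.parsedContent);
//   done();
// }));
```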
Retry/Fail
event messenger has a module that implements retry/fail capabilities in RabbitMQ. this is integrated into the EventMessenger Queue with the following options:
var queue = EventMessenger.queue('app-domain', channel, {
  retry: {
    // delay is either a fixed number of milliseconds, e.g.
    //   delay: 120000,
    // or a function that returns the delay for this attempt:
    delay: function(retries, msg) {
      // retry in 2 minutes, then 5, then 10...
      return ([2,2,5][retries] || 10)*60*1000;
    },
    maxRetries: 14, // ~2hrs
    // send max-retried or rejected+no-requeue messages to app-domain.default.failed queue
    failQueue: true
  }
});
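to check how long a message can stay in the retry loop, the delays can be summed over all attempts. the sketch below does that for the delay function above; totalDelayMs is a hypothetical helper, and it assumes retries is passed as 1 on the first retry (with a 0-based count the schedule would start 2, 2, 5 instead of 2, 5, 10).

```javascript
// same delay function as in the options above
function delay(retries) {
  // retry in 2 minutes, then 5, then 10...
  return ([2, 2, 5][retries] || 10) * 60 * 1000;
}

// hypothetical helper: total time spent waiting across all retries.
// assumption: the retry counter starts at 1.
function totalDelayMs(delayFn, maxRetries) {
  var total = 0;
  for (var attempt = 1; attempt <= maxRetries; attempt += 1) {
    total += delayFn(attempt);
  }
  return total;
}

console.log(totalDelayMs(delay, 14) / 60000); // 127 (minutes, so ~2hrs)
```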
API details
EventMessenger.connect(url, [sockopts], callback)
callback: function(err, connection, channel)
this connect function will auto-reconnect if there are problems with the connection or channel; as such, this callback can be called many times!
EventMessenger.queue(appName, channel, [options], [callback])
appName
this is your app name, which will be prefixed in front of exchanges/queues/etc.
in official environments this will be your amqp://username, and can be accessed via conn.username (which will be guest if using amqp://localhost).
channel
the amqp channel (returned by EventMessenger.connect)
options.process
- defaults to "default", which creates a queue named app.default
- can be another name
- if true, will create a temporary queue just for your process (use queue.queueName for bindings/etc)
most applications will need to subscribe to messages under a single processing queue, so we always create one by default. however, if you have a more complex set of subscriptions, where some classes of messages could starve the processing of others (e.g. batch updates which publish 1M messages in a short period), you would prefer to use { process: 'volume-updates' }
options.prefetch
- defaults to 10
prefetch sets the maximum number of messages that can be unacknowledged by your process. this is effectively throttling. since NodeJS is inherently asynchronous for many operations, it's important to set up some sort of operational limit on consumption. there is no perfect solution or number for this, but consider this scenario:
your queue is processing offline quotes, and let's say they take a long time to return a result (60 seconds). without limits, your consumer could consume 100k messages from the queue in 60 seconds, and open 100k quote api connections before getting a response from the first quote... not really recommended, and it might crash your system or the quote system!
unfortunately administrative throttling of a queue is not part of the amqp spec, so this is the default way to limit each consumer.
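the effect of prefetch can be modelled in memory as below. this is only a simulation of the idea (the broker does the real work); createFeed and its methods are hypothetical names used for this sketch.

```javascript
// hypothetical in-memory model of prefetch, for illustration only.
// at most `prefetch` messages are handed to deliver() without having
// been acked; each ack frees one slot and lets the next message through.
function createFeed(pending, prefetch, deliver) {
  var unacked = 0;

  function pump() {
    while (unacked < prefetch && pending.length > 0) {
      unacked += 1;
      deliver(pending.shift());
    }
  }

  return {
    start: pump,
    ack: function() { unacked -= 1; pump(); },
    unacked: function() { return unacked; }
  };
}

var delivered = [];
var feed = createFeed(['a', 'b', 'c', 'd', 'e'], 2, function(m) { delivered.push(m); });
feed.start();
console.log(delivered); // [ 'a', 'b' ]
feed.ack();
console.log(delivered); // [ 'a', 'b', 'c' ]
```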
callback
optional, but recommended. amqplib cannot send messages until process.nextTick, so if you send lots of messages without a callback, they are buffered in memory.
Queue.sendMessage
see amqplib Channel#publish, but for exchange, name one of your core exchange types (direct, headers, topic).
methods identical to amqplib.Channel
#ack #reject
Queue.addConsumer(selector, consumer)
if a selector matches an incoming message, it's sent to consumer
selector
- String or Regexp matches against domainKey
- a function(message) that returns truthy/falsey
consumer
- function(message, queue)
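the three selector forms can be pictured with the sketch below. it is not the library's matching code: matches is a hypothetical function, and it assumes domainKey is a string property on the message and that a String selector must equal it exactly.

```javascript
// hypothetical sketch of selector matching, for illustration only.
// assumptions: message.domainKey is a string; String selectors match
// by exact equality.
function matches(selector, message) {
  if (typeof selector === 'function') {
    return Boolean(selector(message));
  }
  if (selector instanceof RegExp) {
    return selector.test(message.domainKey);
  }
  return selector === message.domainKey;
}

var message = { domainKey: 'booking.cancelled' };
console.log(matches('booking.cancelled', message)); // true
console.log(matches(/^booking\./, message));        // true
console.log(matches(function(m) { return m.domainKey === 'other'; }, message)); // false
```

avoid the g flag on RegExp selectors in a sketch like this, since test() then keeps state between calls.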
Queue.startConsuming(callback)
- start consuming messages
- any messages that do not match a subscriber/selector are rejected without retry (sent to .failed if configured) and a noConsumer event with message will be emitted.
Queue.stopConsuming(callback)
- stops accepting new messages (unacked messages may still be processing); useful for a graceful, delayed shutdown of your process