taskcluster-lib-pulse
v11.1.1
Published
Library for interacting with Pulse and Taskcluster-Pulse
Downloads
19
Readme
taskcluster-lib-pulse
Library for interacting with Pulse and Taskcluster-Pulse. See the docs for more information on Pulse.
This library is designed for use in Taskcluster services, both for producing and consuming pulse messages.
Usage
This library defines a Client along with several classes and functions that base their functionality on a Client. The Client represents an association with a Pulse service, automatically reconnecting as necessary.
It also provides higher-level components with simplified APIs for common applications. The higher-level components are:
If you are using one of the higher-level components, then the details of interacting with a Client are not important -- just construct one and move on.
Client
Create a credentials function, choosing among:
const pulse = require('taskcluster-lib-pulse');
let credentials;
// raw AMQP credentials
credentials = pulse.pulseCredentials({
username: 'sendr',
password: 'sekrit',
hostname: 'pulse.mycompany.com',
vhost: '/',
});
// ..or a connection string
credentials = pulse.connectionStringCredentials(
'amqps://me:[email protected]/%2Fvhost');
Next, create a Client
to handle (re)connecting to Pulse:
const client = new pulse.Client({
namespace: 'my-service',
credentials, // from above
monitor: .., // taskcluster-lib-monitor instance
});
The Client
is responsible for connecting, and re-connecting, to the pulse
server. Once created, it will do so automatically until stopped.
Other options to the constructor:
recycleInterval
- interval on which connections are automatically recycled, in ms. Default: 1 hour.retirementDelay
- time that a connection remains in theretiring
state. Default: 30 seconds.minReconnectionInterval
- minimum time between connection attempts. Default: 15s.
Interacting With a Client
AMQP is a very connection-oriented protocol, so as a user of this library, you
will need to set up each new connection. To do so, set up an event listener
in a handler for the connected
event from the client:
client.onConnected(conn => {
// ...
});
The conn
value of this event is a Connection
instance, from this library.
The amqplib connection is available as conn.amqp
. The listener should create
any necessary channels, declare queues and exchanges, and - if consuming
messages - bind to those queues.
The onConnected
method is a shorthand for on('connected', ..)
that also
calls the handler immediately if the Client is already connected. Its return
value can be used with client.removeListener
just like any other EventEmitter
listener.
Note that declaring non-durable queues in this method may lead to message loss or duplication: when this connection fails, the server will delete the queues and any pending messages. If this is not acceptable for your application, use a durable queue.
The library cannot detect all problems with an existing connection. If any
method produces an error that might be fixed by reconnecting, call the
connection's failed
method. This will mark the connection as failed and
begin cretaing a new connection (culminating in another connected
event).
Active Connection
The activeConnection
property contains the current Connection, or undefined
if no connection exists at the moment. In most cases, you will want to use
onConnected
or withConnection
to get access to a Client's connection,
rather than this property.
Manipulating AMQP Objects
If you have a one-off task that requires a channel, such as declaring an
exchange, use client.withChannel
, which will wait for a connection if
necessary, then run your asynchronous function with an amqplib channel or
confirmChannel. If the function fails, it is not automatically retried, but the
channel is closed.
await client.withChannel(async channel => { .. }, {confirmChannel: true});
await client.withChannel(async channel => { .. }, {confirmChannel: false});
There is also a more general withConnection
which returns the Connection
instance without creating a channel.
await client.withConnection(async conn => { .. });
The most common use case for these functions is to declare or delete objects on the AMQP server. For example:
await client.withChannel(async chan => {
const exchangeName = client.fullObjectName('exchange', 'notable-things');
await chan.assertExchange(exchangeName, 'topic');
});
Object Names
Note that the example above uses the fullObjectName
method. This method will
generate an exchange name compatible with the pulse access control model, in
this case exchanges/<namespace>/notable-things
.
This method is useful for translating unqualified names like queueName
to the
fully qualified names required when working directly with amqplib.
Reconnection
The Client
instance will automatically reconnect periodically. This helps
to distribute load across a cluster of servers, and also exerciess the
reconnection logic in the application, avoiding nasty surprises when a network
or server failure occurs.
The Client
also has a recycle
method that will trigger a retirement and
reconnection.
Retirement
When a connection is still working, but a new connection is being created, the old connection spends 30 seconds "retiring". The intent of this delay is to allow any ongoing message handling to complete before closing the underlying AMQP connection.
The Connection
instance emits a retiring
event when retirement begins.
Consumers should respond to this message by cancelling any channel consumers.
The retiring
event from the Connection
will be followed by a
connected
event from the Client
for the next connection.
Shutdown
Call the async Client.stop()
method to shut the whole thing down. This will
wait until all existing Connection
instances are finished their retirement.
Examples
Consumer
To consume messages, listen for connected
messages and set up a new
channel on each new connection. Stop consuming from the channel on retirement,
allowing time for any in-flight consumption to complete before the connection
is finished.
The whole thing is wrapped in a try/catch so that any errors in connection setup are treated as a connection failure.
NOTE: the PulseConsumer
class implements a full-featured consumer; this
code is provided merely as an example of Client usage.
client.onConnected(async (conn) => {
let channel, consumer;
try {
const amqp = conn.amqp;
channel = await amqp.createChannel();
await channel.assertExchange(exchangeName, 'topic');
await channel.assertQueue(queueName);
await channel.bindQueue(queueName, exchangeName, routingKeyPattern);
consumer = await channel.consume(queueName, (msg) => {
// do something with the message, then ack it..
channel.ack(msg);
});
conn.on('retiring', () => {
// ignore errors in this call: the connection is already retiring..
channel.cancel(consumer.consumerTag).catch(() => {});
});
} catch (err) {
debug('error in connected listener: %s', err);
conn.failed();
}
});
Fakes for Testing
The FakeClient
class can be used instead of Client
in testing situations,
to avoid the need for an actual AMQP server. The class itself has no
functionality, but serves as a semaphore to activate a "fake" mode when passed
to higher-level components such as PulseConsumer
: fakeConsumer =
consume({client: new FakeClient(), ..})
.
The stop
and recycle
methods do nothing. The activeConnection
property
is always undefined. All of onConnected
, withConnection
, and withChannel
return immediately without invoking their callback.
A fake client has client.isFakeClient
set to true.
PulseConsumer
A PulseConsumer declares a queue and listens for messages on that queue, invoking a callback for each message.
Construct a PulseConsumer with the async consume
function:
const pulse = require('taskcluster-lib-pulse');
let pc = await pulse.consume({
client, // Client object for connecting to the server
bindings: [{ // exchange/routingKey patterns to bind to
exchange, // Exchange to bind
routingKeyPattern, // Routing key as string
routingKeyReference, // Reference used to parse routing keys (optional)
}, ..],
queueName, // Queue name (without `queues/<namespace>/` prefix)
prefetch, // Max number of messages unacknowledged to hold (optional)
maxLength, // Maximum queue size, undefined for none
...queueOptions, // passed to assertQueue
}, async ({payload, exchange, routingKey, redelivered, routes, routing}) => {
// handle message
...
});
This will create a queue using a pulse-compatible queue name based on
queueName
(prefixed with queue/<namespace>
).
If routingKeyReference
is provided for the exchange from which messages
arrive, then the listener will parse the routing key and make it available as a
dictionary on the message. Note that bindings are easily constructed using the
taskcluster-client library.
The instance starts consuming messages immediately. When the consume
function's
promise has resolved, the queue exists and all bindings are in place.
At this time, it is safe to initiate any actions that might generate messages
you wish to receive.
Call await pc.stop()
to stop consuming messages. A PulseConsumer cannot be
restarted after stopping -- instead, create a new instance. The stop
method's Promise will not resolve until all message-handling has completed and
the channel is closed.
When a message is received, the message handler (which can be specified as
option handleMessage
or as the second positional argument) is called
(asynchronously) with a message of the form:
{
payload, // parsed payload (as JSON)
exchange, // exchange name
routingKey: // primary routing key
redelivered: // true if this message has already been attempted
routes: [..] // additional routes (from CC header, with the `route.`
// prefix stripped)
routing: {} // parsed routes (if routingKeyReference is provided)
}
If the handler fails, the message will be re-queued and re-tried once.
Ephemeral Consumers
The default mode of operation is to create a "permanent" queue that will
persist across restarts and reconnections, thereby ensuring no messages are
lost. If multiple processes use the same queueName
, then a message will only
be delivered to one process, provinding an easy way to distribute
message-processing load.
Some uses of Pulse call instead for a queue for each process, and arranging that the queue be automatically cleaned up when the process goes away. For example, some components of an application may notify other components that some data has changed.
This is referred to as an "ephemeral consumer". The automatic cleanup occurs on reconnection, meaning that it is possible and common for messages to be lost when an AMQP connection is recycled. Callers must handle this situation.
To use an ephemeral queue, pass ephemeral: true
, do not pass a queueName
(as one will be invented for you), and pass an onConnected
function that will
be called (asynchronously) every time a new connection is established. Such a
call is a signal that messages may have been skipped. In the example given
above, the onConnected
callback would assume that a data-change notification
had been missed and perform whatever reconciliation is required.
Routing Key Reference
A binding's routingKeyReference
gives reference information for the format of
a routing key, and allows the tool to "parse" a message's routing key into
components. It is an array of objects with properties name
, the name of the
component, and multipleWords
if the component can match multiple words
(joined with a .
). Other fields are ignored. Only one component can have
multipleWords
. This is compatible with the references produced by
taskcluster services.
routingKeyReference: [
{name: 'routingKeyKind'},
{name: 'someId', multipleWords: true},
]
If this parameter is given, the message will have a routing
property
containing the routing key components keyed by their name.
The library assumes that all messages on a given exchange share the same routing key reference, as it is not practical to determine which routingKeyPattern matched a particular message.
Modifying Bindings
In some cases, it is necessary to modify the bindings for a queue while it is
still consuming. Use queue.withChannel
(above) for this purpose. In this
case, it is simplest to provide an empty bindings: []
to the PulseConsumer
constructor and manage bindings entirely via withChannel
. Note that with this
arrangement, routing key reference is not supported.
Fake Mode
If passed a FakeClient
, consume
will return a fake consumer. That object
does not interface with an AMQP server, but has an async fakeMessage
method
which will call back the message-handling function with the same arguments.
const consumer = consume({
client: new FakeClient(),
...
}, async ({payload, exchange, routingKey, redelivered, routes, routing}) => {
// ...
});
await consumer.fakeMessage({payload: .., exchange: .., ..});
PulsePublisher
The library provides high-level support for publishing messages, as well. The support for sending messages is quite simple, but this component also handles declaring exchanges, message schemas, and so on, and producing a reference document that can be consumed by client libraries to generate easy-to-use clients. All of this is similar to what taskcluster-lib-api does for HTTP APIs.
Declaring Exchanges
Begin by creating an Exchanges
instance. This will collect all exchange
definitions for the service.
const {Exchanges} = require('taskcluster-lib-pulse');
const exchanges = new Exchanges({
serviceName: 'myservice',
projectName: 'taskcluster-myservice',
version: 'v1',
title: 'Title for Exchanges Docs',
description: [
'Description in **markdown**.',
'This will available in reference JSON',
].join(''),
});
The serviceName
should match that passed to
taskcluster-lib-validate.
The projectName
is used to construct exchange and queue names. It should
match the pulse namespace (at least in production deployments) and the name
passed to
taskcluster-lib-monitor,
Having created the exchanges
instance, declare exchanges on it:
exchanges.declare({
exchange: 'egg-hatched',
name: 'eggHatched',
title: 'Egg has Hatched',
description: [
'Description in **markdown**.',
'This will available in reference JSON',
].join(''),
schema: 'egg-hatched-message.yml',
messageBuilder: ({eggId, nestId, hatchDate}) => ({eggId, nestId, hatchDate}),
routingKeyBuilder: ({eggId}) => ({eggId}),
routingKey: [
{
name: 'routingKeyKind',
summary: 'Routing key kind hardcoded to primary in primary routing-key',
constant: 'primary',
required: true,
}, {
name: 'eggId',
summary: 'The egg that has changed state',
required: true,
maxSize: 22,
multipleWords: false,
}, {
name: 'reserved',
summary: 'Space reserved for future use.',
multipleWords: true,
maxSize: 1,
}
],
CCBuilder: (message) => ([`route.nests.${message.nestId}`]),
});
The first few arguments describe the exchange.
exchange
- the name of the exchange. This will be qualified asexchange/<projectName>/<name>
.name
- the camelCased name of the exchange; used for function namestitle
- short summarydescription
- longer descriptionschema
- the schema against which the payload must validate
A message will be sent by calling a method on the publisher named by the name
argument. The arguments to that method are passed unchanged to
messageBuilder
, routingKeyBuilder
, and CCBuilder
.
The messageBuilder
returns the message payload. This payload is subsequently
validated against the schema.
The routingKeyBuilder
returns the message's routing key either as an object
containing routing key fields. That object is then used along with
routingKey
to construct a routing key. Each element in the routingKey
array generates at least one element in the routing key, based on the following
properties:
name
-- name of the field, drawn from the object returned byroutingKeyBuilder
summary
-- short summary of the fieldconstant
-- if set, the only allowable value for the fieldrequired
-- if false, the field is not required and will default to_
maxSize
-- maximum number of characters (or bytes, as this is ASCII) in the fieldmultipleWords
-- if true, the value can contain.
; this can occur only once in the routing key.
By convention, the first element of the routing key is always the constant
primary
and the last element is a multi-word field named reserved
.
Finally, the CCBuilder
returns a list of strings giving CC'd routing keys for
this message. Queues bound with routing patterns matching a CC'd routing key
will receive the message even if they do not match the primary routing key.
By convention, these are prefixed with route.
.
References
The exchanges.reference()
method will return a reference document suitable
for publication under <rootUrl>/references
.
This is typically passed to taskcluster-lib-docs like this:
Docs.documenter({
references: [
{name: 'events', reference: exchanges.reference()},
], ..
})
Publishing
To publish messages, create a new pulse publisher:
const publisher = await exchanges.publisher({
rootUrl: cfg.taskcluster.rootUrl,
schemaset, // from taskcluster-lib-validate
client, // taskcluster-lib-pulse Client instance
sendDeadline: 12000,
});
Call the methods declared on the Exchanges instance; for example await
publisher.eggHatched({eggId, nestId, datehatched})
. The function's promise
will resolve when the AMQP server has confirmed receipt of the message.
The sendDeadline
option gives a time (in ms) after which a send operation
will not be retried. This value is typically chosen so that operations sending
messages (such as REST API handlers) can return an error instead of timing out.
Default is 12 seconds.
For compatibility with the deployment at taskcluster.net
, this function also
accepts parameters publish
and aws
, which control publishing the references
to an Amazon S3 bucket.
Fake Mode
If given a FakeClient
, the resulting publisher will not actually send messages.
Instead, it is an EventEmitter that will emit a message
event for every message
it sends, containing the processed exchange name, routing key, and so on:
publisher.on('message', msg => {
assume(msg).to.deeply.equal({
exchange: 'exchange/taskcluster-lib-pulse/v2/egg-hatched',
routingKey: 'badEgg',
payload: {eggId: 'badEgg'},
CCs: [],
});
});
await publisher.eggHatched({eggId: 'badEgg'});
Testing
To run the tests, a simple yarn test
will do. But it will skip most of the tests!
Better to run against a real RabbitAMQP server. If you have Docker, that's easy:
docker run -d -ti --rm -p 5672:5672 rabbitmq:alpine
export PULSE_CONNECTION_STRING=amqp://guest:guest@localhost:5672/