amqp-message-bus
v2.1.0
AMQP Message Bus
Node.js message bus interface for AMQP servers, such as RabbitMQ.
Features
- Hides the complexity of the AMQP client;
- Comes with built-in symmetric message encryption;
- Supports promises + async/await.
Installation
$ npm install amqp-message-bus
Requirements
- Node.js v7+
Quick start
Create a new message bus.
const MessageBus = require('amqp-message-bus');
const bus = new MessageBus({
url: 'amqp://localhost',
encryptionKey: 'keep-it-safe'
});
Connect to the AMQP server and subscribe to a queue for messages.
await bus.connect();
const unsubscribe = await bus.subscribe('my_queue', (msg, props, done) => {
// process msg + props
console.log(`Received message ${props.messageId} with priority ${props.priority}, published on ${props.timestamp}`);
// call done when ready to remove message from rabbitmq
done();
});
// unsubscribe from queue
await unsubscribe();
// disconnect from bus
await bus.disconnect();
Connect to the AMQP server, create a queue (if it does not exist) and send a message.
await bus.connect();
await bus.assertQueue('my_queue');
await bus.sendToQueue('my_queue', { foo: 1, bar: 2 });
await bus.disconnect();
Connect to the AMQP server, create a topic exchange and publish a message to a route.
await bus.connect();
await bus.assertExchange('my_exchange', 'topic'); // note the 2nd argument (i.e. "topic") used to create a topic exchange
await bus.assertQueue('my_queue');
await bus.bindQueue('my_queue', 'my_exchange', 'route.1'); // note the 3rd argument (i.e. route.1)
await bus.publish('my_exchange', 'route.1', { foo: 1, bar: 2 }); // note the 2nd argument (i.e. route.1)
await bus.disconnect();
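The binding key passed to bindQueue() above is an exact route, but topic exchanges also accept wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below illustrates that matching rule. The matching itself happens on the AMQP server, and `topicMatches` is a hypothetical helper written for illustration; it is not part of amqp-message-bus.

```javascript
// Illustrative only: the broker performs this matching, not the client.
// `topicMatches` is a hypothetical name, not an amqp-message-bus function.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  // match(i, j): does pattern word i onwards match routing key word j onwards?
  const match = (i, j) => {
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // '#' may absorb zero or more words
      for (let n = j; n <= k.length; n += 1) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    // '*' matches exactly one word; anything else must match literally
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}
```

Under this rule a queue bound with 'route.*' would receive messages published to 'route.1' but not to 'route.1.x', while a queue bound with 'route.#' would receive both.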
API Docs
#constructor(props) -> MessageBus
Constructs a new message bus with the supplied properties.
Arguments
- props (Object) message bus properties (required).
- props.url (string) AMQP server URL (required).
- props.encryptionKey (string) encryption key to use with symmetric encryption (optional).
Example
const bus = new MessageBus({
url: 'amqp://localhost',
encryptionKey: 'keep-it-safe'
});
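This README does not say which cipher the bus applies when encryptionKey is set. To give a rough idea of what symmetric message encryption involves, here is a minimal sketch using Node's built-in crypto module with AES-256-GCM. The function names (`deriveKey`, `encryptMessage`, `decryptMessage`), the cipher choice and the payload layout are all assumptions made for illustration, not the library's implementation.

```javascript
const crypto = require('crypto');

// Hypothetical helpers, not part of amqp-message-bus.
// Turns a passphrase such as 'keep-it-safe' into a 32-byte key.
// SHA-256 keeps the sketch short; a real KDF (e.g. scrypt) would be preferable.
function deriveKey(passphrase) {
  return crypto.createHash('sha256').update(passphrase).digest();
}

function encryptMessage(message, passphrase) {
  const iv = crypto.randomBytes(12); // fresh nonce for every message
  const cipher = crypto.createCipheriv('aes-256-gcm', deriveKey(passphrase), iv);
  const ciphertext = Buffer.concat([
    cipher.update(JSON.stringify(message), 'utf8'),
    cipher.final()
  ]);
  // Assumed layout: 12-byte IV | 16-byte auth tag | ciphertext
  return Buffer.concat([iv, cipher.getAuthTag(), ciphertext]);
}

function decryptMessage(payload, passphrase) {
  const iv = payload.subarray(0, 12);
  const tag = payload.subarray(12, 28);
  const ciphertext = payload.subarray(28);
  const decipher = crypto.createDecipheriv('aes-256-gcm', deriveKey(passphrase), iv);
  decipher.setAuthTag(tag);
  // final() throws if the key is wrong or the payload was tampered with
  const plaintext = Buffer.concat([decipher.update(ciphertext), decipher.final()]);
  return JSON.parse(plaintext.toString('utf8'));
}
```

The point of the sketch is that publisher and subscriber must share the same key: a consumer configured with a different encryptionKey cannot read the message body.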
#connect() -> Promise
Connects to AMQP server.
Returns
Promise
Example
bus.connect()
.then(() => {
console.log('Connected to rabbitmq');
})
.catch((err) => {
console.error(err);
});
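The AMQP server may not be reachable yet when the application starts. This README does not state whether the bus retries on its own, so the sketch below wraps connect() in a simple retry loop with exponential backoff. `connectWithRetry`, `attempts` and `delayMs` are hypothetical names, and the sketch assumes it is safe to call connect() again after a failed attempt.

```javascript
// Hypothetical helper, not part of amqp-message-bus.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Assumes bus.connect() may be called again after it has rejected.
async function connectWithRetry(bus, { attempts = 5, delayMs = 1000 } = {}) {
  for (let attempt = 1; ; attempt += 1) {
    try {
      return await bus.connect();
    } catch (err) {
      // Give up once the configured number of attempts is exhausted
      if (attempt >= attempts) throw err;
      // Exponential backoff: delayMs, 2 * delayMs, 4 * delayMs, ...
      await sleep(delayMs * 2 ** (attempt - 1));
    }
  }
}
```

Usage would look like `await connectWithRetry(bus, { attempts: 3, delayMs: 500 });` in place of `await bus.connect();`.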
#disconnect() -> Promise
Disconnects from AMQP server.
Returns
Promise
Example
bus.disconnect()
.then(() => {
console.log('Disconnected from rabbitmq');
})
.catch((err) => {
console.error(err);
});
#subscribe(queue, listener)
Subscribes to the designated queue for incoming messages.
Arguments
- queue (string) the name of the queue to subscribe to (required).
- listener (Function) listener function, i.e. function(msg, props, done) (required).
  - msg (Object) message body (required).
  - props (Object) message meta-data (required).
  - done (Function) call done to signal message processing is done (required).
Please visit http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue for further info on props meta-data.
Returns
Promise<Function>
The function returned is the unsubscribe() method.
Example
const listener = (msg, props, done) => {
// do something with msg and props
console.log(JSON.stringify(msg, null, 2));
// call done when you are done with msg to remove from queue
done();
};
bus.subscribe('my_queue', listener)
.then((unsubscribe) => {
// unsubscribe when ready
return unsubscribe();
})
.catch((err) => {
console.error(err);
});
Example using async/await
const unsubscribe = await bus.subscribe('my_queue', (msg, props, done) => {
// do something with msg and props
console.log(JSON.stringify(msg, null, 2));
// call done when you are done with msg to remove from queue
done();
});
// unsubscribe when ready
await unsubscribe();
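When message processing is itself asynchronous, done() should only be called once that work has finished. A minimal sketch of an adapter that turns an async handler into a (msg, props, done) listener follows. `toListener` and `onError` are hypothetical names, not part of amqp-message-bus, and the choice to skip done() on failure is an assumption, since this README does not describe how failed messages are handled.

```javascript
// Hypothetical helper, not part of amqp-message-bus: adapts an async handler
// to the (msg, props, done) listener shape expected by subscribe().
function toListener(handler, onError = console.error) {
  return (msg, props, done) =>
    Promise.resolve()
      .then(() => handler(msg, props))
      // Signal completion only after the handler has resolved
      .then(() => done())
      // Assumption: on failure, report the error and do not call done()
      .catch((err) => onError(err, msg, props));
}
```

It could then be passed to subscribe, e.g. `await bus.subscribe('my_queue', toListener(async (msg) => { /* process msg */ }));`.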
#sendToQueue(queue, message, props)
Sends the supplied message to the designated queue.
Arguments
- queue (string) the name of the queue to send message to (required)
- message (*) message body; can be any JSON serializable value (required)
- props (Object) message props (optional).
- props.id (string) message ID (optional; defaults to UUID v4)
- props.priority (integer) message priority, must be between 1 and 10 (optional; defaults to 1)
- props.timestamp (number) message timestamp (optional; defaults to Date.now())
- props.type (string) message type (optional)
Returns
Promise
Example
bus.sendToQueue('my_queue', {
foo: 'bar'
}, {
type: 'nonsense',
priority: 10
})
.catch((err) => {
console.error(err);
});
Example using async/await
await bus.sendToQueue('my_queue', {
foo: 'bar'
}, {
type: 'nonsense',
priority: 10
});
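The defaults listed under Arguments (UUID v4 for id, 1 for priority, Date.now() for timestamp) can be pictured as a merge step applied before the message is sent. The sketch below mirrors the documented behaviour only. `withDefaultProps` is a hypothetical helper, not the library's code, and it relies on crypto.randomUUID(), which needs a newer Node.js release (14.17+) than the minimum listed under Requirements.

```javascript
const { randomUUID } = require('crypto');

// Hypothetical helper mirroring the documented defaults; not the library's code.
function withDefaultProps(props = {}) {
  const merged = {
    id: randomUUID(),      // UUID v4
    priority: 1,
    timestamp: Date.now(),
    ...props               // caller-supplied values win over the defaults
  };
  // The docs require priority to be an integer between 1 and 10
  if (!Number.isInteger(merged.priority) || merged.priority < 1 || merged.priority > 10) {
    throw new RangeError('priority must be an integer between 1 and 10');
  }
  return merged;
}
```

With the props from the examples above, `withDefaultProps({ type: 'nonsense', priority: 10 })` keeps type and priority as given and fills in id and timestamp.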
#publish(exchange, routingKey, message, props)
Publishes the supplied message to the designated exchange.
Arguments
- exchange (string) the name of the exchange to publish message to (required)
- routingKey (string) the routing key to publish message to (required)
- message (*) message body; can be any JSON serializable value (required)
- props (Object) message props (optional).
- props.id (string) message ID (optional; defaults to UUID v4)
- props.priority (integer) message priority, must be between 1 and 10 (optional; defaults to 1)
- props.timestamp (number) message timestamp (optional; defaults to Date.now())
- props.type (string) message type (optional)
Returns
Promise
Example
bus.publish('my_exchange', 'route.1', {
foo: 'bar'
}, {
type: 'nonsense',
priority: 10
})
.catch((err) => {
console.error(err);
});
Example using async/await
await bus.publish('my_exchange', 'route.1', {
foo: 'bar'
}, {
type: 'nonsense',
priority: 10
});
#unsubscribe(consumerTag)
Unsubscribes the designated consumer.
Arguments
- consumerTag (string) the ID of the consumer to unsubscribe from queue (required)
Returns
Promise
Example
bus.unsubscribe('consumer-123')
.catch((err) => {
console.error(err);
});
#unsubscribeAll()
Unsubscribes all message bus consumers.
Returns
Promise
Example
bus.unsubscribeAll()
.catch((err) => {
console.error(err);
});
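unsubscribeAll() pairs naturally with disconnect() when a process shuts down: stop consuming first, then close the connection. A minimal sketch follows. `shutdown` is a hypothetical helper, and the signal wiring in the trailing comment is only one possible way to use it.

```javascript
// Hypothetical helper, not part of amqp-message-bus.
// `bus` is any object exposing unsubscribeAll() and disconnect() as documented above.
async function shutdown(bus) {
  // Stop receiving new messages before closing the connection
  await bus.unsubscribeAll();
  await bus.disconnect();
}

// Possible wiring, assuming `bus` is a connected MessageBus instance:
// process.once('SIGINT', () => {
//   shutdown(bus)
//     .then(() => process.exit(0))
//     .catch((err) => { console.error(err); process.exit(1); });
// });
```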
Contribute
Source code contributions are most welcome. The following rules apply:
- Follow the Airbnb Style Guide;
- Make sure not to break the tests.