@alibrate/amqp-client
v1.0.2
Published
Simple connector for amqp protocol
Readme
AMQP client library
High level amqp API written on top of amqplib
Install
npm installuse
const pubsub = require('amqp-client').pubsub;
Publish and Suscribe
Set of functions that implement Publish and Suscribe pattern.
Publishing
The Publisher side looks like this:
const pubsub = require('amqp-client').pubsub;
const msg = {foo: 'bar'};
pubsub.startExchange({url: amqpServerURL, name: 'myexchange'}).then(function (ch) {
pubsub.publish(ch, 'myfilter', msg);
});
pubsub.startExchange() takes an object with the following properties
url: amqp server url. Default:amqp://localhost; often config.amqp.url or context.config.amqp.urlname: Name for your exchange, an exchange is like a gateway, so be sure consumers & producers share the same exchange nametype: Exchange type. Can be: 'direct', 'topic', 'headers', 'fanout'. See amqplib documentation for more information about all the types. Default:directoptions: Options object for internalassertExchangemethod. See amqplib documentation.
pubsub.publish()
ch: the channel returned from startExchange that is used to dispatch messages'myfilter': the string filter used for sending messages, consumers must use the same filter to receivemsg: the message to pass along to consumers
Suscribers
The Consumer side looks like this:
const pubsub = require('amqp-client').pubsub;
function onMsg(message){
console.log("Received message:", message);
let publisherMessage = JSON.parse(message.content);
console.log("Received content from publisher:", publisherMessage);
}
pubsub.startExchange({url: amqpServerURL, name: 'myexchange'}).then(function (ch) {
pubsub.consume(ch, 'optional_group', ['myfilter', 'otherFilter'], onMsg);
});The callback parameter is not the message that was sent from the publisher. Rather it is the actual AMQP message that contains both the stringified version of the message that the producer sent and a whole lot of other information. The actual message sent by the producer is contained in message.content
The consumer open the same exchange myexchange using the same promise startExchange() and then listen for new messages with specific filters.
That meeans all the messages sended to myexchange with that filters will be received for this consumer.
Note the optional_group parameter. This parameter is used to balance messages between consumers that are using the same group name. That means only one consumer in the same group will receive the message each time like round-robin algorithm.
If you use empty string '' as group name then the system will create a random name for you.
