rabbit-communications
v0.13.0
Published
Configure two-way communication between microservices via RabbitMQ 📥 📤
Downloads
3
Maintainers
Readme
rabbit-communications
Configure two-way communication between microservices via RabbitMQ 📥 📤
Install
npm i rabbit-communications
Test
$ docker run -d -p 5672:5672 rabbitmq
$ export RABBIT_URL=amqp://guest:guest@localhost:5672
$ npm test
Basic concepts
This library provides several abstractions for communication between individual services via RabbitMQ. There are two main entities: service and communicator.
The scheme of their interaction is as follows:
As you can see, the Service and its interaction channels (arrows) are marked with the same color (green). This is because the Service is the main entity in the system, and the Communicator is only connected to its channels.
You can build any data pipelines and service architectures using Services and Communicators, there's a pair of examples:
For cases when you have a main application that interacts
with many services at the same time,
there is a CommunicationsManager in rabbit-communications
,
which manages pool of Communicators and provides helpful features
like outputListener's middleware, RabbitMQ connection sharing and other cool features.
Here is a diagram of the service architecture using the manager:
Usage example
Let's write a Service and Communicator that will exchange "ping-pong" messages and log it into the console.
service.js
const { Service } = require('rabbit-communications');
(async () => {
const service = new Service({
namespace: 'my-services', // namespace must be the same
name: 'example-1',
isInputEnabled: true,
isOutputEnabled: true,
shouldDiscardMessages: true,
rabbitOptions: {
url: 'amqp://guest:guest@localhost:5672',
},
});
service.addInputListener(async (ctx) => {
console.log(`Received message from Communicator: ${ctx.data.message}`);
// echo, will trigger Communicator's output listener
await ctx.reply({ message: 'pong' });
});
await service.start(); // returns promise
})()
communicator.js
const { Communicator } = require('rabbit-communications');
(async () => {
const communicator = new Communicator({
namespace: 'my-services', // namespace must be the same
targetServiceName: 'example-1',
isInputEnabled: true,
isOutputEnabled: true,
shouldDiscardMessages: true,
rabbitOptions: { // and the RabbitMQ configuration, obviously, should also be the same :)
url: 'amqp://guest:guest@localhost:5672',
},
});
communicator.addOutputListener((ctx) => {
console.log(`Received message from Service: ${ctx.data.message}`);
});
await communicator.start();
// this will trigger Service's input listener
await communicator.send({ message: 'ping' });
})();
In this example, the following happens:
- Service instance is created and started with input callback added (
service.addInputListener(fn)
call) - Communicator instance is created and started with service's output callback added (
communicator.addOutputListener(fn)
call) - Communicator sends "ping" message to service's input channel (
communicator.send(data)
call) - Service logs it and responds with "pong" message to it's output channel (input listener callback invocation)
- Communicator receives service's "pong" output and logs it (output listener callback invocation)
After writing these two simple applications, you need to start RabbitMQ, after which you need to start the applications themselves.
In this example, we will use Docker to start RabbitMQ:
$ docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:management
Service app launch:
$ node service.js
Communicator app launch:
$ node communicator.js
Now, when all the necessary things are running, in the service`s output you will see the following:
Service "example-1" successfully started
﹂RabbitMQ connection url: amqp://guest:guest@localhost:5672
﹂Input queue name: my-services:example-1:input
﹂Output queue name: my-services:example-1:output
Received message from Communicator: ping
And in the output of the communicator is:
Communicator for service "example-1" successfully started
﹂RabbitMQ connection url: amqp://guest:guest@localhost:5672
﹂Target service's input queue name: my-services:example-1:input
﹂Target service's output queue name: my-services:example-1:output
Received message from Service: pong
If you are interested in the queues topology in RabbitMQ, then you can go to the browser at http://localhost:15672 (RabbitMQ management board) with login "guest" and password "guest".
There you will see the exchange my-services
(this is the name of the namespace from the service and communicator configurations),
to which two queues are binded: my-services:example-1:input
and my-services:example-1:output
(these queues are generated automatically,
their names are in the application logs above)
API Reference
RabbitClient
const { RabbitClient } = require('rabbit-communications');
rabbit-communications
exports RabbitClient
class from rabbit-client npm package.
Documentation and usage examples can be found on the it's npm page.
You can pass RabbitClient instance to Service, Communicator and CommunicationsManager constructors, if you don't, RabbitClient will be created under the hood (configured from rabbitOptions)
Service
const { Service } = require('rabbit-communications');
- constructor(settings)
- .addInputListener(fn)
- .addAskListener(subject, fn)
- .send(data, metadata = {})
- .start()
constructor(settings)
Create Service instance.
const service1 = new Service({
namespace: 'my-namespace',
name: 'my-service-1',
isOutputEnabled: true,
isInputEnabled: true,
shouldDiscardMessages: false,
metadata: {
foo: 'bar',
},
rabbitOptions: {
url: 'amqp://guest:guest@localhost:5672',
},
});
// or
const rabbitClient = new RabbitClient('amqp://guest:guest@localhost:5672', {
appName: 'my-rabbit-client',
disableLogging: true,
json: true,
});
const service2 = new Service({
namespace: 'my-namespace',
name: 'my-service-2',
isOutputEnabled: true,
isInputEnabled: true,
shouldDiscardMessages: false,
metadata: {
foo: 'bar',
},
rabbitClient, // RabbitClient instance is passed instead of rabbitOptions
});
Settings description:
- namespace - the name of the service group used
to distinguish them based on their part of your system,
for example,
namespace "shop" -> service "accounts"
andnamespace "social" -> service "accounts"
- name - service name used to connect Сommunicators
- isOutputEnabled - whether the service should send messages to Communicator
- isInputEnabled - whether the service should receive messages from the Communicator
- shouldDiscardMessages - whether the service should delete messages instead of returning them back to the input queue if an error occurred during its processing
- metadata - object, that would be sent with every output message
and could be accessed via
ctx.metadata
in listener - metadata - object, that would be sent with every service input message
and could be accessed via
ctx.metadata
in listener - rabbitOptions - settings for connecting to RabbitMQ (used if rabbitClient was not passed to the constructor)
- rabbitClient - RabbitClient instance (if rabbitClient is passed, rabbitOptions are ignored)
.addInputListener(fn)
Add callback to messages from input queue.
If you passed isInputEnabled: true
to the Service constructor,
you must add input listener before service.start()
is called.
service.addInputListener((ctx) => {
// your awesome input handler goes here..
})
.addAskListener(subject, fn)
Add ask callback
For this to work you need to enable both input and output channels
service.addAskListener('echo', async (ctx) => {
// your awesome ask handler goes here, for example:
await ctx.reply(ctx.data);
});
.send(data, metadata = {})
Send message to output queue.
await service.send({ foo: 'bar' });
.start()
Start service (input and output queues and channels are created).
await service.start();
Communicator
const { Communicator } = require('rabbit-communications');
- constructor(settings)
- .addOutputListener(fn)
- .send(data, metadata = {})
- .ask(subject, data, metadata = {})
- .start()
constructor(settings)
Create Communicator instance.
const communicator1 = new Communicator({
namespace: 'my-namespace',
targetServiceName: 'my-service-1',
useAsk: false,
askTimeout: 5e3,
isOutputEnabled: true,
isInputEnabled: true,
shouldDiscardMessages: false,
metadata: {
foo: 'bar',
},
rabbitOptions: {
url: 'amqp://guest:guest@localhost:5672',
},
});
// or
const rabbitClient = new RabbitClient('amqp://guest:guest@localhost:5672', {
appName: 'my-rabbit-client',
disableLogging: true,
json: true,
});
const communicator2 = new Communicator({
namespace: 'my-namespace',
targetServiceName: 'my-service-1',
useAsk: false,
askTimeout: 5e3,
isOutputEnabled: true,
isInputEnabled: true,
shouldDiscardMessages: false,
metadata: {
foo: 'bar',
},
rabbitClient,
});
Settings description:
- namespace - the name of the service group used
to distinguish them based on their part of your system,
for example,
namespace "shop" -> service "accounts"
andnamespace "social" -> service "accounts"
- targetServiceName - name of the service to which communicator will be connected
- useAsk - set it to true if you want to use ask method, this will enable both input and output channels automatically
- askTimeout - the number of milliseconds for which the service will have to respond when using the ask method
- isOutputEnabled - whether the communicator should listen service's output queue
- isInputEnabled - will the communicator send messages to service's input queue
- shouldDiscardMessages - whether the communicator should delete messages instead of returning them back to the service's output queue if an error occurred during its processing
- rabbitOptions - settings for connecting to RabbitMQ (used if rabbitClient was not passed to the constructor)
- rabbitClient - RabbitClient instance (if rabbitClient is passed, rabbitOptions are ignored)
.addOutputListener(fn)
Add callback to messages from service's output queue.
If you passed isOutputEnabled: true
to the Communicator constructor,
you must add service output listener before communicator.start()
is called.
service.addOutputListener((ctx) => {
// your awesome service output handler goes here..
});
.send(data, metadata = {})
Send message to service's input queue.
await service.send({ foo: 'bar' });
.ask(subject, data, metadata = {})
Ask
service (receive response from
service's .addAskListener(subject, fn) callback)
const { data, metadata } = await communicator.ask('ping', { foo: 'bar' });
.start()
Start communicator (connect to the target service input and output channels).
await communicator.start();
CommunicationsManager
const { CommunicationsManager } = require('rabbit-communications');
- constructor(settings)
- .registerCommunicator(targetServiceName, communicatorOptions, outputListener)
- .send(targetServiceName, data, metadata = {})
- .ask(targetServiceName, subject, data, metadata = {})
- .broadcast(data, metadata = {})
- .applyMiddleware(...args)
- .addOutputListener(targetServiceName, fn)
- .start()
constructor(settings)
Create CommunicationsManager instance.
const manager1 = new CommunicationsManager({
namespace: 'my-namespace',
rabbitOptions: {
url: 'amqp://guest:guest@localhost:5672',
},
});
// or
const rabbitClient = new RabbitClient('amqp://guest:guest@localhost:5672', {
appName: 'my-rabbit-client',
disableLogging: true,
json: true,
});
const manager2 = new CommunicationsManager({
namespace: 'my-namespace',
rabbitClient,
});
All manager’s communicators will use the same RabbitClient instance.
Settings description:
- namespace - namespace in which all communicators controlled by the manager will work
- rabbitOptions - settings for connecting to RabbitMQ (used if rabbitClient was not passed to the constructor)
- rabbitClient - RabbitClient instance (if rabbitClient is passed, rabbitOptions are ignored)
.registerCommunicator(targetServiceName, communicatorOptions, outputListener)
Create and configure communicator.
outputListener
argument is optional, you can add
listener method later using addOutputListener
or not add at all if you don't need
manager.registerCommunicator('my-service-1', {
isInputEnabled: true,
isOutputEnabled: false,
});
// or
manager.registerCommunicator(
'my-service-2',
{
isInputEnabled: false,
isOutputEnabled: true,
},
(ctx) => {
// your awesome service output handler goes here..
},
);
.send(targetServiceName, data, metadata = {})
Send message to specific service.
Communicator for targetServiceName
must be registered for this action
await manager.send('my-service-1', { foo: 'bar' });
.ask(targetServiceName, subject, data, metadata = {})
Ask
service (receive response from
service's .addAskListener(subject, fn) callback)
.broadcast(data, metadata = {})
Send message to all registered services.
await manager.broadcast({ foo: 'bar' });
.applyMiddleware(...args)
Apply async koa-like
middleware functions for outputListeners
.
There are several ways to use this method:
.applyMiddleware(func)
- single middleware for all listeners.applyMiddleware([func1, func2, func3])
- multiple middleware functions for all listeners.applyMiddleware(targetServiceName, func)
- single middleware for specific listener.applyMiddleware(targetServiceName, [func1, func2, func3])
- multiple middleware functions for specific listener.applyMiddleware([name1, name2], func)
- single middleware for several specific listeners.applyMiddleware([name1, name2], [func1, func2, func3])
- multiple middleware functions for several specific listeners
manager.applyMiddleware(async (ctx, next) => {
console.time('Output listener execution time');
await next(); // wait for all middleware chain to execute
console.timeEnd('Output listener execution time');
});
manager.addOutputListener('my-service-1', async (ctx) => {
await new Promise(resolve => setTimeout(resolve, 1500));
});
.addOutputListener(targetServiceName, fn)
Add output listener for specific registered communicator.
manager.addOutputListener('my-service-1', (ctx) => {
// your awesome service output handler goes here..
});
.start()
Start manager and all registered communicators.
await manager.start();
Coming soon
- Allow to pass custom input/output processing function (not just default JSON.parse/JSON.stringify)
- Add communicator.ask(type, data, metadata) method mapping
service's output messages with input messages.
For example,
authCommunicator.ask('login', { token: 'pih8gw1a32' })
- Add JSDoc or TS-typings
License
MIT.