npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

rabbit-communications

v0.13.0

Published

Configure two-way communication between microservices via RabbitMQ 📥 📤

Downloads

3

Readme

rabbit-communications

Configure two-way communication between microservices via RabbitMQ 📥 📤

Install

npm i rabbit-communications

Test

See test files

$ docker run -d -p 5672:5672 rabbitmq
$ export RABBIT_URL=amqp://guest:guest@localhost:5672
$ npm test

Basic concepts

This library provides several abstractions for communication between individual services via RabbitMQ. There are two main entities: service and communicator.

The scheme of their interaction is as follows:

Service-Communicator basic scheme

As you can see, the Service and its interaction channels (arrows) are marked with the same color (green). This is because the Service is the main entity in the system, and the Communicator is only connected to its channels.


You can build any data pipelines and service architectures using Services and Communicators, there's a pair of examples:

Architecture example 1 Architecture example 2


For cases when you have a main application that interacts with many services at the same time, there is a CommunicationsManager in rabbit-communications, which manages pool of Communicators and provides helpful features like outputListener's middleware, RabbitMQ connection sharing and other cool features.

Here is a diagram of the service architecture using the manager:

CommunicationsManager example

Usage example

Let's write a Service and Communicator that will exchange "ping-pong" messages and log it into the console.

service.js

const { Service } = require('rabbit-communications');

(async () => {
  const service = new Service({
    namespace: 'my-services', // namespace must be the same
    name: 'example-1',
    isInputEnabled: true,
    isOutputEnabled: true,
    shouldDiscardMessages: true,
    rabbitOptions: {
      url: 'amqp://guest:guest@localhost:5672',
    },
  });
  
  service.addInputListener(async (ctx) => {
    console.log(`Received message from Communicator: ${ctx.data.message}`);
    
    // echo, will trigger Communicator's output listener
    await ctx.reply({ message: 'pong' });
  });
  
  await service.start(); // returns promise
})()

communicator.js

const { Communicator } = require('rabbit-communications');

(async () => {
  const communicator = new Communicator({
    namespace: 'my-services', // namespace must be the same
    targetServiceName: 'example-1',
    isInputEnabled: true,
    isOutputEnabled: true,
    shouldDiscardMessages: true,
    rabbitOptions: { // and the RabbitMQ configuration, obviously, should also be the same :)
      url: 'amqp://guest:guest@localhost:5672',
    },
  });
  
  communicator.addOutputListener((ctx) => {
    console.log(`Received message from Service: ${ctx.data.message}`);
  });
  
  await communicator.start();
  
  // this will trigger Service's input listener
  await communicator.send({ message: 'ping' });
})();

In this example, the following happens:

  1. Service instance is created and started with input callback added (service.addInputListener(fn) call)
  2. Communicator instance is created and started with service's output callback added (communicator.addOutputListener(fn) call)
  3. Communicator sends "ping" message to service's input channel (communicator.send(data) call)
  4. Service logs it and responds with "pong" message to it's output channel (input listener callback invocation)
  5. Communicator receives service's "pong" output and logs it (output listener callback invocation)

After writing these two simple applications, you need to start RabbitMQ, after which you need to start the applications themselves.

In this example, we will use Docker to start RabbitMQ:

$ docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:management

Service app launch:

$ node service.js

Communicator app launch:

$ node communicator.js

Now, when all the necessary things are running, in the service`s output you will see the following:

Service "example-1" successfully started
﹂RabbitMQ connection url: amqp://guest:guest@localhost:5672
﹂Input queue name: my-services:example-1:input
﹂Output queue name: my-services:example-1:output

Received message from Communicator: ping

And in the output of the communicator is:

Communicator for service "example-1" successfully started
﹂RabbitMQ connection url: amqp://guest:guest@localhost:5672
﹂Target service's input queue name: my-services:example-1:input
﹂Target service's output queue name: my-services:example-1:output

Received message from Service: pong

If you are interested in the queues topology in RabbitMQ, then you can go to the browser at http://localhost:15672 (RabbitMQ management board) with login "guest" and password "guest".

There you will see the exchange my-services (this is the name of the namespace from the service and communicator configurations), to which two queues are binded: my-services:example-1:input and my-services:example-1:output (these queues are generated automatically, their names are in the application logs above)

API Reference


RabbitClient

const { RabbitClient } = require('rabbit-communications');

rabbit-communications exports RabbitClient class from rabbit-client npm package. Documentation and usage examples can be found on the it's npm page.

You can pass RabbitClient instance to Service, Communicator and CommunicationsManager constructors, if you don't, RabbitClient will be created under the hood (configured from rabbitOptions)


Service

const { Service } = require('rabbit-communications');

constructor(settings)

Create Service instance.

const service1 = new Service({
  namespace: 'my-namespace',
  name: 'my-service-1',
  isOutputEnabled: true,
  isInputEnabled: true,
  shouldDiscardMessages: false,
  metadata: {
    foo: 'bar',
  },
  rabbitOptions: {
    url: 'amqp://guest:guest@localhost:5672',
  },
});

// or

const rabbitClient = new RabbitClient('amqp://guest:guest@localhost:5672', {
  appName: 'my-rabbit-client',
  disableLogging: true,
  json: true,
});

const service2 = new Service({
  namespace: 'my-namespace',
  name: 'my-service-2',
  isOutputEnabled: true,
  isInputEnabled: true,
  shouldDiscardMessages: false,
  metadata: {
    foo: 'bar',
  },
  rabbitClient, // RabbitClient instance is passed instead of rabbitOptions
});
Settings description:
  • namespace - the name of the service group used to distinguish them based on their part of your system, for example, namespace "shop" -> service "accounts" and namespace "social" -> service "accounts"
  • name - service name used to connect Сommunicators
  • isOutputEnabled - whether the service should send messages to Communicator
  • isInputEnabled - whether the service should receive messages from the Communicator
  • shouldDiscardMessages - whether the service should delete messages instead of returning them back to the input queue if an error occurred during its processing
  • metadata - object, that would be sent with every output message and could be accessed via ctx.metadata in listener
  • metadata - object, that would be sent with every service input message and could be accessed via ctx.metadata in listener
  • rabbitOptions - settings for connecting to RabbitMQ (used if rabbitClient was not passed to the constructor)
  • rabbitClient - RabbitClient instance (if rabbitClient is passed, rabbitOptions are ignored)

.addInputListener(fn)

Add callback to messages from input queue.

If you passed isInputEnabled: true to the Service constructor, you must add input listener before service.start() is called.

service.addInputListener((ctx) => {
  // your awesome input handler goes here..
})

.addAskListener(subject, fn)

Add ask callback

For this to work you need to enable both input and output channels

service.addAskListener('echo', async (ctx) => {
  // your awesome ask handler goes here, for example:
  
  await ctx.reply(ctx.data);
});

.send(data, metadata = {})

Send message to output queue.

await service.send({ foo: 'bar' });

.start()

Start service (input and output queues and channels are created).

await service.start();

Communicator

const { Communicator } = require('rabbit-communications');

constructor(settings)

Create Communicator instance.

const communicator1 = new Communicator({
  namespace: 'my-namespace',
  targetServiceName: 'my-service-1',
  useAsk: false,
  askTimeout: 5e3,
  isOutputEnabled: true,
  isInputEnabled: true,
  shouldDiscardMessages: false,
  metadata: {
    foo: 'bar',
  },
  rabbitOptions: {
    url: 'amqp://guest:guest@localhost:5672',
  },
});

// or

const rabbitClient = new RabbitClient('amqp://guest:guest@localhost:5672', {
  appName: 'my-rabbit-client',
  disableLogging: true,
  json: true,
});

const communicator2 = new Communicator({
  namespace: 'my-namespace',
  targetServiceName: 'my-service-1',
  useAsk: false,
  askTimeout: 5e3,
  isOutputEnabled: true,
  isInputEnabled: true,
  shouldDiscardMessages: false,
  metadata: {
    foo: 'bar',
  },
  rabbitClient,
});
Settings description:
  • namespace - the name of the service group used to distinguish them based on their part of your system, for example, namespace "shop" -> service "accounts" and namespace "social" -> service "accounts"
  • targetServiceName - name of the service to which communicator will be connected
  • useAsk - set it to true if you want to use ask method, this will enable both input and output channels automatically
  • askTimeout - the number of milliseconds for which the service will have to respond when using the ask method
  • isOutputEnabled - whether the communicator should listen service's output queue
  • isInputEnabled - will the communicator send messages to service's input queue
  • shouldDiscardMessages - whether the communicator should delete messages instead of returning them back to the service's output queue if an error occurred during its processing
  • rabbitOptions - settings for connecting to RabbitMQ (used if rabbitClient was not passed to the constructor)
  • rabbitClient - RabbitClient instance (if rabbitClient is passed, rabbitOptions are ignored)

.addOutputListener(fn)

Add callback to messages from service's output queue.

If you passed isOutputEnabled: true to the Communicator constructor, you must add service output listener before communicator.start() is called.

service.addOutputListener((ctx) => {
  // your awesome service output handler goes here..
});

.send(data, metadata = {})

Send message to service's input queue.

await service.send({ foo: 'bar' });

.ask(subject, data, metadata = {})

Ask service (receive response from service's .addAskListener(subject, fn) callback)

const { data, metadata } = await communicator.ask('ping', { foo: 'bar' });

.start()

Start communicator (connect to the target service input and output channels).

await communicator.start();

CommunicationsManager

const { CommunicationsManager } = require('rabbit-communications');

constructor(settings)

Create CommunicationsManager instance.

const manager1 = new CommunicationsManager({
  namespace: 'my-namespace',
  rabbitOptions: {
    url: 'amqp://guest:guest@localhost:5672',
  },
});

// or

const rabbitClient = new RabbitClient('amqp://guest:guest@localhost:5672', {
  appName: 'my-rabbit-client',
  disableLogging: true,
  json: true,
});

const manager2 = new CommunicationsManager({
  namespace: 'my-namespace',
  rabbitClient,
});

All manager’s communicators will use the same RabbitClient instance.

Settings description:
  • namespace - namespace in which all communicators controlled by the manager will work
  • rabbitOptions - settings for connecting to RabbitMQ (used if rabbitClient was not passed to the constructor)
  • rabbitClient - RabbitClient instance (if rabbitClient is passed, rabbitOptions are ignored)

.registerCommunicator(targetServiceName, communicatorOptions, outputListener)

Create and configure communicator.

outputListener argument is optional, you can add listener method later using addOutputListener or not add at all if you don't need

manager.registerCommunicator('my-service-1', {
  isInputEnabled: true,
  isOutputEnabled: false,
});

// or

manager.registerCommunicator(
  'my-service-2',
  {
    isInputEnabled: false,
    isOutputEnabled: true,
  },
  (ctx) => {
    // your awesome service output handler goes here..
  },
);

.send(targetServiceName, data, metadata = {})

Send message to specific service.

Communicator for targetServiceName must be registered for this action

await manager.send('my-service-1', { foo: 'bar' });

.ask(targetServiceName, subject, data, metadata = {})

Ask service (receive response from service's .addAskListener(subject, fn) callback)

.broadcast(data, metadata = {})

Send message to all registered services.

await manager.broadcast({ foo: 'bar' });

.applyMiddleware(...args)

Apply async koa-like middleware functions for outputListeners.

There are several ways to use this method:

  • .applyMiddleware(func) - single middleware for all listeners
  • .applyMiddleware([func1, func2, func3]) - multiple middleware functions for all listeners
  • .applyMiddleware(targetServiceName, func) - single middleware for specific listener
  • .applyMiddleware(targetServiceName, [func1, func2, func3]) - multiple middleware functions for specific listener
  • .applyMiddleware([name1, name2], func) - single middleware for several specific listeners
  • .applyMiddleware([name1, name2], [func1, func2, func3]) - multiple middleware functions for several specific listeners
manager.applyMiddleware(async (ctx, next) => {
  console.time('Output listener execution time');
  
  await next(); // wait for all middleware chain to execute
  
  console.timeEnd('Output listener execution time');
});

manager.addOutputListener('my-service-1', async (ctx) => {
  await new Promise(resolve => setTimeout(resolve, 1500));
});

.addOutputListener(targetServiceName, fn)

Add output listener for specific registered communicator.

manager.addOutputListener('my-service-1', (ctx) => {
  // your awesome service output handler goes here..
});

.start()

Start manager and all registered communicators.

await manager.start();

Coming soon

  • Allow to pass custom input/output processing function (not just default JSON.parse/JSON.stringify)
  • Add communicator.ask(type, data, metadata) method mapping service's output messages with input messages. For example, authCommunicator.ask('login', { token: 'pih8gw1a32' })
  • Add JSDoc or TS-typings

License

MIT.