rmqem

v3.0.4

VERSION 3.0.2

Fixed a problem with the emitAndWait method in which a message was published to an exchange that had no queue bound to it, which caused the message to be lost. The problem was solved by making sure that the queue and the exchange used for the emitAndWait response exist, creating them if they do not, so that no message is lost.

The reason a message could be published without a queue bound to the exchange is that the exchange and the queue were created in independent processes, so sometimes it worked and sometimes, evidently, it did not.

The change made in the file EventManager.class.ts, lines 129 to 145, is shown below:


      // Create the channel and build the name of the reply queue (this queue is temporary).
      let channel = await adapter.createChannel(this.options.url)
      let queueName = `${this.options.application}::${replyTo}`;
      
      // Create the exchange if it does not exist
      await adapter.createExchange(channel, replyTo, this.options.alternateExchangeName, {
        autoDelete: false,
        durable: true,
        exchange_type: options && options.exchange_type ? options.exchange_type : 'fanout'
      });

      // Create the queue if it does not exist and bind it to the exchange...
      await adapter.createQueue(channel, queueName, replyTo, {
        messageTtl: listenerOptions && listenerOptions.ttl ? listenerOptions.ttl : this.options.ttl,
        deadLetterExchange: listenerOptions && listenerOptions.dlx ?  listenerOptions.dlx : this.options.deadLetterExchangeName,
        //deadLetterRoutingKey: options ? options.dlx_key ? options.dlx_key : this.options.deadLetterRoutingKey : this.options.deadLetterRoutingKey,
        arguments: {
          'x-dead-letter-exchange': listenerOptions && listenerOptions.dlx ? listenerOptions.dlx : this.options.deadLetterExchangeName, 
          'x-dead-letter-routing-key': queueName,
        },
      }); 

      // This ensures that a queue and an exchange exist before the message is published.
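
To make the ordering issue concrete, here is a minimal sketch that uses an in-memory stand-in for the broker. `FakeBroker` and `publishReply` are hypothetical names invented for this illustration; they are not part of rmqem or of any AMQP client. The sketch only shows that a fanout publish reaches nobody until a queue is bound, and that awaiting both declarations before publishing avoids the loss.

```typescript
// Hypothetical in-memory stand-in for a broker (illustration only, not a real client API).
class FakeBroker {
  private bindings = new Map<string, Set<string>>(); // exchange name -> bound queue names
  private queues = new Map<string, string[]>();      // queue name -> stored messages

  async assertExchange(exchange: string): Promise<void> {
    if (!this.bindings.has(exchange)) this.bindings.set(exchange, new Set<string>());
  }

  async assertBoundQueue(queue: string, exchange: string): Promise<void> {
    await this.assertExchange(exchange);
    if (!this.queues.has(queue)) this.queues.set(queue, []);
    this.bindings.get(exchange)!.add(queue);
  }

  // Fanout semantics: copy the message to every bound queue.
  // Returns the number of queues that received it (0 means the message is lost).
  publish(exchange: string, message: string): number {
    const bound = this.bindings.get(exchange) ?? new Set<string>();
    bound.forEach(queue => this.queues.get(queue)!.push(message));
    return bound.size;
  }

  messages(queue: string): string[] {
    return this.queues.get(queue) ?? [];
  }
}

// Declare the exchange, then the bound queue, and only then publish.
async function publishReply(
  broker: FakeBroker,
  appName: string,
  replyTo: string,
  message: string,
): Promise<number> {
  const queueName = `${appName}::${replyTo}`;
  await broker.assertExchange(replyTo);
  await broker.assertBoundQueue(queueName, replyTo);
  return broker.publish(replyTo, message);
}
```

Publishing directly on a fresh `FakeBroker` returns 0 (nothing received the message), while `publishReply` resolves with 1 because the queue is guaranteed to be bound first.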

VERSION 2.0.0

This version supports multiple exchange types (fanout, direct, topic).

To use them, pass an options object as follows:

for the on method: pass {binding_key:'value'}; this automatically sets the exchange type to direct.

for the emit and emitAndWait methods: pass {exchange_type: 'direct'}.

This object should be passed as the third parameter.
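
The rules above can be sketched as two small helpers. This is an illustration under assumptions, not the package's internal code: the function and interface names are hypothetical, and the fallback to fanout when no option is given is inferred from the default shown in the 3.0.2 snippet.

```typescript
type ExchangeType = 'fanout' | 'direct' | 'topic';

// Hypothetical option shapes, modelled on the keys named in this README.
interface ListenOptions {
  binding_key?: string;
}

interface EmitOptions {
  exchange_type?: ExchangeType;
}

// Listener side: a binding key implies a direct exchange (assumed fallback: fanout).
function exchangeTypeForListener(options?: ListenOptions): ExchangeType {
  return options && options.binding_key ? 'direct' : 'fanout';
}

// Emitter side: use the requested type, otherwise fall back to fanout.
function exchangeTypeForEmitter(options?: EmitOptions): ExchangeType {
  return options && options.exchange_type ? options.exchange_type : 'fanout';
}
```

For instance, `exchangeTypeForListener({ binding_key: 'value' })` yields `'direct'`, and `exchangeTypeForEmitter()` yields `'fanout'`.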

There are other fixes as well; we will open a pull request for them later. We published this package so that we can keep working with it at our company.

RabbitMQ Event Manager

A Node Event Manager using RabbitMQ to exchange events.

Exchanges and Queues are automatically created.

Here is a little schema :

RabbitMQ Schema

Install

npm install rabbitmq-event-manager

Or with Yarn

yarn add rabbitmq-event-manager

Basic Example

  • Initialize
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({ url: 'amqp://localhost', appName: 'CONSUMER' });
myEventManager.initialize()
    .then(()=>{
        /** Do something after initialization */
    })
    .catch((err)=> {
        /** An error occurred during initialization */
    });
  • Consumer
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({ url: 'amqp://localhost', appName: 'CONSUMER' });
myEventManager.on('MY_EVENT_NAME', async (payload)=>{
    console.log(payload);
    return true;
});

The return statement at the end of the handler function tells RabbitMQ to "acknowledge" the message.

If you want to flush the message, you can simply throw an exception.

  • Producer
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({ url: 'amqp://localhost', appName: 'PRODUCER_1' });
const payload = { userId: 42 };
myEventManager.emit('MY_EVENT_NAME', payload);

Note: Since version 1.1.0 the emit function returns a promise that resolves with the payload actually sent to RabbitMQ (i.e. with _metas).

This will create the following elements in RabbitMQ:

  • An exchange of type fanout named MY_EVENT_NAME (the appName is not used)
  • One queue, CONSUMER::MY_EVENT_NAME, bound to the exchange MY_EVENT_NAME

NOTE: :warning: A very good convention may be to prefix the name of the event with the emitter application name, for example : PRODUCER_1.MY_EVENT_NAME but it's not mandatory.

If a new consumer is created and listens to the same event:

import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({ url: 'amqp://localhost', appName: 'OTHER_CONSUMER' });
myEventManager.on('MY_EVENT_NAME', async (payload)=>{
    console.log(payload);
    return true;
});

It will add a queue OTHER_CONSUMER::MY_EVENT_NAME bound to the Exchange MY_EVENT_NAME.
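
The naming convention described above can be sketched as follows. The helper names are hypothetical; only the `APP::EVENT` queue pattern and the exchange named after the event come from this README.

```typescript
// Queue name for one consumer application listening to one event.
function queueNameFor(appName: string, eventName: string): string {
  return `${appName}::${eventName}`;
}

interface EventTopology {
  exchange: string; // the exchange is named after the event itself
  queues: string[]; // one queue per consumer application
}

// Describe the exchange and queues expected for an event and its consumers.
function topologyFor(eventName: string, consumerApps: string[]): EventTopology {
  return {
    exchange: eventName,
    queues: consumerApps.map(app => queueNameFor(app, eventName)),
  };
}
```

With two consumers, `topologyFor('MY_EVENT_NAME', ['CONSUMER', 'OTHER_CONSUMER'])` lists the queues `CONSUMER::MY_EVENT_NAME` and `OTHER_CONSUMER::MY_EVENT_NAME`, both bound to the exchange `MY_EVENT_NAME`.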

Emit And Wait

This feature was introduced in version 1.1.0 and allows you to emit an event and wait for a response (from another event, or from a generated event name).

import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({ url: 'amqp://localhost', appName: 'PRODUCER_1' });
const payload = { a: 42, b: 58 };
myEventManager.on('add', async eventPayload => {
  return { result: eventPayload.a + eventPayload.b };
});
const response = await myEventManager.emitAndWait('add', payload);

console.log(response.result); // 100

The code above generates a queue named add.RESPONSE.$$GUID$$, where $$GUID$$ is the value of _metas.correlationId.

This queue should be deleted once the response is received. If something goes wrong, the message may be flushed to the dead letter queue.
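
As a rough illustration of how such a response name could be derived, the sketch below combines the event name, the response suffix (the `.RESPONSE` default listed in the options) and the correlation id. `replyMetasFor` is a hypothetical helper, and the exact composition used by the package is an assumption based on the pattern shown above.

```typescript
interface ReplyMetas {
  correlationId: string; // unique id used to match the response to the request
  replyTo: string;       // name of the event the responder should emit
}

// Build the reply-related metas for an emitAndWait-style request.
function replyMetasFor(
  eventName: string,
  correlationId: string,
  responseSuffix: string = '.RESPONSE',
): ReplyMetas {
  return {
    correlationId,
    replyTo: `${eventName}${responseSuffix}.${correlationId}`,
  };
}
```

For the `add` event with correlation id `1234`, this produces a `replyTo` of `add.RESPONSE.1234`.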

Options

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| url | String | amqp://localhost | The connection URL of the RabbitMQ server. |
| appName | String | - | The name of the application (used for naming exchanges and queues). |
| metas | boolean or function | true | Whether or not to add _metas information to the event. If a function is given, its return value becomes the _metas object (see Metas Informations). |
| alternateExchangeName | String | DEAD_LETTER_EXCHANGE | The name of the dead letter exchange you would like to use (:warning: remember this must be the same value for producers and consumers). |
| alternateQueueName | String | DEAD_LETTER_QUEUE | The name of the dead letter queue (bound to the dead letter exchange) you would like to use (:warning: remember this must be the same value for producers and consumers). |
| deadLetterExchangeName | String | NO_QUEUE_EXCHANGE | The name of the alternate exchange you would like to use (:warning: remember this must be the same value for producers and consumers). |
| deadLetterQueueName | String | QUEUE_NO_QUEUE | The name of the alternate exchange queue you would like to use (:warning: remember this must be the same value for producers and consumers). |
| ttl | Number | 86400000 (24h) | The default TTL before an event is flushed to the dead letter exchange. |
| maxNumberOfMessagesRetries | Number | 100 | The number of times the consumer will try to process one specific message before flushing it to the dead letter exchange. |
| logPrefix | string | [RABBITMQ] | The text printed before the error log. |
| logLevel | string | error | The log level (see winston log levels). |
| logTransportMode | string | console | Mute (no log) or output to the console. Possible values are "console" or "mute". |
| emitAndWaitTimeout | number | 30000 (30s) | The maximum time to wait for an event. |
| defaultResponseSuffix | string | .RESPONSE | The suffix added to the response event name when waiting for a response. |
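
For reference, the table can be mirrored as a typed object with defaults. This is a sketch for illustration only: the interface and the `withDefaults` helper are hypothetical and are not the package's exported types; the default values are copied from the table.

```typescript
type MetasFn = (defaults: Record<string, unknown>) => Record<string, unknown>;

// Hypothetical shape mirroring the options table.
interface EventManagerOptions {
  url: string;
  appName?: string; // no default in the table
  metas: boolean | MetasFn;
  alternateExchangeName: string;
  alternateQueueName: string;
  deadLetterExchangeName: string;
  deadLetterQueueName: string;
  ttl: number;
  maxNumberOfMessagesRetries: number;
  logPrefix: string;
  logLevel: string;
  logTransportMode: 'console' | 'mute';
  emitAndWaitTimeout: number;
  defaultResponseSuffix: string;
}

const DEFAULT_OPTIONS: EventManagerOptions = {
  url: 'amqp://localhost',
  metas: true,
  alternateExchangeName: 'DEAD_LETTER_EXCHANGE',
  alternateQueueName: 'DEAD_LETTER_QUEUE',
  deadLetterExchangeName: 'NO_QUEUE_EXCHANGE',
  deadLetterQueueName: 'QUEUE_NO_QUEUE',
  ttl: 86400000, // 24h in milliseconds
  maxNumberOfMessagesRetries: 100,
  logPrefix: '[RABBITMQ]',
  logLevel: 'error',
  logTransportMode: 'console',
  emitAndWaitTimeout: 30000, // 30s in milliseconds
  defaultResponseSuffix: '.RESPONSE',
};

// Overlay user-supplied values on top of the documented defaults.
function withDefaults(overrides: Partial<EventManagerOptions>): EventManagerOptions {
  return { ...DEFAULT_OPTIONS, ...overrides };
}
```

Calling `withDefaults({ appName: 'PRODUCER_1', ttl: 60000 })` keeps every documented default except the two overridden fields.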

Metas Informations

By default, some meta data is added to the payload:

  • guid : A unique id generated, to be able to debug for example, or for following the event.
  • timestamp : A number of milliseconds elapsed since January 1, 1970 00:00:00 UTC. (Date.now())
  • name : A string which is the name of the emitted event.
  • applicationName: The value of the application which emits the Event.
  • correlationId: (optional) a unique ID (guid) to be used when waiting for a response when using emitAndWait
  • replyTo: (optional) the event to reply to when waiting for a response when using emitAndWait

So if your payload is :

{
  userId: 42
}

With Metas data it will be :

{
    _metas:{
        guid: '465e008c-d37f-4e31-b494-023e6d187946',
        name: 'MY_EVENT_NAME',
        timestamp: 1519211809934,
        applicationName: 'PRODUCER_1'
    },
    userId:42
}

You can remove the metas information by setting the option "metas" to false.

You can also override the metas generation by giving a function as the metas option value (on the emitter side only, as the event is generated there).

With no metas

import EventManager from 'rabbitmq-event-manager';
const myEventManagerWithNoMetas = new EventManager({
  url: 'amqp://localhost',
  appName: 'PRODUCER_1',
  metas: false,
});
const payload = { userId: 42 };
myEventManagerWithNoMetas.emit('MY_EVENT_NAME', payload);
// Payload will be
// {
//    userId:42
// }

Override Metas

import EventManager from 'rabbitmq-event-manager';
const myEventManagerOverrideMetas = new EventManager({
  url: 'amqp://localhost',
  appName: 'PRODUCER_1',
  metas: sourceMetas => {
    // sourceMetas contains the default metas
    return {
      ...sourceMetas,
      otherProperty: 'MyValue',
    };
  },
});
const payload = { userId: 42 };
myEventManagerOverrideMetas.emit('MY_EVENT_NAME', payload);
// Payload will be
// {
//    _metas: {
//        guid: '465e008c-d37f-4e31-b494-023e6d187947',
//        name: 'MY_EVENT_NAME',
//        timestamp: 1519211809934,
//        otherProperty: 'MyValue'
//    },
//    userId: 42
// }

Add metas per events

Since version 1.1.0 you can add (or override) the _metas property by setting it in the event payload:

import EventManager from 'rabbitmq-event-manager';
const myEventManagerOverrideMetas = new EventManager({
  url: 'amqp://localhost',
  appName: 'PRODUCER_1',
});
const payload = { _metas: { newKey: 'value' }, userId: 42 };
myEventManagerOverrideMetas.emit('MY_EVENT_NAME', payload);
// Payload will be
// {
//    _metas: {
//        guid: '465e008c-d37f-4e31-b494-023e6d187947',
//        name: 'MY_EVENT_NAME',
//        timestamp: 1519211809934,
//        newKey: 'value'
//    },
//    userId: 42
// }

This will be added only for this emit.
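
The three behaviours described in this section (no metas, a function override, and per-event metas) can be summarised in one small function. This is a sketch under assumptions, not the package's implementation: `applyMetas` is a hypothetical name, the default metas are passed in rather than generated, and letting per-event keys win over the defaults is inferred from the "add (or override)" wording.

```typescript
type Metas = Record<string, unknown>;
type MetasOption = boolean | ((defaults: Metas) => Metas);

// Merge metas into a payload according to the "metas" option.
function applyMetas(
  payload: Record<string, unknown>,
  defaults: Metas,
  option: MetasOption = true,
): Record<string, unknown> {
  // metas: false -> the payload is sent without a generated _metas object
  if (option === false) return { ...payload };

  // metas: function -> the function's return value replaces the defaults
  const base = typeof option === 'function' ? option(defaults) : defaults;

  // Per-event metas set in the payload are merged on top (assumption: they take precedence)
  const perEvent = (payload['_metas'] ?? {}) as Metas;
  return { ...payload, _metas: { ...base, ...perEvent } };
}
```

For example, `applyMetas({ userId: 42, _metas: { newKey: 'value' } }, { name: 'MY_EVENT_NAME' })` returns a payload whose `_metas` contains both `name` and `newKey`.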

DEAD LETTER QUEUE

According to the RabbitMQ documentation, a Dead Letter Exchange is a RabbitMQ exchange that is attached to a queue. Any message in that queue can be "dead-lettered" if the queue reaches its length limit or if the message has expired (TTL).

By default, the exchange DEAD_LETTER_EXCHANGE (and its bound queue DEAD_LETTER_QUEUE) is automatically created and attached to all queues.

The names of the queue and the exchange can be changed by setting the options properties.

See the "Acknowledge / N-Acknowledge" section for how to send an event to the Dead Letter Exchange.

:warning: Be careful: if you change the names of the DEAD_LETTER_EXCHANGE and the DEAD_LETTER_QUEUE, remember that you will have to do it for all producers and all consumers, as they all use the same RabbitMQ server.
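
At the RabbitMQ level, attaching a dead letter exchange and a TTL to a queue is done through queue arguments. The sketch below builds such an argument object; the argument keys (`x-dead-letter-exchange`, `x-dead-letter-routing-key`, `x-message-ttl`) are standard RabbitMQ queue arguments, while the helper itself is hypothetical and the defaults are the ones documented in this README.

```typescript
interface DeadLetterQueueArguments {
  'x-dead-letter-exchange': string;
  'x-message-ttl': number;
  'x-dead-letter-routing-key'?: string;
}

// Build the queue arguments that dead-letter expired or rejected messages.
function deadLetterArguments(
  deadLetterExchange: string = 'DEAD_LETTER_EXCHANGE',
  ttlMs: number = 86400000,
  routingKey?: string,
): DeadLetterQueueArguments {
  const args: DeadLetterQueueArguments = {
    'x-dead-letter-exchange': deadLetterExchange,
    'x-message-ttl': ttlMs,
  };
  // The routing key is optional; when omitted, RabbitMQ keeps the message's original key.
  if (routingKey !== undefined) args['x-dead-letter-routing-key'] = routingKey;
  return args;
}
```

Because producers and consumers declare the same queues, both sides would need to pass identical values here, which is the reason for the warning above.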

QUEUE NO QUEUE

When an event exchange is created, an exchange named NO_QUEUE_EXCHANGE is also created, along with a queue named QUEUE_NO_QUEUE that is bound to it.

When an event is emitted to the event exchange and that exchange has no queue bound to it, all messages are routed to the NO_QUEUE_EXCHANGE.

The names of the queue and the exchange can be changed by setting the options properties.

:warning: Be careful: if you change the names of the NO_QUEUE_EXCHANGE and the QUEUE_NO_QUEUE, remember that you will have to do it for all producers and all consumers, as they all use the same RabbitMQ server.
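
The routing rule described in this section can be modelled with a few lines of code. This is a simplified in-memory model with hypothetical names, not broker code: it only shows that a message goes to the bound queues when there are any, falls back to the alternate exchange otherwise, and is lost when neither exists.

```typescript
interface ExchangeModel {
  queues: string[];   // queues bound to this exchange
  alternate?: string; // name of the alternate exchange, if configured
}

// Return the queues that would receive a message published to `name`.
function route(exchanges: Record<string, ExchangeModel>, name: string): string[] {
  const exchange = exchanges[name];
  if (!exchange) return [];                            // unknown exchange: nothing receives it
  if (exchange.queues.length > 0) return exchange.queues;
  if (exchange.alternate && exchange.alternate !== name) {
    return route(exchanges, exchange.alternate);       // fall back to the alternate exchange
  }
  return [];                                           // no queue and no alternate: message lost
}
```

With `MY_EVENT_NAME` having no bound queue and `NO_QUEUE_EXCHANGE` as its alternate, `route` returns `['QUEUE_NO_QUEUE']`.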

Acknowledge / N-Acknowledge

The return statement at the end of the handler function tells RabbitMQ to "acknowledge" the message.

You can "negatively acknowledge" and requeue the message by returning false (rejecting the Promise).

If you don't want to requeue the message, you can simply throw an exception.

Acknowledge the message

import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({ url: 'amqp://localhost', appName: 'OTHER_CONSUMER' });
myEventManager.on('MY_EVENT_NAME', async (payload)=>{
    return {payload}; // the message will be acknowledged
    // or even
    // return;
    // or nothing
});

After the message is acknowledged, it will be removed from the queue and deleted.

Flush the message

import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({ url: 'amqp://localhost', appName: 'OTHER_CONSUMER' });
myEventManager.on('MY_EVENT_NAME', async (payload)=>{
    throw new Error('This will flush the message to DEAD LETTER QUEUE')
});

After the message is negatively acknowledged, it is sent to the Dead Letter Exchange and therefore ends up in the queue DEAD_LETTER_QUEUE.
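
The mapping between a handler's outcome and what happens to the message can be sketched as below. `settle` and the `Settlement` labels are hypothetical names for this illustration, and the reading "returning false requeues, throwing dead-letters, anything else acknowledges" is an interpretation of this section rather than confirmed package behaviour.

```typescript
type Settlement = 'ack' | 'nack-requeue' | 'dead-letter';

// Run a handler and translate its outcome into a settlement decision.
async function settle<T>(
  handler: (payload: T) => unknown,
  payload: T,
): Promise<Settlement> {
  try {
    const result = await handler(payload);
    // Returning false asks for a requeue; any other return value acknowledges.
    return result === false ? 'nack-requeue' : 'ack';
  } catch {
    // A thrown error (or rejected promise) flushes the message to the dead letter queue.
    return 'dead-letter';
  }
}
```

A consumer loop could call `settle(handler, payload)` and then issue the corresponding ack or nack on the channel.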

NOTES :

Should we integrate the application name in the event name

In an event-driven system, an event is fired and listeners subscribe to the events they care about. Suppose an application UserAdminApp emits the event 'USER_CREATED', and another application or service, WelcomeEmail, wants to send an email to new users. WelcomeEmail listens to USER_CREATED and should know that the event was fired by UserAdminApp. The question is whether the application name belongs in the event payload (_metas) or in the event name.

On the WelcomeEmail side :

myEventManager.on('UserAdminApp.USER_CREATED', payload => {
  /* ... */
});

Or

myEventManager.on('USER_CREATED', payload => {
  /* 
    payload._metas.applicationName = UserAdminApp
*/
});

In RabbitMQ terms, this means the exchange will be named either UserAdminApp.USER_CREATED or USER_CREATED, and the listening queues will be bound to that exchange.

Given this, I really think the event should be named USER_CREATED, without the application name. Since it is still important to know which application fired which event, the application name can be added to the _metas information of the event's payload.
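
If the application name lives in _metas, a listener that only cares about one emitter can filter on it. The wrapper below is a hypothetical helper written for this note, not part of the package; it relies only on the applicationName field listed under Metas Informations.

```typescript
interface EventPayload {
  _metas?: { applicationName?: string };
  [key: string]: unknown;
}

// Wrap a handler so it only runs for events emitted by the given application.
// The wrapper returns true when the handler ran and false when the event was skipped.
function onlyFrom(
  applicationName: string,
  handler: (payload: EventPayload) => void,
): (payload: EventPayload) => boolean {
  return payload => {
    const emitter = payload._metas ? payload._metas.applicationName : undefined;
    if (emitter !== applicationName) return false;
    handler(payload);
    return true;
  };
}
```

The WelcomeEmail service could then register `onlyFrom('UserAdminApp', sendWelcomeEmail)` for the USER_CREATED event, where `sendWelcomeEmail` stands for its own handler.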

Alternate Exchange notes

  • If the "Alternate Exchange" was not created first, that is not a problem in itself, since it is only referenced in the configuration. However, if a message is sent to the exchange 'My_EVENT' while the "Alternate Exchange" does not exist and no queues are bound to 'My_EVENT', the message will be lost!

  • When we listen to an event :

import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({ url: 'amqp://localhost', appName: 'CONSUMER' });
myEventManager.on('MY_EVENT_NAME', async (payload)=>{
    console.log(payload);
    return true;
});

It will automatically create an exchange named MY_EVENT_NAME and a queue, CONSUMER::MY_EVENT_NAME, bound to that exchange.

The queue CONSUMER::MY_EVENT_NAME is configured with the dead letter exchange even if that exchange does not exist yet. This means that if a MY_EVENT_NAME event is emitted and the listener marks it to be flushed (dead-lettered), the message will be lost, because no dead letter exchange is defined and therefore no queue is bound to it.

:warning: In RabbitMQ only queues store messages, not exchanges, so it's important that you initialize your RabbitMQ instance with the values of alternateExchangeName, alternateQueueName, deadLetterExchangeName, and deadLetterQueueName.

Requeue with delay

It could be very interesting to "negatively acknowledge" a message and ask for it to be requeued after a delay, but this will come (if possible) in version 2!