@goparrot/pubsub-event-bus

v5.0.0

NestJS EventBus extension for RabbitMQ PubSub

Downloads

664

PubSub Event Bus

PubSub Event Bus is built on top of the NestJS CQRS module.

It lets you use the NestJS CQRS module across a microservice architecture, with RabbitMQ as the message broker.

Installation

First install the required package:

npm install --save @goparrot/pubsub-event-bus

It is highly recommended to install the peerDependencies yourself.

Import module

Import the module and configure it by providing the connection strings.

import { CqrsModule } from "@goparrot/pubsub-event-bus";
import { Module } from "@nestjs/common";

export const connections: string[] = ["amqp://username:[email protected]/virtualhost"];

@Module({
    imports: [CqrsModule.forRoot({ connections })],
})
export class AppModule {}

Full list of the PubSub CQRS Module options:

| Options | Description |
|----------------|-------------|
| connections | Array of connection strings |
| config | AMQP connection options |
| isGlobal | Should the module be registered as global |
| logger | Logger service to be used |
| connectionName | Name of the connection to be displayed in the server logs and management UI. Final name will have a suffix :producer or :consumer depending on the connection purpose |
| retryOptions | Global options for the retry mechanism. Read more in the Retry Mechanism section |

Note: The CqrsModule class should be imported from @goparrot/pubsub-event-bus library.

Usage

Create event

An event is a simple class with a message payload.

import { IEvent } from "@nestjs/cqrs";

export class StoreCreated implements IEvent {
    constructor(private readonly storeId: string) {}
}

This is a fully compatible event class that can be used with NestJS EventBus.

To make it PubSub ready, it should extend the AbstractPubsubEvent class and be decorated with PubsubEvent (both imported from @goparrot/pubsub-event-bus).

import { AbstractPubsubEvent, PubsubEvent } from "@goparrot/pubsub-event-bus";

export interface IStoreCreatedPayload {
    storeId: string;
}

@PubsubEvent({ exchange: "store" })
export class StoreCreated extends AbstractPubsubEvent<IStoreCreatedPayload> {}

Publish event

To emit the event, inject EventBus (imported from @goparrot/pubsub-event-bus) into the service.

import { EventBus } from "@goparrot/pubsub-event-bus";
import { Injectable } from "@nestjs/common";

@Injectable()
class SomeService {
    constructor(private readonly eventBus: EventBus) {
    }

    async doCoolStuff() {
        // create item (placeholder id, for illustration only)
        const storeId = "storeId";

        await this.eventBus.publish(new StoreCreated({ storeId }));

        // return item
    }
}

Consuming events

Create event handler

Create a simple class which extends AbstractPubsubHandler and is decorated with PubsubEventHandler (both imported from @goparrot/pubsub-event-bus).

import { AbstractPubsubHandler, PubsubEventHandler } from "@goparrot/pubsub-event-bus";

@PubsubEventHandler(StoreCreated)
export class StoreCreatedHandler extends AbstractPubsubHandler<StoreCreated> {
    handle(event: StoreCreated) {
        console.log(`[${this.constructor.name}] ->`, event.payload);
    }
}

Note: unlike regular CQRS event handlers, a PubSub event handler uses its own decorator, @PubsubEventHandler(StoreCreated).

The @PubsubEventHandler decorator accepts a list of the events it listens for, for example:

@PubsubEventHandler(StoreCreated, UserCreated)

Implement the required method:

handle - the entry point that receives each incoming event, including its payload

Register event handler

Register the event handler as a provider:

@Module({
    providers: [StoreCreatedHandler],
})
export class AppModule {}

Once registered, the event handler starts listening for incoming events.

Configuration

Event Configuration

To emit an event with extra headers or other publish options, call the withOptions({}) method and provide the required configuration:

await this.eventBus.publish(
    new StoreCreated({ storeId: "storeId" }).withOptions({
        persistent: false,
        priority: 100,
        headers: ["..."],
    }),
);

Handler Configuration

The PubsubEventHandler decorator accepts handler options as its last argument. The available options are:

| Options | Description |
|---------------------|-------------|
| autoAck | Event acknowledge mode. Default ALWAYS_ACK. Read more in the Acknowledge Mode section |
| queue | Custom queue name |
| bindingQueueOptions | Queue binding options from the amqplib |
| retryOptions | Handler specific retry options. Read more in the Retry Mechanism section |

Acknowledge Mode

By default, the library creates queues without automatic delivery acknowledgement, so messages must be acknowledged by the client. The library provides several acknowledge modes:

ALWAYS_ACK (default)

The message is positively acknowledged whether the handler succeeds or fails.

ACK_AND_NACK

The message is positively acknowledged automatically on success and negatively acknowledged automatically on error.

NEVER

Acknowledgement must be performed manually. A message can be positively or negatively acknowledged using the AbstractPubsubHandler.ack and AbstractPubsubHandler.nack methods respectively.

AUTO_RETRY

The message is positively acknowledged automatically on success, and a retry attempt is made automatically on error. Read more in the Retry Mechanism section.
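The four modes can be summarized as a small decision table. The sketch below is only a model of the descriptions above, not the library's implementation: the AckMode and Outcome types and the resolveOutcome helper are hypothetical names introduced for illustration, and the string values merely mirror the mode names.

```typescript
// Local stand-in for the acknowledge modes described above (hypothetical type;
// the library itself exposes them through its own enum).
type AckMode = "ALWAYS_ACK" | "ACK_AND_NACK" | "NEVER" | "AUTO_RETRY";

// What happens to the message once the handler has finished.
// "manual" means the handler is expected to call ack or nack itself.
type Outcome = "ack" | "nack" | "retry" | "manual";

// Hypothetical helper: maps a mode and the handler result to an outcome.
function resolveOutcome(mode: AckMode, handlerSucceeded: boolean): Outcome {
    switch (mode) {
        case "ALWAYS_ACK":
            return "ack"; // acknowledged on success and on failure
        case "ACK_AND_NACK":
            return handlerSucceeded ? "ack" : "nack";
        case "NEVER":
            return "manual"; // nothing is acknowledged automatically
        case "AUTO_RETRY":
            return handlerSucceeded ? "ack" : "retry";
    }
}
```

Read this way, ALWAYS_ACK is the only mode in which a failed handler still removes the message from the queue without any further action.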

Retry Mechanism

PubSub Event Bus supports automatic retries of event processing with static or dynamic backoff. Retries are enabled by setting the acknowledge mode to AUTO_RETRY. On any unhandled error, the library publishes the event to the delayed exchange so that it returns to the queue after a delay.

The retry mechanism can be configured at both the module and the handler level. Handler-specific options are merged with the module ones.

Available options:

| Options | Description | Default value |
|------------------|-------------|---------------|
| maxRetryAttempts | Maximum number of retry attempts | 3 |
| delay | Delay between retry attempts in milliseconds. Can be a fixed positive number or a function that receives current retry attempt count and returns delay | Math.floor(1000 * Math.exp(retryCount - 1)) |
| strategy | Retry strategy to be used. Read more in the Retry Strategies section | DEAD_LETTER_TTL |

When the maximum number of retry attempts is exceeded, the handler method onRetryAttemptsExceeded is called with the event and the last error as arguments. The message is then discarded.
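To make the default delay formula from the options table concrete, the sketch below evaluates it for the default of 3 retry attempts. It assumes that retryCount is 1 for the first retry, which the README does not state explicitly; defaultDelay and defaultDelays are illustrative names, not part of the library.

```typescript
// Default delay formula as quoted in the options table.
// Assumption: retryCount starts at 1 for the first retry attempt.
function defaultDelay(retryCount: number): number {
    return Math.floor(1000 * Math.exp(retryCount - 1));
}

const defaultMaxRetryAttempts = 3; // default value from the options table

// Collect the delay (in milliseconds) before each retry attempt.
const defaultDelays: number[] = [];
for (let retryCount = 1; retryCount <= defaultMaxRetryAttempts; retryCount++) {
    defaultDelays.push(defaultDelay(retryCount));
}

console.log(defaultDelays); // [1000, 2718, 7389]
```

Under that assumption the defaults give roughly 1 s, 2.7 s and 7.4 s between attempts, so the backoff grows exponentially with base e.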

Example:

// app.module.ts

import { CqrsModule, RetryStrategyEnum } from "@goparrot/pubsub-event-bus";
import { Module } from "@nestjs/common";

export const connections: string[] = ["amqp://username:[email protected]/virtualhost"];

@Module({
    imports: [
        CqrsModule.forRoot({
            connections,
            retryOptions: {
                maxRetryAttempts: 5,
                delay: (retryCount: number) => retryCount * 1000,
                strategy: RetryStrategyEnum.DELAYED_MESSAGE_EXCHANGE,
            },
        }),
    ],
})
export class AppModule {}

// store-created.handler.ts

import { AbstractPubsubHandler, AutoAckEnum, PubsubEventHandler, RetryStrategyEnum } from "@goparrot/pubsub-event-bus";

@PubsubEventHandler(StoreCreated, {
    autoAck: AutoAckEnum.AUTO_RETRY,
    retryOptions: {
        maxRetryAttempts: 10,
        delay: (retryCount: number) => retryCount ** 2 * 1000,
        strategy: RetryStrategyEnum.DEAD_LETTER_TTL,
    },
})
export class StoreCreatedHandler extends AbstractPubsubHandler<StoreCreated> {
    async handle(event: StoreCreated) {
        // process the event
    }

    async onRetryAttemptsExceeded(event: StoreCreated, error: Error) {
        // log the event processing failure
    }
}
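The example above sets retry options at both levels. As a rough mental model of how handler options are merged with the module ones, the sketch below uses a shallow merge in which handler values take precedence. That precedence is an assumption, not something the README spells out, and the RetryOptionsSketch type and mergeRetryOptions helper are hypothetical.

```typescript
// Simplified, hypothetical shape of the retry options from the table above.
interface RetryOptionsSketch {
    maxRetryAttempts: number;
    delay: number | ((retryCount: number) => number);
    strategy: "DEAD_LETTER_TTL" | "DELAYED_MESSAGE_EXCHANGE";
}

// Assumption: a shallow merge where handler-level values win over module-level ones.
function mergeRetryOptions(
    moduleOptions: RetryOptionsSketch,
    handlerOptions: Partial<RetryOptionsSketch>,
): RetryOptionsSketch {
    return { ...moduleOptions, ...handlerOptions };
}

const moduleLevel: RetryOptionsSketch = {
    maxRetryAttempts: 5,
    delay: (retryCount: number) => retryCount * 1000,
    strategy: "DELAYED_MESSAGE_EXCHANGE",
};

// The handler overrides only the number of attempts; the rest is inherited.
const effective = mergeRetryOptions(moduleLevel, { maxRetryAttempts: 10 });
```

In this model a handler that sets only maxRetryAttempts keeps the module-level delay and strategy.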

Retry Strategies

This library provides two strategies for implementing the retry mechanism. They differ mainly in their requirements and performance.

Dead Letter Message and Per-Message TTL Strategy

This strategy has no additional requirements and is therefore the default.

The library creates several RabbitMQ components:

  • Waiting queues, one for each waiting time
  • Exchange to route messages to the corresponding waiting queue
  • Exchange to route messages back to source queue

Example:

There are two PubSub handlers:
The first one has a static delay of 1000 ms and a maximum of 5 retry attempts. It needs only one queue, with a waiting time of 1000 ms.
The second one has the delay function 1000*2^x ms (with x = 0, 1, 2) and a maximum of 3 retry attempts. It needs three queues, with waiting times of 1000, 2000 and 4000 ms.

Therefore, the library will create 3 queues with waiting times of 1000, 2000 and 4000 ms. The queue with the 1000 ms waiting time is shared by both handlers.
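The queue count in this example can be reproduced by collecting the distinct waiting times across all handlers. The sketch below is an illustration of that counting, not the library's code: HandlerRetryConfig and requiredWaitingTimes are hypothetical names, and it assumes a zero-based attempt index so that the delay function yields 1000, 2000 and 4000 ms as in the example.

```typescript
// Hypothetical, simplified view of a handler's retry configuration.
interface HandlerRetryConfig {
    maxRetryAttempts: number;
    delay: number | ((attempt: number) => number);
}

// Collects the distinct waiting times (in ms) needed by all handlers.
// One waiting queue is assumed per distinct waiting time.
function requiredWaitingTimes(handlers: HandlerRetryConfig[]): number[] {
    const times = new Set<number>();
    for (const { maxRetryAttempts, delay } of handlers) {
        // Assumption: attempts are indexed from 0 to match the example's numbers.
        for (let attempt = 0; attempt < maxRetryAttempts; attempt++) {
            times.add(typeof delay === "number" ? delay : delay(attempt));
        }
    }
    return Array.from(times).sort((a, b) => a - b);
}

const waitingTimes = requiredWaitingTimes([
    { maxRetryAttempts: 5, delay: 1000 }, // first handler: static delay
    { maxRetryAttempts: 3, delay: (attempt) => 1000 * 2 ** attempt }, // second handler
]);

console.log(waitingTimes); // [1000, 2000, 4000]
```

Because the static 1000 ms delay coincides with the first value of the delay function, the two handlers need three queues in total rather than four.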

Delayed Message Exchange Strategy

This strategy requires the RabbitMQ Delayed Message Plugin to be installed and enabled on the RabbitMQ server.

The library creates a delayed message exchange to route messages back to the source queue after a set delay.

Known Issues

Several handlers listening to the same event

Problem: When several handlers listen to the same event, each handler receives the same event several times (as many times as there are listeners).

Workaround: Create one "proxy" pub-sub event listener that listens to the required event. The proxy can then do one of two things:

  • Publish a local event with the same content. You can then create as many listeners for this local event as you need. The main disadvantage of this approach is that the pub-sub event is acknowledged in the "proxy" event listener. Therefore, if something goes wrong in the actual event listeners, the library won't handle the error.
  • Execute all the required commands. The main disadvantage of this approach is that the pub-sub event acknowledgement is shared by all commands. Therefore, each command handler must cope with the event being processed twice.
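The first option can be pictured with a small in-memory model. This is only a sketch of the fan-out idea: LocalBus, proxyHandle and the listener names are hypothetical, and in a real application the proxy would be a PubSub event handler that republishes to a local NestJS event bus.

```typescript
type Listener<T> = (payload: T) => void;

// Minimal in-process bus, standing in for a local (non-PubSub) event bus.
class LocalBus<T> {
    private readonly listeners: Listener<T>[] = [];

    subscribe(listener: Listener<T>): void {
        this.listeners.push(listener);
    }

    publish(payload: T): void {
        for (const listener of this.listeners) {
            listener(payload);
        }
    }
}

interface StoreCreatedPayload {
    storeId: string;
}

const localBus = new LocalBus<StoreCreatedPayload>();
const handledBy: string[] = [];

// Any number of local listeners can react to the republished event.
localBus.subscribe((payload) => {
    handledBy.push(`indexer:${payload.storeId}`);
});
localBus.subscribe((payload) => {
    handledBy.push(`notifier:${payload.storeId}`);
});

// The single "proxy" listener: the only consumer of the pub-sub event.
// It forwards the payload locally, so each local listener runs exactly once.
function proxyHandle(payload: StoreCreatedPayload): void {
    localBus.publish(payload);
}

proxyHandle({ storeId: "store-1" });
```

As noted in the list above, the pub-sub event counts as handled once the proxy returns, so errors in the local listeners need their own handling.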

Enjoy!