@bgaldino/nestjs-rabbitmq

v1.2.2

A different way of configuring your RabbitMQ

Downloads

719

Description

This module features an opinionated way of configuring the RabbitMQ connection: a single configuration file describes the behaviour of all publishers and consumers present in your project.

Motivation

I wanted a central place where it's easy to open the project and see the declared behaviour of the RabbitMQ instance for that project, without having to look around for NestJS annotations, microservices or whatever. Simply open the project, go to the configuration file and know exactly what I'm looking at and what I should expect.

Requirements

Installation

PNPM

pnpm add @bgaldino/nestjs-rabbitmq

YARN

yarn add @bgaldino/nestjs-rabbitmq

NPM

npm add @bgaldino/nestjs-rabbitmq

Getting Started

Importing the module

Import RabbitMQModule in your app.module.ts and call its register() method:

import { Module } from '@nestjs/common';
import { RabbitMQModule } from '@bgaldino/nestjs-rabbitmq';
// RabbitOptions is your own configuration class (see "The configuration file")
import { RabbitOptions } from './rabbitmq.config';

@Module({
  imports: [
    ...
    RabbitMQModule.register({ useClass: RabbitOptions, injects: [...] }),
    ...
  ]
})
export class AppModule {}

The RabbitMQModule is marked as @Global, so registering it once is enough to allow the injection of the RabbitMQService anywhere in the application.

The configuration file

Create a rabbitmq.config.ts (or any name you prefer) containing the minimum configuration:

import { Injectable } from "@nestjs/common";
import {
  RabbitMQModuleOptions,
  RabbitOptionsFactory,
} from "@bgaldino/nestjs-rabbitmq";

@Injectable()
export class RabbitOptions implements RabbitOptionsFactory {
  createRabbitOptions(): RabbitMQModuleOptions {
    return {
      connectionString: "amqp://{user}:{password}@{url}/{vhost}",
      delayExchangeName: "MyDelayExchange",
      assertExchanges: [],
      consumerChannels: [],
    };
  }
}
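
The placeholders in connectionString can be filled from your own configuration values. The following is a minimal sketch and not part of the library: buildConnectionString and the AmqpParts shape are hypothetical names, and each part is percent-encoded on the assumption that credentials or the vhost may contain reserved URI characters.

```typescript
// Hypothetical helper, not part of @bgaldino/nestjs-rabbitmq.
interface AmqpParts {
  user: string;
  password: string;
  host: string; // host[:port], for example "localhost:5672"
  vhost: string; // for example "/" for the default vhost
}

// Builds "amqp://{user}:{password}@{url}/{vhost}", percent-encoding
// each variable part so reserved characters do not break the URI.
function buildConnectionString(parts: AmqpParts): string {
  const user = encodeURIComponent(parts.user);
  const password = encodeURIComponent(parts.password);
  const vhost = encodeURIComponent(parts.vhost);
  return `amqp://${user}:${password}@${parts.host}/${vhost}`;
}
```

The result can then be returned as connectionString from createRabbitOptions().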

Publishers

Example config file:

assertExchanges: [
  {
    name: 'webhooks', type: 'topic',
    options: { durable: true, autoDelete: false }
  },
  { name: 'test-fanout', type: 'fanout' },
  { name: 'example-direct', type: 'direct'},
],

The assertExchanges property expects an array of RabbitMQAssertExchange, and each entry will be asserted against the connected RabbitMQ server.

If any entry does not match the configuration that already exists on the server, or cannot be created/attached, a terminal error 406 - PRECONDITION_FAILED will be thrown with the reason and the server will not initialize.
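
Some mismatches can be caught before the application even connects. The sketch below is only an illustration: ExchangeDecl is a local stand-in for the library's RabbitMQAssertExchange (its exact shape is assumed from the example above), and findConflictingExchanges is a hypothetical helper that reports names declared more than once with different types.

```typescript
// Local stand-in for RabbitMQAssertExchange (assumed shape).
type ExchangeDecl = {
  name: string;
  type: "topic" | "fanout" | "direct" | "headers";
  options?: { durable?: boolean; autoDelete?: boolean };
};

// Returns the exchange names that appear more than once with different types.
function findConflictingExchanges(decls: ExchangeDecl[]): string[] {
  const typeByName = new Map<string, string>();
  const conflicts = new Set<string>();
  for (const decl of decls) {
    const previousType = typeByName.get(decl.name);
    if (previousType !== undefined && previousType !== decl.type) {
      conflicts.add(decl.name);
    }
    typeByName.set(decl.name, decl.type);
  }
  return Array.from(conflicts);
}
```

This only checks the local file against itself; the server-side check described above still applies.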

Publishing messages

Example:

import { Injectable } from "@nestjs/common";
import { RabbitMQService } from "@bgaldino/nestjs-rabbitmq";

@Injectable()
export class MyService {
  constructor(private readonly rabbitMQService: RabbitMQService) {}

  async publishMe() {
    const isPublished = await this.rabbitMQService.publish('exchange_name', 'routing_key', {});
  }

  //or, with a typed payload (CustomType is your own message type)

  async publishMeTyped() {
    const isPublished = await this.rabbitMQService.publish<CustomType>('exchange_name', 'routing_key', {});
    //This will return an error if the object is not properly typed
  }
}

The publish() method uses Publish Confirms to make sure that the message is delivered to the broker before the returned promise resolves.
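
If you prefer failures to surface as exceptions, the result can be wrapped. This is a sketch under assumptions: the Publisher interface is a local stand-in, the boolean result is inferred from the isPublished variable in the example above, and publishOrThrow is a hypothetical helper.

```typescript
// Local stand-in for the part of RabbitMQService used here (assumed shape).
interface Publisher {
  publish<T>(exchange: string, routingKey: string, message: T): Promise<boolean>;
}

// Hypothetical helper: turns an unconfirmed publish into an exception.
async function publishOrThrow<T>(
  publisher: Publisher,
  exchange: string,
  routingKey: string,
  message: T,
): Promise<void> {
  const isPublished = await publisher.publish<T>(exchange, routingKey, message);
  if (!isPublished) {
    throw new Error(`Message to ${exchange}/${routingKey} was not confirmed`);
  }
}
```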

Consumers

Inside the configuration file you can declare your consumers in the consumerChannels section. Each RabbitMQConsumerChannel in this list will be evaluated, and the library will try to create its queue and bind it to the declared exchange.

Example:


createRabbitOptions(): RabbitMQModuleOptions {
    return {
      ...,
      consumerChannels: [
        {
          options: {
            queue: 'myqueue',
            exchangeName: 'foobar.exchange',
            routingKey: 'myqueue',
            prefetch: Number(process.env.RABBIT_PREFETCH ?? 10),
            retryStrategy: {
              enabled: true,
              maxAttempts: 5,
              delay: (attempt: number) => {
                return attempt * 5000;
              },
            },
          },
          messageHandler: this.consumerService.messageHandler.bind(
            this.consumerService,
          ),
        },
      ],
    };
}

The consumer DOES NOT create exchanges; it only binds to ones that already exist. This avoids creating exchanges from typos and misconfigurations.

You can also declare an array of routingKeys: string[] if you want to attach multiple keys to the same queue/callback.
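
As a rough illustration of the multiple-key form, the entry below binds one queue to two keys. ConsumerOptionsSketch is a trimmed local stand-in, the queue and key names are placeholders, and placing routingKeys inside options (in place of routingKey) is an assumption.

```typescript
// Trimmed local stand-in for the consumer options (assumed shape).
type ConsumerOptionsSketch = {
  queue: string;
  exchangeName: string;
  routingKeys: string[];
};

// One queue, bound to the same exchange with two routing keys.
const orderEventsOptions: ConsumerOptionsSketch = {
  queue: "order-events",
  exchangeName: "foobar.exchange",
  routingKeys: ["order.created", "order.cancelled"],
};
```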

The messageHandler callback

As declared in the example above, the messageHandler property expects a callback of the type IRabbitHandler. Because the library invokes the callback outside of your service, you need to call .bind(this.yourService) to bind the this context of the origin service to the callback.
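
The effect of .bind() can be seen in plain TypeScript, independent of the library. In this sketch AuditService is a hypothetical class, and its handler is synchronous only to keep the example short.

```typescript
class AuditService {
  private readonly prefix = "audit";

  // Depends on `this`, like a typical NestJS service method.
  messageHandler(content: string): string {
    return `${this.prefix}:${content}`;
  }
}

const auditService = new AuditService();

// Passing the method as-is detaches it from its instance...
const unboundHandler = auditService.messageHandler;
// ...while bind() fixes `this` to the service for every later call.
const boundHandler = auditService.messageHandler.bind(auditService);
```

Calling boundHandler("ping") returns "audit:ping", while unboundHandler("ping") cannot reach prefix because this no longer points at the service.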

The injects option of RabbitMQModule.register() accepts an array of NestJS modules; include any module that contains a consumer callback function.

The callback has the following signature:

async messageHandler(content: any, params?: RabbitConsumerParameters): Promise<void>;

where RabbitConsumerParameters is optional and contains the following info:

export type RabbitConsumerParameters = {
  message: ConsumeMessage;
  channel: ConfirmChannel;
  queue: string;
};

Strongly typed consumer

You can implement IRabbitConsumer<T> to type the consumer's first parameter, content.

import { Injectable } from "@nestjs/common";
import { IRabbitConsumer } from "@bgaldino/nestjs-rabbitmq";

export interface MyInterface {
  type: string;
  id: number;
}

@Injectable()
export class MyClass implements IRabbitConsumer<MyInterface> {
  public async messageHandler(content: MyInterface): Promise<void> {
    console.log(content.type);
  }
}
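
TypeScript types are erased at runtime, so the annotation alone does not check what actually arrives on the queue. The README does not say whether the library validates payloads, so the sketch below assumes it does not; isMyInterface is a hypothetical type guard you could call at the top of the handler.

```typescript
interface MyInterface {
  type: string;
  id: number;
}

// Runtime check: narrows an unknown payload to MyInterface.
function isMyInterface(value: unknown): value is MyInterface {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return typeof candidate.type === "string" && typeof candidate.id === "number";
}
```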

Declaration example

Service example:

//consumer.module.ts
@Module({
  providers: [ConsumerService],
  exports: [ConsumerService],
})
export class ConsumerModule {}

//consumer.service.ts
@Injectable()
export class ConsumerService {
  async messageHandler(content: any) {
    return null;
  }
}

Config Example:

//rabbit.config.ts
@Injectable()
export class RabbitOptions implements RabbitOptionsFactory {
  constructor(
    readonly consumerService: ConsumerService,
  ) {}

  createRabbitOptions(): RabbitMQModuleOptions {
    return {
      ...
      consumerChannels: [
        {
          options: {
            queue: "myqueue",
            exchangeName: "test.exchange",
            routingKey: "myqueue",
            prefetch: Number(process.env.RABBIT_PREFETCH ?? 10),
          },
          messageHandler: this.consumerService.messageHandler.bind(this.consumerService),
        },
      ],
    };
  }
}

//app.module.ts
@Module({
  imports: [
  ...,
  RabbitMQModule.register({
    useClass: RabbitOptions,
    injects: [ConsumerModule]
  })
  ]
})
export class AppModule {}

Retrying strategy

On each consumer declaration you can pass the optional retryStrategy parameter with the following contract:

retryStrategy: {
  enabled?: boolean,                    // e.g. true
  maxAttempts?: number,                 // e.g. 5
  delay?: (attempt: number) => number,  // e.g. (attempt) => attempt * 5000
}

By default, the retryStrategy is enabled. A retry is triggered when the callback throws an error while consuming a message.

When enabled, the library will create a {options.queue}.dlq queue and will use the delayExchangeName exchange as the retrying orchestrator, where the x-delay value is the return value of the delay callback.

When the maximum number of attempts is reached, the library issues a nack, sending the message to the .dlq queue.
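
To see which delays a given callback produces, the schedule can be listed ahead of time. This is a sketch under assumptions: attempts are assumed to be numbered from 1, and exponentialDelay and delaySchedule are hypothetical helpers, not library functions.

```typescript
type DelayFn = (attempt: number) => number;

// The linear delay used in the example configuration.
const linearDelay: DelayFn = (attempt) => attempt * 5000;

// Hypothetical alternative: exponential backoff capped at maxMs.
function exponentialDelay(baseMs: number, maxMs: number): DelayFn {
  return (attempt) => Math.min(maxMs, baseMs * Math.pow(2, attempt - 1));
}

// Lists the delay before each retry, assuming attempts are numbered from 1.
function delaySchedule(delay: DelayFn, maxAttempts: number): number[] {
  const schedule: number[] = [];
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    schedule.push(delay(attempt));
  }
  return schedule;
}
```

With maxAttempts: 5, the linear callback yields 5000, 10000, 15000, 20000 and 25000, while exponentialDelay(1000, 10000) yields 1000, 2000, 4000, 8000 and 10000.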

Disabling the automatic ack

By default, the consumer will automatically send an ack at the end of the callback execution. If you need to disable this behaviour, you can pass:

consumerChannels: [
  {
    options: {
      ...,
      autoAck: false
    }
  }
]

When disabled, it is necessary to manually acknowledge the message as follows:

async messageHandler(
  content: ChangeEventStatusUseCaseInput,
  params: RabbitConsumerParameters,
): Promise<void> {
  params.channel.ack(params.message);
}
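
With manual acknowledgement you also decide what happens on failure. The sketch below uses local stand-ins for the amqplib channel methods ack(message) and nack(message, allUpTo, requeue); handleWithManualAck is a hypothetical wrapper, and how a manual nack interacts with the library's retryStrategy is not covered by this README, so treat that part as an assumption.

```typescript
// Minimal local stand-ins for the amqplib types used by the handler.
interface AckChannel<M> {
  ack(message: M): void;
  nack(message: M, allUpTo?: boolean, requeue?: boolean): void;
}

interface HandlerParams<M> {
  message: M;
  channel: AckChannel<M>;
  queue: string;
}

// Acks when the work succeeds; on failure, nacks without requeueing.
async function handleWithManualAck<M>(
  params: HandlerParams<M>,
  work: () => Promise<void>,
): Promise<void> {
  try {
    await work();
    params.channel.ack(params.message);
  } catch {
    params.channel.nack(params.message, false, false);
  }
}
```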

Extra options

Consumer manual loading

This library attaches the consumers during the OnApplicationBootstrap lifecycle of the NestJS Application, meaning that the application will begin to receive messages as soon as the lifecycle is done.

If your application needs some time before it can start the consumers (pods with limited resources, for example), you can set the flag extraOptions.consumerManualLoad: true in the configuration file and manually call the consumer instantiation.

Example:

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  await app.listen(3000);

  const rabbit: RabbitMQService = app.get(RabbitMQService);
  await rabbit.createConsumers();
}
bootstrap();
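
For completeness, the matching flag in the configuration file might look like the fragment below. It is trimmed to the one flag discussed here; the rest of the options object is omitted, and manualLoadOptions is only an illustrative name.

```typescript
// Trimmed fragment: merge extraOptions into the object returned by createRabbitOptions().
const manualLoadOptions = {
  extraOptions: {
    // Consumers are not attached on bootstrap; call createConsumers() yourself.
    consumerManualLoad: true,
  },
};
```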

Message inspection and logging

You can inspect the consumer/publisher messages by setting the parameter extraOptions.logType or the environment variable RABBITMQ_LOG_TYPE to one of: all | consumer | publisher | none.

The default value is none.

You can also use extraOptions.loggerInstance to pass your custom Logger, as long as it follows the Logger/Console interfaces. The SDK will use the given instance to log any messages.
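
If the log type comes from your own configuration source, it can be validated before being passed to extraOptions.logType. A minimal sketch: resolveLogType is a hypothetical helper, and falling back to none for unknown values is a choice made here, not documented library behaviour.

```typescript
const LOG_TYPES = ["all", "consumer", "publisher", "none"] as const;
type LogType = (typeof LOG_TYPES)[number];

// Hypothetical helper: accepts only the documented values, otherwise "none".
function resolveLogType(raw: string | undefined): LogType {
  const match = LOG_TYPES.find((candidate) => candidate === raw);
  return match ?? "none";
}
```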

How to build this library locally?

Just pull the project and run:

pnpm install
pnpm build

And you should be good to go.

Planned features

  • [x] Add tests
  • [ ] Improve semantics of the config file
  • [ ] Offer a retry mechanism without the x-delay
  • [ ] Make the publisher method strongly typed based on the assertExchanges exchangeName and routingKeys configurations

Contribute

TBD

License

MIT License