nest-rabbit-tasks

v1.3.0

`nest-rabbit-tasks` is a `TaskQueue` based upon `RabbitMQ` for `NestJS`.

Motivation

We wanted to make background tasks easy in NestJS by abstracting away NestJS dependency injection and RabbitMQ connection handling.

nest-bull is a reference in this domain: it uses bull and Redis to provide the task queue. Moreover, nest-bull and bull are the state of the art in hiding and abstracting TaskQueue complexity. With them, building a queue is actually fun.

However, complex systems may need technologies other than Redis: RabbitMQ provides more features, such as exchanges (which allow task routing).

We needed it on a project, so here it is made available for you.

Disclaimer

This is beta-quality software. We just started it, so it probably has a few bugs.

We are actively working on it, so it may break without notice and the API may change.

⚠️ Use with caution: be ready to pin a version and make some modifications to the code ⚠️

Contributors ✨

Thanks goes to these wonderful people (emoji key):

This project follows the all-contributors specification. Contributions of any kind welcome!

Get Started

Declare the module:

import { Module } from '@nestjs/common';
import { NestRabbitTasksModule } from 'nest-rabbit-tasks';

// Import the worker.
import { EmailWorker } from './worker/email.worker';
// And import some services.
import { SMTPService } from './service/smtp.service';

@Module({
  imports: [
    // Note that there is also a `registerAsync` method
    // that allows the parameters to depend on an injected configService.
    // You can have a look at the interface to see what options it takes.
    NestRabbitTasksModule.registerSync({
      // This is not the name of the RabbitMQ queue
      // but a unique reference that identifies the queue and the worker
      // and links the configuration here to the worker implementation there
      reference: 'worker-email-queue',
      // Queue or exchange (but exchanges are not implemented yet ^^).
      entityType: 'queue',
      // AMQP connection/channel-wide options.
      amqpOptions: { connectionUrl: 'amqp://localhost:5672' },
      // Module options that dictate the worker behavior
      globalOptions: {
        // By default RabbitMQ creates the queue if it doesn't exist.
        // Setting `{ immutableInfrastructure: true }` will throw if the queue does not exist.
        // Note that this is not implemented yet but will be pretty soon
        immutableInfrastructure: true,
        // How many messages the worker should take at once.
        // Increasing it increases the throughput but decreases the consistency
        prefetchSize: 1,
      },
      // Definition of the queue and queue specific options
      // such as `single-active-consumer`
      entity: { queueName: 'worker.queue.1', queueOptions: {} },
      // Worker used to handle the AMQP message
      worker: EmailWorker,
    }),
  ],
  // Here you should provide the worker (EmailWorker)
  // and any other services you need
  providers: [EmailWorker, SMTPService],
})
export class AppModule {}

// ---

// in main.js
import { NestFactory } from '@nestjs/core';
// The path is an assumption: adjust it to wherever AppModule is declared
import { AppModule } from './app.module';

async function bootstrap() {
  // Start the module as usual
  const app = await NestFactory.create(AppModule);
  // And launch it.
  // Having an HTTP port is useful to configure a health-check route
  await app.listen(3000);
}
bootstrap();

and create a worker:

import { Logger } from '@nestjs/common';
import { RabbitWorker, RabbitTasksWorker } from 'nest-rabbit-tasks';

// Path assumed from the module example above
import { SMTPService } from '../service/smtp.service';

// You can specify what the Event looks like.
// By default it is any.
interface Event {
  name: string;
  recipient: string[];
  cc?: string[];
  bcc?: string[];
  content: string;
}

@RabbitTasksWorker({ reference: 'worker-email-queue' })
export class EmailWorker extends RabbitWorker<Event> {
  private readonly log = new Logger(EmailWorker.name);

  // Use dependency injection
  // to inject whatever the worker needs to do its job
  public constructor(private readonly smtpService: SMTPService) {
    // A derived class must call super() before using `this`
    super();
  }

  // This method is mandatory.
  // NOTE: the exact exported name of the message type is not confirmed here;
  // check the package typings before relying on this annotation.
  public async handleMessage(data: Event, message: RabbitTasksWorker<Event, void>) {
    if (data.name === 'send-mail') {
      await this.smtpService.sendEmail(data.recipient, data.cc, data.bcc, data.content);
      // Acknowledge the message to remove it from the queue
      message.ack();
    } else {
      this.log.error('unknown event');
      if (message.getHeader('x-retries') < 3) {
        // Negatively acknowledge (nack) the message with requeue
        // so it goes back to the queue and is processed later
        message.nack(true);
      } else {
        // Nack the message without requeue
        // to remove it from the queue
        message.nack(false);
      }
    }
  }
}
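
The ack / nack branching in the worker above can be pulled out into small pure helpers, which makes it easy to unit test without a running RabbitMQ. This is only a sketch: `Outcome`, `MinimalMessage`, `readRetries`, `decideOutcome`, `settle` and `MAX_RETRIES` are hypothetical names that are not part of nest-rabbit-tasks, and it assumes the `x-retries` header carries a non-negative integer.

```typescript
// Hypothetical helper names: nothing here is exported by nest-rabbit-tasks.
type Outcome = 'ack' | 'requeue' | 'drop';

// Only the message methods that the worker example calls.
interface MinimalMessage {
  getHeader(name: string): unknown;
  ack(): void;
  nack(requeue: boolean): void;
}

const MAX_RETRIES = 3;

// Read the retry counter, treating a missing or malformed header as 0.
function readRetries(message: MinimalMessage): number {
  const raw = message.getHeader('x-retries');
  const retries = typeof raw === 'number' ? raw : Number.parseInt(String(raw), 10);
  return Number.isInteger(retries) && retries >= 0 ? retries : 0;
}

// Pure decision: known events are acked (once handled),
// unknown ones are requeued until the retry budget is spent.
function decideOutcome(
  eventName: string,
  retries: number,
  maxRetries: number = MAX_RETRIES,
): Outcome {
  if (eventName === 'send-mail') {
    return 'ack';
  }
  return retries < maxRetries ? 'requeue' : 'drop';
}

// Apply a decision to the message.
function settle(message: MinimalMessage, outcome: Outcome): void {
  if (outcome === 'ack') {
    message.ack();
  } else {
    // nack(true) requeues the message, nack(false) discards it
    message.nack(outcome === 'requeue');
  }
}
```

Inside `handleMessage` this would read roughly as `settle(message, decideOutcome(data.name, readRetries(message)))`, called after the email has been sent for known events. Two caveats: this sketch counts a missing header as zero retries, whereas the comparison in the worker above is false when `getHeader` returns `undefined` (so the message would be dropped), and it assumes that something else keeps the `x-retries` header up to date.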

Advanced usage

Async configuration

Why: your configuration may be dynamic and depend on environment variables or API calls.

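import { Module } from '@nestjs/common';
import { NestRabbitTasksModule } from 'nest-rabbit-tasks';

// The paths below are assumptions: adjust them to your own project layout
import { ConfigModule } from './config/config.module';
import { ConfigService } from './config/config.service';
import { TestWorker } from './worker/test.worker';

@Module({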
  imports: [
    NestRabbitTasksModule.registerAsync({
      // The reference must be static and unique
      reference: 'toto',
      // Same for the entity type
      entityType: 'queue',
      // The handler is static too
      // (this is not a hard constraint in the code; let me know if you have use cases
      // that require it to be dynamic. It just made more sense like this in my use cases.)
      worker: TestWorker,
      // The rest of the options are dynamic
      // (only `useFactory` is provided for now, but `useExisting` and `useClass` could easily be implemented)
      useFactory: async (configService: ConfigService) => {
        const queueName = await configService.getQueueName();
        return {
          entity: { queueName },
          amqpOptions: { connectionUrl: 'amqp://localhost:5672' },
          globalOptions: { immutableInfrastructure: true, prefetchSize: 1 },
        };
      },
      // For this to work you have to import a module (e.g. a config module)
      // that exports a service (e.g. a config service)
      // and inject that service so `useFactory` can receive it
      imports: [ConfigModule],
      inject: [ConfigService],
    }),
  ],
  providers: [TestWorker],
})
export class AppModule {}
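
The example above calls `configService.getQueueName()` without showing the service. Here is a minimal sketch of what such a service and factory might look like, assuming the queue name comes from an environment variable. `EnvConfigService`, `rabbitOptionsFactory`, `DynamicRabbitOptions`, `QUEUE_NAME` and `AMQP_URL` are hypothetical names, not part of nest-rabbit-tasks; the returned object simply mirrors the shape used in the `useFactory` above.

```typescript
// Hypothetical sketch: all names below are illustrative.
type Env = Record<string, string | undefined>;

// Shape of the object returned by `useFactory` in the example above.
interface DynamicRabbitOptions {
  entity: { queueName: string };
  amqpOptions: { connectionUrl: string };
  globalOptions: { immutableInfrastructure: boolean; prefetchSize: number };
}

class EnvConfigService {
  private readonly env: Env;

  constructor(env: Env) {
    this.env = env;
  }

  // Async to match the `await configService.getQueueName()` call in the example.
  async getQueueName(): Promise<string> {
    const queueName = this.env['QUEUE_NAME'];
    if (!queueName) {
      throw new Error('QUEUE_NAME is not set');
    }
    return queueName;
  }

  // Falls back to the local broker URL used throughout this README.
  getConnectionUrl(): string {
    return this.env['AMQP_URL'] ?? 'amqp://localhost:5672';
  }
}

// Builds the dynamic part of the module options from the config service.
async function rabbitOptionsFactory(config: EnvConfigService): Promise<DynamicRabbitOptions> {
  const queueName = await config.getQueueName();
  return {
    entity: { queueName },
    amqpOptions: { connectionUrl: config.getConnectionUrl() },
    globalOptions: { immutableInfrastructure: true, prefetchSize: 1 },
  };
}
```

In a NestJS application the service would presumably also carry `@Injectable()` and be built from `process.env`, and the function could then be passed as `useFactory: rabbitOptionsFactory`. Failing fast on a missing queue name surfaces a misconfiguration at startup rather than at the first message.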

Road-map

  • [x] implement the immutableInfrastructure mode

  • [x] connect Rabbit logger to nest Logger and improve debug logs

  • [x] properly check that config is correct and report error if not

  • [x] implement async configuration (registerAsync)

  • [x] prevent Haredo deps from leaking

  • [ ] implement @OnEvent(rabbitEventName: string) to decorate a method of the Worker that will be called when rabbitEventName is emitted in the queue

  • [ ] work on quality (unit tests, E2E tests)

  • [ ] improve the doc (registerAsync)

  • [ ] implement an Exchange class (so users can publish to exchange using this)