bull-mq-transport

v3.0.1

BullMQ Microservice Transport for NestJS

BullMQ NestJS Microservice Transport

Like the @nestjs/bull module, except it works with @nestjs/microservices and uses the new bullmq version:

  • Single module import with custom connection settings
  • Synchronous or asynchronous module configuration (i.e. an options factory)
  • Queues/Workers are declared by creating @MessagePattern(...) and @EventPattern(...) handlers on Controllers
    • NOTE: you must pass the Transport.REDIS option to these decorators
  • The ClientProxy (BullMqClient) can send()/emit() to any Queue
  • BullMQ queue connections are created on-demand and re-used
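
As a rough illustration of the "created on-demand and re-used" behaviour in the last bullet, the sketch below caches one object per queue name and only calls the factory the first time a name is requested. It is not taken from this package's source: `QueueLike`, `QueueRegistry`, and the factory callback are hypothetical names used only to show the pattern.

```typescript
// Hypothetical stand-in for a BullMQ Queue; only the name matters here.
interface QueueLike {
  readonly name: string;
}

// Caches one queue per name and creates it lazily on first use.
class QueueRegistry<Q extends QueueLike> {
  private readonly queues = new Map<string, Q>();
  private readonly create: (name: string) => Q;

  constructor(create: (name: string) => Q) {
    this.create = create;
  }

  get(name: string): Q {
    let queue = this.queues.get(name);
    if (queue === undefined) {
      // First request for this name: build the queue and remember it.
      queue = this.create(name);
      this.queues.set(name, queue);
    }
    return queue;
  }

  get size(): number {
    return this.queues.size;
  }
}

// Usage: the factory runs once per distinct queue name.
let createdCount = 0;
const registry = new QueueRegistry<QueueLike>((name) => {
  createdCount += 1;
  return { name };
});
const firstLookup = registry.get('stuff');
const secondLookup = registry.get('stuff');
console.log(firstLookup === secondLookup, createdCount);
```

Keying the cache by queue name means repeated send()/emit() calls for the same pattern would not open a new connection each time.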

Getting Started

Install:

$ npm i @nestjs/microservices bullmq bull-mq-transport

Import into AppModule and provide Redis connection settings (can also provide an existing ioredis connection instance):

import { Module } from '@nestjs/common';
import { BullMqModule } from 'bull-mq-transport';

@Module({
  imports: [
    BullMqModule.forRoot({
      connection: { host: 'localhost', port: 6379 },
      // connection: ioRedisConnection
    }),
  ],
})
export class AppModule {}

Or, declare an asynchronous factory to provide the module options:

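import { Module } from '@nestjs/common';
import { BullMqModule } from 'bull-mq-transport';
// Assumed path: the module that provides ConfigService
import { ConfigModule } from './config.module';
import { ConfigService } from './config.service';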

@Module({
  imports: [
    BullMqModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        connection: {
          host: config.get('REDIS_SERVICE_HOST'),
          port: +config.get('REDIS_SERVICE_PORT'),
        },
      }),
    }),
  ],
})
export class AppModule {}
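
One caveat with the factory above: the unary `+` turns a missing variable into `NaN` and an empty string into `0`, so a misconfigured port only surfaces later as a connection failure. A minimal sketch of a stricter reader follows, assuming the two environment variable names from the example; `RedisConnection` and `readRedisConnection` are hypothetical helpers, not part of this package.

```typescript
// Hypothetical shape matching the `connection` object used above.
interface RedisConnection {
  host: string;
  port: number;
}

// Reads and validates the Redis settings from a plain key/value environment.
function readRedisConnection(
  env: Record<string, string | undefined>,
): RedisConnection {
  const host = env['REDIS_SERVICE_HOST'];
  const rawPort = env['REDIS_SERVICE_PORT'];

  if (host === undefined || host.trim() === '') {
    throw new Error('REDIS_SERVICE_HOST is not set');
  }
  if (rawPort === undefined || rawPort.trim() === '') {
    throw new Error('REDIS_SERVICE_PORT is not set');
  }

  const port = Number(rawPort);
  // Reject NaN, fractions, and values outside the valid TCP port range.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`REDIS_SERVICE_PORT is not a valid port: ${rawPort}`);
  }
  return { host, port };
}

// Usage with an in-memory environment.
const exampleConnection = readRedisConnection({
  REDIS_SERVICE_HOST: 'localhost',
  REDIS_SERVICE_PORT: '6379',
});
console.log(exampleConnection);
```

Inside `useFactory`, the same checks could be applied to the values returned by `config.get(...)` before building the `connection` object.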

Or, use an options factory class to provide the module options:

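import { Injectable, Module } from '@nestjs/common';
import {
  BullMqModule,
  IBullMqModuleOptions,
  IBullMqModuleOptionsFactory,
} from 'bull-mq-transport';
// Assumed path: the module that provides ConfigService
import { ConfigModule } from './config.module';
import { ConfigService } from './config.service';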

@Injectable()
export class BullMqConfigService implements IBullMqModuleOptionsFactory {
  constructor(private readonly config: ConfigService) {}
  createModuleOptions(): IBullMqModuleOptions | Promise<IBullMqModuleOptions> {
    return {
      connection: {
        host: this.config.get('REDIS_SERVICE_HOST'),
        port: +this.config.get('REDIS_SERVICE_PORT'),
      },
    };
  }
}

@Module({
  imports: [
    BullMqModule.forRootAsync({
      imports: [ConfigModule],
      useClass: BullMqConfigService,
    }),
  ],
})
export class AppModule {}

Then, create a microservice in main.ts and pass the BullMqServer as the strategy to the options, for example:

import { NestFactory } from '@nestjs/core';
import { BullMqServer } from 'bull-mq-transport';
import { AppModule } from './app.module';

// Hybrid approach
async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const strategy = app.get(BullMqServer);
  app.connectMicroservice({ strategy }, { inheritAppConfig: true });
  // On NestJS 8+, startAllMicroservicesAsync() is deprecated; use startAllMicroservices()
  await app.startAllMicroservicesAsync();
  await app.listen(3000);
}
bootstrap();

Once the microservice is connected, Pipes, Interceptors, Guards, Filters, and Request Context can be used in the Workers, which run in the rpc context.

Finally, you'll need to declare some controllers to act as Workers for BullMQ:

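import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, Payload, Transport } from '@nestjs/microservices';
import { Job } from 'bullmq';

// StuffDto is an application-defined type
@Controller()
export class SomethingController {
  @EventPattern('stuff', Transport.REDIS)
  async handleStuff(@Payload() data: StuffDto, @Ctx() job: Job) {
    // ... do something with the stuff
  }
}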

Add it to some module:

@Module({
  // ...
  controllers: [SomethingController],
})
export class AppModule {}

Then, publish something to the queue:

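import { Injectable } from '@nestjs/common';
import { BullMqClient } from 'bull-mq-transport';
import { v4 } from 'uuid';

@Injectable()
export class SomethingService {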
  constructor(private readonly client: BullMqClient) {}

  doStuff() {
    const payload = new StuffDto('...');
    this.client.emit('stuff', { id: v4(), payload }).subscribe();
  }
}

You can also set a job to be delayed by some milliseconds into the future:

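import { Injectable } from '@nestjs/common';
import { BullMqClient } from 'bull-mq-transport';
import { v4 } from 'uuid';

@Injectable()
export class SomethingService {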
  constructor(private readonly client: BullMqClient) {}

  doStuff() {
    const payload = new StuffDto('...'); // Custom type
    // Will be delayed by 5 seconds
    this.client.emit('stuff', { id: v4(), delay: 5000, payload }).subscribe();
  }
}
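
The `delay` value above is given in milliseconds, which is easy to get wrong by a factor of 1000. As a small sketch, a typed helper can keep the unit explicit and reject invalid values before the message is emitted. `JobEnvelope` and `withDelaySeconds` are hypothetical names that only mirror the `{ id, delay, payload }` object literal from the example; they are not exported by this package.

```typescript
// Hypothetical type mirroring the object passed to emit() in the example above.
interface JobEnvelope<T> {
  id: string;
  payload: T;
  delay?: number; // milliseconds
}

// Builds an envelope whose delay is specified in seconds and stored in milliseconds.
function withDelaySeconds<T>(
  id: string,
  payload: T,
  delaySeconds: number,
): JobEnvelope<T> {
  if (!Number.isFinite(delaySeconds) || delaySeconds < 0) {
    throw new RangeError(`delaySeconds must be a non-negative number, got ${delaySeconds}`);
  }
  return { id, payload, delay: Math.round(delaySeconds * 1000) };
}

// Usage: the id would normally come from v4() as in the examples above.
const delayedEnvelope = withDelaySeconds('job-1', { note: 'hello' }, 5);
console.log(delayedEnvelope.delay);
```

The resulting object could then be passed as the second argument to `this.client.emit('stuff', ...)`.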

You can also get a response from a message by using the send() method:

import { Injectable } from '@nestjs/common';
import { BullMqClient } from 'bull-mq-transport';
import { v4 } from 'uuid';

@Injectable()
export class SomethingService {
  constructor(private readonly client: BullMqClient) {}

  // Returns an observable containing the reply (caller must subscribe)
  doStuff() {
    const payload = new StuffDto('...');
    return this.client.send('stuff', { id: v4(), payload });
  }

  // Alternatively, you can convert to and return a promise.
  // On RxJS 7+, toPromise() is deprecated; firstValueFrom() from 'rxjs' is the replacement.
  async doStuff2() {
    const payload = new StuffDto('...');
    return this.client.send('stuff', { id: v4(), payload }).toPromise();
  }
}

With a worker that uses @MessagePattern(...) and returns a value:

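import { Controller } from '@nestjs/common';
import { Ctx, MessagePattern, Payload, Transport } from '@nestjs/microservices';
import { Job } from 'bullmq';

@Controller()
export class SomethingController {
  @MessagePattern('stuff', Transport.REDIS)
  async handleStuff(@Payload() data: StuffDto, @Ctx() job: Job) {
    // ... do something with the stuff
    // The returned value is sent back to the caller of send()
    return { foo: 'bar' };
  }
}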

See the apps/example directory for a complete example.

Work in progress

This package is a work in progress. Features include:

  • [x] Pub/Sub-style events
  • [x] RPC-style request/response
  • [x] Optional BullMqRpcExceptionFilter that can pass the stack trace back to HTTP if needed
    • [x] Retry and backoff work when set through the provided Job options
    • [x] Configurable optional exception logging LogLevel
    • [x] Optional RpcValidationPipe that exposes class-validator validation errors as RpcExceptions (also with more detail)
  • [x] Ability to provide more Job options (all supported)
  • [ ] Ability to provide more Queue options
  • [ ] Ability to provide more Worker options
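
The list above mentions retry and backoff through Job options. As a rough illustration of what an exponential backoff schedule looks like, the sketch below computes the wait before each retry, assuming the formula described in BullMQ's documentation (delay × 2^(attemptsMade − 1)). The function name is hypothetical, and this README does not show how the transport accepts Job options, so the `jobOptions` object only mirrors BullMQ's own `attempts` and `backoff` option names.

```typescript
// Returns the wait (in ms) before each retry, assuming delay * 2^(attemptsMade - 1).
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // The first attempt runs immediately; only the retries that follow are delayed.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade += 1) {
    delays.push(Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs));
  }
  return delays;
}

// Option names as used by BullMQ itself; the values are arbitrary examples.
const jobOptions = {
  attempts: 4,
  backoff: { type: 'exponential', delay: 1000 },
};

const retrySchedule = exponentialBackoffDelays(
  jobOptions.attempts,
  jobOptions.backoff.delay,
);
console.log(retrySchedule);
```

With four attempts there are three retries, so the schedule has three entries that double each time.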