npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@api-typed/message-queue

v0.1.3

Published

Message Queue tools based on BullMQ, prepared for Api-Typed

Downloads

14

Readme

📯 Api-Typed/Message-Queue

🥣 An Api-Typed component.

Wrapper around BullMQ that abstracts away queue implementation and allows to register job handlers with a simple @Job() decorator.

  • Single MessageQueue class for adding jobs to queues.
  • One class per job helps to keep single responsibility principle.
  • Job configuration (target queue, retry strategy, generating id, etc.) lives in a single place.
  • Typed job payload that maps to the job handler's arguments 1:1.

At a glance

Define a job and its handler:

@Job<GreetingJob>({
  queue: 'high_priority',
  generateId: (greetId, name) => `job_${greetId}_${name}`,
})
export class GreetingJob implements JobInterface {
  public async run(
    greetId: number,
    name: string,
    question?: string,
  ): Promise<void> {
    // implementation
  }
}

Add a job to its queue:

const messageQueue = new MessageQueue();

messageQueue.dispatch(GreetingJob, 234, 'Michael', 'How are you?');
  1. Create a class that implements JobInterface.
  2. Define job payload as the .run() method arguments.
  3. Decorate it with @Job(): a. Bind it to a queue. b. Optionally provide a function to generate job ID based on payload.
  4. Somewhere in your application bootstrap configure MessageQueue object.
  5. Easily dispatch jobs with typed arguments (payload) as if they were regular function calls.
  6. Run worker with $ npx api-typed run worker --with-scheduler if using Api-Typed framework.

You can also use a dependency injection container!

Concepts

While @Api-Typed/Message-Queue uses BullMQ internally, it introduces several new concepts and changes how you interact with queues.

Single place for job configuration

The main concept of @Api-Typed/Message-Queue is that when you add a job to a message queue, it shouldn't be your concern at this time on which queue the job should end up or with what configuration the job should be added (unless you want to).

Let's say you have a job to send log messages to Slack. This job is added to queues in many places in your code. All these places should not be concerned with how exactly this job is handled, on what queue it should be added or how many times it should be repeated until successful.

What if you want to change that strategy? Should you update all the places where the job is being added? What if another developer adds this job to a different queue and blocks it?

(Sure, sometimes it depends on the context, but 90% of the time this configuration should be the same regardless of where the job is triggered from.)

Single service class that routes jobs to their queues

Just like the "dispatchers" of jobs should not be concerned with their configuration, they especially should not care to what queue a specific job belongs. They just want to add a job to processing and move on with their lives, most of the time not caring what happens to it.

In BullMQ you have to obtain Queue instance for appropriate queue and then add your job to it. With @Api-Typed/Message-Queue there is a single entry point, MessageQueue class, that takes care of adding jobs to their designated queues without having to take care of it yourself.

Setup

With Api-Typed

To get the best developer experience use it with Api-Typed in your app.api-typed.ts file:

import { App, MessageQueueModule } from 'api-typed';

export default new App(__dirname, [
  // ... your other modules
  new MessageQueueModule(),
]);

It's already included for you out of the box when using StandardApp.

Then let your app module (or any module you create) implement HasJobs interface with a loadJobs() method that should return an array of class names that implement JobInterface.

The easiest way to do this:

import { AbstractModule, HasJobs, loadJobs } from 'api-typed';

export class MyModule extends AbstractModule implements HasJobs {
  public readonly name = 'my_module';

  public loadJobs() {
    // load all jobs from './jobs' dir relative to this file
    return loadJobs(`${__dirname}/jobs/**/*.{ts,js}`);
  }
}

Then run worker that will handle all jobs with:

$ npx api-typed run worker --with-scheduler

You can also run workers for specific queues:

$ npx api-typed run worker high_priority mid_priority --with-scheduler

Or you can run schedulers in separate processes (advised for better performance):

$ npx api-typed run worker
$ npx api-typed run scheduler

You need to run schedulers if you want to support delayed, scheduled or repeatable jobs.

Stand-alone

You can easily use this as a stand-alone package in any project.

$ npm i -S @api-typed/message-queue

You need an entry point to run a worker:

// worker.ts
import { WorkerRunner, loadJobs } from '@api-typed/message-queue';

// register all your jobs using a glob pattern so that the runner knows about
// job handlers and all queues you're using
loadJobs(`${__dirname}/jobs/**/*.{ts,js}`);

const runner = new WorkerRunner();

runner.start();

If you want to run a scheduler the implementation is similar.

To start a worker just call the above script from your command line.

NOTE: You probably also want to bootstrap your application so that the job handlers have access to your whole infrastructure. Therefore, the actual integration might be a bit more complex or part of a bigger system.

Registering jobs with @Job() decorator

The easiest way to create and register jobs handlers is by using the @Job() decorator on a class that implements JobInterface:

import { Job, JobInterface } from '@api-typed/message-queue';

@Job({
  queue: 'high_priority',
})
export class SomeJob implements JobInterface {
  public async run(itemId: string, theAnswerToUniverse: number): Promise<void> {
    // job implementation
  }
}

The above code will configure the SomeJob to be always added to the high_priority queue and be handled by this class.

The name of the job that will actually be sent to BullMQ will be generated based on the class name. You can also specify it yourself by using name: string option or using the decorator like this:

@Job('custom_job_name', 'queue_name', {
  // other options
})

Adding jobs

Because the queue to which a job should be added is configured right in the place where the job handler is defined, it is so much easier to add jobs to queues: a single MessageQueue class instance will route new jobs to their destination queues.

All the methods described below return a Promise that resolves with BullMQ's Job instance that describes the added job.

Dispatch job immediatelly

If you want a job to be dispatched immediatelly, meaning that it should be processed as soon as possible (which is most usually the case), you should use the .dispatch() method.

import { MessageQueue } from '@api-typed/message-queue';
import { SomeJob } from './jobs/SomeJob';

// this should happen somewhere in your app bootstrap
const mq = new MessageQueue();

mq.dispatch(SomeJob, 'item-id', 42);

Yes, the jobs are referred to by their class names. This guards you from ever making typos in strings or makes it unnecessary to come up with enums or consts with all the job names in the app.

But referring to jobs by their class names enabled one more powerful feature of @Api-Typed/Message-Queue:

Job payload and the .run() method

Notice that in the examples above the SomeJob.run() method accepts two arguments: a string and a number and the call .dispatch() call also accepts a string and a number in its arguments.

This is because dispatching a job is almost like calling a function. You can type the job payload arguments just like you can type the job handler's .run() arguments.

So if your job handler expected arguments like this:

class OtherJob implements JobInterface {
  public async run(
    otherId: string,
    someValue: number,
    items: string[],
    options: {
      doSomething?: boolean;
    } = {},
  ) {
    // implementation
  }
}

You would dispatch it like this:

mq.dispatch(OtherJob, 'other-id', 33, ['item1', 'item2', 'item3'], {
  doSomething: true,
});

And all these arguments would be properly typed!

With @Api-Typed/Message-Queue you don't have to wonder what payload a job accepts and be sure to always send the correct one.

NOTE: Make sure the arguments you use on .run() method are as simple as possible, ie. they can be serialized into JSON.

Delay a job

To dispatch a job with a delay, to be processed after some time, use the .schedule() method:

mq.schedule(5000, SomeJob, 'item-id');

The code above will add SomeJob to the queue to be processed after 5s (5000 ms).

NOTE: You have to have a scheduler running for SomeJob's designated queue in order for scheduling to work.

Schedule a job

To dispatch a job to be processed at a specific point in time, also use the .schedule() method, but with a Date object as the first argument:

mq.schedule(new Date('2022-05-01T00:20:00'), SomeJob, 'item-id');

This will schedule the job to be processed on 1st of May, 2022 at 00:20.

NOTE: If the date is in the past then the job will be processed immediatelly.

NOTE: You have to have a scheduler running for SomeJob's designated queue in order for scheduling to work.

Repeat a job on a schedule

If you want to repeat a job on a schedule, use the .repeat() method:

mq.repeat({ every: 3600 * 1000 }, SomeJob, 'item-id');

This will make the job to be repeated every hour with the given payload.

NOTE: You have to have a scheduler running for SomeJob's designated queue in order for repeating to work.

Generic adding of jobs

And if you ever need to overwrite the default configuration of a job and gain access to BullMQ's job options, use the .addJob() method:

mq.addJob(SomeJob, ['item-id'], {
  /* job options */
});

The difference here is that instead of passing payload normally as subsequent arguments, you must pass them as an array in the second argument. The third argument are BullMQ's job options that will take precedence over any other configuration (except for designated queue).

Using Dependency Injection

If you want your job classes to be bootstrapped using a dependency injection container (like e.g. TypeDI) configure it by calling:

const runner = new WorkerRunner();
runner.useContainer(/* your container */);

The passed container MUST implement .get(identifier: Function) method that can construct and retrieve objects based on their class.

Then your job class supports any methods of dependency injection that your container offers.

Generating job id's

To generate a job ID (e.g. so you can store a reference to it and then check its status) you can use the generateId option that accepts a function:

@Job<SomeJob>({
  generateId: (someId, someNumber) => `some_job_${someId}_${someNumber}`,
})
export class SomeJob implements JobInterface {
  public async run(someId: string, someNumber: number) {
    // implementation
  }
}

The generateId function accepts the same arguments as the .run() method making it easy to generate the id based on job's payload. By adding type argument to the @Job() decorator we can reuse the types.