npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@ponomarevlad/docmq

v0.5.6-browser.1

Published

A document based messaging queue for Mongo, DocumentDB, and others

Downloads

1

Readme

Why Choose This :grey_question:

DocMQ is a good choice if a persistance layer for your messaging queue is a deciding factor. Backed by a database, DocMQ makes it easy to schedule, query, and debug jobs using an adapter layer tuned for your database type. This also allows DocMQ to choose the no database option and run in-memory for local environments where it's unnecessary to set up a local mongo / postgres / etc. DocMQ works with anything that holds and queries documents or document-like objects.

DocMQ is also a strong choice when your messaging queue needs to care about time and time zones. When you want to "send a message at 4:00 am", it matters if you mean 4am in Los Angeles or 4am in Phoenix because only one of those locations implements Daylight Savings Time. DocMQ reduces the pain associated with figuring out if one day is 86400, 90000, or 85800 seconds in the future.

Finally, DocMQ is database agnostic. You can run one backend in development where you are less concerned about scale, and run a robust solution in production. A suite of tests makes it easy to ensure your beahvior is consistent across deployments.

Why AVOID This :grey_question:

Simple. Performance. This kind of solution will never be as fast as an in-memory Redis queue or an event bus. If fast FIFO is your goal, you should consider BullMQ, Kue, Bee, Owl, and others. They're all excellent libraries, and I can't recommend them enough if they fit your use case!

DocMQ

  • An adaptable DB layer with ready-made drivers for MongoDB and Postgres
  • An in-memory driver for faster local development
  • Concurrent job execution, job repitition, scheduling, and persistence of results
  • Queue-level hooks for managing side effects of job completions and failures
  • Timezone aware scheduling and recurrence

Comparison by Feature

| Feature | BullMQ | Agenda | DocMQ | | :----------------- | :---------------------------------------------: | :----------------------------------------: | :--------------------------------------: | | Backend | redis | mongo | (any)[#-custom-driver-support] | | Parent/Child | ✓ | | | | Priorities | ✓ | ✓ | | | Concurrency | ✓ | ✓ | ✓ | | Delayed jobs | ✓ | ✓ | ✓ | | Global events | ✓ | | | | Rate Limiter | ✓ | | | | Pause/Resume | ✓ | | ✓ | | Sandboxed worker | ✓ | | | | Repeatable jobs | ✓ | ✓ | ✓ | | Atomic ops | ✓ | | ✓ | | Persistence | ✓ | ✓ | ✓ | | UI | ✓ | ✓ | | | REST API | | ✓ | | | Run In-memory | | | ✓ | | Timezone awareness | | | ✓ | | New job polling | ✓ | ✓ | ✓ | | Queue subscription | ✓ | | ✓ | | Optimized for | Jobs / Messages | Jobs | Jobs |

If you're not concerned about timezone awareness or an in-memory driver, BullMQ and Agenda are excellent alternatives. Thank you BullMQ for making the original comparison chart

:warning: EARLY DEVELOPMENT - This software is currently used in Production as part of Taskless, but is still a newer project. This project follows semver spec for versions < 1.

0.y.x
  ^ ^- fixes, features
   \-- breaking changes

Installation

# npm
npm i docmq

# yarn
yarn add docmq

# pnpm
pnpm add docmq

DocMQ comes with an in-memory driver MemoryDriver, along with several other adapters for various DBs.

📚 Documentation

Creating a Queue

import { Queue, MemoryDriver } from "docmq";

interface SimpleJob {
  success: boolean;
}

const queue = new Queue<SimpleJob>(new MemoryDriver("default"), "docmq");

new Queue() options

new Queue<
  TData,
  TAck = unknown,
  TFail extends Error = Error,
  TContext = DefaultContext,
>(driver: Driver, name: string, options?: QueueOptions)
  • driver a Driver implementation to use such as the MemoryDriver
  • name a string for the queue's name
  • options? additional options
    • retention.jobs? number of seconds to retain jobs with no further work. Default 3600 (1 hour)
    • statInterval? number of seconds between emitting a stat event with queue statistics, defaults to 5

A Note on TypeScript

This library uses TypeScript to provide a better developer experience regarding the objects passed into your queue and the responses your job processor provides back to DocMQ. There are four main types used throughout this documentation, and all are set during the creation of the Queue class.

TData refers specifically to the typing of your job payload. It's the payload you're expecting to pass when calling enqueue(), and it's the payload you're expecting to receive inside of your process() callback.

TAck = unknown refers to the typing of your ack response when calling api.ack() inside of your job processor and is by default an unknown type. Setting TAck also sets the typings for the ack event.

TFail extends Error = Error refers to the typing of your error object created and passed to api.fail() inside of your job processor and defaults to the base Error class. Setting TFail also sets the typings for your fail event.

TContext = Record<string, unknown> refers to the context object available during processing, and is by default an empty object. The context is available inside of process() as well as inside of event callbacks after the processing context is available (ack, fail, ping, dead, etc). A DefaultContext is made available as a convienence for the Record definition.

Adding a Job to the Queue

await queue.enqueue({
  ref: "sample-id",
  /* TData */ payload: {
    success: true,
  },
});

enqueue() API

queue.enqueue(job: JobDefinition<TData> | JobDefinition<TData>[])
  • job the JSON Job object (or an array of job objects), consisting of
    • ref?: string an identifier for the job, allowing future enqueue() calls to replace the job with new data. Defaults to a v4 UUID
    • payload: TData the job's payload which will be saved and sent to the handler
    • runAt?: Date a date object describing when the job should run. Defaults to now()
    • runEvery?: string | null Either a cron interval or an ISO-8601 duration, or null to remove recurrence
    • timezone?: string | null When using runEvery, you can specify a timezone to make DocMQ aware of durations that cross date-modifying thresholds such as Daylight Savings Time; recommended when using cron and duration values outside of UTC.
    • retries?: number a number of tries for this job, defaults to 5
    • retryStrategy?: RetryStrategy a retry strategy, defaults to exponential

Retry Strategies

interface FixedRetryStrategy {
  type: "fixed";
  amount: number;
  jitter?: number;
}

interface ExponentialRetryStrategy {
  type: "exponential";
  min: number;
  max: number;
  factor: number;
  jitter?: number;
}

export interface LinearRetryStrategy {
  type: "linear";
  min: number;
  max: number;
  factor: number;
  jitter?: number;
}

Handling Work (Processing)

queue.process(
  async (job: TData, api: HandlerAPI<TAck, TFail, TContext>) => {
    await api.ack();
  },
  {
    /* options */
  }
);

process() Options

queue.process(handler: JobHandler<T, A, F>, config?: ProcessorConfig<C>)

  • handler the job handler function, taking the job T and the api as arguments, returns a promise
  • config?: ProcessorConfig an optional configuration for the processor including
    • pause?: boolean should the processor wait to be started, default false
    • concurrency?: number the number of concurrent processor loops to run, default 1
    • visibility?: number specify the visibility window (how long a job is held for by default) in seconds, default 30
    • pollInterval?: number as a fallback, define how often to check for new jobs in the event that driver does not support evented notifications. Defaults to 5
    • createContext?: () => Promise<TContext> | TContext generates a unique context of type TContext for this run. It will be available in the handler API.

api Methods and Members

  • api.ref (string) the ref value of the job
  • api.attempt (number) the attempt number for this job
  • api.visible (number) the number of seconds this job was originally reserved for
  • api.context (TContext) the context object, generated for this run
  • api.ack(result: TAck) acknowlegde the job, marking it complete, and scheduling future work
  • api.fail(reason: string | TFail) fail the job and emit the reason, scheduling a retry if required
  • api.ping(extendBy: number) on a long running job, extend the runtime by extendBy seconds

Events

The Queue object has a large number of emitted events available through queue.events. It extends EventEmitter, and the most common events are below. Events related to the processing of a job (ack, fail, dead, and ping) will all receive context: TContext as a second argument to the event callback

  • ack when a job was acked successfully
  • fail when a job was failed
  • dead when a job has exceeded its retries and was moved to the dead letter queue
  • stats an interval ping containing information about the queue's processing load
  • start, stop when the queue starts and stops processing
  • log, warn, error, halt logging events from the queue

Managing Fatal Errors With halt

A fatal event occurs when DocMQ does not believe it can receover from a connection or processing issue. For example, if using the MongoDB driver and the Change Stream disconnects and the process cannot reconnect. To minimize the likelyhood of repeatedly processing jobs in this scenario, DocMQ will call its destroy method and no longer accept jobs for processing. In addition to emitting a standard error event, DocMQ will also emit a halt event. In most environments, it's recommended to kill your service, allowing your PaaS provider to mark your service as unhealthy and restart it.

queue.on("halt", () => {
  console.error("Received HALT from DocMQ");
  process.exit(1); // exit code for termination of node.js
});

🔧 Custom Driver Support

DocMQ works with several drivers, many of which are included in the /drivers directory. For development or non-production scenarios, we recommend the MemoryDriver, an in-memory driver that supports all of DocMQ's apis. When transitioning to production, you can pass a production driver in and DocMQ will work with no additional changes.

import { Queue, MemoryDriver } from "docmq";
import { MongoDriver } from "docmq/driver/mongo";

// for example, using the MongoDriver in production, while using
// the less resource-intensive in-memory driver for development
const driver =
  process.env.NODE_ENV === "production"
    ? new MongoDriver(process.env.MONGO_URI)
    : new MemoryDriver("default");

const queue = new Queue(driver, "queueName");

| Driver | import | Notes | | :------------ | :------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | In Memory | import { MemoryDriver} from "docmq" | The default in-memory driver and (currently) a re-export of LokiDriver | | LokiJS | import { LokiDriver } from "docmq/driver/loki" | A fast in-memory driver designed for non-production instances. | | MongoDB | import { MongoDriver } from "docmq/driver/mongo" | Currently, DocMQ requires a Mongo Client >= 4.2 for transaction support, and the mongo instance must be running in a Replica Set. This is because MongoDriver uses the OpLog to reduce polling. Requires mongodb peer dependency if using | | Postgres | import { PGDriver } from "docmq/driver/postgres" | We are slowly expanding our PG Test Matrix based on what GitHub allows. LISTEN/NOTIFY support is not available, and the driver will fall back to polling. Requires pg as a peer dependency if using |

If you need to write a custom driver, the core BaseDriver is available in the core docmq package.

✏️ Contributing

We would love you to contribute to jakobo/docmq, pull requests are welcome! Please see the CONTRIBUTING.md for more information.

⚖️ License

DocMQ source is made available under the MIT license

❤️ Sponsor

This project is made possible thanks to Taskless. The in-memory driver of DocMQ enables Taskless to offer a local version of their service, eliminating the hassle of forwarding webhooks around.

✨ Contributors

All Contributors

Thanks goes to these wonderful people (emoji key):

This project follows the all-contributors specification. Contributions of any kind welcome!