docmq
v0.5.7
A document-based messaging queue for Mongo, DocumentDB, and others
Why Choose This :grey_question:
DocMQ is a good choice if a persistence layer for your messaging queue is a deciding factor. Backed by a database, DocMQ makes it easy to schedule, query, and debug jobs using an adapter layer tuned for your database type. The same adapter layer lets DocMQ skip the database entirely and run in-memory in local environments where setting up a local Mongo, Postgres, or similar is unnecessary. DocMQ works with anything that holds and queries documents or document-like objects.
DocMQ is also a strong choice when your messaging queue needs to care about time and time zones. When you want to "send a message at 4:00 am", it matters if you mean 4am in Los Angeles or 4am in Phoenix because only one of those locations implements Daylight Saving Time. DocMQ reduces the pain associated with figuring out if one day is `86400`, `90000`, or `82800` seconds in the future.
Finally, DocMQ is database agnostic. You can run one backend in development where you are less concerned about scale, and run a robust solution in production. A suite of tests makes it easy to ensure your behavior is consistent across deployments.
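To make the day-length arithmetic concrete, here is a small sketch that is not part of DocMQ. It measures the gap between two consecutive "4:00 am" local times using hard-coded UTC offsets from the 2023 US daylight saving transitions (March 12 and November 5). `secondsBetween` is a hypothetical helper name chosen for this illustration.

```typescript
// Seconds elapsed between two instants, each given as an ISO-8601 string
// with an explicit UTC offset. Illustration only; not a DocMQ API.
function secondsBetween(fromIso: string, toIso: string): number {
  return (new Date(toIso).getTime() - new Date(fromIso).getTime()) / 1000;
}

// Phoenix stays on UTC-07:00 all year, so 4am to 4am is a plain 24 hours.
const phoenixDay = secondsBetween(
  "2023-03-11T04:00:00-07:00",
  "2023-03-12T04:00:00-07:00"
); // 86400

// Los Angeles moved from UTC-08:00 to UTC-07:00 on 2023-03-12 (23-hour day).
const losAngelesSpringDay = secondsBetween(
  "2023-03-11T04:00:00-08:00",
  "2023-03-12T04:00:00-07:00"
); // 82800

// Los Angeles moved back to UTC-08:00 on 2023-11-05 (25-hour day).
const losAngelesFallDay = secondsBetween(
  "2023-11-04T04:00:00-07:00",
  "2023-11-05T04:00:00-08:00"
); // 90000
```

A scheduler that simply adds 86400 seconds to the previous run would drift by an hour in Los Angeles on both transition days, which is the problem a timezone-aware queue is meant to avoid.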
Why AVOID This :grey_question:
Simple. Performance. This kind of solution will never be as fast as an in-memory Redis queue or an event bus. If fast FIFO is your goal, you should consider BullMQ, Kue, Bee, Owl, and others. They're all excellent libraries, and I can't recommend them enough if they fit your use case!
DocMQ
- An adaptable DB layer with ready-made drivers for MongoDB and Postgres
- An in-memory driver for faster local development
- Concurrent job execution, job repetition, scheduling, and persistence of results
- Queue-level hooks for managing side effects of job completions and failures
- Timezone-aware scheduling and recurrence
Comparison by Feature
| Feature            |     BullMQ      | Agenda |             DocMQ              |
| :----------------- | :-------------: | :----: | :----------------------------: |
| Backend            |      redis      | mongo  | [any](#-custom-driver-support) |
| Parent/Child       |        ✓        |        |                                |
| Priorities         |        ✓        |   ✓    |                                |
| Concurrency        |        ✓        |   ✓    |               ✓                |
| Delayed jobs       |        ✓        |   ✓    |               ✓                |
| Global events      |        ✓        |        |                                |
| Rate Limiter       |        ✓        |        |                                |
| Pause/Resume       |        ✓        |        |               ✓                |
| Sandboxed worker   |        ✓        |        |                                |
| Repeatable jobs    |        ✓        |   ✓    |               ✓                |
| Atomic ops         |        ✓        |        |               ✓                |
| Persistence        |        ✓        |   ✓    |               ✓                |
| UI                 |        ✓        |   ✓    |                                |
| REST API           |                 |   ✓    |                                |
| Run In-memory      |                 |        |               ✓                |
| Timezone awareness |                 |        |               ✓                |
| New job polling    |        ✓        |   ✓    |               ✓                |
| Queue subscription |        ✓        |        |               ✓                |
| Optimized for      | Jobs / Messages |  Jobs  |              Jobs              |
If you're not concerned about timezone awareness or an in-memory driver, BullMQ and Agenda are excellent alternatives. Thank you, BullMQ, for making the original comparison chart.
:warning: EARLY DEVELOPMENT - This software is currently used in Production as part of Taskless, but is still a newer project. This project follows semver spec for versions < 1.
```
0.y.x
  ^ ^- fixes, features
  \-- breaking changes
```
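As a rough illustration of the `0.y.x` rule above, the sketch below classifies an upgrade between two pre-1.0 versions. `classifyZeroMajorUpgrade` and its return labels are hypothetical names made up for this example, and it assumes plain `0.y.x` strings with no `v` prefix or pre-release tags.

```typescript
type ZeroMajorChange = "breaking" | "fixes-or-features" | "none";

// Classify an upgrade between two 0.y.x versions using the rule above:
// a change in y signals breaking changes, a change in x signals fixes or features.
function classifyZeroMajorUpgrade(from: string, to: string): ZeroMajorChange {
  const [fromMajor, fromMinor, fromPatch] = from.split(".").map(Number);
  const [toMajor, toMinor, toPatch] = to.split(".").map(Number);
  if (fromMajor !== 0 || toMajor !== 0) {
    throw new Error("this sketch only covers 0.y.x versions");
  }
  if (toMinor !== fromMinor) return "breaking";
  if (toPatch !== fromPatch) return "fixes-or-features";
  return "none";
}

const patchUpgrade = classifyZeroMajorUpgrade("0.5.7", "0.5.8"); // "fixes-or-features"
const minorUpgrade = classifyZeroMajorUpgrade("0.5.7", "0.6.0"); // "breaking"
```

This lines up with how npm treats caret ranges on `0.y.x` versions: `^0.5.7` accepts `0.5.8` but not `0.6.0`.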
Installation
```sh
# npm
npm i docmq
# yarn
yarn add docmq
# pnpm
pnpm add docmq
```
DocMQ comes with an in-memory driver, `MemoryDriver`, along with several other adapters for various DBs.
📚 Documentation
Creating a Queue
```ts
import { Queue, MemoryDriver } from "docmq";

interface SimpleJob {
  success: boolean;
}

const queue = new Queue<SimpleJob>(new MemoryDriver("default"), "docmq");
```
`new Queue()` options
```ts
new Queue<
  TData,
  TAck = unknown,
  TFail extends Error = Error,
  TContext = DefaultContext,
>(driver: Driver, name: string, options?: QueueOptions)
```
- `driver` a Driver implementation to use such as the `MemoryDriver`
- `name` a string for the queue's name
- `options?` additional options
  - `retention.jobs?` number of seconds to retain jobs with no further work. Default `3600` (1 hour)
  - `statInterval?` number of seconds between emitting a `stat` event with queue statistics, defaults to `5`
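The documented defaults can be pictured with the following sketch. `QueueOptionsSketch` is a local stand-in that mirrors only the options listed above (it is not the library's `QueueOptions` type), and `resolveQueueOptions` is a hypothetical helper showing how the defaults would fill in. The nested `retention: { jobs }` shape is an assumption read from the `retention.jobs?` notation.

```typescript
// Local mirror of the documented options; NOT imported from docmq.
interface QueueOptionsSketch {
  retention?: { jobs?: number }; // seconds to keep jobs with no further work
  statInterval?: number; // seconds between queue statistics events
}

// Hypothetical helper: fill in the defaults stated in the list above.
function resolveQueueOptions(options: QueueOptionsSketch = {}) {
  return {
    retention: { jobs: options.retention?.jobs ?? 3600 },
    statInterval: options.statInterval ?? 5,
  };
}

const defaultOptions = resolveQueueOptions();
// Keep finished jobs for a full day instead of one hour.
const longRetention = resolveQueueOptions({ retention: { jobs: 86400 } });
```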
A Note on TypeScript
This library uses TypeScript to provide a better developer experience regarding the objects passed into your queue and the responses your job processor provides back to DocMQ. There are four main types used throughout this documentation, and all are set during the creation of the `Queue` class.
`TData` refers specifically to the typing of your job payload. It's the payload you're expecting to pass when calling `enqueue()`, and it's the payload you're expecting to receive inside of your `process()` callback.
`TAck = unknown` refers to the typing of your ack response when calling `api.ack()` inside of your job processor and is by default an unknown type. Setting `TAck` also sets the typings for the `ack` event.
`TFail extends Error = Error` refers to the typing of your error object created and passed to `api.fail()` inside of your job processor and defaults to the base `Error` class. Setting `TFail` also sets the typings for your `fail` event.
`TContext = Record<string, unknown>` refers to the context object available during processing, and is by default an empty object. The context is available inside of `process()` as well as inside of event callbacks after the processing context is available (`ack`, `fail`, `ping`, `dead`, etc). A `DefaultContext` is made available as a convenience for the Record definition.
Adding a Job to the Queue
```ts
await queue.enqueue({
  ref: "sample-id",
  /* TData */ payload: {
    success: true,
  },
});
```
`enqueue()` API
```ts
queue.enqueue(job: JobDefinition<TData> | JobDefinition<TData>[])
```
- `job` the JSON Job object (or an array of job objects), consisting of
  - `ref?: string` an identifier for the job, allowing future `enqueue()` calls to replace the job with new data. Defaults to a v4 UUID
  - `payload: TData` the job's payload which will be saved and sent to the handler
  - `runAt?: Date` a date object describing when the job should run. Defaults to `now()`
  - `runEvery?: string | null` Either a cron interval or an ISO-8601 duration, or `null` to remove recurrence
  - `timezone?: string | null` When using `runEvery`, you can specify a timezone to make DocMQ aware of durations that cross date-modifying thresholds such as Daylight Saving Time; recommended when using cron and duration values outside of UTC.
  - `retries?: number` a number of tries for this job, defaults to `5`
  - `retryStrategy?: RetryStrategy` a retry strategy, defaults to `exponential`
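The replace-by-`ref` behavior described above can be pictured with a plain `Map`. This is only a conceptual sketch: `JobSketch` and `upsertJob` are hypothetical names, the store is not how any DocMQ driver works internally, and it assumes a second enqueue with the same `ref` overwrites the whole record. The `runAt` and `retries` defaults follow the list above.

```typescript
// Simplified stand-in for a stored job; not a docmq type.
interface JobSketch<TData> {
  ref: string;
  payload: TData;
  runAt: Date;
  retries: number;
}

// Hypothetical helper: insert the job, or replace any existing job with the same ref.
function upsertJob<TData>(
  store: Map<string, JobSketch<TData>>,
  job: { ref: string; payload: TData; runAt?: Date; retries?: number },
  now: Date = new Date()
): void {
  store.set(job.ref, {
    ref: job.ref,
    payload: job.payload,
    runAt: job.runAt ?? now, // documented default: now
    retries: job.retries ?? 5, // documented default: 5
  });
}

const jobStore = new Map<string, JobSketch<{ success: boolean }>>();
upsertJob(jobStore, { ref: "sample-id", payload: { success: true } });
// Same ref again: the earlier job is replaced rather than duplicated.
upsertJob(jobStore, { ref: "sample-id", payload: { success: false } });
```

After both calls the store holds a single job for `sample-id`, carrying the payload from the second call.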
Retry Strategies
```ts
interface FixedRetryStrategy {
  type: "fixed";
  amount: number;
  jitter?: number;
}

interface ExponentialRetryStrategy {
  type: "exponential";
  min: number;
  max: number;
  factor: number;
  jitter?: number;
}

interface LinearRetryStrategy {
  type: "linear";
  min: number;
  max: number;
  factor: number;
  jitter?: number;
}
```
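The interfaces above do not spell out how each field turns into a delay, so the sketch below is one plausible reading rather than DocMQ's actual formula. It assumes delays are in seconds, that `min` is the first delay, that `factor` is a multiplier (exponential) or an increment (linear) per attempt, that `max` caps the result, and it ignores `jitter` entirely. `retryDelaySeconds` is a hypothetical name, and the interfaces are redeclared in compact form so the block stands alone.

```typescript
interface FixedRetryStrategy { type: "fixed"; amount: number; jitter?: number }
interface ExponentialRetryStrategy { type: "exponential"; min: number; max: number; factor: number; jitter?: number }
interface LinearRetryStrategy { type: "linear"; min: number; max: number; factor: number; jitter?: number }
type RetryStrategy = FixedRetryStrategy | ExponentialRetryStrategy | LinearRetryStrategy;

// ASSUMED semantics (not confirmed by the docs above); jitter is ignored.
// `attempt` is 1 for the first retry.
function retryDelaySeconds(strategy: RetryStrategy, attempt: number): number {
  switch (strategy.type) {
    case "fixed":
      // Same delay on every attempt.
      return strategy.amount;
    case "exponential":
      // min, min*factor, min*factor^2, ... capped at max.
      return Math.min(strategy.max, strategy.min * Math.pow(strategy.factor, attempt - 1));
    case "linear":
      // min, min+factor, min+2*factor, ... capped at max.
      return Math.min(strategy.max, strategy.min + strategy.factor * (attempt - 1));
  }
}

const backoff: RetryStrategy = { type: "exponential", min: 5, max: 600, factor: 2 };
const thirdRetry = retryDelaySeconds(backoff, 3); // 20
const tenthRetry = retryDelaySeconds(backoff, 10); // 600 (capped by max)
```

Whatever the real formula is, the discriminated `type` field is what lets TypeScript narrow each branch to the right set of fields.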
Handling Work (Processing)
```ts
queue.process(
  async (job: TData, api: HandlerAPI<TAck, TFail, TContext>) => {
    await api.ack();
  },
  {
    /* options */
  }
);
```
`process()` Options
```ts
queue.process(handler: JobHandler<T, A, F>, config?: ProcessorConfig<C>)
```
- `handler` the job handler function, taking the job `T` and the api as arguments, returns a promise
- `config?: ProcessorConfig` an optional configuration for the processor including
  - `pause?: boolean` should the processor wait to be started, default `false`
  - `concurrency?: number` the number of concurrent processor loops to run, default `1`
  - `visibility?: number` specify the visibility window (how long a job is held for by default) in seconds, default `30`
  - `pollInterval?: number` as a fallback, define how often to check for new jobs in the event that driver does not support evented notifications. Defaults to `5`
  - `createContext?: () => Promise<TContext> | TContext` generates a unique context of type `TContext` for this run. It will be available in the handler API.
`api` Methods and Members
- `api.ref` (`string`) the ref value of the job
- `api.attempt` (`number`) the attempt number for this job
- `api.visible` (`number`) the number of seconds this job was originally reserved for
- `api.context` (`TContext`) the context object, generated for this run
- `api.ack(result: TAck)` acknowledge the job, marking it complete, and scheduling future work
- `api.fail(reason: string | TFail)` fail the job and emit the `reason`, scheduling a retry if required
- `api.ping(extendBy: number)` on a long running job, extend the runtime by `extendBy` seconds
Events
The `Queue` object has a large number of emitted events available through `queue.events`. It extends `EventEmitter`, and the most common events are below. Events related to the processing of a job (`ack`, `fail`, `dead`, and `ping`) will all receive `context: TContext` as a second argument to the event callback.
- `ack` when a job was acked successfully
- `fail` when a job was failed
- `dead` when a job has exceeded its retries and was moved to the dead letter queue
- `stats` an interval ping containing information about the queue's processing load
- `start`, `stop` when the queue starts and stops processing
- `log`, `warn`, `error`, `halt` logging events from the queue
Managing Fatal Errors With halt
A fatal event occurs when DocMQ does not believe it can recover from a connection or processing issue, for example when the MongoDB driver's Change Stream disconnects and the process cannot reconnect. To minimize the likelihood of repeatedly processing jobs in this scenario, DocMQ will call its `destroy` method and no longer accept jobs for processing. In addition to emitting a standard `error` event, DocMQ will also emit a `halt` event. In most environments, it's recommended to kill your service, allowing your PaaS provider to mark your service as unhealthy and restart it.
```ts
// events are exposed through queue.events, as described in the Events section
queue.events.on("halt", () => {
  console.error("Received HALT from DocMQ");
  process.exit(1); // exit code for termination of node.js
});
```
🔧 Custom Driver Support
DocMQ works with several drivers, many of which are included in the `/drivers` directory. For development or non-production scenarios, we recommend the `MemoryDriver`, an in-memory driver that supports all of DocMQ's APIs. When transitioning to production, you can pass a production driver in and DocMQ will work with no additional changes.
```ts
import { Queue, MemoryDriver } from "docmq";
import { MongoDriver } from "docmq/driver/mongo";

// for example, using the MongoDriver in production, while using
// the less resource-intensive in-memory driver for development
const driver =
  process.env.NODE_ENV === "production"
    ? new MongoDriver(process.env.MONGO_URI)
    : new MemoryDriver("default");

const queue = new Queue(driver, "queueName");
```
| Driver    | `import`                                           | Notes |
| :-------- | :------------------------------------------------- | :---- |
| In Memory | `import { MemoryDriver } from "docmq"`             | The default in-memory driver and (currently) a re-export of `LokiDriver` |
| LokiJS    | `import { LokiDriver } from "docmq/driver/loki"`   | A fast in-memory driver designed for non-production instances. |
| MongoDB   | `import { MongoDriver } from "docmq/driver/mongo"` | Currently, DocMQ requires a Mongo Client >= 4.2 for transaction support, and the mongo instance must be running in a Replica Set. This is because MongoDriver uses the OpLog to reduce polling. Requires `mongodb` as a peer dependency when used. |
| Postgres  | `import { PGDriver } from "docmq/driver/postgres"` | We are slowly expanding our PG Test Matrix based on what GitHub allows. `LISTEN`/`NOTIFY` support is not available, and the driver will fall back to polling. Requires `pg` as a peer dependency when used. |
If you need to write a custom driver, the core `BaseDriver` is available in the core `docmq` package.
✏️ Contributing
We would love for you to contribute to jakobo/docmq; pull requests are welcome! Please see the CONTRIBUTING.md for more information.
⚖️ License
DocMQ source is made available under the MIT license.
❤️ Sponsor
This project is made possible thanks to Taskless. The in-memory driver of DocMQ enables Taskless to offer a local version of their service, eliminating the hassle of forwarding webhooks around.
✨ Contributors
This project follows the all-contributors specification. Contributions of any kind welcome!