# cuckmq
A lightweight, configurable job queue backed by PostgreSQL, offering an alternative to the Redis-backed `bullmq`.
Core features:

- Built-in type safety
- Repeating/scheduled jobs
- Retryable jobs
## Job Lifecycle

To fully understand the documentation below, it is important to understand the job lifecycle. A job can be in one of the following states:
- **Active**: active jobs can be dequeued by workers for processing.
- **Locked**: locked jobs cannot be dequeued by workers for processing, but will eventually return to an active state.
- **Finalized**: finalized jobs also cannot be dequeued by workers for processing. Unlike locked jobs, a finalized job is in a terminal state and will never return to being active.
Jobs transition between these states in the following ways:
- Jobs start in an active state when they are initially enqueued.
- Upon dequeuing, jobs move into a locked state, preventing other workers from also dequeuing them.
- Jobs that are processed successfully by workers move into the finalized state.
Depending on job-specific configuration:

- Locked jobs move back to an active state after a timeout (`liberateSecs`).
- Active jobs are finalized after a timeout, regardless of whether they have been successfully processed (`reapSecs`).
Depending on global configuration:

- Finalized jobs are eventually deleted (`sweepSecs`).
## Installation

Install the package with:

```sh
yarn add cuckmq
```
## Setup

We must ensure the database is ready to persist job state. This is achieved by calling the `generateMigrationSql(schema : string) : string[]` function, which generates an array of SQL fragments that can be run as part of your app's standard migration process. N.B. these SQL fragments are idempotent and can safely be re-run.
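For example, a minimal migration step might look like this (a sketch assuming an ESM context with top-level await; how you invoke it from your migration framework is up to you):

```ts
import { Pool } from "pg"
import { generateMigrationSql } from "cuckmq"

const pool = new Pool() // connection settings come from the standard PG* env vars

// Run each idempotent SQL fragment in order. Because the fragments are
// safe to re-run, this can be executed on every deploy.
for (const sql of generateMigrationSql("_cuckmq")) {
  await pool.query(sql)
}
```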
## Usage

To begin using `cuckmq`, we must first create the core `Context` object:

```ts
import { Pool } from "pg"
import { Context } from "cuckmq"

const pool = new Pool()

// N.B. make sure the schema matches the schema used in the above migration!
const context = new Context({ pool, schema: "_cuckmq" })
```
### Context#constructor

| Parameter | Type | Required | Default | Description |
| --------- | ---- | -------- | ------- | ----------- |
| `pool` | `pg.Pool` | yes | N/A | A `pg` connection pool |
| `schema` | `string` | yes | N/A | The schema in which `cuckmq` persists job data |
| `eventHandler` | `(event : CuckEvent) => void` | no | `null` | A handler that receives typed events (see below) from `cuckmq` |
| `sweepSecs` | `number` | no | 7 days | The amount of time finalized jobs exist in the database before being deleted |
Next, let's define the jobs that we would like `cuckmq` to run for us. This is achieved by creating `JobDefinition` objects and registering them with the context:

```ts
import { JobDefinition } from "cuckmq"

export const pingJob = new JobDefinition({
  name: "ping",
  context: context,
  jobFunction: async (payload : { message : string }, metadata : { jobId : string }) => {
    console.log(payload.message)
  }
}).register()
```
### JobDefinition#constructor

| Parameter | Type | Required | Default | Description |
| --------- | ---- | -------- | ------- | ----------- |
| `name` | `string` | yes | N/A | A globally unique name for the job definition |
| `context` | `cuckmq.Context` | yes | N/A | A reference to the previously created `Context` object |
| `channel` | `string` | no | `_default` | The channel that jobs are published to. Workers will only process jobs from channels they are subscribed to |
| `repeatSecs` | `number` | no | `null` | If set, the job is scheduled to run repeatedly at the specified interval. N.B. if this value is set, `jobFunction` can only receive an empty payload |
| `reapSecs` | `number` | no | `86_400` | The amount of time since creation before jobs are finalized, regardless of whether they have been successfully processed |
| `liberateSecs` | `number` | no | `<reapSecs>` | The amount of time a locked job waits before transitioning back to an active state. By default this matches `reapSecs`, ensuring a job is only attempted once before finalization |
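For example, a repeating job could look like the following; a minimal sketch using only the constructor parameters documented above (the "heartbeat" name and "maintenance" channel are illustrative, and `context` is the object created earlier):

```ts
import { JobDefinition } from "cuckmq"

// A repeating job: the orchestrator enqueues it every 300 seconds.
// Because repeatSecs is set, the payload must be empty.
export const heartbeatJob = new JobDefinition({
  name: "heartbeat",
  context: context,
  channel: "maintenance", // workers must subscribe to this channel
  repeatSecs: 300,
  jobFunction: async (payload : {}, metadata : { jobId : string }) => {
    console.log("heartbeat")
  }
}).register()
```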
## Deploying

Once our jobs are defined, we must deploy the context, persisting information about each registered job definition. We achieve this by simply calling:

```ts
await context.deploy()
```

N.B. The enqueueing, dequeueing, and processing of jobs, as well as various maintenance tasks, are all implicitly deferred until the context deployment has completed.
## Processing Jobs

We can create `Worker` objects to process enqueued jobs:

```ts
import { Worker } from "cuckmq"

const worker = new Worker({ context })
```
### Worker#constructor

| Parameter | Type | Required | Default | Description |
| --------- | ---- | -------- | ------- | ----------- |
| `context` | `cuckmq.Context` | yes | N/A | A reference to the previously created `Context` object |
| `pollSecs` | `number` | no | 2 secs | The amount of time a worker waits before checking for new jobs |
| `channels` | `string[]` | no | `["_default"]` | The channels that the worker is subscribed to |
| `concurrency` | `number` | no | `1` | The number of jobs the worker will process concurrently |
N.B. Concurrency can be achieved in multiple ways:

1. Multiple processes can each run a `Worker` instance.
2. A single process can run multiple `Worker` instances.
3. A single `Worker` instance can process multiple jobs concurrently via the `concurrency` constructor parameter.

`cuckmq` workers poll the database for new work, so for tasks requiring a large degree of concurrency, option (3) is advised (see the sketch below) to prevent unnecessary hammering of the database.
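A minimal sketch of option (3), using only the constructor parameters from the table above (the channel names are illustrative):

```ts
import { Worker } from "cuckmq"

// One worker, one polling loop, up to 10 jobs in flight at a time.
const worker = new Worker({
  context,
  channels: ["_default", "maintenance"],
  concurrency: 10
})
```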
## Maintenance

We also need at least one `Orchestrator` running, which takes care of all maintenance tasks:

- Scheduling: enqueuing jobs that have been marked as repeating.
- Liberating: moving locked jobs back into an active state after `JobDefinition#liberateSecs`.
- Reaping: moving active jobs into a finalized state after `JobDefinition#reapSecs`.
- Sweeping: deleting finalized jobs after `Context#sweepSecs`.
```ts
import { Orchestrator } from "cuckmq"

const orchestrator = new Orchestrator({ context })
```
### Orchestrator#constructor

| Parameter | Type | Required | Default | Description |
| --------- | ---- | -------- | ------- | ----------- |
| `context` | `cuckmq.Context` | yes | N/A | A reference to the previously created `Context` object |
| `reapPollSecs` | `number` | no | 5 mins | The time interval between checks for old active jobs that are due to be finalized |
| `schedulePollSecs` | `number` | no | 5 secs | The time interval between checks for repeating jobs that are due to be scheduled |
| `sweepPollSecs` | `number` | no | 5 mins | The time interval between deletions of jobs that have been finalized for longer than `Context#sweepSecs` |
| `liberatePollSecs` | `number` | no | 30 secs | The time interval between checks for locked jobs that are due to be unlocked and retried (according to `JobDefinition#liberateSecs`) |
N.B. If a job is due to be "reaped", workers will not dequeue it. This prevents a job that should have been finalized from being processed just before the orchestrator reaps it.
## Shutting down

Once we are ready to shut down, we can call `await worker.stop()`. The worker will finish the job it is currently processing (should one exist) and then immediately stop.
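For example, this can be wired into a signal handler; a sketch assuming a Node process and the `worker` and `pool` from earlier (the SIGTERM wiring is an assumption about your deployment, not part of cuckmq):

```ts
import process from "process"

process.on("SIGTERM", async () => {
  await worker.stop() // finishes any in-flight job, then stops
  await pool.end()    // close the pg connection pool
  process.exit(0)
})
```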
## Event Handling

`cuckmq` produces a large number of events that can be consumed via the optional `eventHandler` passed into the `Context` object. All event types are detailed in the table below.
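As a sketch, a handler that flags failed job runs might look like the following. The `(event : CuckEvent) => void` signature comes from the `Context` constructor table above, but the assumption that each event carries a `key` discriminant matching the keys below is ours; check the exported `CuckEvent` type before relying on it:

```ts
import { CuckEvent } from "cuckmq"

// Assumed shape: each event carries a `key` field matching the event
// keys tabulated below.
const eventHandler = (event : CuckEvent) : void => {
  if (event.key === "WORKER_JOB_RUN_ERROR") {
    console.error("cuckmq job failed:", event)
  } else {
    console.log("cuckmq event:", event)
  }
}

// Pass it when constructing the Context:
// const context = new Context({ pool, schema: "_cuckmq", eventHandler })
```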
| Key | Description |
| --- | ----------- |
| `JOB_DEFINITION_REGISTER` | A `JobDefinition` has been registered with the `Context` |
| `JOB_DEFINITION_DEPLOY` | A `JobDefinition` has been persisted to the database |
| `JOB_DEFINITION_JOB_ENQUEUE` | A job has been enqueued and persisted in the database |
| `WORKER_JOB_DEQUEUE` | A worker has dequeued a job for processing |
| `WORKER_JOB_ORPHAN` | A dequeued job did not find a corresponding `JobDefinition` registered with the `Context` |
| `WORKER_JOB_RUN` | A job has started to run on a worker |
| `WORKER_JOB_RUN_ORPHAN` | A job has been dequeued but has no corresponding `JobDefinition` to run |
| `WORKER_JOB_RUN_ERROR` | A job threw an error during processing |
| `WORKER_JOB_RUN_SUCCESS` | A job was successfully processed and subsequently finalized |
| `ORCHESTRATOR_JOB_REAP` | A job was finalized after existing for too long |
| `ORCHESTRATOR_JOB_LIBERATE` | A job was unlocked after being locked for `JobDefinition#liberateSecs` |
| `ORCHESTRATOR_JOB_SWEEP` | A finalized job was deleted after being finalized for longer than `Context#sweepSecs` |
| `ORCHESTRATOR_JOB_SCHEDULE` | A repeating job is due to be enqueued |
| `ORCHESTRATOR_JOB_SCHEDULE_ORPHAN` | A repeating job was due to be enqueued but had no corresponding `JobDefinition` registered with the `Context` |