cuckmq

v0.3.10

A lightweight postgres-backed job queue

Downloads

349

Readme

A lightweight, configurable job-queue backed by postgresql, offering an alternative to the redis-backed bullmq.

Core Features:

  • built-in type safety
  • repeating/scheduled jobs
  • retryable jobs

Job Lifecycle

The rest of this documentation assumes familiarity with the "Job Lifecycle". A job is always in one of the following states:

  • Active - Active jobs can be dequeued by workers for processing.
  • Locked - Locked jobs cannot be dequeued by workers for processing, but will eventually return to an active state.
  • Finalized - Finalized jobs also cannot be dequeued by workers for processing. Unlike a locked job, a finalized job is in its terminal state and will not return to being active.

Jobs transition between these states in the following ways:

  • Jobs start in an active state when they are initially enqueued.
  • Upon dequeuing, jobs move into a locked state, preventing other workers from also dequeuing them.
  • Jobs that are processed successfully by workers move into the finalized state.

Depending on job-specific configuration:

  • Locked jobs move back to an active state after a timeout (liberateSecs).
  • Active jobs are finalized after a timeout measured from their creation, regardless of whether they have been successfully processed or not (reapSecs).

Depending on global configuration:

  • Finalized jobs are eventually deleted (sweepSecs).
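The transitions above can be summarised as a small state machine. The sketch below is for illustration only: the names JobState, LifecycleEvent and nextState are hypothetical and are not part of cuckmq's API, and "deleted" is modelled as a state purely to represent a swept job.

```typescript
// Illustrative model of the job lifecycle described above.
// These names are hypothetical; cuckmq does not export them.
type JobState = "active" | "locked" | "finalized" | "deleted"

type LifecycleEvent =
    | "dequeue"   // a worker picks the job up
    | "success"   // the worker processed the job successfully
    | "liberate"  // liberateSecs elapsed while the job was locked
    | "reap"      // reapSecs elapsed since the job was created
    | "sweep"     // sweepSecs elapsed since the job was finalized

// Returns the next state, or null if the event does not apply in that state.
function nextState(state: JobState, event: LifecycleEvent): JobState | null {
    switch (state) {
        case "active":
            if (event === "dequeue") return "locked"
            if (event === "reap") return "finalized"
            return null
        case "locked":
            if (event === "success") return "finalized"
            if (event === "liberate") return "active"
            return null
        case "finalized":
            return event === "sweep" ? "deleted" : null
        case "deleted":
            return null
    }
}
```

For example, a job that fails once and then succeeds would pass through active, locked, active, locked and finally finalized.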

Installation

Install the package with:

yarn add cuckmq

Setup

Before using cuckmq, we must ensure the database is ready to persist job state. This is achieved by calling the generateMigrationSql(schema : string) : string[] function, which generates an array of SQL fragments that can be run as part of your app's standard migration process. N.B. These SQL fragments are idempotent, so they can safely be re-run.
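If your app has no migration tooling of its own, the fragments can be run in order against the database. The helper below is a minimal sketch, not part of cuckmq: runMigrationFragments and the Queryable interface are hypothetical names, and the fragments argument is assumed to be the array returned by generateMigrationSql.

```typescript
// Minimal structural type for anything that can run a SQL string,
// for example a pg.Pool or pg.Client (hypothetical helper type).
interface Queryable {
    query(sql: string): Promise<unknown>
}

// Runs each SQL fragment in order, stopping at the first failure.
// `fragments` is assumed to be the array returned by generateMigrationSql(schema).
// Returns the number of fragments that were applied.
async function runMigrationFragments(db: Queryable, fragments: string[]): Promise<number> {
    let applied = 0
    for (const sql of fragments) {
        await db.query(sql)
        applied += 1
    }
    return applied
}
```

With a pg pool this would be called along the lines of runMigrationFragments(pool, generateMigrationSql("_cuckmq")). Because the fragments are idempotent, running the helper on every deploy should be harmless.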

Usage

To begin using cuckmq, we must first create the core Context object:

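import { Context } from "cuckmq"
import { Pool } from "pg"
import process from "process"

// Assumption: the connection string is supplied via a DATABASE_URL environment variable.
const pool = new Pool({ connectionString: process.env.DATABASE_URL })

// N.B. make sure the schema matches the schema used in the above migration!
const context = new Context({ pool, schema: "_cuckmq" })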

Context#constructor

| Parameter | Type | Required | Default | Description |
| --------- | ---- | ----------- | ------------- | ----------- |
| pool | pg.Pool | yes | N/A | A pg connection pool |
| schema | string | yes | N/A | The schema in which cuckmq persists job data |
| eventHandler | (event : CuckEvent) => void | no | null | A handler that receives typed events (see below) from cuckmq |
| sweepSecs | number | no | 7 days | The amount of time finalized jobs exist in the database before being deleted |

Next, let's define the jobs that we would like cuckmq to run for us. This is achieved by creating JobDefinition objects and registering them with the context:

import { JobDefinition } from "cuckmq"

export const pingJob = new JobDefinition({
    name: "ping",
    context: context,
    jobFunction: async (payload : { message : string }, metadata: { jobId : string }) => {
        console.log(payload.message)
    }
}).register()

JobDefinition#constructor

| Parameter | Type | Required | Default | Description |
| --------- | ---- | ----------- | ------------- | ----------- |
| name | string | yes | N/A | A globally unique name for the job definition |
| context | cuckmq.Context | yes | N/A | A reference to the previously created Context object |
| channel | string | no | _default | The channel that jobs are published to. Workers will only process jobs from channels they are subscribed to |
| repeatSecs | number | no | null | If set, the job is scheduled to run repeatedly at the specified interval. N.B. if this value is set, jobFunction can only receive an empty payload |
| reapSecs | number | no | 86_400 | The amount of time since creation before jobs are finalized, regardless of whether they were successfully processed or not |
| liberateSecs | number | no | <reapSecs> | The amount of time a locked job has to wait before transitioning back to an active state. By default, this value is set to match reapSecs, ensuring a job is only attempted once before finalization |
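The interplay between reapSecs and liberateSecs determines how often a failing job can be retried. The sketch below estimates a rough upper bound on the number of attempts. It assumes, without confirmation from the source, that a failed job stays locked until liberateSecs elapses and is dequeued again immediately afterwards; maxAttempts is a hypothetical helper, not a cuckmq function.

```typescript
// Rough upper bound on how many times a job can be attempted before it is reaped.
// Assumption (not confirmed by the source): a failed job stays locked until
// liberateSecs elapses, and is then dequeued again without delay.
// liberateSecs defaults to reapSecs, mirroring the documented default.
function maxAttempts(reapSecs: number, liberateSecs: number = reapSecs): number {
    if (reapSecs <= 0 || liberateSecs <= 0) {
        throw new RangeError("reapSecs and liberateSecs must be positive")
    }
    return Math.ceil(reapSecs / liberateSecs)
}

const defaults = maxAttempts(86_400)          // 1: a single attempt, as documented
const hourlyRetry = maxAttempts(86_400, 3600) // 24: one attempt per hour for a day
```

In practice the real number of attempts will be lower, since polling intervals and processing time both eat into the window.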

Deploying

Once our jobs are defined, we must deploy the context - persisting info about each defined job definition. We achieve this by simply calling:

await context.deploy()

N.B. The enqueueing, dequeueing and processing of jobs, as well as the various maintenance tasks, are all implicitly deferred until the context deployment has completed.

Processing Jobs

We can create Worker objects to process enqueued jobs:

import { Worker } from "cuckmq"

const worker = new Worker({ context })

Worker#constructor

| Parameter | Type | Required | Default | Description |
| --------- | ---- | ----------- | ------------- | ----------- |
| context | cuckmq.Context | yes | N/A | A reference to the previously created Context object |
| pollSecs | number | no | 2 secs | The amount of time a worker waits before checking for new jobs |
| channels | string[] | no | ["_default"] | The channels that the worker is subscribed to |
| concurrency | number | no | 1 | The number of jobs the worker will process concurrently |

N.B. Concurrency can be achieved in multiple ways:

  1. Multiple processes can run a Worker instance.
  2. A single process can run multiple Worker instances.
  3. A single Worker instance can process multiple jobs concurrently via the concurrency constructor parameter.

cuckmq workers poll the database for new work. For tasks requiring a high degree of concurrency, option (3) is therefore advised, as it avoids hammering the database with polls from many separate Worker instances.
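To make the trade-off concrete, the sketch below compares the polling load of the different approaches. It assumes that each Worker instance issues one poll every pollSecs regardless of its concurrency setting, which the source does not spell out; pollsPerSecond is a hypothetical helper.

```typescript
// Approximate number of polling queries per second hitting the database.
// Assumption: each Worker instance polls once every pollSecs, independent of
// its concurrency setting. pollSecs defaults to the documented 2 seconds.
function pollsPerSecond(workerInstances: number, pollSecs: number = 2): number {
    if (pollSecs <= 0) {
        throw new RangeError("pollSecs must be positive")
    }
    return workerInstances / pollSecs
}

// Ten jobs in flight, achieved two ways:
const manyWorkers = pollsPerSecond(10) // ten Workers with concurrency 1: 5 polls/sec
const oneWorker = pollsPerSecond(1)    // one Worker with concurrency 10: 0.5 polls/sec
```

Under that assumption, a single Worker with a higher concurrency value offers the same parallelism for a tenth of the polling traffic.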

Maintenance

We also need to have at least one Orchestrator running, which takes care of all maintenance tasks:

  1. Scheduling - enqueuing jobs that have been marked as repeating.
  2. Liberating - moving locked jobs back into an active state after JobDefinition#liberateSecs.
  3. Reaping - moving active jobs into a finalized state after JobDefinition#reapSecs.
  4. Sweeping - deleting finalized jobs after Context#sweepSecs.

import { Orchestrator } from "cuckmq"

const orchestrator = new Orchestrator({ context })

Orchestrator#constructor

| Parameter | Type | Required | Default | Description |
| --------- | ---- | ----------- | ------------- | ----------- |
| context | cuckmq.Context | yes | N/A | A reference to the previously created Context object |
| reapPollSecs | number | no | 5 mins | The time interval between checking for old active jobs that are due to be finalized |
| schedulePollSecs | number | no | 5 secs | The time interval between checking for new repeating jobs that are due to be scheduled |
| sweepPollSecs | number | no | 5 mins | The time interval between deleting jobs that have been finalized for longer than Context#sweepSecs |
| liberatePollSecs | number | no | 30 secs | The time interval between checking for locked jobs that are due to be unlocked and retried (according to JobDefinition#liberateSecs) |

N.B. If a job is due to be "reaped", workers will not dequeue it. This prevents scenarios where a job that should've been finalized manages to be processed before the Reaper can get to it.

Shutting down

Once we are ready to shut down, we can call await worker.stop(). The worker will finish the job it is currently processing (should one exist) and then immediately stop.
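When a process runs several workers, they can be stopped together. The following is a minimal sketch: stopAll and the Stoppable interface are hypothetical names, and the only cuckmq behaviour relied on is that worker.stop() returns a promise, as shown above.

```typescript
// Anything with an awaitable stop() method, such as a cuckmq Worker.
// Hypothetical helper type, not exported by cuckmq.
interface Stoppable {
    stop(): Promise<unknown>
}

// Asks every worker to stop at the same time and resolves once all of them
// have finished. Rejects if any individual stop() call rejects.
async function stopAll(workers: Stoppable[]): Promise<void> {
    await Promise.all(workers.map((worker) => worker.stop()))
}
```

In a Node.js service this would typically be wired to a termination signal, for instance by calling stopAll([worker]) from a SIGTERM handler before the process exits.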

Event Handling

cuckmq produces a large number of events that can be consumed via the optional eventHandler passed to the Context constructor. All event types are described below:

| Key | Description |
| --- | ----------- |
| JOB_DEFINITION_REGISTER | A JobDefinition has been registered with the Context |
| JOB_DEFINITION_DEPLOY | A JobDefinition has been persisted to the database |
| JOB_DEFINITION_JOB_ENQUEUE | A job has been enqueued and persisted in the database |
| WORKER_JOB_DEQUEUE | A worker has dequeued a job for processing |
| WORKER_JOB_ORPHAN | A dequeued job did not find a corresponding JobDefinition registered with the Context |
| WORKER_JOB_RUN | A job has started to run on a worker |
| WORKER_JOB_RUN_ORPHAN | A job has been dequeued but has no corresponding JobDefinition to run |
| WORKER_JOB_RUN_ERROR | A job threw an error during processing |
| WORKER_JOB_RUN_SUCCESS | A job was successfully processed and was subsequently finalized |
| ORCHESTRATOR_JOB_REAP | A job was finalized after existing for too long |
| ORCHESTRATOR_JOB_LIBERATE | A job was unlocked after being locked for JobDefinition#liberateSecs |
| ORCHESTRATOR_JOB_SWEEP | A finalized job was deleted after being finalized for longer than Context#sweepSecs |
| ORCHESTRATOR_JOB_SCHEDULE | A repeating job is due to be enqueued |
| ORCHESTRATOR_JOB_SCHEDULE_ORPHAN | A repeating job was due to be enqueued but there was no corresponding JobDefinition registered with the Context |
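One simple use of these events is to keep a tally per key for metrics or debugging. The sketch below is illustrative only: makeEventCounter is a hypothetical helper, and because the shape of CuckEvent is not documented here, the caller supplies a keyOf function that extracts the key from an event.

```typescript
// Builds an event handler that tallies events by key.
// `keyOf` extracts the key from an event; it is supplied by the caller because
// the field layout of CuckEvent is not documented in this readme.
function makeEventCounter<E>(keyOf: (event: E) => string) {
    const counts: Record<string, number> = {}
    const handler = (event: E): void => {
        const key = keyOf(event)
        counts[key] = (counts[key] || 0) + 1
    }
    return { handler, counts }
}
```

The returned handler would then be passed as the eventHandler parameter when constructing the Context, and counts can be inspected or exported at any time.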