npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@jjavery/oddjob

v0.15.0

Published

A job queue for Node.js applications

Downloads

10

Readme

oddjob

A job queue for Node.js applications

Why use a job queue? If your application needs to complete units of work outside of its main process, whether for reliability or scalability or both, it might benefit from a job queue.

Why use oddjob? If your stack already includes Node.js and MongoDB or one of the other supported databases, then oddjob might be a good fit for your project.

Why not use oddjob? It's beta quality! Not yet fully tested or used in production. There are many other high-quality options available.

Features

  • Distributed - Multiple workers run from the same job queue, and multiple clients push jobs to the same job queue.
  • Concurrency - Multiple workers run multiple jobs simultaneously.
  • Persistence - Jobs run at least once (unless they expire prior to being run).
  • Idempotency - Under most circumstances, unique jobs run no more than once, even if multiple clients push the same unique job into the queue.
  • Recurrences - Jobs can be scheduled to run on multiple dates and times using cron expressions.
  • Schedule - Jobs can be scheduled to run no sooner than a specific date and time.
  • Expiration - Jobs can be scheduled to run no later than a specific date and time.
  • Delay - Jobs can be scheduled to run after a time delay.
  • Retries - Failed jobs are retried a limited number of times.
  • Locking - Workers lock jobs prior to running them. Jobs that do not complete prior to the timeout are re-run. Workers can update a job's lock to continue to hold it past its initial timeout.
  • Priority - Jobs can be run before or after other jobs with the same eligiblity.
  • Messages - Jobs carry application-defined messages from clients to workers.
  • Logging - Workers can write log messages to a job's log stream.
  • Results - Workers can return a result to store with a job.
  • Types - Support for multiple job types in the same job queue.
  • Events - Job queues are event emitters and emit events when various actions occur.
  • Promises - Promise-based API (async/await).
  • Metadata - Jobs record the hostname & PID of clients and workers, and the timing of job events.
  • Plugins - Pluggable data layer for various database systems (MongoDB, SQLite, etc.)

Installation

Install with NPM

$ npm install @jjavery/oddjob
$ npm install @jjavery/oddjob-mongodb # or oddjob-sqlite etc.

You will also need a compatible database server to store your jobs, logs, results, etc.

Example

worker.js:

// Get a reference to the JobQueue class
const { JobQueue } = require('@jjavery/oddjob');

// A module that sends emails
const email = require('./email');

// Create an instance of a JobQueue
const jobQueue = new JobQueue('mongodb://localhost:27017/oddjob');

// Tell the JobQueue to handle jobs of type 'send-email' with the provided
// async function. Concurrency is set to handle up to four jobs of this type
// simultaneously.
jobQueue.handle('send-email', { concurrency: 4 }, async (job, onCancel) => {
  const { message } = job;

  // Send the email. If an exception is thrown, it will be written to the job
  // log for this job.
  const result = await email.send(message);

  // Write to the job log for this job
  job.log(`Email sent`);

  // Return the result. The return value, if any, will be stored with the job.
  return result;
});

// Handle errors
jobQueue.on('error', (err) => {
  console.error(err);
});

// Start the JobQueue
jobQueue.start();

client.js:

// Get references to the JobQueue and Job classes
const { JobQueue, Job } = require('@jjavery/oddjob');

// Create an instance of a JobQueue. Connects to localhost by default.
const jobQueue = new JobQueue('mongodb://localhost:27017/oddjob');

async function sendEmail() {
  // Push a new Job into the JobQueue
  await jobQueue.push(
    new Job('send-email', {
      from: '[email protected]',
      to: '[email protected]',
      subject: 'This is an example',
      text: 'Hi Someone, How do you like my example? -Someone Else'
    })
  );
}

async function disconnect() {
  // Disconnect from the database
  await jobQueue.disconnect();
}

API Reference

JobQueue ⇐ EventEmitter

Provides access to a job queue

Extends: EventEmitter

new JobQueue(uri, options)

| Param | Type | Default | Description | | --- | --- | --- | --- | | uri | | | | | options | Object | {} | Optional parameters | | options.concurrency | number | 10 | Maximum number of jobs that may run concurrently | | options.timeout | number | 60 | Seconds to wait before a running job is considered timed-out and eligible for retry or failure | | options.idleSleep | number | 1000 | Milliseconds to sleep after completing a run loop when no jobs are acquired | | options.activeSleep | number | 10 | Milliseconds to sleep after completing a run loop when a job is acquired | | options.connect | boolean | true | Whether to connect to the database immediately | | options.connectOptions | Object | | Options to pass along to the database connector |

jobQueue.concurrency : number

Maximum number of jobs that may run concurrently

jobQueue.timeout : number

Seconds to wait before a running job is considered timed-out and eligible for retry or failure

jobQueue.idleSleep : number

Milliseconds to sleep after completing a run loop when no jobs are acquired

jobQueue.activeSleep : number

Milliseconds to sleep after completing a run loop when a job is acquired

jobQueue.running : number

Number of jobs that are currently running

jobQueue.isSaturated : boolean

Whether the number of jobs currently running is equal to the maximum concurrency

jobQueue.connect()

Establish a connection to the database server

jobQueue.disconnect()

Disconnect from the database server

jobQueue.push(job) ⇒ boolean

Push a job into the job queue

Returns: boolean - - Returns true if a new job was pushed, or false if the job already exists (based on id or unique_id)

| Param | Type | Description | | --- | --- | --- | | job | Job | The job to push into the queue |

jobQueue.proxy(type, defaultOptions) ⇒ function

Creates a proxy function that will push a new job when called

| Param | Type | Default | Description | | --- | --- | --- | --- | | type | string | | The job type. Only jobs of this type will be passed to the handle function. | | defaultOptions | Object | {} | Optional parameters sent to each Job constructor |

jobQueue.cancel(options) ⇒ Job

Cancel a job if it exists in the job queue. Must provide one id or unique_id param. If both are provided, id is used and unique_id is ignored.

| Param | Type | Default | Description | | --- | --- | --- | --- | | options | * | {} | Optional parameters | | options.id | * | | ID of job to cancel | | options.unique_id | * | | Unique ID of job to cancel |

jobQueue.handle(type, options, fn)

Configure the job queue to handle jobs of a particular type

| Param | Type | Default | Description | | --- | --- | --- | --- | | type | string | | The job type. Only jobs of this type will be passed to the handle function. | | options | Object | {} | Optional parameters | | options.concurrency | number | 1 | Maximum number of jobs that this handler may run concurrently | | fn | function | | An async function that takes a single job as its parameter |

jobQueue.start()

Starts the job queue

jobQueue.pause()

Pauses the job queue

jobQueue.stop()

Stops the job queue

"error"

Emitted when an error is thrown in the constructor or run loop.

"handlerError"

Emitted when an error is thrown by a handler.

"connect"

Emitted when the job queue is connected to the database.

"disconnect"

Emitted when the job queue has disconnected from the database.

"push"

Emitted when a job has been pushed into the job queue.

"handle"

Emitted when a job has been passed to a handler.

"start"

Emitted when the job queue starts its run loop.

"pause"

Emitted when the job queue pauses its run loop.

"stop"

Emitted when the job queue stops its run loop.

"beforeRun"

Emitted before a job runs.

"afterRun"

Emitted after a job runs.

"timeout"

Emitted when a job times out and is canceled.

"cancel"

Emitted when a job is canceled.

Job

Provides access to the properties and methods needed to define a job

new Job(type, message, options)

| Param | Type | Default | Description | | --- | --- | --- | --- | | type | string | | The job type | | message | any | | Application-defined message to pass to the job handler | | options | Object | {} | Optional parameters | | options.unique_id | string | | Unique ID of the job | | options.recurring | string | | Cron expression | | options.scheduled | Date | now | Date and time after which the job will run | | options.expire | Date | | Date and time after which the job will no longer run | | options.retries | number | 2 | Number of times to retry on failure | | options.priority | number | 0 | Priority of the job | | options.delay | number | 0 | Number of seconds to delay run |

job.id : string

Job ID

job.type : string

Job type

job.message : any

Application-defined message to pass to the job handler

job.unique_id : string

Unique ID of the job

job.recurring : string

Cron expression

job.scheduled : Date

Date and time after which the job will run

job.expire : Date

Date and time after which the job will no longer run

job.retries : number

Number of times to retry on failure

job.try : number

The current number of times the job has been tried

job.priority : number

Priority of the job

job.acquired : Date

Date and time that the job was acquired (locked) by the job queue

job.timeout : Date

Date and time when the job's lock will expire

job.isComplete : boolean

Has the handler completed the job?

job.hasTimedOut : boolean

Has the job's lock timed out?

job.hasExpired : boolean

Has the job expired?

job.hasError : boolean

Has the job's handler thrown an exception?

job.canRetry : boolean

Is the job eligible to be retried?

job.updateTimeout(seconds)

Update the job's lock timeout

| Param | Type | Description | | --- | --- | --- | | seconds | number | The number of seconds to lock the job |

job.log(level, message)

Write to the job's log

| Param | Type | Default | Description | | --- | --- | --- | --- | | level | string | "info" | The log level | | message | any | | The message to log |

job.error(error)

Write to the job's log with level = "error"

| Param | Type | Description | | --- | --- | --- | | error | any | The error to log |

job.warn(message)

Write to the job's log with level = "warn"

| Param | Type | Description | | --- | --- | --- | | message | any | The message to log |

job.info(message)

Write to the job's log with level = "info"

| Param | Type | Description | | --- | --- | --- | | message | any | The message to log |

job.debug(message)

Write to the job's log with level = "debug"

| Param | Type | Description | | --- | --- | --- | | message | any | The message to log |

job.readLog(skip, limit)

Retrieve the job's log from the database

| Param | Type | Default | Description | | --- | --- | --- | --- | | skip | number | 0 | The number of log messages to skip | | limit | number | 100 | The maximum number of log messages to return |

job.readResult()

Retrieve the job's result from the database

Job.load(id) ⇒ Job

Load a Job from the database using the job's ID

| Param | Type | Description | | --- | --- | --- | | id | string | Job ID of the job to be loaded |


Copyright © 2022 James P. Javery @jjavery