@jjavery/oddjob
v0.15.0
oddjob
A job queue for Node.js applications
Why use a job queue? If your application needs to complete units of work outside of its main process, whether for reliability or scalability or both, it might benefit from a job queue.
Why use oddjob? If your stack already includes Node.js and MongoDB or one of the other supported databases, then oddjob might be a good fit for your project.
Why not use oddjob? It's beta quality! Not yet fully tested or used in production. There are many other high-quality options available.
Features
- Distributed - Multiple workers run from the same job queue, and multiple clients push jobs to the same job queue.
- Concurrency - Multiple workers run multiple jobs simultaneously.
- Persistence - Jobs run at least once (unless they expire prior to being run).
- Idempotency - Under most circumstances, unique jobs run no more than once, even if multiple clients push the same unique job into the queue.
- Recurrences - Jobs can be scheduled to run on multiple dates and times using cron expressions.
- Schedule - Jobs can be scheduled to run no sooner than a specific date and time.
- Expiration - Jobs can be scheduled to run no later than a specific date and time.
- Delay - Jobs can be scheduled to run after a time delay.
- Retries - Failed jobs are retried a limited number of times.
- Locking - Workers lock jobs prior to running them. Jobs that do not complete prior to the timeout are re-run. Workers can update a job's lock to continue to hold it past its initial timeout.
- Priority - Jobs can be run before or after other jobs with the same eligibility.
- Messages - Jobs carry application-defined messages from clients to workers.
- Logging - Workers can write log messages to a job's log stream.
- Results - Workers can return a result to store with a job.
- Types - Support for multiple job types in the same job queue.
- Events - Job queues are event emitters and emit events when various actions occur.
- Promises - Promise-based API (async/await).
- Metadata - Jobs record the hostname & PID of clients and workers, and the timing of job events.
- Plugins - Pluggable data layer for various database systems (MongoDB, SQLite, etc.)
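To make the Locking feature above more concrete, here is a minimal in-memory sketch of lock-with-timeout semantics. It is not oddjob's implementation: the `LockableJob` class and its methods are hypothetical, and time is passed in explicitly so the behavior is easy to follow.

```javascript
// Hypothetical in-memory model of a job lock (not oddjob's actual code).
class LockableJob {
  constructor(id) {
    this.id = id;
    this.acquired = null; // Date the job was locked
    this.timeout = null; // Date the lock expires
    this.isComplete = false;
  }

  // A job can be acquired if it is unfinished and is either unlocked
  // or its lock has expired.
  canAcquire(now) {
    if (this.isComplete) return false;
    return this.timeout === null || now >= this.timeout;
  }

  // Lock the job for `seconds`, starting at `now`.
  acquire(now, seconds) {
    if (!this.canAcquire(now)) return false;
    this.acquired = now;
    this.timeout = new Date(now.getTime() + seconds * 1000);
    return true;
  }

  // Extend the lock, as a worker would for a long-running job.
  updateTimeout(now, seconds) {
    this.timeout = new Date(now.getTime() + seconds * 1000);
  }
}

const t0 = new Date('2022-01-01T00:00:00Z');
const at = (seconds) => new Date(t0.getTime() + seconds * 1000);
const job = new LockableJob('job-1');

const first = job.acquire(t0, 60); // worker A locks the job
const second = job.acquire(at(30), 60); // worker B is refused: still locked
job.updateTimeout(at(50), 60); // worker A extends the lock to t0 + 110s
const third = job.acquire(at(90), 60); // still refused
const fourth = job.acquire(at(120), 60); // lock has expired, job can be re-run
```

The last call succeeds only because the worker holding the lock stopped extending it, which is the situation in which a job is re-run.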
Installation
Install with NPM
$ npm install @jjavery/oddjob
$ npm install @jjavery/oddjob-mongodb # or oddjob-sqlite etc.
You will also need a compatible database server to store your jobs, logs, results, etc.
Example
worker.js:
// Get a reference to the JobQueue class
const { JobQueue } = require('@jjavery/oddjob');
// A module that sends emails
const email = require('./email');
// Create an instance of a JobQueue
const jobQueue = new JobQueue('mongodb://localhost:27017/oddjob');
// Tell the JobQueue to handle jobs of type 'send-email' with the provided
// async function. Concurrency is set to handle up to four jobs of this type
// simultaneously.
jobQueue.handle('send-email', { concurrency: 4 }, async (job, onCancel) => {
  const { message } = job;
  // Send the email. If an exception is thrown, it will be written to the job
  // log for this job.
  const result = await email.send(message);
  // Write to the job log for this job
  job.log(`Email sent`);
  // Return the result. The return value, if any, will be stored with the job.
  return result;
});
// Handle errors
jobQueue.on('error', (err) => {
  console.error(err);
});
// Start the JobQueue
jobQueue.start();
client.js:
// Get references to the JobQueue and Job classes
const { JobQueue, Job } = require('@jjavery/oddjob');
// Create an instance of a JobQueue. Connects to localhost by default.
const jobQueue = new JobQueue('mongodb://localhost:27017/oddjob');
async function sendEmail() {
  // Push a new Job into the JobQueue
  await jobQueue.push(
    new Job('send-email', {
      // Placeholder addresses
      from: 'someone-else@example.com',
      to: 'someone@example.com',
      subject: 'This is an example',
      text: 'Hi Someone, How do you like my example? -Someone Else'
    })
  );
}
async function disconnect() {
  // Disconnect from the database
  await jobQueue.disconnect();
}
API Reference
JobQueue ⇐ EventEmitter
Provides access to a job queue
Extends: EventEmitter
- JobQueue ⇐ EventEmitter
- new JobQueue(uri, options)
- .concurrency : number
- .timeout : number
- .idleSleep : number
- .activeSleep : number
- .running : number
- .isSaturated : boolean
- .connect()
- .disconnect()
- .push(job) ⇒ boolean
- .proxy(type, defaultOptions) ⇒ function
- .cancel(options) ⇒ Job
- .handle(type, options, fn)
- .start()
- .pause()
- .stop()
- "error"
- "handlerError"
- "connect"
- "disconnect"
- "push"
- "handle"
- "start"
- "pause"
- "stop"
- "beforeRun"
- "afterRun"
- "timeout"
- "cancel"
new JobQueue(uri, options)
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| uri | | | |
| options | Object | {} | Optional parameters |
| options.concurrency | number | 10 | Maximum number of jobs that may run concurrently |
| options.timeout | number | 60 | Seconds to wait before a running job is considered timed-out and eligible for retry or failure |
| options.idleSleep | number | 1000 | Milliseconds to sleep after completing a run loop when no jobs are acquired |
| options.activeSleep | number | 10 | Milliseconds to sleep after completing a run loop when a job is acquired |
| options.connect | boolean | true | Whether to connect to the database immediately |
| options.connectOptions | Object | | Options to pass along to the database connector |
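As a reading aid for the defaults in the table, the sketch below shows how caller-supplied options could be merged over them. The `DEFAULTS` object and `resolveQueueOptions` function are hypothetical names for illustration; only the option names and default values come from the table.

```javascript
// Default values copied from the options table above.
const DEFAULTS = Object.freeze({
  concurrency: 10, // jobs
  timeout: 60, // seconds
  idleSleep: 1000, // milliseconds
  activeSleep: 10, // milliseconds
  connect: true
});

// Hypothetical helper: caller-supplied options override the defaults.
function resolveQueueOptions(options = {}) {
  return { ...DEFAULTS, ...options };
}

const resolved = resolveQueueOptions({ concurrency: 2, connect: false });
```

Under this reading, passing `{ concurrency: 2, connect: false }` as the second constructor argument would leave `timeout`, `idleSleep`, and `activeSleep` at their defaults.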
jobQueue.concurrency : number
Maximum number of jobs that may run concurrently
jobQueue.timeout : number
Seconds to wait before a running job is considered timed-out and eligible for retry or failure
jobQueue.idleSleep : number
Milliseconds to sleep after completing a run loop when no jobs are acquired
jobQueue.activeSleep : number
Milliseconds to sleep after completing a run loop when a job is acquired
jobQueue.running : number
Number of jobs that are currently running
jobQueue.isSaturated : boolean
Whether the number of jobs currently running is equal to the maximum concurrency
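The properties above suggest how a run loop might pace itself. The following is a rough sketch under that assumption, not oddjob's actual loop; `isSaturated` and `nextSleep` are hypothetical helper functions.

```javascript
// Hypothetical helper: the queue is saturated when it is running as many
// jobs as its maximum concurrency allows.
function isSaturated(running, concurrency) {
  return running >= concurrency;
}

// Hypothetical helper: sleep briefly after acquiring a job (more work is
// probably waiting) and longer when the queue appears to be empty.
function nextSleep(acquiredJob, { idleSleep = 1000, activeSleep = 10 } = {}) {
  return acquiredJob ? activeSleep : idleSleep;
}

const busyDelay = nextSleep({ id: 'job-1' }); // a job was acquired
const idleDelay = nextSleep(null); // nothing was acquired
```

A lower `idleSleep` reduces the latency before a newly pushed job is noticed, at the cost of polling the database more often.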
jobQueue.connect()
Establish a connection to the database server
jobQueue.disconnect()
Disconnect from the database server
jobQueue.push(job) ⇒ boolean
Push a job into the job queue
Returns: boolean - true if a new job was pushed, or false if the job already exists (based on id or unique_id)
| Param | Type | Description |
| --- | --- | --- |
| job | Job | The job to push into the queue |
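The return value described above can be illustrated with a small in-memory sketch. `InMemoryQueue` is a hypothetical stand-in, not part of oddjob, and its `push` is synchronous for brevity, whereas the example earlier in this README awaits `jobQueue.push`.

```javascript
// Hypothetical in-memory queue that deduplicates on id and unique_id.
class InMemoryQueue {
  constructor() {
    this.jobsById = new Map();
    this.idsByUniqueId = new Map();
    this.nextId = 1;
  }

  // Returns true if the job was stored, false if it already exists.
  push(job) {
    if (job.id !== undefined && this.jobsById.has(job.id)) return false;
    if (job.unique_id !== undefined && this.idsByUniqueId.has(job.unique_id)) {
      return false;
    }
    const id = job.id !== undefined ? job.id : String(this.nextId++);
    this.jobsById.set(id, { ...job, id });
    if (job.unique_id !== undefined) this.idsByUniqueId.set(job.unique_id, id);
    return true;
  }
}

const queue = new InMemoryQueue();
const a = queue.push({ type: 'send-email', unique_id: 'welcome:42' });
const b = queue.push({ type: 'send-email', unique_id: 'welcome:42' });
const c = queue.push({ type: 'send-email' });
```

Here the second push is rejected because it reuses the `unique_id` of the first, while the third has no `unique_id` and is always accepted.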
jobQueue.proxy(type, defaultOptions) ⇒ function
Creates a proxy function that will push a new job when called
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| type | string | | The job type. Only jobs of this type will be passed to the handle function. |
| defaultOptions | Object | {} | Optional parameters sent to each Job constructor |
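The idea of a proxy function can be sketched as a closure over the queue, the job type, and the default options. This is an illustration only: `createProxy` and `fakeQueue` are hypothetical, and the `(message, options)` call signature of the returned function is an assumption, since this README does not document it.

```javascript
// Hypothetical sketch of a proxy factory (not oddjob's implementation).
function createProxy(queue, type, defaultOptions = {}) {
  // Assumed signature: per-call options override the defaults.
  return function pushJob(message, options = {}) {
    return queue.push({
      type,
      message,
      options: { ...defaultOptions, ...options }
    });
  };
}

// A fake queue that records what was pushed.
const pushed = [];
const fakeQueue = {
  push(job) {
    pushed.push(job);
    return true;
  }
};

const sendEmail = createProxy(fakeQueue, 'send-email', { retries: 5 });
const ok = sendEmail({ to: 'someone@example.com' }, { priority: 1 });
```

The benefit of this pattern is that callers can treat "push a job of this type" as an ordinary function call without knowing about the queue.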
jobQueue.cancel(options) ⇒ Job
Cancel a job if it exists in the job queue. You must provide either the id or the unique_id param. If both are provided, id is used and unique_id is ignored.
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| options | * | {} | Optional parameters |
| options.id | * | | ID of job to cancel |
| options.unique_id | * | | Unique ID of job to cancel |
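The precedence rule described above (id wins over unique_id) can be expressed as a small helper. `resolveCancelTarget` is a hypothetical function, and throwing when neither param is given is an assumption about how such a case might be handled.

```javascript
// Hypothetical helper: decide which identifier a cancel call should use.
function resolveCancelTarget(options = {}) {
  const { id, unique_id } = options;
  // id takes precedence; unique_id is ignored when both are present.
  if (id !== undefined && id !== null) return { id };
  if (unique_id !== undefined && unique_id !== null) return { unique_id };
  // Assumption: a missing identifier is treated as a caller error.
  throw new Error('cancel requires an id or a unique_id');
}

const byId = resolveCancelTarget({ id: 'abc', unique_id: 'welcome:42' });
const byUniqueId = resolveCancelTarget({ unique_id: 'welcome:42' });
```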
jobQueue.handle(type, options, fn)
Configure the job queue to handle jobs of a particular type
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| type | string | | The job type. Only jobs of this type will be passed to the handle function. |
| options | Object | {} | Optional parameters |
| options.concurrency | number | 1 | Maximum number of jobs that this handler may run concurrently |
| fn | function | | An async function that takes a single job as its parameter |
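To show how a per-handler concurrency limit might work, here is a minimal sketch of a handler registry. `HandlerRegistry` and its `canRun` and `run` methods are hypothetical and only mirror the parameters in the table; oddjob's internals may differ.

```javascript
// Hypothetical registry mapping job types to handlers with a concurrency cap.
class HandlerRegistry {
  constructor() {
    this.handlers = new Map();
  }

  handle(type, options, fn) {
    const { concurrency = 1 } = options || {};
    this.handlers.set(type, { fn, concurrency, running: 0 });
  }

  // True if a handler exists for the type and has spare capacity.
  canRun(type) {
    const handler = this.handlers.get(type);
    return handler !== undefined && handler.running < handler.concurrency;
  }

  // Run the handler for a job, tracking how many are in flight.
  async run(job) {
    const handler = this.handlers.get(job.type);
    if (!handler) throw new Error(`No handler for type ${job.type}`);
    handler.running++;
    try {
      return await handler.fn(job);
    } finally {
      handler.running--;
    }
  }
}

const registry = new HandlerRegistry();
registry.handle('resize-image', { concurrency: 1 }, async (job) => {
  return job.message.width * 2;
});

const before = registry.canRun('resize-image');
const pending = registry.run({ type: 'resize-image', message: { width: 100 } });
const during = registry.canRun('resize-image'); // checked while the job is in flight
```

With `concurrency: 1`, the handler has no spare capacity while `pending` is unresolved, so a second job of the same type would have to wait.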
jobQueue.start()
Starts the job queue
jobQueue.pause()
Pauses the job queue
jobQueue.stop()
Stops the job queue
"error"
Emitted when an error is thrown in the constructor or run loop.
"handlerError"
Emitted when an error is thrown by a handler.
"connect"
Emitted when the job queue is connected to the database.
"disconnect"
Emitted when the job queue has disconnected from the database.
"push"
Emitted when a job has been pushed into the job queue.
"handle"
Emitted when a job has been passed to a handler.
"start"
Emitted when the job queue starts its run loop.
"pause"
Emitted when the job queue pauses its run loop.
"stop"
Emitted when the job queue stops its run loop.
"beforeRun"
Emitted before a job runs.
"afterRun"
Emitted after a job runs.
"timeout"
Emitted when a job times out and is canceled.
"cancel"
Emitted when a job is canceled.
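Because JobQueue extends EventEmitter, listeners are attached with `on`. The sketch below uses a plain Node.js `EventEmitter` as a stand-in for a job queue and measures run time using the "beforeRun" and "afterRun" events. The assumption that each event passes the job as its first argument is not confirmed by this README.

```javascript
const { EventEmitter } = require('events');

// Stand-in for a JobQueue instance (assumption: listeners receive the job).
const queue = new EventEmitter();

const startedAt = new Map();
const durations = [];

queue.on('beforeRun', (job) => {
  startedAt.set(job.id, Date.now());
});

queue.on('afterRun', (job) => {
  const start = startedAt.get(job.id);
  startedAt.delete(job.id);
  durations.push({ id: job.id, ms: Date.now() - start });
});

// Node.js throws if 'error' is emitted with no listener attached,
// so an error listener is worth registering early.
queue.on('error', (err) => {
  console.error(err);
});

// Simulate one job run by emitting the events by hand.
const job = { id: 'job-1', type: 'send-email' };
queue.emit('beforeRun', job);
queue.emit('afterRun', job);
```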
Job
Provides access to the properties and methods needed to define a job
- Job
- new Job(type, message, options)
- instance
- .id : string
- .type : string
- .message : any
- .unique_id : string
- .recurring : string
- .scheduled : Date
- .expire : Date
- .retries : number
- .try : number
- .priority : number
- .acquired : Date
- .timeout : Date
- .isComplete : boolean
- .hasTimedOut : boolean
- .hasExpired : boolean
- .hasError : boolean
- .canRetry : boolean
- .updateTimeout(seconds)
- .log(level, message)
- .error(error)
- .warn(message)
- .info(message)
- .debug(message)
- .readLog(skip, limit)
- .readResult()
- static
- .load(id) ⇒ Job
new Job(type, message, options)
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| type | string | | The job type |
| message | any | | Application-defined message to pass to the job handler |
| options | Object | {} | Optional parameters |
| options.unique_id | string | | Unique ID of the job |
| options.recurring | string | | Cron expression |
| options.scheduled | Date | now | Date and time after which the job will run |
| options.expire | Date | | Date and time after which the job will no longer run |
| options.retries | number | 2 | Number of times to retry on failure |
| options.priority | number | 0 | Priority of the job |
| options.delay | number | 0 | Number of seconds to delay run |
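As an illustration of how these options combine, the helper below builds an options object for a job that should run no sooner than one hour from now and no later than 24 hours from now. `buildReminderOptions` and the `reminder:` prefix are hypothetical; only the option names come from the table.

```javascript
// Hypothetical helper that builds an options object for the Job constructor.
function buildReminderOptions(userId, now = new Date()) {
  const oneHour = 60 * 60 * 1000; // milliseconds
  return {
    // The same user gets at most one pending reminder.
    unique_id: `reminder:${userId}`,
    // Run no sooner than one hour from now...
    scheduled: new Date(now.getTime() + oneHour),
    // ...and no later than 24 hours from now.
    expire: new Date(now.getTime() + 24 * oneHour),
    retries: 5
  };
}

const now = new Date('2022-01-01T00:00:00Z');
const options = buildReminderOptions('user-42', now);
```

The result would be passed as the third constructor argument, for example `new Job('send-email', message, options)`.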
job.id : string
Job ID
job.type : string
Job type
job.message : any
Application-defined message to pass to the job handler
job.unique_id : string
Unique ID of the job
job.recurring : string
Cron expression
job.scheduled : Date
Date and time after which the job will run
job.expire : Date
Date and time after which the job will no longer run
job.retries : number
Number of times to retry on failure
job.try : number
The current number of times the job has been tried
job.priority : number
Priority of the job
job.acquired : Date
Date and time that the job was acquired (locked) by the job queue
job.timeout : Date
Date and time when the job's lock will expire
job.isComplete : boolean
Has the handler completed the job?
job.hasTimedOut : boolean
Has the job's lock timed out?
job.hasExpired : boolean
Has the job expired?
job.hasError : boolean
Has the job's handler thrown an exception?
job.canRetry : boolean
Is the job eligible to be retried?
job.updateTimeout(seconds)
Update the job's lock timeout
| Param | Type | Description |
| --- | --- | --- |
| seconds | number | The number of seconds to lock the job |
job.log(level, message)
Write to the job's log
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| level | string | "info" | The log level |
| message | any | | The message to log |
job.error(error)
Write to the job's log with level = "error"
| Param | Type | Description |
| --- | --- | --- |
| error | any | The error to log |
job.warn(message)
Write to the job's log with level = "warn"
| Param | Type | Description |
| --- | --- | --- |
| message | any | The message to log |
job.info(message)
Write to the job's log with level = "info"
| Param | Type | Description |
| --- | --- | --- |
| message | any | The message to log |
job.debug(message)
Write to the job's log with level = "debug"
| Param | Type | Description |
| --- | --- | --- |
| message | any | The message to log |
job.readLog(skip, limit)
Retrieve the job's log from the database
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| skip | number | 0 | The number of log messages to skip |
| limit | number | 100 | The maximum number of log messages to return |
job.readResult()
Retrieve the job's result from the database
Job.load(id) ⇒ Job
Load a Job from the database using the job's ID
| Param | Type | Description |
| --- | --- | --- |
| id | string | Job ID of the job to be loaded |
Copyright © 2022 James P. Javery @jjavery