mongo-qd
v0.0.0
Published
Priority Job Queue with MongoDB. A `qd` implementation.
Downloads
10
Readme
mongo-qd
Priority Job Queue with MongoDB. A qd
implementation.
Reactive, and easily extensible.
Introduction
First we need an instance of Db()
from Mongo's native driver. Let's assume we already have it, and
it's set as db
.
Creating jobs
var MongoQd = require('mongo-qd').MongoQd;
var qd = MongoQd(db);
var q = qd.queue('myFirstQueue');
q.job('my first job', {'_this': 'is', my: 'payload'})
.priority('high')
.attempts(3)
.on('complete', function(result){console.log('Result', result)});
Pulling Jobs
Meanwhile on some other machine...
var MongoQd = require('mongo-qd');
var qd = MongoQd(db);
var q = qd.queue('myFirstQueue');
q.pull(function (err, job) {
if (job == null) return;
// Use whatever async function you want
someAsyncTodo(job.payload, function (err, result) {
if (err) return job.fail(err);
job.complete(result);
});
});
Features
- Abstract and easily extensible with other modules
- Delayed Jobs
- Job event and progress pubsub
- Optional retries with backoff
Usage
var MongoQd = require('mongo-qd').MongoQd;
var qd = MongoQd(db, opts)
Create an instance of MongoQd.
Args:
db
- An instance of Db() from Mongo's native driver.opts
- Object with options.
Options:
ns
- (Default: 'qd'). The prefix for the collection names that would be generated by this module.separator
- (Default: ':'). The separator to use for collection names.priorities
- (Default: would be documented later.). Json object to define the available priorities.
Events:
error
- All errors would be redirected to the main client.
var queue = qd.queue(name)
Get a reference to a certain queue.
Returns a Queue
instance.
Args:
name
- The name of the queue you're referencing.
Events:
newJob
- A new job has just been created. The first arg is a reference the new job.pulledJob
- A job has just been pulled. The first arg is a reference to the pulled job.
var newJob = queue.job(name, payload)
Create a new job. The job would be persisted on the next tick, so you'll be able to set some stuff with NewJob's
methods.
Returns a NewJob
instance.
Args:
name
- A string that represents the name of the job.payload
- Any type of struct that will represent that params of the job.
Events:
complete
- Fired when the job is complete. First arg is the result.failed
- Fired when the job fails. First arg is the reason for failure.failedAttempt
- Fired when the job fails, but has more attempts. First arg is the reason for failure.progress
- Fired when a worker wants to signal on a progress. First arg iscompleted
, second arg istotal
amount of work.
newJob.priority(priority)
Define the priority of the job.
Returns itself.
priority
is a string that should be mapped into a number, using a priority map.
A priority map can be defined in opts of the Qd
instance.
Here is the default priority map:
{
low: 10,
normal: 0,
medium: -5,
high: -10,
critical: -15
}
newJob.delay(delay)
Delay the processing of the job.
Returns self.
delay
is defined in ms
.
newJob.attempts(attempts)
Define how many times this job could be restarted after a failure.
Returns itself.
newJob.backoff(backoff)
Backoff a little bit if the job fails.
Returns itself.
backoff
might be one of the following values:
true
- When it's true, the job would be rescheduled after waitingdelay
ms, wheredelay
is the same value that was defined usingNewJob#delay
.{ type: 'fixed', delay: X }
- Reschedule the job afterX
amount of ms.{ type: 'exponential', delay: ?X }
- The delay between failed jobs will grow exponentially, as more the job keeps failing. WhereX
is the base for the exponential delay. If you setNewJob#delay
, you might ignoreX
.
queue.pull(function (err, pulledJob) {...})
Pull a job from the queue.
pulledJob
is an instance of PulledJob
.
Note: If there are no waiting
jobs to pull, pulledJob
would be null
. The async pull function could easily,
wait until a job will become available, because there is already a notification for that using Pub/Sub. However, the
decision was to keep this module as simple as possible. Such functionality could be easily extended with an external
module.
pulledJob.complete(result)
Finish processing a job, and mark it as complete.
result
is optional, and it can be any struct.
pulledJob.fail(error)
Mark the pulled job as failed
. If it has attempts left, it would be marked as failedAttempt
.
pulledJob.progress(completed, total)
Notify about some progress with the job.
completed
- Some number that indicates how many units have been completed.total
- The total amount of units to be completed.
install
With npm do:
npm install mongo-qd
license
MIT