reactive-job-queue
v0.3.1
A reactive, reliable job queue with state transitions, backed by Redis.
Reactive Job Queue
A reactive job queue backed by Redis. This job queue provides guarantees
(as far as Redis can provide them) against the loss of job data. The job state
changes atomically in the database from queued, to processing, to complete,
so the data is always available in the database.
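The state transitions described above can be pictured with a small in-memory model. This is only an illustrative sketch: the InMemoryJobStates class and its method names are hypothetical and not part of reactive-job-queue, and the real library keeps this state in Redis rather than in process memory. The point is that a job is moved between states in a single step, so it is always held in exactly one of them.

```javascript
// Hypothetical in-memory model of the queued -> processing -> complete
// lifecycle. Not the library's implementation; the real state lives in Redis.
class InMemoryJobStates {
  constructor() {
    this.queued = [];
    this.processing = [];
    this.complete = [];
  }

  // Add a new job in the "queued" state.
  enqueue(job) {
    this.queued.push(job);
  }

  // Move the oldest queued job to "processing" and return it
  // (or undefined if nothing is waiting).
  take() {
    const job = this.queued.shift();
    if (job !== undefined) {
      this.processing.push(job);
    }
    return job;
  }

  // Move a job from "processing" to "complete".
  // Returns false if the job was not in the "processing" state.
  finish(job) {
    const index = this.processing.indexOf(job);
    if (index === -1) {
      return false;
    }
    this.processing.splice(index, 1);
    this.complete.push(job);
    return true;
  }
}
```

If a worker stops between take and finish, the job is still present in the processing list, which is what makes later recovery possible.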
Get Started
npm install reactive-job-queue
These examples assume you have a Redis server running on localhost on the
standard port. You can configure port and host using the constructor, or
the constructor will pick up the values in the REDIS_PORT and REDIS_HOST
environment variables.
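The precedence described above (constructor options first, then environment variables) can be sketched as a small helper. resolveRedisConfig is a hypothetical function written for illustration, not part of the library, and the final fallback to localhost and 6379 (the standard Redis port) is an assumption about defaults that this README does not confirm.

```javascript
// Hypothetical helper: explicit options win, then REDIS_HOST / REDIS_PORT,
// then an assumed default of localhost:6379.
function resolveRedisConfig(options, env) {
  const opts = options || {};
  const vars = env || process.env;
  const host = opts.host || vars.REDIS_HOST || 'localhost';
  // Environment variables are strings, so normalise the port to a number.
  const port = Number(opts.port || vars.REDIS_PORT || 6379);
  return { host: host, port: port };
}

// The explicit port is used; the host comes from the environment.
console.log(resolveRedisConfig({ port: 6380 }, { REDIS_HOST: 'redis.internal' }));
// prints { host: 'redis.internal', port: 6380 }
```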
If you have Docker installed, you can source activate_docker.sh to start a
Redis server in a Docker container and set up the appropriate REDIS_PORT and
REDIS_HOST environment variables automatically.
Example: Producer
var JobQueue = require('reactive-job-queue');
var q = new JobQueue({ queuename: 'myjobqueue', port: 6379, host: '0.0.0.0' });

q.send({ "name": "test", "job": "data" }, function(error, result) {
  if (error) {
    // The job was not added to the queue: re-send it or handle the error.
    return;
  }
  // No error: the data was added to the queue and it is safe to continue.
});
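The producer comment above suggests re-sending when send reports an error. One way to do that is a small retry wrapper. sendWithRetry and its attempts parameter are hypothetical names used here for illustration; the sketch relies only on send(job, callback) calling back with (error, result), as described in the API section.

```javascript
// Hypothetical helper: call q.send up to `attempts` times, stopping at the
// first success. `q` is any object with send(job, callback).
function sendWithRetry(q, job, attempts, callback) {
  q.send(job, function (error, result) {
    if (!error) {
      return callback(null, result);
    }
    if (attempts <= 1) {
      // Out of attempts: report the last error to the caller.
      return callback(error);
    }
    sendWithRetry(q, job, attempts - 1, callback);
  });
}

// Usage, assuming `q` is a queue instance as in the producer example:
// sendWithRetry(q, { name: 'test', job: 'data' }, 3, function (error) {
//   if (error) { console.error('giving up', error); }
// });
```

If an error could ever be reported after the job was in fact stored, a retry may enqueue a duplicate, so processors are safer when they tolerate seeing the same job twice.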
Example: Consumer/Job processor
var JobQueue = require('reactive-job-queue');
var q = new JobQueue({ queuename: 'myjobqueue', port: 6379, host: '0.0.0.0' });

q.registerProcessor(function(data) {
  // yourProcessDataFunction is your own code that does the actual work.
  yourProcessDataFunction(data, function(error, complete) {
    if (!error) {
      // Pass back the same job object that was received from the queue.
      q.notifyJobComplete(data, function(notifyError, result) {
        if (!notifyError) {
          console.log("Processing complete!");
        }
      });
    }
  });
});
API
new ReactiveJobQueue(options)
Creates a new ReactiveJobQueue.
options - (Object) Settings for this job queue. Must be set, as some members are mandatory.
  queuename - (String) The name of the queue to put jobs to and receive jobs from.
  port - (Integer|Optional) The Redis port to connect to (uses env var REDIS_PORT if not set).
  host - (String|Optional) The IP address or hostname of the Redis server (uses env var REDIS_HOST if not set).
  concurrency - (Integer|Optional) The number of jobs to process at any one time.
  redis - (Object|Optional) The instance of the Redis client to use.
send(job, callback)
Send a new Job to the queue.
job - (Object) The job to send to the queue, as an object.
callback - (Function) Called once the data has been added to the queue. If adding the data failed, the error argument is set. Takes two arguments: (error, result).
registerProcessor(callback)
Register a function to process items on the queue as they arrive. Only one function can be used to process items coming from the queue: the first registered function is used, and any later registrations are ignored.
callback - (Function) The function to call with the job data from the queue. It should accept a single data argument: (data). The data is the object sent from the client, with an additional __reactive_job_id property containing a UUID.
notifyJobComplete(job, callback)
Notify the job queue that a job has been processed successfully. The job object must be identical to the job received from the queue in the processor function.
job - (Object) The job to move into the complete state.
callback - (Function) The callback to call when this operation completes.
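Because notifyJobComplete needs a job object identical to the one the processor received, it helps to keep the received object untouched and let the work function operate on a copy. The sketch below shows one way to arrange that. makeProcessor, work, and onDone are hypothetical names, and the copy is made with a JSON round trip, which assumes the job data is plain JSON-serializable.

```javascript
// Hypothetical helper: builds a function suitable for q.registerProcessor.
// `work(copy, done)` receives a deep copy of the job; the original object is
// passed to q.notifyJobComplete unchanged.
function makeProcessor(q, work, onDone) {
  return function (job) {
    // JSON round trip: assumes the job contains only JSON-serializable data.
    const copy = JSON.parse(JSON.stringify(job));
    work(copy, function (error) {
      if (error) {
        // Leave the job un-notified on failure and report the error.
        return onDone(error, job);
      }
      q.notifyJobComplete(job, function (notifyError) {
        onDone(notifyError || null, job);
      });
    });
  };
}

// Usage, assuming `q` is a queue instance and doWork / logResult are your own
// functions:
// q.registerProcessor(makeProcessor(q, doWork, logResult));
```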
waitQueueLength(callback)
Get the number of items in the queue waiting to be processed.
callback - (Function) The callback to call with the data. It should accept an error argument and a data argument. The data argument will be an integer indicating the number of items waiting.
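One possible use for waitQueueLength is simple backpressure on the producer side: check how many jobs are waiting before adding more. sendIfBelow and maxWaiting are hypothetical names; the sketch relies only on the callback shapes described above for waitQueueLength and send.

```javascript
// Hypothetical helper: only send when fewer than `maxWaiting` jobs are waiting.
// Calls back with (error, sent), where `sent` is true if the job was queued.
function sendIfBelow(q, job, maxWaiting, callback) {
  q.waitQueueLength(function (error, length) {
    if (error) {
      return callback(error, false);
    }
    if (length >= maxWaiting) {
      // Enough jobs are already waiting; the caller can try again later.
      return callback(null, false);
    }
    q.send(job, function (sendError) {
      callback(sendError || null, !sendError);
    });
  });
}
```

The check and the send are two separate operations, so with several producers the limit is approximate rather than strict.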
TODO
- Promises API
- Provide more generic job state transition.
- Provide mechanisms to recover jobs from each state.
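Until a Promises API is provided, the callback-based methods can be wrapped with Node's util.promisify, since they follow the (error, result) callback convention. promisifyQueue is a hypothetical helper written for illustration, not part of the library.

```javascript
const { promisify } = require('util');

// Hypothetical helper: returns promise-returning versions of the documented
// callback methods. bind(q) keeps `this` pointing at the queue instance.
function promisifyQueue(q) {
  return {
    send: promisify(q.send.bind(q)),
    notifyJobComplete: promisify(q.notifyJobComplete.bind(q)),
    waitQueueLength: promisify(q.waitQueueLength.bind(q))
  };
}

// Usage, assuming `q` is a queue instance:
// const pq = promisifyQueue(q);
// pq.send({ name: 'test', job: 'data' })
//   .then(function () { return pq.waitQueueLength(); })
//   .then(function (length) { console.log(length + ' job(s) waiting'); })
//   .catch(function (error) { console.error(error); });
```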
License
Samuel Giles - MIT Licensed