tasqueue
v1.1.1
Node.js job worker library using disque
Promise-based Node.js job/task-queue library using disque.
How it works
- Create a client
- Register handlers for new job types
- Push jobs
Tasqueue is a job/task-queue library built on disque and Q. It aims to be simple, fast, and able to handle a high load.
The monitoring functions of Tasqueue can only be relied upon when using a single-node instance of disque.
Create a client
var Tasqueue = require('tasqueue');
// Default options
var opts = {
authPass: null, // AUTH password for disque-server
host: 'localhost', // disque-server host
port: 7711, // disque-server port
pollDelay: 1000 * 15, // Polling delay in ms when no workers are available
jobTimeout: 1000 * 60 * 60, // Timeout in ms before a job is considered as failed
failedTTL: 60 * 60 * 24, // Failed jobs TTL in sec
completedTTL: 60 * 60 * 24, // Completed jobs TTL in sec
queuedTTL: 60 * 60 * 24, // Queued jobs TTL in sec
activeTTL: 60 * 60 * 1, // Active job TTL in sec
maxAttempts: 60, // Max reconnection attempts
retryMaxDelay: 1000 * 60 // Upper bound on the exponentially growing reconnection delay
};
var tasqueue = new Tasqueue(opts);
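The options mix two units: delays and timeouts are in milliseconds, while TTLs are in seconds. A minimal sketch of a helper that overrides only a few values while keeping the rest explicit is given below. `buildOptions` and `DEFAULTS` are hypothetical names, not part of tasqueue, and the sketch deliberately does not rely on how the library itself fills in missing keys. The unit noted for `retryMaxDelay` is an assumption based on its default value.

```javascript
// Defaults copied from the options listed above.
var DEFAULTS = {
    authPass: null,
    host: 'localhost',
    port: 7711,
    pollDelay: 1000 * 15,        // ms
    jobTimeout: 1000 * 60 * 60,  // ms
    failedTTL: 60 * 60 * 24,     // sec
    completedTTL: 60 * 60 * 24,  // sec
    queuedTTL: 60 * 60 * 24,     // sec
    activeTTL: 60 * 60 * 1,      // sec
    maxAttempts: 60,
    retryMaxDelay: 1000 * 60     // presumably ms (assumption)
};

// Hypothetical helper: returns a complete options object and rejects
// misspelled keys instead of silently ignoring them.
function buildOptions(overrides) {
    var opts = {};
    Object.keys(DEFAULTS).forEach(function (key) {
        opts[key] = DEFAULTS[key];
    });
    Object.keys(overrides || {}).forEach(function (key) {
        if (!(key in DEFAULTS)) {
            throw new Error('Unknown option: ' + key);
        }
        opts[key] = overrides[key];
    });
    return opts;
}
```

The result can then be passed to the constructor, for example `new Tasqueue(buildOptions({ port: 7712 }))`.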
Queue API
tasqueue.init()
Async: Initialize the client.
Example
tasqueue.init()
.then(function() {
// Start working
}, function(err) {
// Connection to disque-server failed
});
tasqueue.shutdown(timeoutMs, callback)
Async: End the client.
Example
tasqueue.init()
.then(function() {
// ...
tasqueue.shutdown(1000, function() {
console.log('Tasqueue was shut down after at most 1000 ms.');
});
});
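Because `shutdown` takes a callback while the rest of the API returns promises, it can be convenient to wrap it. The following is a minimal sketch under that assumption. `shutdownAsync` is a hypothetical helper, and it uses the native `Promise` rather than Q.

```javascript
// Hypothetical helper: wraps the callback-style shutdown in a native Promise.
// `queue` is expected to expose shutdown(timeoutMs, callback) as documented.
function shutdownAsync(queue, timeoutMs) {
    return new Promise(function (resolve) {
        queue.shutdown(timeoutMs, function () {
            resolve();
        });
    });
}

// Possible usage (not executed here): stop cleanly when the process is asked
// to terminate.
//
// process.once('SIGTERM', function () {
//     shutdownAsync(tasqueue, 1000).then(function () {
//         process.exit(0);
//     });
// });
```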
tasqueue.poll()
Start polling for jobs and executing them. This function should be called only once.
Example
tasqueue.init()
.then(function() {
tasqueue.poll();
});
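Putting the pieces together, a worker process might initialize the client, register its handlers, and then start polling exactly once. The sketch below is one possible ordering, not a documented requirement: the source does not state whether handlers must be registered before or after `init()`. `startWorker` is a hypothetical helper.

```javascript
// Hypothetical helper; `queue` is a Tasqueue instance (or any object exposing
// init, registerHandler, poll and listHandlers as documented in this README).
function startWorker(queue, handlers) {
    return queue.init().then(function () {
        handlers.forEach(function (handler) {
            queue.registerHandler(handler);
        });
        queue.poll(); // called exactly once per client
        return queue.listHandlers();
    });
}
```

The returned promise resolves with the registered handler types, which can be logged at startup as a sanity check.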
tasqueue.registerHandler(handler)
Register a job handler. handler should have the following properties:
var handler = {
type: 'jobType', // {String} will be used as the queue name
concurrency: 5, // {Integer} max number of concurrent workers for this type, default = 1
maxAttempts: 5, // {Integer} max number of retries for this job type, default = 1
exec: function(body) {
// do whatever using the body passed for this job
}
};
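Since a malformed handler would only show up once a job is processed, it may help to check the shape before registering. The following sketch validates the properties listed above and applies the documented defaults. `normalizeHandler` is a hypothetical helper, and tasqueue may perform its own, different checks.

```javascript
// Hypothetical helper: validates a handler and fills in documented defaults.
function normalizeHandler(handler) {
    if (!handler || typeof handler.type !== 'string' || handler.type === '') {
        throw new TypeError('handler.type must be a non-empty string');
    }
    if (typeof handler.exec !== 'function') {
        throw new TypeError('handler.exec must be a function');
    }

    // Returns 1 (the documented default) when the value is omitted.
    function positiveInt(value, name) {
        if (value === undefined) {
            return 1;
        }
        if (!Number.isInteger(value) || value < 1) {
            throw new TypeError('handler.' + name + ' must be a positive integer');
        }
        return value;
    }

    return {
        type: handler.type,
        concurrency: positiveInt(handler.concurrency, 'concurrency'),
        maxAttempts: positiveInt(handler.maxAttempts, 'maxAttempts'),
        exec: handler.exec
    };
}
```

A call such as `tasqueue.registerHandler(normalizeHandler(handler))` would then fail early on a missing `exec` function.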
tasqueue.listHandlers()
Returns the types of the registered handlers as an array.
Example
var handler1 = { type: 'type:1', ... };
var handler2 = { type: 'type:2', ... };
tasqueue.registerHandler(handler1);
tasqueue.registerHandler(handler2);
var registeredHandlers = tasqueue.listHandlers();
// registeredHandlers equals ['type:1', 'type:2']
tasqueue.pushJob(jobType, body)
Async: Push a new job that will be processed by the corresponding jobType handler. The worker will call the handler's exec function with body as its argument.
When successful, resolves with the id of the added job.
Example
var handler1 = {
type: 'type:1',
exec: function(body) {
console.log('hello '+body.name);
}
};
tasqueue.registerHandler(handler1);
tasqueue.pushJob('type:1', { name: 'Johan' })
.then(function(jobId) {
// jobId will be a disque id
});
// After some time...
// Logs 'hello Johan'
tasqueue.getJob(id)
Async: Returns a Job object that can be easily manipulated. The Jobs API is described below.
The promise is rejected if the queried job doesn't exist.
Example
tasqueue.getJob('someDisqueId')
.then(function(job) {
console.log(job.details());
});
Count jobs
Async: Returns the count of jobs by state.
tasqueue.count(state)
state must be one of ['active', 'queued', 'completed', 'failed'].
tasqueue.countActive()
tasqueue.countQueued()
tasqueue.countCompleted()
tasqueue.countFailed()
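For a quick overview of the queue, the four counts can be fetched together. The sketch below assumes that `count(state)` resolves with a number. `countAll` and `STATES` are hypothetical names, and the native `Promise.all` is used here instead of Q.

```javascript
var STATES = ['active', 'queued', 'completed', 'failed'];

// Hypothetical helper: resolves with an object mapping each state to its count.
function countAll(queue) {
    var pending = STATES.map(function (state) {
        return queue.count(state);
    });
    return Promise.all(pending).then(function (counts) {
        var summary = {};
        STATES.forEach(function (state, i) {
            summary[state] = counts[i];
        });
        return summary;
    });
}
```

As noted at the top of this README, such figures can only be relied upon with a single-node disque instance.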
List jobs
Async: Returns the list of jobs for each state and cursors to paginate through the jobs.
Example
var opts = {
start: 10, // Start/skip cursor
limit: 10 // Number of jobs to return
};
tasqueue.listActive(opts)
.then(function(res) {
// res looks like:
// {
//   prev: 0,       // Cursor to get the previous 10 jobs or null
//   next: null,    // Cursor to get the next 10 jobs or null
//   list: [ ... ]  // List of Job objects
// }
});
tasqueue.list(state)
state must be one of ['active', 'queued', 'completed', 'failed'].
tasqueue.listActive()
tasqueue.listQueued()
tasqueue.listCompleted()
tasqueue.listFailed()
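The `prev` and `next` cursors can be used to walk through every page of a listing. The sketch below assumes that the `next` cursor is meant to be passed back as `start` on the following call, which the example above suggests but does not state outright. `collectAll` is a hypothetical helper that takes any function returning one page.

```javascript
// Hypothetical helper: follows `next` cursors until the listing is exhausted.
// `listPage` receives { start, limit } and must return a promise of
// { prev, next, list }, e.g. function (opts) { return tasqueue.listQueued(opts); }
function collectAll(listPage, limit) {
    var jobs = [];

    function step(start) {
        return listPage({ start: start, limit: limit }).then(function (res) {
            jobs = jobs.concat(res.list);
            if (res.next === null || res.next === undefined) {
                return jobs; // no further page
            }
            return step(res.next);
        });
    }

    return step(0);
}
```

For large queues it is probably better to process each page as it arrives rather than accumulate every job in memory.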
Jobs API
job.details()
Get the job's information in a readable form.
Example
tasqueue.getJob('someId')
.then(function(job) {
console.log(job.details());
// Logs an object of the form:
// {
//   id: {String},
//   type: {String},
//   body: {Object},
//   state: {String} - one of ['queued', 'active', 'completed', 'failed'],
//   created: {Date},
//   ended: {Date},
//   attempt: {Number} - Attempt at which the job failed/completed,
//   duration: {Number} - in ms,
//   result: {Object} - anything returned by the exec function on success,
//   error: {Error} - details about why the job failed
// }
});
job.cancel()
Async: Cancels the job and marks it as failed.
Only queued jobs may be canceled. The promise is rejected if the job is not in the queued state.
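To avoid a predictable rejection, the state can be checked before canceling. The sketch below is one way to do so. `cancelIfQueued` is a hypothetical helper, and since the state may change between the check and the call, the returned promise can still be rejected.

```javascript
// Hypothetical helper: cancels the job only if it is currently queued.
// Resolves with true if cancel() was called and succeeded, false if skipped.
function cancelIfQueued(job) {
    if (job.details().state !== 'queued') {
        return Promise.resolve(false);
    }
    return job.cancel().then(function () {
        return true;
    });
}
```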
job.delete()
Async: Permanently deletes a job, whatever its state.
Events
Tasqueue inherits from the Node.js EventEmitter class. Below is the list of all events emitted by tasqueue during execution:
Client
Client connection
emit('client:connected', {
// Information about the disque-server the client is connected to
host: {String},
port: {Number}
});
Client closed
emit('client:closed');
Queue execution
Polling jobs
emit('client:polling', {
types: {Number}, // Number of available job types that can be processed by this poll
availableWorkers: {Number}, // Total number of available workers for these types
totalWorkers: {Number} // Total number of workers registered
});
Polling delayed
emit('client:delaying', {
delay: {Number} // The configured (or default) poll delay of the tasqueue instance
});
No worker available
emit('client:no-workers');
Error while polling
emit('error:polling', error);
Jobs
Job started
emit('job:started', {
id: {String}, // The job id
type: {String} // The job type
});
Job successfully pushed
emit('job:pushed', {
id: {String}, // The job id
type: {String} // The job type
});
Job successfully canceled
emit('job:canceled', {
id: {String}, // The job id
type: {String} // The job type
});
Job successfully deleted
emit('job:deleted', {
id: {String}, // The job id
type: {String} // The job type
});
Job re-queued after failure
emit('job:requeued', {
id: {String}, // The job id
type: {String}, // The job type
attempt: {Number} // The last failed attempt for this job
});
Job passed
emit('job:success', {
id: {String}, // The job id
type: {String} // The job type
});
Job failed
emit('error:job-failed', error, {
id: {String}, // The job id
type: {String} // The job type
});
Error canceling a job
emit('error:job-cancel', error, {
id: {String}, // The job id
type: {String} // The job type
});
No handler registered for a job
emit('error:no-handler', error, {
id: {String}, // The job id
type: {String} // The job type
});
Handlers
Handler successfully registered
emit('handler:registered', {
type: handler.type
});
Error: handler already exists
emit('error:existing-handler', error, {
type: handler.type
});
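Because the events above follow the standard EventEmitter interface, they can be consumed with `on`. The following sketch attaches a few logging listeners and exercises them against a plain `EventEmitter` standing in for a Tasqueue instance, so it runs without disque. `attachLogging` is a hypothetical helper, and the payload shapes are taken from the event list above.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: forwards a few tasqueue events to a log function.
function attachLogging(queue, log) {
    queue.on('job:started', function (info) {
        log('started ' + info.type + ' ' + info.id);
    });
    queue.on('job:success', function (info) {
        log('success ' + info.type + ' ' + info.id);
    });
    queue.on('job:requeued', function (info) {
        log('requeued ' + info.type + ' ' + info.id + ' after attempt ' + info.attempt);
    });
    // Error events carry the error first, then the job information.
    queue.on('error:job-failed', function (err, info) {
        log('failed ' + info.type + ' ' + info.id + ': ' + err.message);
    });
    queue.on('error:polling', function (err) {
        log('polling error: ' + err.message);
    });
}

// Stand-in emitter used only for this demonstration.
var fake = new EventEmitter();
var lines = [];
attachLogging(fake, function (line) { lines.push(line); });

fake.emit('job:started', { id: 'abc', type: 'type:1' });
fake.emit('error:job-failed', new Error('boom'), { id: 'abc', type: 'type:1' });

console.log(lines);
// [ 'started type:1 abc', 'failed type:1 abc: boom' ]
```

With a real client, the same helper would be called as `attachLogging(tasqueue, console.log)`.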