npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

tasqueue

v1.1.1

Published

Node.js job worker library using disque

Downloads

8

Readme

tasqueue

npm version Build Status

Promise-based Node.js job/task-queue library using disque.

How it works

  1. Create a client
  2. Register new job types handlers
  3. Push jobs

Tasqueue is a job/task-queue library based on disque and using Q. It aims to be simple, fast and to handle a high charge.

Monitoring functions of Tasqueue can only be entrusted when using a single-node instance of disque.

Create a client

var Tasqueue = require('tasqueue');

// Default options
var opts = {
    authPass:      null,            // AUTH password for disque-server
    host:          'localhost',     // disque-server host
    port:          7711,            // disque-server port
    pollDelay:     1000 * 15,       // Polling delay in ms when no workers are available
    jobTimeout:    1000 * 60 * 60,  // Timeout in ms before a job is considered as failed
    failedTTL:     60 * 60 * 24,    // Failed jobs TTL in sec
    completedTTL:  60 * 60 * 24,    // Completed jobs TTL in sec
    queuedTTL:     60 * 60 * 24,    // Queued jobs TTL in sec
    activeTTL:     60 * 60 * 1,     // Active job TTL in sec
    maxAttempts:   60,              // Max reconnection attempts
    retryMaxDelay: 1000 * 60        // Prevent exponential reconnection delay
};

var tasqueue = new Tasqueue(opts);

Queue API

tasqueue.init()

Async: Initialize the client.

Example
tasqueue.init()
.then(function() {
    // Start working
}, function(err) {
    // Connection to disque-server failed
});

tasqueue.shutdown(timeoutMs, callback)

Async: End the client.

Example
tasqueue.init()
.then(function() {
    // ...
    tasqueue.shutdown(1000, function() {
        console.log('Tasqueue was shut down after at most 1000 ms.');
    });
});

tasqueue.poll()

Start polling and jobs execution. This function should be run only once.

Example
tasqueue.init()
.then(function() {
    tasqueue.poll();
});

tasqueue.registerHandler(handler)

Register a job handler. handler should have the following properties:

var handler = {
  type: 'jobType', // {String}  will be used as the queue name
  concurrency: 5,  // {Integer} max number of concurrent workers for this type, default = 1
  maxAttempts: 5,  // {Integer} max number of retry for this job type, default = 1
  exec: function(body) {
    // do whatever using the body passed for this job
  }
};

tasqueue.listHandlers()

List of registered handlers types as an array.

Example
var handler1 = { type: 'type:1', ... };
var handler2 = { type: 'type:2', ... };

tasqueue.registerHandler(handler1);
tasqueue.registerHandler(handler2);
var registeredHandlers = tasqueue.listHandlers();
// registeredHandlers equals ['type:1', 'type:2']

tasqueue.pushJob(jobType, body)

Async: Push a new job that will be processed by the corresponding jobType handler. The worker will call the handler's exec function with body used as its argument.

When successful, returns the added job id.

Example
var handler1 = {
    type: 'type:1',
    exec: function(body) {
        console.log('hello '+body.name);
    }
};

tasqueue.pushJob('type:1', { name: 'Johan' })
.then(function(jobId) {
    // jobId will be a disque id
});

// After some time...
// Logs 'hello Johan'

tasqueue.getJob(id)

Async: Returns a Job object that can be easily manipulated. You can find the API for Jobs a bit below.

The promise is rejected if the queried job doesn't exist.

Example
tasqueue.getJob('someDisqueId')
.then(function(job) {
    console.log(job.details());
});

Count jobs

Async: Returns the count of jobs by state.

tasqueue.count(state)

state must be one of ['active', 'queued', 'completed', 'failed'].

tasqueue.countActive()

tasqueue.countQueued()

tasqueue.countCompleted()

tasqueue.countFailed()

List jobs

Async: Returns the list of jobs for each state and cursors to paginate through the jobs.

Example
var opts = {
    start: 10,  // Start/skip cursor
    limit: 10   // Number of jobs to return
};

tasqueue.listActive(opts)
.then(function(res) {
    // res looks like
    {
        prev: 0,       // Cursor to get the previous 10 jobs or null
        next: null,    // Cursor to get the next 10 jobs or null
        list: [ ... ]  // List of Jobs objects
    }
});

tasqueue.list(state)

state must be one of ['active', 'queued', 'completed', 'failed'].

tasqueue.listActive()

tasqueue.listQueued()

tasqueue.listCompleted()

tasqueue.listFailed()

Jobs API

job.details()

Get the job's informations in a pretty form.

Example
tasqueue.getJob('someId')
.then(function(job) {
    console.log(job.details());
    {
        id:         {String},
        type:       {String},
        body:       {Object},
        state:      {String} - one of ['queued', 'active', 'completed', 'failed']
        created:    {Date},
        ended:      {Date},
        attempt:    {Number} - Attempt at which the job failed/completed,
        duration:   {Number} - in ms,
        result:     {Object} - anything returned by the exec function on success,
        error:      {Error} - details about why the job failed
    }
});

job.cancel()

Async: Cancels the job and set it as failed.

Only queued jobs may be cancelled. The promise is rejected if the job is not in the queued state.

job.delete()

Async: Utterly delete a job, whichever its state is.

Events

Tasqueue inherits the Node.js EventEmitter class. Below is the list of all events emitted by tasqueue during execution:

Client

Client connection
emit('client:connected', {
    // disque-server informations client is connected to
    host: {String},
    port: {Number}
});
Client closed
emit('client:closed');

Queue execution

Polling jobs
emit('client:polling', {
    types:            {Number}, // Number of available job types that can be processed by this poll
    availableWorkers: {Number}, // Total number of available workers for these types
    totalWorkers:     {Number}  // Total number of workers registered
});
Polling delayed
emit('client:delaying', {
    delay: {Number} - tasqueue instance configured/default poll delay
});
No worker available
emit('client:no-workers')
Error while polling
emit('error:polling', error);

Jobs

Job started
emit('job:started', {
    id:   {String}, // The job id
    type: {String}  // The job type
});
Job successfully pushed
emit('job:pushed', {
    id:   {String}, // The job id
    type: {String}  // The job type
});
Job successfully canceled
emit('job:canceled', {
    id:   {String}, // The job id
    type: {String}  // The job type
});
Job successfully deleted
emit('job:deleted', {
    id:   {String}, // The job id
    type: {String}  // The job type
});
Job re-queued after failure
emit('job:requeued', {
    id:      {String}, // The job id
    type:    {String}, // The job type
    attempt: {Number}  // The last failed attempt for this job
});
Job passed
emit('job:success', {
    id:   {String}, // The job id
    type: {String}  // The job type
});
Job failed
emit('error:job-failed', error, {
    id:    {String}, // The job id
    type:  {String}  // The job type
});
Error canceling a job
emit('error:job-cancel', error, {
    id:   {String}, // The job id
    type: {String}  // The job type
});
No handler registered for a job
emit('error:no-handler', error, {
    id:   {String}, // The job id
    type: {String}  // The job type
});

Handlers

Handler successfully registered
emit('handler:registered', {
    handler.type
});
Error: handler already exists
emit('error:existing-handler', error, {
    type: handler.type
});