mongoose-jobqueue

v1.1.3

Simple Job-Queue using mongoosejs

A very simple job queue, using mongoosejs for storage.

Quickstart

Create a queue object by handing over your mongoose instance:

var mongoose          = require('mongoose');
var mongooseJobQueue  = require('mongoose-jobqueue');

var queue = mongooseJobQueue(mongoose, 'job-queue');

All functions of the queue return a Promise. See bluebirdjs.com for more information on promises.

Add a job to a queue:

queue.add({ message: 'Hey' }).then(function(job){
  // Job with payload object added.
  // The created job-object is returned.
}, function(err){
  // Something went wrong!
});

Get a job from the queue:

queue.checkout().then(function(job) {
  // Resolves with the next job in the queue, or null if the queue is empty.
  if (!job) { return; }
  console.log('job._id=' + job._id);
  console.log('job.ack=' + job.ack);          // Acknowledge key, save for later
  console.log('job.payload=' + JSON.stringify(job.payload));  // {"message":"Hey"}
  console.log('job.tries=' + job.tries);
});

Ping a job to keep its visibility window open for long-running tasks:

queue.ping(job.ack).then(function(job) {
  // Visibility window now increased for this job.
  // Updated job object is returned.
})

When pinging a job we can specify by how many seconds the window is extended, and the progress of the job in percent if we want:

queue.ping(job.ack, 60, 15).then(function(job) {
  // Visibility window now increased by 60 seconds. Job is 15% done.
  // Updated job object is returned.
})

Acknowledge a job (and remove it from the queue):

queue.ack(job.ack).then(function(job) {
  // This job removed from queue for this ack.
  // The acknowledged job is returned.
})

By default, all finished jobs are left in the queue, and are only marked as deleted. You can call the following function to remove processed jobs:

queue.cleanup().then(function(delCount) {
    // All processed (i.e. acked) jobs have been deleted.
    // The number of deleted jobs is returned.
});

In-Code-Docs

The source code is documented using JSDoc tags. You can build the documentation as HTML files to /docs/build by using:

npm install
npm run jsdoc

Creating a Queue

To create a queue, call the exported function with the Mongoose instance, the name of the collection and a set of options.

var mongoose          = require('mongoose');
var mongooseJobQueue  = require('mongoose-jobqueue');

// an instance of a queue
var queueA = mongooseJobQueue(mongoose, 'a-queue');
// another queue
var queueB = mongooseJobQueue(mongoose, 'b-queue');

Note: You can start using the queue right away, since mongoose buffers requests until a connection to MongoDB is established.

To pass in options for the queue:

var myQueue = mongooseJobQueue(mongoose, 'my-queue', {
    visibility : 30,
    delay : 15
  });

This example shows a queue with a job visibility of 30s and an insert delay of 15s.

Options

name - Collection Name

This is the name of the MongoDB collection you wish to use to store the jobs. Each queue you create will be its own collection.

e.g.

var queueA = mongooseJobQueue(mongoose, 'a-queue');
var queueB = mongooseJobQueue(mongoose, 'b-queue');

This will create two collections in MongoDB called a-queue and b-queue.

visibility - Job Visibility Window

Default: 30

By default, if you don't ack a job within the first 30s after checking it out of the queue, it is placed back in the queue so it can be fetched again. This is called the visibility window.

You may set this visibility window on a per queue basis. For example, to set the visibility to 15 seconds:

var queue = mongooseJobQueue(mongoose, 'queue', { visibility : 15 });

All jobs in this queue now have a visibility window of 15s, instead of the default 30s.

You can also specify the visibility window when checking a job out of the queue:

var queue = mongooseJobQueue(mongoose, 'queue', { visibility : 15 });

queue.checkout(90).then(function(job){
  // Process the job...
});

The returned job now has a visibility window of 90s. This does not change the visibility window of the queue or other jobs in the queue.
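To make the visibility window concrete, here is a minimal in-memory sketch of the behaviour described above. It is not how mongoose-jobqueue is implemented. The names createModelQueue and visibleAt, the generated ack keys and the explicit now argument (in milliseconds) are all hypothetical and only serve the illustration:

```javascript
// Hypothetical in-memory model of the visibility window (not the library's code).
// Times are passed in explicitly as milliseconds so the behaviour is easy to follow.
function createModelQueue(defaultVisibility) {
  var jobs = [];
  var ackCounter = 0;

  return {
    // Add a job that is available for checkout immediately.
    add: function (payload, now) {
      jobs.push({ payload: payload, visibleAt: now, ack: null, tries: 0, done: false });
    },

    // Hand out the first available job and hide it for `visibility` seconds.
    checkout: function (now, visibility) {
      var seconds = visibility === undefined ? defaultVisibility : visibility;
      for (var i = 0; i < jobs.length; i++) {
        var job = jobs[i];
        if (!job.done && job.visibleAt <= now) {
          job.visibleAt = now + seconds * 1000;
          job.ack = 'ack-' + (++ackCounter);
          job.tries += 1;
          return { ack: job.ack, payload: job.payload, tries: job.tries };
        }
      }
      return null; // Queue is empty or every job is still hidden.
    },

    // Finish a job; only succeeds with the current ack key inside the window.
    ack: function (ackKey, now) {
      for (var i = 0; i < jobs.length; i++) {
        var job = jobs[i];
        if (!job.done && job.ack === ackKey && now < job.visibleAt) {
          job.done = true;
          return true;
        }
      }
      return false;
    }
  };
}

var model = createModelQueue(15);
model.add('Hey', 0);
var first = model.checkout(0, 90);   // hidden until t = 90s
var early = model.checkout(60000);   // null: still inside the 90s window
var again = model.checkout(90000);   // window expired, job is handed out again
```

In this model the second checkout falls back to the queue default of 15s, which mirrors the statement that a per-checkout value does not change the window of the queue itself.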

delay - Delay Jobs on Queue

Default: 0

When a job is added to a queue, it is immediately available for checkout. However, there are times when you might like to delay jobs coming off a queue. If you set delay to be 10, then every job will only be available for checkout 10s after being added.

To delay all jobs by 10 seconds, do this:

var queue = mongooseJobQueue(mongoose, 'queue', { delay : 10 });

This is now the default for every job added to the queue.
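As a rough sketch of the arithmetic, the earliest checkout time is the time the job was added plus the delay in seconds. The helper availableAt is hypothetical, and the sketch assumes that a delay passed to .add() takes precedence over the queue's delay option:

```javascript
// Hypothetical helper: when does a job become available for checkout?
// Assumes a delay passed to .add() takes precedence over the queue's `delay` option.
function availableAt(addedAtMs, queueDelaySeconds, jobDelaySeconds) {
  var delay = jobDelaySeconds === undefined ? queueDelaySeconds : jobDelaySeconds;
  return addedAtMs + delay * 1000;
}

var addedAt = Date.UTC(2017, 7, 5, 12, 0, 0);      // 2017-08-05 12:00:00 UTC
var withQueueDefault = availableAt(addedAt, 10);   // 10s after being added
var withOwnDelay = availableAt(addedAt, 10, 120);  // 120s after being added
```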

deadQueue - Dead Job Queue

Default: null

Jobs that have been retried more than maxRetries times will be pushed to this queue so you can debug problematic jobs.

Pass in a collection name onto which these jobs will be pushed:

var queue = mongooseJobQueue(mongoose, 'queue', { deadQueue : 'dead-jobs' });

If you check a job out of the queue more than maxRetries times and still have not acked it, it will be pushed onto the deadQueue for you. This happens the next time you call .checkout(), not when you miss acking a job within its visibility window.

maxRetries - Maximum Retries per Job

Default: 5

This option only comes into effect if you pass in a deadQueue as shown above. What this means is that if a job is checked out of the queue maxRetries times (e.g. 5) and not acked, it will be moved to the deadQueue the next time it is checked out.
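The rule can be sketched as a small decision made at checkout time. This is a reading of the paragraph above, not the library's code: routeOnCheckout is a hypothetical function, and tries is taken to be the number of earlier checkouts that were never acked.

```javascript
// Hypothetical decision made at checkout time (not the library's code).
// `tries` is the number of earlier checkouts that were never acked.
function routeOnCheckout(tries, options) {
  var maxRetries = options.maxRetries === undefined ? 5 : options.maxRetries;
  if (options.deadQueue && tries >= maxRetries) {
    return 'dead-queue'; // Too many failed attempts: park the job for debugging.
  }
  return 'deliver';      // Hand the job to the caller as usual.
}

var withDeadQueue = { deadQueue: 'dead-jobs' };
var routes = [4, 5].map(function (tries) {
  return routeOnCheckout(tries, withDeadQueue);
});
var withoutDeadQueue = routeOnCheckout(50, {}); // maxRetries has no effect here
```

With the default of 5, a job that was already checked out four times is still delivered, while one that was checked out five times is routed to the dead queue. Without a deadQueue the retry count is ignored.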

strictAck - Disallow ack if Visibility Window timed out

Default: true

By default you are only allowed to acknowledge a checked out job within the visibility window. Set this option to false if you want to allow acks of a job outside of the visibility window, given that the job was not already checked out a second time by another user.

raw - Return raw JavaScript Objects

Default: true

By default all functions return plain JavaScript objects. Set this option to false if you want the original mongoose documents returned. For example, this gives you the ability to edit a returned job and call the .save() method on it. Use this option at your own risk.

cosmosDb - Use compatibility mode for Azure CosmosDB

Default: false

CosmosDB does not support sorting on a findAndUpdate operation (as of 2017-08-22). Set this option to true to get unsorted results.

Operations

.add()

You can add a string to the queue:

queue.add('Hey').then(function(job) {
  // Job with payload 'Hey' added.
  // Created job object is returned.
});

Or add an object:

queue.add({ message: 'Hey' }).then(function(job) {
  // Job with payload { message: 'Hey' } added.
  // Created job object is returned.
});

Or add multiple jobs (strings or objects):

queue.add(['One', 'Two', 'Three']).then(function(jobs) {
  // Jobs with payloads 'One', 'Two' & 'Three' added.
  // An array of the created job objects is returned.
});

You can delay jobs from being visible by passing the second delay parameter:

queue.add('Later', 120).then(function(job) {
  // Job with payload 'Later' added.
  // Created job object is returned.
  // This job won't be available for checkout for 120 seconds.
});

.checkout()

Retrieve a job from the queue:

queue.checkout().then(function(job) {
  // You can now process the job
  // IMPORTANT: The job will be null if the queue is empty.
});

You can choose the visibility window of an individual retrieved job by passing the visibility parameter:

queue.checkout(90).then(function(job) {
  // You can now process the job for 90s before it goes back into the queue.
});

Jobs will have the following structure:

{
  _id: '533b1eb64ee78a57664cc76c', // ID of the message
  ack: 'c8a3cc585cbaaacf549d746d7db72f69', // Key for ack and ping operations
  payload: 'Hey', // Payload passed when the job was added
  tries: 1 // Number of times this job has been retrieved from queue without being ack'd
}

.ack()

After you have received a job from a queue and processed it, you can delete it by calling .ack() with the unique ack key returned:

queue.checkout().then(function(job) {
  // Do some processing...

  queue.ack(job.ack).then(function(job) {
      // this job has now been removed from the queue
  });
});
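Putting .checkout() and .ack() together, a single worker step might look like the sketch below. It relies only on the two calls documented above. processNext and handler are hypothetical names and not part of the library:

```javascript
// Sketch of a single worker step built only on .checkout() and .ack().
// `processNext` and `handler` are hypothetical names, not part of the library.
function processNext(queue, handler) {
  return queue.checkout().then(function (job) {
    if (!job) {
      return false; // Queue is empty, nothing to do.
    }
    // Run the handler, then acknowledge the job only if it succeeded.
    return Promise.resolve(handler(job.payload)).then(function () {
      return queue.ack(job.ack);
    }).then(function () {
      return true;
    });
  });
}
```

If the handler throws or rejects, the returned promise rejects and the job is never acked, so it should become available again once its visibility window has passed.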

.ping()

After you have checked out a job from a queue and you are taking a while to process it, you can .ping() the job to tell the queue that you are still alive and continuing to process the job:

queue.checkout().then(function(job) {
  queue.ping(job.ack).then(function(job) {
    // this job has had its visibility window extended
  });
});

You can also choose the visibility time that gets added by the ping operation by passing the visibility parameter:

queue.checkout().then(function(job) {
  queue.ping(job.ack, 120).then(function(job) {
    // this job has had its visibility window extended by 120 seconds
  });
});

You can pass an optional progress parameter when pinging a job to show the progress of your operation in percent:

queue.checkout().then(function(job) {
  queue.ping(job.ack, 120, 12).then(function(job) {
    // this job has had its visibility window extended by 120 seconds
    // The operation is 12% complete
  });
});

As with the progress parameter, you can pass a payload parameter to update the payload of a job. This will overwrite the old payload value.

queue.checkout().then(function(job) {
  queue.ping(job.ack, null, null, { message: 'my new payload' }).then(function(job) {
    // Updated job object is returned.
  });
});
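For long-running work, the ping calls can be driven by a timer so the job stays checked out until the task finishes. The following is one possible sketch using only .ping() and .ack() as documented above. withKeepAlive and its parameters are hypothetical, and logging a failed ping instead of aborting the task is a design choice of this example:

```javascript
// Sketch: keep a checked-out job alive while a long task runs.
// `withKeepAlive` is a hypothetical helper, not part of the library.
function withKeepAlive(queue, job, everyMs, extendBySeconds, task) {
  // Ping on a fixed interval to extend the visibility window.
  var timer = setInterval(function () {
    queue.ping(job.ack, extendBySeconds).catch(function (err) {
      console.error('ping failed: ' + err.message);
    });
  }, everyMs);

  return Promise.resolve()
    .then(function () { return task(job.payload); })
    .then(function (result) {
      clearInterval(timer);
      // Task succeeded: acknowledge the job and pass the result on.
      return queue.ack(job.ack).then(function () { return result; });
    }, function (err) {
      clearInterval(timer);
      throw err; // Leave the job un-acked so it can be checked out again.
    });
}
```

The ping interval should be comfortably shorter than the visibility window, for example pinging every 20 seconds when each ping extends the window by 60 seconds.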

.get()

Get a list of jobs in the queue. Does not check out any jobs.

queue.get().then(function(jobs){
  // Returns an array of all jobs in queue, processed or not
});

You can pass a filter object to only retrieve a subset of jobs. The filter object follows the MongoDB query syntax.

queue.get({
  deleted: null
}).then(function(jobs){
  // Returns an array of all jobs that are not processed
});

This example returns only unfinished jobs.

queue.get({
  tries: { $gt: 1, $lt: 4 }
}).then(function(jobs){

});

This example returns jobs with 2-3 tries.

queue.get({
  'payload.message': 'Hey'
}).then(function(jobs){

});

This example returns jobs where the payload contains the message Hey.
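The array returned by .get() can also be summarised on the client side. The sketch below is a hypothetical helper, not part of the library. It assumes that unfinished jobs carry no deleted value (as the { deleted: null } filter above implies) and that finished jobs carry some non-null value, shown here as a timestamp:

```javascript
// Hypothetical helper that summarises the array returned by queue.get().
// Assumes unfinished jobs have no `deleted` value, as the { deleted: null } filter implies.
function summarize(jobs) {
  var summary = { pending: 0, processed: 0, retried: 0 };
  jobs.forEach(function (job) {
    if (job.deleted == null) {
      summary.pending += 1;
    } else {
      summary.processed += 1;
    }
    if (job.tries > 1) {
      summary.retried += 1; // Checked out more than once.
    }
  });
  return summary;
}

// Sample data in the assumed shape; the `deleted` timestamp is illustrative.
var sample = [
  { payload: 'One', tries: 0, deleted: null },
  { payload: 'Two', tries: 3, deleted: null },
  { payload: 'Three', tries: 1, deleted: new Date(0) }
];
var counts = summarize(sample);
```

With a real queue this could be chained as queue.get().then(summarize).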

.cleanup()

By default, all finished jobs are left in the queue, and are only marked as deleted. You can call the cleanup() function to remove processed jobs:

queue.cleanup().then(function(delCount) {
  // All processed (ie. acked) jobs have been deleted.
  // The number of deleted jobs is returned.
});

You can specify the minimum age of jobs to be deleted by using the age parameter:

queue.cleanup(120).then(function(delCount) {
  // All processed (ie. acked) jobs older than 120 seconds have been deleted.
  // The number of deleted jobs is returned.
});

.cleanupDead()

Removes all jobs from the deadQueue. Unlike the cleanup() function, this operation does not have an age parameter.

queue.cleanupDead().then(function(delCount) {
  // All dead jobs have been deleted.
  // The number of deleted jobs is returned.
});

.reset()

Deletes ALL jobs from the queue (and the deadQueue if configured), regardless of checked out jobs.

queue.reset().then(function(totalDeleted) {
  // Queues are now empty, number of deleted jobs on both queues is returned.
});

Releases

1.1.3 (2017-12-06)

  • [FIX] Minor fix for use with "multi-connection" mongoose instances

1.1.2 (2017-08-22)

  • [NEW] Added option for Azure CosmosDB (Azure CosmosDB does not support sorting on findAndUpdate)

1.1.1 (2017-08-21)

  • [FIX] Fixed null error on checkout.

1.1.0 (2017-08-15)

  • [NEW] Added option to update payload on ping.
  • [NEW] Added option to return the original mongoose documents instead of plain JavaScript objects.

1.0.1 (2017-08-06)

  • [NEW] Documentation & Readme Update
  • [FIX] Ack function now correctly rejects if the visibility window timed out.
  • [FIX] Add function now returns job object if one job was added, or array of objects if multiple jobs were added.

1.0.0 (2017-08-05)

  • Initial Release

Author

Written by Michael Sperk

License

MIT - https://choosealicense.com/licenses/mit/