npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

featureless-job-queue

v2.0.2

Published

Very basic job queue focusing on performance for high throughput

Downloads

484

Readme

Featureless job queue

Very simple job queue focusing on high performance / high throughput.

Basically, just a distributed FIFO.

Why use this library?

There are very decent job queues for more complex use cases (TTL, job retries, priority, progress indicator). If you're interested in this kind of features, have a look at Kue.

However, those features come with a tradeoff -- tons of calls to Redis' HSET, potential stuck jobs spoiling concurrency, and O(log(n)) operations to pop something in a priority queue.

Moreover, those libraries need to maintain a bigger pool of connection to Redis, which can be costly on Redis providers billing by the number of open connections.

They also store atomic data in multiple keys, which lead to an increased memory consumption and number of calls to Redis.

By only focusing on one use case, this library is much faster but also less flexible. See this page for a performance comparison between kue and this library.

When should I use this library?

  • You need to run many small jobs at once
  • Your tasks are not CPU bound and you can run more than your number of CPU cores
  • You need concurrency across many servers (if you don't, just use async.queue)
  • You don't need priority -- just a plain FIFO
  • You can afford to lose a couple of tasks on critical app failure (SIGKILL or power failure). SIGTERM is fine.

How to use this library

Too lazy to read doc? Have a look at the examples folder!

Constructor

var FJQ = require('featureless-job-queue')
var fjq = new FJQ({redisUrl: redisUrl})

The constructor accepts two important options:

  • redisUrl, the URL to use for Redis. If unspecified, this will default to localhost on the default port.
  • redisKey, the redis key to use to store the jobs. Default is fjq:jobs, change it if you need to run multiple job queues on the same Redis instance.

Other options are documented lower in this README, where it makes sense to introduce them.

Process jobs

var FJQ = require('featureless-job-queue')
var fjq = new FJQ({redisUrl: redisUrl})

// Number of workers to run at the same time, will default to 1
var concurrency = 40
var workerFunction = function(job, cb) {
    // your function, call cb(err) once done.    
}

fjq.process(workerFunction, concurrency)

Concurrency will always be respected, but note that some jobs might be unqueued before they're sent to a worker to ensure optimal throughput. This behavior can be tweaked by specifying the key overfillRatio in the constructor options. This value defaults to 1.1 (e.g., for concurrency of 40, 40 workers will run in parallel, and 4 tasks will be pre-buffered to be sent to workers).

This function will return an async.queue. You can listen to events on it, and FJQ will append jobs to the queue automatically. Updating the queue concurrency in real time is not supported.

Queue jobs

var FJQ = require('featureless-job-queue')
var fjq = new FJQ({redisUrl: redisUrl})

// job can be any valid JS structure.
// Try to keep it as small as possible since it will transit across the network and be fully stored in Redis
// It will be serialized using JSON.serialize, so you can't use any fancy items in your job (e.g. functions)
var job = {
    type: "something",
    foo: "bar"
}

fjq.create(job, function(err, count) {
    if(err) return console.warn(err)
    // Job successfully created
    // `count` is the number of items currently in the queue (after saving this job)
})

// OR
var jobs = [{job1: true}, {job2: true}, ...]
fjq.create(job, function(err, count) {
    // Please note: if you get an error, some jobs may have been saved and others not :(
    if(err) return console.warn(err)
    // Jobs successfully created
})

If you need to save multiple jobs at once, use the array version of the function to minimize the number of calls sent to Redis. There is no limit to the number of jobs that can be saved at once.

Stop processing

fjq.shutdown(function() {
    console.log("All workers drained, Redis connections closed!")
})

The callback will only be called once all of this is true:

  • all workers have finished their current task (including tasks pre-buffered with overfillRatio). Note that this means some workers might start new jobs even after you've called .shutdown!
  • all opened Redis connections have been closed

You'll probably want to hook this function to process.on('SIGTERM') ;)

Other

  • use fjq.length(function(err, count)) to get the number of jobs currently queued in Redis (doesn't include tasks currently worked on and pre-buffered tasks)
  • use fjq.clearAll(function(err)) to clear all jobs currently in Redis. Workers currenty running won't be affected; ideally, only use this function in your test suite and never in prod!
  • if you need access to a Redis connection, use fjq.getCommandConnection() which returns a redis client.

Implementation notes

  • FIFO queue
  • The library uses BLPOP to process jobs, so you'll have one Redis connection per call to .process
  • If you never use .create from a worker, you won't have a second connection. From the moment you use .create, a new connection is established with Redis (since BLPOP is blocking and can't be used again) and maintained for performance.
  • See https://github.com/Neamar/featureless-job-queue/issues/1 for a detailed breakdown of the performance improvement you can expect when switching from Kue to featureless-job-queue.

Does it work?

This library is currently used in various productions environment. The biggest one I know of handles more than 45 millions tasks a day with less than 8 open Redis connections.