npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@joakin/task-queue

v1.0.1

Published

Task/Job in-process queue with concurrency/capacity/abort/cancelling support

Downloads

5

Readme

task-queue

npm install @joakin/task-queue

A customizable job/task queue for javascript processes.

import { Queue } from "@joakin/task-queue";

const queue = Queue();

queue.add(hardToDoComputation).then(result => console.log(result));
  • Promise based
  • Queued jobs can be cancelled
  • Configurable:
    • queueTimeout: Time a job spends waiting in the queue
    • executionTimeout: Time a job can spend executing
    • concurrency: How many jobs should be run concurrently
    • maxTaskCount: How many tasks can the queue hold
  • All activity is reported via events on the queue for easy logging/monitoring
  • Typescript definitions

Usage

Creating a queue

Queues are created by default with the following arguments:

const defaultQueue = Queue({
  queueTimeout: 500,
  executionTimeout: 250,
  concurrency: 1,
  maxTaskCount: 1
});

Feel free to pass in only the ones you want to modify when creating one.

Enqueueing jobs

For adding a job to the queue, just call queue.add(fn), where fn is a function that returns a promise.

The queue will save the closure and run it when it is its turn. When calling add, you get back a Promise with a cancel method that you can call to remove the job from the queue.

queue.add(() => printToPdf(url)).then(pdf => res.send(pdf));

Removing jobs from the queue

When you add a job, you will get back a promise with a cancel method that you can call to remove the job from the queue, whatever its state is (waiting or running).

// Timeout example for illustration purposes. You can do timeouts better with the queue parameters though.

const job = queue.add(() => printToPdf(url));
const timeout = setTimeout(() => job.cancel(), 10000);
job.then(pdf => {
  clearTimeout(timeout);
  res.send(pdf);
});

You can also specify a callback for when the job is cancelled, by calling queue.add(job, onCancel). In onCancel you can do cleanup work in case your job has started running:

let browser;
const job = queue
  .add(
    () => {
      browser = launchBrowser();
      printToPdf(browser, url);
    },
    () => {
      if (browser) closeBrowser(browser);
    }
  )
  .then(pdf => {
    res.send(pdf);
  });

Queue status

You can get the status of the queue by calling queue.stats():

console.log(queue.stats());
/*
  {
    jobs: {
      total: 10,
      inProgress: 4,
      waiting: 6
    },
    full: true
  }
*/

Handling errors

There are different error cases for an enqueued job. The promise returned when enqueueing a job can reject with different types of errors, if you want fine grained control over the reason why it failed.

For logging/metrics purposes, see next section about events.

queue.add(job).catch(err => {
  if (err instanceof QueueTimeout) {
    // Item timed out in the queue (queueTimeout configuration option)
  } else if (err instanceof QueueFull) {
    // The queue is full
  } else if (err instanceof JobCancelled) {
    // A job was cancelled
  } else if (err instanceof JobTimeout) {
    // A job timed out when running (executionTimeout configuration option)
  } else {
    // Something else failed when running the job function
  }
});

Monitoring and logging queue activity

The queue instance is also an event emitter, and will emit events for the activity on the queue that you can listen to.

Events related to queue are under the queue.* topic, and events related to jobs are under the job.* topic.

Queue events

// New item added to the queue
queue.on("queue.new", ({ id, inProgressCount, waitingCount }) => {});

// Item timed out in the queue (queueTimeout configuration option)
queue.on("queue.timeout", ({ id, addedToTheQueueAt }) => {});

// The queue is full
queue.on("queue.full", ({ waitingCount, inProgressCount }) => {});

Job events

// A job started running
queue.on("job.started", ({ id, addedToTheQueueAt }) => {});

// A job successfully finished
queue.on("job.success", ({ id, addedToTheQueueAt, startedProcessingAt }) => {});

// A job timed out when running (executionTimeout configuration option)
queue.on("job.timeout", ({ id, addedToTheQueueAt, startedProcessingAt }) => {});

// A job was cancelled
queue.on(`job.cancel`, ({ id, addedToTheQueueAt }) => {});

// A job failed when run (promise rejected or running the function threw)
queue.on(
  "job.failure",
  ({ id, addedToTheQueueAt, startedProcessingAt, err }) => {}
);

Types

Here's the type (typescript format) definitions if you want to get a better understanding of what types the library provides and expects:

import { EventEmitter } from "events";

export { Queue, JobCancelled, QueueTimeout, QueueFull, JobTimeout };

declare function Queue({
  queueTimeout,
  executionTimeout,
  concurrency,
  maxTaskCount
}?: {
  queueTimeout?: number | undefined;
  executionTimeout?: number | undefined;
  concurrency?: number | undefined;
  maxTaskCount?: number | undefined;
}): QueueInstance;

interface QueueInstance extends EventEmitter {
  add: <T>(
    fn: () => Promise<T>,
    onCancel?: () => void
  ) => CancellablePromise<T>;
  stats(): QueueStats;
}

interface CancellablePromise<T> extends Promise<T> {
  cancel: () => void;
}

interface QueueStats {
  jobs: {
    total: number;
    inProgress: number;
    waiting: number;
  };
  full: boolean;
}

/**
 * Error thrown when job gets cancelled. The promise returned by the
 * queue gets rejected with JobCancelled
 */
declare class JobCancelled extends Error {
  constructor();
}
/**
 * Thrown when task timeouts in the queue
 */
declare class QueueTimeout extends Error {
  addedToTheQueueAt: number;
  constructor(addedToTheQueueAt: number);
}
/**
 * Thrown when there is no space for new task
 */
declare class QueueFull extends Error {
  waitingCount: number;
  inProgressCount: number;
  constructor(waitingCount: number, inProgressCount: number);
}
/**
 * Thrown when task processing takes too much time
 */
declare class JobTimeout extends Error {
  constructor();
}

Rationale

This library is extracted from wikimedia/mediawiki-services-chromium-render/lib/queue.js, where we needed an in-process queue for production pdf rendering services.

Initially, async.js/queue was used, but wanting to use promises, configuring timeouts and cancelling jobs made the Reading Web team at the Wikimedia Foundation build their own task queue implementation for their production use case.

This library is an adaptation of that work, with the following initial changes to it:

  • Good documentation, types, and public API
  • Functional API instead of OO class API
  • Renamed events and errors for consistency of the public API
  • Migrated the project to strict typescript, for easier maintainability, documentation, and correctness