@joakin/task-queue
v1.0.1
Published
Task/Job in-process queue with concurrency/capacity/abort/cancelling support
Downloads
5
Maintainers
Readme
task-queue
npm install @joakin/task-queue
A customizable job/task queue for javascript processes.
import { Queue } from "@joakin/task-queue";
const queue = Queue();
queue.add(hardToDoComputation).then(result => console.log(result));
- Promise based
- Queued jobs can be cancelled
- Configurable:
queueTimeout
: Time a job spends waiting in the queueexecutionTimeout
: Time a job can spend executingconcurrency
: How many jobs should be run concurrentlymaxTaskCount
: How many tasks can the queue hold
- All activity is reported via events on the queue for easy logging/monitoring
- Typescript definitions
Usage
Creating a queue
Queues are created by default with the following arguments:
const defaultQueue = Queue({
queueTimeout: 500,
executionTimeout: 250,
concurrency: 1,
maxTaskCount: 1
});
Feel free to pass in only the ones you want to modify when creating one.
Enqueueing jobs
For adding a job to the queue, just call queue.add(fn)
, where fn
is a
function that returns a promise.
The queue will save the closure and run it when it is its turn. When calling
add
, you get back a Promise
with a cancel
method that you can call to
remove the job from the queue.
queue.add(() => printToPdf(url)).then(pdf => res.send(pdf));
Removing jobs from the queue
When you add a job, you will get back a promise with a cancel
method that you
can call to remove the job from the queue, whatever its state is (waiting or
running).
// Timeout example for illustration purposes. You can do timeouts better with the queue parameters though.
const job = queue.add(() => printToPdf(url));
const timeout = setTimeout(() => job.cancel(), 10000);
job.then(pdf => {
clearTimeout(timeout);
res.send(pdf);
});
You can also specify a callback for when the job is cancelled, by calling
queue.add(job, onCancel)
. In onCancel you can do cleanup work in case your job
has started running:
let browser;
const job = queue
.add(
() => {
browser = launchBrowser();
printToPdf(browser, url);
},
() => {
if (browser) closeBrowser(browser);
}
)
.then(pdf => {
res.send(pdf);
});
Queue status
You can get the status of the queue by calling queue.stats()
:
console.log(queue.stats());
/*
{
jobs: {
total: 10,
inProgress: 4,
waiting: 6
},
full: true
}
*/
Handling errors
There are different error cases for an enqueued job. The promise returned when enqueueing a job can reject with different types of errors, if you want fine grained control over the reason why it failed.
For logging/metrics purposes, see next section about events.
queue.add(job).catch(err => {
if (err instanceof QueueTimeout) {
// Item timed out in the queue (queueTimeout configuration option)
} else if (err instanceof QueueFull) {
// The queue is full
} else if (err instanceof JobCancelled) {
// A job was cancelled
} else if (err instanceof JobTimeout) {
// A job timed out when running (executionTimeout configuration option)
} else {
// Something else failed when running the job function
}
});
Monitoring and logging queue activity
The queue instance is also an event emitter, and will emit events for the activity on the queue that you can listen to.
Events related to queue are under the queue.*
topic, and events related to
jobs are under the job.*
topic.
Queue events
// New item added to the queue
queue.on("queue.new", ({ id, inProgressCount, waitingCount }) => {});
// Item timed out in the queue (queueTimeout configuration option)
queue.on("queue.timeout", ({ id, addedToTheQueueAt }) => {});
// The queue is full
queue.on("queue.full", ({ waitingCount, inProgressCount }) => {});
Job events
// A job started running
queue.on("job.started", ({ id, addedToTheQueueAt }) => {});
// A job successfully finished
queue.on("job.success", ({ id, addedToTheQueueAt, startedProcessingAt }) => {});
// A job timed out when running (executionTimeout configuration option)
queue.on("job.timeout", ({ id, addedToTheQueueAt, startedProcessingAt }) => {});
// A job was cancelled
queue.on(`job.cancel`, ({ id, addedToTheQueueAt }) => {});
// A job failed when run (promise rejected or running the function threw)
queue.on(
"job.failure",
({ id, addedToTheQueueAt, startedProcessingAt, err }) => {}
);
Types
Here's the type (typescript format) definitions if you want to get a better understanding of what types the library provides and expects:
import { EventEmitter } from "events";
export { Queue, JobCancelled, QueueTimeout, QueueFull, JobTimeout };
declare function Queue({
queueTimeout,
executionTimeout,
concurrency,
maxTaskCount
}?: {
queueTimeout?: number | undefined;
executionTimeout?: number | undefined;
concurrency?: number | undefined;
maxTaskCount?: number | undefined;
}): QueueInstance;
interface QueueInstance extends EventEmitter {
add: <T>(
fn: () => Promise<T>,
onCancel?: () => void
) => CancellablePromise<T>;
stats(): QueueStats;
}
interface CancellablePromise<T> extends Promise<T> {
cancel: () => void;
}
interface QueueStats {
jobs: {
total: number;
inProgress: number;
waiting: number;
};
full: boolean;
}
/**
* Error thrown when job gets cancelled. The promise returned by the
* queue gets rejected with JobCancelled
*/
declare class JobCancelled extends Error {
constructor();
}
/**
* Thrown when task timeouts in the queue
*/
declare class QueueTimeout extends Error {
addedToTheQueueAt: number;
constructor(addedToTheQueueAt: number);
}
/**
* Thrown when there is no space for new task
*/
declare class QueueFull extends Error {
waitingCount: number;
inProgressCount: number;
constructor(waitingCount: number, inProgressCount: number);
}
/**
* Thrown when task processing takes too much time
*/
declare class JobTimeout extends Error {
constructor();
}
Rationale
This library is extracted from wikimedia/mediawiki-services-chromium-render/lib/queue.js, where we needed an in-process queue for production pdf rendering services.
Initially, async.js/queue
was used, but wanting to use promises, configuring
timeouts and cancelling jobs made the Reading Web team at the Wikimedia
Foundation build their own task queue implementation for their production use
case.
This library is an adaptation of that work, with the following initial changes to it:
- Good documentation, types, and public API
- Functional API instead of OO class API
- Renamed events and errors for consistency of the public API
- Migrated the project to strict typescript, for easier maintainability, documentation, and correctness