@xuhaojun/p-queue

v7.2.0-noah.5

Promise queue with concurrency control

p-queue

Promise queue with concurrency control

Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.

Install

npm install p-queue

Usage

Here we run only one promise at a time. To run four promises at the same time, for example, set concurrency to 4.

import PQueue from 'p-queue';
import got from 'got';

const queue = new PQueue({concurrency: 1});

(async () => {
	await queue.add(() => got('https://sindresorhus.com'));
	console.log('Done: sindresorhus.com');
})();

(async () => {
	await queue.add(() => got('https://avajs.dev'));
	console.log('Done: avajs.dev');
})();

(async () => {
	const task = await getUnicornTask();
	await queue.add(task);
	console.log('Done: Unicorn task');
})();

API

PQueue(options?)

Returns a new queue instance, which is an EventEmitter3 subclass.

options

Type: object

concurrency

Type: number
Default: Infinity
Minimum: 1

Concurrency limit.

timeout

Type: number

Per-operation timeout in milliseconds. Operations fulfill once timeout elapses if they haven't already.

throwOnTimeout

Type: boolean
Default: false

Whether or not a timeout is considered an exception.

autoStart

Type: boolean
Default: true

Whether queued tasks within the concurrency limit are auto-executed as soon as they're added.

queueClass

Type: Function

Class with an enqueue and a dequeue method, and a size getter. See the Custom QueueClass section.

intervalCap

Type: number
Default: Infinity
Minimum: 1

The max number of runs in the given interval of time.

interval

Type: number
Default: 0
Minimum: 0

The length of time in milliseconds before the interval count resets. Must be finite.

carryoverConcurrencyCount

Type: boolean
Default: false

If true, any pending Promises are carried over into the next interval and counted against the intervalCap. If false, those pending Promises do not count towards the next intervalCap.

queue

PQueue instance.

.add(fn, options?)

Adds a sync or async task to the queue. Always returns a promise.

Note: If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately.
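One way to follow this note is to wrap the call, as in the minimal sketch below. The helper name addSafely and its onError callback are illustrative and not part of p-queue; queue is assumed to be a PQueue instance (any object with a compatible add() method works).

```javascript
// Hypothetical helper, not part of p-queue.
// `queue` is assumed to be a PQueue instance.
async function addSafely(queue, task, onError = console.error) {
	try {
		// Awaiting the promise returned by add() lets us catch task errors here.
		return await queue.add(task);
	} catch (error) {
		// Report the failure instead of leaving an unhandled rejection.
		onError(error);
		return undefined;
	}
}
```

A caller could then write await addSafely(queue, () => got('https://sindresorhus.com')) and handle a possible undefined result.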

fn

Type: Function

Promise-returning/async function. When executed, it will receive {signal} as the first argument.

options

Type: object

priority

Type: number
Default: 0

Priority of operation. Operations with greater priority will be scheduled first.

signal

Requires Node.js 16 or later.

AbortSignal for cancellation of the operation. When the signal is aborted, the operation is removed from the queue and the queue.add() call rejects with an AbortError. If the operation is already running, the operation itself needs to handle the signal.

import PQueue, {AbortError} from 'p-queue';
import got, {CancelError} from 'got';

const queue = new PQueue();

const controller = new AbortController();

try {
	await queue.add(async ({signal}) => {
		const request = got('https://sindresorhus.com');

		signal.addEventListener('abort', () => {
			request.cancel();
		});

		try {
			return await request;
		} catch (error) {
			if (!(error instanceof CancelError)) {
				throw error;
			}
		}
	}, {signal: controller.signal});
} catch (error) {
	if (!(error instanceof AbortError)) {
		throw error;
	}
}
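A related pattern is to give up on a task that has not finished within a deadline. The sketch below is a hypothetical helper (addWithDeadline is not part of p-queue) that aborts the signal after a delay. It assumes queue is a PQueue instance and that a global AbortController is available, as in Node.js 16 or later.

```javascript
// Hypothetical helper, not part of p-queue.
// `queue` is assumed to be a PQueue instance that accepts {signal} in add() options.
async function addWithDeadline(queue, task, milliseconds) {
	const controller = new AbortController();

	// Abort the signal once the deadline passes.
	const timer = setTimeout(() => controller.abort(), milliseconds);

	try {
		return await queue.add(task, {signal: controller.signal});
	} finally {
		// Always clear the timer so it does not keep the process alive.
		clearTimeout(timer);
	}
}
```

As described above, a task that is already running when the signal fires still has to react to the signal it receives as its first argument.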

.addAll(fns, options?)

Same as .add(), but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved.

.pause()

Put queue execution on hold.

.start()

Start (or resume) executing enqueued tasks within the concurrency limit. No need to call this if the queue is not paused (via options.autoStart = false or the .pause() method).

Returns this (the instance).

.onEmpty()

Returns a promise that settles when the queue becomes empty.

Can be called multiple times. Useful if, for example, you add additional items at a later time.

.onIdle()

Returns a promise that settles when the queue becomes empty, and all promises have completed; queue.size === 0 && queue.pending === 0.

The difference with .onEmpty is that .onIdle guarantees that all work from the queue has finished. .onEmpty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

.onSizeLessThan(limit)

Returns a promise that settles when the queue size is less than the given limit: queue.size < limit.

If you want to avoid having the queue grow beyond a certain size you can await queue.onSizeLessThan() before adding a new item.

Note that this only limits the number of items waiting to start. There could still be up to concurrency jobs already running that this call does not include in its calculation.
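The pattern described above can be sketched as a small loop. The helper name addAllWithBackpressure is an assumption for illustration and not part of p-queue; queue is assumed to be a PQueue instance.

```javascript
// Hypothetical helper, not part of p-queue.
// `queue` is assumed to be a PQueue instance.
async function addAllWithBackpressure(queue, tasks, limit) {
	const results = [];

	for (const task of tasks) {
		// Wait until fewer than `limit` items are waiting to start.
		await queue.onSizeLessThan(limit);

		const promise = queue.add(task);
		// Mark the promise as handled so an early failure is not reported as
		// an unhandled rejection; Promise.all below still rejects with it.
		promise.catch(() => {});
		results.push(promise);
	}

	return Promise.all(results);
}
```

This only bounds the waiting items. Running tasks are still governed by the concurrency option.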

.clear()

Clear the queue.

.size

Size of the queue, the number of queued items waiting to run.

.sizeBy(options)

Size of the queue, filtered by the given options.

For example, this can be used to find the number of items remaining in the queue with a specific priority level.

import PQueue from 'p-queue';

const queue = new PQueue();

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 0});
queue.add(async () => '🦄', {priority: 1});

console.log(queue.sizeBy({priority: 1}));
//=> 2

console.log(queue.sizeBy({priority: 0}));
//=> 1

.pending

Number of running items (no longer in the queue).

.timeout

.concurrency

.isPaused

Whether the queue is currently paused.

Events

active

Emitted as each item is processed in the queue for the purpose of tracking progress.

import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

let count = 0;
queue.on('active', () => {
	console.log(`Working on item #${++count}.  Size: ${queue.size}  Pending: ${queue.pending}`);
});

queue.add(() => Promise.resolve());
queue.add(() => delay(2000));
queue.add(() => Promise.resolve());
queue.add(() => Promise.resolve());
queue.add(() => delay(500));

completed

Emitted when an item completes without error.

import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.on('completed', result => {
	console.log(result);
});

queue.add(() => Promise.resolve('hello, world!'));

error

Emitted if an item throws an error.

import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.on('error', error => {
	console.error(error);
});

queue.add(() => Promise.reject(new Error('error')));

idle

Emitted every time the queue becomes empty and all promises have completed; queue.size === 0 && queue.pending === 0.

import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue();

queue.on('idle', () => {
	console.log(`Queue is idle.  Size: ${queue.size}  Pending: ${queue.pending}`);
});

const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));

await job1;
await job2;
//=> 'Queue is idle.  Size: 0  Pending: 0'

await queue.add(() => delay(600));
//=> 'Queue is idle.  Size: 0  Pending: 0'

The idle event is emitted every time the queue reaches an idle state. By contrast, the promise returned by onIdle() resolves only once, the first time the queue is idle after the call, not on every later idle state.

add

Emitted every time the add method is called and the number of pending or queued tasks is increased.

next

Emitted every time a task is completed and the number of pending or queued tasks is decreased. This is emitted regardless of whether the task completed normally or with an error.

import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue();

queue.on('add', () => {
	console.log(`Task is added.  Size: ${queue.size}  Pending: ${queue.pending}`);
});

queue.on('next', () => {
	console.log(`Task is completed.  Size: ${queue.size}  Pending: ${queue.pending}`);
});

const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));

await job1;
await job2;
//=> 'Task is added.  Size: 0  Pending: 1'
//=> 'Task is added.  Size: 0  Pending: 2'

await queue.add(() => delay(600));
//=> 'Task is completed.  Size: 0  Pending: 1'
//=> 'Task is completed.  Size: 0  Pending: 0'
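Since add fires once per added task and next fires once per finished task, the two events can be paired to track progress. The following is a minimal sketch; trackProgress is a hypothetical helper name, and queue is assumed to be a PQueue instance (any emitter with an on() method works).

```javascript
// Hypothetical helper, not part of p-queue.
// `queue` is assumed to be a PQueue instance.
function trackProgress(queue) {
	const progress = {added: 0, finished: 0};

	// 'add' is emitted every time a task is added.
	queue.on('add', () => {
		progress.added++;
	});

	// 'next' is emitted when a task finishes, with or without an error.
	queue.on('next', () => {
		progress.finished++;
	});

	return progress;
}
```

At any point, progress.added - progress.finished is the number of tasks that are still queued or running.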

AbortError

The error thrown by queue.add() when a job is aborted before it is run. See signal.

Advanced example

A more advanced example to help you understand the flow.

import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

(async () => {
	await delay(200);

	console.log(`8. Pending promises: ${queue.pending}`);
	//=> '8. Pending promises: 0'

	(async () => {
		await queue.add(async () => '🐙');
		console.log('11. Resolved 🐙');
	})();

	console.log('9. Added 🐙');

	console.log(`10. Pending promises: ${queue.pending}`);
	//=> '10. Pending promises: 1'

	await queue.onIdle();
	console.log('12. All work is done');
})();

(async () => {
	await queue.add(async () => '🦄');
	console.log('5. Resolved 🦄');
})();
console.log('1. Added 🦄');

(async () => {
	await queue.add(async () => '🐴');
	console.log('6. Resolved 🐴');
})();
console.log('2. Added 🐴');

(async () => {
	await queue.onEmpty();
	console.log('7. Queue is empty');
})();

console.log(`3. Queue size: ${queue.size}`);
//=> '3. Queue size: 1'

console.log(`4. Pending promises: ${queue.pending}`);
//=> '4. Pending promises: 1'

$ node example.js
1. Added 🦄
2. Added 🐴
3. Queue size: 1
4. Pending promises: 1
5. Resolved 🦄
6. Resolved 🐴
7. Queue is empty
8. Pending promises: 0
9. Added 🐙
10. Pending promises: 1
11. Resolved 🐙
12. All work is done

Custom QueueClass

For implementing more complex scheduling policies, you can provide a QueueClass in the options:

import PQueue from 'p-queue';

class QueueClass {
	constructor() {
		this._queue = [];
	}

	enqueue(run, options) {
		this._queue.push(run);
	}

	dequeue() {
		return this._queue.shift();
	}

	get size() {
		return this._queue.length;
	}

	filter(options) {
		return this._queue;
	}
}

const queue = new PQueue({queueClass: QueueClass});

p-queue will call corresponding methods to put and get operations from this queue.
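The class above ignores the options argument. As a sketch of a policy that uses it, the class below keeps entries ordered by options.priority (highest first, insertion order within the same priority) and lets filter() select by priority. The class name PriorityQueueClass is illustrative, and the sketch assumes options only ever carries a priority field.

```javascript
// Illustrative queue class; the name and internals are assumptions.
class PriorityQueueClass {
	constructor() {
		// Entries are {priority, run}, kept sorted with the highest priority first.
		this._queue = [];
	}

	enqueue(run, options) {
		const priority = options?.priority ?? 0;
		const entry = {priority, run};

		// Insert before the first entry with a strictly lower priority,
		// so entries with equal priority keep their insertion order.
		const index = this._queue.findIndex(item => item.priority < priority);
		if (index === -1) {
			this._queue.push(entry);
		} else {
			this._queue.splice(index, 0, entry);
		}
	}

	dequeue() {
		// Returns undefined when the queue is empty.
		return this._queue.shift()?.run;
	}

	get size() {
		return this._queue.length;
	}

	filter(options) {
		return this._queue
			.filter(item => item.priority === options.priority)
			.map(item => item.run);
	}
}
```

It would be passed in the same way as the class above: new PQueue({queueClass: PriorityQueueClass}).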

FAQ

How do the concurrency and intervalCap options affect each other?

They are just different constraints. The concurrency option limits how many things run at the same time. The intervalCap option limits how many things run in total during the interval (over time).
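To make the distinction concrete, here is a small sketch that builds an options object combining both constraints. The helper name rateLimitOptions and its perSecond and maxParallel parameters are hypothetical; only concurrency, intervalCap and interval are option names from this README.

```javascript
// Hypothetical helper, not part of p-queue.
function rateLimitOptions({perSecond, maxParallel = Infinity}) {
	return {
		concurrency: maxParallel, // how many tasks may run at the same time
		intervalCap: perSecond, // how many tasks may run per interval
		interval: 1000, // interval length in milliseconds
	};
}
```

For example, new PQueue(rateLimitOptions({perSecond: 10, maxParallel: 2})) would allow at most two tasks at once and at most ten runs per second.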

Related

  • p-limit - Run multiple promise-returning & async functions with limited concurrency
  • p-throttle - Throttle promise-returning & async functions
  • p-debounce - Debounce promise-returning & async functions
  • p-all - Run promise-returning & async functions concurrently with optional limited concurrency
  • More…