© 2024 – Pkg Stats / Ryan Hefner

promise-blocking-queue

v1.0.0

Memory optimized promise blocking queue with concurrency control

Downloads

321

Promise Blocking Queue

Memory optimized promise blocking queue with concurrency control, designed especially for large data sets that must be consumed using streams.

Useful for rate-limiting async (or sync) operations that consume large data sets, for example when interacting with a REST API or when doing CPU- or memory-intensive tasks.

Why

If we use Bluebird.map(), for example, we are forced to load all the data into memory before we can consume it, and an out-of-memory exception is right around the corner.

If we use p-queue (by the amazing sindresorhus), for example, we can use streams to avoid memory bloat, but we have no easy way to control the stream flow, so we can still hit that out-of-memory exception.

The solution: a blocking queue whose enqueue operation returns a promise that resolves once the added item gets an available slot in the queue. This lets us pause stream consumption until there is a real need to consume the next item, keeping memory usage low while maintaining the desired level of concurrency.
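To make the idea concrete, here is a minimal TypeScript sketch of a blocking queue. `MiniBlockingQueue` and `demo` are hypothetical names, and this is a simplified illustration of the concept, not the library's actual implementation. `enqueue` returns two promises: `enqueuePromise` settles when the task gets a slot, and `fnPromise` settles with the task's result. A producer that awaits `enqueuePromise` before producing the next item never has more than one item waiting.

```typescript
// Hypothetical MiniBlockingQueue: a simplified sketch of the idea,
// not the actual promise-blocking-queue source.
class MiniBlockingQueue {
  activeCount = 0;
  private readonly concurrency: number;
  private readonly waiting: Array<() => void> = [];

  constructor(concurrency: number) {
    this.concurrency = concurrency;
  }

  get pendingCount(): number {
    return this.waiting.length;
  }

  enqueue<T>(fn: () => T | PromiseLike<T>): { enqueuePromise: Promise<void>; fnPromise: Promise<T> } {
    let grantSlot!: () => void;
    // Resolves as soon as the task gets a slot, i.e. when it starts running
    const enqueuePromise = new Promise<void>((resolve) => {
      grantSlot = resolve;
    });
    // Resolves (or rejects) with the task's own result
    const fnPromise = new Promise<T>((resolve, reject) => {
      const run = () => {
        this.activeCount++;
        grantSlot();
        Promise.resolve()
          .then(fn)
          .then(resolve, reject)
          .then(() => {
            // Free the slot and start the oldest waiting task, if any
            this.activeCount--;
            const next = this.waiting.shift();
            if (next) next();
          });
      };
      if (this.activeCount < this.concurrency) run();
      else this.waiting.push(run);
    });
    return { enqueuePromise, fnPromise };
  }
}

// Producer loop: awaiting enqueuePromise blocks the producer until the
// item has a slot, so at most one item is ever waiting in the queue.
async function demo(): Promise<{ maxActive: number; maxPending: number; results: number[] }> {
  const queue = new MiniBlockingQueue(2);
  const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
  const fnPromises: Promise<number>[] = [];
  let maxActive = 0;
  let maxPending = 0;

  for (let i = 1; i <= 5; i++) {
    const { enqueuePromise, fnPromise } = queue.enqueue(async () => {
      maxActive = Math.max(maxActive, queue.activeCount);
      await sleep(10); // simulate slow work
      return i * i;
    });
    fnPromises.push(fnPromise);
    maxPending = Math.max(maxPending, queue.pendingCount);
    await enqueuePromise; // wait for a free slot before producing the next item
  }
  return { maxActive, maxPending, results: await Promise.all(fnPromises) };
}

demo().then((stats) => console.log(stats));
```

In `demo`, at most two tasks run at once and at most one task waits at a time, which is what keeps memory bounded no matter how many items the source yields.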

Install

npm install promise-blocking-queue

Usage example

Let's assume we have a very large (a couple of GBs) file called users.json, which contains a long list of users we want to add to our DB.
Let's also assume that our DB instance is very cheap, so we don't want to overload it and only want to run 2 concurrent DB insert operations.

We can build a short, scalable solution like so:

import * as JSONStream from 'JSONStream';
import * as fs from 'fs';
import * as es from 'event-stream';
import * as sleep from 'sleep-promise';
import { BlockingQueue } from 'promise-blocking-queue';

const queue = new BlockingQueue({ concurrency: 2 });
let handled = 0;
let failed = 0;
let awaitDrain: Promise<void> | undefined;

const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' });
const jsonReadStream = JSONStream.parse('*');
const jsonWriteStream = JSONStream.stringify();
const writeStream = fs.createWriteStream('./results.json');

const addUserToDB = async (user) => {
    try {
        console.log(`adding ${user.username}`);
        // Simulate long running task
        await sleep((handled + 1) * 100);
        console.log(`added ${user.username} #${++handled}`);
        const writePaused = !jsonWriteStream.write(user.username);
        if (writePaused && !awaitDrain) {
            // Downstream asked us to pause the writes for now
            awaitDrain = new Promise((resolve) => {
                jsonWriteStream.once('drain', resolve);
            });
        }
    } catch (err) {
        console.log(`failed ${++failed}`, err);
    }
};

const handleUser = async (user) => {
    // Wait until downstream is ready to receive more data, so the memory footprint does not grow
    if (awaitDrain) {
        await awaitDrain;
        awaitDrain = undefined;
    }
    return queue.enqueue(addUserToDB, user).enqueuePromise;
};

// Do not make this function async: it must return false synchronously to pause the read stream
const mapper = (user, cb) => {
    console.log(`streamed ${user.username}`);
    handleUser(user)
        .then(() => {
            cb();
        });
    // Pause the read stream until we are ready to handle more data
    return false;
};

const onReadEnd = () => {
    console.log('done read streaming');
    // If no task is pending or running, the idle event will not fire, so end the write stream now
    if (queue.pendingCount === 0 && queue.activeCount === 0) {
        jsonWriteStream.end();
    } else {
        // Wait until all work is done
        queue.on('idle', () => {
            jsonWriteStream.end();
        });
    }
};

const onWriteEnd = () => {
    console.log(`done processing - ${handled} handled, ${failed} failed`);
    process.exit(0);
};

jsonWriteStream
    .pipe(writeStream)
    .on('error', (err) => {
        console.log('error write streaming', err);
        process.exit(1);
    })
    .on('end', onWriteEnd)
    .on('finish', onWriteEnd);

readStream
    .pipe(jsonReadStream)
    .pipe(es.map(mapper))
    .on('data', () => {
        // Do nothing
    })
    .on('error', (err) => {
        console.log('error read streaming', err);
        process.exit(1);
    })
    .on('finish', onReadEnd)
    .on('end', onReadEnd);

If users.json is like:

[
  {
    "username": "a"
  },
  {
    "username": "b"
  },
  {
    "username": "c"
  },
  {
    "username": "d"
  }
]

Output will be:

streamed a
adding a
streamed b
adding b
streamed c // c now waits in line to start and streaming is paused until then
added a #1
adding c // c only gets handled after a is done
streamed d // d only gets streamed after c has a spot in the queue
added b #2
adding d // d only gets handled after b is done
done read streaming
added c #3
added d #4
done processing - 4 handled, 0 failed

results.json will be:

[
"a"
,
"b"
,
"c"
,
"d"
]

API

BlockingQueue(options)

Returns a new queue instance, which is an EventEmitter subclass.

options

Type: object

concurrency

Type: number
Default: Infinity
Minimum: 1

Concurrency limit.
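The documented default and minimum can be expressed as a small validation helper. This is a hypothetical sketch: `resolveConcurrency`, and the choice to throw a `RangeError` for values below 1 (and for non-integers), are assumptions rather than confirmed behavior of promise-blocking-queue.

```typescript
interface QueueOptions {
  concurrency?: number;
}

// Hypothetical helper mirroring the documented rules: default Infinity, minimum 1.
// Rejecting fractions and NaN is an assumption of this sketch.
function resolveConcurrency(options: QueueOptions = {}): number {
  const { concurrency = Infinity } = options;
  const valid = concurrency === Infinity || (Number.isInteger(concurrency) && concurrency >= 1);
  if (!valid) {
    throw new RangeError(`concurrency must be an integer >= 1 or Infinity, got ${concurrency}`);
  }
  return concurrency;
}
```

With no options the limit is `Infinity`, which makes the queue behave like plain `Promise.all` scheduling; any finite value caps the number of running tasks.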

queue

BlockingQueue instance.

.enqueue(fn, ...args)

Adds a sync or async task to the queue.

Return value

Type: object

enqueuePromise

Type: Promise<void>

A promise that will be resolved when the queue has an available slot to run the task.
Use it to know when it is a good time to add another task to the queue.

fnPromise

Type: Promise<T>

A promise that will be resolved with the result of fn.

started

Type: boolean

Indicates whether the task has already started to run.

fn

Type: Function

A function that returns a promise or a plain value.

args

Type: any[]

The arguments to pass to the function.
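Since `fn` may be synchronous or asynchronous, a queue has to run both kinds the same way. The sketch below assumes one common approach: call `fn` inside a `.then()` callback, so that a returned value, a returned promise and a synchronous throw all end up as a single `Promise<T>`. `invokeTask`, `startNow` and the `EnqueueResult` interface are hypothetical; the field names follow this README, but the library's own typings may differ.

```typescript
// Shape of the documented return value (field names from the README).
interface EnqueueResult<T> {
  enqueuePromise: Promise<void>;
  fnPromise: Promise<T>;
  started: boolean;
}

// Hypothetical helper: run a sync or async fn with its args.
// Calling fn inside .then() turns a synchronous throw into a rejection
// and flattens a returned promise, so callers always get Promise<T>.
function invokeTask<A extends unknown[], T>(fn: (...args: A) => T | PromiseLike<T>, ...args: A): Promise<T> {
  return Promise.resolve().then(() => fn(...args));
}

// Hypothetical: what enqueue could return when a slot is free right away.
function startNow<A extends unknown[], T>(fn: (...args: A) => T | PromiseLike<T>, ...args: A): EnqueueResult<T> {
  return { enqueuePromise: Promise.resolve(), fnPromise: invokeTask(fn, ...args), started: true };
}
```

Normalizing inside `.then()` means a buggy synchronous task surfaces as a rejected `fnPromise` instead of throwing out of `enqueue`.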

activeCount

The number of promises that are currently running.

pendingCount

The number of promises that are waiting to run.

Events

empty

Emitted when the queue becomes empty. Useful if, for example, you add additional items at a later time.

idle

Emitted when the queue becomes empty, and all promises have completed: queue.activeCount === 0 && queue.pendingCount === 0.

The difference from empty is that idle guarantees that all work from the queue has finished. empty merely signals that the queue is empty, which could mean that some promises haven't completed yet.
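The usage example's `onReadEnd` suggests that idle fires only when the queue becomes idle, so code that waits for it should first check whether the queue is already idle. A hypothetical `waitForIdle` helper could wrap that check. It relies only on the documented `activeCount`, `pendingCount` and `idle` event, plus Node's `events.once`, which was added in Node 11.13, so this sketch needs a newer Node than the Node 6 baseline stated under Versions.

```typescript
import { EventEmitter, once } from 'events';

// Minimal shape the helper needs; BlockingQueue is documented as an
// EventEmitter subclass exposing activeCount and pendingCount.
interface IdleAware extends EventEmitter {
  readonly activeCount: number;
  readonly pendingCount: number;
}

// Hypothetical helper: resolves once the queue is idle. If it is already
// idle, no new 'idle' event is expected, so resolve immediately.
async function waitForIdle(queue: IdleAware): Promise<void> {
  if (queue.activeCount === 0 && queue.pendingCount === 0) {
    return;
  }
  // Note: events.once also rejects if the emitter emits 'error' first
  await once(queue, 'idle');
}
```

With such a helper, `onReadEnd` in the usage example could reduce to `waitForIdle(queue).then(() => jsonWriteStream.end())`.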

Credits

The library is based on p-limit and p-queue (by the amazing sindresorhus).

Versions

Promise Blocking Queue supports Node 6 LTS and higher.

Contributing

All contributions are happily welcomed!
Please make all pull requests to the master branch from your fork and ensure tests pass locally.