@elite-libs/promise-pool

v1.3.1

Configurable async task queue, w/ throttling, retries, progress, error handling.

Promise Pool 🏖️

A background task processor focused on performance, reliability, and durability.

TL;DR: An upgraded Promise queue that's essentially a stateful Promise.all() wrapper.

Smart Multi-Threaded Execution

Diagram of Promise Pool's 'Minimal Blocking' design

Key Promise Pool Features

Promise Pool strives to excel at 4 key goals:

  1. Durability - won't fail in unpredictable ways - even under extreme load.
  2. Extensible by Design - supports Promise-based libraries (examples include: retry handling, debounce/throttle)
  3. Reliability - control your pool with a total runtime limit (align to max HTTP/Lambda request limit), idle timeout (catch orphan or zombie situations), plus a concurrent worker limit.
  4. Performance - the work pool queue uses a speedy Ring Buffer design!*

* Since this is JavaScript, our Ring Buffer is more like three JS Arrays in a trenchcoat.

Who needs a Promise Pool?

  • Any Node service (Lambda function, etc.) that does background work, defined as:
    • Long-running async function(s),
      • where the return value isn't used (in the current request),
      • and failures are handled by logging.
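
The "failures are handled by logging" pattern above can be sketched as a small wrapper. This is only an illustration: logFailures and BackgroundTask are hypothetical names, not part of @elite-libs/promise-pool, and the log message format is an assumption.

```typescript
// Hypothetical helper for illustration; not part of @elite-libs/promise-pool.
type BackgroundTask<T> = () => Promise<T>;

// Wraps a task so that a failure is logged instead of propagating.
// The wrapped task resolves to `undefined` when the original task fails.
function logFailures<T>(
  task: BackgroundTask<T>,
  log: (message: string) => void = console.error,
): BackgroundTask<T | undefined> {
  return async () => {
    try {
      return await task();
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      log(`background task failed: ${reason}`);
      return undefined;
    }
  };
}
```

A wrapped task could then be queued like any other, for example pool.add(logFailures(() => saveToS3(data))), assuming a pool and a saveToS3 function like the ones used in the Usage section.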

Features

  • [x] Configurable.
  • [x] Concurrency limit.
  • [x] Retries. (Use p-retry for this.)
  • [x] Zero dependencies.
  • [x] Error handling.
  • [x] Singleton mode: Option to auto-reset when .done(). (Added .drain() method.)
  • [x] Task scheduling & prioritization.
    • [x] Support or return hints/stats? (Time in Event Loop? Event Loop wait time? Pending/Complete task counts?)
    • [x] Support smart 'Rate Limit' logic.
      • Recommended solution: conditionally delay (using await delay(requestedWaitTime)) before (or after) each HTTP call.
      • Typically you'll detect rate limits via HTTP headers (or payload/body data). For example, check for headers like X-RateLimit-WaitTimeMS.
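
The rate-limit recommendation above might look roughly like the sketch below. The helpers delay, requestedWaitTime and rateLimited are hypothetical (none of them ship with the library), and the response shape (an object with a plain headers record) is an assumption made to keep the example self-contained.

```typescript
// Hypothetical helpers for illustration; not part of @elite-libs/promise-pool.
const delay = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

type Headers = Record<string, string | undefined>;

// Reads the requested wait time (in ms) from response headers.
// Returns 0 when the header is missing or is not a positive number.
function requestedWaitTime(
  headers: Headers,
  headerName = "X-RateLimit-WaitTimeMS",
): number {
  const wanted = headerName.toLowerCase();
  const entry = Object.entries(headers).find(
    ([name]) => name.toLowerCase() === wanted,
  );
  const value = Number(entry?.[1]);
  return Number.isFinite(value) && value > 0 ? value : 0;
}

// Wraps a task so that it pauses after the call when the server asks for it.
function rateLimited<T extends { headers: Headers }>(
  task: () => Promise<T>,
): () => Promise<T> {
  return async () => {
    const response = await task();
    const waitMs = requestedWaitTime(response.headers);
    if (waitMs > 0) await delay(waitMs);
    return response;
  };
}
```

Because the wrapped task only settles after the delay, it keeps occupying its worker slot while waiting, which is one way to slow the pool down without changing the pool itself.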

API

PromisePool exposes 3 methods:

  • .add(...tasks) - add one (or more) tasks for background processing. (A task is a function that wraps a Promise value. e.g. () => Promise.resolve(1)).
  • .drain() - Returns a promise that resolves when all tasks have been processed, or when another caller takes over waiting by calling .drain() again.
  • .done() - Drains AND 'finalizes' the pool. No more tasks can be added after this. Can be called by multiple callers; only runs once.
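
The difference between a task and a bare Promise is easy to miss, so here is a minimal sketch of it. A Promise has already started by the time it exists, while a function that returns one does nothing until it is called, which is presumably why .add() takes functions. The variable names are illustrative only.

```typescript
let started = 0;

// A task: nothing runs until the function is called.
const task = (): Promise<number> => {
  started += 1;
  return Promise.resolve(1);
};

const startedBeforeCall = started; // 0: defining the task did no work
const promise = task(); // the work starts here
const startedAfterCall = started; // 1
```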

See the Usage section below, or check out the /examples folder for more complete examples.

Install

# with npm
npm install @elite-libs/promise-pool
# or using yarn
yarn add @elite-libs/promise-pool

Usage

Minimal Example

import PromisePool from '@elite-libs/promise-pool';
// 1/3: Either use the default instance or create a new one.
const pool = new PromisePool();

(async () => {
  // 2/3: Add task(s) to the pool as needed.
  // PromisePool will execute them in parallel as soon as possible.
  pool.add(() => saveToS3(data));
  pool.add(() => expensiveBackgroundWork(data));
  
  // 3/3: REQUIRED: to ensure your tasks are executed, you must await
  // either `pool.drain()` or `pool.done()` at some point in your code
  // (`done` prevents additional tasks from being added).
  await pool.drain();
})();

Singleton Pattern

Recommended for virtually all projects. (API, CLI, Lambda, Frontend, etc.)

The singleton pattern creates exactly 1 Promise Pool - as soon as the script is imported (typically on the first run).

This ensures the maxWorkers value will act as a global limit on the number of tasks that can run at the same time.

File /services/taskPoolSingleton.ts

import PromisePool from '@elite-libs/promise-pool';

export const taskPool = new PromisePool({
  maxWorkers: 6, // Optional. Default is `4`.
});
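
To see why one shared instance turns maxWorkers into a global cap, consider the toy limiter below. It is a hypothetical stand-in written for this explanation (TinyLimiter, LimiterTask and sharedLimiter are made-up names) and is not how @elite-libs/promise-pool is implemented; it only demonstrates the counting.

```typescript
// Hypothetical stand-in for illustration only; NOT the library's implementation.
type LimiterTask = () => Promise<unknown>;

class TinyLimiter {
  private readonly maxWorkers: number;
  private readonly queue: LimiterTask[] = [];
  private running = 0;

  constructor(maxWorkers = 4) {
    this.maxWorkers = maxWorkers;
  }

  add(...tasks: LimiterTask[]): void {
    this.queue.push(...tasks);
    this.startNext();
  }

  get activeCount(): number {
    return this.running;
  }

  get pendingCount(): number {
    return this.queue.length;
  }

  private startNext(): void {
    while (this.running < this.maxWorkers && this.queue.length > 0) {
      const task = this.queue.shift() as LimiterTask;
      this.running += 1;
      // The executor runs synchronously, so the task starts right away and
      // a synchronous throw becomes a rejection instead of escaping.
      new Promise<unknown>((resolve) => resolve(task()))
        .catch(() => undefined) // failures are swallowed in this sketch
        .then(() => {
          this.running -= 1;
          this.startNext();
        });
    }
  }
}

// One shared instance: every caller competes for the same 6 slots.
const sharedLimiter = new TinyLimiter(6);
```

If ten tasks are added to sharedLimiter from anywhere in the program, six start immediately and four wait. Two separate instances with the same setting would each allow six, so the effective limit would be twelve.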

Recipes

See examples below, or check out the /examples folder for more complete examples.

AWS Lambda & Middy

Promise Pool in some middy middleware:

File 1/2 ./services/taskPoolMiddleware.ts

Note: The imported taskPool is a singleton instance defined in the taskPoolSingleton file.

import { taskPool } from './taskPoolSingleton';

export const taskPoolMiddleware = () => ({
  before: (request) => {
    Object.assign(request.context, { taskPool });
  },
  after: async (request) => {
    await request.context.taskPool.drain();
  }
});

Now you can use taskPool in your Lambda function via context.taskPool (middy passes the Lambda context as the handler's second argument):

File 2/2 ./handlers/example.handler.ts

import middy from '@middy/core';
import { taskPoolMiddleware } from '../services/taskPoolMiddleware';

const handleEvent = (event, context) => {
  const { taskPool } = context;

  const data = getDataFromEvent(event);

  taskPool.add(() => saveToS3(data));
  taskPool.add(() => expensiveBackgroundWork(data));
  
  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Success',
    }),
  }
}

export const handler = middy(handleEvent)
  .use(taskPoolMiddleware());

Express Middleware

File 1/3 /services/taskPoolSingleton.mjs

import PromisePool from '@elite-libs/promise-pool'

export const taskPool = new PromisePool({
  maxWorkers: 6 // Optional. Default is `4`.
})

File 2/3 /middleware/taskPool.middleware.mjs

import { taskPool } from "../services/taskPoolSingleton.mjs";

const taskPoolMiddleware = {
  setup: (request, response, next) => {
    request.taskPool = taskPool
    next()
  },
  cleanup: (request, response, next) => {
    if (request.taskPool && 'drain' in request.taskPool) {
      taskPool.drain()
    }
    next()
  }
}

export default taskPoolMiddleware

To use the taskPoolMiddleware in your Express app, register taskPoolMiddleware.setup before your routes and taskPoolMiddleware.cleanup after them.

File 3/3 /app.mjs

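import express from "express"
import taskPoolMiddleware from "./middleware/taskPool.middleware.mjs"

export const app = express()

// Step 1/2: Setup the taskPool
app.use(taskPoolMiddleware.setup)
// express.bodyParser() was removed in Express 4; express.json() (4.16+) parses JSON bodies.
app.use(express.json())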

app.post('/users/', function post(request, response, next) {
  const { taskPool } = request

  const data = getDataFromBody(request.body)

  // You can .add() tasks wherever needed,
  //   - they'll run in the background.
  taskPool.add(() => logMetrics(data))
  taskPool.add(() => saveToS3(request))
  taskPool.add(() => expensiveBackgroundWork(data))

  // Or, 'schedule' multiple tasks at once.
  taskPool.add(
    () => logMetrics(data),
    () => saveToS3(request),
    () => expensiveBackgroundWork(data)
  )

  next()
})

// Step 2/2: Drain the taskPool
app.use(taskPoolMiddleware.cleanup)

Changelog

v1.3.1 - April 2023

  • Upgraded dev dependencies (dependabot).
  • Cleaned up README code & description.

v1.3.0 - September 2022

  • What? Adds Smart .drain() behavior.
  • Why?
    1. Speeds up concurrent server environments!
    2. Prevents several types of difficult (ghost) bugs!
      • stack overflows/memory access violations,
      • exceeding rarely hit OS limits (e.g. max open handle/file/socket limits),
      • exceeding limits of network hardware (e.g. connections/sec, total open socket limit, etc.),
      • uneven observed wait times.
  • How? Only awaits the latest .drain() caller.
  • Who? Server + Singleton use cases will see a major benefit to this design.
  • Huh? See diagram