npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@jjavery/worker-pool

v0.8.0

Published

A worker pool for Node.js applications

Downloads

10

Readme

worker-pool

A load-balancing and auto-scaling pool of Node.js worker processes

Features

  • Load Balancing Requests are sent to worker processes based on one of several of load balancing strategies
  • Auto Scaling Worker processes are started automatically when handling reqests and stopped automatically when idle
  • Hot Reloading Start a new set of worker processes while gracefully stopping the current workers
  • Prestarting Start one or more worker processes immediately and keep them running even when they're idle — initial or infrequent requests won't have to wait for worker process starts
  • Simple Workers A worker module is simply a module that exports one or more functions and (optionally) handles process signals

Installation

Install with NPM

$ npm install @jjavery/worker-pool

Example

app.js:

// Get a reference to the WorkerPool class
const WorkerPool = require('@jjavery/workerpool');

// Create an instance of a WorkerPool. The pool will start with 1 process and
// expand to 5 as needed. The 'fill' strategy will queue up to 10 requests in
// the first non-full worker before moving on to the next. If all workers are
// full, it will send requests to the least-full worker, even if this overfills
// the worker.
const workerPool = new WorkerPool({
  min: 1,
  max: 5,
  strategy: 'fill',
  full: 10
});

// Create a proxy for a worker function
// Note that a proxy function always returns a promise, even if its worker
// function is synchronous
const doSomeWork = workerPool.proxy('./worker', 'doSomeWork');

// Call the proxy 1,000 times
for (let i = 0; i < 1000; ++i) {
  doSomeWork('bar')
    .then((result) => {
      console.log(result);
    })
    .catch((err) => {
      console.error(err);
    });
}

// Stop the worker pool
workerPool.stop();

worker.js:

async function doSomeWork(work) {
  const ms = Math.floor(Math.random() * 1000);

  await wait(ms);

  return `[${process.pid}] Work "${work}" completed in ${ms}ms`;
}

async function wait(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

process.on('SIGTERM', () => {
  process.exit(0);
});

module.exports = {
  doSomeWork
};

API Reference

WorkerPool ⇐ EventEmitter

Provides a load-balancing and (optionally) auto-scaling pool of worker processes and the ability to request for worker processes to import modules, call their exported functions, and reply with their return values and thrown exceptions. Load balancing and auto-scaling are configurable via min/max limits, strategies, and timeouts.

Extends: EventEmitter

new WorkerPool(options)

| Param | Type | Default | Description | | --- | --- | --- | --- | | options | Object | {} | Optional parameters | | options.cwd | string | | The current working directory for worker processes | | options.args | Array. | | Arguments to pass to worker processes | | options.env | Object | | Environmental variables to set for worker processes | | options.min | number | 0 | The minimum number of worker processes in the pool | | options.max | number | 3 | The maximum number of worker processes in the pool | | options.idleTimeout | number | 10000 | Milliseconds before an idle worker process will be asked to stop via options.stopSignal | | options.stopTimeout | number | 10000 | Milliseconds before a worker process will receive SIGKILL after it has been asked to stop | | options.stopSignal | 'SIGTERM' ⎮ 'SIGINT' ⎮ 'SIGHUP' ⎮ 'SIGKILL' | 'SIGTERM' | Initial signal to send when stopping worker processes | | options.strategy | 'fewest' ⎮ 'fill' ⎮ 'round-robin' ⎮ 'random' | 'fewest' | The strategy to use when routing calls to workers | | options.full | number | 10 | The number of requests per worker used by the 'fill' strategy | | options.start | boolean | true | Whether to automatically start this worker pool |

Example

const workerPool = new WorkerPool(
  cwd: `${versionPath}/workers`,
  args: [ '--verbose' ],
  env: { TOKEN: token },
  min: 1,
  max: 4,
  idleTimeout: 30000,
  stopTimeout: 1000,
  stopSignal: 'SIGINT'
  strategy: 'fill',
  full: 100
});

workerPool.cwd : string

The current working directory for worker processes. Takes effect after start/recycle.

Example

workerPool.cwd = `${versionPath}/workers`;

await workerPool.recycle();

workerPool.args : Array.

Arguments to pass to worker processes. Takes effect after start/recycle.

Example

workerPool.args = [ '--verbose' ];

await workerPool.recycle();

workerPool.env : Object

Environmental variables to set for worker processes. Takes effect after start/recycle.

Example

workerPool.env = { TOKEN: newToken };

await workerPool.recycle();

workerPool.isStopping : boolean

True if the worker pool is stopping

workerPool.isStopped : boolean

True if the worker pool has stopped

workerPool.isStarted : boolean

True if the worker pool has started

workerPool.getProcessCount() ⇒ Number

Gets the current number of worker processes

Returns: Number - The current number of worker processes

workerPool.start() ⇒ Promise

Starts the worker pool

Resolves: When the worker pool has started
Rejects: WorkerPool.NotReadyError | Error When an error has been thrown

workerPool.stop() ⇒ Promise

Stops the worker pool, gracefully shutting down each worker process

Resolves: When the worker pool has stopped
Rejects: Error When an error has been thrown

workerPool.recycle() ⇒ Promise

Recycle the worker pool, gracefully shutting down existing worker processes and starting up new worker processes

Resolves: When the worker pool has recycled
Rejects: WorkerPool.NotReadyError | Error When an error has been thrown

workerPool.call(modulePath, functionName, ...args) ⇒ Promise

Routes a request to a worker in the pool asking it to import a module and call a function with the provided arguments.

Note: WorkerPool#call() uses JSON serialization to communicate with worker processes, so only types/objects that can survive JSON.stringify()/JSON.parse() will be passed through unchanged.

Resolves: any The return value of the function call when the call returns
Rejects: WorkerPool.UnexpectedExitError | WorkerPool.WorkerError | Error When an error has been thrown

| Param | Type | Description | | --- | --- | --- | | modulePath | string | The module path for the worker process to import | | functionName | string | The name of a function expored by the imported module | | ...args | any | Arguments to pass when calling the function |

Example

const result = await workerPool.call('user-module', 'hashPassword', password, salt);

workerPool.proxy(modulePath, functionName) ⇒ function

Creates a proxy function that will call WorkerPool#call() with the provided module path, function name, and arguments. Provided as a convenience and minor performance improvement as the modulePath will only be resolved when creating the proxy, rather than with each call.

Note: WorkerPool#proxy() uses JSON serialization to communicate with worker processes, so only types/objects that can survive JSON.stringify()/JSON.parse() will be passed through unchanged.

Returns: function - A function that calls WorkerPool#call() with the provided modulePath, functionName, and args, and returns its Promise

| Param | Type | Description | | --- | --- | --- | | modulePath | string | The module path for the worker process to import | | functionName | string | The name of a function expored by the imported module |

Example

const hashPassword = workerPool.proxy('user-module', 'hashPassword');

const hashedPassword = await hashPassword(password, salt);

"error"

Emitted when an error is thrown in the constructor.

"start"

Emitted when the worker pool starts.

"recycle"

Emitted when the worker pool recycles.

"stop"

Emitted when the worker pool stops.

WorkerPool.NotStartedError

Thrown when the worker pool is not started

WorkerPool.NotReadyError

Thrown when a worker process doesn't signal that it is ready

WorkerPool.UnexpectedExitError

Thrown when a worker process exits unexpectedly

WorkerPool.WorkerError

Thrown when a function called by a worker process (or a worker process itself) throws an error


Copyright © 2022 James P. Javery @jjavery