npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

banana-fork

v1.1.2

Published

Easy parallel job processing in NodeJS with Banana Fork.

Downloads

24

Readme

Banana Fork

Easy parallel job processing in NodeJS.

Description

BananaFork is a lightweight wrapper of the NodeJS cluster module. It's also tiny!! (18.6 kB)

Getting Started

npm i banana-fork

Bare-Bones Example

import { bananaFork } from 'banana-fork';

(async () => {
  await bananaFork({
    workerCount: 5,
    getArrayOfItems: async () => {
      // ...Get the full list of items to work on.
      return [...Array(5).keys()]; // ex: [0,1,2,3,4]
    },
    workerMain: async ({ id, subsetOfItems }) => {
      // This code runs in a forked process!
      for (let i = 0; i < subsetOfItems.length; i++) {
        console.log('Worker ID', id, 'just processing some stuff.');
      }
    },
  });
})();

What do i do?

Just define 3 things:

  1. How many processes you want to fork.
  2. How to get your full list of work-items.
  3. What your workers do with each subset of items.

The manager process will evenly distribute your workload amongst worker threads and run them until they either error or complete.

Workloads can be of any type.

If you expect long-running workloads, consider using reportDurationInMs in combination with the messageProcessor method to log periodically on how many items each worker has completed. See examples below for inspiration.

Full-Featured Example

(async () => {
  // ^ This function is used just so we can use async/await.

  const cluster = await bananaFork({
    workerCount: 5,
    getArrayOfItems: async () => [...Array(5).keys()], // Do something to get the FULL list of items to work on.
    reportDurationInMs: 3000,
    messageProcessor: async (msg) => {
      if (msg.cmd === 'report')
        logger.info(
          `[WORKER][${msg.data.id}] INTERVAL DRIVEN PROGRESS REPORT: ${msg.data.completedSoFar}/${msg.data.total}`
        );
    },
    workerMain: async ({ id, subsetOfItems, incrementMePerItemProcessed }) => {
      for (let i = 0; i < subsetOfItems.length; i++) {
        const item = subsetOfItems[i];
        logger.info(`[WORKER][${id}] processing item ${item}`);

        // do something computationally expensive.
        await new Promise((resolve) => setTimeout(resolve, 5000)); // wait 5 seconds!

        logger.info(`[WORKER][${id}] processing item ${item} complete`);
        incrementMePerItemProcessed.count += 1; // Track the completeness of each item in the subset!
      }
    },
  });
})();

Configuration Options

| Name | Type | Required | Description | | ------------------ | -------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | | workerCount | number | yes | The number of processes to fork. Should not exceed number of CPUs on host machine. | | getArrayOfItems | function | yes | An async function which resolves a list of items that will be split into equal chunks and distributed amongst worker processes. | | workerMain | function | yes | An async function which handles items distributed by the "manager" process. For accurate reporting it should increment incrementMePerItemProcessed.count. | | messageProcessor | function | no | This optional method is an async function that handles messages sent from worker processes. The shape and type of each WorkerMessage is documented below. | | reportDurationInMs | number | no | The duration in ms which each worker should report it's progress using a ReportMessage. If falsy, no reports will be sent. |

WorkerMessage Interface


enum WorkerCmd {
  DONE = 'done', // Sent by worker when job has ran to completion without error.
  ERROR = 'error', // Sent by worker when work stopped due to error.
  STARTING = 'starting', // Sent by worker when work begins.
  REPORT = 'report', // Sent by worker when reporting periodically.
  WORK = 'work', // Sent by the manager process when delegating workload.
}

type WorkerMessage<T = unknown> =
  | WorkerDoneMessage
  | WorkerErrorMessage
  | WorkerStartMessage
  | WorkerWorkMessage<T>
  | WorkerReportMessage;

interface WorkerDoneMessage {
  cmd: WorkerCmd.DONE;
  data: { id: number };
}

interface WorkerErrorMessage {
  cmd: WorkerCmd.ERROR;
  data: Error;
}
interface WorkerStartMessage {
  cmd: WorkerCmd.STARTING;
  data: { id: number; length: number };
}

interface WorkerReportMessage {
  cmd: WorkerCmd.REPORT;
  data: {
    id: number;
    completedSoFar: number;
    total: number;
  };
}

interface WorkerWorkMessage<T> {
  cmd: WorkerCmd.WORK;
  data: {
    id: number;
    subsetOfItems: T[];
  };
}

Sending your own messages from workers

From anywhere within your workerMain function you can send messages to the "messageProcessor" function in the "manager" process by calling:

process.send({ cmd: 'whatever', data: 'anything' });

Example output

$ ts-node ./src/example/all-options.ts
2022-03-08T05:46:02.084Z [info]: [MANAGER] Number of cpus on this machine: 8
2022-03-08T05:46:02.084Z [info]: [MANAGER] Max threads allowed: 5
2022-03-08T05:46:02.084Z [info]: [MANAGER] Progress reporting every: 3000ms
2022-03-08T05:46:02.085Z [info]: [MANAGER] Number of chunks: 5
2022-03-08T05:46:02.085Z [info]: [MANAGER] Chunk[0].length: 1
2022-03-08T05:46:02.085Z [info]: [MANAGER] Chunk[1].length: 1
2022-03-08T05:46:02.086Z [info]: [MANAGER] Chunk[2].length: 1
2022-03-08T05:46:02.086Z [info]: [MANAGER] Chunk[3].length: 1
2022-03-08T05:46:02.086Z [info]: [MANAGER] Chunk[4].length: 1
2022-03-08T05:46:02.086Z [info]: [MANAGER] Forking 0
2022-03-08T05:46:02.094Z [info]: [MANAGER] Forking 1
2022-03-08T05:46:02.100Z [info]: [MANAGER] Forking 2
2022-03-08T05:46:02.106Z [info]: [MANAGER] Forking 3
2022-03-08T05:46:02.116Z [info]: [MANAGER] Forking 4
2022-03-08T05:46:06.055Z [info]: [WORKER][1] starting worker
2022-03-08T05:46:06.062Z [info]: [WORKER][1] processing item 1
2022-03-08T05:46:06.072Z [info]: [WORKER][3] starting worker
2022-03-08T05:46:06.074Z [info]: [WORKER][3] processing item 3
2022-03-08T05:46:06.099Z [info]: [WORKER][0] starting worker
2022-03-08T05:46:06.102Z [info]: [WORKER][0] processing item 0
2022-03-08T05:46:06.113Z [info]: [WORKER][4] starting worker
2022-03-08T05:46:06.115Z [info]: [WORKER][4] processing item 4
2022-03-08T05:46:06.132Z [info]: [WORKER][2] starting worker
2022-03-08T05:46:06.133Z [info]: [WORKER][2] processing item 2
2022-03-08T05:46:09.062Z [info]: [WORKER][1] INTERVAL DRIVEN PROGRESS REPORT: 0/1
2022-03-08T05:46:09.077Z [info]: [WORKER][3] INTERVAL DRIVEN PROGRESS REPORT: 0/1
2022-03-08T05:46:09.101Z [info]: [WORKER][0] INTERVAL DRIVEN PROGRESS REPORT: 0/1
2022-03-08T05:46:09.120Z [info]: [WORKER][4] INTERVAL DRIVEN PROGRESS REPORT: 0/1
2022-03-08T05:46:09.135Z [info]: [WORKER][2] INTERVAL DRIVEN PROGRESS REPORT: 0/1
2022-03-08T05:46:11.068Z [info]: [WORKER][1] processing item 1 complete
2022-03-08T05:46:11.079Z [info]: [WORKER][3] processing item 3 complete
2022-03-08T05:46:11.103Z [info]: [WORKER][0] processing item 0 complete
2022-03-08T05:46:11.122Z [info]: [WORKER][4] processing item 4 complete
2022-03-08T05:46:11.134Z [info]: [WORKER][2] processing item 2 complete
2022-03-08T05:46:11.135Z [info]: [MANAGER] All workers are done. Exiting...
2022-03-08T05:46:11.193Z [info]: [MANAGER] Worker 17583 exited with code 0, and signal null
2022-03-08T05:46:11.194Z [info]: [MANAGER] Worker 17579 exited with code 0, and signal null
2022-03-08T05:46:11.199Z [info]: [MANAGER] Worker 17582 exited with code 0, and signal null
2022-03-08T05:46:11.202Z [info]: [MANAGER] Worker 17580 exited with code 0, and signal null
2022-03-08T05:46:11.212Z [info]: [MANAGER] Worker 17581 exited with code 0, and signal null