© 2024 – Pkg Stats / Ryan Hefner

pool-worker-threads

v1.0.6

Published

Lets you use a pool for worker_threads

Downloads

4

Readme

pool-worker-threads

Worker thread pool built on Node's worker_threads module. Compatible with ES6+, TypeScript, and Observables, and therefore with Promises and async/await.

Notification

  1. This module can only run in Node.js 12 or higher.

Installation

npm install pool-worker-threads --save

Class: WorkerPool

A WorkerPool instance is a thread pool that executes the task function provided on each call.

new WorkerPool(size)

  • size <number> Number of workers in this pool.

pool.exec(opt)

  • opt
    • task <function> The function to run as a task. Note: the task function cannot use closures! If you need external data inside the function, pass cloneable data through workerData.
    • workerData <any> Cloneable data you want to access in the task function.
  • Returns: <Observable>

Chooses one idle worker in the pool to execute your task function. The returned Observable emits every message the task posts, followed by the value the task returns.
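The closure restriction makes sense once you picture the task leaving its original scope: a function that is shipped to another context presumably travels as source text, and only the cloneable workerData travels with it. The sketch below is not this library's implementation. It imitates the effect in a single thread by rebuilding a function from its source with `new Function`, which is an assumption made purely for illustration; `runDetached` and `demo` are hypothetical names.

```typescript
// Hypothetical helper: imitates shipping a task to another context by
// serializing it to source text and rebuilding it, which drops any closure.
function runDetached<D, R>(task: (data: D) => R, workerData: D): R {
  const source = task.toString();
  // new Function evaluates in the global scope, so outer local variables are gone.
  const rebuilt = new Function(`return (${source});`)() as (data: D) => R;
  return rebuilt(workerData);
}

function demo(): { viaWorkerData: number; closureError: string } {
  const factor = 3; // local variable, reachable only through a closure

  // Works: everything the task needs arrives through workerData.
  const viaWorkerData = runDetached((data) => data.value * data.factor, { value: 2, factor });

  // Fails: the rebuilt function can no longer see `factor`.
  let closureError = "";
  try {
    runDetached((value) => value * factor, 2);
  } catch (err) {
    closureError = (err as Error).name;
  }
  return { viaWorkerData, closureError };
}

console.log(demo());
```

The first call succeeds because `factor` is copied into workerData; the second throws because the rebuilt function refers to a variable that no longer exists around it.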

Typical example using observable

import { WorkerPool } from "pool-worker-threads";

const pool = new WorkerPool(4, true); // Passing true runs the pool in strict mode: it throws an error when an unexpected case happens

const myObservable = pool.exec<string, string>({
    task(data, post) {
        post("My message 1"); // You can emit messages from your thread through the second argument
        var str = "ba";
        for (var i = 0; i < 2; i++)
            str += str;
        post("My message 2");
        return str; // This is your final post
    }
});
myObservable
    .subscribe((msg:string) => {
        console.log(msg); // At every post you will receive a notification with the data here
        if (msg === "babababa")
            process.exit(0);
    });

process.on("exit",(_) => {
    //Don't forget to destroy your pool when you have finished with it
    pool.destroy();
});

Returning an Observable lets you consume your worker results with a Promise approach:

const obs = new Observable()
const promise = obs.toPromise(); // This is your promise; it will wait for all your worker messages

Typical example using some data with a Promise approach

WorkerPool.execAwait

import { WorkerPool } from "pool-worker-threads";

const threads = 4;

const pool = new WorkerPool(threads, true);

const secretKey = "my secretKey";

const nbrToken = 10000;

const workersPromises = [];

// For the sake of the example, we'll build an array of JWT tokens
for (let i = 0; i < threads; i++) {
    const workerData = {
        nbrTokenYouMustBuild:nbrToken/threads,
        secret:secretKey
    };
    type WorkerDataIn = {nbrTokenYouMustBuild:number, secret:string};
    //If you are a responsible developer, you are coding in TypeScript,
    //so you should create some types.
    /* 
        * It would also work like this:
        pool.exec<{nbrTokenYouMustBuild:number, secret:string}, string[]>({ ....
            ....
        })
        * But I prefer to do things properly
    */
    const workerPromise =  pool.execAwait<WorkerDataIn, string[]>({
            workerData,
            task(data:WorkerDataIn, post:Function):string[] {
                //Be careful: the code here must be written in JS
                const jwt  = require("jsonwebtoken"); //so we use require instead of import
                const tokens = [];
                const { nbrTokenYouMustBuild, secret } = data;
                while (tokens.length !== nbrTokenYouMustBuild) {
                    const payload = { data:"gitHub:Kmynes" };
                    const tokenOptions = { expiresIn: '1h' };
                    const token = jwt.sign(payload, secret, tokenOptions);
                    tokens.push(token);
                }

                return tokens; //We send everything in one message here, but we could send it piece by piece
            }
        }); //You can easily accumulate all the emitted data in a promise
    workersPromises.push(workerPromise);
}

Promise.all(workersPromises)
    .then((workersResults:string[][]) => {
        const tokens = workersResults.reduce(
            (accRes, currRes) => accRes.concat(currRes) //We just create a big array with all tokens
        );
        if (tokens.length === nbrToken)
            console.log("Test execAwait success");
        else
            console.error("Test execAwait failure");
        pool.destroy();
    }).catch(e => { 
        console.error("Test execAwait failure ", e); 
    });
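One detail of the example above deserves care: each task loops until tokens.length equals its share, so the share must be a whole number. 10000 / 4 works, but a total that does not divide evenly by the number of threads would produce a fractional share that the loop never reaches. A minimal sketch of one way to compute integer shares follows; `splitEvenly` is a hypothetical helper, not part of this package.

```typescript
// Hypothetical helper: split `total` work items into `parts` integer shares
// whose sum is exactly `total`; the first `total % parts` shares get one extra.
function splitEvenly(total: number, parts: number): number[] {
  if (!Number.isInteger(total) || !Number.isInteger(parts) || total < 0 || parts <= 0) {
    throw new RangeError("total must be a non-negative integer and parts a positive integer");
  }
  const base = Math.floor(total / parts);
  const remainder = total % parts;
  return Array.from({ length: parts }, (_, i) => base + (i < remainder ? 1 : 0));
}

console.log(splitEvenly(10000, 4)); // [ 2500, 2500, 2500, 2500 ]
console.log(splitEvenly(10, 4));    // [ 3, 3, 2, 2 ]
```

Each element of the returned array could then be used as the nbrTokenYouMustBuild value for one worker.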

Task with Async/Await

Your task can return a promise; it will be resolved before its value is sent as a message to your Observable.
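To make that behaviour concrete, here is a minimal single-threaded sketch of a wrapper that awaits whatever the task returns before delivering it. It only illustrates the idea; `runTask`, `Post` and `demoAsyncTask` are hypothetical names, and the library's real internals may differ.

```typescript
type Post<M> = (message: M) => void;

// Hypothetical wrapper: run a task, deliver intermediate messages through `post`,
// and deliver the final value only after any returned promise has settled.
async function runTask<D, M>(
  task: (data: D, post: Post<M>) => M | Promise<M>,
  workerData: D,
  post: Post<M>
): Promise<void> {
  const result = await task(workerData, post); // await also accepts a plain, non-promise value
  post(result);
}

async function demoAsyncTask(): Promise<number[]> {
  const received: number[] = [];
  await runTask<number, number>(
    (n, post) => {
      post(n - 1);               // intermediate message
      return Promise.resolve(n); // final value, wrapped in a promise
    },
    10,
    (message) => received.push(message)
  );
  return received;
}

demoAsyncTask().then((received) => console.log(received)); // [ 9, 10 ]
```

The subscriber sees the resolved number, never the promise object itself.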

Typical use case

import { WorkerPool } from "pool-worker-threads";

const pool = new WorkerPool(4, true);

var oneIsPassed = false;

const subScribeListenner = (n:Number) => {
    console.log(n);
    if (!oneIsPassed)
        oneIsPassed = true; //Don't worry, Node.js runs these callbacks on a single thread, so this works
    else
        pool.destroy();
};

pool.exec<Number, Number>({
    workerData:10,
    //@ts-ignore
    //TypeScript can't know that this function will be executed in a separate VM, so we must ignore the error.
    task: (number:Number, post:Function):Promise<Number> => {
        return Promise.resolve(number);
    }
}).subscribe(subScribeListenner);

/*
// /!\ Don't use async/await in a task declared in a TypeScript file; in a JS file you can do whatever you want
//The same thing
pool.exec<Number, Number>({
    workerData:10,
    //@ts-ignore
    //TypeScript can't know that this function will be executed in a separate VM, so we must ignore the error.
    task: async (number:Number, post:Function):Promise<Number> => {
        //If you are using TypeScript this will not work, because it will be transformed into something the VM module doesn't like
        return await Promise.resolve(number);
    }
}).subscribe(subScribeListenner);*/

Interrupt search

WorkerPool.execManyDuplicationCloseOnMessage

import { WorkerPool } from "pool-worker-threads";
import { generate } from "generate-password";

const threads = 4;

const pool = new WorkerPool(threads, true);

const wordToSearch = generate({ length:120 });

const nbrStr = 3000000;

pool.execAwaitManyDuplication<number, string[]>({
    workerNumber:threads,
    params:{
        workerData:nbrStr,
        task(data:number) {
            const { generate } = require("generate-password");
            const listStr = [];
            while (listStr.length !== data)
                listStr.push(generate({ length:10 }));
            return listStr;
        }
    }
}).then(resList => {
    // Flatten the per-worker results into a single list
    const list = resList.reduce((acc, curr) => acc.concat(curr));
    list.push(wordToSearch);
    type DataIn = { list:string[], wordToSearch:string };
    const workersData = [] as DataIn[];
    // Give each worker its own slice of the list
    const pad = nbrStr/threads;
    for (let i = 0; i < threads; i++) {
        workersData.push({
            list:list.slice(pad * i, (i+1) * (pad)),
            wordToSearch
        });
    }
    // Plant the word in the first worker's slice so that the search succeeds
    workersData[0].list[200000] = wordToSearch;
    console.time("Search");
    pool.execManyDuplicationCloseOnMessage<DataIn, boolean>({
        workerNumber:threads,
        checkValue(msg) {
            return msg === true;
        },
        workersData,
        task(data:DataIn, post:Function) {
            for (let i = 0; i < data.list.length; i++) {
                if (data.list[i] === data.wordToSearch) {
                    return true;
                }
            }
            return false;
        }
    }).then(res => {
        console.timeEnd("Search");
        console.log(res);
        pool.destroy();
    })
});
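The idea behind an interrupted search is that the remaining workers stop as soon as one of them reports a match. The following is a rough sketch of that idea written directly against Node's worker_threads module, not a description of how execManyDuplicationCloseOnMessage is implemented. `searchSlices` and `searchSource` are hypothetical names, and the worker body is passed as plain JavaScript source through the `eval: true` option.

```typescript
import { Worker } from "worker_threads";

// Worker body as plain JavaScript source: report whether the slice holds the word.
const searchSource = `
  const { parentPort, workerData } = require("worker_threads");
  parentPort.postMessage(workerData.list.includes(workerData.word));
`;

// Hypothetical helper: search each slice in its own worker and resolve as soon
// as one worker reports true, terminating every worker at that point.
function searchSlices(slices: string[][], word: string): Promise<boolean> {
  return new Promise((resolve, reject) => {
    let pending = slices.length;
    if (pending === 0) {
      resolve(false);
      return;
    }
    const workers: Worker[] = slices.map(
      (list) => new Worker(searchSource, { eval: true, workerData: { list, word } })
    );
    const finish = (found: boolean): void => {
      workers.forEach((w) => void w.terminate()); // stop any worker still searching
      resolve(found);
    };
    for (const worker of workers) {
      worker.once("message", (found: boolean) => {
        pending -= 1;
        // Stop early on the first match, or finish once every worker has answered.
        if (found || pending === 0) finish(found);
      });
      worker.once("error", reject);
    }
  });
}

searchSlices([["a", "b"], ["c", "needle"], ["d"]], "needle")
  .then((found) => console.log(found)); // true
```

Resolving on the first `true` is what saves time: the slower workers are terminated instead of scanning the rest of their slices.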

pool.destroy()

Calls worker.terminate() on every worker in the pool and releases them.
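As a rough picture of what such a teardown involves, the sketch below builds a tiny pool on Node's worker_threads module and terminates every worker in it. `TinyPool` is a hypothetical class written for illustration only; it is not this package's WorkerPool and shows nothing beyond creation and teardown.

```typescript
import { Worker } from "worker_threads";

// Hypothetical minimal pool: only creation and teardown are shown.
class TinyPool {
  private workers: Worker[] = [];

  constructor(size: number) {
    for (let i = 0; i < size; i++) {
      // An idle worker that just waits for messages (plain JavaScript source).
      this.workers.push(
        new Worker(`require("worker_threads").parentPort.on("message", () => {});`, { eval: true })
      );
    }
  }

  get size(): number {
    return this.workers.length;
  }

  // Terminate every worker and drop the references so they can be collected.
  async destroy(): Promise<void> {
    const workers = this.workers;
    this.workers = [];
    await Promise.all(workers.map((w) => w.terminate()));
  }
}

const tinyPool = new TinyPool(2);
tinyPool.destroy().then(() => console.log("workers left:", tinyPool.size));
```

Idle workers keep the Node.js process alive, which is why a pool needs to be destroyed once you have finished with it.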