cluster-cmd

v1.0.5

Cluster ipc framework implementing request/reply paradigm

cluster-cmd

cluster-cmd is a small framework implementing the command/response paradigm for the Node.js Cluster module. Instead of concentrating inter-process communication logic in the worker.on and process.on event handlers, you can call a function in another thread pretty much the same way you would call a local function.

Main features:

  • transparent to the Node.js cluster module
  • master-to-worker and worker-to-master commands
  • basic master-worker synchronization
  • timeout handling

How it works

Synchronous or asynchronous command handlers are registered using the on.sync or on.async functions, respectively. These commands can then be remotely executed by the run function.

Here is a 'Hello world' example (also available in the ./examples folder):

const {forkWait, on, ready, run, getWorkerByName, myName} = 
    require('cluster-cmd');

function masterCode() {
    // start thread 'worker' and wait until it is initialized
    forkWait('worker')
    .then(async () => {
        console.log('Worker initialized');

        // run the 'helloWorld' command in 'worker'
        await run('worker','helloWorld');

        // terminate 'worker' thread
        getWorkerByName('worker').kill();   
    })
    .catch(err => {
        console.log(`Worker failed: ${err}`)
    })
}

function workerCode() {

    // register 'helloWorld' command handler
    on.sync('helloWorld', () => {
        console.log(`From ${myName}: Hello world!`);
    })

    // tell master that I'm ready to receive commands
    ready();
}

if (myName == 'master') {
    masterCode() 
}
else {
    workerCode();
}

See below for a more complex example demonstrating both master -> worker and worker -> master communication.

Exports

  • fork
  • forkWait
  • on
  • run
  • ready
  • failed
  • cancel
  • setOptions
  • removeWorkerName
  • getWorkerByName
  • getWorkerNames
  • myName

fork(workerName, env)

  • workerName: string
  • env: object

This function starts a new worker with the specified name. The worker name serves as a reference to the worker throughout the framework. getWorkerByName(workerName) gets you the underlying cluster.worker object; getWorkerNames() returns an array of all worker names.

env is an object whose properties will be added to the worker's process.env object. Default is { }. fork always adds the property '_workerName' (holding the worker's name) to the worker's environment.

fork can produce the following errors (error.code):

  • ERR_NO_WORKERNAME : missing worker name
  • ERR_WORKERNAME_NOT_STRING : worker name is not a string
  • ERR_WORKERNAME_EXISTS: a worker with the same name already exists
  • ERR_WORKERNAME_MUST_NOT_BE_MASTER: 'master' is not allowed as worker name
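
Since these errors are distinguished by error.code, calling code can branch on that property. The following is a minimal stand-alone sketch of that pattern. validateWorkerName and tryName are hypothetical helpers written for this illustration, not part of cluster-cmd; only the error codes are taken from the list above, and the order of the checks is an assumption.

```javascript
// Hypothetical helper: mirrors the documented error codes, NOT cluster-cmd's own code.
function validateWorkerName(workerName, existingNames) {
    const fail = (code, message) => {
        const err = new Error(message);
        err.code = code;               // callers branch on error.code
        throw err;
    };
    if (workerName === undefined || workerName === null)
        fail('ERR_NO_WORKERNAME', 'missing worker name');
    if (typeof workerName !== 'string')
        fail('ERR_WORKERNAME_NOT_STRING', 'worker name is not a string');
    if (workerName === 'master')
        fail('ERR_WORKERNAME_MUST_NOT_BE_MASTER', "'master' is not allowed as worker name");
    if (existingNames.includes(workerName))
        fail('ERR_WORKERNAME_EXISTS', 'a worker with the same name already exists');
    return workerName;
}

// Hypothetical caller: returns 'ok' or the error code.
function tryName(name, existingNames) {
    try {
        validateWorkerName(name, existingNames);
        return 'ok';
    } catch (err) {
        return err.code;
    }
}
```
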

forkWait(workerName, timeoutMs, env)

  • workerName: string
  • timeoutMs: number
  • env: object

This function calls fork(workerName, env), then waits until the worker has called the ready function. A timeout interval in milliseconds can be specified in timeoutMs.

Example:

Promise.all([
    forkWait('server1', 6000, {port:8080}),
    forkWait('server2', 6000, {port:8081}),
    forkWait('server3', 6000, {port:8082})
])
.then(() => {
    console.log('All three servers running and initialized');
})

// process.env.port will be available in all workers 

forkWait can produce the same errors as fork plus:

  • ERR_FORK_TOO_MANY_ARGS: too many arguments
  • ERR_WORKER_TIMED_OUT: worker did not call ready within timeoutMs milliseconds
  • ERR_WORKER_FAILED: worker called failed
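
The three outcomes of forkWait (ready, failed, timeout) can be pictured with a small stand-alone sketch. It is an illustration only, not the library's implementation: an EventEmitter stands in for the worker's message channel, and waitForReady and the 'ready' / 'failed' event names are assumptions made for this example.

```javascript
const { EventEmitter } = require('events');

// Hypothetical sketch: resolve on 'ready', reject on 'failed' or after timeoutMs.
function waitForReady(channel, timeoutMs) {
    return new Promise((resolve, reject) => {
        const makeError = (code, message) =>
            Object.assign(new Error(message), { code });

        const cleanup = () => {
            clearTimeout(timer);
            channel.removeListener('ready', onReady);
            channel.removeListener('failed', onFailed);
        };
        const onReady = data => {
            cleanup();
            resolve(data);
        };
        const onFailed = err => {
            cleanup();
            // when the worker passes no error, fall back to ERR_WORKER_FAILED
            reject(err || makeError('ERR_WORKER_FAILED', 'worker called failed'));
        };
        const timer = setTimeout(() => {
            cleanup();
            reject(makeError('ERR_WORKER_TIMED_OUT', 'worker did not call ready in time'));
        }, timeoutMs);

        channel.once('ready', onReady);
        channel.once('failed', onFailed);
    });
}
```

Whichever event arrives first settles the promise; the cleanup step removes the other listeners and the timer so nothing fires afterwards.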

on

You cannot call on directly; use on.sync or on.async instead. If you call on directly, you will get the ERR_ONSYNC_OR_ONASYNC error.

on.sync(command, handler)

  • command: string
  • handler: function (arguments, command, workerName, timeoutMs)

This function registers a sync command handler, i.e. a function returning a value. It can be used both in master and worker mode. A special case is command == '*' which registers a default command handler (used when no specific command handler exists).

If you mistakenly register an async function with on.sync, you will get the ERR_NOT_SYNC_HANDLER error at the next run.
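
One plausible reason the error only shows up at the next run is that a handler's kind can be checked reliably only by looking at what it returns. The sketch below illustrates that idea under this assumption; isThenable and callSyncHandler are hypothetical names, and this is not necessarily how cluster-cmd performs the check.

```javascript
// Assumption: a "thenable" return value marks the handler as async.
function isThenable(value) {
    return value !== null &&
        (typeof value === 'object' || typeof value === 'function') &&
        typeof value.then === 'function';
}

// Hypothetical check performed when a command is executed.
function callSyncHandler(handler, args) {
    const result = handler(args);
    if (isThenable(result)) {
        const err = new Error('handler registered with on.sync returned a Promise');
        err.code = 'ERR_NOT_SYNC_HANDLER';
        throw err;
    }
    return result;
}
```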

on.sync can produce the following errors (error.code):

  • ERR_INVALID_COMMAND: command is undefined or not a string
  • ERR_INVALID_HANDLER: handler is undefined or not a function

Example: registration of a specific command handler

on.sync('getEnvVar', ({name}) => {
    return process.env[name];
})

Example: registration of the default handler ('*')

on.sync('*', (args, command)  => {
    console.log(`User called command ${command} with arguments ` +
    `${JSON.stringify(args)}`);
})
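
The fallback to the '*' handler can be sketched as a lookup with a default. This is a stand-alone illustration, assuming a simple Map as registry; register and dispatch are hypothetical names and do not reflect the framework's internals.

```javascript
// Hypothetical registry: command name -> handler
const handlers = new Map();

function register(command, handler) {
    handlers.set(command, handler);
}

// Look up the specific handler first, then fall back to '*'.
function dispatch(command, args) {
    const handler = handlers.get(command) || handlers.get('*');
    if (!handler) {
        const err = new Error(`command '${command}' was not registered`);
        err.code = 'ERR_INVALID_COMMAND';
        throw err;
    }
    // same leading parameters as documented: (arguments, command, ...)
    return handler(args, command);
}
```

With a '*' handler registered, dispatch never reaches the ERR_INVALID_COMMAND branch; without one, unknown commands fail.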

on.async(command, handler)

  • command: string
  • handler: function (arguments, command, workerName, timeoutMs)

This function registers an async command handler, i.e. a function returning a Promise object. In all other respects it is the same as on.sync.

If you mistakenly register a sync function with on.async, you will get the ERR_NOT_ASYNC_HANDLER error at the next run.

on.async produces the same errors as on.sync.

N.B.: The handler must return a Promise, callback is not supported!

Example:

var request = require('request');

on.async('wget', ({url}) => {
    return new Promise((resolve, reject) => {
        request(url, (err, response) => {
            if (err) reject(err);
            else resolve(response);
        })
    })
})
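
As an alternative to wrapping a callback API by hand, Node's util.promisify can produce a Promise-returning function from any function that follows the (err, value) callback convention. In this sketch slowDouble is a hypothetical callback-style function standing in for a real API, and doubleHandler is an illustrative handler name.

```javascript
const { promisify } = require('util');

// Hypothetical callback-style function (Node convention: callback(err, value)).
function slowDouble(n, callback) {
    setTimeout(() => {
        if (typeof n !== 'number') callback(new TypeError('n must be a number'));
        else callback(null, n * 2);
    }, 10);
}

// promisify turns it into a function returning a Promise
const slowDoubleAsync = promisify(slowDouble);

// Has the shape on.async expects: takes the args object, returns a Promise
const doubleHandler = ({ n }) => slowDoubleAsync(n);
```

It could then be registered with on.async('double', doubleHandler).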

run(workerName, command, args, timeoutMs, callback)
run(command, args, timeoutMs, callback)

  • workerName: string (only in master mode)
  • command: string
  • args: object
  • timeoutMs: number
  • callback: function(err, data)
  • returns: id (for the callback version, see cancel) or Promise

This function executes a command registered in another thread. The first version is used in master mode, the second in worker mode.

workerName is the name of the target worker (only in master mode). It is not possible for a worker to run commands in another worker. However, in master code it is possible to specify workerName == 'master' . In this special case the master runs the command directly in its own thread, bypassing the inter-process communication mechanisms. This is very practical if master and workers are running the same code and you need to send the same command to all instances, like in the following example:

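...
// clc is assumed to be the module object, e.g. const clc = require('cluster-cmd');
let promises = [];

// for...of iterates over the names (for...in would yield array indices)
for (let workerName of ['master', ...clc.getWorkerNames()])
    promises.push(run(workerName, 'getStatus'));
let status = await Promise.all(promises);
...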

command is the name of a previously registered command.

args is passed to the command handler and can be anything of type 'object' which can be serialized and de-serialized by JSON.
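
What "serialized and de-serialized by JSON" means in practice can be checked with a plain round trip. This sketch only uses JSON.stringify and JSON.parse and assumes the framework transports args with the same semantics; the property names are made up for the example.

```javascript
// Example argument object with values that do and do not survive JSON
const args = {
    name: 'job-1',
    when: new Date(0),          // becomes an ISO string
    retries: undefined,         // dropped
    callback: () => {},         // dropped
    nested: { values: [1, 2, 3] }
};

// What a handler on the other side would receive under this assumption
const received = JSON.parse(JSON.stringify(args));

console.log(typeof received.when);     // string
console.log('retries' in received);    // false
console.log('callback' in received);   // false
```

Plain data (strings, numbers, booleans, arrays, nested objects) passes through unchanged; dates, functions and undefined values need explicit handling.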

timeoutMs specifies a timeout interval in milliseconds. If it is omitted, the default timeout interval is used (default is currently 5 minutes). A value of 0 means that no timer will be set.
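
The request/reply idea behind run, including the timeout rule (0 means no timer), can be sketched without the Cluster module. This is an illustration under stated assumptions, not the framework's code: an EventEmitter stands in for the IPC channel, and createRequester, attachResponder and the 'request' / 'reply' message shapes are hypothetical.

```javascript
const { EventEmitter } = require('events');

// Requesting side: correlate replies with requests through an id.
function createRequester(channel, defaultTimeoutMs) {
    const pending = new Map();   // id -> { resolve, reject, timer }
    let nextId = 0;

    channel.on('reply', ({ id, error, data }) => {
        const entry = pending.get(id);
        if (!entry) return;                      // e.g. reply arrived after a timeout
        pending.delete(id);
        if (entry.timer) clearTimeout(entry.timer);
        if (error) entry.reject(new Error(error));
        else entry.resolve(data);
    });

    return function request(command, args, timeoutMs = defaultTimeoutMs) {
        return new Promise((resolve, reject) => {
            const id = String(++nextId);
            let timer = null;
            if (timeoutMs > 0) {                 // 0 means: no timer
                timer = setTimeout(() => {
                    pending.delete(id);
                    reject(Object.assign(new Error(`${command} timed out`),
                        { code: 'ERR_COMMAND_TIMED_OUT' }));
                }, timeoutMs);
            }
            pending.set(id, { resolve, reject, timer });
            channel.emit('request', { id, command, args });
        });
    };
}

// Responding side: run the handler and send the result back with the same id.
function attachResponder(channel, handlers) {
    channel.on('request', ({ id, command, args }) => {
        const handler = handlers[command];
        if (!handler) return;                    // never answered: requester times out
        Promise.resolve()
            .then(() => handler(args))
            .then(data => channel.emit('reply', { id, data }),
                  err => channel.emit('reply', { id, error: String(err) }));
    });
}
```

A usage example: after attachResponder(channel, { add: ([a, b]) => a + b }), calling createRequester(channel, 1000)('add', [3, 4]) yields a promise for the sum.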

handler: a command handler function of the type function(command, args, timeoutMs).

The callback version of run returns an id which can be used to cancel the command. In the promise version, this id can be obtained through the id property of the returned promise (see cancel for an example of both).

run can produce the following errors (error.code):

  • ERR_TOO_MANY_ARGUMENTS: self-explanatory
  • ERR_WORKERNAME_NOT_STRING: self-explanatory
  • ERR_COMMAND_NOT_STRING: self-explanatory
  • ERR_INVALID_COMMAND: command was not registered
  • ERR_WORKERNAME_DOESNT_EXIST: worker name not found in worker list
  • ERR_WORKER_DEAD_OR_DISCONNECTED: self-explanatory
  • ERR_COMMAND_CANCELED: command was canceled using the cancel function
  • ERR_NOT_SYNC_HANDLER: command handler should be registered with on.async
  • ERR_NOT_ASYNC_HANDLER: command handler should be registered with on.sync
  • ERR_COMMAND_TIMED_OUT: command didn't execute within timeoutMs milliseconds
  • ERR_PENDING_OVERFLOW: the maximum number of currently pending commands was exceeded (default: 1000, see setOptions)

ready(data)

  • data: anything JSON.stringifiable

This function is called by a worker after it is initialized and ready to receive commands from master.

data is passed to the forkWait function and can be anything which can be handled by process.send (basically JSON.stringifiable).

ready doesn't produce errors.

failed(err)

  • err: Error object or anything JSON.stringifiable

This function is called by a worker when initialization failed and the worker cannot receive commands from master.

err is optional and is passed back to forkWait. When omitted, an ERR_WORKER_FAILED error is generated and passed to forkWait.

After calling failed the worker typically exits with cluster.worker.kill().

failed doesn't produce errors.

cancel(id)

  • id: string

This function cancels a pending command created by run. The command is immediately terminated with an ERR_COMMAND_CANCELED error.

id is the command id originally returned by run. cancel offers a more flexible way to cancel a command than the timeoutMs argument. Example:

let id = run('worker', 'sluggish', 0, (err, data) => {...});  // no timer
...
...
if (bored) cancel(id);

The same in promise style:

var id;
run('worker', 'sluggish', 0)   // 0 means no timer
.id(cmdId => { // id passed to callback
    id = cmdId;
})
.then(() => {
    ...
})
.catch(err => {
    ...
})

if (bored) cancel(id);

The id property of the returned promise expects a callback function which is called with the id string. It is important to place .id(...) before .then and .catch(...) as the latter don't return the complete promise object.
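
Why the order matters can be seen with a plain Promise that has an extra id method attached. This is a sketch of the general JavaScript behaviour, with withId as a hypothetical helper; it does not claim to be how cluster-cmd builds its promises.

```javascript
// Hypothetical helper: attach an id(callback) method to an existing promise.
function withId(promise, id) {
    promise.id = callback => {
        callback(id);
        return promise;          // keep the chain going: .id(...).then(...)
    };
    return promise;
}

const p = withId(Promise.resolve('done'), 'cmd-1');

let seenId;
const chained = p.id(cmdId => { seenId = cmdId; });   // same promise, id still available
const derived = p.then(value => value);               // a new, plain Promise

console.log(seenId);               // cmd-1
console.log(chained === p);        // true
console.log(typeof derived.id);    // undefined
```

then and catch always return a fresh Promise, so any property added to the original object is absent from the result.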

cancel doesn't produce errors.

setOptions(info)

  • info: object
  • returns: object

This function sets the global options.

info is an object of type { name: value, name : value, ... }. Currently, the following options are available:

  • forkWaitTimeout (default: 3000 - 5 min)
  • runTimeout (default: 300000, i.e. 5 minutes)
  • maxPendingCommands (default: 1000)

forkWaitTimeout is used in forkWait, runTimeout in run.

maxPendingCommands designates the maximum number of pending commands, i.e. commands which have been started by run but which have not yet returned a reply. If this number is exceeded, the next run returns an ERR_PENDING_OVERFLOW error. This can be used to catch infinite recursion or synchronization errors.
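
The guard against runaway command chains can be pictured as a simple counter. The sketch below is illustrative only: createPendingTracker is a hypothetical name, and whether the real limit triggers at exactly maxPendingCommands or one above it is an assumption here.

```javascript
// Hypothetical counter of commands that were started but have not yet replied.
function createPendingTracker(maxPendingCommands) {
    let pending = 0;
    return {
        // call before sending a command
        start() {
            if (pending >= maxPendingCommands) {
                const err = new Error('too many pending commands');
                err.code = 'ERR_PENDING_OVERFLOW';
                throw err;
            }
            pending += 1;
        },
        // call when the reply (or an error) has arrived
        finish() {
            if (pending > 0) pending -= 1;
        },
        get count() {
            return pending;
        }
    };
}
```

A command that keeps triggering itself before any reply comes back never calls finish, so the counter climbs until start throws.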

setOptions returns an object with the complete previous option values. Thus setOptions() can be used to obtain the current options without changing them.
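
Returning the previous values makes a "change temporarily, then restore" pattern possible. The following stand-alone sketch shows the idea with a hypothetical setOptionsSketch function; it only models runTimeout and maxPendingCommands with the defaults listed above and is not the library's implementation.

```javascript
// Hypothetical option store with two of the documented defaults
const currentOptions = { runTimeout: 300000, maxPendingCommands: 1000 };

function setOptionsSketch(info = {}) {
    const previous = { ...currentOptions };      // snapshot before any change
    // validate first so that a bad name leaves the options untouched
    for (const name of Object.keys(info)) {
        if (!Object.prototype.hasOwnProperty.call(currentOptions, name)) {
            const err = new Error(`unknown option '${name}'`);
            err.code = 'ERR_INVALID_OPTION';
            throw err;
        }
    }
    Object.assign(currentOptions, info);
    return previous;
}
```

With this shape, const saved = setOptionsSketch({ runTimeout: 1000 }) followed later by setOptionsSketch(saved) restores the earlier settings, and a call without arguments just reads them.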

setOptions can produce the following errors (error.code):

  • ERR_INVALID_OPTION

removeWorkerName(workerName)

  • workerName: string
  • returns: boolean

This function removes workerName from the list of active workers. It is the user's responsibility to terminate the worker in the proper way.

It returns true if the entry existed, false otherwise.

removeWorkerName doesn't produce errors.

getWorkerByName(workerName)

  • workerName: string
  • returns: Cluster.worker object or undefined

This function returns the Cluster.worker object referenced by workerName, or undefined when no such object exists.

getWorkerByName doesn't produce errors.

getWorkerNames()

  • returns: Array

This function returns the array of all workerNames created by fork or forkWait.

getWorkerNames doesn't produce errors.

myName

This constant contains the current worker name (in worker code) or 'master' (in master mode).

Tests

Mocha tests are available for master and worker code:

npm test test/master.js

and

npm test test/worker.js

Please run the above tests separately. Plain npm test won't work, as Mocha has some subtle issues when running in a multithreaded environment (well, perhaps I could have figured it out but didn't have the time and motivation...).

Example: master -> worker and worker -> master communication

const {forkWait, on, run, ready, getWorkerByName, myName} = 
    require('cluster-cmd');

// Register a sync command handler for both master and worker
on.sync('add', ([a,b]) => {  
    return a + b;
});

// Register an async command handler for both master and worker
on.async('async_add', ([a,b]) => {  
    return new Promise(resolve => {
        setTimeout(() => {
            resolve(a + b);
        }, 2000)
    })
})


if (myName == 'master') { 
    // code executed by master

    // fork (start) a new worker thread and name it 'worker'
    // with a timeout interval of 5 seconds ...
    console.log(`${myName}: starting worker`);
    forkWait('worker', 5000) 

    // ... and wait until it has sent a 'ready' message   
    .then(async () => {  
        console.log(`${myName}: received 'ready' from worker`); 
        
        // run worker's 'add' command:
        let sum = await run('worker','add',[3,4]); 
        console.log(`${myName}: 3 + 4 = ${sum}`)  // master: 3 + 4 = 7

        // run worker's 'async_add' command in the same way:
        sum = await run('worker','async_add',[3,4]); 
        console.log(`${myName}: 3 + 4 = ${sum}`)  // master: 3 + 4 = 7

        // Alternatively, you can use promise syntax:
        await run('worker','add',[3,4])
        .then(sum => {
            console.log(`${myName}: 3 + 4 = ${sum}`)  // master: 3 + 4 = 7
        })

        // or good old callback syntax (returns an id, not a promise, so no await):
        run('worker','async_add',[3,4], (err, sum) => {
             if (err) console.log(err);
             else console.log(`${myName}: 3 + 4 = ${sum}`)  // master: 3 + 4 = 7
        })

        // execute worker's 'calculate' command in the same way
        console.log(`${myName}: calling worker's 'calculate' command`)  
        await run('worker','calculate'); 

        // terminate worker
        let worker = getWorkerByName('worker');
        worker.on('exit', () => {
            console.log(`${myName}: worker terminated`)
        })
        worker.process.kill();
    })

    // If the worker does not signal 'ready' within 5 seconds
    // forkWait rejects with an 'ERR_WORKER_TIMED_OUT' error. 
    .catch(err => {
        console.log(`Worker timed out: ${err}`) // ERR_WORKER_TIMED_OUT error
    })
}
else if (myName == 'worker') { 
    // code executed by worker

    // register async command handler
    on.async('calculate', async () => {
        // Call master's 'add' command
        console.log(`${myName}: 5 + 7 = ${await run('add',[5,7])}`); 
        console.log(`${myName}: 6 + 8 = ${await run('async_add',[6,8])}`); 
    })

    // ready to receive commands
    console.log(`${myName} is ready`);
    ready(); 
}