npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

clusterfy

v1.1.5

Published

NodeJS package that simplifies process management and IPC in NodeJS cluster.

Downloads

16

Readme

Clusterfy

This library simplifies managing processes in a NodeJS cluster. It supports IPC via messaging and a shared storage managed by the primary process.

For presentation timeouts were inserted. You can find the code to this terminal preview here.

Features

  • Simple API for creating, watching and controlling workers in a cluster.
  • Shared storage managed by primary process. Workers can set and retrieve this storage.
  • Simple API for sending commands between primary and workers: primary <-> worker or worker <-> worker
  • Easy way to add custom commands
  • Simple event handling using RxJS (e.g. using Observable)
  • Extended workers with more attributes like name and status
  • Graceful shutdown on process signals
  • ES Module with Typescript definitions

Applications

You can use Clusterfy in all NodeJS applications with Node >= 16 and servers like ExpressJS or NestJS.

How it works

The primary process is the manager who knows everything about the workers and controls these. Furthermore the primary process manages a shared storage (JS object). If worker want to retrieve or save values to the storage they send requests to the primary process which returns or saves the value.

Commands from primary process to worker process and vice versa are sent via IPC as ClusterfyIPCEvent objects. It's also possible to send commands from one worker to another. This works thanks to the primary process that redirects the messages from one worker to another. With this construct you can send commands asynchronously between different processes.

Installation

npm install clusterfy --save

Furthermore, you need to install rxjs >= 7.5.0 and uuid >= 7.0.3 if not already installed.

Usage

Clusterfy is a static class that can be called everywhere. At the entrypoint of your application you want to start the cluster. So you need to initiate the primary process and its workers like that:

import {Clusterfy, ClusterfyIPCEvent, ClusterfyStorage} from 'clusterfy';

const onShutdown = async () => {
    console.log(`Simulate cleanup on shutdown for ${Clusterfy.currentLabel}...`);

    let j = 0;
    for (let i = 0; i < 100000; i++) {
        j++;
    }
    console.log(`All cleaned up for ${Clusterfy.currentLabel} OK`);
};

async function main() {
    if (Clusterfy.isCurrentProcessPrimary()) {
        // initStorage shared memory
        const sharedMemory = new ClusterfyStorage({
            test: {
                some: 1,
            },
        });
        Clusterfy.initStorage(sharedMemory);

        // create some workers asynchronously
        Clusterfy.fork('SomeName1');
        Clusterfy.fork();
        // now create worker that is revived automatically after it died
        Clusterfy.fork('EmailServer', {revive: true});
        await Clusterfy.initAsPrimary({
            gracefulOnSignals: ['SIGINT', 'SIGTERM'],
        });
        Clusterfy.registerShutdownMethod('default', onShutdown);

        // now you can use Clusterfy on primary as you like
        Clusterfy.events.subscribe({
            next: (event: ClusterfyIPCEvent) => {
                console.log(`Primary got event ${event.type} from worker ${event.senderID}`);
            }
        });
    } else {
        await Clusterfy.initAsWorker({
            gracefulOnSignals: ['SIGINT', 'SIGTERM'],
        });
        Clusterfy.registerShutdownMethod('default', onShutdown);
        // worker retrieved metadata from primary on initialization
        console.log(`Worker ${Clusterfy.currentWorker.name} is ready.`);

        // now you can use Clusterfy on worker as you like

        Clusterfy.events.subscribe({
            next: (event: ClusterfyIPCEvent) => {
                console.log(`Worker ${Clusterfy.currentWorker.name} got event ${event.type} from worker ${event.senderID}`);
            }
        });
    }
}

main();

Getter

currentWorker(): ClusterfyWorker

Returns current worker (only on worker). Returns undefined else.

currentLabel(): string

Returns "Primary" on primary or "Workername (id)" on worker.

storage(): ClusterfyStorage

Returns shared storage (only on primary). Returns undefined else.

events(): Subject<ClusterfyIPCEvent>

Returns observable of all events to this worker or primary.

workers(): ClusterfyWorker[]

Returns array of workers (only on primary). Returns empty array else.

Functions

initStorage<T>(storage: ClusterfyStorage)

Initializes the storage. Call this method on primary. The storage must be an object, e.g.

{
    test: {
        something: 123
    }
}

fork(name?: string, options?: ClusterfyWorkerOptions)

Creates a new worker with given name and options.

Options

async initAsPrimary(shutdownOptions?: ClusterfyShutdownOptions)

Initializes the primary with Clusterfy. This method must be called on primary (see example). If you want to include graceful shutdown on process signals you need to add shutdownOptions. Resolves as soon as all workers are ready.

async initAsWorker(shutdownOptions?: ClusterfyShutdownOptions)

Initializes the worker with Clusterfy and waits for metadata from primary. This method must be called on worker (see example). Wait until this method returns. If you want to include graceful shutdown on process signals you need to add shutdownOptions. Resolves as soon as worker is ready.

registerShutdownMethod(name: string, command: (signal: NodeJS.Signals) => Promise)

Registers a new method that is run on shutdown.

removeShutdownMethod(name: string)

Removes an existing shutdown method.

async saveToStorage(path: string, value: any)

Saves a serializable value to a path in the shared storage on primary. Path should be a string with dot notation to the attribute , e.g. "test.something". This method returns as soon as saved.

async retrieveFromStorage<T>(path: string)

Returns a serializable value from a given path in shared storage. Path should be a string with dot notation to the attribute , e.g. "test.something". This method returns the value of type T as soon as retrieved.

async shutdownWorker(worker: ClusterfyWorker, timeout = 2000)

If you call Clusterfy.shutdownWorker(worker, 2000) from primary it sends a cy_shutdown command to a worker. The worker gets status "STOPPING" and should exit itself using process.exit(0) in 2 seconds (graceful shutdown). If the worker doesn't exit itself, the primary kill it.

That means: the algorithm you are using for processing in a worker should check if the status is " STOPPING" using Clusterfy.currentWorker.status and then exit itself. The algorithm should also change the status to " PROCESSING" after it started processing and change it back to "IDLE" after finished. If a shutdown is received Clusterfy checks if the status is "IDLE" and calls process.exit(0) or changes the status to "STOPPING".

getStatistics()

Returns a object with statistics about the workers. Call it on primary.

outputStatisticsTable()

Outputs statistics from getStatistics to console. Call it on primary.

registerIPCCommand(command: clusterfyCommand)

Registers a new custom command to the list of supported commands. Call this method on worker and primary.

changeCurrentWorkerStatus(status: ClusterfyWorkerStatus)

Changes the status of the current worker and emits event of type "status" to itself and to primary.

Create custom command

Each command should extend class ClusterfyCommand. See examples in commands. Call registerIPCCommand(command) on primary and worker.

export class ClusterfyCommand {
    name: string;
    target: 'primary' | 'worker';
    runOnTarget: (
        args: Record<string, any>,
        commandEvent?: ClusterfyCommandRequest<any>
    ) => Promise<ClusterfyCommandRequestResult<any>>;

    constructor(options?: Partial<ClusterfyCommand>) {
        if (options) {
            Object.assign(this, options);
        }
    }
}

Development

  1. Install dependencies
   npm install
  1. Start in one terminal
   npm run watch:lib
  1. Run demo to test it.
   npm run start:demo