nestjs-workers-pool

v1.2.0

Nest Workers Pool is a library that allows you to create a pool of workers to execute tasks in parallel.

@Nest/workers-pool

"NestJS Workers Pool" is a library for NestJS that runs functions in separate threads using worker_threads. This keeps heavy computation from blocking the main thread and lets you control how many functions run concurrently.

  • It manages a pool of workers, automatically assigning tasks and queuing them when all workers are busy.

  • Use runAsyncTask to await a task's result, or runTask for non-blocking execution, depending on your needs.

  • When using the waitUntilAvailable option, tasks wait in the queue until a worker is free, preventing overloading.

  • Workers are restarted after reaching their maximum tasks or lifetime, maintaining optimal performance.

  • Extend or inject WorkerPoolService as needed.

  • Use decorators such as @onEmitted or @onError in your service to handle custom events sent by worker scripts.
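
The queuing behaviour described in the list above can be pictured with a small, thread-free sketch. This is not the library's code: PoolScheduler, submit and finish are hypothetical names, and the only behaviour taken from this README is "start a task when a worker is free, queue it when waitUntilAvailable is true, throw otherwise".

```javascript
// Hypothetical sketch of pool bookkeeping; not the library's actual implementation.
class PoolScheduler {
  constructor({ maxWorkers, waitUntilAvailable = false }) {
    this.maxWorkers = maxWorkers;
    this.waitUntilAvailable = waitUntilAvailable;
    this.busy = 0;     // workers currently running a task
    this.queue = [];   // tasks waiting for a free worker
    this.started = []; // tasks that have been handed to a worker (for the demo)
  }

  // Start the task if a worker is free; otherwise queue it or throw.
  submit(task) {
    if (this.busy < this.maxWorkers) {
      this.start(task);
      return 'started';
    }
    if (!this.waitUntilAvailable) {
      throw new Error('No worker available');
    }
    this.queue.push(task);
    return 'queued';
  }

  start(task) {
    this.busy += 1;
    this.started.push(task);
  }

  // Called when a worker reports its final result: free it, then take the next queued task.
  finish() {
    this.busy -= 1;
    if (this.queue.length > 0) {
      this.start(this.queue.shift());
    }
  }
}

const pool = new PoolScheduler({ maxWorkers: 2, waitUntilAvailable: true });
const outcomes = ['a', 'b', 'c'].map((task) => pool.submit(task));
pool.finish(); // one worker frees up, so 'c' leaves the queue
console.log(outcomes, pool.started);
```

With waitUntilAvailable left at false, the third submit would throw instead of queuing, which matches the option's description in the Configuration section.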

Installation

npm install nestjs-workers-pool

Usage as an extendable service

import { Injectable } from '@nestjs/common';
import { WorkerPoolService, onEmitted, onceEmitted, onError } from 'nestjs-workers-pool';

@Injectable()
export class FibonacciService extends WorkerPoolService {
  constructor() {
    super({
      scriptPath: './src/workers/heavy_task_worker.js',
      waitUntilAvailable: true,
      workerOptions: {
        execArgv: ['--max-old-space-size=4096'],
      },
    });
  }

  async calcFibonacci(n: number): Promise<boolean> {
    return this.runTask(n);
  }

  // Without an argument, the event name defaults to the method name
  @onEmitted()
  progress(data: { progress: number }): void {
    console.log(`Progress: ${data.progress}`);
  }

  @onEmitted('message')
  message(data: { result: string }): void {
    console.log(`Message: ${data.result}`);
  }

  @onError()
  error(data: { error: string }): void {
    console.log(`Error: ${data.error}`);
  }
}

Usage as an injectable service

// module.ts
import { Module } from '@nestjs/common';
import { WorkerPoolService, WORKER_OPTIONS_TOKEN } from 'nestjs-workers-pool';

@Module({
  providers: [
    {
      provide: WORKER_OPTIONS_TOKEN,
      useValue: {
        scriptPath: './src/workers/heavy_task_worker.js',
        waitUntilAvailable: true,
        maxWorkers: 2,
        maxLifetime: 10000,
        maxTasks: 10,
      },
    },
    WorkerPoolService,
  ],
})
export class AppModule {}

// app.service.ts
import { Injectable } from '@nestjs/common';
import { WorkerPoolService } from 'nestjs-workers-pool';

@Injectable()
export class AppService {
  // Inject the WorkerPoolService registered in the module above
  constructor(private readonly workersPool: WorkerPoolService) {}

  async run() {
    // Run a task in a worker thread; returns a boolean indicating whether the task was queued
    this.workersPool.runTask();

    // Run a task in a worker thread and wait for its result
    const result = await this.workersPool.runAsyncTask();
    console.log(result);
  }
}

Worker script

// heavy_task_worker.js
const { parentPort } = require('worker_threads');

parentPort.on('message', (data) => {
  parentPort.postMessage({ progress: 0.1, type: 'progress' });
  // do something
  parentPort.postMessage({ progress: 1, type: 'progress' });
  parentPort.postMessage({ result: 'done', type: 'message' });
});

  • To trigger a custom event, set the type property of the message object to the name of the event.
  • Messages with an unhandled type are ignored.
  • Always send the final result with type: 'message'. Otherwise the worker is never flagged as available again and queued tasks wait forever. Once type: 'message' is sent, the service stops listening for events from that task and the worker continues with its lifecycle.
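
The three rules above can be sketched as a small dispatcher. This is only an illustration under stated assumptions: createTaskListener and its handler map are hypothetical and are not part of the library; only the routing by the type property, the ignoring of unhandled types, and the special role of type: 'message' come from this README.

```javascript
// Hypothetical dispatcher; illustrates the rules above, not the library's internals.
function createTaskListener(handlers) {
  let finished = false;
  return {
    // Route one worker message to the handler registered for its `type`.
    handle(message) {
      if (finished) return false;                      // already stopped listening
      if (message.type === 'message') finished = true; // final result frees the worker
      const handler = handlers.get(message.type);
      if (typeof handler !== 'function') return false; // unhandled types are ignored
      handler(message);
      return true;
    },
    isFinished: () => finished,
  };
}

const log = [];
const listener = createTaskListener(new Map([
  ['progress', (m) => log.push(`progress ${m.progress}`)],
  ['message', (m) => log.push(`result ${m.result}`)],
]));

listener.handle({ type: 'progress', progress: 0.1 });
listener.handle({ type: 'unknown' });                 // no handler registered: ignored
listener.handle({ type: 'message', result: 'done' }); // final result
listener.handle({ type: 'progress', progress: 1 });   // arrives too late: not delivered
console.log(log, listener.isFinished());
```

If the worker never sends type: 'message', isFinished() stays false, which is the "worker never becomes available again" situation the last rule warns about.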

Configuration

| Option | Type | Default value | Description |
| ----------------- | -------- | ------------- | --------------------------------------------------------------------------- |
| scriptPath | string | | Path to the worker script |
| maxWorkers | number | os.cpus().length | The maximum number of workers that can be started simultaneously |
| waitUntilAvailable | boolean | false | Wait until a worker is ready to accept the task; if false, an error is thrown instead |
| workerOptions | object | {} | Options for the worker thread |
| maxTasks | number | 0 | The maximum number of tasks that can be run |
| maxLifetime | number | 0 | The maximum lifetime of a worker in milliseconds |
| verbose | boolean | false | Show debug information |
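
As a rough illustration of how these options could fit together, here is a sketch that merges user options over the defaults from the table and decides when a worker is due for a restart. resolveOptions, shouldRestart and the worker fields tasksDone and startedAt are hypothetical names. Two things are assumptions rather than documented behaviour: that scriptPath is required (it has no default), and that a limit of 0 means "no limit".

```javascript
const os = require('os');

// Defaults copied from the configuration table.
const DEFAULTS = {
  maxWorkers: os.cpus().length,
  waitUntilAvailable: false,
  workerOptions: {},
  maxTasks: 0,
  maxLifetime: 0,
  verbose: false,
};

// Merge user options over the defaults.
// ASSUMPTION: scriptPath is required because the table gives it no default.
function resolveOptions(userOptions) {
  if (!userOptions || typeof userOptions.scriptPath !== 'string') {
    throw new TypeError('scriptPath is required');
  }
  return { ...DEFAULTS, ...userOptions };
}

// ASSUMPTION: a limit of 0 means "no limit"; the README does not state this.
function shouldRestart(worker, options, now = Date.now()) {
  const tooManyTasks = options.maxTasks > 0 && worker.tasksDone >= options.maxTasks;
  const tooOld = options.maxLifetime > 0 && now - worker.startedAt >= options.maxLifetime;
  return tooManyTasks || tooOld;
}

const options = resolveOptions({
  scriptPath: './src/workers/heavy_task_worker.js',
  maxTasks: 10,
  maxLifetime: 10000,
});

console.log(options.maxWorkers === os.cpus().length); // default kept
console.log(shouldRestart({ tasksDone: 10, startedAt: 0 }, options, 5000)); // task limit reached
console.log(shouldRestart({ tasksDone: 3, startedAt: 0 }, options, 5000));  // within both limits
```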

Handling custom events

There are several decorators available for handling different types of events:

  • Use the @onEmitted() decorator to handle custom events. This will be executed each time the specified event is emitted.

  • The @onceEmitted() decorator is also for handling custom events, but it will only be executed the first time the event is emitted.

  • To handle errors, use the @onError() decorator. This will be executed each time an error is raised in the worker.

  • The @onExit() decorator is for handling the exit event. This will be executed each time the exit event is emitted.

  • You can destroy all workers with the destroyWorkerPool() method if you need to, but any pending tasks will be lost.
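
The difference between @onEmitted() and @onceEmitted() is analogous to on versus once on Node's built-in EventEmitter. The sketch below uses only the standard events module to show that difference. Whether the library is built on EventEmitter internally is not stated in this README, so treat this as an analogy.

```javascript
const { EventEmitter } = require('events');

const events = new EventEmitter();
const calls = { every: 0, first: 0 };

// Comparable to @onEmitted(): runs on every emission.
events.on('progress', () => { calls.every += 1; });

// Comparable to @onceEmitted(): runs on the first emission only.
events.once('progress', () => { calls.first += 1; });

events.emit('progress', { progress: 0.1 });
events.emit('progress', { progress: 1 });

console.log(calls); // { every: 2, first: 1 }
```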

import { Injectable } from '@nestjs/common';
import { WorkerPoolService, onEmitted, onceEmitted, onError, onExit } from 'nestjs-workers-pool';

@Injectable()
export class FibonacciService extends WorkerPoolService {
  constructor() {
    super({
      scriptPath: './src/workers/heavy_task_worker.js',
      waitUntilAvailable: true,
      maxWorkers: 2,
    });
  }

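  async calcFibonacci(n: number): Promise<boolean> {
    // The optional second argument is a transfer list. Only transferable objects
    // (for example an ArrayBuffer) can be listed; a plain number cannot.
    const input = new Float64Array([n]);
    return this.runTask(input, [input.buffer]);
  }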

  // Without an argument, the event name defaults to the method name
  @onEmitted()
  progress(data: { progress: number }): void {
    console.log(`Progress: ${data.progress}`);
  }

  @onEmitted('message')
  message(data: { result: string }): void {
    console.log(`Message: ${data.result}`);
  }

  @onError()
  error(data: { error: string }): void {
    console.log(`Error: ${data.error}`);
  }

  @onExit()
  exit(data: { exitCode: number }): void {
    console.log(`Exit: ${data.exitCode}`);
  }
}

// heavy_task_worker.js
const { parentPort } = require('worker_threads');

parentPort.on('message', (data) => {
  parentPort.postMessage({ progress: 0.1, type: 'progress' });
  // do something
  parentPort.postMessage({ progress: 1, type: 'progress' });
  parentPort.postMessage({ result: 'done', type: 'message' });
});
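
The transfer list mentioned in calcFibonacci follows the standard structured-clone rules: a transferred ArrayBuffer moves to the receiver and becomes unusable on the sending side. The sketch below shows this with the global structuredClone (available in Node 17 and later), which applies the same transfer semantics as postMessage. That runTask forwards its second argument to postMessage is an assumption based on the example above.

```javascript
// A typed array backed by a 24-byte ArrayBuffer (3 x 8-byte floats).
const source = new Float64Array([1, 2, 3]);

// Clone the view and transfer ownership of its underlying buffer.
const moved = structuredClone(source, { transfer: [source.buffer] });

console.log(moved.length);             // 3: the clone owns the data
console.log(source.buffer.byteLength); // 0: the original buffer is detached
```

Transferring avoids copying large buffers, at the cost that the sender can no longer read the data after the call.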

Future improvements

  • Integrations with services like Bull or RabbitMQ
  • Send non-transferable objects, such as streams or sockets, to the worker
  • Integrations with TypeORM to handle database connections

This is my first library, so any feedback is welcome. I will be happy to receive your comments and pull requests! :D @bueno12223