@rxstack/worker-threads-pool
v0.8.0
Published
RxStack Worker Threads Pool Module
Downloads
56
Readme
RxStack Worker Threads Pool Module
Offload tasks to a pool of workers in rxstack application.
Installation
npm install @rxstack/worker-threads-pool --save
Documentation
Setup
WorkerThreadsPool
module is installed and configured by default in skeleton
application.
Module Options
The module accepts the following options::
path
: path to executable .js filemax
: Maximum number of workers allowed in the pool. Defaults to 1maxWaiting
: Maximum number of workers waiting to be started when the pool is full. It will trigger an exception if limit is reached. Defaults to 10
Create a task
Each task should extends AbstractWorkerThread
class:
// ./src/app/workers/my-task.ts
import {AbstractWorkerThread} from '@rxstack/worker-threads-pool';
import {Injectable} from 'injection-js';
import {parentPort, workerData} from 'worker_threads';
@Injectable()
export class MyTask extends AbstractWorkerThread {
async run(): Promise<void> {
parentPort.postMessage(`hello ${workerData.options.message} - from worker`);
}
getName(): string {
return 'my-task';
}
}
then register it in the application providers:
// ./src/app/workers/APP_WORKER_PROVIDERS.ts
import {ProviderDefinition} from '@rxstack/core';
import {WORKER_THREADS_POOL_REGISTRY} from '@rxstack/worker-threads-pool';
import {MyTask} from './my-task';
export const APP_WORKER_PROVIDERS: ProviderDefinition[] = [
// ...
{
provide: WORKER_THREADS_POOL_REGISTRY,
useClass: MyTask,
multi: true
}
];
now your task is ready to be run.
Run a task
you can run a task from anywhere. In the example below we'll run it from a controller and communicate with the connected client via socket connection:
import {Injectable, Injector} from 'injection-js';
import {InjectorAwareInterface, Request, Response, WebSocket} from '@rxstack/core';
import {WorkerThreadsPool} from '@rxstack/worker-threads-pool';
@Injectable()
export class IndexController implements InjectorAwareInterface {
private injector: Injector;
setInjector(injector: Injector): void {
this.injector = injector;
}
@WebSocket('app_index')
async indexAction(request: Request): Promise<Response> {
const pool = this.injector.get(WorkerThreadsPool);
pool.acquire('my-task', {massage: 'world'}).then((worker) => {
worker.on('message', (data: any) => {
// communication between client and worker via sockets
request.connection.emit('message', data);
});
worker.on('online', (err: any) => {
// task is started
request.connection.emit('message', 'task is started');
});
worker.on('error', (err: any) => {
// handle errors
request.connection.emit('message', err.message);
});
worker.on('exit', (code) => {
// task is completed
request.connection.emit('message', code === 0 ? 'success' : 'fail');
});
}).catch(e => console.error(e.message));
return new Response('Task is scheduled', 202);
}
}
API
pool.acquire(name, [options])
name
: task name,options
(optional): data passed toworkerData
returns a promise of Worker
or throws an exception if task does not exist or queue is full.
pool.stats()
returns workerSize
and queueSize
pool.terminate()
clears the queue and terminates all running workers
License
Licensed under the MIT license.