elwt
This module provides a fast pool abstraction over the experimental Node.js worker threads API. I plan to implement more parallel-programming primitives, such as locks, semaphores, serialized handles, etc.
Why elwt?
- fully customizable
- asynchronous API & interaction
- supports correct usage of SharedArrayBuffer
- can transfer most data types
- workers are reusable
- customizable caching mechanism
- automatic worker respawn
- minimalistic
Installation
Simply run npm i -S elwt to install the module locally. After that, you're free to import it.
Usage
Firstly, import the module to start working with it. The module exports an object with the following structure:
- class Pool: extends EventEmitter; used to manage the pool, tasks, workers, cache, etc.
- class Storage: extends Map; used to manage the storing of workers and tasks
- sync function templater: used to create the workers' wrapper code (see below for details)
- class PoolWorker: extends Worker; used to spawn new workers
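For reference, a destructuring import of those exports could look like this (assuming the export keys match the names listed above):

const { Pool, Storage, PoolWorker, templater } = require('elwt');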
Simply import Pool and use its static asynchronous method spawnPool to create and preconfigure a pool. It's not recommended to create pools by calling new Pool(...) directly, because spawnPool performs some necessary setup that makes the pool workable. See the API docs below for details, and the examples section below for usage with explanations.
API
Below is the default API: the classes, their methods and public fields, advanced tools, and the relations between all of them. Feel free to extend any of the classes to customize pool behavior. There is also a virtual Cache definition; it isn't provided as an exportable class because caching is disabled by default, but the API defines its format so you can implement your own cache storage if needed. Some descriptions use the worker_threads.* notation as an alias for require('worker_threads').*.
Classes
class Pool, extends EventEmitter:
The main class of the library; used to create and manage pools, to let workers execute tasks, and to manage execution details. It's okay to create multiple pools if needed. Be careful with manipulations of shared memory: it may be unsafe to keep it in sync when working in parallel.
Public methods and fields are:
- spawnPool: static async function; use it to create a new pool (see the sketch after this list). Accepts an optional object containing the following options:
  - size: optional positive number - indicates how many workers to create at the pool's init. The pool is capable of dynamic size changes (see below for details). Defaults to require('os').cpus().length
  - queue: optional Storage - used to manage the task queue. Defaults to what storage.js exports
  - free: optional Storage - used to manage free workers. Defaults to what storage.js exports
  - active: optional Storage - used to manage busy workers. Defaults to what storage.js exports
  - cache: optional boolean | Cache - used to manage the cache. If falsy, caching is turned off. Defaults to false
  - PoolWorker: optional PoolWorker - used as the workers' constructor, called explicitly. Defaults to what worker.js exports
  - templater: optional (): string - synchronous function called to generate the body for workers, which must be in the special format explained below. Defaults to what templater.js exports
  - roundRobin: optional (Iterable): Iterable - synchronous function called to convert iterators to round-robin iterators. Defaults to tools.roundRobin
  - unitProps: optional object - arguments passed to the original Worker constructor as is. unitProps.eval is always true regardless of what you pass. Defaults to {}
  - autoRespawn: optional boolean - whether to automatically respawn workers that fired an internal error event (internal here means not one fired by the code you pass to the pool, but one fired by the worker itself). Notice that workers keep living after a task is done, so this option may safely be turned off if your implementation handles worker internal errors correctly. Defaults to true
- constructor: function; used seamlessly by spawnPool to create a new pool. It's not recommended to use the constructor explicitly: a pool created by calling the constructor without the computations made by spawnPool is unusable. Accepts an object like the one passed to spawnPool, except that size and roundRobin aren't passed, and template is passed instead of templater itself. template is the result of calling templater: a string containing JS code which must react to the message event fired by worker_threads.parentPort, considering the following (a rough worker-body sketch appears at the end of this class's description):
  - the handler is an asynchronous function and accepts a required object containing arguments as follows:
    - action: string - defines the action for the worker to perform. See below for acceptable action types
    - port: worker_threads.MessagePort - used to reply to messages
    - fn: function - asynchronous function which must be executed. Notice it is executed inside a dedicated context, so import the modules it uses inside it, not outside
    - data: any - data passed to fn. Must be serialized if not of type SharedArrayBuffer. If data is an object, then its children of type SharedArrayBuffer will be shared, the remaining children of type ArrayBuffer will be moved (follow the Node worker_threads docs to learn more about sharing and moving typed arrays between threads), and the rest must be serialized
    - raw: object - raw data passed to fn. Comprises key: SharedArrayBuffer pairs with the SharedArrayBuffers derived from data, if any
  - the handler must deserialize incoming data if it's not a SharedArrayBuffer
  - the handler must react to the tools.actions.RUN action by executing fn
  - the handler must call port.postMessage passing an object structured as follows:
    - action: string - defines the type of response to send. See below for acceptable action types
    - result: string - any data to return alongside. Non-string data must be serialized beforehand. ArrayBuffers aren't moved or shared, and are disallowed as values
    - error: optional boolean - must be true if action is tools.actions.ERROR
  - the handler must close port explicitly after port.postMessage is called
  - the action must be tools.actions.DONE if fn executed successfully
  - the action must be tools.actions.ERROR if fn execution throws
- toSize: async function; used to dynamically change the size of the pool. Accepts the following options:
  - size: number - the new size to set. If the new size is less than the old one, redundant workers will be safely terminated after finishing their current activities, if any. If the new size is greater than the old one, new workers will be created automatically
- exec: async function; used to enqueue task execution. The pool will automatically choose a worker to execute the task once one is available. Accepts the following options:
  - fn: async function - function to execute in a thread
  - data: optional any - data to pass to fn. If an object, then the SharedArrayBuffers inside are shared and the remaining ArrayBuffers are moved
  - an optional object containing the following additional options:
    - respawn: number - how many times to retry the task execution if it has failed (see the Examples section for a sketch)
- size: number - the current pool's size
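As a reference point for the options above, here is a minimal sketch of pool creation with a few of them set explicitly (the values are arbitrary; anything omitted falls back to the documented defaults):

const { Pool } = require('elwt');

let swarm = await Pool.spawnPool({
  size: 4,             // spawn four workers up front instead of one per CPU core
  cache: false,        // keep caching disabled (the default)
  autoRespawn: false,  // we handle workers' internal 'error' events ourselves
  unitProps: {},       // forwarded to the underlying Worker constructor as is
});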
Private methods and fields are:
- addUnit: async function - used to create a new worker, to define event listeners for it, and to put the new worker into storage
- activateUnit: async function - used to move a worker from the storage of free workers to the storage of busy ones
- loadUnit: async function - used to load a free worker, or to await a free one and load it once it's free
- loadTask: async function - used to load a task from the queue if there is one, or to await one if not
- next: async function - used to prepare the execution environment, to set task-related listeners for the worker, and to send the task to the thread
- _execCached: async function - used as a replacement for exec when caching is enabled, so that cache checks add no overhead while caching is disabled
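For the template contract described under constructor above, here is a rough, illustrative sketch of what a custom templater might return. It is not the actual templater.js: the action names are placeholder strings standing in for tools.actions, and how fn and data cross the thread boundary is left to your implementation, so treat every detail below as an assumption.

// Hypothetical templater: returns the JS source each worker will evaluate.
const templater = () => `
  const { parentPort } = require('worker_threads');
  parentPort.on('message', async ({ action, port, fn, data, raw }) => {
    if (action !== 'RUN') return;      // placeholder for tools.actions.RUN
    try {
      // Assumes fn and data arrive in a directly usable form;
      // deserialize them here if your wire format requires it.
      const result = await fn(data);
      port.postMessage({ action: 'DONE', result: JSON.stringify(result) });
    } catch (err) {
      port.postMessage({ action: 'ERROR', result: String(err), error: true });
    } finally {
      port.close();                    // the port must be closed explicitly after replying
    }
  });
`;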
class Storage, extends Map:
Used to manage workers and tasks. Separate storages are created for free workers, for busy ones, and for tasks. The default storage provides asynchronous wrappers over the following Map methods: clear, delete, has, set. A class which extends Storage must provide an Iterable with at least that set of methods; a minimal sketch follows.
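For illustration only, a custom storage built on the exported Storage class might look like this (the logging is the only addition; the method contract is the one described above):

const { Storage } = require('elwt');

// Hypothetical storage that logs every mutation; everything else is inherited.
class LoggingStorage extends Storage {
  async set(key, value) {
    console.log('storing', key);
    return super.set(key, value);
  }
  async delete(key) {
    console.log('removing', key);
    return super.delete(key);
  }
}

An instance of such a class could then be passed as the queue, free, or active option of spawnPool.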
class PoolWorker, extends Worker:
Used to create workers. The eval option passed to the Worker constructor always equals true.
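If you need to hook into the worker lifecycle, a custom constructor can be passed via the PoolWorker option of spawnPool. A hypothetical sketch (the online event comes from the underlying worker_threads.Worker):

const { Pool, PoolWorker } = require('elwt');

// Hypothetical subclass that only adds a log line when a worker comes online.
class VerboseWorker extends PoolWorker {
  constructor(...args) {
    super(...args);
    this.once('online', () => console.log('worker is online'));
  }
}

let swarm = await Pool.spawnPool({ PoolWorker: VerboseWorker });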
Examples
First of all, import the Pool class in some way. Then call spawnPool on it to get a new pool. Once it's ready, you're free to asynchronously call .exec(fn, data, { respawn: N }) and wait for the Promise it returns. fn will be called by a free worker, or queued if there is none:
const Pool = require('elwt').Pool;
let swarm = await Pool.spawnPool({ size: require('os').cpus().length });
console.log(await swarm.exec(async (input) => {
return input * 2;
}, 21)); // logs 42
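The optional third argument, mentioned above, can ask the pool to retry a failing task. A small sketch (the failure below is artificial, and undefined is passed as data only to reach the options argument):

let flaky = await swarm.exec(async () => {
  if (Math.random() < 0.5) throw new Error('transient failure');
  return 'ok';
}, undefined, { respawn: 3 }); // retry the task up to 3 times if it fails
console.log(flaky);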
Data may be of any type, but notice that typed arrays are transferred and shared ones are shared. This means that if you pass a typed array as data (or as a child of data), it becomes inaccessible on the emitter side, but the listener is able to use it immediately. It also means you're free to use a SharedArrayBuffer without copying or moving any data.
let shared = new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT);
await swarm.exec(async (input) => {
let view = new Int32Array(input);
view[0] = 21;
}, shared);
await swarm.exec(async (input) => {
let view = new Int32Array(input);
view[1] = view[0] * 2;
}, shared);
let response = await swarm.exec(async (input) => {
let view = new Int32Array(input);
return view[1];
}, shared);
console.log(response); // logs 42
The size of the pool can be changed at any time:
await swarm.toSize(32); // ok
await swarm.toSize(-5); // ok, will be set to 0
await swarm.toSize(0); // ok
await swarm.toSize(Infinity); // ok, will be set to Number.MAX_SAFE_INTEGER - 1
One more thing
Feel free to contribute and participate! Feel free to open an issue, fork, or PR. Feel free to use this module anywhere, provided the original LICENSE.md and the link to it inside package.json are kept.
TODO: Bnaya/objectbuffer