workestrator
v1.0.1
Published
<a href="https://coveralls.io/github/Yaty/workestrator?branch=master" target="_blank"><img src="https://coveralls.io/repos/github/Yaty/workestrator/badge.svg?branch=master" alt="Coverage Status" data-canonical-src="https://coveralls.io/github/Yaty/workest
Downloads
6
Maintainers
Readme
Workestrator
Workestrator is a library to distribute tasks to child processes. It is written in TypeScript and use ES2017 features.
This project is highly inspired by node-worker-farm
and add some new features.
Typescript Documentation : https://yaty.github.io/workestrator/
Features
- Concurrency options
- Durability / Resilience : when a call fails it will be re-queued automatically (according to the farm options).
- Async/Await support out of the box
- Events
- Serializers : You can choose among several serializers according to the data types you send to the workers. You can also write your own :)
- Broadcasting
Usage
const workestrator = require("workestrator");
const farm = workestrator.create({
module: "/absolute/path/to/the/worker/module.js",
});
try {
await farm.run(1, 2, 3); // returns 6
await farm.runMethod("foo", 1, 2, 3); // returns "bar:1:2:3"
} catch (err) {
console.log("Oh ... it failed :(", err);
}
And the module is :
module.exports = function(a, b, c) {
return a + b + c;
};
module.exports.foo = function(a, b, c) {
return `bar:${a}:${b}:${c}`;
};
Examples
Basic
Running and broadcasting methods.
Approximating π
Doing it the slow (single-process) way...
π ≈ 3.141597687999999 (0.000005034410206050666 away from actual!)
took 12217 milliseconds
Doing it the fast (multi-process) way...
π ≈ 3.1415487919999996 (0.000043861589793525724 away from actual!)
took 3425 milliseconds
See the full example with code here !
API
Workestrator
workestrator.create(options)
Create a new farm.
const workestrator = require("workestrator");
const farm = workestrator.create({
module: "/absolute/path/to/the/worker/module.js"
});
options
Options is an object, the default values are :
{
fork: {
args: process.argv,
cwd: process.cwd(),
env: process.env,
execArgv: process.execArgv.filter((v) => !(/^--(debug|inspect)/).test(v)), // without debug and inspect
execPath: process.execPath,
silent: false,
},
killTimeout: 500,
maxConcurrentCalls: Infinity,
maxConcurrentCallsPerWorker: 10,
maxIdleTime: Infinity,
maxRetries: 3,
numberOfWorkers: require("os").cpus().length,
serializerPath: workestrator.serializers.JSON
timeout: Infinity,
ttl: Infinity,
}
module
(mandatory !) : Absolute path to your module.fork
: Fork options used for each worker (see Node.js documentation)killTimeout
: The amount of time in ms for a worker to exit gracefully before killing it like butchers with SIGKILL.maxConcurrentCalls
: The maximum number of calls in the farm queue, i.e. : calls being processed by workers + calls waiting in the queue.maxConcurrentCallsPerWorker
: The maximum number of calls a worker can execute concurrently.maxIdleTime
: The maximum amount of time a worker can live without receiving a call. It will kill the worker and restart another one according to your policy.maxRetries
: How many times a call should be retried before failing once and for all, it will throw a CallMaxRetryError with the original error in thereason
property if this is reached.numberOfWorkers
: The amount of workers in the farm.serializerPath
: Absolute path to the serializer, Workestrator provides two serializers (JSON for basic data types, CBOR for complex data types). SeeSerializers
section.timeout
: A call will have to finish undertimeout
ms. It will throw a TimeoutError. It will be retried according to the farm options.ttl
: The amount of calls a worker can execute. The worker will be killed, his tasks will be redistributed to other workers. A new worker will be created.
workestrator.kill()
Kill all farms.
workestrator.kill();
Farm
farm.run(...args)
Run the default exported method in your module with arguments. Async. Returns what's returned by the default method.
try {
const res = await farm.run(1, [], "");
console.log("Result :", res);
} catch (err) {
console.log("Oh no :'(", err);
}
farm.runMethod(method, ...args)
Run a specific method in your module. Async. Returns what's returned by the method.
try {
// bar is a method exported with : module.exports.bar = function ...
const res = await farm.runMethod("bar", 1, [], "");
console.log("Result :", res);
} catch (err) {
console.log("Oh no :'(", err);
}
farm.broadcast(...args)
Run the default exported method in every worker with arguments. Async. Returns an array with the first element being the succeeded calls and the second element the failures.
const [successes, failures] = await farm.broadcast(1, [], "");
console.log("Successes :", successes);
console.log("Failures :", failures);
farm.broadcastMethod(method, ...args)
Run a specific method in every worker. Async. Returns an array with the first element being the succeeded calls and the second element the failures.
// bar is a function exported with : module.exports.bar = function ...
const [successes, failures] = await farm.broadcastMethod("foo", 1, [], "");
console.log("Successes :", successes);
console.log("Failures :", failures);
farm.kill()
Kill the farm.
farm.kill();
Serializers
According to the data you send and receive to your workers you might need to use a different serializer. Data needs to be serialized in order to be sent to the worker process.
Two serializers are available :
- JSON (default, Node.js internal serializer), supported types :
- boolean
- number (without -0, NaN, and ±Infinity)
- string
- Array
- Object
- CBOR, supported types :
- boolean
- number (including -0, NaN, and ±Infinity)
- string
- Array, Set (encoded as Array)
- Object (including null), Map
- undefined
- Buffer
- Date,
- RegExp
- url.URL (Legacy URL API)
- bignumber
For performances you might want to check the benchmark.
You can also build your own serializer. You need to extends the Serializer class :
const {Serializer} = require("workestrator");
class MySerializer extends Serializer {
encode(data) {
// do some encoding
return encodedData;
}
decode(data) {
// do some decoding
return decodedData;
}
}
module.exports = MySerializer;
Then set the farm options accordingly :
const farm = workestrator.create({
module: "/absolute/path/to/the/worker/module.js",
serializerPath: "/absolute/path/to/MySerializer", // you can use require.resolve to get the absolute path
});
JSON serializer path is in workestrator.serializers.JSON
.
CBOR serializer path is in workestrator.serializers.CBOR
.
Events
You can listen to events from a worker of from the farm directly. They both extends Node.js EventEmitter. All those events are already being used by Workestrator. You do not have to implement anything, it works out of the box. I wanted to expose events to have the possibility to add logging but I'm sure they are other use-cases :)
Farm
farm.on(event, callback)
online
When a new worker is created within the farm.
farm.on("online", (worker) => {
// ...
});
message
When a worker is sending a message to the farm.
data
looks like this :
{
callId: number;
res?: any;
err?: Error;
workerId: number;
}
farm.on("message", (worker, data) => {
// ...
});
disconnect
When a worker disconnect. See Node.js documentation
farm.on("disconnect", (worker) => {
// ...
});
error
The error
event is emitted whenever:
- The process could not be spawned, or
- The process could not be killed, or
- Sending a message to the child process failed.
farm.on("error", (worker, err) => {
// ...
});
close
When a stdio streams of a worker have been closed. See Node.js documentation
farm.on("close", (worker, code, signal) => {
// ...
});
exit
farm.on("exit", (worker, code, signal) => {
// ...
});
ttl
When ttl
is reached.
farm.on("ttl", (worker) => {
// ...
});
idle
When maxIdleTime
is reached.
farm.on("idle", (worker) => {
// ...
});
Logging
You can enable logging by using an environment variable : DEBUG=workestrator:*
Why I created this ?
I was looking for a project to use TypeScript, so the idea was to reproduce node-worker-farm
and add some new features. It actually went pretty smoothly and I'm happy with the result ;)
License
Workestrator is Copyright (c) 2018 Hugo Da Roit (@Yaty) and licensed under the MIT license. All rights not explicitly granted in the MIT license are reserved. See the included LICENSE file for more details.