# worker-transform

Multithread mapping stream
worker-transform provides a `WorkerTransform` object-oriented stream that transparently runs a given operation on multiple items, concurrently, by internally managing a pool of worker threads.

Like any other standard `Transform`, this stream can be plugged into a pipeline, as well as manually controlled; for maximum performance, the item order is not guaranteed.
## Installation

```bash
npm install @giancosta86/worker-transform
```

or

```bash
yarn add @giancosta86/worker-transform
```
The public API entirely resides in the root package index, so you shouldn't reference specific modules.
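For example, a minimal sketch of importing the names used throughout this guide, all from the package root:

```typescript
// All public types come from the root package index
import {
  WorkerTransform,
  ChunkInput,
  ChunkOutput
} from "@giancosta86/worker-transform";
```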
## Usage
First of all, you need an operation module: a module exporting just the operation - a function that will be applied to every item flowing through the stream.
The operation must be:

- declared with an arbitrary name

- accepting one parameter, of type `ChunkInput<TInput>` - where you need to replace `TInput` with the actual type of the items entering the stream.

  `ChunkInput<TInput>` actually contains just two fields, provided by the underlying stream infrastructure:

  - `value` - the item entering the stream

  - `encoding` - the associated character encoding; only meaningful when the item is a string

- returning one of these two types:

  - `ChunkOutput<TOutput>` - if the function is synchronous (without interruptions). You need to replace `TOutput` with the type of the items produced by the stream.

    The `ChunkOutput<TOutput>` type contains the `value` to be produced by the function as well as the related `encoding` - but the latter is optional, because it is only meaningful if the output items are strings.

    The result of the function must be a `ChunkOutput<T>` - but such `T` can be a nullable type, such as `number | null`! When the `value` field of the returned `ChunkOutput` is `null`, the stream will just skip it (see the sketch after this list)

  - `Promise<ChunkOutput<TOutput>>` - if the function is asynchronous - i.e., if it could perform `await` on external conditions or, more generally, if it is designed to return a `Promise`. Again, you need to replace `TOutput` with the type of the items produced by the stream - which, again, can be nullable

- throwing errors when needed: both errors and rejected promises simply make the stream ignore the related input element
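For instance, here is a minimal sketch of an operation that exploits nullable output to filter items (the `keepEven` name is purely hypothetical):

```typescript
import { ChunkInput, ChunkOutput } from "@giancosta86/worker-transform";

// Hypothetical operation: only even numbers flow through the stream,
// because returning a null value makes the stream skip the item
function keepEven({ value }: ChunkInput<number>): ChunkOutput<number | null> {
  return { value: value % 2 === 0 ? value : null };
}

export = keepEven;
```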
An operation module with a synchronous operation could be:
```typescript
import { ChunkInput, ChunkOutput } from "@giancosta86/worker-transform";

function add200({ value }: ChunkInput<number>): ChunkOutput<number> {
  return { value: value + 200 };
}

export = add200;
```
On the other hand, an operation module with an asynchronous operation could be:
```typescript
import delay from "delay";

import { ChunkInput, ChunkOutput } from "@giancosta86/worker-transform";

async function add500({
  value
}: ChunkInput<number>): Promise<ChunkOutput<number>> {
  await delay(5);
  await delay(2);
  await delay(6);

  return Promise.resolve({ value: value + 500 });
}

export = add500;
```
Create an instance of `WorkerTransform`, passing at least the path to the operation module - as expected by `resolve()` - and, optionally, additional options (see the description below).

For example:
```typescript
import { join } from "node:path";
import { WorkerTransform } from "@giancosta86/worker-transform";

const modulePath = join(__dirname, "add200");

const transform = new WorkerTransform(modulePath);
```
Use it like any other standard stream - for example, in a pipeline:
```typescript
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";

await pipeline(Readable.from([90, 95, 98]), transform, someWritableStream);
```
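As a fuller sketch - assuming the `add200` operation module above and a trivial collecting sink, both just for illustration:

```typescript
import { join } from "node:path";
import { Readable, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { WorkerTransform } from "@giancosta86/worker-transform";

async function main(): Promise<void> {
  const transform = new WorkerTransform(join(__dirname, "add200"));

  const results: number[] = [];

  // Minimal object-mode sink that just collects the mapped items
  const sink = new Writable({
    objectMode: true,
    write(chunk: number, _encoding, callback) {
      results.push(chunk);
      callback();
    }
  });

  await pipeline(Readable.from([90, 95, 98]), transform, sink);

  // The item order is not guaranteed, so sort before inspecting
  console.log(results.sort((a, b) => a - b)); // [290, 295, 298]
}

main();
```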
## Additional constructor options
The following values can be passed to the constructor as fields of an optional object, right after the operation module path:
- `agentCount`: the number of worker threads in the pool. Default: the number of processors

- `logger`: a `Logger` interface, as exported by unified-logging. Default: no logger

- `highWaterMark`: if present, passed to the base constructor

- `signal`: if present, passed to the base constructor
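For example, a minimal sketch (the specific values are arbitrary, just to show the options object):

```typescript
import { join } from "node:path";
import { WorkerTransform } from "@giancosta86/worker-transform";

// Hypothetical tuning values, only to illustrate the optional second argument
const transform = new WorkerTransform(join(__dirname, "add200"), {
  agentCount: 2,
  highWaterMark: 512
});
```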
## Further reference
For additional examples, please consult the test suites in the source code repository.