@jesuarva/js-pipeline
v1.0.2
A Pipeline implementation in JavaScript with an API that mimics the Array iteration methods.
It is an approach to solving performance issues when processing big blocks of data in a non-blocking way.
The Pipeline is implemented with JS generators.
There are currently two proposals to introduce a new pipeline operator in JavaScript: tc39/proposal-pipeline-operator
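To make the generator idea concrete, here is a minimal sketch of lazy stages chained together. It is not the package's source: `filterStage`, `mapStage`, `source`, and `trace` are hypothetical names used only for illustration.

```javascript
// Hypothetical generator stages; not taken from the package source.
function* filterStage(source, predicate) {
  for (const item of source) {
    if (predicate(item)) yield item;
  }
}

function* mapStage(source, mapFn) {
  for (const item of source) {
    yield mapFn(item);
  }
}

const source = [1, 2, 3, 4, 5, 6];
const trace = [];

// Each stage pulls one item at a time from the stage before it.
const evens = filterStage(source, n => {
  trace.push(`filter ${n}`);
  return n % 2 === 0;
});
const doubled = mapStage(evens, n => {
  trace.push(`map ${n}`);
  return n * 2;
});

const result = [...doubled];
console.log(result); // [ 4, 8, 12 ]
console.log(trace);  // filter and map calls alternate item by item
```

The trace shows that an item travels through every stage before the next item is read, which is what allows the work to be paused between items.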
Install
npm install @jesuarva/js-pipeline
How to initialize a Pipeline
const Pipeline = require('path/to/Pipeline');
const iterable = new Array(10000); // Any `iterable` object
for (let i = iterable.length - 1; i >= 0; i--) {
  iterable[i] = i;
}
const pipeline = Pipeline(iterable);
Usage example
Run example1.js:
node src/examples/example1.js
// src/examples/example1.js
const Pipeline = require('../Pipeline');
const arr = new Array(100);
// `i` starts at arr.length, so index 100 is assigned too and arr ends up holding 0..100.
for (let i = arr.length; i >= 0; i--) {
  arr[i] = i;
}

// Three independent pipelines over the same data.
Pipeline(arr)
  .filter(item => item % 2 === 0 && item % 10 === 0)
  .map(item => `Pipeline-1 ${item}`)
  .effect(value => console.log(value))
  .runAsync(response => console.log('Pipeline-1 last callbackFn', response));

Pipeline(arr)
  .filter(item => item % 3 === 0 && item % 30 === 0)
  .map(item => `Pipeline-2 ${item}`)
  .effect(value => console.log(value))
  .runAsync(response => console.log('Pipeline-2 last callbackFn', response));

Pipeline(arr)
  .filter(item => item % 3 === 0 && item % 30 === 0)
  .map((item, index) => `Pipeline-3 ${item}. index-${index}`)
  .effect(value => console.log(value))
  .runAsync(response => console.log('Pipeline-3 last callbackFn', response));
The output in the console looks like:
$ node src/examples/example1.js
Pipeline-1 0
Pipeline-2 0
Pipeline-3 0. index-0
Pipeline-1 10
Pipeline-2 30
Pipeline-1 20
Pipeline-3 30. index-1
Pipeline-2 60
Pipeline-1 30
Pipeline-3 60. index-2
Pipeline-2 90
Pipeline-1 40
Pipeline-2 last callbackFn { done: true,
value:
[ 'Pipeline-2 0',
'Pipeline-2 30',
'Pipeline-2 60',
'Pipeline-2 90' ] }
Pipeline-3 90. index-3
Pipeline-1 50
Pipeline-1 60
Pipeline-3 last callbackFn { done: true,
value:
[ 'Pipeline-3 0. index-0',
'Pipeline-3 30. index-1',
'Pipeline-3 60. index-2',
'Pipeline-3 90. index-3' ] }
Pipeline-1 70
Pipeline-1 80
Pipeline-1 90
Pipeline-1 100
Pipeline-1 last callbackFn { done: true,
value:
[ 'Pipeline-1 0',
'Pipeline-1 10',
'Pipeline-1 20',
'Pipeline-1 30',
'Pipeline-1 40',
'Pipeline-1 50',
'Pipeline-1 60',
'Pipeline-1 70',
'Pipeline-1 80',
'Pipeline-1 90',
'Pipeline-1 100' ] }
The output shows how the pipeline runs asynchronously: each pipeline processes its data one unit at a time, and the three runs interleave without blocking the main thread.
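The interleaving seen above can be reproduced with a small sketch that advances a generator one step per event-loop turn. This is an assumption about how such scheduling could work, not the package's implementation; `runInTurns` and `labelled` are hypothetical names, and `setImmediate` is the Node.js timer function.

```javascript
// Hypothetical runner: advances a generator one step per event-loop turn.
function runInTurns(generator, onDone) {
  const step = () => {
    const { done } = generator.next();
    if (done) {
      onDone();
    } else {
      setImmediate(step); // give other queued work a chance before the next item
    }
  };
  setImmediate(step);
}

const log = [];

// Records each item it processes, then pauses until resumed.
function* labelled(label, items) {
  for (const item of items) {
    log.push(`${label} ${item}`);
    yield;
  }
}

const finished = Promise.all([
  new Promise(resolve => runInTurns(labelled('A', [1, 2, 3]), resolve)),
  new Promise(resolve => runInTurns(labelled('B', [1, 2, 3]), resolve)),
]).then(() => log);

finished.then(entries => console.log(entries));
```

Because each runner reschedules itself after a single item, the log alternates between `A` and `B` instead of finishing one run before starting the other.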
API Reference
The public API emulates the Array's iteration methods (map, filter).
pipeline.map(mapFn)
Emulates Array.map.
mapFn: Function that is invoked with each element of the iterable. Each time the callback executes, the returned value is passed through the pipeline.
interface pipelineAPI {
  map(mapFn: <T, S>(currentData: T, index: number) => S | T, index: number): void;
}
pipeline.filter(filterFn)
Emulates Array.filter.
filterFn is a predicate used to test each element of the iterable. Return true to keep the element, false otherwise.
interface pipelineAPI {
  filter(filterFn: <T>(currentData: T, index: number) => boolean): void;
}
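In the usage example, the Pipeline-3 output shows `index-1` for item 30, which suggests that the index passed to a stage counts the items reaching that stage rather than positions in the original iterable. Chained built-in Array methods behave the same way, so they can serve as a reference for the expected values. The sketch below uses only built-in Array methods, not the package.

```javascript
// Build 0..100, the same data the usage example ends up with.
const arr = Array.from({ length: 101 }, (_, i) => i);

// Reference result using built-in Array methods only.
const expected = arr
  .filter(item => item % 30 === 0)
  .map((item, index) => `Pipeline-3 ${item}. index-${index}`);

console.log(expected);
// [ 'Pipeline-3 0. index-0', 'Pipeline-3 30. index-1',
//   'Pipeline-3 60. index-2', 'Pipeline-3 90. index-3' ]
```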
pipeline.effect(effectFn)
Meant to perform a side effect with the current chunk of data outside the pipeline data-flow.
interface pipelineAPI {
  effect(effectFn: <T>(currentValue: T, index: number) => any): void;
}
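In the usage example, `effect(value => console.log(value))` returns `undefined` for every item, yet the final arrays still contain the mapped strings. That suggests the return value of `effectFn` does not replace the item. A minimal sketch of such a pass-through stage, with the hypothetical name `effectStage`, could look like this; it is an illustration, not the package's code.

```javascript
// Hypothetical pass-through stage: runs a side effect, forwards the item untouched.
function* effectStage(source, effectFn) {
  let index = 0;
  for (const item of source) {
    effectFn(item, index++); // return value is deliberately ignored
    yield item;
  }
}

const seen = [];
const output = [
  ...effectStage(['a', 'b'], (value, index) => seen.push(`${index}:${value}`)),
];

console.log(output); // [ 'a', 'b' ]
console.log(seen);   // [ '0:a', '1:b' ]
```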
pipeline.getPipeLine()
Returns the underlying generator instance.
This can be useful when you want to run the generator pipeline manually.
interface iteratorResult {
  value: any;
  done: boolean;
}
interface generatorObject {
  next(value?: any): iteratorResult;
  throw(value?: any): iteratorResult;
  return(value?: any): iteratorResult;
}
interface pipelineAPI {
  getPipeLine(): generatorObject;
}
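Running a generator manually means calling `next()` until `done` is true. The sketch below uses a stand-in generator, `standIn`, because what the package's generator yields at each step is not documented here; treat the yielded values as an assumption. It does show how a generator's return value produces a final result shaped like `{ value, done: true }`.

```javascript
// Stand-in generator; a real one would come from pipeline.getPipeLine().
function* standIn() {
  const processed = [];
  for (const item of [1, 2, 3]) {
    processed.push(item * 10);
    yield item * 10;
  }
  return processed; // the return value appears on the final result
}

const generator = standIn();
const steps = [];
let result = generator.next();
while (!result.done) {
  steps.push(result.value);
  result = generator.next();
}

console.log(steps);  // [ 10, 20, 30 ]
console.log(result); // { value: [ 10, 20, 30 ], done: true }
```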
pipeline.stopPipeLine()
Stops the pipeline.
interface pipelineAPI {
  stopPipeLine(): void;
}
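The generatorObject interface listed under getPipeLine() includes `return()`, which is the standard way to end a JavaScript generator early. Whether `stopPipeLine()` calls it internally is an assumption; the sketch below only demonstrates the built-in generator behavior with a hypothetical `countUp` generator.

```javascript
// An endless generator, used only to show early termination.
function* countUp() {
  let n = 0;
  while (true) yield n++;
}

const counter = countUp();
const first = counter.next();                    // { value: 0, done: false }
const second = counter.next();                   // { value: 1, done: false }
const stopped = counter.return('stopped early'); // { value: 'stopped early', done: true }
const afterStop = counter.next();                // { value: undefined, done: true }

console.log(first, second, stopped, afterStop);
```

Once `return()` has been called, every later `next()` reports `done: true`, so no further items are processed.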
pipeline.runAsync(callback)
Runs the pipeline asynchronously.
callback is called when the pipeline has finished processing all data chunks. It receives a response object whose value is an Array of the processed data.
type pipeLineResponse = {
  done: true;
  value: any[]; // Array of processed data.
};
interface pipelineAPI {
  runAsync(callback: (response: pipeLineResponse) => any): void;
}
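Since `runAsync` takes a completion callback, it can be wrapped in a Promise when `then` or `await` is more convenient. The sketch below is not part of the package: `runAsPromise` is a hypothetical helper, and `fakePipeline` is a stand-in that only imitates the documented `runAsync(callback)` shape so the example runs on its own.

```javascript
// Hypothetical stand-in exposing only the documented runAsync(callback) shape.
const fakePipeline = {
  runAsync(callback) {
    setImmediate(() => callback({ done: true, value: ['a', 'b'] }));
  },
};

// Wraps the callback-style call in a Promise so it can be awaited.
function runAsPromise(pipeline) {
  return new Promise(resolve => pipeline.runAsync(resolve));
}

const pending = runAsPromise(fakePipeline);
pending.then(response => console.log(response.value)); // [ 'a', 'b' ]
```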
pipeline.runAsSaga(options)
A version of runAsync adapted to run within a redux-saga task.
interface sagaFnOptions {
  callbackEffect: <T>(response: pipeLineResponse, ...others: T[] | []) => any;
  callbackArguments: any[];
}
interface pipelineAPI {
  runAsSaga(options: sagaFnOptions): void;
}
Types & Interfaces
interface iter {
  length: number;
}
interface generatorObject {
  next(value?: any): iteratorResult;
  throw(value?: any): iteratorResult;
  return(value?: any): iteratorResult;
}
interface iteratorResult {
  value: any;
  done: boolean;
}
type pipeLineResponse = {
  done: true;
  value: any[];
};
interface sagaFnOptions {
  callbackEffect: <T>(response: pipeLineResponse, ...others: T[] | []) => any;
  callbackArguments: any[];
}
interface pipelineAPI {
  runAsync(callback: (response: pipeLineResponse) => any): void;
  runAsSaga(options: sagaFnOptions): void;
  map(mapFunction: <T, S>(currentValue: T, index: number) => S | T, index: number): void;
  filter(filterFunction: <T>(currentValue: T, index: number) => boolean): void;
  effect(effectFn: <T>(currentValue: T, index: number) => any): void;
  getPipeLine(): generatorObject;
  stopPipeLine(): void;
}