easy-pipeline
v0.0.16
Published
Tracable pipelines for node.
Downloads
10
Readme
easy-pipeline
Framework for building pipelines in node.
Supported node versions
This library is written using ES 2015 features. Therefore it can only be used in node 6.4.0+.
If you need to run it in an older version of node, code should be transpiled first.
Install
npm install easy-pipeline
Features
- Create pipeline stages as functions.
- Safer execution environment which prevents the output of function being modified.
- Extensible logging framework to capture the input and output of each function.
Stages
Stages are the fundamental building blocks of a pipeline. It's defined by a function that receives a single argument - context and returns an arbitary object representing the output.
// Simple stage that returns an object with a property called foo.
const stageFoo = context = ( { foo: 'a' });
// Another stage that return an object with a property called bar.
const stageBar = context = ( { bar: 'b' });
Once we have the stages, we can call createPipeline
function to bind them
together to create a pipeline.
const createPipeline = require('easy-pipeline');
const pipeline = createPipeline(stageFoo, stageBar);
Result of createPipeline
is a function that can be invoked with an input
to the pipeline. It returns a Task
which can be used to start the
execution of our pipeline.
To start the pipeline we have to invoke the fork
method.
pipeline().fork(err => console.error(err), r => console.log(r));
Result of the pipline would be the combination of return values from stageFoo and stageBar.
{ foo: 'a', bar: 'b' }
Accessing the results of a stage from another
Our pipeline would be little useful if we could not access the result of
one stage from another. Context
argument passed into each stage provides
access various useful services including the results of previous stages.
Let's modified our barStage
to return a value based on the previous stage.
const barStage = context = ({ bar: `${context.props.foo}-b`});
This time around, the result of our pipeline would be:
{ foo: 'a', bar: 'a-b' }
Safety of props
context.props
received by a stage is an immutable object. If a stage
accidently attempts to modify the output from another, it will result in an error.
const barStage = context => {
context.props.foo = 'c'; // Receives a type error.
};
Stage name
Each stage has a name. This is used within various logging operations (discussed below). By default the stage name is same as the function name. We can modify this by attaching a config property to our stage function.
const barStage = context => ( { bar: 'b' });
barStage.config = { name: 'my-bar-stage' };
This feature can be useful if we want to have more elaborate names for our stages or if we are running on node 6.4.0 where function name property is not available for certain types of expressions (http://node.green/#ES2015-built-in-extensions-function--name--property).
Asynchronous stages
We can choose between two flavours of asynchrnous stages.
First one is with a stage function can accept an additional argument for a callback function that can be called at the end of an asynchronous function to signify the success or failure.
const fs = require('fs');
const readFileStage = (context, cb) => {
fs.readFile('/etc/passwd', (err, data) => {
if (err) {
cb(err);
} else {
cb(null, { file: data });
}
});
};
const pipeline = createPipeline(readFileStage);
pipeline().fork(console.error, console.log);
// returns: { file: 'content of /etc/passwd' }
Other option is a stage function that returns a Promise
.
const asyncStage = context => {
return new Promise((resolve, reject) => {
// ...
});
};
const pipeline = createPipeline(asyncStage);
pipeline().fork(console.error, console.log);
Pipeline API
fork(onError, onSuccess)
- @param {Function} onError - Function to be invoked when there was an
error during pipeline execution. It receives the
Error
as an argument. - @param {Function} onSuccess - Function to be invoked when pipeline execution is finished.
Starts the execution of the pipeline.
as
Specify a meaningful name to a pipeline. It's used by the loggers when logging pipeline activities.
const p = createPipeline(stage1, stage2).as('my-awesome-pipeline');
p();
complex pipelines
Lets say you want to have multiple stages that must be run together, logically you can group these into a shared pipeline to be consumed within another pipeline
const s1 = context = ( { foo: 'f' });
const s2 = context = ( { bar: 'b' });
const p1 = createPipeline(s1);
const p2 = createPipeline(s2);
const lotsOfWork = createPipeline(
p1,
p2
);
You may then also have a shared stages that you want to use, this can be run along side an predefined shared pipeline
const s1 = context = ( { foo: 'f' });
const s2 = context = ( { bar: 'b' });
const p1 = createPipeline(s1);
const lotsOfWork = createPipeline(
p1,
s2
);
By using this you can compose complex pipelines whilst being able to reuse and even reduce the amount of code you write.
Logging
There is a defined logger which has been integrated into the context, the default levels are :
- debug
- info
- warn
- error
To use the logger you just need to invoke the level on the context
context.log.debug(<data object>);