streamworks
v0.1.4
Run a nested collection of streams in pipe or merge configurations
Combine merge and pipe streams into stream architectures that can bend packets to your will.
merge stream
A merge stream will duplicate the input across all members and merge the output from all members (based on combine-stream).
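To make the duplicate-then-merge behaviour concrete, here is a minimal sketch that uses only Node's core stream module. It is not how streamworks is implemented; mergeSketch and suffix are hypothetical helpers invented for this illustration, and the sketch returns separate input and output ends rather than a single stream.

```javascript
// Sketch of merge semantics using only Node's core stream module.
// mergeSketch and suffix are hypothetical helpers, not streamworks API.
const { PassThrough, Transform } = require('stream');

function mergeSketch(members) {
  const input = new PassThrough();
  const output = new PassThrough();
  let open = members.length;

  members.forEach(function (member) {
    // every member receives its own copy of each input chunk
    input.pipe(member);
    // all members write into one shared output, which stays open
    // until the last member has ended
    member.pipe(output, { end: false });
    member.on('end', function () {
      open -= 1;
      if (open === 0) output.end();
    });
  });

  return { input: input, output: output };
}

// a member that appends a fixed suffix to every chunk
function suffix(text) {
  return new Transform({
    transform: function (chunk, enc, callback) {
      callback(null, chunk.toString() + text);
    }
  });
}

const merged = mergeSketch([suffix(':10\n'), suffix(':2\n')]);
merged.output.pipe(process.stdout);
merged.input.write('apple');
merged.input.end('pie');
```

Each value written to the input shows up once per member in the output, which is why two members turn two inputs into four outputs.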
pipe stream
A pipe stream will pass the output from each member as the input to the next (based on stream-combiner).
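The chaining described above can be sketched with Node's core stream module alone. This is an illustration under assumptions, not the stream-combiner or streamworks implementation; pipeSketch, dropLeadingP and upperCase are hypothetical names.

```javascript
// Sketch of pipe semantics using only Node's core stream module.
// pipeSketch, dropLeadingP and upperCase are hypothetical helpers.
const { PassThrough, Transform } = require('stream');

function pipeSketch(members) {
  const input = new PassThrough();
  // pipe() returns its destination, so reduce builds the chain
  // input -> members[0] -> members[1] -> ... and yields the last stream
  const output = members.reduce(function (previous, next) {
    return previous.pipe(next);
  }, input);
  return { input: input, output: output };
}

// drops every chunk that starts with 'p'
function dropLeadingP() {
  return new Transform({
    transform: function (chunk, enc, callback) {
      if (chunk.toString().indexOf('p') !== 0) this.push(chunk);
      callback();
    }
  });
}

// uppercases every chunk
function upperCase() {
  return new Transform({
    transform: function (chunk, enc, callback) {
      callback(null, chunk.toString().toUpperCase());
    }
  });
}

const chain = pipeSketch([dropLeadingP(), upperCase()]);
chain.output.on('data', function (chunk) {
  console.log(chunk.toString());
});
chain.input.write('apple');
chain.input.write('pie');
chain.input.end('custard');
// logs APPLE and CUSTARD; pie is dropped by the first member
```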
combo streams
Both types accept streams in their list, so you can combine merges and pipes with each other - it's stream inception!
installation
$ npm install streamworks
usage
There are 2 main methods:
- merge
- pipe
Create each type of stream by passing an array of either:
- functions - these are turned into streams using through2
- streams - these are used as is
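The two kinds of member can be normalised roughly as follows. The README says streamworks uses through2 for this; the sketch below swaps in Node's core Transform to stay dependency-free, and toStream is a hypothetical helper name, so treat it as an approximation of the idea rather than the library's code.

```javascript
// Sketch only: toStream is a hypothetical helper, not part of streamworks.
const { Transform } = require('stream');

function toStream(member) {
  // anything with a pipe method is treated as a stream and used as is
  if (member && typeof member.pipe === 'function') return member;
  // a function becomes the transform step of a new Transform stream,
  // so `this.push` and `callback` work inside it
  return new Transform({ transform: member });
}

const shout = toStream(function (chunk, enc, callback) {
  this.push(chunk.toString().toUpperCase());
  callback();
});
shout.on('data', function (chunk) {
  console.log(chunk.toString());
});
shout.end('custard');
// logs CUSTARD
```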
example
var from = require('from');
var streamworks = require('streamworks');
// a merge is a stream that duplicates the input to each function and merges the output back into one stream
var mergestream = streamworks.merge([
  // order lots
  function(chunk, enc, callback){
    this.push(chunk + ':10');
    callback();
  },
  // order some
  function(chunk, enc, callback){
    this.push(chunk + ':2');
    callback();
  }
])
// a pipe is a stream that passes values through each function
var pipestream = streamworks.pipe([
  // filter out anything that starts with p
  function(chunk, enc, callback){
    if(chunk.toString().indexOf('p')!=0){
      this.push(chunk);
    }
    callback();
  },
  // uppercase the input
  function(chunk, enc, callback){
    this.push(chunk.toString().toUpperCase());
    // signal that this chunk is done, otherwise the stream stalls
    callback();
  }
])
// run some data through the merge stream (which will duplicate it) and then through the pipe stream (which will filter it)
from(['apple', 'pie', 'custard'])
  .pipe(mergestream)
  .pipe(pipestream)
  .pipe(process.stdout)
nested streams
Because streamworks streams are, umm, streams - you can create complex nested stream-structures:
var bigAssStream = streamworks.pipe([
  // drop 'world'
  function(chunk, enc, callback){
    if(chunk.toString()!='world'){
      this.push(chunk);
    }
    callback();
  },
  // tag each value three times
  streamworks.merge([
    function(chunk, enc, callback){
      this.push('A1:' + chunk);
      callback();
    },
    function(chunk, enc, callback){
      this.push('A2:' + chunk);
      callback();
    },
    function(chunk, enc, callback){
      this.push('A3:' + chunk);
      callback();
    }
  ]),
  streamworks.pipe([
    // drop everything tagged A2
    function(chunk, enc, callback){
      if(chunk.toString().indexOf('A2')!=0){
        this.push(chunk);
      }
      callback();
    },
    streamworks.merge([
      function(chunk, enc, callback){
        this.push('sub1:' + chunk);
        callback();
      },
      function(chunk, enc, callback){
        this.push('sub2:' + chunk);
        callback();
      }
    ]),
    // drop everything tagged sub2:A1:
    function(chunk, enc, callback){
      if(chunk.toString().indexOf('sub2:A1:')!=0){
        this.push(chunk);
      }
      callback();
    }
  ])
])
var arr = [];
from(['hello','world','apple']).pipe(bigAssStream)
  .on('data', function(chunk){
    arr.push(chunk.toString());
  }).on('end', function(){
    console.dir(arr);
  })
/*
sub1:A1:hello
sub1:A3:hello
sub2:A3:hello
sub1:A1:apple
sub1:A3:apple
sub2:A3:apple
*/
object streams
If you pass true or:
{
objectMode:true
}
as the first argument to pipe or merge, the stream will be in object mode.
This means that the 'chunks' will be the values you wrote, not buffers/strings.
You can also use:
- pipeObjects
- mergeObjects
Which are shortcuts for:
- pipe(true, [])
- merge(true, [])
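What object mode changes for a member function can be shown with a plain core Transform. This is a sketch under assumptions: it uses Node's stream module directly rather than pipeObjects or mergeObjects, and priceOrders and the order fields are made up for the example.

```javascript
// Sketch only: shows what a member function receives in object mode.
// priceOrders and the order shape are hypothetical.
const { Transform } = require('stream');

function priceOrders() {
  return new Transform({
    objectMode: true,
    transform: function (order, enc, callback) {
      // `order` is the object that was written, not a buffer or string
      callback(null, { item: order.item, total: order.qty * order.price });
    }
  });
}

const pricing = priceOrders();
pricing.on('data', function (result) {
  console.log(result.item, result.total);
});
pricing.write({ item: 'pie', qty: 2, price: 3 });
pricing.end();
// logs: pie 6
```

Without objectMode, writing a plain object to a stream like this is rejected, because byte streams only accept strings and buffers.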
api
streamworks.pipe([objectMode], fns)
create a new readable/writable stream that will pipe each value through the array of streams/functions
streamworks.pipeObjects(fns)
shorthand for pipe(true, fns)
streamworks.merge([objectMode], fns)
create a new readable/writable stream that will duplicate each value into each of the streams/functions and merge the results back into the output
streamworks.mergeObjects(fns)
shorthand for merge(true, fns)
license
MIT