stream-join
v1.0.1
stream-join is a micro-module that joins values of multiple streams together.
stream-join is a function that takes an array of object mode Readable streams and returns a combined object mode Readable stream, which packs together corresponding values from the input streams while properly handling backpressure.
Originally, stream-join was used with stream-json to create and eventually join side-channels, but it can be used stand-alone.
stream-join is a lightweight, no-dependencies micro-package. It is distributed under the New BSD license.
Intro
By default, stream-join creates a stream of arrays of values. The first array contains the first values of all streams, and the Nth value in an array comes from the Nth stream. The relative order in which the streams produce their values doesn't matter. The second array contains the second values of all streams, and so on. If a corresponding stream has ended, null is used as its value (object mode streams cannot carry null values because null indicates the end-of-stream). The resulting stream ends when all streams have ended.
const join = require('stream-join');
const {PassThrough} = require('stream-join/tests/helpers');
const s1 = new PassThrough(), s2 = new PassThrough(),
  result = join([s1, s2]);
result.on('data', data => console.log(data));
// all streams are written asynchronously
s2.write('a');
s1.write(1);
s1.write(2);
s2.write('b');
s1.write(3);
s2.end();
s1.write(4);
s1.end();
// prints:
// [1, 'a']
// [2, 'b']
// [3, null] // s2 has ended
// [4, null]
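The packing rule described above can be modelled with a small synchronous function. This is only a sketch of the output shape, not how stream-join is implemented: `joinRows` is a hypothetical helper, it assumes all values are already known, and it ignores asynchrony and backpressure entirely.

```javascript
// Hypothetical helper (not part of stream-join): models the default output.
// `columns[n]` holds every value the Nth stream would produce, in order.
function joinRows(columns) {
  // the result has as many rows as the longest stream has values
  const length = Math.max(0, ...columns.map(column => column.length));
  const rows = [];
  for (let i = 0; i < length; ++i) {
    // a stream that has already ended contributes null
    rows.push(columns.map(column => (i < column.length ? column[i] : null)));
  }
  return rows;
}

console.log(joinRows([[1, 2, 3, 4], ['a', 'b']]));
```

With the values from the example above, this yields the same four arrays: two full pairs followed by two pairs padded with null.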
The output can be controlled by a custom joining function. Given the setup above:
const s1 = new PassThrough(), s2 = new PassThrough(),
  result = join([s1, s2], {
    joinItems(output, items) {
      // a variable number of values is pushed out
      items.forEach(item => {
        // we should push only non-null values
        if (item !== null) output.push(item);
      });
    }
  });
result.on('data', data => console.log(data));
// all streams are written asynchronously
s2.write('a');
s1.write(1);
s1.write(2);
s2.write('b');
s1.write(3);
s2.end();
s1.write(4);
s1.end();
// now we normalized the order of values
// prints: 1, 'a', 2, 'b', 3, 4
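A joining function can also be exercised on its own, without any streams. The sketch below is an illustration under stated assumptions: the `{id, name}` record shape is made up, and `output` is a plain stub with a `push()` method standing in for the real result stream.

```javascript
// Hypothetical joining function: turns [id, name] pairs into records.
function joinItems(output, items) {
  const [id, name] = items;
  // push nothing once either stream has ended (its value is null)
  if (id === null || name === null) return;
  output.push({id, name});
}

// Stub standing in for the result stream; it only records pushed values.
const pushed = [];
const output = {push: value => pushed.push(value)};

joinItems(output, [1, 'a']);
joinItems(output, [2, 'b']);
joinItems(output, [3, null]); // second stream has ended: nothing is pushed

console.log(pushed);
```

Such a function would be passed to the library as `join([s1, s2], {joinItems})`.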
Installation
npm i --save stream-join
# or: yarn add stream-join
Documentation
The module exports a function with the following signature:
const join = require('stream-join');
const result = join(streams[, options]);
Where:
- streams is an array of object mode Readable streams.
- options is an optional object, described in Node's documentation, which is used to create result.
  - The following properties are always overridden:
    - objectMode is always true.
    - read() is replaced with an internal implementation.
  - The following custom properties are recognized:
    - skipEvents is an optional flag. If it is falsy (the default), 'error' events from all streams are forwarded to result. If it is truthy, no events are forwarded. A user can always forward them manually.
    - joinItems(output, items) is an optional function. It can be used to combine individual values together. It may push 0 or more values to the output. It returns no value and takes two arguments:
      - output is the result object described below. Values can be pushed to it with its push() method.
        - Warning: never push out null values because they indicate that a stream has finished and should be closed.
      - items is an array of values. It has the same length as streams and contains values from the corresponding streams. If a corresponding stream has ended, null is used as its value.
- result is an object mode Readable stream, which produces combined values.
See the Intro above for examples of how to use stream-join.
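When skipEvents is truthy, 'error' events have to be forwarded by hand. One possible way to do that is sketched below, using only Node's built-in streams. `forwardErrors` is a hypothetical helper, the PassThrough called `target` is a stand-in for the stream returned by `join(sources, {skipEvents: true})`, and re-emitting with `emit()` is an assumption rather than a description of what the library does internally.

```javascript
const {PassThrough} = require('stream');

// Hypothetical helper (not part of stream-join): re-emits 'error' events
// from every source stream on the target stream.
function forwardErrors(sources, target) {
  for (const source of sources) {
    source.on('error', error => target.emit('error', error));
  }
  return target;
}

// Stand-ins, so that the sketch runs without stream-join installed.
const s1 = new PassThrough({objectMode: true});
const s2 = new PassThrough({objectMode: true});
const target = new PassThrough({objectMode: true});

// collect the messages of all errors that reach the target
const seen = [];
target.on('error', error => seen.push(error.message));

forwardErrors([s1, s2], target);
s2.emit('error', new Error('s2 failed'));

console.log(seen);
```

Doing the forwarding manually leaves room to filter or wrap errors before they reach the combined stream.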
Release History
- 1.0.1 technical release, no need to upgrade.
- 1.0.0 the initial release.