stream-join
v1.0.1
stream-join is the micro-module to join values of multiple streams together.
stream-join is a function that takes an array of object mode Readable streams and returns a combined object mode Readable stream, which packs together corresponding values from the input streams while properly handling backpressure.
Originally, stream-join was used with stream-json to create and eventually join side-channels, but it can be used stand-alone.
stream-join is a lightweight, no-dependencies micro-package. It is distributed under the New BSD license.
Intro
By default stream-join creates a stream of arrays of values. The first array contains the first values of all streams, and the Nth value in an array comes from the Nth stream. The order in which values arrive from different streams doesn't matter. The second array contains the second values of all streams, and so on. If a stream has already ended, null is used as its value (object mode streams cannot carry null as a value because it indicates the end-of-stream). The resulting stream ends when all streams have ended.
const join = require('stream-join');
const {PassThrough} = require('stream-join/tests/helpers');
const s1 = new PassThrough(),
  s2 = new PassThrough(),
  result = join([s1, s2]);
result.on('data', data => console.log(data));
// all streams are written asynchronously
s2.write('a');
s1.write(1);
s1.write(2);
s2.write('b');
s1.write(3);
s2.end();
s1.write(4);
s1.end();
// prints:
// [1, 'a']
// [2, 'b']
// [3, null] // s2 has ended
// [4, null]
The output can be controlled by a custom joining function. Given the setup above:
const s1 = new PassThrough(),
  s2 = new PassThrough(),
  result = join([s1, s2], {
    joinItems(output, items) {
      // a variable number of values is pushed out
      items.forEach(item => {
        // we should push only non-null values
        if (item !== null) output.push(item);
      });
    }
  });
result.on('data', data => console.log(data));
// all streams are written asynchronously
s2.write('a');
s1.write(1);
s1.write(2);
s2.write('b');
s1.write(3);
s2.end();
s1.write(4);
s1.end();
// now we normalized the order of values
// prints: 1, 'a', 2, 'b', 3, 4
Installation
npm i --save stream-join
# or: yarn add stream-join
Documentation
The module exports a function with the following signature:
const join = require('stream-join');
const result = join(streams[, options]);
Where:
- streams is an array of object mode Readable streams.
- options is an optional object, described in Node's documentation, which is used to create result.
  - The following properties are always overridden:
    - objectMode is always true.
    - read() is replaced with an internal implementation.
  - The following custom properties are recognized:
    - skipEvents is an optional flag. If it is falsy (the default), 'error' events from all streams are forwarded to result. If it is truthy, no events are forwarded. A user can always forward them manually.
    - joinItems(output, items) is an optional function. It can be used to combine individual values together. It may push 0 or more values to the output. It returns no value and takes two arguments:
      - output is the result object described below. Values can be pushed to it with its push() method.
        - Warning: never push out null values because they indicate that a stream has finished and should be closed.
      - items is an array of values. It has the same length as streams and contains values from the corresponding streams. If a corresponding stream has ended, null is used as its value.
- result is an object mode Readable stream, which produces combined values.
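As an illustration of the joinItems(output, items) contract described above, here is a minimal sketch of a callback that packs each row into an object and skips streams that have already ended. The names joinAsObject and labels are hypothetical, and the output object is a plain stand-in with a push() method rather than a real result stream, so the callback can be tried without stream-join installed.

```javascript
// Hypothetical field names, one per input stream.
const labels = ['id', 'name'];

// A joinItems-style callback: pushes exactly one object per row.
// A null item means the corresponding stream has ended, so it is skipped.
// The pushed value is always an object, so null is never pushed out.
const joinAsObject = (output, items) => {
  const row = {};
  items.forEach((item, index) => {
    if (item !== null) row[labels[index]] = item;
  });
  output.push(row);
};

// Stand-in for the result stream: anything with push() is enough for a dry run.
const collected = [];
const output = {push: value => collected.push(value)};

joinAsObject(output, [1, 'a']);
joinAsObject(output, [3, null]); // the second stream has ended
console.log(collected);
```

With stream-join itself, such a callback would presumably be passed as join(streams, {joinItems: joinAsObject}).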
See the Intro above for examples of how to use stream-join.
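When skipEvents is truthy, 'error' events are not forwarded, and the documentation notes that a user can do so manually. One possible way to do that is sketched below using only Node's built-in stream module. The helper name forwardErrors is hypothetical, and plain PassThrough streams stand in for both the inputs and the stream that join() would return.

```javascript
const {PassThrough} = require('stream');

// Stand-ins: in real code `result` would come from join(streams, {skipEvents: true}).
const streams = [
  new PassThrough({objectMode: true}),
  new PassThrough({objectMode: true})
];
const result = new PassThrough({objectMode: true});

// Hypothetical helper: re-emit 'error' events from every source on the target.
const forwardErrors = (sources, target) =>
  sources.forEach(source =>
    source.on('error', error => target.emit('error', error))
  );

// Collect forwarded error messages so the effect is observable.
const seen = [];
result.on('error', error => seen.push(error.message));

forwardErrors(streams, result);

// Simulate a failure in the second input stream.
streams[1].emit('error', new Error('boom'));
console.log(seen);
```

The same helper could be restricted to a subset of streams when only some errors should reach the consumer of result.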
Release History
- 1.0.1 technical release, no need to upgrade.
- 1.0.0 the initial release.
