bs-better-stream
v2.0.0
Published
`npm i -save bs-better-stream`
Downloads
52
Readme
bs-better-stream
Setup
npm i -save bs-better-stream
const Stream = require('bs-better-stream');
let myStream = new Stream();
myStream.write(10, 20, 30);
myStream.each(console.log);
myStream.write(11, 21, 31);
Overview
chaining
myStream
.write(10, 20, 30)
.each(print)
.map(double)
.filter(greaterThan30)
.write(-1, -2, -3)
.writePromise(requestAsyncNumber());
accumulation
You can use streams before and after you have begun writing to them. See .clean().
let myStream = new Stream();
myStream.write(11, 21, 31);
myStream.each(console.log);
myStream.write(12, 22, 32);
// will print 11, 21, 31, 12, 22, 32
write (...values)
myStream.write(10, 20, 30);
write array (...[array of values])
myStream.write(...[10, 20, 30]);
writePromise (...promises)
let promise1 = Promise.resolve('i hate `.then`s');
let promise2 = Promise.reject('rejections r wrapped');
myStream.writePromise(promise1, promise2);
// myStream.outValues equals ['i hate `.then`s', {rejected: 'rejections r wrapped', isRejected: true}]
writePromiseSkipOnReject (...promises)
let promise1 = Promise.resolve('i hate `.then`s');
let promise2 = Promise.reject('rejections r ignored');
myStream.writePromiseSkipOnReject(promise1, promise2);
// myStream.outValues equals ['i hate `.then`s']
each (handler)
myStream.each( (value, index) => doOperation(value, index) );
map (handler)
myStream.map( (value, index) => value * index );
filter (predicateHandler)
myStream.filter( (value, index) => value > index );
filterCount (integer)
let outStream = myStream.filterCount(3);
myStream.write('first', 'second', 'third', 'fourth', 'fifth');
// outStream.outValues equals ['first', 'second', 'third']
filterIndex ([array of indices])
let outStream = myStream.filterIndex([0, 2, 3]);
myStream.write('first', 'second', 'third', 'fourth', 'fifth');
// outStream.outValues equals ['first', 'third', 'fourth']
filterEach (predicateHandler, truePredicateHandler, falsePredicateHandler)
let outStream = myStream.filterEach(value => value > 100, handleLargeNumbers, handleSmallNumbers);
filterMap (predicateHandler, truePredicateHandler, falsePredicateHandler)
let outStream = myStream.filterMap(value => value > 100, a => a + 100, a => -a);
myStream.write(200, 0, 1, 201, 2, 202);
// outStream.outValues equals [300, -0, -1, 301, -2, 302]
branchMap (...predicate and map handlers)
let myStream = new Stream();
let outStream = myStream.branchMap(
a => a[0] === 'a', a => 'Apple ' + a, // if item starts with 'a', prepend 'Apple'
a => a[0] === 'b', a => 'Banana ' + a); // if item starts with 'b', prepend 'Banana'
myStream.write('at', 'bat', 'action', 'cat', 'aaa');
// outStream.outValues equals ['Apple at', 'Banana bat', 'Apple action', 'cat', 'Apple aaa']
let myStream = new Stream();
let outStream = myStream.branchMap(
a => a[0] === 'a', a => 'Apple ' + a, // if item starts with 'a', prepend 'Apple'
a => a[0] === 'b', a => 'Banana ' + a, // if item starts with 'b', prepend 'Banana'
a => 'Other ' + a); // else, prepend, 'Other'
myStream.write('at', 'bat', 'action', 'cat', 'aaa');
// outStream.outValues equals ['Apple at', 'Banana bat', 'Apple action', 'Other cat', 'Apple aaa']
switchMap (switchHandler, ...case and map handlers)
let myStream = new Stream();
let outStream = myStream.switchMap(
a => a.type,
'animal', a => `i have a pet ${a.value}`,
'number', a => `u have ${a.value} pencils`,
'color', a => `his favorite color is ${a.value}`,
a => `other: ${a.value}`);
myStream.write(
{type: 'animal', value: 'elephant'},
{type: 'animal', value: 'flamingo'},
{type: 'number', value: 51},
{type: 'number', value: 1235},
{type: 'color', value: 'blue'},
{type: 'color', value: 'pink'},
{type: 'star', value: 'sun'});
/*
[ 'i have a pet elephant',
'i have a pet flamingo',
'u have 51 pencils',
'u have 1235 pencils',
'his favorite color is blue',
'his favorite color is pink',
'other: sun' ]
*/
let myStream = new Stream();
let outStream = myStream.switchMap(
a => a.type,
'animal', a => `i have a pet ${a.value}`,
'number', a => `u have ${a.value} pencils`,
'color', a => `his favorite color is ${a.value}`); // no default handler: unmatched values pass through unchanged
myStream.write(
{type: 'animal', value: 'elephant'},
{type: 'animal', value: 'flamingo'},
{type: 'number', value: 51},
{type: 'number', value: 1235},
{type: 'color', value: 'blue'},
{type: 'color', value: 'pink'},
{type: 'star', value: 'sun'});
/*
[ 'i have a pet elephant',
'i have a pet flamingo',
'u have 51 pencils',
'u have 1235 pencils',
'his favorite color is blue',
'his favorite color is pink',
{ type: 'star', value: 'sun' } ]
*/
unique ()
let outStream = myStream.unique();
myStream.write(0, 1, 1, 0, 2, 3, 2, 3);
// outStream.outValues equals [0, 1, 2, 3]
uniqueOn (keyName)
let outStream = myStream.uniqueOn('name');
myStream.write(
{age: 4231, name: 'Odysseus'},
{age: 4250, name: 'Odysseus'},
{age: 4234, name: 'Helen'});
// outStream.outValues equals [{age: 4231, name: 'Odysseus'},
// {age: 4234, name: 'Helen'}]
uniqueX (handler)
let outStream = myStream.uniqueX(obj => obj.a + obj.b);
myStream.write(
{a: 1, b: 5},
{a: 2, b: 4},
{a: 3, b: 3});
// outStream.outValues equals [{a: 1, b: 5}]
pluck (keyName)
let outStream = myStream.pluck('key');
myStream.write({key: 'value'});
// outStream.outValues equals ['value']
wrap (keyName)
let outStream = myStream.wrap('key');
myStream.write('value');
// outStream.outValues equals [{key: 'value'}]
pick (...keyNames)
let outStream = myStream.pick('name', 'age');
myStream.write({
name: 'myName',
age: 'myAge',
gender: 'myGender',
weight: 'myWeight'
});
// outStream.outValues equals [{name: 'myName', age: 'myAge'}]
omit (...keyNames)
let outStream = myStream.omit('gender', 'weight');
myStream.write({
name: 'myName',
age: 'myAge',
gender: 'myGender',
weight: 'myWeight'
});
// outStream.outValues equals [{name: 'myName', age: 'myAge'}]
set (keyName, handler)
let outStream = myStream.set('sum', (object, index) => object.number + object.otherNumber + index );
myStream.write({number: 5, otherNumber: 10});
// outStream.outValues equals [ { number: 5, otherNumber: 10, sum: 15 } ]
repeat (handler)
let outStream = myStream.repeat( (value, index) => value + index );
myStream.write(2, 3, 2);
// outStream.outValues equals [2, 2, 3, 3, 3, 3, 2, 2, 2, 2]
repeatCount (integer)
let outStream = myStream.repeatCount(2);
myStream.write(2, 3, 2);
// outStream.outValues equals [2, 2, 3, 3, 2, 2]
flatten ()
let outStream = myStream.flatten();
myStream.write([2], [3], [2, 4]);
// outStream.outValues equals [2, 3, 2, 4]
flattenOn (listKeyName, newKeyName)
myStream.write({key1: 'value1', numbers: [1, 2]});
myStream.write({key1: 'value1b', numbers: [4, 5]});
let outStream = myStream.flattenOn('numbers', 'number');
// outStream.outValues equals [{key1: 'value1', number: 1}, {key1: 'value1', number: 2}, {key1: 'value1b', number: 4}, {key1: 'value1b', number: 5}]
Why is flattenOn useful?
imagine we have a set of animals grouped by species
let animalSpecies = new Stream();
animalSpecies.write({species: 'cat', class: 'mammalia', names: ['kitty', 'cupcake']});
animalSpecies.write({species: 'dog', class: 'mammalia', names: ['barf', 'brownNose']});
without flattenOn
, we would need to do something like the following in order to obtain a flat list of animals
animalSpecies
.flatMap(animalSpecies =>
animalSpecies.names.map(name => {
let animal = Object.assign({}, animalSpecies);
delete animal.names;
animal.name = name;
return animal;
}));
but with flattenOn
, we can simply do the following
animalSpecies
.flattenOn('names', 'name');
join (...streams)
let outStream = myStream.join(stream1, stream2);
myStream.write(1, 2);
stream1.write(3, 4);
stream2.write(5, 6);
// outStream.outValues equals [1, 2, 3, 4, 5, 6]
joinCollapse ()
myStream.write(stream1, stream2, stream3);
outStream = myStream.joinCollapse();
stream1.write(1.0, 1.1, 1.2);
stream2.write(2.0, 2.1, 2.2);
stream3.write(3.0, 3.1, 3.2);
// outStream.outValues equals [1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2]
product (rightStream, leftStreamIdKey, rightStreamIdKey, leftStreamSetKey)
let productStream = myStream.product(otherStream, 'myId', 'otherId', 'other');
myStream.write({myId: 1, myValue: 100});
myStream.write({myId: 2, myValue: 200});
myStream.write({myId: 2, myValue: 201});
otherStream.write({otherId: 2, otherValue: 20});
otherStream.write({otherId: 2, otherValue: 21});
otherStream.write({otherId: 3, otherValue: 30});
// productStream.outValues equals [{myId: 2, myValue: 200, other: {otherId: 2, otherValue: 20}},
// {myId: 2, myValue: 201, other: {otherId: 2, otherValue: 20}},
// {myId: 2, myValue: 200, other: {otherId: 2, otherValue: 21}},
// {myId: 2, myValue: 201, other: {otherId: 2, otherValue: 21}}]
productX (rightStream, matchHandler, handler)
let productStream = myStream.productX(otherStream, (left, right) => left.myId === right.otherId, (left, right) => {
left.paired = true;
right.paired = true;
return {sum: left.myValue + right.otherValue};
});
myStream.write({myId: 1, myValue: 100});
myStream.write({myId: 2, myValue: 200});
myStream.write({myId: 2, myValue: 201});
otherStream.write({otherId: 2, otherValue: 20});
otherStream.write({otherId: 2, otherValue: 21});
otherStream.write({otherId: 3, otherValue: 30});
// myStream.outValues equals [{myId: 1, myValue: 100},
// {myId: 2, myValue: 200, paired: true},
// {myId: 2, myValue: 201, paired: true}]
// otherStream.outValues equals [{otherId: 2, otherValue: 20, paired: true},
// {otherId: 2, otherValue: 21, paired: true},
// {otherId: 3, otherValue: 30}]
// productStream.outValues equals [{sum: 220},
// {sum: 221},
// {sum: 221},
// {sum: 222}]
Note that while product
modifies a copy of the left stream's values, leaving the left stream unmodified, productX
passes in the original values of left stream, allowing left stream to be modified by the handler as seen in the example above.
to (stream)
myStream.to(outStream);
myStream.write(1, 2);
outStream.write(3, 4);
// outStream.outValues equals [1, 2, 3, 4]
wait (skipOnReject)
myStream.write(Promise.resolve('stream'));
myStream.write(Promise.resolve('async'));
myStream.write(Promise.resolve('data'));
let outStream = myStream.wait();
myStream.write(Promise.resolve('without needing'));
myStream.write(Promise.resolve('async/await'));
myStream.write(Promise.resolve('or .then'));
myStream.write(Promise.reject('rejected'));
// outStream.outValues equals ['stream', 'async', 'data', 'without needing', 'async/await', 'or .then', {rejected: 'rejected', isRejected: true}]
waitOn (key, skipOnReject)
myStream.write({key1: 'value1', key2: Promise.resolve('value2')});
myStream.write({key1: 'value2', key2: Promise.reject('rejectValue2')});
let outStream = myStream.waitOn('key2');
// outStream.outValues equals [{key1: 'value1', key2: 'value2'},
// {key1: 'value2', key2: {rejected: 'rejectValue2', isRejected: true}}]
Why is waitOn useful?
imagine we have a set of users
let users = new Stream();
users.write({userId: '1', height: 3, color: 'blue'}, {userId: '2', height: 4, color: 'green'}, {userId: '3', height: 2, color: 'orange'});
and this api to obtain a user's shape
let getUserShape = userId => {
return Promise.resolve(userId === '1' ? 'circle' : 'square');
};
without waitOn
, we would need to do something like the following in order to include every user's shape
users
.set('shape', ({userId}) => getUserShape(userId))
.map(user => user.shape.then(shape => {
user.shape = shape;
return user;
}))
.wait();
but with waitOn
, we can simply do the following
users
.set('shape', ({userId}) => getUserShape(userId))
.waitOn('shape');
waitOrdered (skipOnReject)
let resolve1, resolve2;
let promise1 = new Promise(resolve => resolve1 = resolve);
let promise2 = new Promise(resolve => resolve2 = resolve);
myStream.write(promise1, promise2);
let outStream = myStream.waitOrdered();
resolve2('promise 2 resolved first');
resolve1('promise 1 resolved last');
// outStream.outValues equals ['promise 1 resolved last', 'promise 2 resolved first']
waitOnOrdered (key, skipOnReject)
let resolve1, resolve2;
let promise1 = new Promise(resolve => resolve1 = resolve);
let promise2 = new Promise(resolve => resolve2 = resolve);
myStream.write({key: promise1}, {key: promise2});
let outStream = myStream.waitOnOrdered('key');
resolve2('promise 2 resolved first');
resolve1('promise 1 resolved last');
// outStream.outValues equals [{key: 'promise 1 resolved last'}, {key: 'promise 2 resolved first'}]
skipOnReject parameter
passing true
as the last parameter to wait
, waitOn
, waitOrdered
, and waitOnOrdered
will ignore values which are rejected, similar to writePromiseSkipOnReject
myStream.write({key1: 'value1', key2: Promise.resolve('value2')});
myStream.write({key1: 'value2', key2: Promise.reject('rejectValue2')});
let outStream = myStream.waitOn('key2', true);
// outStream.outValues equals [{key1: 'value1', key2: 'value2'}]
otherwise, rejected promises are wrapped in a {rejected: <rejected value>, isRejected: true}
structure and written just like resolved promises, similar to writePromise
if (predicateHandler)
myStream.write(110, 10, 30, 130, 50, 150);
let ifStreams = myStream.if(value => value > 100);
// ifStreams.then.outValues equals [110, 130, 150]
// ifStreams.else.outValues equals [10, 30, 50]
console.log('numbers over 100:');
ifStreams.then.each(value => console.log(value));
console.log('numbers under 100:');
ifStreams.else.each(value => console.log(value));
split (predicateHandler, truePredicateHandler, falsePredicateHandler)
myStream.write({species: 'kitten', name: 'tickleMe'});
myStream.write({species: 'kitten', name: 'pokeMe'});
myStream.write({species: 'puppy', name: 'hugMe'});
myStream.split(
animal => animal.species === 'kitten',
kittenStream => kittenStream
.set('sound', () => 'meow')
.set('image', ({name}) => getRandomLolzCatImage(name)),
puppyStream => puppyStream
.set('sound', () => 'wuff')
.set('edible', () => true)
.each(dipInChocolate));
group (handler)
myStream.write({species: 'cat', name: 'blue'}, {species: 'cat', name: 'green'}, {species: 'dog', name: 'orange'});
let species = myStream.group(animal => animal.species);
// species.cat.outValues equals [{species: 'cat', name: 'blue'}, {species: 'cat', name: 'green'}]
// species.dog.outValues equals [{species: 'dog', name: 'orange'}]
console.log('cats:');
species.cat.each(cat => console.log(' ', cat.name));
console.log('dogs:');
species.dog.each(dog => console.log(' ', dog.name));
groupCount (integerGroupSize)
myStream.write(20, 30, 40, 50, 60, 70, 80);
let groupStreams = myStream.groupCount(3);
// groupStreams.group0.outValues equals [20, 30, 40]
// groupStreams.group1.outValues equals [50, 60, 70]
// groupStreams.group2.outValues equals [80]
groupFirstCount (integerGroupSize)
myStream.write(20, 30, 40, 50, 60, 70, 80);
let groupStreams = myStream.groupFirstCount(3);
// groupStreams.first.outValues equals [20, 30, 40]
// groupStreams.rest.outValues equals [50, 60, 70, 80]
console.log('first 3 numbers:');
groupStreams.first.each(number => console.log(number));
console.log('rest of numbers:');
groupStreams.rest.each(number => console.log(number));
groupNCount (integerGroupSize, integerGroupCount)
myStream.write(20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120);
let groupStreams = myStream.groupNCount(3, 2);
// groupStreams.group0.outValues equals [20, 30, 40]
// groupStreams.group1.outValues equals [50, 60, 70]
// groupStreams.rest.outValues equals [80, 90, 100, 110, 120]
groupIndex (...[lists of indices])
myStream.write(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100);
let groupStreams = myStream.groupIndex([0], [1, 3, 5, 6]);
// groupStreams[0].outValues equals [0]
// groupStreams[1].outValues equals [10, 30, 50, 60]
// groupStreams.rest.outValues equals [20, 40, 70, 80, 90, 100]
console.log('first number:');
groupStreams[0].each(number => console.log(number));
console.log('important numbers:');
groupStreams[1].each(number => console.log(number));
console.log('other numbers:');
groupStreams.rest.each(number => console.log(number));
batch (integerBatchSize)
myStream.write(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100);
let outStream = myStream.batch(4);
// outStream.outValues equals [[0, 10, 20, 30], [40, 50, 60, 70]]
batchFlat (integerBatchSize)
let outStream = myStream.batchFlat(4);
myStream.write(0, 10, 20);
// outStream.outValues equals []
myStream.write(30, 40, 50);
// outStream.outValues equals [0, 10, 20, 30]
myStream.write(60, 70, 80);
// outStream.outValues equals [0, 10, 20, 30, 40, 50, 60, 70]
myStream.write(90, 100);
// outStream.outValues equals [0, 10, 20, 30, 40, 50, 60, 70]
generate (handler)
myStream.write(10, 40);
let outStream = myStream.generate(value => [value + 1, value * 2]);
// outStream.outValues equals [10, 11, 20, 40, 41, 80]
flatMap (handler)
myStream.write(10, 40);
let outStream = myStream.flatMap(value => [value + 1, value * 2]);
// outStream.outValues equals [11, 20, 41, 80]
throttle (integer)
myStream.write(promise1, promise2, promise3, promise4);
let throttled = myStream.throttle(2);
throttled.stream
.wait()
.each(doStuff)
.each(throttled.nextOne);
After calling throttled = stream.throttle(n)
, throttled.stream
will emit n
values initially. It will emit 1 more value each time throttled.next()
or throttled.nextOne()
is invoked, and m
more values each time throttled.next(m)
is invoked.
myStream.write(1, 2, 3, 4, 5);
let throttled = myStream.throttle(2);
// throttled.stream.outValues equals [1, 2]
throttled.next(2);
// throttled.stream.outValues equals [1, 2, 3, 4]
throttled.next(2);
// throttled.stream.outValues equals [1, 2, 3, 4, 5]
myStream.write(6, 7);
// throttled.stream.outValues equals [1, 2, 3, 4, 5, 6]
Calling throttled = stream.throttle()
is short for calling throttled = stream.throttle(0)
, which results in a lazy stream. throttled.stream
will emit values only when throttled.next
is invoked.
myStream.write(1, 2, 3, 4, 5);
let throttled = myStream.throttle(2);
// throttled.stream.outValues equals [1, 2]
throttled.unthrottle();
// throttled.stream.outValues equals [1, 2, 3, 4, 5]
myStream.write(6, 7);
// throttled.stream.outValues equals [1, 2, 3, 4, 5, 6, 7]
Calling throttled.unthrottle()
will allow all current and future values to pass through without throttling, rendering throttled.next()
unnecessary.
clean()
myStream.write(1, 2, 3);
let oneToSix = myStream.map(a => a);
myStream.clean();
myStream.write(4, 5, 6);
let fourToFive = myStream.map(a => a);
// myStream.outValues equals [4, 5, 6];
disconnect()
myStream.write(1, 2, 3);
let oneToThree = myStream.map(a => a);
myStream.disconnect();
myStream.write(4, 5, 6);
let oneToSix = myStream.map(a => a);
promise
myStream.promise
returns a promise that resolves once all values already written to the stream have resolved.
myStream.write(promise1, promise2, promise3, promise4);
myStream.promise.then(resolve1234 => process(resolve1234));
length
myStream.length
outValues
myStream.outValues