pulls
v1.7.47
Published
pulls - readable streams for Node.js and Browsers.
Downloads
1,460
Readme
pulls
pulls - readable streams for Node.js and Browsers.
Naturally readable streams are everything that implements async iterable protocol or iterable protocol.
Streams are:
- Array
- Async Iterable
- Iterable objects
- Generators
- and Node.js readable streams.
It's lightweight with 200LOC and fast - more than 2M/s throughput.
Basic usage examples
Consume array as stream for ex. and pipeline as map -> filter -> reduce
import {pipeline, stream} from "pulls";
import {map} from "pulls/map";
import {filter} from "pulls/filter";
import {reduce} from "pulls/reduce";
import {tap} from "pulls/tap";
import {sequence} from "pulls/sequence";
const expected = await pipeline(
stream([1, 2, 3]), // automatically converts array to stream as async interable
map((x) => x * 2), // multiple; = 2, 4, 6
filter((x) => x > 2), // filter only x > 2; = 4, 6
// types input:number, output:number[]
reduce<number, number[]>((acc, value) => [...acc, value], []), // collect to array as expected
tap(console.log) // prints output
);
// expected: [4, 6]
expect(expected).toEqual([4, 6])
One more example with async stream.
It takes 1s to calculate 1M dataset.
// consume async iterator as stream and sum some values of 1m
const expected = await pipeline(
sequence(1000000), // creates low CPU/memory stream
map((x) => x * 2), // multiple; = 2, 4, 6...
filter((x) => x > 2), // filter only x > 2; = 4, 6....
// types input:number, output:number
reduce<number, number>((acc, value) => acc + value, 0), // sum values
tap(console.log) // prints output; 999998999998
);
// expected: 999998999998
expect(expected).toEqual(999998999998)
It's pipeline to consume async generator as stream as ex.
import {pipeline} from "pulls";
import {tap} from "pulls/tap";
// consume generator/async iterable protocol as stream
async function* Generate123() {
yield 1
yield 2
yield Promise.resolve(3)
}
const expected: number[] = [];
const collect = (value:number) => expected.push(value);
await pipeline(
Generate123(), // consume generator as stream
tap<number>(collect) // collect output to array as expected
)
// expected: [1, 2, 3]
expect(expected).toEqual([1, 2, 3])
Guide
TBD
Documentation
TBD
Stream examples
To get a point for a readable sources just check the information that I mentioned before. async iterable protocol or iterable protocol and more about iterating over async iterables. Or it could be enough to check examples here.
// stream 1
const asyncIterable = {
[Symbol.asyncIterator]() {
return {
i: 0,
next() {
if (this.i < 3) {
return Promise.resolve({ value: this.i++, done: false });
}
return Promise.resolve({ done: true });
}
};
}
};
// stream 2
const myAsyncIterable = {
async* [Symbol.asyncIterator]() {
yield "hello";
yield "async";
yield "iteration!";
}
};
// stream 3
let mySyncIterator = {
next: function() {
// ...
},
[Symbol.iterator]: function() { return this; }
};
// stream 4 / Node.js
const readableStream = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
// stream 5 / Node.js
const readableStream = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
// stream 6 / Node.js
import {Readable} from 'stream';
function* gen() {
yield 'One line\n';
yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
// stream 7 / Node.js
import {Readable} from 'stream';
const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
Node.js streams cooperation
Node.js streams are readable and implements async iterable protocol and could be consumed in pipelines.
import {pipeline} from "pulls";
import {Readable} from 'stream';
function* gen() {
yield 'One line\n';
yield 'Another line\n';
}
const readableStream = Readable.from(gen(), {encoding: 'utf8'});
// example
await pipeline(readableStream, /* map(...), filter(...) */)
Good article how to convert async iterable to Node.js readable and back could help you to manage Node.js streams.
TODOs
More information will come soon about transformations, composition, and stream management and scalability.