web-streams-extensions
v0.12.0
Published
helper methods to create, pipe and map webstreams
Downloads
10,963
Maintainers
Readme
WebStream Extensions
A collection of helper methods for WebStreams, inspired by ReactiveExtensions. Being built on-top of ReadableStream we can have a reactive-pipeline with non-blocking back-pressure built-in.
requires support for ReadableStream use a polyfill if they're not available
Subjects require support for WritableStream. Requires support for async / await.
Creation
from(src: Iterable | AsyncIterable | (()=>Iterable | AsyncIterable) | ReadableLike): ReadableStream
turns an iterable source into a readable stream.
It will not try create an iterator until the result stream is read from.
from([1,2,3,4])
from(function*(){yield 1, yield 2, yield 3, yield 4};
from(async function*(){yield 1, yield 2, yield 3, await Promise.resolve(4)};
of(...args:T[]): ReadableStream
creates a ReadableStream where the chunks will be the in-order arguments passed to it
of(1, "foo", ()=>"bar", {})
concat(...streams: ReadableStream[]): ReadableStream
concatenates several streams together in the order given.
It will not read from the streams until the result stream is read from.
let inputA = [1,2];
let inputB = [3,4];
let expected = [1,2,3,4];
let stream = concat(from(inputA), from(inputB));
let result = await toArray(stream);
defer(cb: ()=>Promise<ReadableStream> | ReadableStream): ReadableStream
await a callback method that returns a readable-stream
let input = [1,2,3,4];
let expected = [1,2,3,4];
let result = await toArray(defer(x=>Promise.resolve(from(input))));
Consuming
toArray(src: ReadableStream): T[]
let input = [1,2,3,4];
let expected = [1,2,3,4];
let result = await toArray(from([1,2,3,4]))
toPromise(src: ReadableStream): T
await exhaustion of the stream and return the last entry
let input = [1,2,3,4];
let expected = 4;
let result = await toPromise(from([1,2,3,4]));
subscribe(src, next, complete, error): ()=>void
immediately begins to read from src, passing each chunk to the next
callback and awaiting if it returns a promise.
once the source signals the end of the stream, complete
is called.
if the source stream throws an error, this is passed to the error
callback
returns a disposer method to stop reading
let src = from(function*(){yield 1, yield 2, yield 3})
subscribe(src,
(next)=>{ console.log("Next:", next);})
()=>{console.log("Complete")}
(err)=>{console.log("Error:", err)}
);
Piping
Given inconsistencies in browser support for anything other than ReadableStream, we opted to make an Operator a function of the form:
type Op<T, R> = (src:ReadableStream<T>)=>ReadableStream<R>
this only requires ReadableStream to be implemented/available with getReader support. To aid pipeline these operators, a pipe
method is available:
pipe(src: ReadableStream, ...ops:Op): ReadableStream
let input = [1, 2, 3, 4];
let expected = { "1": 1, "2": 2, "4": 4 };
let result = await toPromise(
pipe(
from(input),
filter(x => x != 3),
buffer(Infinity),
map(x => {
return x.reduce((p, c) => { p[c.toString()] = c; return p }, {});
}),
first()
));
Operators
buffer(count: number, highWaterMark: number): Op<T, T[]>
buffer chunks until the buffer size is count
length, then enqueues the buffer and starts a new buffer
let input = [1,2,3,4];
let expected = [[1,2],[3,4]];
let stream = buffer(2)(from(input));
let result = await toArray(stream);
concatAll(): Op<ReadableStream, T>
given a ReadableStream of ReadableStreams, concatenates the output of each stream.
let input = [from([1,2]), from([3,4]), from([5])];
let expected = [1,2,3,4,5];
let stream = concatAll()(from(input));
let result = await toArray(stream);
filter(predicate: (chunk: T) => boolean): Op<T, T>
filter out chunks that fail a predicate
let input = [1,2,3,4];
let expected = [1,2,4];
let stream = filter(x=>x!=3)(from(input));
let result = await toArray(stream);
first(predicate?:(chunk:T)=>boolean): Op<T, T>
returns a stream of one chunk, the first to return true when passed to the selector, or simply the first if no predicate is supplied
let input = [1,2,3,4];
let expected = 3;
let stream = first(x=>x>=3)(from(input));
let result = await toPromise(stream);
last(predicate?:(chunk:T)=>boolean): Op<T, T>
returns a stream of one chunk, the last to return true when passed to the predicate, or simply the last if no predicate is supplied.
let input = [1,2,3,4];
let expected = 3;
let stream = last(x=>x<4)(from(input));
let result = await toPromise(stream);
map<T, R=T>(select:MapSelector<T, R>, highWaterMark): Op<T, R>
given a stream of T and selector f(T)->R, return a stream of R, for all f(T) != undefined
let input = [1,2,3,4];
let expected = [2,4,6,8];
let stream = map(x=>x*2)(from(input));
let result = await toArray(stream);
skip(count: number): Op<T, T>
skip count
elements and then stream the rest to the output
let input = [1,2,3,4,5];
let expected = [3,4,5];
let stream = pipe(from(input), skip(2));
let result = await toArray(stream);
take(count: number): Op<T, T>
take count
elements and close
let input = [1,2,3,4,5];
let expected = [1,2];
let stream = pipe(from(input), take(2));
let result = await toArray(stream);
tap(cb: (chunk: T) => void): Op<T, T>
allows observing each chunk, but the output is exactly the same as in the input.
let input = [1,2,3,4];
let expected = [1,2,3,4];
let result = []
let stream = tap(x=>result.push(x))(from(input));
let result = await toPromise(stream); //execute
timeout(duration: number): Op<T, T>
throws an error if the duration between chunks exceeds the duration (milliseconds)
Subjects
Subjects are duplex streams with automatic tee'ing of the readable. i.e. each access call to subject.readable
returns a new ReadableStream.
Subject()
proof of concept - its likely there are cases not covered by the tests.
a Subject instance has the following members:
readable: ReadableStream<T>;
writable: WritableStream<T>;
next(value:T): number;
complete(): void;
error(err): void;
you can pipeTo
the subject's writable
:
let input = [1, 2, 3, 4];
let subject = new Subject<number>();
let resultPromise = toArray(subject.readable);
from(input).pipeTo(subject.writable);
let result = await resultPromise;//[1,2,3,4]
or pipeThrough
the subject:
let input = [1, 2, 3, 4];
let subject = new Subject<number>();
let result = await toArray(from(input).pipeThrough(subject));
expect(result).to.be.deep.eq(expected); // [1,2,3,4]
or manually call next
, complete
, error
let subject = new Subject<number>();
let resultPromise = toArray(subject.readable);
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.complete();
let result = await resultPromise; // [1,2,3,4]
although mixing these approaches is not advised - unpredictable behavior.