web-streams-extensions

v0.12.0

helper methods to create, pipe and map webstreams

Downloads

10,963

WebStream Extensions

A collection of helper methods for WebStreams, inspired by ReactiveExtensions. Because it is built on top of ReadableStream, you get a reactive pipeline with non-blocking back-pressure built in.

requires support for ReadableStream; use a polyfill if it is not available

Subjects require support for WritableStream. Requires support for async / await.

Creation

from(src: Iterable | AsyncIterable | (()=>Iterable | AsyncIterable) | ReadableLike): ReadableStream

turns an iterable source into a readable stream.

It will not try to create an iterator until the result stream is read from.

from([1,2,3,4]);
from(function*(){ yield 1; yield 2; yield 3; yield 4; });
from(async function*(){ yield 1; yield 2; yield 3; yield await Promise.resolve(4); });
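
The lazy behaviour described above can be illustrated with the standard ReadableStream constructor. This is a minimal sketch, not the library's implementation: fromSketch is a hypothetical name, and it assumes that a highWaterMark of 0 is an acceptable way to stop the stream from pulling before the first read.

```typescript
// Sketch only: a lazy iterable-to-stream adapter (fromSketch is a hypothetical
// name, not the library's `from`).
type Source<T> = Iterable<T> | AsyncIterable<T>;

function fromSketch<T>(src: Source<T> | (() => Source<T>)): ReadableStream<T> {
  let it: Iterator<T> | AsyncIterator<T> | undefined;
  return new ReadableStream<T>(
    {
      async pull(controller) {
        if (it === undefined) {
          // First read: resolve the source and create its iterator only now.
          const source = typeof src === "function" ? src() : src;
          const asyncFactory = (source as AsyncIterable<T>)[Symbol.asyncIterator];
          it = typeof asyncFactory === "function"
            ? asyncFactory.call(source)
            : (source as Iterable<T>)[Symbol.iterator]();
        }
        const step = await it.next();
        if (step.done) controller.close();
        else controller.enqueue(step.value);
      },
      async cancel() {
        // Give generators a chance to run their `finally` blocks.
        await it?.return?.();
      },
    },
    // A high-water mark of 0 means pull() only runs while a read is pending.
    { highWaterMark: 0 },
  );
}

const probe = { created: false };
const lazy = fromSketch(function* () {
  probe.created = true;
  yield 1;
  yield 2;
});
console.log(probe.created); // false: nothing has read from `lazy` yet
```

With the default highWaterMark of 1 the stream would call pull once on its own to fill its queue, which is why the sketch overrides it.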

of(...args:T[]): ReadableStream

creates a ReadableStream where the chunks will be the in-order arguments passed to it

of(1, "foo", ()=>"bar", {})

concat(...streams: ReadableStream[]): ReadableStream

concatenates several streams together in the order given.

It will not read from the streams until the result stream is read from.

let inputA = [1,2];
let inputB = [3,4];
let expected = [1,2,3,4];
let stream = concat(from(inputA), from(inputB));
let result = await toArray(stream);
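
To show what "will not read from the streams until the result stream is read from" can look like, here is a rough sketch built only on the standard ReadableStream API. concatSketch, fromArray and collect are hypothetical helpers written for this example; the library's concat may be implemented differently.

```typescript
// Sketch only: sequential concatenation that locks each input on demand.
function concatSketch<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {
  let index = 0;
  let reader: ReadableStreamDefaultReader<T> | undefined;
  return new ReadableStream<T>(
    {
      async pull(controller) {
        while (index < streams.length) {
          // Lock the next input only when its chunks are actually needed.
          if (reader === undefined) reader = streams[index].getReader();
          const r = await reader.read();
          if (!r.done) return controller.enqueue(r.value);
          reader = undefined;
          index++;
        }
        controller.close();
      },
      async cancel(reason) {
        // Cancel the input being read and every input not yet started.
        if (reader !== undefined) {
          await reader.cancel(reason);
          index++;
        }
        for (const rest of streams.slice(index)) await rest.cancel(reason);
      },
    },
    { highWaterMark: 0 }, // assumption: no pull until a read is pending
  );
}

// Helper: a stream that emits the given items and closes.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper: drain a stream into an array.
async function collect<T>(src: ReadableStream<T>): Promise<T[]> {
  const reader = src.getReader();
  const out: T[] = [];
  for (;;) {
    const r = await reader.read();
    if (r.done) return out;
    out.push(r.value);
  }
}

collect(concatSketch(fromArray([1, 2]), fromArray([3, 4])))
  .then((all) => console.log(JSON.stringify(all))); // [1,2,3,4]
```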

defer(cb: ()=>Promise<ReadableStream> | ReadableStream): ReadableStream

awaits a callback that returns a ReadableStream, or a promise of one

let input = [1,2,3,4];
let expected = [1,2,3,4];

let result = await toArray(defer(()=>Promise.resolve(from(input))));

Consuming

toArray(src: ReadableStream): T[]

await exhaustion of the stream and return every chunk as an array

let input = [1,2,3,4];
let expected = [1,2,3,4];
let result = await toArray(from(input));

toPromise(src: ReadableStream): T

await exhaustion of the stream and return the last entry

let input = [1,2,3,4];
let expected = 4;
let result = await toPromise(from([1,2,3,4]));
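
Both consumers boil down to a read loop over getReader(). The following is a minimal sketch of that idea using only the standard API; toArraySketch, toPromiseSketch and numbers are hypothetical names, and the library's own functions may handle errors and locking differently.

```typescript
// Sketch only: draining a stream with getReader().
async function toArraySketch<T>(src: ReadableStream<T>): Promise<T[]> {
  const reader = src.getReader();
  const out: T[] = [];
  for (;;) {
    const r = await reader.read();
    if (r.done) return out; // source closed: hand back everything seen
    out.push(r.value);
  }
}

async function toPromiseSketch<T>(src: ReadableStream<T>): Promise<T | undefined> {
  const reader = src.getReader();
  let last: T | undefined;
  for (;;) {
    const r = await reader.read();
    if (r.done) return last; // undefined if the stream was empty
    last = r.value; // keep only the most recent chunk
  }
}

// Helper: a fresh stream of 1..4 for each consumer (a stream can only be read once).
function numbers(): ReadableStream<number> {
  return new ReadableStream<number>({
    start(controller) {
      for (const n of [1, 2, 3, 4]) controller.enqueue(n);
      controller.close();
    },
  });
}

toArraySketch(numbers()).then((all) => console.log(JSON.stringify(all))); // [1,2,3,4]
toPromiseSketch(numbers()).then((last) => console.log(last)); // 4
```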

subscribe(src, next, complete, error): ()=>void

immediately begins to read from src, passing each chunk to the next callback and awaiting if it returns a promise. once the source signals the end of the stream, complete is called. if the source stream throws an error, it is passed to the error callback. returns a disposer method to stop reading.

let src = from(function*(){ yield 1; yield 2; yield 3; });

subscribe(src,
  (next)=>{ console.log("Next:", next); },
  ()=>{ console.log("Complete"); },
  (err)=>{ console.log("Error:", err); }
);
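
The contract above (read immediately, await next, then complete or error, with a disposer) can be sketched with a plain reader loop. subscribeSketch and numbers are hypothetical names. Two choices here are assumptions rather than documented library behaviour: disposing suppresses the complete callback, and an exception thrown by next is routed to error.

```typescript
// Sketch only: a read loop with a disposer.
function subscribeSketch<T>(
  src: ReadableStream<T>,
  next: (chunk: T) => void | Promise<void>,
  complete?: () => void,
  error?: (err: unknown) => void,
): () => void {
  const reader = src.getReader();
  const state = { disposed: false };
  (async () => {
    try {
      for (;;) {
        const r = await reader.read();
        if (state.disposed) return; // disposer ran while we were waiting
        if (r.done) {
          complete?.();
          return;
        }
        await next(r.value); // wait for async handlers before reading again
      }
    } catch (err) {
      if (!state.disposed) error?.(err);
    }
  })();
  return () => {
    state.disposed = true;
    reader.cancel().catch(() => {}); // stop the source, ignore late errors
  };
}

// Helper: a stream of 1, 2, 3.
function numbers(): ReadableStream<number> {
  return new ReadableStream<number>({
    start(controller) {
      for (const n of [1, 2, 3]) controller.enqueue(n);
      controller.close();
    },
  });
}

const seen: number[] = [];
const finished = new Promise<void>((resolve, reject) => {
  subscribeSketch(numbers(), (n) => { seen.push(n); }, () => resolve(), reject);
});
finished.then(() => console.log(seen.join(","))); // 1,2,3
```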

Piping

Given inconsistencies in browser support for anything other than ReadableStream, we opted to make an Operator a function of the form:

type Op<T, R> = (src:ReadableStream<T>)=>ReadableStream<R>

this only requires ReadableStream to be implemented/available with getReader support. To aid in pipelining these operators, a pipe method is available:

pipe(src: ReadableStream, ...ops:Op): ReadableStream

let input = [1, 2, 3, 4];
let expected = { "1": 1, "2": 2, "4": 4 };

let result = await toPromise(
  pipe(
    from(input),
    filter(x => x != 3),
    buffer(Infinity),
    map(x => {
      return x.reduce((p, c) => { p[c.toString()] = c; return p }, {});
    }),
    first()
  ));
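
Because an operator is just a function from one ReadableStream to another, pipe can be thought of as a left-to-right fold over the operators. The sketch below illustrates that reading with hypothetical stand-ins (pipeSketch, filterSketch, mapSketch, fromArray, collect); it is typed loosely with any and is not the library's code.

```typescript
type Op<T, R> = (src: ReadableStream<T>) => ReadableStream<R>;

// Sketch only: each operator wraps the stream produced by the previous one.
function pipeSketch(src: ReadableStream<any>, ...ops: Op<any, any>[]): ReadableStream<any> {
  return ops.reduce((stream, op) => op(stream), src);
}

function filterSketch<T>(predicate: (chunk: T) => boolean): Op<T, T> {
  return (src) => {
    const reader = src.getReader();
    return new ReadableStream<T>({
      async pull(controller) {
        // Keep reading until a chunk passes, so every pull makes progress.
        for (;;) {
          const r = await reader.read();
          if (r.done) return controller.close();
          if (predicate(r.value)) return controller.enqueue(r.value);
        }
      },
      cancel: (reason) => reader.cancel(reason),
    });
  };
}

function mapSketch<T, R>(select: (chunk: T) => R): Op<T, R> {
  return (src) => {
    const reader = src.getReader();
    return new ReadableStream<R>({
      async pull(controller) {
        for (;;) {
          const r = await reader.read();
          if (r.done) return controller.close();
          const mapped = select(r.value);
          // Assumption: results equal to undefined are skipped.
          if (mapped !== undefined) return controller.enqueue(mapped);
        }
      },
      cancel: (reason) => reader.cancel(reason),
    });
  };
}

// Helper: a stream that emits the given items and closes.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper: drain a stream into an array.
async function collect<T>(src: ReadableStream<T>): Promise<T[]> {
  const reader = src.getReader();
  const out: T[] = [];
  for (;;) {
    const r = await reader.read();
    if (r.done) return out;
    out.push(r.value);
  }
}

const doubledEvens = pipeSketch(
  fromArray([1, 2, 3, 4]),
  filterSketch((x: number) => x % 2 === 0),
  mapSketch((x: number) => x * 2),
);
collect(doubledEvens).then((all) => console.log(JSON.stringify(all))); // [4,8]
```

Back-pressure falls out of the pull-based design: an operator only reads from its source when its own consumer asks for more.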

Operators

buffer(count: number, highWaterMark: number): Op<T, T[]>

buffer chunks until the buffer size is count length, then enqueues the buffer and starts a new buffer

let input = [1,2,3,4];
let expected = [[1,2],[3,4]];
let stream = buffer(2)(from(input));
let result = await toArray(stream);
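
As a rough illustration of the buffering rule, the sketch below gathers chunks until count is reached and flushes whatever is left when the source ends. The flush at the end is inferred from the buffer(Infinity) usage in the pipe example. bufferSketch, fromArray and collect are hypothetical names, and the highWaterMark parameter is left out.

```typescript
type Op<T, R> = (src: ReadableStream<T>) => ReadableStream<R>;

// Sketch only: group chunks into arrays of at most `count` items.
function bufferSketch<T>(count: number): Op<T, T[]> {
  return (src) => {
    const reader = src.getReader();
    return new ReadableStream<T[]>({
      async pull(controller) {
        const chunk: T[] = [];
        while (chunk.length < count) {
          const r = await reader.read();
          if (r.done) {
            // Source ended: emit the partial buffer, if any, then close.
            if (chunk.length > 0) controller.enqueue(chunk);
            controller.close();
            return;
          }
          chunk.push(r.value);
        }
        controller.enqueue(chunk);
      },
      cancel: (reason) => reader.cancel(reason),
    });
  };
}

// Helper: a stream that emits the given items and closes.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper: drain a stream into an array.
async function collect<T>(src: ReadableStream<T>): Promise<T[]> {
  const reader = src.getReader();
  const out: T[] = [];
  for (;;) {
    const r = await reader.read();
    if (r.done) return out;
    out.push(r.value);
  }
}

collect(bufferSketch<number>(2)(fromArray([1, 2, 3, 4, 5])))
  .then((groups) => console.log(JSON.stringify(groups))); // [[1,2],[3,4],[5]]
```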

concatAll(): Op<ReadableStream, T>

given a ReadableStream of ReadableStreams, concatenates the output of each stream.

let input = [from([1,2]), from([3,4]), from([5])];
let expected = [1,2,3,4,5];
let stream = concatAll()(from(input));
let result = await toArray(stream);

filter(predicate: (chunk: T) => boolean): Op<T, T>

filter out chunks that fail a predicate

let input = [1,2,3,4];
let expected = [1,2,4];
let stream = filter(x=>x!=3)(from(input));
let result = await toArray(stream);

first(predicate?:(chunk:T)=>boolean): Op<T, T>

returns a stream of one chunk, the first to return true when passed to the predicate, or simply the first if no predicate is supplied

let input = [1,2,3,4];
let expected = 3;
let stream = first(x=>x>=3)(from(input));
let result = await toPromise(stream);

last(predicate?:(chunk:T)=>boolean): Op<T, T>

returns a stream of one chunk, the last to return true when passed to the predicate, or simply the last if no predicate is supplied.

let input = [1,2,3,4];
let expected = 3;
let stream = last(x=>x<4)(from(input));
let result = await toPromise(stream);

map<T, R=T>(select:MapSelector<T, R>, highWaterMark): Op<T, R>

given a stream of T and a selector f(T)->R, returns a stream of R for all f(T) != undefined, i.e. chunks for which the selector returns undefined are dropped

let input = [1,2,3,4];
let expected = [2,4,6,8];
let stream = map(x=>x*2)(from(input));
let result = await toArray(stream);

skip(count: number): Op<T, T>

skip count elements and then stream the rest to the output

let input = [1,2,3,4,5];
let expected = [3,4,5];
let stream = pipe(from(input), skip(2));
let result = await toArray(stream); 

take(count: number): Op<T, T>

take count elements and close

let input = [1,2,3,4,5];
let expected = [1,2];
let stream = pipe(from(input), take(2));
let result = await toArray(stream); 
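
"take count elements and close" matters most with long or endless sources, where closing early should also release the source. The sketch below shows one way to do that with the standard API. takeSketch and collect are hypothetical names, and cancelling the source after the last chunk is an assumption about reasonable behaviour, not a statement about the library.

```typescript
type Op<T, R> = (src: ReadableStream<T>) => ReadableStream<R>;

// Sketch only: pass through `count` chunks, then close and cancel the source.
function takeSketch<T>(count: number): Op<T, T> {
  return (src) => {
    const reader = src.getReader();
    let remaining = count;
    return new ReadableStream<T>({
      async pull(controller) {
        const r = remaining > 0 ? await reader.read() : undefined;
        if (r !== undefined && !r.done) {
          controller.enqueue(r.value);
          remaining--;
        }
        if (r === undefined || r.done || remaining === 0) {
          controller.close();
          await reader.cancel(); // tell the source no more chunks are needed
        }
      },
      cancel: (reason) => reader.cancel(reason),
    });
  };
}

// Helper: drain a stream into an array.
async function collect<T>(src: ReadableStream<T>): Promise<T[]> {
  const reader = src.getReader();
  const out: T[] = [];
  for (;;) {
    const r = await reader.read();
    if (r.done) return out;
    out.push(r.value);
  }
}

// An endless counter that records whether it was cancelled.
const probe = { cancelled: false, n: 0 };
const naturals = new ReadableStream<number>({
  pull(controller) { controller.enqueue(++probe.n); },
  cancel() { probe.cancelled = true; },
});

const firstTwo = collect(takeSketch<number>(2)(naturals));
firstTwo.then((all) => console.log(JSON.stringify(all), probe.cancelled)); // [1,2] true
```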

tap(cb: (chunk: T) => void): Op<T, T>

allows observing each chunk; the output is exactly the same as the input.

let input = [1,2,3,4];
let expected = [1,2,3,4];
let seen = [];
let stream = tap(x=>seen.push(x))(from(input));
await toPromise(stream); // execute; seen now holds every chunk

timeout(duration: number): Op<T, T>

throws an error if the time between chunks exceeds duration (in milliseconds)
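
Since no example is given for timeout, here is a rough sketch of the idea: race each read against a timer and error the output stream if the timer wins. timeoutSketch, fromArray and collect are hypothetical names. The sketch measures the wait from the moment a chunk is requested, which is an assumption and may differ from how the library measures the gap.

```typescript
type Op<T, R> = (src: ReadableStream<T>) => ReadableStream<R>;

// Sketch only: error the stream when a requested chunk takes too long.
function timeoutSketch<T>(duration: number): Op<T, T> {
  return (src) => {
    const reader = src.getReader();
    return new ReadableStream<T>(
      {
        async pull(controller) {
          let timer: ReturnType<typeof setTimeout> | undefined;
          const expired = new Promise<never>((_, reject) => {
            timer = setTimeout(
              () => reject(new Error(`no chunk within ${duration}ms`)),
              duration,
            );
          });
          try {
            const r = await Promise.race([reader.read(), expired]);
            if (r.done) controller.close();
            else controller.enqueue(r.value);
          } catch (err) {
            controller.error(err); // readers of this stream now reject
            reader.cancel(err).catch(() => {}); // release the slow source
          } finally {
            clearTimeout(timer);
          }
        },
        cancel: (reason) => reader.cancel(reason),
      },
      { highWaterMark: 0 }, // assumption: only time reads that were asked for
    );
  };
}

// Helper: a stream that emits the given items and closes.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper: drain a stream into an array.
async function collect<T>(src: ReadableStream<T>): Promise<T[]> {
  const reader = src.getReader();
  const out: T[] = [];
  for (;;) {
    const r = await reader.read();
    if (r.done) return out;
    out.push(r.value);
  }
}

// A source that emits one chunk and then stalls without closing.
const stalled = new ReadableStream<number>({
  start(controller) { controller.enqueue(1); },
});
collect(timeoutSketch<number>(20)(stalled))
  .catch((err: Error) => console.log(err.message)); // no chunk within 20ms
```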

Subjects

Subjects are duplex streams with automatic tee'ing of the readable, i.e. each access to subject.readable returns a new ReadableStream.

Subject()

proof of concept: it's likely there are cases not covered by the tests.

a Subject instance has the following members:

  readable: ReadableStream<T>;  
  writable: WritableStream<T>;

  next(value:T): number;
  complete(): void;
  error(err): void;

you can pipeTo the subject's writable:

let input = [1, 2, 3, 4];
let subject = new Subject<number>();

let resultPromise = toArray(subject.readable);

from(input).pipeTo(subject.writable);

let result = await resultPromise;//[1,2,3,4]

or pipeThrough the subject:

let input = [1, 2, 3, 4];
let expected = [1, 2, 3, 4];
let subject = new Subject<number>();

let result = await toArray(from(input).pipeThrough(subject));

expect(result).to.be.deep.eq(expected); // [1,2,3,4]

or manually call next, complete, error

let subject = new Subject<number>();
let resultPromise = toArray(subject.readable);

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.complete();

let result = await resultPromise; // [1,2,3,4]

mixing these approaches is not advised, though: the resulting behavior is unpredictable.
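
To make the "each access to subject.readable returns a new ReadableStream" idea concrete, here is a heavily simplified sketch of a fan-out source. SubjectSketch and collect are hypothetical names. The sketch omits the writable side, ignores back-pressure, and its next returns nothing, so it should be read as an illustration of the tee'ing behaviour only, not as the library's Subject.

```typescript
// Sketch only: push-style fan-out to every stream handed out so far.
class SubjectSketch<T> {
  private readonly controllers = new Set<ReadableStreamDefaultController<T>>();

  // Every access creates a fresh stream that sees values pushed from now on.
  get readable(): ReadableStream<T> {
    let own: ReadableStreamDefaultController<T> | undefined;
    return new ReadableStream<T>({
      start: (controller) => {
        own = controller;
        this.controllers.add(controller);
      },
      cancel: () => {
        // A consumer that cancels stops receiving values.
        if (own !== undefined) this.controllers.delete(own);
      },
    });
  }

  next(value: T): void {
    for (const controller of this.controllers) controller.enqueue(value);
  }

  complete(): void {
    for (const controller of this.controllers) controller.close();
    this.controllers.clear();
  }

  error(err: unknown): void {
    for (const controller of this.controllers) controller.error(err);
    this.controllers.clear();
  }
}

// Helper: drain a stream into an array.
async function collect<T>(src: ReadableStream<T>): Promise<T[]> {
  const reader = src.getReader();
  const out: T[] = [];
  for (;;) {
    const r = await reader.read();
    if (r.done) return out;
    out.push(r.value);
  }
}

const subject = new SubjectSketch<number>();
const left = collect(subject.readable);
const right = collect(subject.readable); // a second, independent stream

subject.next(1);
subject.next(2);
subject.complete();

Promise.all([left, right]).then(([a, b]) =>
  console.log(JSON.stringify(a), JSON.stringify(b)), // [1,2] [1,2]
);
```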