@jsxt/stream

v5.0.0

Stream is a library for creating an async iterable pull stream from a push source.

Stream

Introduction

JavaScript now has async iterators and async generators, but it does not include a way to create async iterators from existing data sources.

This library is designed to fill that void by providing a single concrete type with a lot of similarities to the Promise type. It also borrows ideas from the Observable proposal and allows any observer to also be used as the first argument to Stream.

NOTE: Like the Observable proposal, the goal of Stream is to have a minimal API surface, which is why Stream doesn't include any operators such as .map/.filter/etc. Those should be provided by some other library.
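Because a stream is consumed with for await, operators can be written as plain async generator functions over any async iterable. The following is a minimal sketch under that assumption; map, filter, numbers, and evenSquares are hypothetical names used for illustration and are not part of this library.

```javascript
// Hypothetical helper operators. They accept any async iterable,
// so a Stream instance could be passed in place of numbers().
async function* map(source, fn) {
    for await (const item of source) {
        yield fn(item)
    }
}

async function* filter(source, predicate) {
    for await (const item of source) {
        if (predicate(item)) yield item
    }
}

// Stand-in source used only for this illustration
async function* numbers() {
    yield 1
    yield 2
    yield 3
    yield 4
}

// Collects the squares of the even numbers from the source
async function evenSquares() {
    const results = []
    const evens = filter(numbers(), n => n % 2 === 0)
    for await (const n of map(evens, n => n * n)) {
        results.push(n)
    }
    return results
}
```

Keeping operators outside the stream type means the same helpers also work with async generators and other async iterables.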

Examples

const interval = new Stream(stream => {
    setInterval(stream.yield, 1000)
})

for await (const _ of interval) {
    console.log("Tick")
}

function mediaChunks(mediaRecorder, stopWhen) {
    return new Stream(stream => {
        mediaRecorder.ondataavailable = ({ data }) => stream.yield(data)
        stopWhen(stream.return)
    })
}

const userVoice = await navigator.mediaDevices.getUserMedia({ audio: true })
// Invoke the callback after 10 seconds to end the stream
const stopWhen = callback => setTimeout(callback, 10000)

const recorder = new MediaRecorder(userVoice)
// Emit a chunk roughly every second; no data events fire until start() is called
recorder.start(1000)

for await (const chunk of mediaChunks(recorder, stopWhen)) {
    // Even if db.append is slow the rest of the chunks will still be queued
    await db.table(filename).append(chunk)
}

API

new Stream(initializer, { queue=new Queue() }={})

The Stream constructor requires an initializer function as its first argument. The initializer is called immediately with a StreamController object. It may optionally return a single function, which will be called when cleanup starts once the stream is complete.

An options bag may optionally be provided as a second argument. Currently the only available option is queue.
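As a sketch of the cleanup hook: the initializer below starts a timer and returns a function that stops it. ticker and intervalMs are hypothetical names introduced for illustration; only the initializer signature and the returned cleanup function come from the description of the constructor.

```javascript
// Hypothetical factory that builds an initializer for new Stream(...)
function ticker(intervalMs) {
    return stream => {
        // Push a timestamp into the stream on every tick
        const id = setInterval(() => stream.yield(Date.now()), intervalMs)
        // Returned function: called when the stream cleans up
        return () => clearInterval(id)
    }
}
```

It would be used as new Stream(ticker(1000)). Returning the cleanup function gives the stream a way to stop the timer once it is complete, rather than leaving it running.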

StreamOptions.queue

Optionally you can pass a custom queue object. It needs to conform to the following interface:

interface AbstractQueue<T> {
    isEmpty: boolean;
    enqueue: (item: T) => any;
    dequeue: () => T;
}

The only other invariant on these methods is that if .isEmpty is false, then until the current microtask ends the next call to .dequeue must return an item. .dequeue will never be called if .isEmpty is true.
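For illustration, here is one queue that would satisfy this interface: a bounded queue that drops its oldest item when full. SlidingQueue and capacity are hypothetical names, not something this library ships.

```javascript
// Hypothetical bounded queue: keeps at most `capacity` items and
// discards the oldest one when a new item would exceed that limit.
class SlidingQueue {
    constructor(capacity) {
        if (!Number.isInteger(capacity) || capacity < 1) {
            throw new RangeError("capacity must be a positive integer")
        }
        this._capacity = capacity
        this._items = []
    }

    // Exposed as a getter so it reads like the boolean property in the interface
    get isEmpty() {
        return this._items.length === 0
    }

    enqueue(item) {
        if (this._items.length === this._capacity) {
            this._items.shift() // discard the oldest item
        }
        this._items.push(item)
    }

    dequeue() {
        if (this.isEmpty) {
            throw new Error("dequeue called on an empty queue")
        }
        return this._items.shift()
    }
}
```

Such a queue would be passed as new Stream(initializer, { queue: new SlidingQueue(10) }). A slow consumer would then see only the most recent items instead of an ever-growing backlog.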

stream.next()

This is the implementation of AsyncIterator.next. If an item is in the queue, it returns the first item from the queue; otherwise it waits until the next item is available.

Note: Multiple calls to .next/.return are safe even without waiting for the previous one to resolve. The results will always be resolved in order.

stream.return(value?)

This is the implementation of AsyncIterator.return. If a value is passed and the stream is not yet complete, that value becomes the value in the returned IteratorResult.

Calling this method schedules cleanup to happen once all pending calls to .next are resolved.

If the cleanup callback returns a promise, the promise returned by .return will not resolve until that promise has resolved.

Note: If the queue still has items they will simply be ignored.

StreamController

The stream controller object is how you put values into the iterator. You can send values, throw an error or end the iterable at any point.

StreamController.yield(value)/StreamController.next(value)

The .yield method (and its alias .next) puts a value in the stream. If calls to stream.next are already waiting, the value is handed to the first waiting call immediately; otherwise it is enqueued in the queue.
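The hand-off between the push side and the pull side can be illustrated with a stripped-down stand-in. This is not the library's implementation: MiniChannel, push, pull, and demo are hypothetical names, and errors, completion, and cleanup are left out.

```javascript
// Hypothetical stand-in showing only the yield/next hand-off.
class MiniChannel {
    constructor() {
        this._queue = []   // values pushed before anyone asked for them
        this._waiting = [] // resolvers for pulls that arrived before a value
    }

    // Push side: plays the role of StreamController.yield
    push(value) {
        const resolve = this._waiting.shift()
        if (resolve) {
            // A consumer is already waiting, so hand the value over directly
            resolve({ value, done: false })
        } else {
            this._queue.push(value)
        }
    }

    // Pull side: plays the role of stream.next
    pull() {
        if (this._queue.length > 0) {
            return Promise.resolve({ value: this._queue.shift(), done: false })
        }
        return new Promise(resolve => this._waiting.push(resolve))
    }
}

async function demo() {
    const channel = new MiniChannel()
    // Two pulls are requested before any value exists
    const first = channel.pull()
    const second = channel.pull()
    channel.push("a")
    channel.push("b")
    channel.push("c") // nobody is waiting, so this one is queued
    const third = channel.pull()
    return [(await first).value, (await second).value, (await third).value]
}
```

In this sketch the waiting pulls are resolved in the order they were made, which mirrors the in-order guarantee stated for stream.next.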

StreamController.throw/StreamController.error

The .throw method (and its alias .error) causes the stream to terminate with an error.

StreamController.return/StreamController.complete

The .return method (and its alias .complete) will cause the stream to become complete once the final items on the queue are consumed.

Default Queue

The default queue is a simple FIFO queue with infinite length.

State Overview

In order for the stream to clean up at an appropriate time, every stream object has a current state. These states are similar to those of Promise objects, which can be pending, fulfilled or rejected.

Here's the full state diagram. It's just a teensy tiny bit more complicated than the Promise one:

[Figure: a diagram showing all the states of the stream]

Okay so it's a bit more complicated than the Promise one but it guarantees that all .next/.return calls will be appropriately fulfilled if they have been requested.