rx-flowable

v0.1.2

RxJS observables with back-pressure

rx-flowable

observables with lossless back-pressure

RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.

rx-flowable makes it possible to apply back-pressure to observables in RxJS 7. This is useful when an observable produces values faster than a subscriber can consume them.

Please note that a number of solutions for lossless back-pressured streaming of values already exist for the JavaScript platform, as listed in the bibliography below. This module is designed for projects that already make extensive use of RxJS and would prefer not to adopt another library, with its associated learning overhead.

⚠️ Use of this library requires a solid understanding of RxJS, especially when authoring observable pipelines. This is because native RxJS operators are not designed for back-pressure and may behave unexpectedly if used naively.

Feedback and contributions welcome!

principle

The basic building block of rx-flowable is an Observable of Bites, called a Consumable. A bite is an object containing:

  • a value
  • a function, next(), which invites the source to produce the next value

Therefore, a consumable is simply an observable which does not produce values until it is invited to. This is often called a "pull" model: we "pull" values from the stream source one at a time. It is usually contrasted with the native "push" model of RxJS, in which the source produces values as fast as it can.
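To make the pull model concrete, here is a minimal, dependency-free sketch of the idea. It does not use RxJS or rx-flowable at all: the names Bite, BiteHandler and pullFrom are assumptions made for illustration, and the library's real types may differ.

```typescript
// Illustrative model only: these names are hypothetical, not rx-flowable exports.
interface Bite<T> {
  value: T;
  /** Invites the source to produce the next value. */
  next(): void;
}

type BiteHandler<T> = (bite: Bite<T>) => void;

/** Produces array items one at a time, and only when invited via next(). */
function pullFrom<T>(items: readonly T[], onBite: BiteHandler<T>): void {
  let index = 0;
  const produce = (): void => {
    if (index >= items.length) return; // source exhausted
    const value = items[index++];
    let invited = false;
    onBite({
      value,
      next: () => {
        if (invited) return; // ignore repeated invitations for the same bite
        invited = true;
        produce();
      },
    });
  };
  produce(); // the first value is produced on subscription
}

// Usage: hold on to each bite and invite the next value manually.
const seen: string[] = [];
const pending: Bite<string>[] = [];
pullFrom(['a', 'b', 'c'], bite => {
  seen.push(bite.value);
  pending.push(bite);
});
// Only 'a' has been produced so far, because nobody has called next().
const seenBeforeNext = [...seen];
pending[0].next();
// Now 'b' has been produced as well; 'c' is still waiting for an invitation.
const seenAfterNext = [...seen];
```

The source never runs ahead of the handler: each value exists only because the previous bite's next() was called.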

Here is an example of subscribing to a consumable:

function consume(values: Consumable<number>) {
  // We use the Observable.subscribe method as normal
  values.subscribe(async nextBite => {
    // Each value supplied to the subscriber is a bite
    try {
      // Hard work takes time, but the consumable will wait for us...
      await hardWork(nextBite.value);
    } finally {
      // ... until we ask for the next bite
      nextBite.next();
      // This subscriber will be called again if there are any more values
    }
  });
}

(Note that Observable.subscribe is already a "pull"-style method. Many observables will not produce anything until subscribed to, irrespective of their subsequent speed. rx-flowable affords this possibility at a granular level.)

The principle of consuming bites by calling next is simple and powerful, but has an important downside. That is, if next is not called, the next value will not be produced, and the source may hang on to some underlying resource indefinitely. This means that consumables are generally more prone to resource leaks than observables. In the example above, we take care to wrap the hard work in a try-finally block to mitigate this – assuming for simplicity that any error is not catastrophic and we can continue processing.

Once subscribed, consumables are "hot". If multiple subscribers are attached to a consumable, late subscribers will only receive values that prior subscribers have invited from the source. Further, the consumable will produce values at the pace of the slowest subscriber. That is, a value is not produced until every subscriber has invited it.
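The pacing rule can be sketched without RxJS as follows. This is a simplified model under stated assumptions, not the library's implementation: SharedSource is a hypothetical class, and it only shows that a late subscriber misses earlier values and that a new value waits for every current subscriber's invitation.

```typescript
// Illustrative model only: SharedSource is a hypothetical name.
interface Bite<T> {
  value: T;
  next(): void;
}

type BiteHandler<T> = (bite: Bite<T>) => void;

class SharedSource<T> {
  private readonly items: readonly T[];
  private readonly handlers: BiteHandler<T>[] = [];
  private index = 0;
  private started = false;

  constructor(items: readonly T[]) {
    this.items = items;
  }

  subscribe(handler: BiteHandler<T>): void {
    this.handlers.push(handler);
    if (!this.started) {
      this.started = true; // "hot" from the first subscription onwards
      this.produce();
    }
  }

  private produce(): void {
    if (this.index >= this.items.length) return;
    const value = this.items[this.index++];
    // Everyone subscribed right now must invite before the next value appears.
    const recipients = this.handlers.slice();
    const waitingFor = new Set(recipients);
    for (const handler of recipients) {
      handler({
        value,
        next: () => {
          if (waitingFor.delete(handler) && waitingFor.size === 0) this.produce();
        },
      });
    }
  }
}

// Usage: a slow subscriber that holds its bites, and a fast, late subscriber.
const source = new SharedSource([1, 2, 3]);
const slowHeld: Bite<number>[] = [];
const fastSeen: number[] = [];
source.subscribe(bite => { slowHeld.push(bite); });
source.subscribe(bite => { fastSeen.push(bite.value); bite.next(); });
slowHeld[0].next(); // value 2 goes to both; the late subscriber never saw 1
const fastSeenWhileSlowBusy = [...fastSeen];
slowHeld[1].next(); // only now is value 3 produced
const fastSeenAfter = [...fastSeen];
```

Although the fast subscriber invites immediately, it sees value 3 only after the slow subscriber has also invited it.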

To stop processing, unsubscribe from the consumable instead of calling next. Once the last subscriber has unsubscribed, the consumable is able to release its held resources.
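The release-on-last-unsubscribe behaviour amounts to reference counting. The following is a minimal sketch of that idea only. HeldResource is a hypothetical stand-in for whatever a source holds (a cursor, a file handle), not part of rx-flowable.

```typescript
// Illustrative model only: HeldResource is a hypothetical name.
class HeldResource {
  released = false;
  private subscribers = 0;

  /** Registers a subscriber and returns a function that unsubscribes it. */
  subscribe(): () => void {
    this.subscribers++;
    let active = true;
    return () => {
      if (!active) return; // unsubscribing twice has no further effect
      active = false;
      this.subscribers--;
      if (this.subscribers === 0) this.released = true; // e.g. close a cursor here
    };
  }
}

// Usage: the resource is released only when the last subscriber leaves.
const resource = new HeldResource();
const unsubscribeA = resource.subscribe();
const unsubscribeB = resource.subscribe();
unsubscribeA();
const releasedAfterFirst = resource.released;
unsubscribeB();
const releasedAfterLast = resource.released;
```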

flowable

If the speed of the subscriber is unknown at design time, for example if the streaming is part of a library interface, then a consumable can be wrapped as a Flowable. A flowable is also an observable, but one that emits plain values rather than bites (so that it is straightforward to use in pipelines), with a consume property to re-enter the back-pressured world:

// Database library code:
function readFromDatabase(query) {
  const stream = this.db.readStream(query);
  const consumable = consume(stream); // See sources, below
  return flowable(consumable);
}
// ... client 1:
{
  const cursor = readFromDatabase('SELECT * FROM DATA');
  // Using the flowable directly as an observable without calling next()
  cursor.pipe(filter(isPrintable)).subscribe(console.log);
}
// ... client 2:
{
  const cursor = readFromDatabase('SELECT * FROM DATA');
  // OR using it as a consumable to back-pressure from an expensive downstream
  cursor.consume.subscribe(({ value, next }) =>
    transformAndLoad(value).finally(next));
}

A subscriber attached via Observable.subscribe() always receives all data, but delivery may be delayed by any subscribers attached via consume.
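The shape of a flowable can be modelled roughly as below. This is a sketch under assumptions, not the library's flowable function: ConsumableLike, FlowableLike, fromArray and toFlowable are hypothetical names. Each subscription here restarts the array, so the sharing between subscribers described above is not modelled.

```typescript
// Illustrative model only: all names below are hypothetical.
interface Bite<T> {
  value: T;
  next(): void;
}

type BiteHandler<T> = (bite: Bite<T>) => void;

interface ConsumableLike<T> {
  subscribe(handler: BiteHandler<T>): void;
}

interface FlowableLike<T> {
  /** Plain subscription: values only, no back-pressure control. */
  subscribe(handler: (value: T) => void): void;
  /** Back-pressured view of the same source. */
  consume: ConsumableLike<T>;
}

function fromArray<T>(items: readonly T[]): ConsumableLike<T> {
  return {
    subscribe(handler) {
      let index = 0;
      const produce = (): void => {
        if (index >= items.length) return;
        const value = items[index++];
        let invited = false;
        handler({
          value,
          next: () => {
            if (invited) return;
            invited = true;
            produce();
          },
        });
      };
      produce();
    },
  };
}

function toFlowable<T>(consumable: ConsumableLike<T>): FlowableLike<T> {
  return {
    consume: consumable,
    subscribe: handler =>
      consumable.subscribe(bite => {
        handler(bite.value);
        bite.next(); // a plain subscriber invites the next value immediately
      }),
  };
}

// Usage: the same flowable, consumed both ways.
const rows = toFlowable(fromArray(['r1', 'r2', 'r3']));
const plain: string[] = [];
rows.subscribe(value => { plain.push(value); });
const bites: Bite<string>[] = [];
rows.consume.subscribe(bite => { bites.push(bite); });
```

The plain subscriber ends up with all three rows, while the consumer holds a single bite until it calls next().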

sources

This library provides implementations of Consumable for the following common sources of values:

  • JavaScript Iterables
  • Promises
  • Readables (such as Node.js Readables)
  • Other observables (non-consumable observables will buffer values)

These can be constructed using the consume function in the consume module.
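The note that non-consumable observables will buffer values deserves a closer look, since a push-style source cannot be asked to wait. The following dependency-free sketch shows one way such buffering could work. BufferedPull is a hypothetical class, and the library's actual strategy may differ.

```typescript
// Illustrative model only: BufferedPull is a hypothetical name.
interface Bite<T> {
  value: T;
  next(): void;
}

class BufferedPull<T> {
  private readonly buffer: T[] = [];
  private readonly onBite: (bite: Bite<T>) => void;
  private waiting = false; // true while a delivered bite has not been answered

  constructor(onBite: (bite: Bite<T>) => void) {
    this.onBite = onBite;
  }

  /** Called by the push-style source, as fast as it likes. */
  push(value: T): void {
    this.buffer.push(value);
    this.deliver();
  }

  /** Number of values waiting in memory. */
  get buffered(): number {
    return this.buffer.length;
  }

  private deliver(): void {
    if (this.waiting || this.buffer.length === 0) return;
    this.waiting = true;
    const value = this.buffer.shift() as T; // safe: length was checked above
    let invited = false;
    this.onBite({
      value,
      next: () => {
        if (invited) return; // ignore repeated invitations
        invited = true;
        this.waiting = false;
        this.deliver();
      },
    });
  }
}

// Usage: the source pushes three values while the consumer is still busy.
const delivered: number[] = [];
const held: Bite<number>[] = [];
const pull = new BufferedPull<number>(bite => {
  delivered.push(bite.value);
  held.push(bite);
});
pull.push(1);
pull.push(2);
pull.push(3);
const bufferedWhileBusy = pull.buffered; // values 2 and 3 are waiting in memory
held[0].next();
const bufferedAfterNext = pull.buffered;
```

No value is lost, but the buffer in this sketch is unbounded, so a source that stays faster than its consumer will keep growing memory use.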

operators

Consumables can be pipelined using RxJS operators. These native operators will see bites instead of raw values. Care must be taken to ensure that next is called correctly for every input bite, if the pipeline is to complete successfully. Since this can require code that is non-obvious yet repetitive, this library provides specialised operators which can be used in place of native ones to handle the calling of next correctly in common situations.

  • flatMap is a specialisation of concatMap
  • ignoreIf is like an inverse of filter
  • batch is a specialisation of bufferCount
  • Suggestions and pull requests for further operators are welcome!
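To see why a native operator such as filter needs care, consider what happens to a bite that is dropped: nobody downstream ever sees it, so nobody calls next, and the stream stalls. The sketch below models this without RxJS. The names pullFrom, naiveFilter and ignoreIfSketch are hypothetical, and ignoreIfSketch only imitates the idea behind ignoreIf, not its real signature.

```typescript
// Illustrative model only: all names below are hypothetical.
interface Bite<T> {
  value: T;
  next(): void;
}

type BiteHandler<T> = (bite: Bite<T>) => void;

/** Minimal pull source: one array item per invitation. */
function pullFrom<T>(items: readonly T[], onBite: BiteHandler<T>): void {
  let index = 0;
  const produce = (): void => {
    if (index >= items.length) return;
    const value = items[index++];
    let invited = false;
    onBite({
      value,
      next: () => {
        if (invited) return;
        invited = true;
        produce();
      },
    });
  };
  produce();
}

/** Filter-style handler: dropped bites are never answered, so the source stalls. */
function naiveFilter<T>(keep: (value: T) => boolean, downstream: BiteHandler<T>): BiteHandler<T> {
  return bite => {
    if (keep(bite.value)) downstream(bite);
  };
}

/** Drops matching bites, but invites the next value on their behalf. */
function ignoreIfSketch<T>(ignore: (value: T) => boolean, downstream: BiteHandler<T>): BiteHandler<T> {
  return bite => {
    if (ignore(bite.value)) bite.next();
    else downstream(bite);
  };
}

const isEven = (n: number): boolean => n % 2 === 0;
const isOdd = (n: number): boolean => n % 2 !== 0;

// Naive: the very first value (1) is dropped without next(), so nothing follows.
const naiveSeen: number[] = [];
pullFrom([1, 2, 3, 4], naiveFilter(isEven, bite => {
  naiveSeen.push(bite.value);
  bite.next();
}));

// Back-pressure aware: odd values are skipped and the stream keeps flowing.
const evens: number[] = [];
pullFrom([1, 2, 3, 4], ignoreIfSketch(isOdd, bite => {
  evens.push(bite.value);
  bite.next();
}));
```

Here naiveSeen stays empty, while evens collects both even values.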

biblio

background

  • ReactiveX, RxJS: Reactive Extensions For JavaScript, https://github.com/ReactiveX/rxjs
  • ReactiveX, backpressure operators, https://reactivex.io/documentation/operators/backpressure.html (note: the reference to RxJS ControlledObservable is not applicable to RxJS 7, see below)
  • ReactiveX, RxJS v4 Rx.Observable.prototype.controlled, https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/controlled.md
  • Reactive Streams, Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. https://www.reactive-streams.org/

alternatives

  • Node.js, A stream is an abstract interface for working with streaming data in Node.js, https://nodejs.org/docs/latest/api/stream.html
  • WHATWG, Streams: APIs for creating, composing, and consuming streams of data that map efficiently to low-level I/O primitives, https://streams.spec.whatwg.org/
  • ReactiveX, The Interactive Extensions for JavaScript (IxJS), https://github.com/ReactiveX/IxJS
  • Ruben Verborgh, Asynchronous iterators for JavaScript, https://github.com/RubenVerborgh/AsyncIterator
  • Kevin Ghadyani, Lossless Backpressure in RxJS, https://itnext.io/lossless-backpressure-in-rxjs-b6de30a1b6d4