rx-flow-extensions

v2.0.0

Extends Rx with some additional utility functions

Rx Flow Extensions

These are simple extensions that make some common Rx flows a bit simpler to handle.

Installation:

$ npm install --save rx-flow-extensions

Usage

The library can be used in three ways:

Directly accessing the helper methods:

const Rx = require('rxjs/Rx');
const RxFlowExt = require('rx-flow-extensions');

const obs = Rx.Observable.of(1);
RxFlowExt.just(obs, 2)
  .then(console.log); // 2

Important! When using the functions this way, the first parameter is always the observable.

Extending a specific observable:

const Rx = require('rxjs/Rx');
const RxFlowExt = require('rx-flow-extensions');

const obs = Rx.Observable.of(1);
RxFlowExt.extend(obs);
obs.just(2)
  .then(console.log); // 2

Extending all observables:

const Rx = require('rxjs/Rx');
const RxFlowExt = require('rx-flow-extensions');
RxFlowExt.extend(Rx.Observable.prototype);

const obs = Rx.Observable.of(1);
obs.just(2)
  .then(console.log); // 2

Removing the extensions

You can reset an extended object (i.e. remove all the added methods) by running

const Rx = require('rxjs/Rx');
const RxFlowExt = require('rx-flow-extensions');
RxFlowExt.extend(Rx.Observable.prototype);
RxFlowExt.reset(Rx.Observable.prototype);

const obs = Rx.Observable.of(1);
obs.just(2) // obs.just is not a function
  .then(console.log);
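
To illustrate the extend/reset idea described above, here is a minimal sketch in plain JavaScript. It is not the library's implementation: the names helpers, extendSketch and resetSketch are hypothetical, and the sketch only assumes that each helper takes its target as the first parameter, as in the direct-access style shown earlier.

```javascript
// Hypothetical helper table: every helper receives the target as its
// first argument, mirroring the "first parameter is the observable" rule.
const helpers = {
  double: (target) => target.value * 2,
  describe: (target, label) => `${label}: ${target.value}`,
};

// Copy every helper onto the target as a method that forwards `this`.
function extendSketch(target) {
  for (const [name, fn] of Object.entries(helpers)) {
    target[name] = function (...args) {
      return fn(this, ...args);
    };
  }
  return target;
}

// Remove every method that extendSketch added.
function resetSketch(target) {
  for (const name of Object.keys(helpers)) {
    delete target[name];
  }
  return target;
}

const box = { value: 21 };
extendSketch(box);
console.log(box.double()); // 42
resetSketch(box);
console.log(typeof box.double); // undefined
```

Passing a prototype instead of a single object would, under the same assumption, make the methods available on every instance, which is the effect the "extending all observables" option describes.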

Methods

All methods can be called using any of the 3 options shown above. The examples below assume that all observables have been extended.

cached(time)

It will cache the last result provided by the observable for time milliseconds. During that time it won't ask the observable for new values; subscribers receive the cached result instead.

const rest = require('rest');
const mime = require('rest/interceptor/mime');
const client = rest.wrap(mime); // Using cujojs/rest
const slow = Rx.Observable.of('my/slow/service/url') // responds with {value: 1}
  .flatMap(client) // perform the request
  .cached(10000)
  .map(res => res.entity.value);

slow.subscribe(console.log); // 1 (will call my/slow/service/url)
slow.subscribe(console.log); // 1 (won't call my/slow/service/url)
setTimeout(
  () => slow.subscribe(console.log),
  11000
); // 1 (after 11000 ms, will call my/slow/service/url)
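
The time-based caching behaviour can be sketched without Rx. The example below is only an illustration of the idea, not how the library implements cached: cachedSketch, its injectable now clock and the fake timer are all hypothetical names introduced so the expiry can be demonstrated without waiting.

```javascript
// Wrap `producer` so that its last result is reused for `time` milliseconds.
// `now` is injectable purely so the sketch can be exercised with a fake clock.
function cachedSketch(producer, time, now = Date.now) {
  let hasValue = false;
  let value;
  let storedAt = 0;
  return function () {
    const t = now();
    if (!hasValue || t - storedAt >= time) {
      value = producer(); // cache miss or expired: ask the source again
      storedAt = t;
      hasValue = true;
    }
    return value;
  };
}

let calls = 0;
let fakeTime = 0;
const slowSketch = cachedSketch(() => ++calls, 10000, () => fakeTime);

console.log(slowSketch()); // 1 (calls the producer)
console.log(slowSketch()); // 1 (served from the cache)
fakeTime = 11000;
console.log(slowSketch()); // 2 (cache expired, calls the producer again)
```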

flatten()

It will flatten an observable of observables of values into an observable of values

Rx.Observable.range(0, 5)
  .map(v => Rx.Observable.of('test!'))
  .flatten()
  .subscribe(console.log); // test! (5 times)

just(value)

It maps every value emitted by the observable to the specified value.

Rx.Observable.range(0, 5)
  .just('test!')
  .subscribe(console.log); // test! (5 times)

matching(obs2, groupBy, condition)

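It will merge two observables based on a condition (similar to how two tables are joined in an RDBMS). groupBy extracts a key from each element of the first observable, and condition receives that key together with an element of obs2 and returns whether they match.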

const obs1 = Rx.Observable.from([
  {id: 1, value: 'value a'},
  {id: 2, value: 'value b'},
  {id: 3, value: 'value c'}
]);
const obs2 = Rx.Observable.from([
  {foreignId: 3, value2: 'matches' },
  {foreignId: 2, value2: 'matches' },
  {foreignId: 4, value2: 'does not match' },
  {foreignId: 1, value2: 'matches' }
]);

obs1.matching(obs2, a => a.id, (id, b) => id === b.foreignId)
  .subscribe(console.log); /* will emit:
    [{id: 1, value: 'value a'}, {foreignId: 1, value2: 'matches' }]
    [{id: 2, value: 'value b'}, {foreignId: 2, value2: 'matches' }]
    [{id: 3, value: 'value c'}, {foreignId: 3, value2: 'matches' }]
  */
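
For readers who think in terms of joins, the matching logic can be sketched on plain arrays. This is an illustrative nested-loop join under the assumption that groupBy produces a key and condition compares that key with an element of the second collection; matchingSketch is a hypothetical name and says nothing about how the library processes the observables internally.

```javascript
// Pair each element of `left` with every element of `right` that
// satisfies `condition(key, rightElement)`, where key = groupBy(leftElement).
function matchingSketch(left, right, groupBy, condition) {
  const pairs = [];
  for (const a of left) {
    const key = groupBy(a);
    for (const b of right) {
      if (condition(key, b)) pairs.push([a, b]);
    }
  }
  return pairs;
}

const leftRows = [
  { id: 1, value: 'value a' },
  { id: 2, value: 'value b' },
  { id: 3, value: 'value c' },
];
const rightRows = [
  { foreignId: 3, value2: 'matches' },
  { foreignId: 2, value2: 'matches' },
  { foreignId: 4, value2: 'does not match' },
  { foreignId: 1, value2: 'matches' },
];

const joined = matchingSketch(
  leftRows,
  rightRows,
  (a) => a.id,
  (id, b) => id === b.foreignId
);
console.log(joined.map(([a, b]) => [a.id, b.foreignId])); // [ [ 1, 1 ], [ 2, 2 ], [ 3, 3 ] ]
```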

polling(interval, maxAttempts)

It will repeatedly get the elements emitted by the observable, waiting interval milliseconds between attempts. It will stop after maxAttempts attempts.

This method is useful for polling a web service repeatedly until a condition is met.

const rest = require('rest');
const mime = require('rest/interceptor/mime');
const client = rest.wrap(mime); // Using cujojs/rest
Rx.Observable.of('my/polling_results/service/url') // {results: [...], finished: true|false}
  .flatMap(client)
  .polling(1000, 5)
  .map(res => res.entity)
  .takeWhileInclusive(res => !res.finished)
  .flatMap(res => Rx.Observable.from(res.results))
  .distinct()
  .subscribe(console.log); // result1 result2 result3... After 5 attempts or finished === true
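
The poll-until-finished pattern can also be sketched with promises alone. The code below is an assumption-laden illustration rather than the library's behaviour: pollingSketch, fakeService and the isDone callback are hypothetical, and the early exit plays the role that takeWhileInclusive plays in the Rx example.

```javascript
// Call `attempt` up to `maxAttempts` times, waiting `interval` ms between
// calls, and stop early once `isDone(result)` is true.
// Resolves with every result seen, including the finishing one.
async function pollingSketch(attempt, interval, maxAttempts, isDone) {
  const results = [];
  for (let i = 0; i < maxAttempts; i++) {
    if (i > 0) {
      await new Promise((resolve) => setTimeout(resolve, interval));
    }
    const result = await attempt();
    results.push(result);
    if (isDone(result)) break;
  }
  return results;
}

// Hypothetical service that reports `finished` on its third call.
let pollCount = 0;
const fakeService = async () => ({ n: ++pollCount, finished: pollCount >= 3 });

const pollDemo = pollingSketch(fakeService, 10, 5, (res) => res.finished);
pollDemo.then((all) => console.log(all.map((r) => r.n))); // [ 1, 2, 3 ]
```

If the service never reported finished, the loop would simply end after maxAttempts calls and resolve with all the results collected so far.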

takeWhileInclusive(condition)

It will take elements while the condition is true, and it also includes the first element for which the condition fails (similar in effect to a do...while loop).

Rx.Observable.range(0, 5)
  .takeWhileInclusive(x => x < 3)
  .subscribe(console.log); // 0 1 2 3
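
The inclusive cut-off is easy to see on a plain iterable. The generator below is a minimal sketch of the same rule (emit first, then test the condition); takeWhileInclusiveSketch is a hypothetical name and not part of the library.

```javascript
// Yield items while `condition` holds, and also yield the first item
// for which it fails, then stop.
function* takeWhileInclusiveSketch(iterable, condition) {
  for (const item of iterable) {
    yield item;
    if (!condition(item)) return;
  }
}

const taken = [...takeWhileInclusiveSketch([0, 1, 2, 3, 4], (x) => x < 3)];
console.log(taken); // [ 0, 1, 2, 3 ]
```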