npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@reactgular/observables

v2.0.0

Published

Angular library that contains handy rxjs operators.

Downloads

22

Readme

Build Status Coverage Status npm version

What is Observables?

Observables is a small Rxjs 6 library that contains handy operators and utilities.

Why use this rxjs library?

This library contains operators and utilities that solve some very common problems that I face with Rxjs. Here is a quick list of features that I use most often in projects.

Installation

To get started, install the package from npm.

npm install @reactgular/observables

This package requires the Rxjs 6 as a peer dependency.

npm install rxjs

Usage

Operators and utilities are imported from the package path @reactgular/observables.

For example;

import {Observable} from 'rxjs';
import {windowResize, distinctStringify} from '@reactgular/observables';

function conditionalResize(cond$: Observable<boolean>): Observable<{}> {
    return windowResize(250).pipe(
      enabledWhen(cond$)
    );
}

Operators

Here is a list of observable operators that you can use from this library.

Operators | Operators | Operators | Operators | Operators | Operators ----------|-----------|-----------|-----------|-----------|----------- after | before | beforeError | counter | disabledWhen | distinctArray distinctDeepEqual | distinctStringify | enabledWhen | falsy | historyBuffer | ifOp loadFirst | mapFirst | mapLast | negate | pluckDistinct | scanLatestFrom truthy | withMergeMap | withSwitchMap | | |

Utilities

Here is a list of utility functions that you can use from this library.

Operators | Operators | Operators | Operators | Operators | Operators ----------|-----------|-----------|-----------|-----------|----------- combineEarliest | mergeChain | mergeDelayError | mergeTrim | roundRobin | switchChain toObservable | windowResize | | | |

Operators List

after

Emits the value that came after the value that passed the provided condition.

This operator has the following limitations:

  • This operator will never emit if the observable only emits one or fewer values.
  • This operator will never emit the first value.
  • If no values pass the provided condition, then nothing is emitted.
after<T>(cond: (current: T, next: T) => boolean): MonoTypeOperatorFunction<T>

Example:

of('starting', 'started', 'error', 'restarting').pipe(
    after(v => v === 'error')
).subscribe(v => console.log(v)); // prints "restarting"

[source] [tests] [up]


before

Emits the value that came before the value that passed the provided condition.

This operator has the following limitations:

  • This operator will never emit if the observable only emits one or fewer values.
  • This operator will never emit the last value.
  • If no values pass the provided condition, then nothing is emitted.
before<T>(cond: (current: T, prev: T) => boolean): MonoTypeOperatorFunction<T>

Example:

of('starting', 'started', 'error', 'restarting').pipe(
    before(v => v === 'error')
).subscribe(v => console.log(v)); // prints "started"

[source] [tests] [up]


beforeError

Emits an array of values that came before an error and silences the error. You can specify how many values to emit upon an error (the default is 1). The emitted array contains the most recent value first followed by older values.

This is a good operator for debugging to see what values preceded an error.

Example:

of('starting','started','restarting').pipe(
    map(n => {
        if(n === 'restarting') { throw new Error() }
        return n;
    }),
    beforeError()
}).subscribe(v => console.log(v)); // prints ["started"]

[source] [tests] [up]


counter

Increments a counter for each emitted value.

counter<T>(): OperatorFunction<T, [number, T]>

Example:

of('a', 'b', 'c', 'd').pipe(
    counter()
).subscribe(v => console.log(v));
// [1, 'a']
// [2, 'b']
// [3, 'c']
// [4, 'd']    

[source] [tests] [up]


disabledWhen

The inner observable can emit a truthy value to stop the emitting of values from the outer observable, and emit a falsy to resume emitting values.

Does not emit any values until the inner observable emits a falsy value.

disabledWhen<T>(disabled$: Observable<boolean>): MonoTypeOperatorFunction<T>

[source] [tests] [up]


distinctArray

Only emits when an array contains different values than the last and ignores the order of those values. The array must contain sortable values otherwise the results are unpredictable.

This operator sorts each array value before comparison.

distinctArray<T>(): MonoTypeOperatorFunction<T[]>

Example:

of([1,2,3], [3,2,1], [1, 3, 2], [4, 5, 6], [1, 2, 3]).pipe(
    distinctArray()
).subscribe(v => console.log(v));
// prints
// [1,2,3]
// [4,5,6]
// [1,2,3]

[source] [tests] [up]


distinctDeepEqual

Only emits when the current value is deeply different than the last. Two values that have different references, but contain the same properties will be compared to be the same. This is the same for arrays, nested objects, dates and regular expressions.

distinctDeepEqual<T>(): MonoTypeOperatorFunction<T>

Example:

of([1,2],[2,1],{a:1, b:1},{b:1, a:1}).pipe(
    distinctDeepEqual()
).subscribe(v => console.log(v));
// prints
// [1,2]
// {a:1, b:1}

[source] [tests] [up]


distinctStringify

Emits all items from the source Observable that are distinct by comparison using JSON.stringify() on each value.

Arrays with same values in different orders will be seen as different, and the same for objects with properties in different order.

distinctStringify<T>(): MonoTypeOperatorFunction<T>

Example:

of([1,2,3], [1,2,3], [3,2,1], {a: 1}, {a: 1}, {a: 1, b: 1}, {b: 1, a: 1}, "one", "one", "two")
    .pipe(distinctStringify())
    .subscribe(v => console.log(v));

// [1,2,3]
// [3,2,1]
// {a: 1}
// {a: 1, b: 1}
// {b: 1, a: 1}
// "one"
// "two"

[source] [tests] [up]


enabledWhen

The inner observable can emit a falsy value to stop the emitting of values from the outer observable, and emit a truthy to resume emitting values.

Does not emit any values until the inner observable emits a truthy value.

enabledWhen<T>(enabled: Observable<boolean>): MonoTypeOperatorFunction<T>

[source] [tests] [up]


falsy

Emits only falsy values. Performs a filter(v => !v) operator internally.

falsy<T>(): MonoTypeOperatorFunction<T>

Example:

of(0, "Hello", false, [1,2], "")
    .pipe(falsy(), toArray())
    .subscribe(v => console.log(v)); // prints [0, false, ""]

[source] [tests] [up]


historyBuffer

Emits an array that starts with the current value followed by previous values. Pass a count number to limit the length of the array, otherwise the array will continue to grow in length until the observable completes.

historyBuffer<T>(count?: number): OperatorFunction<T, T[]>

Example:

of(1,2,3,4,5).pipe(
   bufferHistory(3)
).subscribe(v => console.log(v));
// [1]
// [2,1]
// [3,2,1]
// [4,3,2]
// [5,4,3]

[source] [tests] [up]


ifOp

Apply an operator based on a condition. This operator only adds another operator when the condition is true. When the condition is false the source observable is not modified.

ifOp<T, R>(cond: boolean, operator: OperatorFunction<T, R>): OperatorFunction<T, T | R>

Examples:

Creates an observable of Window resize events with optional debouncing.

windowResize(debounce?: number) {
   return fromEvent(window, 'resize').pipe(
      ifOp(Boolean(debounce), debounceTime(debounce))
   );
}

If you are looking to apply two different operators based upon a conditional if/else, then you can use a simple ?: condition in the pipe() chain.

function switchOrMerge(cond: boolean): Observable<number> {
    const projector = (value) => of(value).pipe(startWith(99));
    return of(1,2,3).pipe(
       cond ? switchMap(projector) : mergeMap(projector)
    );
}

[source] [tests] [up]


loadFirst

Emits objects that describe the loading of data from a remote resource (like making a HTTP request). The objects contain the status property which can be either "start", "value" or "error", and a value property which holds the first data emitted by the outer observable.

This operator only reads the first value from the outer observable, and then completes.

There is always a start object emitted first followed by either a value or error object. The error object can be a thrown error or the result of the outer observable completing without any results.

export interface LoadFirst<T> {
    status: string;
    value: T | undefined;
}

loadFirst<T, S, E>(start?: S, empty?: E): OperatorFunction<T, LoadFirst<T | S | E>>

Example:

of("Hello World").pipe(
    loadFirst()
).subscribe(v => console.log(v));
// prints 
// {state: "start", value: undefined}
// {state: "value", value: "Hello World"}

You can use this operator to make loading indicators for Angular components.

Example:

import {loadFirst, LoadFirst} from '@reactgular/observable/operators';

@Component({
    selector: 'example',
    template: `
        <ng-container *ngIf="load$ | async as load" [ngSwitch]="load.status">
            <div *ngSwitchCase='"start"'>
                Please wait while loading...
            </div>
            <div *ngSwitchCase='"value"'>
                {{load.value}}
            </div>
            <div *ngSwitchCase='"error"'>
                There was an error loading data...
            </div>
        </ng-container>`
})
export class ExampleComponent implements OnInit {
    public load$: Observable<LoadFirst<any>>;
   
    public constructor(private http: HttpClient) { }
   
    public ngOnInit() {
        this.data$ = this.http
            .get('https://example.com/api')
            .pipe(loadFirst());    
    }
}

[source] [tests] [up]


mapFirst

Applies a given project function to the first value emitted by the source Observables, and emits the resulting value. Only the first value is projected and subsequent values are emitted without projection.

This operator is an alias for doing map((value, indx) => indx === 0 ? project(value) : value)

Example:

of(1,2,3,4).pipe(
    mapFirst(v => v * 100)
).subscribe(v => console.log(v)); 
// 100
// 2
// 3
// 4

[source] [tests] [up]


mapLast

Applies a given project function to the last value emitted by the source Observables, and emits the resulting value. Only the last value is projected and previous values are emitted without projection. This operator uses pairwise() internally and emits each value only when a next value is emitted or the source observable completes.

If you use both mapFirst() and mapLast() on an observable that emits only a single value and completes, then both operators will project on the same value.

This operator has the following limitations:

  • Each emitted value is the previous value from the source observable, and the last value is flushed out when the source completes.
  • Projects the last value even if the observable emits only a single value and completes.
mapLast<T, R>(project: (value: T) => R): OperatorFunction<T, T | R>

Example:

of(1,2,3).pipe(
    mapLast(v => v + 1000)
).subscribe(v => console.log(v));
// 1
// 2
// 1003

[source] [tests] [up]


negate

Maps truthy values to false, and falsy values to true. Performs a map(v => !v) internally.

negate<T>(): OperatorFunction<T, boolean>

Example:

of(0, "Hello", false, [1,2,3], "").pipe(
    negate(),
    toArray()
).subscribe(v => console.log(v));
// prints [true, false, true, false, true]

[source] [tests] [up]


pluckDistinct

Maps each source value (an object) to its specified nested property, and only emits distinct changes. It is the same as applying a pluck() followed by a distinctUntilChanged().

pluckDistinct<T, R>(...properties: string[]): OperatorFunction<T, R>

Example:

from([
    {name: 'John Smith'},
    {name: 'John Smith'},
    {name: 'Jane Doe'},
    {name: 'Jane Doe'}
]).pipe(
    pluckDistinct('name'),
    toArray()
).subscribe(v => console.log(v)); // prints ['John Smith', 'Jane Doe']

[source] [tests] [up]


scanLatestFrom

Applies an accumulator function over the source Observable, and returns each intermediate result. The seed value is the latest value from the second observable. If the source observable emits multiple values before the second observable emits a value, then the latest from both observables will be used instead. Accumulated values are discarded when the second observable emits a seed value.

Accumulated values are discarded when the second observable emits a seed value, and a new value is calculate using the accumulator function.

Accumulator function parameters:

  • acc is the accumulated value and is either the latest value from the second observable or the previous value from the accumulator.
  • value is the value from the source observable.
  • index is the offset number from the source observable.
  • reset is true when the acc parameter has been reset by the second observable emitting a value.
scanLatestFrom<T, A, R>(accumulator: (acc: A | R, value: T, index: number, reset: boolean) => R, latest: Observable<A>): OperatorFunction<T, R>

[source] [tests] [up]


truthy

Emits only truthy values. This operator is an alias for filter(v => Boolean(v)), but most people write filter(Boolean) because it's shorter. The problem with using filter(Boolean) is that the observable type can change to Boolean by TypeScript. So using truthy() is a shorter alias for the longer form that persists the generic type.

truthy<T>(): MonoTypeOperatorFunction<T>

Example:

of(0, false, [1,2,3], "Hello", "", {}).pipe(
    truthy(),
    toArray()
).subscribe(v => console.log(v));
// prints [[1,2,3], "Hello", {}]

[source] [tests] [up]


withMergeMap

Applies a mergeMap to the outer observable, and maps the inner observable to an array that contains the value of both the outer and inner observables as Observable<[outer, inner]>.

withMergeMap<T, R>(inner: (x: T) => Observable<R>): OperatorFunction<T, [T, R]>

Example:

of('A', 'B', 'C').pipe(
    withMergeMap(() => of('1'))
).subscribe(v => console.log(v));
// ['A', '1']
// ['B', '1']
// ['C', '1']

[source] [tests] [up]


withSwitchMap

Applies a switchMap to the outer observable, and maps the inner observable to an array that contains the value of both the outer and inner observables as Observable<[outer, inner]>.

withSwitchMap<T, R>(inner: (x: T) => Observable<R>): OperatorFunction<T, [T, R]>

Example:

of('A', 'B', 'C').pipe(
    withSwitchMap(() => of('1'))
).subscribe(v => console.log(v));
// ['A', '1']
// ['B', '1']
// ['C', '1']

[source] [tests] [up]


Utilities List

combineEarliest

Unlike combineLatest() which does not emit a value until all observables emits at least one value. The combineEarliest() emits immediately upon the first observable that emits a value substituting a value (defaults to undefined) for any awaiting values from the other observables.

combineEarliest<O extends Observable<any>, S, R>(observables: O[], substitute?: S): Observable<R>

Example:

combineEarliest([
    interval(1000),
    of('A').pipe(delay(1000)),
    of('B').pipe(delay(2000))
]).pipe(take(3)).subscribe(v => console.log(v));

// [0, undefined, undefined]
// [1, 'A', undefined]
// [2, 'A', 'B']

[source] [tests] [up]


mergeChain

When the source observable emits a value it is passed to the next switchTo function which returns another observable, and the value from that observable is passed onto the next switchTo function. It creates a new observable that emits an array of all values emitted from chained observables.

Uses mergeMap() internally to chain the functions together.

mergeChain<T, R>(source: Observable<T>, ...mergeTo: Array<(...values: any[]) => Observable<any>>): Observable<R>

Example:

mergeChain(
    store.select('company'),
    (company) => store.selectPriceChanges(company.id),
    (price, company) => store.selectPriceUpdates(price.id)
).subscribe(([changes, price, company]) => console.log(changes, price, company));

[source] [tests] [up]


mergeDelayError

Creates an output observable which concurrently emits all values from every given input observable, but delays any thrown errors until all observables have completed, and throws the first error.

All observables must complete before any awaiting error are thrown.

mergeDelayError<T>(...observables: Observable<T>[]): Observable<T>

Example:

mergeDelayError(
    of(1,2,3),
    throwError('ERROR')
).subscribe(
    v => console.log(v),
    err => console.error(err)
);
// prints
// 1
// 2
// 3
// ERROR

[source] [tests] [up]


mergeTrim

Creates an output observable which concurrently emits all values from every given input observable until any observable completes.

mergeTrim<T>(...observables: Observable<T>[]): Observable<T>

[source] [tests] [up]


roundRobin

Creates an output observable which emits values from each observable in a round robin sequence. Where the first observable must emit a value, before the next observable emits a value and starts over after all observables have emitted a value.

function roundRobin<T>(...observables: Observable<T>[]): Observable<T>

[source] [tests] [up]


switchChain

When the source observable emits a value it is passed to the next switchTo function which returns another observable, and the value from that observable is passed onto the next switchTo function. It creates a new observable that emits an array of all values emitted from chained observables.

Uses switchMap() internally to chain the functions together.

switchChain<T, R>(source: Observable<T>, ...switchTo: Array<(...values: any[]) => Observable<any>>): Observable<R>

Example:

switchChain(
    http.get('/user'),
    (user) => http.get(`/projects/${user.projectId}`),
    (project, user) => http.get(`/company/${project.companyId}`),
    (company, project, user) => http.get(`/brand/${company.brandId}`)
).subscribe(([brand, company, project, user]) => console.log(brand, company, project, user));

[source] [tests] [up]


toObservable

Converts the parameter to an observable, or returns the value if already an observable.

toObservable<T>(value: T | Observable<T>): Observable<T>

Example:

An example where an array of values is converted into an array of observables.

const values = [100, of(200), 300];
forkJoin(values.map(toObservable))
    .subscribe(v => console.log(v));
// prints [100, 200, 300]

[source] [tests] [up]


windowResize

Emits changes in the window size with optional debounce time.

windowResize(debounce?: number, wnd?: Window): Observable<{ innerWidth: number, innerHeight: number }>

Example:

Creates an observable of the window aspect ratio.

const aspect$ = windowResize(250).pipe(
   map(({innerWidth, innerHeight}) => innerWidth / innerHeight)
);

[source] [tests] [up]