@motorcycle/stream
v3.0.0
Published
Functional and reactive event streams for Motorcycle.ts
Downloads
876
Readme
@motorcycle/stream -- 2.1.0
Functional and reactive event streams for Motorcycle.ts
Get it
yarn add @motorcycle/stream
# or
npm install --save @motorcycle/stream
API Documentation
All functions are curried!
ap<A, B>(fns: Stream<(value: A) => B>, values: Stream<A>): Stream<B>
Applies a stream of functions to the latest from a stream of values.
import { ap, now, periodic, scan, skip, observe } from '@motorcycle/stream'
const count$ = scan(x => x + 1, 0, skip(1, periodic(100)))
const fn$ = now(x => x * x)
const stream = ap(fn$, count$)
observe(console.log, stream)
// 0
// 4
// 9
// ...
export { ap } from '@most/core'
at<A>(time: Time, value: A): Stream<A>
Creates a stream that emits a value after a given amount of time.
export { at } from '@most/core'
at<A>(time: number, value: A): Stream<A>
Create a stream containing a single event at a specific time.
import { at, observe } from '@motorcycle/stream'
observe(console.log, at(1000, 'Hello'))
// After 1 second
// logs 'Hello'
export { at } from '@most/core'
awaitPromises<A>(stream: Stream<Promise<A>>): Stream<A>
Turn a stream of promises into a stream containing the promises' values. Note that order is always preserved, regardless of promise fulfillment order.
import { mergeArray, fromPromise, at, now, observe } from '@motorcycle/stream'
// ----1------->
const a = new Promise(resolve => setTimeout(resolve, 100, 1))
// ---------2-->
const b = new Promise(resolve => setTimeout(resolve, 200, 2))
// --3--------->
const c = new Promise(resolve => setTimeout(resolve, 50, 3))
// bc---a------->
const source = mergeArray([ at(100, a), now(b), now(c) ])
// -----1----23->
const stream = awaitPromises(source)
export { awaitPromises } from '@most/core'
chain<A, B>(f: (value: A) => Stream<B>, stream: Stream<A>): Stream<B>
Creates a new stream by applying a stream-returning function to every event value and merging them into the resulting stream.
import { chain, now, observe } from '@motorcycle/stream'
const stream = chain(x => now(x * 2), now(1000))
observe(console.log, stream)
// 2000
export { chain } from '@most/core'
combine<A, B, C>(f: (a: A, b: B) => C, a$: Stream<A>, b$: Stream<B>): Stream<C>
Apply a function to the most recent event from each stream when a new event arrives on any stream.
import { combine, at, merge, observe } from '@motorcycle/stream'
const a$ = merge(at(100, 100), at(200, 200))
const b$ = merge(at(200, 3000), at(250, 100))
const stream = combine(add, a$, b$)
observe(console.log, stream)
// 3200 -- at time 200 as a result of add(200, 3000)
// 350 -- at time 250 as a result of add(200, 100)
export { combine } from '@most/core'
combineArray<A, B, C>(f: (a: A, b: B) => C, streams: [ Stream<A>, Stream<B> ]): Stream<C>
Applies a function to the most recent event from all streams when a new event arrives on any stream.
import { combineArray, now, merge, at, observe } from '@motorcycle/stream'
const a$ = now(1000)
const b$ = now(2000)
const c$ = merge(at(100, 1), at(200, 2))
const sum = (x, y, z) => x + y + z
const stream = combineArray(sum, [ a$, b$, c$ ])
observe(console.log, stream)
// 3001 -- at time 100 as result of sum(1000, 2000, 1)
// 30002 -- at time 200 as result of sum(1000, 2000, 2)
export { combineArray } from '@most/core'
combineObj<Obj extends object>(obj: { [K in keyof Obj]: Stream<Obj[K]> }): Stream<Obj>
Takes an object of streams and returns a Stream of an object.
import { combineObj, now } from '@motorcycle/stream'
const obj = { a: now(1), b: now(2), c: now(3) }
const stream: Stream<{ a: number, b: number, c: number }> = combineObj(obj)
export function combineObj<Obj extends object>(
object: { readonly [K in keyof Obj]: Stream<Obj[K]> }
): Stream<Obj> {
const objectKeys = keys(object)
const sources = values(object) as Array<Stream<Obj[keyof Obj]>>
return combineArray((...values: Array<Obj[keyof Obj]>) => {
const valuesMap = {} as Obj
for (let i = 0; i < length(values); ++i) valuesMap[objectKeys[i]] = values[i]
return valuesMap
}, sources)
}
concatMap<A, B>(f: (value: A) => Stream<B>, stream: Stream<A>): Stream<B>
Creates a new stream by lazily applying a stream-returning function to each event value of a stream concatenating that stream's values to the resulting stream.
import { concatMap, now, observe } from '@motorcycle/stream'
const source = // --104--101--108--108--111|
const f = (x: number) => now(String.fromCharCode(x))
const stream = concatMap(f, source)
observe(console.log, stream)
// h
// e
// l
// l
// o
export { concatMap } from '@most/core'
constant<A>(value: A, stream: Stream<any>): Stream<A>
Replace each event on a stream with a given value.
import { constant, periodic, observe } from '@motorcycle/stream'
const stream = constant(100, periodic(1000))
observe(console.log, stream) // every 1 second logs 100
export { constant } from '@most/core'
continueWith(f: () => Stream<A>, stream: Stream<A>): Stream<A>
Replace the end signal with a new stream returned by f. Note that f must return a stream.
import { continueWith, at } from '@motorcycle/stream'
// ----1------>
const a = at(100, 1)
// ----2------>
const b = at(100, 2)
// ----1----2->
const stream = continueWith(() => b, a)
export { continueWith } from '@most/core'
debounce<A>(ms: number, stream: Stream<A>): Stream<A>
Wait for a burst of events to subside and keep only the last event in the burst.
import { debounce } from '@motorcycle/stream'
const source = // abcd----abcd--->
// -----d-------d->
const stream = debounce(2, source)
export { debounce } from '@most/core'
delay<A>(ms: number, stream: Stream<A>): Stream<A>
Timeshift a stream by a number of milliseconds.
import { delay } from '@motorcycle/stream'
const source = -1--2--3--4--5---->
// ----1--2--3--4--5->
const stream = delay(3, source)
export { delay } from '@most/core'
drain<A>(stream: Stream<A>): Promise<void>
Activates a stream using an default scheduler instance from @most/scheduler
,
returning a promise of completion.
import { drain } from '@motorcycle/stream'
drain(stream)
.then(() => console.log('complete'))
export const drain = <A>(stream: Stream<A>): Promise<void> => runEffects(stream, scheduler)
during<A>(signal: Stream<Stream<any>>, stream: Stream<A>): Stream<A>
Keep events that occur during a time window defined by a higher-order stream.
import { during } from '@motorcycle/stream'
const source = // -1-2-3-4-5-6-7-8->
const signal = // ------s---------->
const s = // --------x-->
// -------4-5-6-7|
const stream = during(signal, source)
export { during } from '@most/core'
empty<A>(): Stream<A>
Create a stream containing no events, which ends immediately.
import { empty, drain } from '@motorcycle/stream'
const stream = empty()
drain(stream)
.then(() => console.log('complete'))
export { empty } from '@most/core'
filter<A>(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>
Retain only events for which a predicate is truthy.
import { filter, observe } from '@motorcycle/stream'
const source = // ---true---false---true---|
// resulting stream only contains truthy values
const stream = filter(Boolean, source)
observe(console.log, stream)
// true
// true
export { filter } from '@most/core'
fromPromise<A>(promise: Promise<A>): Stream<A>
Create a stream containing a promise's value.
import { fromPromise, observe } from '@motorcycle/stream'
const a = fromPromise(Promise.resolve(1))
const b = fromPromise(Promise.reject(new Error('failure')))
observe(console.log, a)
.then(() => console.log('done'))
.catch(err => console.error(err.message))
// 1
// done
observe(console.log, b)
.then(() => console.log('done'))
.catch(err => console.error(err.message))
// 'failure'
export { fromPromise } from '@most/core'
hold<A>(stream: Stream<A>): Stream<A>
Deliver the most recently seen event to each new observer the instant it begins observing. A held stream is always multicast.
Given an input stream:
stream: -a---b---c---d-\>
observers which begin observing at different times will see:
observer1: -a---b---c---d-\>
observer2: a-b---c---d-\>
observer3: c--d-\>
import { createDocumentDomSource events } from '@motorcycle/dom'
import { drain, hold, map } from @motorcycle/stream'
const doc = createDocumentDomSource(now(document))
// start holding on first subscription
const click$ = hold(map(e => ({ x: e.clientX, y: e.clientY }), events('click', doc)))
// hold the latest event even before the first subscription
drain(click$)
export function hold<A>(stream: Stream<A>): Stream<A> {
return new Hold<A>(stream)
}
class Hold<A> extends MulticastSource<A> implements Stream<A> {
private has: boolean
private value: A
private scheduler: Scheduler
constructor(stream: Stream<A>) {
super(stream)
}
public run(sink: Sink<A>, scheduler: Scheduler) {
this.scheduler = scheduler
return super.run(sink, scheduler)
}
public add(sink: Sink<A>) {
if (this.has) sink.event(this.scheduler.currentTime(), this.value)
return super.add(sink)
}
public event(time: Time, value: A) {
this.has = true
this.value = value
return super.event(time, value)
}
}
last<A>(stream: Stream<A>): Stream<A>
Returns a stream that will only emit it's last value right before ending. If the stream does not end, then no events will ever occur. If the stream ends before emitting a value, no value will emit.
export function last<A>(stream: Stream<A>): Stream<A> {
return new Last(stream)
}
class Last<A> implements Stream<A> {
constructor(private source: Stream<A>) {}
public run(sink: Sink<A>, scheduler: Scheduler): Disposable {
return this.source.run(new LastSink(sink), scheduler)
}
}
class LastSink<A> implements Sink<A> {
private has: boolean = false
private value: A
constructor(private sink: Sink<A>) {}
public event(_: Time, value: A) {
this.has = true
this.value = value
}
public error(time: Time, error: Error) {
this.has = false
this.sink.error(time, error)
}
public end(time: Time) {
if (this.has) this.sink.event(time, this.value)
this.has = false
this.sink.end(time)
}
}
loop<A, B, C>(f: (accumulator: B, value: A) => { seed: B, value: C }, initial: B, stream: Stream<A>): Stream<A>
Accumulate results using a feedback loop that emits one value and feeds back another to be used in the next iteration.
It allows you to maintain and update a "state" while emitting a different value. In contrast, scan feeds back and produces the same value.
import { loop, periodic, filter, observe } from '@motorcycle/stream'
function pairwiseInterval (acc: number): { seed: number, value: [number, number] } {
const seed = acc + 1
const value = [ acc, seed ]
return { seed, value }
}
const stream = loop(pairwiseInterval, periodic(100))
observe(console.log, stream)
// [ 0, 1 ]
// [ 1, 2 ]
// [ 2, 3 ]
// ....
export { loop } from '@most/core'
map<A, B>(f: (value: A) => B, stream: Stream<A>): Stream<B>
Apply a function to each event value of a stream, returning a new stream containing the returned values.
import { map, now, observe } from '@motorcycle/stream'
const stream = map(x => x + 1, now(100))
observe(console.log, stream) // 101
export { map } from '@most/core'
mapList<A, B>(f: (value: A, index: number) => B, sinksList$: Stream<ArrayLike<A>>): Stream<ReadonlyArray<B>>
Applies a function to all Sinks in a list of Sinks.
import { mapList } from '@motorcycle/stream'
function Component(sources) {
const { listOfData$ } = sources
const sinksList$: Stream<ReadonlyArray<Sinks>> = mapList(
data => ChildComponent({ ...sources, data$: now(data) })),
listOfData$,
)
const childViews$: Stream<ReadonlyArray<Stream<VNode>> =
mapList(({ view$ }) => view$, sinksList$)
...
}
export const mapList: MapList = curry2(__mapList)
export type MapList = {
<A, B>(f: (value: A, index: number) => B, list$: Stream<ArrayLike<A>>): Stream<ReadonlyArray<B>>
<A, B>(f: (value: A, index: number) => B): (
list$: Stream<ArrayLike<A>>
) => Stream<ReadonlyArray<B>>
}
function __mapList<A, B>(
f: (value: A) => B,
list$: Stream<ArrayLike<A>>
): Stream<ReadonlyArray<B>> {
return map<ArrayLike<A>, ReadonlyArray<B>>(mapArray(f), list$)
}
merge<A>(a: Stream<A>, b: Stream<A>): Stream<A>
Creates a new Stream containing events from both streams.
import { merge, at, observe } from '@motorcycle/stream'
const stream = merge(at(1000, 'World'), at(100, 'Hello'))
observe(console.log, stream)
// Hello -- at time 100
// World -- at time 1000
export { merge } from '@most/core'
mergeArray<A>(stream: Array<Stream<A>>): Stream<A>
Creates a new stream containing all events of underlying streams.
import { at, mergeArray, observe } from '@motorcycle/stream'
const stream = mergeArray([
at(100, 'foo'),
at(300, 'baz')
at(200, 'bar'),
])
observe(console.log, stream)
// foo -- at time 100
// bar -- at time 200
// baz -- at time 300
export { mergeArray } from '@most/core'
multicast<A>(stream: Stream<A>): Stream<A>
Returns a stream equivalent to the original, but which can be shared more efficiently among multiple consumers.
import { multicast, observe } from '@motorcycle/stream'
// --1--2--3--4--5--6--7--8-->
const source = // ...
// --1--2--3--4--5--6--7--8-->
observe(console.log, source)
setTimeout(() => {
// --------------1--2--3--4--5--6--7--8-->
observe(console.log, source)
}, 5)
const stream = multicast(source)
// --1--2--3--4--5--6--7--8-->
observe(console.log, stream)
setTimeout(() => {
// --------------5--6--7--8-->
observe(console.log, stream)
}, 5)
export { multicast } from '@most/core'
never<A>(): Stream<A>
Create a stream containing no events, which never ends.
import { never, drain } from '@motorcycle/stream'
const stream = never()
drain(stream) // Returns a promise that never fulfills.
export { never } from '@most/core'
now<A>(value: A): Stream<A>
Create a stream containing a single event at time 0
import { now, observe } from '@motorcycle/stream'
const stream = now(1)
observe(console.log, stream)
// 1
export { now } from '@most/core'
observe<A>(f: (value: A) => any, stream: Stream<A>): Promise<void>
Activates a stream, calling a function f
with each event value, and returns
a Promise of completion.
export const observe: Observe = curry2(<A>(f: (value: A) => any, stream: Stream<A>): Promise<
void
> => drain(tap(f, stream)))
export interface Observe {
<A>(f: (value: A) => any, stream: Stream<A>): Promise<void>
<A>(f: (value: A) => any): (stream: Stream<A>) => Promise<void>
}
periodic(ms: number): Stream<void>
Creates a stream that emits ever time 0 and every n
milliseconds after.
import { periodic } from '@motorcycle/stream'
// void----void----void----void---->
const stream = periodic(5)
export { periodic } from '@most/core'
recoverWith<A>((err: Error) => Stream<A>, stream: Stream<A>): Stream<A>
Recover from a stream failure by calling a function to create a new stream.
import { recoverWith } from '@motorcycle/stream'
// -1-2-3X------->
const a = // ...
// -4-5-6-------->
const b = // ...
// -1-2-3-4-5-6-->
const stream = recoverWith(() => b, a)
export { recoverWith } from '@most/core'
runEffects<A>(stream: Stream<A>, scheduler: Scheduler): Promise<void>
Activate an event stream, and consume all its events.
import { runEffects, tap } from '@motorcycle/stream'
import { newDefaultScheduler } from '@most/scheduler'
const logStream = tap(console.log, stream)
runEffects(logStream, newDefaultScheduler())
.then(() => console.log('complete'))
.catch(err => console.error(err))
export { runEffects } from '@most/core'
sample<A, B, C>(f: (a: A, b: B) => C, sampler: Stream<A>, stream: Stream<B>): Stream<C>
For each event in a sampler stream, apply a function to combine it with the most recent event in another stream. The resulting stream will contain the same number of events as the sampler stream.
s1: -1--2--3--4--5->
sampler: -1-----2-----3->
sample(sum, sampler, s1): -2-----5-----8->
s1: -1-----2-----3->
sampler: -1--2--3--4--5->
sample(sum, sampler, s1): -2--3--5--6--8->
export { sample } from '@most/core'
sampleWith<A>(sampler: Stream<any>, stream: Stream<A>): Stream<A>
Given each event occurrence from a sampler stream takes the latest value from the given stream.
import { sampleWith } from '@motorcycle/stream'
function submit(dom: DomSource): Stream<string> {
const button = query('button', dom)
const input = query('input', dom)
const click$ = events('click', button)
const value$ = map(ev => ev.target.value, events('input', input))
return sampleWith(click$, value$)
}
export const sampleWith = sample(takeRight) as SampleWith
export interface SampleWith {
<A>(sampler: Stream<any>, stream: Stream<A>): Stream<A>
<A>(sampler: Stream<any>): (stream: Stream<A>) => Stream<A>
(sampler: Stream<any>): <A>(stream: Stream<A>) => Stream<A>
}
function takeRight<A>(_: any, value: A): A {
return value
}
scan<A, B>(f: (seed: B, value: A) => B, initial: B, stream: Stream<A>): Stream<B>
Incrementally accumulate results, starting with the provided initial value.
import { scan, periodic, observe } from '@motorcycle/stream'
// creates a stream that increments by 1 every 1000ms
const count$ = scan(x => x + 1, 0, periodic(1000))
observe(console.log, count$)
export { scan } from '@most/core'
scheduler (Scheduler)
A shared instance of the default scheduler from @most/scheduler
import { scheduler, now } from '@motorcycle/stream'
const stream = now(1)
const sink = {
event(time: number, value: number) { ... },
error(time: number, err: Error) { ... },
end(time: number) { ... }
}
const disposable = stream.run(sink, scheduler)
// later
disposable.dispose()
export const scheduler: Scheduler = newDefaultScheduler()
since<A>(startSingal: Stream<any>, stream: Stream<A>): Stream<A>
Discard all events in one stream until the first event occurs in another.:
import { since } from '@motorcycle/stream'
const source = // -1-2-3-4-5-6-7-8->
const start = // --------x-------->
// ---------5-6-7-8->
const stream = since(start, source)
export { since } from '@most/core'
skip<A>(quanity: number, stream: Stream<A>): Stream<A>
Skip the first n
number of events.
import { skip } from '@motorcycle/stream'
const source = // -1-2-3-4-5-6-7-8-9-10->
// -----------6-7-8-9-10->
const stream = skip(5, source)
export { skip } from '@most/core'
skipAfter<A>(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>
Discard all events after the first event for which predicate returns true.
import { skipAfter } from '@motorcycle/stream'
const source = // --1-2-3-4-5-6-7-8->
// --1-2|
const stream = skipAfter(even, source)
export { skipAfter } from '@most/core'
skipRepeats<A>(stream: Stream<A>): Stream<A>
Remove adjacent events that are equal in terms of value equality.
const a = { a: 1 }
const b = Object.assign({}, a)
const c = { c: 2 }
const source = // --a--b--a--c-->
// --a--------c-->
const stream = skipRepeats(source)
observe(console.log, stream)
// { a: 1 }
// { c: 2 }
export const skipRepeats: SkipRepeats = skipRepeatsWith(equals)
export type SkipRepeats = <A>(stream: Stream<A>) => Stream<A>
skipRepeatsWith<A>(predicate: (a: A, b: A) => boolean, stream: Stream<A>): Stream<A>
Remove adjacent repeated events, using the provided equality function to compare adjacent events.:
import { skipRepeatsWith, observe } from '@motorcycle/stream'
const source = // --a-b-B-c-D-d-e->
const equalsIgnoreCase = (a: string, b: string) =>
a.toLowerCase() === b.toLowerCase()
const stream = skipRepeatsWith(equalsIgnoreCase, source)
observe(console.log, stream)
// a
// b
// c
// D
// e
export { skipRepeatsWith } from '@most/core'
skipWhile(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>
Discard all events until predicate returns false, and keep the rest.
import { skipWhile } from '@motorcycle/stream'
const source = // -2-4-5-6-8->
// ----5-6-8->
const stream = skipWhile(even, source)
export { skipWhile } from '@most/core'
slice<A>(skip: number, take: number, stream: Stream<A>): Stream<A>
Keep only events in a range, where start <= index < end, and index is the ordinal index of an event in stream.
import { slice } from '@most/core'
const source = // --1--2--3--4--5--6--7--8--9--10-->
// --------3--4--5|
const stream = slice(2, 3, source)
export { slice } from '@most/core'
startWith<A>(initialValue: A, stream: Stream<A>): Stream<A>
Prepends an event to a stream at time 0.
import { startWith, at, observe } from '@motorcycle/stream'
const stream = startWith('Hello', at(1000, 'world'))
observe(console.log, stream)
// At time 0 logs 'Hello'
// At time 1000 logs 'world'
export { startWith } from '@most/core'
state<A, B>(f: (acc: A, value: B) => A, seed$: Stream<A>, values$: Stream<B>): Stream<A>
Especially useful when keeping local state that also needs to be updated from an outside source.
import { Stream } from '@motorcycle/types'
import { query, dragOverEvent, dragstartEvent, dropEvent } from '@motorcycle/dom'
import { sample, map, state, mapList } from '@motorcycle/stream'
import { move } from '@typed/prelude'
export function ReorderableList(sources) {
const { list$, dom } = sources
const li = query('li', dom)
const dragOver$ = dragOverEvent(li)
const dragStart$ = dragstartEvent(li)
const drop$ = dropEvent(li)
const reducer$: Stream<(list: Array<string>) => Array<string>> =
sample((to, from) => move(from, to), map(getKey, drop$), map(getKey, dragStart$))
const reorderedList$ = state((x, f) => f(x), list$, reducer$)
// create all of our <li> tags
const childViews$ = mapList(listItem, reorderedList$)
// create our <ul> containgin our <li> tags
const view$ = map(view, childViews$)
return {
view$,
preventDefault$: dragOver$,
}
}
export const state: State = curry3(__state)
function __state<A, B>(
f: (accumulator: B, value: A) => B,
seed$: Stream<B>,
values$: Stream<A>
): Stream<B> {
return switchMap(seed => scan(f, seed, values$), seed$)
}
export interface State {
<A, B>(f: (accumulator: A, value: B) => A, seed$: Stream<A>, values$: Stream<B>): Stream<A>
<A, B>(f: (accumulator: A, value: B) => A, seed$: Stream<A>): (values$: Stream<B>) => Stream<A>
<A, B>(f: (accumulator: A, value: B) => A): {
(seed$: Stream<A>, values$: Stream<B>): Stream<A>
(seed$: Stream<A>): (values$: Stream<B>) => Stream<A>
}
}
switchCombine<A>(streamList$: Stream<Array<Stream<A>>): Stream<ReadonlyArray<A>>
Flattens an array of streams into an array of values. Particularly useful when dealing with a list of children components.
import { switchCombine, mapSinks, map, now } from '@motorcycle/stream'
function Component(sources) {
const { listOfData$ } = sources
const childSinks$ = map(
listOfData => listOfData.map(data => ChildComponent({ ...sources, data$: now(data) }))
listOfData$
)
const childViews$: Stream<ReadonlyArray<VNode>> =
switchCombine(mapSinks(sinks => sinks.view$, childSinks$))
const view$ = map(view, childView$)
return { view$ }
}
function view(childViews: ReadonlyArray<VNode>): VNode {
// ...
}
export function switchCombine<A>(streamList$: Stream<Array<Stream<A>>>): Stream<ReadonlyArray<A>> {
return switchLatest(
map(
streams => (streams.length === 0 ? now([]) : combineArray((...items) => items, streams)),
streamList$
)
)
}
switchLatest<A>(stream: Stream<Stream<A>>): Stream<A>
Given a higher-order stream, return a new stream that adopts the behavior of (ie emits the events of) the most recent inner stream.
import { switchLatest, now } from '@motorcycle/stream'
const A = // -1--2--3----->
const B = // -4--5--6----->
const C = // -7--8--9----->
// --A-----B-----C-------->
const source = // ...
// ---1--2--4--5--7--8--9->
const stream = switchLatest(source)
export { switchLatest } from '@most/core'
switchMap<A, B = A>(f: (a: A) => Stream<B>, s: Stream<A>): Stream<B>
Applies a function, which returns a higher-order stream, to each event value of a stream and returns a new stream that adopts the behavior of (i.e., emits the events of) the most recent inner stream.
import { now, scan, switchMap, observe, skip } from '@motorcycle/stream'
const a$ = now(1)
const b$ = now(2)
const f = (a: number) => scan((x, y) => x + y, a, b$)
const s = skip(1, switchMap(f, a$))
observe(console.log, s) // 3
export const switchMap: SwitchMapArity2 = curry2(function switchMap<A, B = A>(
f: (a: A) => Stream<B>,
s: Stream<A>
): Stream<B> {
return switchLatest(map(f, s))
})
export interface SwitchMapArity2 {
<A, B = A>(f: (a: A) => Stream<B>, s: Stream<A>): Stream<B>
<A, B = A>(f: (a: A) => Stream<B>): SwitchMapArity1<A, B>
}
export interface SwitchMapArity1<A, B = A> {
(s: Stream<A>): Stream<B>
}
switchMerge<A>(streams$: Stream<Array<Stream<A>>): Stream<A>
Merges a list of streams into a single stream containing events from all of the stream. Particularly useful when dealing with a list of child components.
import { switchMerge, mapSinks, now } from '@motorcycle/stream'
function Component(sources) {
const { listOfData$ } = sources
const childSinks$ = map(
listOfData => listOfData.map(data => ChildComponent({ ...sources, data$: now(data) }))),
listOfData$
)
const foo$ = switchMerge(mapSinks(sinks => sinks.foo$, childSinks$))
return { foo$ }
}
export function switchMerge<A>(streams$: Stream<Array<Stream<A>>>): Stream<A> {
return switchLatest(map(mergeArray, streams$))
}
switchSinkOr<Sinks, K extends keyof Sinks>(or$: Sinks[K], sinkName: K, sinks$: Stream<Sinks>): Sinks[K]
Flattens a stream of sinks into a single sink.
import { switchSinkOr, map, now, never } from '@motorcycle/stream'
const switchSinkOrNever = switchSinkOr(never())
function Component(sources) {
const { listOfItems$ } = sources
const sinks$ = map(items => SubComponent({ ...sources, items$: now(items) }), listOfItems$)
const history$ = switchSinkOrNever('history$', sinks$)
return { history$ }
}
export const switchSinkOr: SwitchSinkOr = curry3<any, any, any, any>(function switchSinkOr<
Sinks extends { readonly [key: string]: Stream<any> },
K extends keyof Sinks = keyof Sinks
>(or$: Sinks[K], sinkName: K, sinks$: Stream<Sinks>): Sinks[K] {
return switchLatest(map(sinks => sinks[sinkName] || or$, sinks$))
})
export interface SwitchSinkOr {
<Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
or$: Sinks[K],
sinkName: K,
sinks$: Stream<Sinks>
): Sinks[K]
<Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
or$: Sinks[K],
sinkName: K
): (sinks$: Stream<Sinks>) => Sinks[K]
<Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
or$: Sinks[K]
): (sinkName: K, sinks$: Stream<Sinks>) => Sinks[K]
<Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
or$: Sinks[K]
): (sinkName: K) => (sinks$: Stream<Sinks>) => Sinks[K]
}
take<A>(quantity: number, stream: Stream<A>): Stream<A>
Take at most the first n
events of a stream.
import { take } from '@motorcycle/stream'
const source = // -1-2-3-4-5-6-7-8-9-10->
// -1-2-3|
const stream = take(3, source)
export { take } from '@most/core'
takeWhile<A>(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>
Keep all events until predicate returns false, and discard the rest.
import { takeWhile } from '@motorcycle/stream'
const source = // -2-4-5-6-8->
// -2-4-|
const stream = takeWhile(even, source)
export { takeWhile } from '@most/core'
tap<A>(f: (value: A) => any, stream: Stream<A>): Stream<A>
Creates a new stream that upon each event performs a side-effect.
import { tap, drain } from '@motorcycle/stream'
const logStream = tap(console.log, stream)
drain(logStream)
export { tap } from '@most/core'
throttle<A>(ms: number, stream: Stream<A>): Stream<A>
Limit the rate of events to at most one per a number of milliseconds.
In contrast to debounce, throttle simply drops events that occur "too often", whereas debounce waits for a "quiet period".
import { throttle } from '@motorcycle/stream'
const source = // -abcd---abcd--->
// -a-c----a-c---->
const stream = throttle(2, source)
export { throttle } from '@most/core'
throwError(err: Error): Stream<never>
Create a stream in the error state. This can be useful for functions that need to return a stream, but need to signal an error.
import { throwError, chain, now } from '@motorcycle/stream'
const f = (x: Maybe<number>): Stream<number> => isNothing(x)
? throwError(new Error('cannot be given Nothing'))
: now(fromJust(x))
const stream = chain(f, maybe$)
export { throwError } from '@most/core'
until<A>(endSignal: Stream<any>, stream: Stream<A>): Stream<A>
Keep all events in one stream until the first event occurs in another.
import { until } from '@motorcycle/stream'
const source = // --1-2-3-4-5-6-7-8->
const endSignal = // ---------z-------->
// --1-2-3-4|
const stream = until(endSingal, source)
export { until } from '@most/core'
withArrayValues<A>(array: Array<A>, stream: Stream<any>): Stream<A>
Creates a new stream by associating event times with values from an array. The resulting stream will end when all array values have been used or when the underlying stream ends.
import { withArrayValues, periodic, observe } from '@motorcycle/stream'
const stream = withArrayValues([ 1, 2, 3 ], periodic(100))
observe(console.log, stream)
// 1 -- time 0
// 2 -- time 100
// 3 -- time 200
export { withArrayValues } from '@most/core'
zip<A, B, C>(f: (a: A, b: B) => C, a$: Stream<A>, b$: Stream<B>): Stream<C>
Applies a function to corresponding pairs of events from the input streams.
import { zip, observe } from '@motorcycle/stream'
const tuple = (x, y) => [x, y]
const a$ = // --1----3-------5------6----|
const b$ = // --------2--3--------4------|
// // --------[3,2]--[5,3]--[6,4]|
const stream = zip(tuple, a$, b$)
observe(console.log, stream)
// [3, 2]
// [5, 3]
// [6, 4]
export { zip } from '@most/core'
zipArray<A, B, C>(f: (a: A, b: B) => C, streams: [Stream<A>, Stream<B>]): Stream<C>
Applies a function to corresponding pairs of events from the input streams.
import { zipArray, observe } from '@motorcycle/stream'
const tuple = (x, y) => [x, y]
const a$ = // --1----3-------5------6----|
const b$ = // --------2--3--------4------|
// // --------[3,2]--[5,3]--[6,4]|
const stream = zipArray(tuple [a$, b$])
observe(console.log, stream)
// [3, 2]
// [5, 3]
// [6, 4]
export { zipArray } from '@most/core'
zipArrayValues<A, B, C>(f: (arrayValue: A, streamValue: Stream<B>) => C, array: Array<A>, stream: Stream<B>): Stream<C>
Creates a new stream by applying a function with a value at increasing index of an array and the latest event value from a stream. The resulting stream will end when all array values have been used or as soon as the underlying stream ends.
import { zipArrayValues, now, concat, observe } from '@motorcycle/stream'
const f = (x, y) => x + y
const array = [ 100, 200 ]
const stream = concat(now(1), now(2))
observe(console.log, zipArrayValues(f, array, stream))
// 101
// 202
export { zipArrayValues } from '@most/core'