tream
v0.1.4
Lightweight lazy streams in TypeScript
Data types
The first abstraction used to represent lazy streams is called Pull.
It is a function which the consumer side must call to get the next value from the stream.
In response to such a request, the producer side must either send a value or terminate the stream.
The Pull function takes two arguments, push and done, which are used for these purposes.
Note: The implementation requires that both push and done be deferred.
The consumer side can also cancel a pending request using the function returned from Pull.
You cannot operate on a stream while a pull request is in progress, but you can either wait for the push response or cancel the request immediately with the undo operation.
In both cases you get a pull to make the next request.
The done response means the end of the stream. An ended (or terminated) stream cannot be used to get further values.
Pull<Type>
The Pull function is defined as:
interface Pull<Type> {
  (push: Push<Type>, done?: Done): Undo<Type>;
}
In plain terms, the Pull type represents a source, or stream.
Push<Type>
When a stream is active (i.e. not ended), it can send values in response to Pull calls by calling the Push function.
The Push function is defined as:
interface Push<Type> {
  (val: Type, pull: Pull<Type>): void;
}
It sends the consumer a value together with the next Pull, which the consumer can use to get the next value, and so on.
Done
When a stream has ended (or been terminated), it must send an end-of-stream signal by calling the Done function.
The Done function is defined as:
interface Done {
  (): void;
}
Undo<Type>
The Undo function is intended to cancel a pull request and is defined as:
interface Undo<Type> {
  (): Pull<Type>;
}
The returned Pull function can be used to get the next values from the stream.
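The protocol described by these four types can be sketched in a self-contained way. This is only an illustration under stated assumptions, not tream's implementation: fromArray, drain, defer and flush are hypothetical helpers, and a manual task queue stands in for real deferral so that the run is deterministic.

```typescript
// The four function types, as defined in this README.
interface Done { (): void; }
interface Push<Type> { (val: Type, pull: Pull<Type>): void; }
interface Undo<Type> { (): Pull<Type>; }
interface Pull<Type> { (push: Push<Type>, done?: Done): Undo<Type>; }

// Assumption: a manual task queue stands in for real deferral (e.g. setTimeout).
const tasks: Array<() => void> = [];
const defer = (task: () => void): void => { tasks.push(task); };
const flush = (): void => {
  let task = tasks.shift();
  while (task !== undefined) { task(); task = tasks.shift(); }
};

// Hypothetical source: a stream over an array, starting at `index`.
function fromArray<Type>(items: readonly Type[], index = 0): Pull<Type> {
  return (push, done) => {
    let cancelled = false;
    defer(() => {
      if (cancelled) return;
      if (index < items.length) push(items[index], fromArray(items, index + 1));
      else if (done) done();
    });
    // Undo: drop the pending request and return a Pull for the same position.
    return () => { cancelled = true; return fromArray(items, index); };
  };
}

// Hypothetical consumer: keeps pulling until the stream ends.
function drain<Type>(src: Pull<Type>, out: Type[], onEnd?: Done): void {
  src((val, next) => { out.push(val); drain(next, out, onEnd); }, onEnd);
}

const seen: number[] = [];
const outcome = { ended: false };
drain(fromArray([1, 2, 3]), seen, () => { outcome.ended = true; });
const beforeFlush = seen.length; // nothing is pushed until deferred tasks run
flush();

// Cancel a request, then resume from the Pull that undo returns.
const letters: string[] = [];
const undo = fromArray(["a", "b"])(val => { letters.push("unexpected " + val); });
const resumed = undo();
drain(resumed, letters);
flush();
console.log(seen, outcome.ended, letters);
```

Each push hands over a fresh Pull, so the consumer never holds a stream that is mid-request; the cancelled request's callback is never invoked.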
None
The value none of type None is used instead of special values like null or undefined to represent the absence of a value. This technique allows special values (null and undefined) to be handled in streams like any other ordinary values, without exceptions.
Maybe<Type>
The Maybe type is intended to represent values which may be absent. It is defined as:
type Maybe<Type> = Type | None;
Simple usage example:
import { Maybe, none, some } from 'tream/stream';
let str: Maybe<string> = none;
if (some(str)) {
  // do something with 'str'
  console.log(str.length);
}
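To illustrate why a dedicated sentinel is useful, here is a minimal sketch with hypothetical stand-ins for None, none, Maybe and some. How the library actually represents none is not stated in this README; the branded object below is an assumption made only for this example.

```typescript
// Hypothetical stand-ins for the library's None, none, Maybe and some.
interface None { readonly kind: "none"; }
const none: None = { kind: "none" };
type Maybe<Type> = Type | None;

// Type guard: true for every value except the `none` sentinel.
function some<Type>(val: Maybe<Type>): val is Type {
  return val !== none;
}

// null and undefined are ordinary values here; only `none` means "no value".
const inputs: Array<Maybe<string | null | undefined>> = ["text", null, undefined, none];
const present: Array<string | null | undefined> = [];
for (const item of inputs) {
  if (some(item)) present.push(item);
}
console.log(present.length);
```

Because the check is an identity comparison against one sentinel, null and undefined pass through as regular stream values.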
Creating streams
The first thing to do in order to work with streams is to create them. There are many ways to do this.
Special streams
empty: The stream which has no values and ends immediately.
never: The stream which sends no values and never ends.
once(value): The stream which sends a single value and ends.
repeat(value): The stream which sends the same value repeatedly and never ends.
Generators
A generator is a function which returns a value each time it is called.
A generator may also return none to end the stream.
A particular case of a generator is an iterator, which takes an array and sends its values from first to last.
import { generate, iterate } from 'tream/stream';
const n = generate(1, s => [s * (s + 1), s]); // => 1, 2, 6, 42, 1806, ...
const v = iterate([1, 2, 3, 4, 5]); // => 1, 2, 3, 4, 5
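The following sketch shows one way a state-based generator could be built on top of Pull, with an array iterator expressed as a special case. It is not the library's code. In particular, it assumes that the step function returns a [next state, value] pair (which matches the sequence in the example above) and that it returns none in place of the pair to end the stream; the README does not specify the latter. defer, flush and drain are hypothetical helpers.

```typescript
interface Done { (): void; }
interface Push<Type> { (val: Type, pull: Pull<Type>): void; }
interface Undo<Type> { (): Pull<Type>; }
interface Pull<Type> { (push: Push<Type>, done?: Done): Undo<Type>; }

// Hypothetical stand-ins for the library's None, none, Maybe and some.
interface None { readonly kind: "none"; }
const none: None = { kind: "none" };
type Maybe<Type> = Type | None;
function some<Type>(val: Maybe<Type>): val is Type { return val !== none; }

// Assumption: a manual task queue stands in for real deferral.
const tasks: Array<() => void> = [];
const defer = (task: () => void): void => { tasks.push(task); };
const flush = (): void => {
  let task = tasks.shift();
  while (task !== undefined) { task(); task = tasks.shift(); }
};

// Sketch: `step` maps a state to [next state, value], or to none to end (assumed).
function generate<State, Type>(
  state: State,
  step: (state: State) => Maybe<[State, Type]>
): Pull<Type> {
  return (push, done) => {
    let cancelled = false;
    defer(() => {
      if (cancelled) return;
      const res = step(state);
      if (some(res)) push(res[1], generate(res[0], step));
      else if (done) done();
    });
    return () => { cancelled = true; return generate(state, step); };
  };
}

// An array iterator as a generator whose state is the current index.
function iterate<Type>(items: readonly Type[]): Pull<Type> {
  return generate<number, Type>(0, i => (i < items.length ? [i + 1, items[i]] : none));
}

// Hypothetical consumer: keeps pulling until the stream ends.
function drain<Type>(src: Pull<Type>, out: Type[]): void {
  src((val, next) => { out.push(val); drain(next, out); });
}

const products: number[] = [];
drain(generate<number, number>(1, s => (s > 2000 ? none : [s * (s + 1), s])), products);
const listed: number[] = [];
drain(iterate([1, 2, 3, 4, 5]), listed);
flush();
console.log(products, listed);
```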
Channels
A channel is a pair of a sink and a stream which exist independently. In other words, the sink can be used to send values, and the stream receives everything that is sent through the channel. So the producer side does not need to wait for get-value requests from the consumer side.
import { channel, collect } from 'tream/stream';
const [[send, end], src] = channel<number>();
send(1);
send(2);
setTimeout(() => {
  send(3);
  end();
}, 150);
collect(src)(val => {
  console.log(val); // => [1, 2, 3]
});
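The buffering behavior described above can be sketched as follows. This is an illustrative model only, not tream's channel: the internal buffer, the Sink alias, and the defer, flush and drain helpers are all assumptions of this example.

```typescript
interface Done { (): void; }
interface Push<Type> { (val: Type, pull: Pull<Type>): void; }
interface Undo<Type> { (): Pull<Type>; }
interface Pull<Type> { (push: Push<Type>, done?: Done): Undo<Type>; }

// Assumption: a manual task queue stands in for real deferral.
const tasks: Array<() => void> = [];
const defer = (task: () => void): void => { tasks.push(task); };
const flush = (): void => {
  let task = tasks.shift();
  while (task !== undefined) { task(); task = tasks.shift(); }
};

type Sink<Type> = [(val: Type) => void, () => void]; // hypothetical alias: [send, end]

function channel<Type>(): [Sink<Type>, Pull<Type>] {
  const buffer: Type[] = [];
  let ended = false;
  let waiting: { push: Push<Type>; done?: Done } | undefined;

  // Runs only from deferred tasks: answer the pending request if possible.
  const deliver = (): void => {
    const req = waiting;
    if (req === undefined) return;
    if (buffer.length > 0) {
      waiting = undefined;
      req.push(buffer.shift() as Type, src);
    } else if (ended) {
      waiting = undefined;
      if (req.done) req.done();
    }
  };
  const src: Pull<Type> = (push, done) => {
    const req = { push, done };
    waiting = req;
    defer(deliver);
    // Undo: forget this request (if it is still the pending one).
    return () => { if (waiting === req) waiting = undefined; return src; };
  };
  const send = (val: Type): void => { buffer.push(val); defer(deliver); };
  const end = (): void => { ended = true; defer(deliver); };
  return [[send, end], src];
}

// Hypothetical consumer: keeps pulling until the stream ends.
function drain<Type>(from: Pull<Type>, out: Type[], onEnd?: Done): void {
  from((val, next) => { out.push(val); drain(next, out, onEnd); }, onEnd);
}

const [[send, end], src] = channel<number>();
send(1); // sent before anyone has asked for a value
send(2);
const got: number[] = [];
const outcome = { ended: false };
drain(src, got, () => { outcome.ended = true; });
flush();
const afterFirstFlush = got.slice(); // the consumer is now waiting for more
send(3);
end();
flush();
console.log(afterFirstFlush, got, outcome.ended);
```

Values sent before the consumer pulls are held in the buffer, and a pull made while the buffer is empty simply stays pending until the next send or end.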
Forks
The same stream cannot be used multiple times directly, but the Fork abstraction makes it possible.
A Fork is a function which can be called to clone a stream.
This function is the result of applying the fork operation to the source stream.
import {iterate, fork, collect} from 'tream/stream';
const src = fork(iterate([1, 2, 3]));
const a = src();
const b = src();
const c = src();
collect(a)(val => { console.log(val); }); // => [1, 2, 3]
collect(b)(val => { console.log(val); }); // => [1, 2, 3]
collect(c)(val => { console.log(val); }); // => [1, 2, 3]
Timers
TODO...
Request
Streaming algebra
To operate on streams in a monadic style, a so-called streaming algebra was developed.
The streaming algebra includes a set of operations which take streams and create new streams as a result. Besides streams, these operations also take functions and values which help control their behavior.
Map
The map operation converts values using a function which takes a value and returns a new one.
import {map, iterate} from 'tream/stream';
map(v => v * v,
  iterate([1, 2, 3])); // => [1, 4, 9]
map(v => `a=${v}`,
  iterate([1, 2, 3])); // => ["a=1", "a=2", "a=3"]
Filter
The filter operation filters the values in a stream using a function which takes a value and returns a boolean.
import {filter, iterate} from 'tream/stream';
filter(v => v > 0,
  iterate([-1, 0, 2, 0.1])); // => [2, 0.1]
Filter Map
The filter_map operator combines filtering and mapping.
import {filter_map, iterate, none} from 'tream/stream';
filter_map(v => v > 0 ? v * 2 : none,
  iterate([-1, 0, 2, 0.1])); // => [4, 0.2]
To remove a value from the stream, the function must return none.
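As a rough illustration of how such an operator can be layered on Pull, the sketch below implements a filter-map and then derives map and filter from it. It is not the library's implementation: filterMap, fromArray, drain, defer, flush and the representation of none are all assumptions of this example.

```typescript
interface Done { (): void; }
interface Push<Type> { (val: Type, pull: Pull<Type>): void; }
interface Undo<Type> { (): Pull<Type>; }
interface Pull<Type> { (push: Push<Type>, done?: Done): Undo<Type>; }

// Hypothetical stand-ins for the library's None, none, Maybe and some.
interface None { readonly kind: "none"; }
const none: None = { kind: "none" };
type Maybe<Type> = Type | None;
function some<Type>(val: Maybe<Type>): val is Type { return val !== none; }

// Assumption: a manual task queue stands in for real deferral.
const tasks: Array<() => void> = [];
const defer = (task: () => void): void => { tasks.push(task); };
const flush = (): void => {
  let task = tasks.shift();
  while (task !== undefined) { task(); task = tasks.shift(); }
};

// Hypothetical array source (plays the role of iterate in this sketch).
function fromArray<Type>(items: readonly Type[], index = 0): Pull<Type> {
  return (push, done) => {
    let cancelled = false;
    defer(() => {
      if (cancelled) return;
      if (index < items.length) push(items[index], fromArray(items, index + 1));
      else if (done) done();
    });
    return () => { cancelled = true; return fromArray(items, index); };
  };
}

// Sketch of a filter-map: keep pulling upstream until fn returns something other than none.
function filterMap<In, Out>(fn: (val: In) => Maybe<Out>, src: Pull<In>): Pull<Out> {
  return (push, done) => {
    let undo: Undo<In>;
    const request = (from: Pull<In>): void => {
      undo = from((val, next) => {
        const res = fn(val);
        if (some(res)) push(res, filterMap(fn, next));
        else request(next); // value dropped: ask upstream again
      }, done);
    };
    request(src);
    // Cancelling cancels the pending upstream request and re-wraps its Pull.
    return () => filterMap(fn, undo());
  };
}

// map and filter expressed through filterMap.
const map = <In, Out>(fn: (val: In) => Out, src: Pull<In>): Pull<Out> =>
  filterMap<In, Out>(fn, src);
const filter = <Type>(keep: (val: Type) => boolean, src: Pull<Type>): Pull<Type> =>
  filterMap<Type, Type>(val => (keep(val) ? val : none), src);

// Hypothetical consumer: keeps pulling until the stream ends.
function drain<Type>(src: Pull<Type>, out: Type[]): void {
  src((val, next) => { out.push(val); drain(next, out); });
}

const squares: number[] = [];
const positives: number[] = [];
const doubled: number[] = [];
drain(map<number, number>(v => v * v, fromArray([1, 2, 3])), squares);
drain(filter<number>(v => v > 0, fromArray([-1, 0, 2, 0.1])), positives);
drain(filterMap<number, number>(v => (v > 0 ? v * 2 : none), fromArray([-1, 0, 2, 0.1])), doubled);
flush();
console.log(squares, positives, doubled);
```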
Scan
The scan operator is a form of filter_map with state.
import {scan, iterate, none} from 'tream/stream';
scan(1, (s /* previous state */, v) =>
  [s + 1 /* next state */, v > 0 ? v * s : none],
  iterate([-1, 0, 2, 0.1])); // => [6, 0.4]
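The stateful variant can be sketched in the same spirit. Again, this is only an assumption-laden illustration rather than tream's code: the state is threaded through each request, and it advances even when a value is dropped. fromArray, drain, defer, flush and the representation of none are hypothetical.

```typescript
interface Done { (): void; }
interface Push<Type> { (val: Type, pull: Pull<Type>): void; }
interface Undo<Type> { (): Pull<Type>; }
interface Pull<Type> { (push: Push<Type>, done?: Done): Undo<Type>; }

// Hypothetical stand-ins for the library's None, none, Maybe and some.
interface None { readonly kind: "none"; }
const none: None = { kind: "none" };
type Maybe<Type> = Type | None;
function some<Type>(val: Maybe<Type>): val is Type { return val !== none; }

// Assumption: a manual task queue stands in for real deferral.
const tasks: Array<() => void> = [];
const defer = (task: () => void): void => { tasks.push(task); };
const flush = (): void => {
  let task = tasks.shift();
  while (task !== undefined) { task(); task = tasks.shift(); }
};

// Hypothetical array source (plays the role of iterate in this sketch).
function fromArray<Type>(items: readonly Type[], index = 0): Pull<Type> {
  return (push, done) => {
    let cancelled = false;
    defer(() => {
      if (cancelled) return;
      if (index < items.length) push(items[index], fromArray(items, index + 1));
      else if (done) done();
    });
    return () => { cancelled = true; return fromArray(items, index); };
  };
}

// Sketch of scan: fn receives the previous state and returns [next state, value or none].
function scan<State, In, Out>(
  state: State,
  fn: (state: State, val: In) => [State, Maybe<Out>],
  src: Pull<In>
): Pull<Out> {
  return (push, done) => {
    let current = state;
    let undo: Undo<In>;
    const request = (from: Pull<In>): void => {
      undo = from((val, next) => {
        const [nextState, res] = fn(current, val);
        if (some(res)) push(res, scan(nextState, fn, next));
        else { current = nextState; request(next); } // dropped, but the state advances
      }, done);
    };
    request(src);
    // Cancelling resumes from the latest state and the upstream's returned Pull.
    return () => scan(current, fn, undo());
  };
}

// Hypothetical consumer: keeps pulling until the stream ends.
function drain<Type>(src: Pull<Type>, out: Type[]): void {
  src((val, next) => { out.push(val); drain(next, out); });
}

const scanned: number[] = [];
drain(
  scan<number, number, number>(1, (s, v) => [s + 1, v > 0 ? v * s : none],
    fromArray([-1, 0, 2, 0.1])),
  scanned
);
flush();
console.log(scanned);
```

With the inputs from the example above, the state is 3 when the value 2 arrives and 4 when 0.1 arrives, which gives 6 and 0.4.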
Forward
import {repeat, iterate, forward} from 'tream/stream';
forward(() => repeat(4), iterate([1, 2, 3]));
// => [1, 2, 3, 4, 4, 4, 4, ...]
Then
import {repeat, iterate, taken, then} from 'tream/stream';
then(v => taken(v, repeat(`${v}`)), iterate([1, 2, 3]));
// => ["1", "2", "2", "3", "3", "3"]
Each
import {collect, count, id, map, taken, each} from 'tream/stream'; // assumption: count is exported from 'tream/stream'
import {interval} from 'tream/timer';
collect(each(val => map(id(val), interval(15)),
count(taken(4, interval(50)))));
// => [1, 1, 1, 2, 2, 2, 3, 3, 3]
Take
// TODO
Head
// TODO
Skip
// TODO
TakeN
import {iterate, taken, collect} from 'tream/stream';
collect(taken(3, iterate([1, 2, 3, 4, 5])))
(val => { console.log(val); }); // => [1, 2, 3]
SkipN
import {iterate, skipn, collect} from 'tream/stream';
collect(skipn(2, iterate([1, 2, 3, 4, 5])))
(val => { console.log(val); }); // => [3, 4, 5]
Fold
import {iterate, fold, collect} from 'tream/stream';
collect(fold(0, (pre, val) => pre + val,
iterate([1, 2, 3, 4, 5])))
(val => { console.log(val); }); // => 15
Collect
import {iterate, collect} from 'tream/stream';
collect(iterate([1, 2, 3]))
(val => { console.log(val); }); // => [1, 2, 3]
Last
import {iterate, last} from 'tream/stream';
last(iterate([1, 2, 3]))(val => { console.log(val); });
// => 3
Select
import {id, map, select, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';
collect(taken(5, select([
  map(id(100), interval(100)),
  map(id(30), interval(30))
])))(val => { console.log(val); });
// => [30, 30, 30, 100, 30]
Combine
import {id, map, combine, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';
collect(taken(5, combine([
map(id(100), interval(100)),
map(id(30), interval(30))
])))(val => { console.log(val); });
// => [[none, 30], [none, 30], [none, 30], [100, 30], [100, 30]]
Join
import {id, map, join, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';
collect(taken(5, join([
map(id(100), interval(100)),
map(id(30), interval(30))
])))(val => { console.log(val); });
// => [[100, 30], [100, 30]]
Chain
The chain operator concatenates streams from the first to the last.
When the first stream ends, the second begins, and so on.
import {once, empty, repeat, iterate, taken, chain, collect} from 'tream/stream';
collect(chain([
  once(1),
  iterate([2, 3]),
  empty,
  taken(3, repeat(4))
]))(val => { console.log(val); }); // => [1, 2, 3, 4, 4, 4]
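Concatenation can be sketched on top of Pull as shown below. This is a hedged illustration, not the library's chain: fromArray stands in for once, iterate, empty and the bounded repeat from the example above, and drain, defer and flush are hypothetical helpers.

```typescript
interface Done { (): void; }
interface Push<Type> { (val: Type, pull: Pull<Type>): void; }
interface Undo<Type> { (): Pull<Type>; }
interface Pull<Type> { (push: Push<Type>, done?: Done): Undo<Type>; }

// Assumption: a manual task queue stands in for real deferral.
const tasks: Array<() => void> = [];
const defer = (task: () => void): void => { tasks.push(task); };
const flush = (): void => {
  let task = tasks.shift();
  while (task !== undefined) { task(); task = tasks.shift(); }
};

// Hypothetical array source used in place of once, iterate, empty and taken(repeat).
function fromArray<Type>(items: readonly Type[], index = 0): Pull<Type> {
  return (push, done) => {
    let cancelled = false;
    defer(() => {
      if (cancelled) return;
      if (index < items.length) push(items[index], fromArray(items, index + 1));
      else if (done) done();
    });
    return () => { cancelled = true; return fromArray(items, index); };
  };
}

// Sketch of chain: pull from the first stream; when it ends, continue with the rest.
function chain<Type>(streams: readonly Pull<Type>[]): Pull<Type> {
  return (push, done) => {
    let rest = streams;
    let cancelled = false;
    let undo: Undo<Type> | undefined;
    const request = (): void => {
      if (rest.length === 0) {
        // Nothing left: signal the end, deferred like every other response.
        undo = undefined;
        defer(() => { if (!cancelled && done) done(); });
        return;
      }
      const head = rest[0];
      const tail = rest.slice(1);
      undo = head(
        (val, next) => push(val, chain([next, ...tail])),
        () => { rest = tail; request(); } // head ended: move on to the next stream
      );
    };
    request();
    return () => {
      cancelled = true;
      return chain<Type>(undo ? [undo(), ...rest.slice(1)] : []);
    };
  };
}

// Hypothetical consumer: keeps pulling until the stream ends.
function drain<Type>(src: Pull<Type>, out: Type[], onEnd?: Done): void {
  src((val, next) => { out.push(val); drain(next, out, onEnd); }, onEnd);
}

const joined: number[] = [];
const outcome = { ended: false };
drain(
  chain([fromArray([1]), fromArray([2, 3]), fromArray<number>([]), fromArray([4, 4, 4])]),
  joined,
  () => { outcome.ended = true; }
);
flush();
console.log(joined, outcome.ended);
```

An empty stream in the list contributes no values; its end-of-stream signal just moves the chain on to the next stream.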