npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

tream

v0.1.4

Published

Lightweight lazy streams in TypeScript

Downloads

10

Readme

Lightweight lazy streams in TypeScript

License: MIT npm version npm downloads Build Status

Data types

The first abstraction of lazy streams representation called Pull. It's a function which must be called by consumer side to get a next value from the stream.

In response to get value request the producer side must send value or terminate stream. The Pull function has two arguments: push and done, which can be used in this purposes.

Note: Implementation requires that the push and done both must be deferred.

Also the consumer side can cancel getting value using function which returned from Pull.

You cannot operate with streams when pull request is started but you can either wait for push response or cancel request immediatelly by undo operation. In both cases you get the pull to make next requests.

The done response means the end of stream. The ended (or terminated) stream cannot be used to get next values.

Pull<Type>

The Pull function defined as:

interface Pull<Type> {
    (push: Push<Type>, done?: Done): Undo<Type>;
}

In common words Pull type represents source or stream.

Push<Type>

When stream as active (i.e. not ended) it can send values in response of Pull calls by calling Push function. The Push function defined as:

interface Push<Type> {
    (val: Type, pull: Pull<Type>): void;
}

It sends to consumer the value and the next Pull which can be used by consumer to get the next value and so on.

Done

In case of ended (or terminated) stream it must send end-of-stream signal by calling Done function. The Done defined as:

interface Done {
    (): void;
}

Undo<Type>

The Undo function purposed to cancel pull request and defined as:

interface Undo<Type> {
    (): Pull<Type>;
}

The returned Pull function can be used to getting the next values from stream.

None

The value none of type None is used instead of special values like null or undefined in order to represent the lack of values. This technique allows work with special values (null and undefined) in streams like with any other ordinary values without exceptions.

Maybe<Type>

The type Maybe is intended to represent values which may be lack. The Maybe type defined as:

type Maybe<Type> = Type | None;

Simple usage example:

import { Maybe, none, some } from 'tream/stream';

let str: Maybe<string> = none;

if (some(str)) {
    // do something with 'str'
    console.log(str.length);
}

Creating streams

The first thing which we would like to do in order to operate with streams is a creation of streams. Actually there is a much ways to do this.

Special streams

  • empty: The stream which has no values and ends immediately.
  • never: The stream which not sends values and never ends.
  • once(value): The stream which sends single value and ends.
  • repeat(value): The stream which sends same values and never ends.

Generators

Generator is a function which returns some value each time when it called. Also generator may return none to end the stream.

The particular case of generator is an iterator which gets an array and sends it values from first to last.

import { generate, iterate } from 'tream/stream';

const n = generate(1, s => [s * (s + 1), s]); // => 1, 2, 6, 42, 1806, ...

const v = iterate([1, 2, 3, 4, 5]); // => 1, 2, 3, 4, 5

Channels

Channel is a pair of sink and stream which co-exists independently. In other words the sink can be used to send values and the stream will get all of it which is sended through channel. So the producer side doesn't need awaiting get value requests from consumer side.

import { channel, collect } from 'tream/stream';

const [[send, end], src] = channel<number>();

send(1);
send(2);

setTimeout(() => {
  send(3);
  end();
}, 150);

collect(src)(val => {
  console.log(val); // => [1, 2, 3]
});

Forks

Same stream cannot be used multiple times directly but a Fork abstraction makes it possible. The Fork is a function which might been called to clone stream. This function is a result of applying operation fork to the source stream.

import {iterate, fork, collect} from 'tream/stream';

const src = fork(iterate([1, 2, 3]));

const a = src();
const b = src();
const c = src();

collect(a)(val => { console.log(val); }); // => [1, 2, 3]
collect(b)(val => { console.log(val); }); // => [1, 2, 3]
collect(c)(val => { console.log(val); }); // => [1, 2, 3]

Timers

TODO...

Request

Streaming algebra

To operate with streams in monadic style developed so called streaming algebra.

The streaming algebra includes the set of operations which takes a streams and creates new streams as result. Besides streams it also take functions and values which help control behavior of operations.

Map

The map operation is purposed to convert values using some function which take a value and returns new.

import {map, iterate} from 'tream/stream';

map(v => v * v,
  iterate([1, 2, 3])); // => [1, 4, 9]

map(v => `a=${v}`,
  iterate([1, 2, 3])); // => ["a=1", "a=2", "a=3"]

Filter

The operation filter was designed to filter values in stream using some function which take a value and returns boolean.

import {filter, iterate} from 'tream/stream';

filter(v => v > 0,
  iterate([-1, 0, 2, 0.1])); // => [2, 0.1]

Filter Map

The operator filter_map combines filtering and mapping.

import {filter_map, iterate, none} from 'tream/stream';

filter_map(v => v > 0 ? v * 2 : none,
  iterate([-1, 0, 2, 0.1])); // => [4, 0.2]

To remove values from stream the function must return none.

Scan

The operator scan is a form of filter_map with state.

import {scan, iterate, none} from 'tream/stream';

scan(1, (s /* previous state */, v) =>
  [s + 1 /* next state */, v > 0 ? v * s : none],
  iterate([-1, 0, 2, 0.1])); // => [6, 0.4]

Forward

import {repeat, iterate, forward} from 'tream/stream';

forward(() => repeat(4), iterate([1, 2, 3]));
// => [1, 2, 3, 4, 4, 4, 4, ...]

Then

import {repeat, iterate, taken, then} from 'tream/stream';

then(v => taken(v, repeat(`${v}`)), iterate([1, 2, 3]));
// => ["1", "2", "2", "3", "3", "3"]

Each

import {collect, id, map, taken, each} from 'tream/stream';
import {interval} from 'tream/timer';

collect(each(val => map(id(val), interval(15)),
  count(taken(4, interval(50)))));
// => [1, 1, 1, 2, 2, 2, 3, 3, 3]

Take

// TODO

Head

// TODO

Skip

// TODO

TakeN

import {iterate, taken, collect} from 'tream/stream';

collect(taken(3, iterate([1, 2, 3, 4, 5])))
  (val => { console.log(val); }); // => [1, 2, 3]

SkipN

import {iterate, skipn, collect} from 'tream/stream';

collect(skipn(2, iterate([1, 2, 3, 4, 5])))
  (val => { console.log(val); }); // => [3, 4, 5]

Fold

import {iterate, fold, collect} from 'tream/stream';

collect(fold(0, (pre, val) => pre + val,
  iterate([1, 2, 3, 4, 5])))
  (val => { console.log(val); }); // => 15

Collect

import {iterate, collect} from 'tream/stream';

collect(iterate([1, 2, 3]))
  (val => { console.log(val); }); // => [1, 2, 3]

Last

import {iterate, collect} from 'tream/stream';

last(iterate([1, 2, 3]))(val => { console.log(val); });
// => 3

Select

import {id, map, join, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';

collect(taken(5, join([
  map(id(100), interval(100)),
  map(id(30), interval(30))
])))(val => { console.log(val); });
// => [30, 30, 30, 100, 30]

Combine

import {id, map, combine, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';

collect(taken(5, combine([
  map(id(100), interval(100)),
  map(id(30), interval(30))
])))(val => { console.log(val); });
// => [[none, 30], [none, 30], [none, 30], [100, 30], [100, 30]]

Join

Combine

import {id, map, join, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';

collect(taken(5, join([
  map(id(100), interval(100)),
  map(id(30), interval(30))
])))(val => { console.log(val); });
// => [[100, 30], [100, 30]]

Chain

The operator chain concatenates streams from the first to the last. When the first stream is ended, the second begins and so on.

import {once, empty, repeat, iterate, taken, chain, collect} from 'tream/stream';

collect(chain([
    once(1),
    iterate([2, 3]),
    empty,
    taken(3, repeat(4))
]))(val => { console.log(val); }); // => [1, 2, 3, 4, 4, 4]