
@shimaore/lake v3.1.0
functional async generators toolbox

Functional async generators

The base concept is that async iterators are streams, and that we should provide tools to handle them.

This is similar to the concepts enabled by most.js or rxjs, but without a proprietary loop: the scheduling is entirely done by the JavaScript engine using async generators.

Since all operations are async, the streams are non-blocking.
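
For instance, a minimal sketch (using the from and map operators documented in the API section below): a stream can be consumed with a plain for await loop.

import {from} from '@shimaore/lake'

// Doubles each value; consumption is driven entirely by the for-await loop.
for await (const value of from([1, 2, 3]).map( x => x * 2 )) {
  console.log(value) // 2, then 4, then 6
}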

Compatibility with Node.js streams

These streams are compatible with Node.js' Readable Stream API:

import {from} from '@shimaore/lake'
from( process.stdin )

and with Node.js' Writable Stream API, using pipeline or Readable.from:

import {pipeline} from 'stream'
import fs from 'fs'
import {bigNaturals} from '@shimaore/lake'
pipeline(
  bigNaturals().skip(1).map( x => x*x ).first(1000).map( x => `${x}\n` ),
  // This will create a file containing the first thousand squares
  fs.createWriteStream('thousand-squares.txt'),
  console.error
)
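
Readable.from, mentioned above, can also wrap the stream directly; a possible sketch (not part of the original examples):

import {Readable} from 'stream'
import fs from 'fs'
import {bigNaturals} from '@shimaore/lake'

// Same output as the pipeline example: wrap the Lake in a Node.js Readable,
// then pipe it into the destination file.
Readable.from(
  bigNaturals().skip(1).map( x => x*x ).first(1000).map( x => `${x}\n` )
).pipe( fs.createWriteStream('thousand-squares.txt') )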

Fluent API, Merging streams

Some basic data-streaming examples; the first builds the integers (positive and negative) from the naturals (non-negative only):

import {merge} from '@shimaore/lake'

Merging streams is done in approximate round-robin fashion; this prevents one stream from eagerly blocking the other streams.

const BigIntegers = merge(
  // will enumerate 1, 2, 3, 4, …
  bigNaturals().skip(1),
  // will enumerate 0, -1, -2, -3, …
  bigNaturals().map( x => -x ),
)

const firstTwentyIntegers = from(
  [0,-1,1,-2,2,-3,3,-4,4,-5,5,-6,6,-7,7,-8,8,-9,9,-10].map(BigInt)
)

import test from 'ava'
test('Compute first twenty integers', async t => {
  t.true(await

    BigIntegers
    .first(20)

    .equals( firstTwentyIntegers )

  )
})

Notice how first and equals are methods on the stream.

Building the sum of the first 1000 squares in constant space:

const Sum = (a,v) => a+v

test('Compute the sum of the first 1000 squares', async t => {
  t.is( 333_833_500n, await

    bigNaturals()
    .skip(1)
    .map( x => x*x )
    .first(1000n)
    .reduce(Sum, 0n)

  )
})

Concurrent Map

Use concurrentMap to process async operations concurrently while controlling the maximum number of concurrent executions.

import {sleep} from '@shimaore/lake'

const squareMicroService = async (x) => {
  await sleep(1)
  return x*x
}

test('Compute the sum of the first 1000 squares using concurrentMap', async t => {
  console.time('with concurrentMap')
  t.is( 333_833_500n, await

    bigNaturals()
    .skip(1)
    .concurrentMap(100,squareMicroService)
    .first(1000n)
    .reduce(Sum, 0n)

  )
  console.timeEnd('with concurrentMap')
})

test('Compute the sum of the first 1000 squares without concurrentMap', async t => {
  console.time('without concurrentMap')
  t.is( 333_833_500n, await

    bigNaturals()
    .skip(1)
    .map( squareMicroService )
    .first(1000n)
    .reduce(Sum, 0n)

  )
  console.timeEnd('without concurrentMap')
})

Tests

The examples in this README file are used as tests.

yarn install && yarn test

API

Lake(iterable|asynciterable)

An Async Iterator and Async Iterable that behaves as a Readable Stream and supports Monadic Event Stream patterns, using only native operators.

It can be used as a proxy for the original (sync or async) iterable, turning it into an async iterator.

It is also an async iterable, which means it can be turned back into a stream.

.map(transform)

Applies a (sync or async) transformation function to each element in the stream.

.concurrentMap(atmost,transform)

Applies an async transformation function to each element in the stream, running at most atmost instances concurrently.

The ordering of the elements is not guaranteed, since it will depend on the evaluation time of the async transform function.

.constant(value)

Transforms this stream into a stream that produces the value for each element the original stream produces.
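
For example (an illustrative sketch):

// Replaces each of the three incoming values with the string 'tick':
// yields 'tick', 'tick', 'tick'.
from([1, 2, 3]).constant('tick')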

.filter(fun)

Only forwards stream values for which the (sync or async) fun function returns (a Promise for) a truthy value.
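
For example (a sketch):

// Keeps only the even naturals: yields 0n, 2n, 4n, 6n, 8n.
bigNaturals().filter( x => x % 2n === 0n ).first(5)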

.skipRepeats()

.skipRepeats(isEqual)

When successive values are identical, only the first one is propagated.

Optionally, a (sync or async) comparison function may be provided to compare using a different criterion than ===; it should return true if its two arguments are considered identical.
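
A sketch of both forms:

// Default === comparison: yields 1, 2, 3, 1
// (the trailing 1 is kept because it is not adjacent to the first run of 1s).
from([1, 1, 2, 2, 2, 3, 1]).skipRepeats()

// Custom comparison on a single field: yields {id: 1}, {id: 2}.
from([{id: 1}, {id: 1}, {id: 2}]).skipRepeats( (a, b) => a.id === b.id )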

.first(n)

.take(n)

Only propagates the first n elements in the stream.

BigInt values are used internally; n may be an integer or a BigInt.

.skip(n)

Skips the first n elements in the stream and only starts propagating after the n-th element has been received.

BigInt values are used internally; n may be an integer or a BigInt.

.delay(timeout)

Inserts a delay of timeout milliseconds between each received element.
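
For example (a sketch, combining delay with tap and run, described below):

// Logs 1, 2, 3 with roughly a 100 ms pause between elements.
await from([1, 2, 3]).delay(100).tap( x => console.log(x) ).run()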

.startWith(otherStream)

Concatenates the otherStream, then this stream.

.continueWith(otherStream)

Concatenates this stream, then the otherStream.
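
A sketch of both, assuming startWith yields the otherStream first:

// startWith: otherStream first, then this stream; yields 1, 2, 3, 4.
from([3, 4]).startWith( from([1, 2]) )

// continueWith: this stream first, then otherStream; yields 1, 2, 3, 4.
from([1, 2]).continueWith( from([3, 4]) )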

.forEach(func)

.tap(func)

Executes the (sync or async) func function for each element in the stream. The stream is unmodified but might be delayed by the execution time of func. The stream will fail if func fails or rejects.
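
For example (a sketch):

// Logs each of the first five naturals as a side effect;
// the values 0n through 4n still flow downstream, and run() consumes them.
await bigNaturals().first(5).tap( x => console.log('saw', x) ).run()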

.ap(funs)

Applies a stream of (sync or async) functions to this stream.

Elements of this stream are dropped until funs provides a function.

.switchLatest()

Outputs the data from the latest stream in this stream-of-streams.

.reduce(reducer,accumulator)

Using a (sync or async) reducer function that accepts the latest value of the accumulator and a new value, returns (a Promise for) the final value of the accumulator.

.run()

Consumes this stream, throwing away values; returns a Promise.

The Promise will reject if the stream fails for any reason.

.last()

Consumes this stream, returning its last value (or undefined if no value was produced) inside a Promise.

test('Retrieve the 14th BigInt', async t => {
  t.is(14n, await

    bigNaturals()
    .skip(1) // skip 0
    .first(14)
    .last()

  )
})

.equals(otherStream)

.equals(otherStream,isEqual)

Consumes two streams; returns a Promise that resolves to true if both streams yield the same values.

The optional isEqual (sync or async) comparison function should return true if its two arguments are considered equal.

empty()

Builds a stream that finishes immediately.
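
For example (a sketch, assuming empty is exported like from and the other constructors):

// equals() resolves to true: both streams finish without yielding anything.
await empty().equals( from([]) )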

always(v)

Builds a stream that continuously generates the value v.
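
For example (a sketch, assuming always is exported like the other constructors):

// always() never ends on its own; cap it with first().
// Yields 'ping', 'ping', 'ping'.
always('ping').first(3)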

bigNaturals()

Builds a stream that enumerates all non-negative BigInt values, starting at 0n and incrementing by 1n at each step.

periodic(period)

periodic(period,value)

Builds a stream that immediately generates an element with the provided value (or undefined if no value is provided), then generates a similar element every period milliseconds thereafter.
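
For example (a sketch, assuming periodic is exported like the other constructors):

// Emits immediately, then every 50 ms; counting the first four ticks yields 4.
await periodic(50).first(4).reduce( (count) => count + 1, 0 )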

now(v)

Builds a stream that only produces once, with the value provided.

throwError(e)

Builds a stream that fails immediately with the provided error.
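
For example (a sketch; it assumes run() rejects with the original error):

// The stream fails at once, so run() rejects; the catch logs 'boom'.
await throwError(new Error('boom')).run().catch( err => console.error(err.message) )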

from(iterable|asynciterable)

From any iterable, generates a new "Lake" stream (described above).

The iterable might be an Array, an iterator, an AsyncIterator, a ReadableStream, …

Use Node.js' events.on(emitter,eventName) to create an AsyncIterator from an event-emitter.
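
A sketch of the events.on approach (the 'data' event name is just an example):

import {EventEmitter, on} from 'events'
import {from} from '@shimaore/lake'

const emitter = new EventEmitter()
// events.on() yields an array of the arguments of each 'data' event;
// map() unwraps the single payload argument.
const payloads = from( on(emitter, 'data') ).map( ([payload]) => payload )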

equals(streamA,streamB)

equals(streamA,streamB,isEqual)

From two Lake instances or two iterators, returns a boolean Promise indicating whether the two sequences are identical.

The optional (sync or async) isEqual function should return true to indicate that its two arguments are considered identical.

import {combine} from '@shimaore/lake'

test('If you take the difference between consecutive square numbers, you generate the odd numbers.', async t => {
  const squaresFromZero = bigNaturals().map( x => x*x )
  const squaresFromOne = bigNaturals().skip(1).map( x => x*x )
  const differences = combine(squaresFromZero,squaresFromOne).map( ([a,b]) => b-a )
  const oddNaturals = bigNaturals().map( x => 2n*x+1n )
  t.true(await

    oddNaturals
    .take(10000)
    .equals(differences.take(10000))

  )
})