npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

heliograph

v7.0.0

Published

Tools to support message passing via async iterators

Downloads

282

Readme

heliograph

npm CI Status dependencies Status devDependencies Status

Tools to support message passing via async iterators

Usage

Install heliograph by running:

yarn add heliograph

Sources

makeAsyncIterator({ next, ...otherProperties })

Manually define an async iterator

import { makeAsyncIterator } from 'heliograph'

async function run() {
  let currentCount = 1
  const iterator = makeAsyncIterator({
    async next() {
      if (currentCount <= 3) {
        return { done: false, value: currentCount++ }
      } else {
        return { done: true }
      }
    },

    doSomethingElse() {
      console.log('Hello There')
    }
  })

  for await (const value of iterator) {
    console.log(value)
  }

  iterator.doSomethingElse()
}

run()

.next() is called whenever a consumer wants to pull the next value from the iterator; it must return either { done: false, value: 'VALUE' } or { done: true }.

Other properties that are passed in will be added to the returned iterator.

fromClock(interval)

Creates an async iterator that ticks after every interval.

import { fromClock } from 'heliograph'

async function run() {
  for await(const hour of fromClock(1000 * 60 * 60)) {
    console.log(hour)
  }
}

run()

For example, when set to tick every hour (an interval of 1000 * 60 * 60), ticks will be emitted every hour on the hour starting with the next nearest hour. So, if the current time is 10:30, the first tick will be at 11:00 and then at 12:00.

Values cannot be emitted exactly on time but are guaranteed not to be emitted early. From testing, the longest delay seen has been 10ms.

fromQueue()

Creates an async iterator that waits for and pulls values pushed into a queue

import * as fs from 'fs'
import { promisify } from 'util'
import { fromQueue } from 'heliograph'

const sleep = promisify(setTimeout)

const queue = fromQueue()

async function produce() {
  queue.push(1)
  await sleep(1000)
  queue.push(2)
  await sleep(2000)
  queue.push(3)

  // queue.pushError(new Error('Something went wrong'))
  queue.end()
}

async function consume() {
  for await (const value of queue) {
    console.log(value)
  }
}

produce()
consume()

Push values into the queue using .push(value). Values are buffered until they are pulled out by a consumer. Signal to consumers that no more values will be produced by calling .end(); any values still in the queue will be drained first.

Signal an error condition by calling .push(error). Any values in the queue will be drained first. Subsequent attempts to pull values will throw the given error.

fromEventEmitter(eventEmitter, messageEventName, ?endEventName, ?errorEventName)

Creates an async iterator that queues up events from an EventEmitter.

import EventEmitter from 'events'
import { fromEventEmitter } from 'heliograph'

async function run() {
  const eventEmitter = new EventEmitter()
  const iterator = fromEventEmitter(eventEmitter, 'message', 'end', 'error')

  eventEmitter.emit('message', 1)
  eventEmitter.emit('message', 2)
  eventEmitter.emit('message', 3)
  eventEmitter.emit('end')

  // eventEmitter.emit('error', new Error('Something Wrong'))

  for await (const message of iterator) {
    console.log(message)
  }
}

run()

With semantics similar to fromQueue(), whenever a message event is emitted, its value is enqueued. Optionally, an end event or error event can be provided; when emitted, they are translated into calls to .end() and .pushError(error), respectively.

fromEventTarget(eventTarget, eventName, options?)

Creates an async iterator that queues up events from an EventTarget, commonly used by DOM objects.

import { fromEventTarget } from 'heliograph'

async function run() {
  const button = document.createElement('button')
  const iterator = fromEventEmitter(button, 'click', { passive: true })

  button.click()
  button.click()

  for await (const message of iterator) {
    console.log(message)
  }
}

run()

An optional third argument, when given, will be passed directly to EventTarget.addEventListener().

With semantics similar to fromQueue(), whenever a message event is emitted, its value is enqueued.

fromStream(readableStream)

Creates an async iterator that pulls values from a Readable Stream.

Note that as of Node v11.14.0, stream.Readable instances are async iterators. So, there's no need to convert them. However, many third-party libraries don't yet include this interface change.

import * as fs from 'fs'
import { fromStream } from 'heliograph'

async function run() {
  const stream = fs.createReadStream('some-file')
  for await (const chunk of fromStream(stream)) {
    console.log(chunk)
  }
}

run()

When the stream ends, the async interator will end. When the stream emits an error, the async iterator will throw that error.

fromWebSocket(url)

Creates an async iterator that connects to the given URL and emits incoming messages.

import { fromWebSocket } from 'heliograph'

async function run() {
  const socket = await fromWebSocket('wss://echo.websocket.org/')

  await socket.send('One')
  await socket.send('Two')
  await socket.close()

  for await (const message of socket) {
    console.log(message)
  }
}

run()

The iterator will end when either the client or server disconnects.

Sinks

consume(processItem)(iterator)

Consume the items of the given async iterator

import { pipe, consume } from 'heliograph'

async function* numbers() {
  yield 1
  yield 2
  yield 3
}

async function run() {
  await pipe(
    numbers(),
    consume(n => console.log(n))
  )
}

run()

consume returns a promise that resolves when the iterator ends and rejects if it throws an error.

toArray(iterator)

Collect the items of the given async iterator into an array

import { pipe, toArray } from 'heliograph'

async function* numbers() {
  yield 1
  yield 2
  yield 3
}

async function run() {
  const numbersArray = await pipe(
    numbers(),
    toArray
  )

  console.log(numbersArray)
}

run()

Operators

concat(...iterators)

Return a new async iterator that emits all of the items of the first given iterator, followed by all of the items of the next, etc.

import { concat } from 'heliograph'

async function* numbersA() {
  yield 1
  yield 2
}

async function* numbersB() {
  yield 1
  yield 2
}

async function run() {
  const allNumbers = concat(numbersA(), numbersB())

  for await (const number of allNumbers) {
    console.log(number)
  }
}

run()

filter(include)(iterator)

Return a new async iterator whose items are items from the given iterator that evaluate to true when passed to the given inclusion function.

import { filter } from 'heliograph'

async function* numbers() {
  yield 1
  yield 2
  yield 3
}

async function run() {
  const iterator = filter(n => n % 2 === 0)(numbers())
  for await (const evenNumber of iterator) {
    console.log(evenNumber)
  }
}
run()

fork(iterator, times)

Copy an async iterator so that separate operators can be applied

import { fork, filter } from 'heliograph'

async function* numbers() {
  yield 1
  yield 2
  yield 3
}

async function run() {
  const [numbers1, numbers2] = fork(numbers(), 2)
  const evenNumbers = filter(n => n % 2 === 0)(numbers1)
  const oddNumbers = filter(n => n % 2 !== 0)(numbers2)
}
run()

map(transform)(iterator)

Return a new async iterator whose items are the result of transforming each item of the given async iterator.

import { map } from 'heliograph'

async function* numbers() {
  yield 1
  yield 2
  yield 3
}

async function run() {
  const iterator = map(n => n * 2)(numbers())
  for await (const doubledNumber of iterator) {
    console.log(doubledNumber)
  }
}
run()

merge(...iterators)

Interleave the items from multiple async iterators as they arrive

import { promisify } from 'util'
import { merge } from 'heliograph'

const sleep = promisify(setTimeout)

async function* numbers() {
  yield 1
  yield 2
  yield 3
}

async function* otherNumbers() {
  yield 42
  yield 43
  yield 44
}

async function run() {
  const iterator = merge(numbers(), otherNumbers())
  for await (const number of iterator) {
    console.log(number)
  }
}

run()

observe(processItem)(iterator)

Observe the items of an async iterator and return a new async iterator that yields the same items. Errors are not passed to the observer.

import { pipe, observe } from 'heliograph'

async function* numbers() {
  yield 1
  yield 2
  yield 3
}

const iterator = pipe(
  numbers(),
  observe(n => console.log(n))
)

async function run() {
  for await (const number of iterator) {
    console.log(number)
  }
}

run()

partition(predicate)(iterator)

Break up an iterator's items into a series of groups

import { partition } from 'heliograph'

async function* streamItems() {
  yield { group: 1 }
  yield { group: 1 }
  yield { group: 2 }
  yield { group: 3 }
  yield { group: 3 }
  yield { group: 3 }
}

async function run() {
  const groups = partition((x, y) => x.group !== y.group)(streamItems())

  for await (const group of groups) {
    console.log(group)
  }
}

run()

pipe(iterator, ...transforms)

Pass an iterator through a series of transforms.

import { pipe, map, filter } from 'heliograph'

async function* numbers() {
  yield 1
  yield 2
  yield 3
  yield 4
}

async function run() {
  const iterator = pipe(
    numbers(),
    filter(number => number % 2 === 0),
    map(evenNumber => evenNumber * 3)
  )

  for await (const number of iterator) {
    console.log(number)
  }
}

run()

sample(scheduleIterator)(valueIterator)

Downsample the values of an async iterator based on when and how often a scheduler async iterator emits ticks.

import { fromEventTarget, fromClock, pipe, sample } from 'heliograph'

const div = document.querySelector('div')

const throttledMouseMoves = pipe(
  fromEventEmitter(div, 'mousemove', { passive: true }),
  sample(fromClock(100))
)

for await(const event of throttledMouseMoves) {
  console.log(event)
}

scan(accumulate, initialValue)(iterator)

Combine (or reduce) values one at a time, emitting the current accumulated value each time.

import { scan } from 'heliograph'

async function* numbers() {
  yield 1
  yield 2
  yield 3
  yield 4
  yield 5
}

async function run() {
  const runningTotal = scan((acc, n) => acc + n, 0)(numbers())

  for await (const total of runningTotal) {
    console.log(total)
  }
}

run()

scanWindow(windowSize, transform)(iterator)

Compute values from a rolling window

import { scanWindow } from 'heliograph'

async function* numbers() {
  yield 1
  yield 2
  yield 3
  yield 4
  yield 5
}

async function run() {
  const addedNumbers = scanWindow(3, (x, y, z) => x + y + z)(numbers())

  for await (const addedNumber of addedNumbers) {
    console.log(addedNumber)
  }
}

run()

Note that for a given windowSize, the first windowSize - 1 items emitted will be null.

zip(iterator1, iterator2)

Pair up items from two async iterators

import { promisify } from 'util'
import { zip } from 'heliograph'

const sleep = promisify(setTimeout)

async function* numbers() {
  yield 1
  yield 2
  yield 3
}

async function* letters() {
  yield 'a'
  yield 'b'
  yield 'c'
}

async function run() {
  const iterator = zip(numbers(), letters())
  for await (const [number, letter] of iterator) {
    console.log(number, leter)
  }
}

run()