npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@fine-js/channels

v0.0.2

Published

Bits of Clojure's `core.async` ported to JS

Downloads

26

Readme

Channels.js

Library for asynchronous in-process communication based on Clojure's excellent core.async. Read through introductory blog post or watch Strange Loop presentation for more details and inspiration.

It's queues, essentially, except there are Promises on top because JavaScript.

npm install @fine-js/channels

Supported Platforms

Library's source code is plain JavaScript with no runtime dependencies that works in Node.js and modern browsers.

Node.js

After installing a dependency, require the module as usual:

const {chan} = require('@fine-js/channels')
console.dir(chan())

Browsers

A build for browsers is available as a single-file download and via UNPKG (docs). Here is an example of embedding version 0.0.2 in particular:

<script src="https://unpkg.com/@fine-js/[email protected]/browser.js"
        integrity="sha384-aLUgfMcOf6P0qxZ4k0e084VdlxfruOqU0zXhYBSZS28Y07u7Zoo1NBbYnNpNynck"
        crossorigin="anonymous"></script>

<script>
  const {chan} = window.finejs.channels
  console.dir(chan())
</script>

Alternatively, you can build your own version with Browserify or any other bundler, as well as serve require.resolve('@fine-js/channels/browser.js') from your server directly.

Overview

TODO…

Terminology

Channel is a plain object with two operations:

  • put(ch, val) puts a value onto the channel
  • take(ch) takes a value from the channel

Port is a description of either operation:

  • ch is a port representing taking a value from channel ch
  • [ch, val] is a port representing putting val onto the channel ch

Each channel can be backed by a buffer of given capacity.

Parking is creating and returning a promise that will possibly never resolve, or resolve at indefinite time in the future. This is in contrast to immediately which is used in this document to roughly mean guaranteed resolution in the near future (often, after a couple of microtask ticks).

API

Basics
  • chan() creates a channel
  • put() puts a value onto the channel
  • take() takes a value from the channel
  • close() closes a channel
Core
  • alt() runs at most one of the given operations returning result of handler
  • alts() runs at most one of the given ports
  • poll() takes a value from channel, when immediately possible
  • offer() puts a value onto channel, when immediately possible
  • timeout() creates a self-closing channel
Buffers
JavaScript Extras

chan()

Creates a channel with an optional buffer:

  • chan() unbuffered channel
  • chan(capacity) blocking buffer of given capacity
  • chan(buffer) passed buffer
chan()
chan(10)
chan(sliding(1))

put()

Puts a value onto the channel. null or undefined are not allowed. Will park if no buffer space is available. Resolves to true unless channel is already closed.

put(ch, 42)
ch.put(42)

take()

Takes a value from the channel. Will park if nothing is available. Resolves to a value taken or null if channel is already closed.

take(ch)
ch.take()

close()

Closes given channel. The channel will no longer accept any puts (they will be ignored). Data in the channel remains available for taking, until exhausted, after which takes will return null.

If there are any pending takes, they will be dispatched with null. Any parked puts will remain so until a taker releases them. Closing a closed channel is a no-op. Returns null.

close(ch)
ch.close()

timeout()

Returns a channel that will close after given number of millisecods.

timeout(100)

alt()

Executes one of several channel operations via alts() and passess its result to a handler. Accepts a list of operations follwed by corresponding handler and options to pass to alts(), where:

  • each operation is either
    • single channel to take from
    • array of ports
  • each handler is either
    • a function accepting (val, channel)
    • anything else

Each channel may only appear once.

Resolves to handler's result if it's a function, handler itself otherwise.

alt([
  a,
  (val) => console.log('read %o from a', val),

  [b, c],
  (val, ch) => console.log('read %o from %o', val, ch),

  [[ch, 42]],
  'wrote 42 to ch',
])

alt([
  [[worker1, task], [worker2, task], [worker3, task]],
  `task ${task.id} queued`,
], {default: () => `try again later in ${waittime()} seconds`})

alt([
  results,
  (val) => ({status: 200, text: val}),

  timeout(150),
  {status: 504},
], {priority: true})

alts()

Completes at most one of several ports.

Unless the priority option is true, if more than one port is ready, a non-deterministic choice will be made. If no operation is ready and a default value is supplied, [default-val, alts.default] will be returned. Otherwise alts will park until the first operation to become ready completes.

Resolves to [val port] of the completed operation, where val is the value taken for takes, and a boolean (true unless already closed, as per ch.put) for puts.

alts([ch1, ch2, ch3])

alts([
  [worker1, task],
  [worker2, task],
  [worker3, task],
], {default: 'try again later'})

alts([
  results,
  timeout(150),
], {priority: true})

poll()

Takes a value from the channel, when immediately possible. Resolves a value taken if successful, null otherwise.

poll(ch)

offer()

Puts a value into the channel, when immediately possible. Resolves to true if offer succeeds.

offer(ch, 42)

unbuffered()

Giving this buffer to a channel will make it act as a synchronisation point between readers and writers: puts are parked until the value is taken by consumer.

This is a buffer channel will get when you call chan() or chan(0).

buffer()

Creates a blocking buffer:

  • buffer() with capacity 1
  • buffer(capacity) with given capacity.

When full, puts will be parked.

ch(16)
ch(buffer(16))

dropping()

Creates a dropping non-blocking buffer:

  • dropping() with capacity 1
  • dropping(capacity) with given capacity

When full, puts will complete puts, but values discarded.

ch(dropping(3000))

sliding()

Creates a sliding non-blocking buffer:

  • sliding() with capacity 1
  • sliding(capacity) with given capacity

When full, puts will complete puts, thier values buffered, but oldest elements in buffer will be dropped.

ch(sliding(1))

ch[Symbol.asyncIterator]()

Returns Async Iterator over values being put onto the channel. Not meant to be used directly, instead:

// Simply consuming everything until the channel closes.
for await (const message of ch)
  console.log(message)

// Note that the loop's body will NOT see every single message being put onto the channel,
// since, effectively, this calls `take()` on each iteration and somewhat equivalent to:
while (!closed(ch))
  console.log(await ch.take())

// Reading until we see a sentinel value
for await (const byte of ch)
  if (byte === 0)
    break;

Read more over at MDN: