npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

reactive-channel

v3.1.2

Published

A simple yet powerful abstraction that enables communication between asynchronous tasks.

Downloads

66

Readme

reactive-channel

A simple yet powerful abstraction that enables communication between asynchronous tasks.

A Channel is an abstraction that enables communication between asynchronous tasks. A channel exposes two objects: tx and rx, which respectively provide methods to transmit and receive data.

Channels can be used and combined in a multitude of ways. The simplest way to use a channel is to create a simplex communication: one task transmits data, another consumes it. A full-duplex communication can be achieved by creating two channels and exchanging the rx and tx objects between two tasks.

It's also possible to create a Multiple Producers Single Consumer (mpsc) scenario by sharing a single channel among several tasks.

NPM Package

npm install reactive-channel

Documentation

Highlights

Channel<T> provides the following properties:

  • tx, an object that exposes methods and properties to interact and query the transmitting end of the channel;
  • rx, an object that exposes methods and properties to interact and query the receiving end of the channel;
  • buffer, a readonly queue that represents the internal buffer containing data waiting to be consumed by the receiving end.

The tx object is a ChannelTx<T>, which in turn exposes the following:

  • capacity, the total capacity of the channel, configured at creation time;
  • send(...), to send data to the receiving end. If the receiving end is not waiting for data, this method will enqueue the message in the channel buffer;
  • sendWait(...), similar to send(...), but let the caller wait for the receiving end to consume the data;
  • close(), to close the channel. This will also reject all pending receiving Promises;
  • closed$, a boolean store that can be used to check if the channel has been closed, i.e. the close() method has been called by either the receiving or transmission end of the channel;
  • canWrite$, a boolean store that can be used to check if the channel is capable of receiving data, that is its buffer is not full and the channel is not closed;
  • availableOutboxSlots$, a store that can be used to check how much space is available in the transmission buffer.

The rx object is a ChannelRx<T>, which in turn exposes the following:

  • capacity, the total capacity of the channel, configured at creation time;
  • recv(...), to receive data. If the transmission buffer contains some data the returned promise resolves immediately, otherwise it will resolve once the transmitting end calls send(...) or sendWait(...);
  • iter()/[Symbol.asyncIterator](), to consume the channel buffer using an async iterator;
  • close(), to close the channel. This will also reject all pending receiving Promises;
  • closed$, a store store that can be used to check if the channel has been closed, i.e. the close() method has been called by either the receiving or transmission end of the channel;
  • canRead$, a boolean store that can be used to check if the channel can be consumed, that is its buffer is not empty and the channel is not closed;
  • filledInboxSlots$, a store that can be used to check how much space is filled in the transmission buffer;
  • pendingRecvPromises$, a store that can be used to check how many recv calls are pending.

This library provides a function called makeChannel to create Channel<T> objects.

Examples

Basics:

import {makeChannel} from 'reactive-channel';

// Create a channel that can be used to pass strings.
const {rx, tx} = makeChannel<string>();
rx.recv().then((message) => console.log(message));
// ...
tx.send('hello!'); // this will resolve the above promise, printing "hello!"

Reactivity:

import {makeChannel} from 'reactive-channel';

// Create a channel that can be used to pass strings.
const {rx, tx} = makeChannel<string>();
tx.availableOutboxSlots$
	// this will immediately print 1024, which is the default channel capacity.
	.subscribe((n) => console.log(`Available slots: ${n}`));

// this will trigger the above subscription, printing 1023,
// because there is no pending `recv` that can consume the data.
tx.send('hello!');
// this will also trigger the subscription, printing 1024 again,
// because `recv()` will consume the channel buffer, emptying it.
rx.recv();

Simple job queue:

import {makeChannel} from 'reactive-channel';

// Create a channel that can be used to pass strings.
const {rx, tx} = makeChannel<{email: string; content: string}>();

async function processEmailQueue() {
	while (!rx.closed$.content()) {
		try {
			const {email, content} = await rx.recv();
			console.log(`Now sending an email to ${email}`);
			// ... your favorite email dispatcher here.
		} catch (err) {
			console.error('unable to send because of the following error:', err);
		}
	}
}

// Somewhere in the startup of your application.
processEmailQueue();

// Somewhere where an email should be sent, e.g. when a user subscribes to your newsletter.
tx.send({
	email: user.email,
	content: 'Subscription activated, you will receive our best deals and coupons!',
});

Abort a blocking recv(...)

You can abort a recv using a simple timeout:

import {makeChannel} from 'reactive-channel';

// Create a channel that can be used to pass strings.
const {rx, tx} = makeChannel<{email: string; content: string}>();
// You can abort a `recv` using a simple timeout:
try {
	const {email, content} = await rx.recv({signal: AbortSignal.timeout(5000)});
} catch (err) {
	if (err instanceof DOMException && err.name === 'TimeoutError') {
		console.warn('No data to consume, timeout expired');
	}
	// ...
}

Or you can abort a recv using some custom logic, e.g. when the user clicks a button:

import {makeChannel} from 'reactive-channel';

// Create a channel that can be used to pass strings.
const {rx, tx} = makeChannel<{email: string; content: string}>();

const abortController = new AbortController();
// Assuming you have a `abortButton` variable
abortButton.addEventListener(
	'click',
	() => abortController.abort(new Error('Action cancelled by the user'))
);

// You can abort a `recv` using an abort signal:
try {
	const {email, content} = await rx.recv({signal: abortController.signal});
} catch (err) {
	// ...
}