channel-ts

v0.1.2

Channels implemented in Typescript using async/await

Downloads

937

Minimal async/await channels in TypeScript

Features

  • Simple API with JavaScript's async iterators for receivers
  • Broadcast on MultiReceiverChannel
  • Observe an object and notify receivers manually
  • Mutex

Examples

Multi producer and single consumer

This channel allows multiple senders to send data to a single receiver. Messages start buffering as soon as the channel is created, so no messages are lost even if the receiver starts reading later.

import { SimpleChannel } from "channel-ts";

const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// printer waits for the messages on the channel until it closes
async function printer(chan: SimpleChannel<string>) {
    for await(const data of chan) { // use async iterator to receive data
        console.log(`Received: ${data}`);
    }
    console.log("Closed");
}

// sender sends some messages to the channel
async function sender(id: number, chan: SimpleChannel<string>) {
    await delay(id*2000);
    chan.send(`hello from ${id}`); // sends data; unbounded channels don't block
    await delay(2800);
    chan.send(`bye from ${id}`); // sends some data again
}

async function main() {
    const chan = new SimpleChannel<string>(); // creates a new simple channel
    const p1 = printer(chan); // uses the channel to print the received data
    const p2 = [0, 1, 2, 3, 4].map(async i => sender(i, chan)); // creates and spawns senders

    await Promise.all(p2); // waits for all senders
    chan.close(); // closes the channel on the sender end
    await p1; // waits for the channel to close on the receiver end too
}

main();
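
To make the buffering behaviour described above more concrete, here is a minimal sketch of how an unbounded channel with an async iterator could be built. It is not the implementation used by channel-ts; `TinyChannel` and everything inside it are hypothetical names chosen for illustration only.

```typescript
// Hypothetical sketch: TinyChannel is NOT part of channel-ts.
class TinyChannel<T> implements AsyncIterable<T> {
    private buffer: T[] = []; // values sent before the receiver asked for them
    private waiters: Array<(r: IteratorResult<T>) => void> = []; // pending next() calls
    private closed = false;

    // send never blocks: hand the value to a waiting receiver or buffer it
    send(value: T): void {
        if (this.closed) throw new Error("send on closed channel");
        const waiter = this.waiters.shift();
        if (waiter) waiter({ value, done: false });
        else this.buffer.push(value);
    }

    // close ends iteration once the buffered values have been drained
    close(): void {
        this.closed = true;
        for (const waiter of this.waiters.splice(0)) {
            waiter({ value: undefined, done: true });
        }
    }

    private async next(): Promise<IteratorResult<T>> {
        if (this.buffer.length > 0) {
            return { value: this.buffer.shift() as T, done: false };
        }
        if (this.closed) return { value: undefined, done: true };
        // nothing buffered yet: park until send() or close() wakes us up
        return new Promise<IteratorResult<T>>((resolve) => this.waiters.push(resolve));
    }

    [Symbol.asyncIterator](): AsyncIterator<T> {
        return { next: () => this.next() };
    }
}
```

In this sketch, values sent before the `for await` loop starts are kept in `buffer`, which is why nothing is lost, and `close()` only ends the loop after the buffer is empty.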

Multi producer and multi consumer

This channel allows multiple senders to send data to multiple receivers. Receivers must be created explicitly. Every message is broadcast to all existing receivers and buffered for each of them, so a receiver misses any message sent before it was created. Creating a receiver is similar to subscribing in the publisher-subscriber pattern.

import { MultiReceiverChannel, SimpleReceiver } from "channel-ts";

const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// printer waits for the messages on the channel until it closes
async function printer(id: string, chan: SimpleReceiver<string>) {
    for await(const data of chan) { // use async iterator to receive data
        console.log(`Printer ${id} received: ${data}`);
    }
    console.log(`Printer ${id} closed`);
}

// sender sends some messages to the channel
async function sender(id: number, chan: MultiReceiverChannel<string>) {
    await delay(id*2000);
    chan.send(`hello from ${id}`); // sends data; unbounded channels don't block
    await delay(2800);
    chan.send(`bye from ${id}`); // sends some data again
}

async function main() {
    const chan = new MultiReceiverChannel<string>(); // creates a new multi-receiver channel
    const r1 = chan.receiver();
    const p1 = printer("A", r1); // uses the channel to print the received data
    const r2 = chan.receiver();
    const p2 = printer("B", r2); // uses the channel to print the received data
    const p3 = [0, 1, 2, 3, 4].map(async i => sender(i, chan)); // create and spawn senders

    await Promise.all(p3); // wait for all senders
    chan.removeReceiver(r1); // remove receiver A from the channel
    chan.removeReceiver(r2); // remove receiver B from the channel
    chan.close();
    await Promise.all([p1, p2]); // wait for both printers to finish
}

main();
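
The subscription semantics described above (a receiver only sees messages sent after it was created) can be illustrated with a small self-contained sketch. This is an assumption about how such a broadcast could work, not the library's code; `TinyBroadcast` and `TinyReceiver` are hypothetical names.

```typescript
// Hypothetical sketch: TinyReceiver and TinyBroadcast are NOT channel-ts APIs.
class TinyReceiver<T> implements AsyncIterable<T> {
    private buffer: T[] = [];
    private wake: (() => void) | null = null; // resolves the pending wait, if any
    private closed = false;

    push(value: T): void {
        this.buffer.push(value);
        if (this.wake) this.wake();
    }

    close(): void {
        this.closed = true;
        if (this.wake) this.wake();
    }

    async *[Symbol.asyncIterator](): AsyncGenerator<T> {
        while (true) {
            // drain everything buffered so far
            while (this.buffer.length > 0) yield this.buffer.shift() as T;
            if (this.closed) return;
            // wait until push() or close() is called
            await new Promise<void>((resolve) => { this.wake = resolve; });
            this.wake = null;
        }
    }
}

class TinyBroadcast<T> {
    private receivers = new Set<TinyReceiver<T>>();

    // subscribing: only messages sent after this call are delivered
    receiver(): TinyReceiver<T> {
        const r = new TinyReceiver<T>();
        this.receivers.add(r);
        return r;
    }

    // every current receiver gets the value in its own buffer
    send(value: T): void {
        this.receivers.forEach((r) => r.push(value));
    }

    // unsubscribing ends that receiver's iteration after its buffer drains
    removeReceiver(r: TinyReceiver<T>): void {
        if (this.receivers.delete(r)) r.close();
    }
}
```

In this sketch a message sent while no receiver exists is simply dropped, which mirrors the behaviour the paragraph above describes.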

Observe

observe is a function that turns a JavaScript object into an Observable. The sender mutates the object and then calls notify() to deliver it to the receivers. This technique is similar to the observer pattern.

Please note that the object is shared between the sender and the receivers, so a receiver that is still reading a received message risks seeing further changes made by the sender. To guarantee that a received message is not modified while it is being read, make sure each receiver finishes reading the message without yielding control (for example, without awaiting in the middle of reading it) and that receivers do not mutate the message.

import { observe, SimpleReceiver, Observable } from "channel-ts";

const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// printer waits for the messages on the channel until it closes
async function printer(id: string, chan: SimpleReceiver<number[]>) {
    for await(const data of chan) { // use async iterator to receive data
        console.log(`Printer ${id} received: ${JSON.stringify(data)}`);
    }
    console.log(`Printer ${id} closed`);
}

// sender sends some messages to the channel
async function sender(id: number, array: Observable<number[]>) {
    await delay(id*1500);
    array.fill(1); // does some manipulation
    array.notify(); // notifies all the receivers with the value
    await delay(2200);
    array[0] = id * 111; // does some manipulation
    array[1] = 0;
    array[2] = (9-id) * 111;
    array.notify(); // notifies all the receivers with the value
}

async function main() {
    const chan = observe([0, 0, 0]); // creates a new observable, works with objects
    const r1 = chan.receiver();
    const p1 = printer("A", r1); // uses the channel to print received data
    const r2 = chan.receiver();
    const p2 = printer("B", r2); // uses the channel to print received data
    const p3 = [0, 1, 2, 3, 4].map(async i => sender(i, chan)); // create and spawn senders

    await Promise.all(p3); // wait for all senders
    chan.removeReceiver(r1); // remove receiver A from the observable
    chan.removeReceiver(r2); // remove receiver B from the observable
    await Promise.all([p1, p2]); // wait for both printers to finish
}

main();
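
The shared-object caveat mentioned earlier in this section is easy to see in a tiny sketch. The code below does not use channel-ts; `tinyObserve` and `subscribe` are hypothetical names for a simplified, synchronous stand-in. It shows that a listener holding the reference sees later mutations, while a listener that copies the value keeps a stable snapshot.

```typescript
// Hypothetical sketch: tinyObserve is NOT the library's observe().
type Listener<T> = (value: T) => void;

function tinyObserve<T extends object>(target: T) {
    const listeners = new Set<Listener<T>>();
    // attach subscribe() and notify() directly to the observed object
    return Object.assign(target, {
        subscribe(fn: Listener<T>): void {
            listeners.add(fn);
        },
        // notify hands every listener the SAME object reference
        notify(): void {
            listeners.forEach((fn) => fn(target));
        },
    });
}

const arr = tinyObserve([0, 0, 0]);
const seenByReference: number[][] = [];
const seenByCopy: number[][] = [];

arr.subscribe((v) => seenByReference.push(v));    // keeps the shared reference
arr.subscribe((v) => seenByCopy.push(v.slice())); // takes a snapshot instead

arr.fill(1);
arr.notify();
arr[0] = 111;
arr.notify();

console.log(JSON.stringify(seenByReference)); // [[111,1,1],[111,1,1]]
console.log(JSON.stringify(seenByCopy));      // [[1,1,1],[111,1,1]]
```

Copying on receipt is one way to avoid the problem when a receiver needs to keep a message around; whether the copy is worth its cost depends on the size of the object.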

Mutex

Mutex lets asynchronous code synchronize access through locking. This is useful when several tasks access a shared resource concurrently and each access spans one or more awaits.

import { Mutex } from "channel-ts";

const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

const main = async () => {
	const mutex = new Mutex(); // lock shared by all tasks (assumes a no-argument constructor)
	const task1 = async (): Promise<void> => {
		console.log("task 1: going to start");
		const guard = await mutex.acquire();
		console.log("task 1: mutex acquired");
		await delay(2000);  // assume shared access done by task 1
		console.log("task 1: finished");
		guard.release();
		console.log("task 1: mutex released");
	};
	const task2 = async (): Promise<void> => {
		await delay(200);
		console.log("task 2: going to start");
		const guard = await mutex.acquire();
		console.log("task 2: mutex acquired");
		await delay(1500);  // assume shared access done by task 2
		console.log("task 2: finished");
		guard.release();
		console.log("task 2: mutex released");
	};
	const task3 = async (): Promise<void> => {
		await delay(400);
		console.log("task 3: going to start");
		const guard = await mutex.acquire();
		console.log("task 3: mutex acquired");
		await delay(1800);  // assume shared access done by task 3
		console.log("task 3: finished");
		guard.release();
		console.log("task 3: mutex released");
	};
	// wait for all our tasks
	await Promise.all([task1(), task2(), task3()]);
};

main();
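
For readers curious how an async mutex with an acquire/release guard can work, here is a minimal sketch based on chaining promises. It is not the library's implementation; `TinyMutex`, `Guard`, and `withLock` are hypothetical names. The `withLock` helper also shows one way to make sure the lock is released when the protected code throws.

```typescript
// Hypothetical sketch: TinyMutex is NOT the Mutex exported by channel-ts.
interface Guard {
    release(): void;
}

class TinyMutex {
    // resolves when the most recent holder (or waiter) has released
    private tail: Promise<void> = Promise.resolve();

    async acquire(): Promise<Guard> {
        let release!: () => void;
        const next = new Promise<void>((resolve) => {
            release = resolve;
        });
        const previous = this.tail;
        this.tail = next; // later callers queue up behind this one
        await previous;   // wait for the holder in front of us, if any
        return { release };
    }
}

// Runs body while holding the lock and always releases it afterwards.
async function withLock<T>(mutex: TinyMutex, body: () => Promise<T>): Promise<T> {
    const guard = await mutex.acquire();
    try {
        return await body();
    } finally {
        guard.release();
    }
}
```

Because the part of `acquire()` before its first `await` runs synchronously, callers are queued in the order they called it, so in this sketch the lock is handed over first come, first served.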

License

MIT license