@ggoodman/channels
v0.2.0
Published
An implementation of a closable, tailable `Channel` primitive in idiomatic JavaScript that can be used to decouple producers and consumers in concurrent code.
Downloads
14
Maintainers
Readme
Channels
An implementation of a closable, tailable Channel
primitive in idiomatic JavaScript that can be used to decouple producers and consumers in concurrent code.
Features:
- Designed for JavaScript and its 'concurrency' model.
- Uses modern JavaScript features like synchronous and asynchronous iterators.
- Supports closing channels and racing between channels.
Installation
npm install --save @ggoodman/channels
or
yarn add @ggoodman/channels
Example
import { channel, Channel } from './';
const timeout = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
const createBall = () => ({ hits: 0, status: '' });
type Ball = ReturnType<typeof createBall>;
const wiff = channel<Ball>();
const waff = channel<Ball>();
const createBat = async (inbound: Channel<Ball>, outbound: Channel<Ball>) => {
for await (const ball of inbound) {
ball.hits++;
ball.status = ball.status === 'wiff!' ? 'waff!' : 'wiff!';
console.log(`🎾 Ball hit ${ball.hits} time(s), ${ball.status}`);
await outbound.put(ball); // smash the ball back
await timeout(500); // assume it's going to take a bit to hit the ball
}
console.log(`🛑 OK, fun's over`);
};
process.on('SIGINT', () => {
// Important to close BOTH sides because if the timing is unlucky, you read from
// the open side and attempt to write to the closed side.
wiff.close();
waff.close();
});
createBat(waff, wiff); // create a bat that will wiff waffs
createBat(wiff, waff); // create a bat that will waff wiffs
waff.put(createBall());
Concepts
Writing
Writing to a channel is a blocking operation. That is, it returns a Promise<boolean>
. Code that is writing to a channel should typically block on the write using something like:
await channel.put(message);
The returned Promise will only resolve at the earlier of:
- The target Channel is closed. In that case, the returned Promise will resolve to
false
. - A message is consumed from the target channel. In that case the Promise will resolve to
true
. - The target Channel is buffered and its buffer is not yet full.
Buffered vs unbuffered
However, sometimes you may want to be able to write a certain number of messages to a Channel before any readers start consuming messages. In a single-threaded environment, like JavaScript, this could easily lead to a deadlocked program; execution will never proceed to the logic consuming from the channel since it is blocked on writing to it.
To get around this, a Channel may be created with a buffer size. As many messages as the buffer size can be written to the Channel before calls to put
will block on a read.
Closing channels
A Channel can be closed, signalling that further calls to put
should be treated as exceptions. When a channel is closed, all outstanding Promises for pending writes will resolve with the value false
.
Note: This means that buffered messages that had been accepted but not consumed will be lost.
All outstanding, and new iterations consuming the channel will also immediately complete. This also means that iterations racing multiple channels will also immediately stop iteration.
Blocking reads from a channel (such as those waiting for a value to be written to the channel) will resolve to the special closed
signal.
Iterable Promises
Blocking reads from Channel.read
or select
return objects having a IterablePromise
type. This object can act both as a Promise
or as an AsyncIterable
.
If you are interested in reading a single message from a Channel, you will probably treat these objects as Promises. Note that since a Channel might already be closed, or might become closed after starting to block on an operation, the Promise
may resolve to a special closed
signal.
const message = await channel.take();
if (message === closed) {
// Handle case where no message was received since the channel is closed
return;
}
// Handle the message
If you are interested in reading a a stream of messages from a Channel, you can leverage the JavaScript for..await..of
construct:
for await (const message of channel.take()) {
// Operate on each message
}
API
Channel
An interface representing a Channel, having the following properties:
readonly closed: boolean
a property signalling whether the Channel is closedreadonly size: number
a readonly property indicating the total number of pending messages in the channel (buffered or otherwise).close(): void
close the channel.put(msg: unknown): Promise<boolean>
write a message to the channel. The returned Promise will resolve totrue
if the message was consumed (or was buffered). It may resolve tofalse
if the channel was closed before the message could be read.
channel<T>(bufferSize?: number): Channel<T>
Create a new Channel that will, optionally, buffer up to bufferSize
messages. Returns a Channel
instance.
closed: unique symbol
A Symbol
to which calls to Channel.read()
might resolve if the target Channel is closed.
Important: When using blocking reads, it is important to test resolved values against
closed
.
select({ [key: string]: Channel }): IterablePromise<{ key: string, value: unknown }>
Race multiple channels against each other. This api can be used both as a Promise
or an AsyncIterable
. In either case, the resolved value will be an object with the shape { key, value }
. The key
is the key in the object passed to select to identify from which channel the value
was produced.
ChannelClosedError
constructor
The constructor function for the ChannelClosedError
errors that will be thrown when attempting to write to a closed channel.
Inspiration
- (@paybase/csp)[https://github.com/paybase/csp] was a major inspiration. I added the ability to close channels and tweaked the approach for
select
to be fully typed.