ballvalve
v5.0.1
Published
adapt streaming data into async iterators
Downloads
10
Maintainers
Readme
ballvalve
This is a collection of helpers for creating and using the ES 2018 AsyncIterable
type. It includes all the usual functional methods (map
, reduce
, collect
, and so on), as well as adapters for building an AsyncIterable
out of a push-based stream, and reading discrete bytes from an AsyncIterator<Uint8Array>
.
For example, to transform a nodejs buffer stream into a series of 4-byte little-endian u32
values, and sum them:
import * as stream from "node:stream";
import { asyncIter, byteReader } from "ballvalve";
// chop a stream into 4-byte chunks
async function* into4(stream: AsyncIterable<Uint8Array>) {
const reader = byteReader(stream);
while (true) {
const chunk = await reader.read(4);
if (chunk === undefined) return;
if (chunk.length < 4) throw new Error("partial frame");
yield chunk;
}
}
// generate a sample stream of 8 bytes, encoding 0x4030201 and 0x5060708
const s = stream.Readable.from([
Buffer.from("0102030408", "hex"),
Buffer.from("0706", "hex"),
Buffer.from("05", "hex")
]);
// map each chunk into its u32 value, and sum them as we go.
const sum = await asyncIter(into4(s)).map(buffer => buffer.readUInt32LE(0)).reduce((sum, n) => sum + n, 0);
// 0x9090909
As another example, you can transform a nodejs stream into lines of text, split at linefeeds:
async function* lines(stream: AsyncIterable<Uint8Array>) {
const reader = byteReader(stream);
while (true) {
const line = await reader.readUntil(10);
if (line === undefined) return;
yield line.toString("utf-8");
}
}
const s = stream.Readable.from([ "hell", "o\nsa", "ilor\nxyzzy\n" ].map(s => Buffer.from(s)));
for await (const line of asyncIter(lines(s))) {
console.log(line.trim());
}
// "hello", "sailor", "xyzzy"
Building
npm install && npm test
API
asyncIter<A>(iter: AsyncIterable<A> | AsyncIterator<A> | Iterable<A>): ExtendedAsyncIterable<A>
Ensure that an object is async-iterable, by wrapping it in an object with a [Symbol.asyncIterator]
method if necessary, and then wrap it in a fancy object with functional methods:
map<B>(f: (item: A) => (B | Promise<B>)): ExtendedAsyncIterable<B>
Transform items from one type to another, just like
map
on an array. The function may return a promise.flatMap<B>(f: (item: A) => AsyncIterable<B>): ExtendedAsyncIterable<B>
Transform each item into an async-iterator of a possibly different type, and chain them together, just like
flatMap
on an array.filter(f: (item: A) => boolean): ExtendedAsyncIterable<A>
Keep only the items where
f
returns true, just likefilter
on an array.filterMap<B>(f: (item: A) => B | undefined): ExtendedAsyncIterable<B>
Transform items from one type to another, discarding any new items that are
undefined
.find(f: (item: A) => boolean): Promise<A | undefined>
Return the first item where
f
returns true, orundefined
if it never does. Stop reading from the iterator as soon as an item is found.some(f: (item: A) => boolean): Promise<boolean>
Return true if
f
ever returns true for an item, just likesome
on an array. Stop reading from the iterator as soon as an item is found. Other languages call this functionany
.every(f: (item: A) => boolean): Promise<boolean>
Return true if
f
returns true for every item until the iterator is exhausted, just likeevery
on an array. Stop reading from the iterable as soon as any item "fails" the test. Other languages call this functionall
.reduce<B>(f: (sum: B, item: A) => B, start: B): Promise<B>
Starting with the value
start
, accumulate by callingf
on each element with the running total so far. The return value fromf
will be the running total passed tof
for the next item. The final return value fromf
is the final result ofreduce
. This behaves just likereduce
on an array, except that the starting value is required.collect(): Promise<A[]>
Use
reduce
to collect all values into a single array. Return that array when the iterator is exhausted.chain(iter: AsyncIterable<A>): ExtendedAsyncIterable<A>
Return a new async-iterable which contains each item from this iterable first, and then each element from
iter
.iter
is not even read until this iterator is exhausted.static chainAll<A>(iters: AsyncIterable<A>[]): ExtendedAsyncIterable<A>
Return a new async-iterable which contains items from each iterable, exhausting each one before moving to the next.
zip<B>(iter: AsyncIterable<B>): ExtendedAsyncIterable<[ A, B ]>
Return a new async-iterable which pairs each item from this iterable with an incoming item from
iter
. Each item-pair is provided only when each of the iterators has provided its next item. When either of the iterators is exhausted, the new iterator ends too.merge<B>(...iter: AsyncIterable<B>[]): ExtendedAsyncIterable<A | B>
Return a new async-iterable which contains items from this iterable and each of
iter
, in the order that those items become available. The order of items may be unpredictable.static mergeAll<A>(iterables: AsyncIterable<A>[]): ExtendedAsyncIterable<A>
Return a new async-iterable which contains items from each of
iterables
, in the order that items become available. The order of the items may be unpredictable.enumerate(): ExtendedAsyncIterable<[ number, A ]>
Return a new async-iterable which pairs each item from this iterable with its index, starting from 0.
splitWhen(f: (item: A) => boolean): [ ExtendedAsyncIterable<A>, ExtendedAsyncIterable<A> ]
Return two new iterables:
- The first returns items as long as
f
returns false. - The second contains the first items where
f
returns true, and all items that follow it.
This splits the iterable into two consecutive iterables at an arbitrary point, doing the opposite of
chain
.- The first returns items as long as
tee(count: number = 2): ExtendedAsyncIterable<A>[]
Return
count
new iterables, each of which generates the same items, using this iterable as a source. If any of the new iterables is left un-read, it will queue items into memory as the other iterables are used, so you may want to make sure that all of the new iterables have an active reader.partition(f: (item: A) => boolean): [ ExtendedAsyncIterable<A>, ExtendedAsyncIterable<A> ]
Split this iterable in two: the first provides items where
f
returns true; the second provides items wheref
returns false. Liketee
, if either of the returned iterables isn't actively processed, it will build up a queue of items, so make sure both iterables have an active reader.takeWhile(f: (item: A) => boolean): ExtendedAsyncIterable<A>
Return a new iterable that provides items as long as
f
returns true. Oncef
returns false for any item, that item is dropped and the new iterable ends.take(n: number): Promise<A[]>
Return a new iterable that provides up to
n
items from this iterable, then ends.takeFor(msec: number): ExtendedAsyncIterable<A>
Return a new iterable that provides items from this iterable until
msec
milliseconds have passed, at which point it will abruptly end.takeUntil(deadline: number): ExtendedAsyncIterable<A>
Return a new iterable that provides items from this iterable until
Date.now()
is greater than or equal todeadline
, at which point it will abruptly end.dropWhile(f: (item: A) => boolean): ExtendedAsyncIterable<A>
Return a new iterable that drops all items as long as
f
returns true. The first provided item will be the first wheref
returned false. Afterwards, all items are passed along.drop(n: number): ExtendedAsyncIterable<A>
Return a new iterable that drops the first
n
items, and provides any items after that.after(f: () => Promise<void>): ExtendedAsyncIterable<A>
Return a new iterable that calls
f
when it's exhausted, and waits forf
to finish before ending.withPromiseAfter(): [ ExtendedAsyncIterable<A>, Promise<void> ]
Return a new iterable and a promise. The promise is fulfilled when the iterator is exhausted.
toNodeStream(options: ReadableOptions = {}): Readable
Create and return a nodejs
Readable
stream that pulls from this iterable. This can be useful when dealing with older libraries.
byteReader(wrapped: AsyncIterable<Uint8Array>): ByteReader
Wrap an AsyncIterable<Uint8Array>
to provide byte-level read operations.
read(size: number): Promise<Uint8Array | undefined>
Read
size
bytes, waiting for data to arrive if necessary. If the stream ends before enough data is received, it will return as much as it can. If the stream has already ended, it will returnundefined
.readUntil(byte: number): Promise<Uint8Array | undefined>
Read and buffer data from this stream until a specific byte is seen. Once it's seen, all buffered data up to & including the requested byte is returned. If the byte is never seen before the stream ends, it returns
undefined
.readUntilMatch(test: (buffer: Uint8Array) => number): Promise<Uint8Array | undefined>
Read and buffer data from this stream until a
test
returns an index indicating a successful match. If no match is found before the stream ends, it returnsundefined
.test
is called with all currently buffered data, each time new data arrives. It should return -1 on an unsuccessful match. On success, it should return the index right after the end of the match (as if you were about to pass it toslice
).streamUntilMatch(test: (buffer: Uint8Array) => number, sizeHint: number = 1): ExtendedAsyncIterable<Uint8Array>
Generate a stream which passes through data from this stream until
test
returns an index indicating a successful match, or this stream ends. If a match was found, after the new stream ends, this (original) stream will resume at the start of the match. Otherwise, the original stream will end too.test
is called on each buffer as it arrives from the underlying stream. It should return -1 on an unsuccessful match. on success, it should return the index of the start of the match.If
sizeHint
is greater than 1, each subsequent call totest
will havesizeHint - 1
bytes repeated from the end of the previous buffer, so that nosizeHint
-length span of the stream will be broken up across calls totest
. (This is used bystreamUntilBuffer
.)streamUntilBuffer(match: Uint8Array): ExtendedAsyncIterable<Uint8Array>
Generate a stream which passes through data from this stream until an exact match for
match
is found, or this stream ends. If a match was found, after the new stream ends, this (original) stream will resume at the start of the match. Otherwise, the original stream will end too.unread(buffer: Uint8Array)
"unshift" a buffer back onto the head of the stream.
remainder(): Uint8Array | undefined
Return all currently buffered data without waiting for more, and clear the buffers. If nothing is buffered, it returns
undefined
.
PushAsyncIterator<A>(pullNext?: () => (A | undefined))
Adapt a streaming source into an AsyncIterable
.
You may use this class in two different modes:
- pure push: Call
push(item)
as each item arrives, and don't set apullNext
callback. Incoming items are queued for the recipient. - demand-based: Call
push()
with no argument when you think data is ready, and usepullNext
to fetch the next item (orundefined
if it was a false alarm and nothing is ready yet).
StreamAsyncIterator
is a good example of a demand-based iterator. A pure push iterator has no flow control, but may sometimes be necessary when adapting sources that have no flow control, like an "rxjs" stream.
This class works by responding to next()
with an unfulfilled promise. When you call push
, if an item is available, it's posted into that promise. If not, it calls pullNext
to see if an item is available that way.
When the stream is finished, call end()
to signal to the recipient that the iterator is done. Any queued items (from push(item)
) will be delivered first.
On error, call error(error)
to cause all current and future next()
calls to throw an exception.
push(item?: A)
end()
error(error: Error)
StreamAsyncIterator(stream: stream.Readable, public size?: number)
(Note: Recent nodejs releases implement streams as AsyncIterable
, so this class is now deprecated, and provided only as an example of how to use StreamAsyncIterator
.)
Wrap a nodejs Readable
stream into an AsyncIterator
. The iterable will place the stream into "pull" mode (paused) and emit Uint8Array
objects on demand until it reaches the end of the stream, or the stream emits an error.
size
is an optional parameter to pass to the stream's read()
method, if you want to try to read chunks of a specific size.
License
Apache 2 (open-source) license, included in LICENSE.txt
.
Authors
@robey - Robey Pointer [email protected]