exframe-stream
v1.0.6
Published
exframe-stream description
Downloads
29
Readme
exframe-steam
A library for common stream patterns in the harmony / exframe environments
installation
npm install exframe-stream
usage
import { parallel } from 'exframe-stream';
import { WorkerPool } from 'exframe-worker-pool';
const pool = new WorkerPool();
await pipeline(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
parallel(async (i) => {
return i + 1;
}, pool)
);
terms
|term|description|
|----|-----------|
|streamable
|A stream, iterator, async iterator, generator function, or async generator function|
|duplexable
|A duplex stream or async generator function|
streams
Chain
Chain
is an alternative to the experimental compose
function that combines with some of the functionality of pipeline. This can be used to "chain" streams together into a Duplex
but still maintain references to the head and the tail streams. Can be useful when the interface of the head and the tail are important. Further, chain
is less aggressive about turning streams into Duplexes
if not neccessary than compose
.
examples
import { chain } from 'exframe-stream';
const items = await chain(
[1, 2, 3, 4, 5],
async function* (source) {
yield* source;
}
).toArray();
Constructor
function chain(stream: Streamable, ...streams?: Streamable[], options?: PipelineOptions & FinishedOptions) => Stream
The head of the stream may be readable or writable. All streams that follow another must be writable. The final stream may optionally be writable. If either the head or the tail end up getting returned, they will have the fields listed below added. Streamables that are not already streams will be duplexified.
Rules for What Stream is Returned
- If there is just the one stream, return the one stream.
- If the head is writable and the tail is readable, then return a duplex that exposes the head as the writable and the tail as the readable.
- If the head is writable and the tail is not readable, then return the head
- otherwise return the tail
|field|type|description|
|-----|----|-----------|
|stream
|Streamable
|See streamable|
|streams
|Streamable[]
|Set of streamables that will be chained together|
|options
|PipelineOptions & FinishedOptions
|See PipelineOptions, See FinishedOptions|
Fields
head: Stream
The first stream of the chain.
tail: Stream
The last stream of the chain.
finished: Promise<void>
An already wired up finished
. See finished.
Channel
The channel
is a fixed length buffer that provides ordered input and output as well as strictly blocked reads and writes when the buffer is empty or full respectively. The Channel
is a full Duplex
stream.
examples
import { channel } from 'exframe-stream';
const chan = channel({ max: 10 });
(async () => {
for await (const item of chan) { // will block while there are no items
console.log(item);
}
})();
for (let i = 0; i < 20; ++i) {
await channel.send(i); // will block if the buffer is full
}
import { pipeline } from 'stream/promises';
import channel from 'exframe-stream';
await pipeline(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
channel({ max: 10 }),
async function (source) {
for await (const item of source) {
console.log(item);
}
}
);
Constructor
function channel(options?: ChannelOptions & DuplexOptions) => ChannelStream & Duplex
|field|type|description|
|-----|----|-----------|
|options
|ChannelOptions & DuplexOptions
|See ChannelOptions, See DuplexOptions|
type ChannelOptions
|field|type|description|
|-----|----|-----------|
|max
|integer
default = 10|the maximum number of items in the channel's buffer|
|preRead
|<T, R>(item: T) => Promise<R>
|called before pushing to the read buffer|
|preWrite
|<T, R>(item: T, encoding) => Promise<R>
|Called before storing in the item queue|
Fields
items: T[]
The channel's queue. Not recommended to interact with this array.
async send(item: T) => Promise<void>
Enqueues the given item, will block if the channel is full.
Demultiplex
Demultiplexes some source iterable or stream to 1 or more writables. This stream will currently terminate the stream at it's level. However, using compose
or some other technique, each writable could chain to a number of other streams or iterators. The demultiplex stream can handle both binary and object mode. Any writable that is also a readable will be exposed as readableStreams
on the DemultiplexStream
. The MultiplexStream
will be able to recombine them into a single stream if necessary. If any target becomes unable to take anymore data, then all targets will be blocked for additional data until the blocking target can resume.
examples
import { pipeline } from 'stream/promises';
import { demultiplex, pick } from 'exframe-stream';
await pipeline(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
demultiplex(
async function* (source) {
for await (const item of source) {
console.log(`log: ${item}`);
}
},
compose(
pick(item => item % 2 === 0),
async function* (source) {
for await (const item of source) {
console.log(`even: ${item}`);
}
}
),
{ objectMode: true }
)
);
Constructor
function demultiplex(...writables: (Writable|Function)[], options?: WritableOptions) => DemultiplexStream
Createes a DemultiplexStream
which is a Writable
taking any number of target Writable
s or async generator functions to process items.
|field|type|description|
|-----|----|-----------|
|writables
|(Writable
|Function
)[]|Set of Writable
streams or async generator functions|
|options
|WritableOptions
|See WritableOptions|
Fields
readableStreams: Readable[]
The set target writables that are also readable.
FeedbackLoop
A mechanism that allows items to be pushed back to the start. Will continue until both the source and the repeated work have ended.
examples
import { pipeline } from 'stream/promises';
import { feedbackLoop, FeedbackState } from 'exframe-stream';
await pipeline(
[
{ tasks: [add(1), multiply(4), subtract(3), divide(2)], i: 0, value: 0 },
{ tasks: [add(2), multiply(4), divide(2)], i: 0, value: 0 },
{ tasks: [add(3), multiply(4)], i: 0, value: 0 }
],
feedbackLoop(
async function* (source) {
for await (const context of source) {
context.value = context.tasks[context.i++](context.value);
yield context;
}
},
async function (context) {
return context.i < context.tasks.length
? FeedbackState.Repeat
: FeedbackState.Complete;
},
chain(async function* (source) {
for await (const context of source) {
context.repeated = true;
yield context;
}
})
),
async function (source) {
for await (const context of source) {
console.log(context);
}
}
)
Constructor
function feedbackLoop(operationStream: Duplexable, feedback: async <T>(item: T) => Promise<FeedbackState>, feedbackStream?: Duplexable, options?: FeedbackLoopOptions) => FeedbackLoopStream
Createes a FeedbackLoopStream
which is a Duplex
taking an operation stream to process the items, a feedback filter to determine whether items should repeat and an optional feedbackStream that can process over items that are starting again.
|field|type|description|
|-----|----|-----------|
|operationStream
|Duplexable
|The stream to process over all items with. See duplexable|
|feedback
|async <T>(item: T) => Promise<FeedbackState>
|The filter to determine whether an item should repeat, complete, or be discarded|
|feedbackStream
|Duplexable
|Stream to process over items that are being repeated|
|options
|ReadableOptions
|See ReadableOptions|
Multiplex
Multiplexes a set of source iterables or streams into a single readable. The stream will end when all sources are ended.
examples
import { pipeline } from 'stream/promises';
import { multiplex } from 'exframe-stream';
await pipeline(
multiplex(
async function* () {
for (let i = 0; i < 10; ++i) {
yield i
}
},
[2, 4, 6, 8, 10],
Readable.from([2]),
{ objectMode: true }
),
async function (source) {
for (const item of source) {
console.log(item);
}
}
);
function multiplex(...readables: (Readable|Function)[], options?: ReadableOptions) => MultiplexStream
Createes a MultiplexStream
which is a Readable
taking any number of target Readables
s or async generator functions to process items.
|field|type|description|
|-----|----|-----------|
|readables
|(Readable
|Function
)[]|Set of Readable
streams or async generator functions|
|options
|ReadableOptions
|See ReadableOptions|
Parallel
Executes some operation over the incoming items and outputs the results in the correct order. Uses exframe-worker-pool
to govern the concurrent execution. The parallel stream can only operate in object mode.
function parallel(operation: async <T, R>(T) => Promise<R>, pool: WorkerPool, options?: DuplexOptions): ParallelStream
Creates a ParallelStream
which is a ChannelStream
or a Duplex
. Each incoming item is operated on by the given operation
and each result will be outputted in the incoming order. If operation returns undefined
, the stream will behave essentially like a Writable
rather than a Duplex
.
examples
import { pipeline } from 'stream/promises';
import { parallel } from 'exframe-stream';
import { WorkerPool } from 'exframe-worker-pool';
const pool = new WorkerPool();
await pipeline(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
parallel(async (item) => {
const result = await someRequest(item);
return result;
}, pool)
);
|field|type|description|
|-----|----|-----------|
|operation
|async <T, R>(T) => Promise<R>
|Mapping function to that returns some output for every item|
|pool?
|WorkerPool
|The worker pool to use to govern the amount of concurrency that the parallel stream can use. If not set, then a pool with a max of 1 and overflow of 0 will be created.|
|options?
|DuplexOptions
|See DuplexOptions|
Pick
Special case of parallel
that filters the stream for items matching the given predicate
. Like parallel
is only available for object mode.
examples
import { pipeline } from 'stream/promises';
import { pick } from 'exframe-stream';
await pipeline(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
pick(x => x % 2 === 0),
async function* (source) {
for await (const item of source) {
console.log(item);
}
}
);
function pick(predicate: async <T>(T) => Promise<boolean>, options?: PickOptions&DuplexOptions): ParallelStream
Creates a ParallelStream
that will filter the stream to some subset.
|field|type|description|
|-----|----|-----------|
|operation
|async <T, R>(T) => Promise<R>
|Mapping function to that returns some output for every item|
|options?
|PickOptions&DuplexOptions
|See DuplexOptions|
type PickOptions
|field|type|description|
|-----|----|-----------|
|pool?
|WorkerPool
|The worker pool for the parallel stream.|