
exframe-stream

v1.0.6


exframe-stream

A library for common stream patterns in the harmony / exframe environments

installation

npm install exframe-stream

usage

import { pipeline } from 'stream/promises';

import { parallel } from 'exframe-stream';
import { WorkerPool } from 'exframe-worker-pool';

const pool = new WorkerPool();

await pipeline(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  parallel(async (i) => {
    return i + 1;
  }, pool)
);

terms

|term|description|
|----|-----------|
|streamable|A stream, iterator, async iterator, generator function, or async generator function|
|duplexable|A duplex stream or async generator function|
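To make the "streamable" term concrete, here is a minimal sketch of how such a value could be normalized into a Node.js Readable. The helper names `toReadable` and `collect` are hypothetical and are not part of exframe-stream; the sketch uses only Node.js built-ins and may differ from what the library does internally.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper (not part of exframe-stream): turn a "streamable"
// into a Node.js Readable.
function toReadable(streamable) {
  // Already a readable stream: use it as-is.
  if (streamable instanceof Readable) return streamable;
  // Generator functions and async generator functions must be called first
  // to obtain an iterator.
  if (typeof streamable === 'function') return Readable.from(streamable());
  // Arrays, iterators, and async iterators are accepted by Readable.from.
  return Readable.from(streamable);
}

// Hypothetical helper: drain a streamable into an array.
async function collect(streamable) {
  const items = [];
  for await (const item of toReadable(streamable)) items.push(item);
  return items;
}
```

For example, `collect([1, 2, 3])` and `collect(function* () { yield 1; yield 2; })` both resolve to arrays of the yielded items.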

streams

Chain

Chain is an alternative to the experimental compose function that also incorporates some of the functionality of pipeline. It "chains" streams together into a Duplex while still maintaining references to the head and tail streams, which is useful when the interfaces of the head and the tail are important. Chain is also less aggressive than compose about turning streams into Duplexes when that is not necessary.

examples

import { chain } from 'exframe-stream';

const items = await chain(
  [1, 2, 3, 4, 5],
  async function* (source) {
    yield* source;
  }
).toArray();

Constructor

function chain(stream: Streamable, ...streams?: Streamable[], options?: PipelineOptions & FinishedOptions) => Stream

The head of the stream may be readable or writable. All streams that follow another must be writable. The final stream may optionally be writable. If either the head or the tail end up getting returned, they will have the fields listed below added. Streamables that are not already streams will be duplexified.

Rules for What Stream is Returned

  • If there is just the one stream, return the one stream.
  • If the head is writable and the tail is readable, then return a duplex that exposes the head as the writable and the tail as the readable.
  • If the head is writable and the tail is not readable, then return the head.
  • Otherwise, return the tail.
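The return rules above can be sketched as a small decision function. This is only an illustration under assumptions: `selectReturned` is a hypothetical name, it only chooses which stream to expose (it does not connect the streams to each other), and it uses Node's `Duplex.from({ writable, readable })` to build the combined duplex, which may not be how chain does it.

```javascript
const { Duplex, PassThrough, Readable, Writable } = require('node:stream');

// Hypothetical helper: pick the stream a chain would expose, per the rules above.
function selectReturned(streams) {
  const head = streams[0];
  const tail = streams[streams.length - 1];
  // Rule 1: a single stream is returned unchanged.
  if (streams.length === 1) return head;
  const headWritable = typeof head.write === 'function';
  const tailReadable = typeof tail.read === 'function';
  // Rule 2: writable head + readable tail -> duplex exposing both ends.
  if (headWritable && tailReadable) {
    return Duplex.from({ writable: head, readable: tail });
  }
  // Rule 3: writable head, non-readable tail -> the head.
  if (headWritable) return head;
  // Rule 4: otherwise the tail.
  return tail;
}

function demo() {
  const source = Readable.from([1, 2, 3]);
  const sink = new Writable({
    objectMode: true,
    write(_item, _encoding, callback) { callback(); }
  });
  const first = new PassThrough({ objectMode: true });
  const last = new PassThrough({ objectMode: true });
  const combined = selectReturned([first, last]);
  return {
    single: selectReturned([source]) === source,
    readableHead: selectReturned([source, sink]) === sink,
    writableHeadOnly: selectReturned([first, sink]) === first,
    bothEnds: combined instanceof Duplex && combined !== first && combined !== last
  };
}
```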

|field|type|description|
|-----|----|-----------|
|stream|Streamable|See streamable|
|streams|Streamable[]|Set of streamables that will be chained together|
|options|PipelineOptions & FinishedOptions|See PipelineOptions, See FinishedOptions|

Fields

head: Stream

The first stream of the chain.

tail: Stream

The last stream of the chain.

finished: Promise<void>

A promise from an already wired-up finished call for the chain. See finished.
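As an illustration of what an "already wired-up" finished promise could look like, the sketch below attaches the promise returned by Node's `finished` (from `stream/promises`) to a stream as a `finished` field. The helper name `attachFinished` is hypothetical, and this is an assumption about the idea rather than the library's actual code.

```javascript
const { PassThrough } = require('node:stream');
const { finished } = require('node:stream/promises');

// Hypothetical helper: start watching the stream right away and expose the
// resulting promise as a field, so callers can simply await it later.
function attachFinished(stream) {
  stream.finished = finished(stream);
  return stream;
}

async function demo() {
  const stream = attachFinished(new PassThrough({ objectMode: true }));
  stream.write(1);
  stream.end(2);
  const seen = [];
  for await (const item of stream) seen.push(item);
  // Resolves once the stream has both finished writing and ended reading.
  await stream.finished;
  return seen;
}
```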

Channel

The channel is a fixed length buffer that provides ordered input and output as well as strictly blocked reads and writes when the buffer is empty or full respectively. The Channel is a full Duplex stream.

examples

import { channel } from 'exframe-stream';

const chan = channel({ max: 10 });

(async () => {
  for await (const item of chan) { // will block while there are no items
    console.log(item);
  }
})();

for (let i = 0; i < 20; ++i) {
  await chan.send(i); // will block if the buffer is full
}

import { pipeline } from 'stream/promises';

import { channel } from 'exframe-stream';

await pipeline(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  channel({ max: 10 }),
  async function (source) {
    for await (const item of source) {
      console.log(item);
    }
  }
);

Constructor

function channel(options?: ChannelOptions & DuplexOptions) => ChannelStream & Duplex

|field|type|description|
|-----|----|-----------|
|options|ChannelOptions & DuplexOptions|See ChannelOptions, See DuplexOptions|

type ChannelOptions

|field|type|description|
|-----|----|-----------|
|max|integer (default = 10)|The maximum number of items in the channel's buffer|
|preRead|<T, R>(item: T) => Promise<R>|Called before pushing to the read buffer|
|preWrite|<T, R>(item: T, encoding) => Promise<R>|Called before storing in the item queue|

Fields

items: T[]

The channel's queue. Not recommended to interact with this array.

async send(item: T) => Promise<void>

Enqueues the given item, will block if the channel is full.
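The blocking behaviour described for the channel (sends wait while the buffer is full, reads wait while it is empty, order is preserved) can be sketched with plain promises. `BoundedChannel` and its `receive` method are hypothetical stand-ins written for this illustration; they are not the library's ChannelStream and do not implement the Duplex interface.

```javascript
// Hypothetical stand-in for the channel's blocking semantics.
class BoundedChannel {
  constructor(max = 10) {
    this.max = max;
    this.items = [];
    this.waitingSenders = [];   // resolvers for sends blocked on a full buffer
    this.waitingReceivers = []; // resolvers for receives blocked on an empty buffer
  }

  async send(item) {
    // Block while the buffer is full.
    while (this.items.length >= this.max) {
      await new Promise((resolve) => this.waitingSenders.push(resolve));
    }
    this.items.push(item);
    // Wake one blocked receiver, if any.
    const wake = this.waitingReceivers.shift();
    if (wake) wake();
  }

  async receive() {
    // Block while the buffer is empty.
    while (this.items.length === 0) {
      await new Promise((resolve) => this.waitingReceivers.push(resolve));
    }
    const item = this.items.shift();
    // Wake one blocked sender, if any.
    const wake = this.waitingSenders.shift();
    if (wake) wake();
    return item;
  }
}

async function demo() {
  const chan = new BoundedChannel(2);
  const received = [];
  const consumer = (async () => {
    for (let i = 0; i < 5; ++i) received.push(await chan.receive());
  })();
  for (let i = 0; i < 5; ++i) await chan.send(i);
  await consumer;
  return received;
}
```

Both loops re-check their condition after waking, so a spurious wake-up only causes another wait rather than an overfull or empty read.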

Demultiplex

Demultiplexes some source iterable or stream to one or more writables. This stream currently terminates the stream at its level. However, using compose or some other technique, each writable could chain to a number of other streams or iterators. The demultiplex stream can handle both binary and object mode. Any writable that is also a readable will be exposed in readableStreams on the DemultiplexStream. The MultiplexStream can recombine them into a single stream if necessary. If any target becomes unable to take any more data, then all targets will be blocked from receiving additional data until the blocking target can resume.

examples

import { compose } from 'stream';
import { pipeline } from 'stream/promises';

import { demultiplex, pick } from 'exframe-stream';

await pipeline(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  demultiplex(
    async function* (source) {
      for await (const item of source) {
        console.log(`log: ${item}`);
      }
    },
    compose(
      pick(item => item % 2 === 0),
      async function* (source) {
        for await (const item of source) {
          console.log(`even: ${item}`);
        }
      }
    ),
    { objectMode: true }
  )
);

Constructor

function demultiplex(...writables: (Writable|Function)[], options?: WritableOptions) => DemultiplexStream

Creates a DemultiplexStream, which is a Writable that takes any number of target Writables or async generator functions to process items.

|field|type|description|
|-----|----|-----------|
|writables|(Writable\|Function)[]|Set of Writable streams or async generator functions|
|options|WritableOptions|See WritableOptions|

Fields

readableStreams: Readable[]

The set of target writables that are also readable.
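The shared backpressure rule described for demultiplex (one full target holds back every target) can be sketched with plain Node.js Writables. `fanOut` and `collector` are hypothetical helpers for this illustration only; the real DemultiplexStream is a Writable itself and likely handles errors and binary mode, which this sketch does not.

```javascript
const { Writable } = require('node:stream');
const { once } = require('node:events');

// Hypothetical sketch: send every item to every target, and do not send the
// next item to anyone until all targets that reported a full buffer drain.
async function fanOut(source, targets) {
  for await (const item of source) {
    const blocked = [];
    for (const target of targets) {
      // write() returns false when the target's buffer is full.
      if (!target.write(item)) blocked.push(once(target, 'drain'));
    }
    await Promise.all(blocked);
  }
  // End every target and wait for each to flush.
  await Promise.all(targets.map((target) => {
    const done = once(target, 'finish');
    target.end();
    return done;
  }));
}

// Hypothetical slow target that records what it receives.
function collector(sink) {
  return new Writable({
    objectMode: true,
    highWaterMark: 1,
    write(item, _encoding, callback) {
      sink.push(item);
      setImmediate(callback); // simulate a slow consumer
    }
  });
}

async function demo() {
  const a = [];
  const b = [];
  await fanOut([0, 1, 2, 3], [collector(a), collector(b)]);
  return { a, b };
}
```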

FeedbackLoop

A mechanism that allows items to be pushed back to the start. Will continue until both the source and the repeated work have ended.

examples

import { pipeline } from 'stream/promises';

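import { chain, feedbackLoop, FeedbackState } from 'exframe-stream';

// add, multiply, subtract, and divide are assumed to be helpers that each
// return a function of the current value, e.g. const add = (n) => (value) => value + n;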

await pipeline(
  [
    { tasks: [add(1), multiply(4), subtract(3), divide(2)], i: 0, value: 0 },
    { tasks: [add(2), multiply(4), divide(2)], i: 0, value: 0 },
    { tasks: [add(3), multiply(4)], i: 0, value: 0 }
  ],
  feedbackLoop(
    async function* (source) {
      for await (const context of source) {
        context.value = context.tasks[context.i++](context.value);
        yield context;
      }
    },
    async function (context) {
      return context.i < context.tasks.length
        ? FeedbackState.Repeat
        : FeedbackState.Complete;
    },
    chain(async function* (source) {
      for await (const context of source) {
        context.repeated = true;
        yield context;
      }
    })
  ),
  async function (source) {
    for await (const context of source) {
      console.log(context);
    }
  }
);

Constructor

function feedbackLoop(operationStream: Duplexable, feedback: async <T>(item: T) => Promise<FeedbackState>, feedbackStream?: Duplexable, options?: FeedbackLoopOptions) => FeedbackLoopStream

Creates a FeedbackLoopStream, which is a Duplex that takes an operation stream to process the items, a feedback filter to determine whether items should repeat, and an optional feedbackStream that can process items that are starting again.

|field|type|description|
|-----|----|-----------|
|operationStream|Duplexable|The stream that processes all items. See duplexable|
|feedback|async <T>(item: T) => Promise<FeedbackState>|The filter that determines whether an item should repeat, complete, or be discarded|
|feedbackStream|Duplexable|Stream that processes items that are being repeated|
|options|ReadableOptions|See ReadableOptions|
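The feedback idea (process an item, ask a filter whether it should go around again, and finish only when both the source and the repeated work are exhausted) can be sketched as an async generator. Everything here is an assumption made for illustration: `feedbackLoopSketch` and the `State` object are hypothetical, the `Discard` name is a guess at the third outcome mentioned in the table, and this sketch gives repeated items priority over new source items, which the library may not do.

```javascript
// Hypothetical states mirroring the repeat / complete / discard outcomes.
const State = { Repeat: 'repeat', Complete: 'complete', Discard: 'discard' };

async function* feedbackLoopSketch(source, operation, feedback) {
  const pending = []; // items waiting to go around again
  const iterator = source[Symbol.asyncIterator]
    ? source[Symbol.asyncIterator]()
    : source[Symbol.iterator]();
  let sourceDone = false;
  // Continue until the source has ended AND no repeated work remains.
  while (!sourceDone || pending.length > 0) {
    let item;
    if (pending.length > 0) {
      item = pending.shift();
    } else {
      const next = await iterator.next();
      if (next.done) {
        sourceDone = true;
        continue;
      }
      item = next.value;
    }
    const result = await operation(item);
    const state = await feedback(result);
    if (state === State.Repeat) pending.push(result);
    else if (state === State.Complete) yield result;
    // Any other state: the item is dropped.
  }
}

async function demo() {
  const out = [];
  const doubled = feedbackLoopSketch(
    [1, 2, 3],
    async (n) => n * 2,                                   // operation
    async (n) => (n < 10 ? State.Repeat : State.Complete) // feedback filter
  );
  for await (const value of doubled) out.push(value);
  return out;
}
```

In the demo each number is doubled until it reaches at least 10, so the inputs 1, 2, and 3 complete as 16, 16, and 12.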

Multiplex

Multiplexes a set of source iterables or streams into a single readable. The stream will end when all sources are ended.

examples

import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

import { multiplex } from 'exframe-stream';

await pipeline(
  multiplex(
    async function* () {
      for (let i = 0; i < 10; ++i) {
        yield i
      }
    },
    [2, 4, 6, 8, 10],
    Readable.from([2]),
    { objectMode: true }
  ),
  async function (source) {
    for await (const item of source) {
      console.log(item);
    }
  }
);

function multiplex(...readables: (Readable|Function)[], options?: ReadableOptions) => MultiplexStream

Creates a MultiplexStream, which is a Readable that takes any number of source Readables or async generator functions and combines their items.

|field|type|description|
|-----|----|-----------|
|readables|(Readable\|Function)[]|Set of Readable streams or async generator functions|
|options|ReadableOptions|See ReadableOptions|
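The merging behaviour (many sources in, one sequence out, ending only when every source has ended) can be sketched as an async generator that races one pending read per source. `merge` is a hypothetical name used for this illustration; it is not the library's MultiplexStream, it does not handle source errors or early cancellation, and the interleaving order depends on timing.

```javascript
// Hypothetical sketch: merge several iterables or async iterables.
async function* merge(...sources) {
  const iterators = sources.map((source) =>
    source[Symbol.asyncIterator]
      ? source[Symbol.asyncIterator]()
      : source[Symbol.iterator]());
  // One pending next() per live source, tagged with the source's index.
  const pending = new Map();
  const advance = (index) =>
    pending.set(
      index,
      Promise.resolve(iterators[index].next()).then((result) => ({ index, result }))
    );
  iterators.forEach((_, index) => advance(index));
  // Ends only when every source has reported done.
  while (pending.size > 0) {
    const { index, result } = await Promise.race(pending.values());
    if (result.done) {
      pending.delete(index);
    } else {
      advance(index);
      yield result.value;
    }
  }
}

async function demo() {
  async function* odds() {
    yield 1;
    yield 3;
  }
  const merged = [];
  for await (const item of merge([2, 4], odds())) merged.push(item);
  // Arrival order depends on timing, so sort before comparing.
  return merged.sort((a, b) => a - b);
}
```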

Parallel

Executes some operation over the incoming items and outputs the results in the correct order. Uses exframe-worker-pool to govern the concurrent execution. The parallel stream can only operate in object mode.

function parallel(operation: async <T, R>(T) => Promise<R>, pool: WorkerPool, options?: DuplexOptions): ParallelStream

Creates a ParallelStream, which is a ChannelStream or a Duplex. Each incoming item is passed to the given operation, and the results are output in the order the items arrived. If operation returns undefined, the stream behaves essentially like a Writable rather than a Duplex.

examples

import { pipeline } from 'stream/promises';

import { parallel } from 'exframe-stream';
import { WorkerPool } from 'exframe-worker-pool';

const pool = new WorkerPool();

await pipeline(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  parallel(async (item) => {
    const result = await someRequest(item);

    return result;
  }, pool)
);

|field|type|description|
|-----|----|-----------|
|operation|async <T, R>(T) => Promise<R>|Mapping function that returns some output for every item|
|pool?|WorkerPool|The worker pool that governs how much concurrency the parallel stream can use. If not set, a pool with a max of 1 and an overflow of 0 will be created.|
|options?|DuplexOptions|See DuplexOptions|
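To illustrate how results can stay in input order while operations run concurrently, here is a minimal sketch that uses a simple numeric limit in place of exframe-worker-pool. `orderedParallel`, `sleep`, and the `limit` parameter are hypothetical names for this illustration, and the windowing strategy is an assumption rather than the library's algorithm.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical sketch: run up to `limit` operations at once and yield the
// results in the same order the items arrived.
async function* orderedParallel(source, operation, limit = 2) {
  const inFlight = []; // result promises, kept in input order
  for await (const item of source) {
    inFlight.push(operation(item));
    // When the window is full, wait for the oldest result before admitting more.
    if (inFlight.length >= limit) yield await inFlight.shift();
  }
  // Drain whatever is still running once the source has ended.
  while (inFlight.length > 0) yield await inFlight.shift();
}

async function demo() {
  let active = 0;
  let maxActive = 0;
  const operation = async (item) => {
    maxActive = Math.max(maxActive, ++active);
    await sleep((5 - item) * 5); // earlier items take longer
    --active;
    return item * 10;
  };
  const results = [];
  for await (const result of orderedParallel([0, 1, 2, 3, 4], operation, 3)) {
    results.push(result);
  }
  return { results, maxActive };
}
```

One consequence of this design is that a slow item at the front of the window delays admission of new items even if later items have already completed.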

Pick

A special case of parallel that filters the stream for items matching the given predicate. Like parallel, it is only available in object mode.

examples

import { pipeline } from 'stream/promises';

import { pick } from 'exframe-stream';

await pipeline(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  pick(x => x % 2 === 0),
  async function* (source) {
    for await (const item of source) {
      console.log(item);
    }
  }
);

function pick(predicate: async <T>(T) => Promise<boolean>, options?: PickOptions&DuplexOptions): ParallelStream

Creates a ParallelStream that will filter the stream to some subset.

|field|type|description|
|-----|----|-----------|
|predicate|async <T>(T) => Promise<boolean>|Predicate that decides whether each item is kept|
|options?|PickOptions & DuplexOptions|See PickOptions, See DuplexOptions|

type PickOptions

|field|type|description|
|-----|----|-----------|
|pool?|WorkerPool|The worker pool for the parallel stream.|
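The filtering behaviour of pick can be approximated with a plain async generator inside Node's `pipeline`. `pickSketch` is a hypothetical name; it runs the predicate one item at a time and does not use a worker pool, so it only illustrates the filtering semantics, not the concurrency.

```javascript
const { pipeline } = require('node:stream/promises');

// Hypothetical sketch: keep only the items for which the (possibly async)
// predicate resolves to a truthy value.
async function* pickSketch(source, predicate) {
  for await (const item of source) {
    if (await predicate(item)) yield item;
  }
}

async function demo() {
  const evens = [];
  await pipeline(
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    (source) => pickSketch(source, async (x) => x % 2 === 0),
    async function (source) {
      for await (const item of source) evens.push(item);
    }
  );
  return evens;
}
```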