real-value-stream-adapter

v0.9.0

A library which adapts a node stream into a real-value stream node

real-value-stream-adapter Library

Context

This library provides a wrapper around Node.js Readable, Writable, and Duplex streams to support the real-value-lang interpreter.

This library was created to take advantage of Node.js streams, which provide the mechanism for orchestrating backpressure. Backpressure is needed when a pipeline contains nodes that can only handle events up to some maximum throughput or concurrency. In effect, Node.js streams provide a way to limit resource usage in a pipeline.

Install

npm install real-value-stream-adapter
yarn add real-value-stream-adapter

How to use

See the unit test case.

About

DuplexAdapter

A DuplexAdapter is a closure around a stream.Duplex. The primary state of the closure is a buffer that sits between the upstream node and all downstream nodes. Buffered values are passed to downstream nodes in FIFO order.

High Water Mark

The High Water Mark is a significant part of the backpressure concept.

For each DuplexAdapter, a highWaterMark is specified and passed to the underlying Node.js Duplex that manages upstream-to-downstream communication, so that the Duplex instance avoids buffering more than highWaterMark entities at any one time. It acts as a throttle on the pipeline at that specific node.

return new DuplexAdapter({
        highWaterMark,
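
To make the throttle concrete, here is a minimal sketch that uses plain Node.js streams rather than this library's DuplexAdapter. The helper name probeHighWaterMark and the writer that never acknowledges its chunks are assumptions made purely for illustration. The sketch shows write() starting to return false once the number of unacknowledged objects reaches highWaterMark.

```javascript
const { Writable } = require('node:stream');

// Hypothetical helper (not part of real-value-stream-adapter): writes
// `writes` objects into an object-mode Writable that never acknowledges
// them, and records what each write() call returned.
function probeHighWaterMark(highWaterMark, writes) {
  const pendingCallbacks = [];
  const slow = new Writable({
    objectMode: true,
    highWaterMark,
    write(chunk, _encoding, callback) {
      // Hold the callback instead of calling it, simulating a stalled node.
      pendingCallbacks.push(callback);
    },
  });

  const results = [];
  for (let i = 0; i < writes; i++) {
    results.push(slow.write(i));
  }
  return { results, buffered: slow.writableLength };
}

console.log(probeHighWaterMark(2, 3));
```

With highWaterMark set to 2, the second write() already reports false, which is the signal an upstream node uses to stop sending.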

Signaled writing

The contract between the underlying Duplex nodes is that, when an upstream Duplex node pushes to a downstream Duplex node, the stream API indicates whether further values may be pushed. The code below is part of the read function; its return statement breaks out of the while loop to back off from pushing values downstream. The node won't resume pushing until the streams API makes a subsequent read call.

if (buffer.length > 0) {
    let count = numberAllowedToPush
    debug(`Count ${count}`)
    while (buffer.length > 0 && count > 0) {
        // take the oldest buffered value (FIFO) and push it downstream
        let next = buffer.shift()
        debugIO(`${name}: Writing  Value:${next} : RemainingDepth:${depth()}`)
        let p = duplex.push(next)
        count--
        if (!p) {
            // push() returned false: back off until the next read call
            return
        }
    }
}
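
The same back-off pattern can be tried in isolation. The following is a minimal sketch on a plain object-mode Readable, not the library's own read function; createBufferedSource and pushesPerRead are hypothetical names introduced for illustration, and the numberAllowedToPush counter from the excerpt is left out.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: wraps an array in an object-mode Readable whose
// read() pushes buffered values until push() reports the stream is full.
function createBufferedSource(values, highWaterMark) {
  const buffer = [...values];
  const pushesPerRead = []; // how many values each read() call pushed

  const source = new Readable({
    objectMode: true,
    highWaterMark,
    read() {
      let pushed = 0;
      while (buffer.length > 0) {
        const accepted = this.push(buffer.shift());
        pushed++;
        if (!accepted) break; // back off until the next read() call
      }
      pushesPerRead.push(pushed);
      if (buffer.length === 0) this.push(null); // nothing left to send
    },
  });

  return { source, pushesPerRead };
}

const { source, pushesPerRead } = createBufferedSource([1, 2, 3, 4, 5], 2);

// Pull values one at a time; each read() may trigger another refill.
const received = [];
let value;
while ((value = source.read()) !== null) {
  received.push(value);
}

console.log(received, pushesPerRead);
```

All five values arrive in order, and no single read() call pushes more than highWaterMark values before backing off.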

Slow handling

Each chunk written to a stream node must be acknowledged by invoking a callback, so the handler should invoke that callback only once the node has finished processing the chunk.
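
A minimal sketch of that contract on a plain Node.js Writable (not the library's DuplexAdapter) follows. The names slowNode, delivered and finishCurrent are hypothetical; the handler deliberately holds the callback to stand in for slow processing.

```javascript
const { Writable } = require('node:stream');

const delivered = [];     // chunks the stream has handed to the handler
let finishCurrent = null; // callback of the chunk currently being handled

const slowNode = new Writable({
  objectMode: true,
  write(chunk, _encoding, callback) {
    delivered.push(chunk);
    finishCurrent = callback; // do not acknowledge yet
  },
});

slowNode.write('a');
slowNode.write('b');
slowNode.write('c');

// 'a' has not been acknowledged, so 'b' and 'c' wait in the stream's buffer.
const beforeAck = [...delivered];

// Acknowledge 'a'; the stream then hands over the next buffered chunk.
finishCurrent();
const afterAck = [...delivered];

console.log(beforeAck, afterAck);
```

Until the callback runs, later chunks stay queued inside the stream, which is how a slow node holds back the nodes upstream of it.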

Stream Splitting

A stream can be split into multiple other streams.

For example, here values from a generator flow through node1 to both node2 and node3:

let inputStream = ReadableAdapter(generator).pipe(node1)
inputStream.pipe(node2)
inputStream.pipe(node3)
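
For comparison, the same topology can be sketched with plain Node.js streams. This is an illustration under assumptions, not the library's API: Readable.from and PassThrough stand in for ReadableAdapter and node1, and collector is a hypothetical sink that records what it receives.

```javascript
const { Readable, PassThrough, Writable } = require('node:stream');
const { finished } = require('node:stream/promises');

// Hypothetical sink that records every object it receives.
function collector(received) {
  return new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      received.push(chunk);
      callback();
    },
  });
}

const seenByNode2 = [];
const seenByNode3 = [];

const node1 = new PassThrough({ objectMode: true });
const inputStream = Readable.from([1, 2, 3]).pipe(node1);

// Piping the same stream twice delivers every value to both destinations.
const node2 = inputStream.pipe(collector(seenByNode2));
const node3 = inputStream.pipe(collector(seenByNode3));

const done = Promise.all([finished(node2), finished(node3)]).then(() => {
  console.log(seenByNode2, seenByNode3);
  return { seenByNode2, seenByNode3 };
});
```

Both sinks receive 1, 2 and 3. With plain pipe(), the source pauses whenever either destination is full, so the slower branch sets the pace for both.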

Handling end of stream

Because a pipeline runs asynchronously once it is set up, it can be important to know when processing has ended.

A duplex node signals that it has no more data to write downstream by pushing null.

if (allRead && buffer.length === 0 && allProcessed()) {
    debug(`${name} signaling stream end`)
    duplex.push(null)
}

There are two significant callbacks related to duplex node events. Finish is called on a stream node when the upstream node signals that it is done (which it does by pushing null). When this happens, the DuplexAdapter node may still have items in its buffer that it should write downstream, so if it has downstream nodes it shouldn't stop yet.

Destroy is called on the DuplexAdapter node when upstream has signalled that there is nothing more and nothing is left in its buffer, so it is safe to destroy.

Note that in a pipeline of DuplexAdapters, Destroy is not called on the nodes at the end of the pipeline, as nothing is draining them.
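
The sequence can be observed on a plain Node.js Duplex. The sketch below is not the library's implementation: it assumes that Finish above corresponds roughly to the standard final hook and 'finish' event, which the source does not state, and drain, endPushed and sink are hypothetical names. For brevity it acknowledges every write immediately, so it does not throttle the upstream node.

```javascript
const { Duplex, Readable, Writable } = require('node:stream');
const { finished } = require('node:stream/promises');

const events = [];   // lifecycle events observed on the middle node
const received = []; // values that reached the end of the pipeline

const buffer = [];
let allRead = false;   // upstream has signalled that it is done
let endPushed = false; // guard so that null is pushed only once

function drain() {
  while (buffer.length > 0) {
    if (!node.push(buffer.shift())) return; // downstream is full, back off
  }
  if (allRead && !endPushed) {
    endPushed = true;
    node.push(null); // nothing buffered and nothing more to come
  }
}

const node = new Duplex({
  objectMode: true,
  write(chunk, _encoding, callback) {
    buffer.push(chunk);
    drain();
    callback();
  },
  final(callback) {
    // Upstream is done, but the buffer may still hold values to deliver.
    allRead = true;
    drain();
    callback();
  },
  read() {
    drain();
  },
});
node.on('finish', () => events.push('finish')); // writable side is done
node.on('end', () => events.push('end'));       // readable side is done

const sink = new Writable({
  objectMode: true,
  write(chunk, _encoding, callback) {
    received.push(chunk);
    callback();
  },
});

Readable.from([1, 2, 3]).pipe(node).pipe(sink);

const done = Promise.all([finished(node), finished(sink)]).then(() => {
  console.log(received, events);
  return { received, events };
});
```

The node only pushes null once upstream has finished and its own buffer is empty, so every value reaches the sink before the end of the stream is signalled.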

Support for transformation

A significant functional part of the DuplexAdapter is the eventHandler function, which needs to be provided.

This function is called as each object is received at a node. It is provided with:

  • event: the event content
  • depth: a function to check the current buffer depth
  • bufferForDownstream: a function which can be invoked to buffer 0 or more related events for downstream
  • callback: a function which must be called when the event is handled. It can be called asynchronously (after some delay)

let eventHandler = (event, depth, bufferForDownstream, callback) => {
    ....
}

The eventHandler is how a DuplexAdapter can implement filtering, transformation, de/duplication, or batching functionality.
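
As a rough sketch of what such handlers might look like, the example below drives two handlers through a small stand-in harness. The harness runThroughHandler is hypothetical and is not the library's DuplexAdapter; it only mimics the four arguments described above, and it assumes bufferForDownstream takes one event per call.

```javascript
// Hypothetical harness: feeds each event to an eventHandler with the four
// arguments described above and collects whatever the handler buffers.
function runThroughHandler(events, eventHandler) {
  const buffer = [];
  const depth = () => buffer.length;
  const bufferForDownstream = (value) => {
    buffer.push(value);
  };
  let handled = 0;
  for (const event of events) {
    eventHandler(event, depth, bufferForDownstream, () => {
      handled++;
    });
  }
  return { buffer, handled };
}

// Filtering: forward even numbers only, buffering nothing for the rest.
const keepEven = (event, depth, bufferForDownstream, callback) => {
  if (event % 2 === 0) bufferForDownstream(event);
  callback();
};

// Duplication: buffer each event twice for downstream.
const duplicate = (event, depth, bufferForDownstream, callback) => {
  bufferForDownstream(event);
  bufferForDownstream(event);
  callback();
};

const filtered = runThroughHandler([1, 2, 3, 4], keepEven);
const doubled = runThroughHandler(['a', 'b'], duplicate);

console.log(filtered.buffer, doubled.buffer);
```

In both handlers the callback is invoked exactly once per event, regardless of how many values were buffered for downstream.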