real-value-stream-adapter
v0.9.0
A library which adapts a node stream into a real-value stream node
real-value-stream-adapter Library
Context
This library provides a wrapper around Node.js Readable, Writable and Duplex streams to support the real-value-lang interpreter.
This library was created to take advantage of the Node.js streams functionality, which provides the mechanism to orchestrate backpressure. Backpressure is needed when a pipeline contains nodes that can only handle events up to some maximum throughput or concurrency. In effect, Node.js streams provide a way to minimize resource usage in a pipeline.
Install
npm install real-value-stream-adapter
yarn add real-value-stream-adapter
How to use
See the unit tests.
About
DuplexAdapter
A DuplexAdapter is a closure around a stream.Duplex. The primary state of the closure is a buffer between the upstream node and all downstream nodes. Items in the buffer are delivered to the downstream nodes in FIFO order.
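A minimal sketch of such a closure, assuming object mode. The helper name `createFifoDuplex(highWaterMark)` is hypothetical and not the library's API. The write side appends to a FIFO buffer, the read side drains it downstream, and `null` is pushed once upstream has ended and the buffer is empty. As one possible design choice, the sketch also holds back the write callback while the buffer is full, so the buffer itself stays bounded.

```javascript
const { Duplex, Readable, Writable, pipeline } = require('stream')

// Hypothetical helper, not the library's API: a closure whose primary state is
// a FIFO buffer between the upstream (write side) and downstream (read side).
function createFifoDuplex(highWaterMark = 16) {
  const buffer = []              // FIFO: push() at the back, shift() from the front
  let downstreamWaiting = false  // read() was called while the buffer was empty
  let allRead = false            // upstream has signalled end of stream
  let heldCallback = null        // write callback held back while the buffer is full

  const depth = () => buffer.length

  // Let upstream continue once the buffer has room again.
  function releaseUpstream() {
    if (heldCallback && depth() < highWaterMark) {
      const callback = heldCallback
      heldCallback = null
      callback()
    }
  }

  // Deliver buffered items downstream in FIFO order until the buffer is empty
  // or push() reports that downstream is full.
  function drain() {
    while (depth() > 0) {
      downstreamWaiting = false
      if (!duplex.push(buffer.shift())) {
        releaseUpstream()
        return
      }
    }
    downstreamWaiting = true
    if (allRead) duplex.push(null) // nothing left: signal end of stream
    releaseUpstream()
  }

  const duplex = new Duplex({
    objectMode: true,
    highWaterMark,
    write(chunk, _encoding, callback) {
      buffer.push(chunk)
      if (downstreamWaiting) drain()
      // Acknowledge now, or hold the callback until drain() frees space.
      if (depth() >= highWaterMark) heldCallback = callback
      else callback()
    },
    final(callback) {
      allRead = true
      if (downstreamWaiting) drain()
      callback()
    },
    read() {
      drain()
    },
  })
  return duplex
}

// Usage: values flow source -> FIFO node -> sink, in order.
const out = []
const sink = new Writable({
  objectMode: true,
  write(chunk, _encoding, callback) {
    out.push(chunk)
    callback()
  },
})
pipeline(Readable.from([1, 2, 3, 4, 5]), createFifoDuplex(2), sink, (err) => {
  if (err) throw err
  console.log(out)
})
```

Holding the write callback is what lets the streams API propagate backpressure further upstream: while a callback is outstanding, the Duplex's writable side reports that it is busy.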
High Water Mark
The High Water Mark is a significant part of the backpressure concept.
For each DuplexAdapter, a highWaterMark is specified. It is passed to the underlying Node.js Duplex, which manages the upstream-to-downstream communication, so that the Duplex instance does not buffer more than highWaterMark entities at any one time. It acts as a throttle on the pipeline at that specific node.
return new DuplexAdapter({
  highWaterMark,
  // ...other options
})
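To see what the high water mark does on a plain Node.js Duplex, the sketch below pushes three objects into a Duplex created with `highWaterMark: 2` and nothing consuming it. It uses only the standard `stream` module; the values pushed are arbitrary.

```javascript
const { Duplex } = require('stream')

// With highWaterMark 2 in object mode, push() returns false ("stop pushing")
// once 2 objects are sitting unread in the readable buffer.
const node = new Duplex({
  objectMode: true,
  highWaterMark: 2,
  read() {},                                        // nothing is pulled in this demo
  write(chunk, _encoding, callback) { callback() }, // write side unused here
})

const results = ['a', 'b', 'c'].map((value) => node.push(value))
console.log(results) // [ true, false, false ]
```

Note that the third value is still accepted into the buffer: the high water mark is a signal to back off, and it is up to the pushing code to respect it.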
Signaled writing
The contract between the underlying Duplex nodes is that when an upstream Duplex node pushes to a downstream Duplex node, the streams API indicates, through the return value of push(), whether further values may be pushed. The code below is part of the read function: the return statement breaks out of the while loop so that the node backs off from pushing values downstream. The node won't resume pushing until the streams API makes a subsequent read call.
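if (buffer.length > 0) {
  // Push at most the number of items the streams API asked for.
  let count = numberAllowedToPush
  debug(`Count ${count}`)
  // Drain the buffer in FIFO order.
  while (buffer.length > 0 && count > 0) {
    const next = buffer.shift()
    debugIO(`${name}: Writing Value:${next} : RemainingDepth:${depth()}`)
    const p = duplex.push(next)
    count--
    if (!p) {
      // push() returned false: downstream is full, so back off until
      // the streams API calls read() again.
      return
    }
  }
}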
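The back-off behaviour can be observed with a plain Readable that uses the same loop shape. In this sketch (an illustration with a hypothetical in-memory `buffer`, not the adapter's code) `read(size)` plays the role of the read function and `size` stands in for `numberAllowedToPush`. Each call pushes at most `size` items and stops early when push() returns false; the streams API then calls `read()` again as downstream consumes, so all values still arrive in order.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for the adapter's buffer: 10 values waiting to go downstream.
const buffer = Array.from({ length: 10 }, (_, i) => i)
let readCalls = 0

const source = new Readable({
  objectMode: true,
  highWaterMark: 2,
  read(size) {
    readCalls++
    // Like numberAllowedToPush: never push more than the streams API asked for.
    let count = size
    while (buffer.length > 0 && count > 0) {
      count--
      // push() returning false means "back off"; read() will be called again
      // once downstream has consumed enough.
      if (!this.push(buffer.shift())) return
    }
    if (buffer.length === 0) this.push(null) // nothing left: end of stream
  },
})

const received = []
source.on('data', (value) => received.push(value))
source.on('end', () => console.log(received, `read() called ${readCalls} times`))
```

Because no single call pushes more than two items here, the ten values need several read calls. That repetition is the "resume on a subsequent read" behaviour described above.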
Slow handling
For each chunk written to a stream node, the node must acknowledge that the chunk has been handled by invoking a callback. The handling code should therefore invoke the callback only once the node has actually finished processing the chunk.
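A small illustration of slow handling, using a plain Writable rather than a DuplexAdapter for brevity. Each chunk takes about 10 ms of simulated work (the delay is an arbitrary assumption), and the callback is invoked only after that work is done. Until it is invoked, the stream does not hand over the next chunk, so at most one chunk is ever in flight and upstream is throttled to this node's pace.

```javascript
const { Readable, Writable, pipeline } = require('stream')

let inFlight = 0
let maxInFlight = 0
const processed = []

const slowNode = new Writable({
  objectMode: true,
  highWaterMark: 1,
  write(chunk, _encoding, callback) {
    inFlight++
    maxInFlight = Math.max(maxInFlight, inFlight)
    setTimeout(() => {   // simulated asynchronous work
      processed.push(chunk)
      inFlight--
      callback()         // acknowledge only once the chunk is really handled
    }, 10)
  },
})

pipeline(Readable.from(['a', 'b', 'c', 'd']), slowNode, (err) => {
  if (err) throw err
  console.log(processed, 'max in flight:', maxInFlight)
})
```

Calling the callback immediately and doing the work afterwards would defeat this: the stream would keep accepting chunks while the real work piled up elsewhere.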
Stream Splitting
Splitting a stream into multiple other streams.
For example, here values from a generator flow through node1 to both node2 and node3:
let inputStream = ReadableAdapter(generator).pipe(node1)
inputStream.pipe(node2)
inputStream.pipe(node3)
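A self-contained version of the splitting pattern, with Node's `PassThrough` standing in for node1 and a hypothetical `recorder` Writable standing in for node2 and node3 (neither is part of the library). Since `pipe()` returns its destination, `inputStream` is node1 itself, and every value it emits goes to both branches.

```javascript
const { Readable, PassThrough, Writable } = require('stream')

// Hypothetical downstream node that records what it receives.
function recorder(received) {
  return new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      received.push(chunk)
      callback()
    },
  })
}

const seenByNode2 = []
const seenByNode3 = []

// node1 is a PassThrough standing in for a DuplexAdapter.
const node1 = new PassThrough({ objectMode: true })
const inputStream = Readable.from([1, 2, 3]).pipe(node1)
inputStream.pipe(recorder(seenByNode2))
inputStream.pipe(recorder(seenByNode3))

inputStream.on('end', () => console.log(seenByNode2, seenByNode3))
```

With Node's pipe(), a source piped to several destinations pauses whenever any one of them signals backpressure, so the slowest branch sets the pace for all of them.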
Handling end of stream
When a pipeline is set up and runs, its asynchronous nature means it can be important to know when processing has ended.
A duplex node signals that it has no more data to write downstream by pushing null:
if (allRead && buffer.length === 0 && allProcessed()) {
debug(`${name} signaling stream end`)
duplex.push(null)
}
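The `allProcessed()` part of that condition matters when handling is asynchronous. Node.js reports an error if push() is called after push(null), so a node whose handlers are still running must wait for them before ending. The sketch below is a hypothetical `createAsyncNode(handle)`, not the library's code. It drops the FIFO buffer for brevity and tracks in-flight handlers with a counter.

```javascript
const { Duplex, Readable, Writable, pipeline } = require('stream')

// Hypothetical sketch: only push null once upstream is done AND every handler finished.
function createAsyncNode(handle) {
  let inFlight = 0      // handlers started but not yet finished
  let allRead = false   // upstream has signalled end of stream
  let ended = false     // null already pushed

  const allProcessed = () => inFlight === 0

  function maybeEnd() {
    if (!ended && allRead && allProcessed()) {
      ended = true
      duplex.push(null) // no more data for downstream
    }
  }

  const duplex = new Duplex({
    objectMode: true,
    write(event, _encoding, callback) {
      inFlight++
      handle(event, (result) => {
        inFlight--
        duplex.push(result) // for brevity, push()'s return value is ignored here
        maybeEnd()
      })
      callback() // accept the next event while this one is still in flight
    },
    final(callback) {
      allRead = true // upstream is done, but handlers may still be running
      maybeEnd()
      callback()
    },
    read() {}, // results are pushed as soon as each handler completes
  })
  return duplex
}

// Usage: multiply each value by 10 after a 20 ms delay.
const results = []
let pipelineDone = false
pipeline(
  Readable.from([1, 2, 3]),
  createAsyncNode((n, done) => setTimeout(() => done(n * 10), 20)),
  new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      results.push(chunk)
      callback()
    },
  }),
  (err) => {
    if (err) throw err
    pipelineDone = true
    console.log(results)
  },
)
```

Here final() runs while all three handlers are still pending, so ending is deferred to the last handler's completion.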
There are two significant callbacks for listening to duplex node events. Finish is called on a stream node when the upstream node signals it is done (which it does by pushing null). When this happens, the DuplexAdapter node may still have items in its buffer that it should write downstream, so if it has downstream nodes it shouldn't stop yet.
Destroy is called on the DuplexAdapter node when upstream has signalled there is nothing more and there is nothing left in its buffer, so it is safe to destroy.
Note that in a pipeline of DuplexAdapters, Destroy is not called on the nodes at the end of the pipeline, because nothing drains them.
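The same effect can be observed with Node's own stream events. This sketch assumes that the adapter's Destroy corresponds to Node's 'close' event under autoDestroy, which is the default since Node.js 14; it uses PassThrough streams as stand-ins for DuplexAdapter nodes. The first node sees 'finish' and 'end'. The last node, which nothing drains, sees 'finish' but never 'end' or 'close', because its items are still sitting in its readable buffer.

```javascript
const { Readable, PassThrough } = require('stream')

// Two PassThrough streams stand in for a pipeline of DuplexAdapters.
// Nothing is piped out of `last`, so nothing drains its readable side.
const first = new PassThrough({ objectMode: true })
const last = new PassThrough({ objectMode: true })

const events = []
for (const [label, node] of [['first', first], ['last', last]]) {
  node.on('finish', () => events.push(`${label}:finish`)) // upstream signalled it is done
  node.on('end', () => events.push(`${label}:end`))       // every buffered item was read out
  node.on('close', () => events.push(`${label}:close`))   // stream was destroyed
}

Readable.from([1, 2, 3]).pipe(first).pipe(last)

setTimeout(() => console.log(events), 100)
```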
Support for transformation
A significant functional part of the DuplexAdapter is the eventHandler function, which must be provided.
This function is called as each object is received at a node. It receives:
- event: the event content
- depth: a function that returns the current buffer depth
- bufferForDownstream: a function which can be invoked to buffer zero or more related events for downstream
- callback: a function which must be called when the event has been handled. It can be called asynchronously (after some delay)
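let eventHandler = (event, depth, bufferForDownstream, callback) => {
  // ...handle the event, passing any output to bufferForDownstream,
  // then call callback() once the event has been handled
}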
The eventHandler is how a DuplexAdapter can implement filtering, transformation, duplication/de-duplication, batching and similar functionality.
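A few example handlers, written against the parameter list described above. Two assumptions apply: `bufferForDownstream` is taken to accept one event per call, and `runHandler` is a hypothetical harness (not part of the library) that plays the adapter's role. It feeds events one at a time and waits for each callback before sending the next. Its `depth()` simply reports how many events have been buffered so far.

```javascript
// Hypothetical harness standing in for a DuplexAdapter.
function runHandler(eventHandler, events) {
  const downstream = []
  const depth = () => downstream.length
  const bufferForDownstream = (event) => { downstream.push(event) } // assumed: one event per call
  let chain = Promise.resolve()
  for (const event of events) {
    chain = chain.then(
      () => new Promise((resolve) => eventHandler(event, depth, bufferForDownstream, resolve)),
    )
  }
  return chain.then(() => downstream)
}

// Filtering: forward only even numbers.
const evensOnly = (event, depth, bufferForDownstream, callback) => {
  if (event % 2 === 0) bufferForDownstream(event)
  callback()
}

// Asynchronous transformation: double each value, and only report the event
// as handled once the (simulated) work has finished.
const doubleLater = (event, depth, bufferForDownstream, callback) => {
  setTimeout(() => {
    bufferForDownstream(event * 2)
    callback()
  }, 5)
}

// De-duplication: drop an event that repeats the one just before it.
function makeDedup() {
  let previous
  let first = true
  return (event, depth, bufferForDownstream, callback) => {
    if (first || event !== previous) bufferForDownstream(event)
    first = false
    previous = event
    callback()
  }
}

const results = Promise.all([
  runHandler(evensOnly, [1, 2, 3, 4]),
  runHandler(doubleLater, [1, 2, 3]),
  runHandler(makeDedup(), [1, 1, 2, 2, 2, 3, 1]),
])
results.then(([evens, doubled, deduped]) => console.log(evens, doubled, deduped))
```

Keeping state such as the previous event inside a closure (as `makeDedup` does) gives each node its own independent handler state.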