@farar/nodes
v1.0.1
Published
Nodes provides a framework for building type-safe data transformation graphs using Node.js streams.
Downloads
65
Readme
Nodes
Nodes provides a framework for building type-safe data transformation graphs using Node.js streams.
Introduction
Nodes provides an intuitive framework for constructing data transformation graphs using native Node.js streams. You can use the built-in library of commonly used data transformation Node
classes or implement your own.
Features
- A type-safe graph-like API pattern for building data transformation graphs based on Node.js streams.
- Consume any native Node.js Readable, Writable, Duplex, or Transform stream and add it to your graph.
- Error handling and selective termination of inoperable graph components.
- Automatic message queueing in order to assist with handling of backpressure.
Table of Contents
Installation
npm install @farar/nodes
Concepts
Node
A Node
is a component of a graph-like data transformation pipeline. Each Node
is responsible for transforming its input into an output that can be consumed by its connected Node
instances. By connecting Node
instances into a network, sophisticated graph-like data transformation pipelines can be constructed.
Examples
A Graph API Pattern Logger Implementation </example>
Please see the Streams Logger implementation.
API
The Node class.
new nodes.Node<InT, OutT>(stream, options)
<IntT>
The input into the stream.<OutT>
The output from the stream.stream
<stream.Writable | stream.Readable>
An instance of aWritable
,Readable
,Duplex
, orTransform
Node.js stream.options
<NodeOptions>
errorHandler
<(err: Error, ...params: Array<unknown>) => void>
An optional error handler that will be used in the event of an internal Error. Default:console.error
public node.connect(...nodes)
- nodes
<Array<T>>
An array ofNode<OutT, unknown>
to be connected to thisNode
.
Returns: <Node<InT, OutT>>
public node.disconnect(...nodes)
- nodes
<Array<T>>
An array ofNode<OutT, unknown>
to be disconnected from thisNode
.
Returns: <Node<InT, OutT>>
protected node._write(data, encoding)
- data
<InT>
Data to write to the writable side of the stream. - encoding
<NodeJS.BufferEncoding>
An optional Node.js encoding Default:utf-8
. Returns:<Promise<void>>
The Nodes Config Settings Object
Config.errorHandler <ErrorHandler>
An optional error handler. Default: console.error
Config.debug <boolean>
Announce internal activities e.g., connectiing and disconnecting from Node
instances. Default: false
How-Tos
How to Implement a Data Transformation Node
In order to implement a data transformation Node
, extend the Node
class and pass a Node.js stream.Writable
implementation to the super's constructor.
For example, the following StringToNumber
implementation will convert a numeric string to a number.
NB In this example,
writableObjectMode
andreadableObjectMode
are both set totrue
; hence, the Node.js stream implementation will handle the input and output as objects. It's important thatwritableObjectMode
andreadableObjectMode
accurately reflect the input and output types of yourNode
.
import * as stream from 'node:stream';
import { Config, Node } from '@farar/nodes';
export class StringToNumber extends Node<string, number> {
constructor(options: stream.TransformOptions) {
super(new stream.Transform({
...options, ...{
writableObjectMode: true,
readableObjectMode: true,
transform: (chunk: string, encoding: BufferEncoding, callback: stream.TransformCallback) => {
try {
const result = parseFloat(chunk.toString());
callback(null, result);
}
catch (err) {
if (err instanceof Error) {
callback(err);
Config.errorHandler(err);
}
}
}
}
}));
}
}
How to Consume a Readable, Writable, Duplex, or Transform Node.js Stream
In this hypothetical example a type-safe Node
is constructed from a net.Socket
. The resulting Node
instance can be used in a data transformation graph.
import * as net from "node:net";
import { once } from "node:events";
net.createServer((socket: net.Socket) => socket.pipe(socket)).listen(3000);
const socket = net.createConnection({ port: 3000 });
await once(socket, "connect");
const socketHandler = new Node<Buffer, Buffer>(socket);
Backpressure
The Node
class has a _write
method that respects backpressue; when a stream is draining it will queue messages until a drain
event is emitted by the Node's stream. Your application can optionally monitor the size of the queue and respond appropriately.
If you have a stream that is backpressuring, you can increase the high water mark on the stream in order to mitigate drain events.
Best Practices
Avoid reuse of Node
instances (unless you know what you are doing!).
Reusing the same Node
instance can result in unexpected phenomena. If the same Node
instance is used in different locations in your graph, you need to think carefully about the resulting edges that are connected to both the input and the output of the Node
instance. Most of the time if you need to use the same class of Node
more than once, it's advisable to create a new instance for each use.
Error Handling
Nodes may be used in diverse contexts, each with unique requirements. Nodes should never throw if the API is used in accordance with the documentation. However, "phenomena happens"; hence, you may choose to handle errors accordingly!
Nodes defaults to logging its errors to process.stderr
. If your application requires that errors throw, you may set an errorHandler
on the Config
object that does that. Alternatively, your handler may consume the Error
and handle it otherwise.
Optionally configure all internal Node
errors to be thrown.
import { Config } from "@farar/nodes";
Config.errorHandler = (err: Error) => {
throw err;
};
Test
git clone https://github.com/faranalytics/nodes.git
cd nodes
npm install && npm update
npm test