stream-wrench
v1.1.0
Published
Collection of stream manipulation helpers
Downloads
3
Readme
🔧 Stream Wrench
Toolbox for stream manipulation.
StreamChopper
Implements a NodeJS Readable stream and consumes all written data.
API
const chopper = new StreamChopper();
Method: readUntil(delimiter)
const data = await chopper.readUntil(delimiter);
Reads from the input stream until the String or Buffer delimiter
is found and returns all data that has been received until the occurrence of delimiter without the delimiter itself. The delimiter is also consumed.
Method: seekUntil(delimiter)
await chopper.seekUntil(delimiter);
Like readUntil()
but drops all read data.
Example
Read lines from a text file:
import StreamChopper from 'stream-wrench/stream-chopper';
import {createReadStream} from 'node:fs';
// Open a text file generated by this shell call:
// echo -e "Knock knock!\nNeutrino\nHow's there?" >joke.txt
const src = createReadStream('./joke.txt');
const chopper = new StreamChopper();
src.pipe(chopper);
// Read the first line. Outputs: "Knock knock!"
const firstLine = await chopper.readUntil('\n');
console.log(firstLine.toString());
// Skip the second line.
await chopper.seekUntil('\n');
// Read the third line. Outputs: "How's there?"
const thirdLine = await chopper.readUntil('\n');
console.log(thirdLine.toString());
CircuitBreaker
API
Implements a NodeJS Transform stream which is transparent until a trigger sequence is found. Once it triggered it destroys itself.
const cb = new CircuitBreaker();
Method: setTrigger(sequences)
cb.setTrigger(sequences);
Sets the trigger sequence. Previously set sequences are overwritten.
sequences
may be a Sequence or an Array of Sequence.
Sequence is a Buffer or a String or an Object with the following keys:
seq
: Buffer or String of the trigger sequenceerr
: String with the error message that will be used if the circuit breaker triggers by the givenseq
Method: protectPromise(promise)
const result = await cb.protectPromise(promise);
If the circuit breaker hasn't triggered, the settled state of promise
will be transparently propagated.
Once the circuit breaker has triggered, protectPromise()
will always reject.
Example
import CircuitBreaker from 'stream-wrench/circuit-breaker';
// Setup with trigger sequence 'STOP'.
// All passed data is written to console output.
// Once it triggers, it'll write the error message to the console.
const cb = new CircuitBreaker();
cb.setTrigger('STOP');
cb.on('error', (e) => console.log(e.message));
cb.pipe(process.stdout);
// Can be used to guard promises.
// It'll handle promises transparently if the circuit breaker hasn't triggered, yet.
// Outputs: 42
cb.protectPromise(Promise.resolve(42)).then(console.log);
// Outputs: 13
cb.protectPromise(Promise.reject(13)).catch(console.log);
// Prints 'Hello world!' to stdout
cb.write('Hello world!\n');
// Prints 'Found sequence: STOP' to stdout
cb.write('STOP');
// Promises can't pass the circuit breaker anymore
// Outputs: Found sequence: 'STOP'
cb.protectPromise(Promise.resolve(42)).catch((e) => console.log(e.message));
// Outputs: Found sequence: 'STOP'
cb.protectPromise(Promise.reject(13)).catch((e) => console.log(e.message));