@msiviero/stream-utils
v0.1.2
Published
Stream utils that provide same abstraction as array's map, filter, reduce
Downloads
2
Readme
Node.js array-like stream abstraction
Installation
npm i @msiviero/stream-utils
Installation
Typedocs with full documentation and examples are available at https://msiviero.github.io/stream-utils
Samples
Simple text file read and processing
The file is processed in chunks, so the memory usage is linear. Plus the backpressure mechanism is built-in
import { createReadStream } from "fs";
import { Collect, Count, Distinct, Filter, Map, Splitter } from "@msiviero/stream-utils";
createReadStream("./data/bigfile.txt")
.pipe(new Splitter({ separator: "\n" }))
.pipe(new Map((line: Buffer) => line.toString("utf8")))
.pipe(new Distinct((line: string) => line))
.pipe(new Map((line: string) => line.split(";")))
.pipe(new Filter((columns: string[]) => columns[0] === "to_keep"))
.pipe(new Count())
.pipe(new Collect())
.on("close", (items: number[]) => {
console.log(`Count of remaining records is: ${items[0]}`);
});
An http request is a stream too
A node.js server's request is a readable stream, so you can receive a large request body and processing can be done just in time chunk per chunk, without having to fit it in memory
import { Collect, Count, Distinct, Map, Splitter } from "@msiviero/stream-utils";
import { createServer } from "http";
createServer((request, response) => {
request
.pipe(new Splitter({ separator: "\n" }))
.pipe(new Map((line: Buffer) => line.toString("utf8")))
.pipe(new Distinct((line: string) => line))
.pipe(new Count())
.pipe(new Collect())
.on("close", (items: number[]) => {
response.end(`Request body contains ${items[0]} lines`);
});
})
.listen(9000);
Node provides built-in streams
That can be used as a source or sink for transformations provided by this package
- HTTP requests and responses, from client and server
- fs write and read streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin, stdout and stderr
- process.stdin process.stdout, process.stderr
Built in writable usage of zlib library
An example of http server that receives a text body, parse it, deduplicates lines and then creates a gzip file with the content
import { Distinct, Map, Splitter } from "@msiviero/stream-utils";
import { createWriteStream } from "fs";
import { createServer } from "http";
import { createGzip } from "zlib";
createServer((request, response) => {
request
.pipe(new Splitter({ separator: "\n" }))
.pipe(new Map((line: Buffer) => line.toString("utf8")))
.pipe(new Distinct((line: string) => line))
.pipe(new Map((line: string) => `${line}\n`))
.pipe(createGzip())
.pipe(createWriteStream("request.gz"))
.on("close", () => response.end());
})
.listen(9000);
Built in readable usage of zlib library
An example of http server that receives a gzipped file as body, parse it, the distinct lines and then pipes to the response stream
import { Count, Distinct, Map, Splitter } from "@msiviero/stream-utils";
import { createServer } from "http";
import { createGunzip } from "zlib";
createServer((request, response) => {
request
.pipe(createGunzip())
.pipe(new Splitter({ separator: "\n" }))
.pipe(new Map((line: Buffer) => line.toString("utf8")))
.pipe(new Distinct((line: string) => line))
.pipe(new Count())
.pipe(new Map((items: [number]) => Buffer.from(`${items}`))) // serialize item
.pipe(response);
})
.listen(9000);