object-stream-tools
v2.3.0
Published
Useful tools for manipulating object streams. Will be especially helpful to developers used to map - filter - reduce approach of nodejs arrays.
Downloads
26
Readme
object-stream-tools
This package brings goodies of functional programming (map, filter, reduce) to node streams.
Installation
npm install --save object-stream-tools
Usage
arrayToStream
Converts existing array to stream of objects. Useful if you want to inject/merge those object to the existing stream.
const ost = require('object-stream-tools')
ost.arrayToStream([{foo: 'bar'}, {web: 'scale'}])
.on('data', data => {
console.log(data)
})
.pipe(somewhereWritable)
Prints
[{foo: 'bar'}, {web: 'scale'}]
streamToSet
Its very useful if you want to get unique elements / set of values
const jsonStream = require('JSONStream')
fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.streamToSet())
.on('data', uniqueSet => {
// here one get array of unique elements
const uniqueArray = Array.from(uniqueSet.values()).sort()
})
filter
If you just want to remove some objects from stream, you probably want to use filter function.
fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.filter(e => e.value > 6))
// here you will get filtered objects
.pipe(jsonStream.stringify())
.pipe(process.stdout)
map-reduce
Map is useful when you want to modify existing objects in the stream.
Reduce is useful if you want to get single object/value based on whole stream, but you dont want to load whole stream to memory.
Example: sum / average value of huge stream
const jsonStream = require('JSONStream')
fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
// pick required property you want to reduce over
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0))
.on('data', reducedValue => {
// here you will get reduced value
})
Here is example with buffered/string input output:
const jsonStream = require('JSONStream')
fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0)))
.on('data', reducedValue =>
// here you will get reduced value
})
.pipe(jsonStream.stringify())
.pipe(process.stdout)
Please note that if you do not pass initial value reduce function will start in (prev, curr, i) mode. Objects/Array/Reduce
promise to stream
It is a useful helper if you dealing with a lot of smaller data that are wrapped in Promise API, ex:
ost.promiseToStream(myDbQueryThatReturnPromise())
.on('data', data => {
// here you will get a real stream that you can pipe
})
stream to promise
Very handy when you want to consume streams but rest of your application logic uses promises.
ost.streamToPromise(fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.filter(e => e.value > 6)))
.then(data => {
// here you will get filtered objects
})
find
Find is super handy if we want to quickly check if vale/objects exists in the stream. Think about it as a grep on the steroids.
ost.streamToPromise(fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.find(e => e.value > 6)))
.then(foundObj => {
// here you will get found first object that matches condition
// or undefined when there were none that matches
})