npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

object-streaming

v0.3.1

Published

A type-safe framework for stream processing of objects in TypeScript, useful for efficiently processing streams of data such as reading from a database using a cursor.

Downloads

9

Readme

Type-safe Object Streaming

A type-safe framework for stream processing of objects in TypeScript, useful for efficiently processing streams of data such as reading from a database using a cursor.

The framework provides a number of useful built-in streams, and you can easily create your own.

Installation

$ npm install --save object-streaming

Usage

These examples are written in TypeScript, but this module is also completely compatible with plain JavaScript - essentially just remove any type definitions.

Here's a stream that processes numbers:

let strm = source<number>();

strm
  .pipe(filter((x: number) => x % 7 !== 0))
  .pipe(map((x: number) => x * 2))
  .pipe(batch({maxItems: 3, idleTimeout: 100}))
  .pipe(forEach((array: number[]) => {
    console.log('Batched array length:', array.length);
  }))
  .pipe(spread())
  .pipe(branch(
    (x: number) => x % 10 === 0,
    forEach((x: number) => console.log(`[${x}]`))
  ))
  .pipe(forEach((x: number) => console.log(x)));



for (let i = 0; i < 20; i++) {
  strm.input(i);
}

// Output:
// Batched array length: 3
// 2
// 4
// 6
// Batched array length: 3
// 8
// [10]
// 12
// Batched array length: 3
// 16
// 18
// [20]
// Batched array length: 3
// 22
// 24
// 26
// Batched array length: 3
// [30]
// 32
// 34
// Batched array length: 2
// 36
// 38

Concepts

Complex processing streams can be constructed by combining multiple simple stream objects. Stream objects define:

  • An input type
  • An input method
  • An output type
  • An output method
  • A pipe method

A stream accepts an object when input is called, does some processing on it, and calls output with the results of the processing. A stream can output objects to zero or more streams. strmA.pipe(strmB) will cause all outputted objects of strmA to be inputted into strmB; pipe will then return strmB so that strmB's output can be piped into something else. This allows for a clean method-chaining API.

Since complex streams are made up of multiple stream objects, the entry and exit stream objects will be different. Therefore, in the following code, strm will be a reference to the last stream object, not the first:

let strm = source<number>()
   .pipe(/* some other stream */)
   .pipe(forEach((x: number) => console.log(x))); // <- 'strm' has a reference to the stream created by this forEach(), not source()

If you need both the entry and exit streams, define the entry stream first, then compose the composite stream:

let entryStrm = source<number>();

let exitStrm = entryStrm
  .pipe(/* ... */)
  .pipe(/* ... */)
  .pipe(/* ... */)
  .pipe(/* ... */);

// Later in the code...

exitStrm.pipe(forEach(x => console.log(x)));
entryStrm.input('xyz');

More conveniently, you can use the stream utility function to return a single Stream object, which takes a function with a single argument of the entry (source) stream.

let strm = stream<number,string>(src => src
  .pipe(/* ... */)
  .pipe(/* ... */)
  .pipe(/* ... */)
  .pipe(/* ... */)
);

strm.pipe(forEach(x => console.log(x)));
strm.input('xyz');

Built-In Utility Streams

forEach(callback)

For each item that passes through the stream, call the callback, and then pass the item on to the next stream:

strm
  .pipe(forEach((item: SomeClass) => {
	item.foo = true;
    item.bar();
  });

map(callback)

Map each item that passes through the stream using the callback, and pass it onto the next stream:

strm
  .pipe(map((num: number) => new String(num)))

filter(callback)

Pass each item into the callback; if it returns true, pass it onto the next stream, otherwise drop the item:

strm
   .pipe(filter((item: SomeClass) => item.isFlagChecked()))

branch(callback, altStream)

Pass each item into the callback; if it returns true, pass it onto the alternate stream, otherwise pass it onto the next stream:

strm
  .pipe(branch(
	(item: SomeClass) => true,
	forEach((item: SomeClass) => console.log(item.foo))
  ))
  .pipe(forEach((item: SomeClass) => {
	// This will never run because the branch callback always returns true!
  }));

split(...streams)

Each input item is passed to all the given streams, and then passed to the next stream:


strm
  .pipe(split(
	forEach((num: number) => console.log('A', num))
	forEach((num: number) => console.log('B', num))
	forEach((num: number) => console.log('C', num))
  ))
  .pipe(forEach((num: number) => console.log(num)))

strm.input(123)

// Outputs:
// A 123
// B 123
// C 123
// 123

merge(...streams)

Each item outputted by the given streams is passed to the next stream:

let strmA = source<number>(),
	strmB = source<number>();

merge(strmA, strmB)
  .pipe(forEach((num: number) => console.log(num)))

strmA.input(123);
strmB.input(456);

// Output:
// 123
// 456

merge can also be used in-line:

let strmA = source<number>(),
	strmB = source<number>(),
	strmC = source<number>();

strmA
  .pipe(map((num: number) => num * 2))
  .pipe(merge(strmA, strmB))
  .pipe(forEach((num: number) => console.log(num)));

strmA.input(123);
strmA.input(456);
strmA.input(789);

// Output:
// 246
// 456
// 789

batch(options)

Groups together multiple items into an array:

strm
  .pipe(batch({maxItems: 4, idleTimeout: 100}))
  .pipe(forEach((nums: number[]) => console.log(nums)))

for (let i = 0; i < 10; i++) {
  strm.input(i);
}

// Output:
// [0, 1, 2, 3]
// [4, 5, 6, 7]
// [8, 9]

The options affect the behaviour of the batcher:

  • maxItems: number - when the number of items in the batch reaches maxItems, then emit the batch
  • idleTimeout: number - after receiving an item, when no new items have been received for the duration of the idle timeout (in milliseconds), emit the batch
  • delayTimeout: number - after receiving the first item into an empty batch, set a timer for the specified duration (in milliseconds); when the timer fires, emit the batch with all the items collected in that time

idleTimeout and delayTimeout can't be used together.

When no options are provided, it defaults to idleTimeout = 0, which emits the batch on the next event loop iteration.

spread()

Takes an array an passes each item onto the next stream:

let strm = source<number[]>();

strm
  .pipe(spread())
  .pipe(forEach((num: number) => console.log(num)));

Custom Streams

There are two main classes for defining custom streams: SourceStream<T> and Stream<I,O>.

SourceStream is used when a class only outputs objects. Call this.output to emit an object:

import { SourceStream } from 'object-stream';

class BasicClockStream extends SourceStream<number> {
  constructor() {
    super();

    setInterval(() => this.output(Date.now()), 1000);
  }
}

let clockStrm = new BasicClockStream();

clockStrm.pipe(/* another stream that accepts 'number' */);

Stream is used when to build stream processing classes that take an input and emit an output. It must define a public input method which accepts a single argument of the specified input type:

import { Stream } from 'object-stream';

class IntParseStream extends Stream<string,number> {
  public input(obj: string) {
    if (/\d+/.test(obj)) {
      this.output(parseInt(obj));
    }
  }
}

let ips = new IntParseStream();

ips
  .pipe(forEach((num: number) => console.log(num)));

ips.input('123');
ips.input('abc');
ips.input('456');

// Output:
// 123
// 456