npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

object-streaming-tools

v1.4.0

Published

Helper functions to simplify creating and concatenating object streams in NodeJs

Downloads

9

Readme

Object Streaming Tools

Helper functions to simplify creating and concatenating object streams in NodeJS

Motivation

Writing NodeJs streams can be quite challenging. While I myself have create snippets in my IDE to make my life easier when creating them, frequently I found myself writing the same code over and over again. Since I am lazy, I don't really like working this way. And frankly, the German in me simply wanted - nay demanded! - more efficient, DRYer code.

Looking for a solution early 2016, I first explored RxJs3 . While I very much appreciated the beauty of that project's approach, it seemed overkill for what I needed. And when I noticed the significant differences between version 3 and the then up-and-coming version 4, I decided to take another path. I also looked at Highland.js, which is very similar in its approach to our goals here, but was not quit there yet, when we started this.

For the enterprise level application I was designing for and working on with a team at Copperleaf Technologies, I cooked up my first few helper tools that we then continued to develop as a team throughout the year.

At version 1.0, this library was at the state we shipped it with that application in May 2017. Copperleaf Technologies has graciously allowed me to take ownership of the project, so here it is.

I hope some of you will find it useful.

Basics


start with anything

const just = require( 'object-streaming-tools/lib/just' );

just( 'Hello World!' )
  .on( 'data', console.log );

// output:
// Hello World!

iterate

const fromArray = require( 'object-streaming-tools/lib/fromArray' );

fromArray( [ 1, 2, 3 ] )
  .on( 'data', console.log );

// output:
// 1
// 2
// 3

iterate with the spread (...) operator

const just = require( 'object-streaming-tools/lib/just' );

just( ...[ 1, 2, 3 ] )
  .on( 'data', console.log );

// output:
// 1
// 2
// 3

streamify non-streams

when using async functions

const just  = require( 'object-streaming-tools/lib/just' );
const apply = require( 'object-streaming-tools/lib/apply' );

function log( s, next ){

  console.log( s );
  this.emit( 'bar', 'THIS IS SPARTA!' ); // 'this' context is the apply stream
  setImmediate( next, null, s );

}

just( 'foo' )
  .pipe( apply( log ) )
  .on( 'bar', console.log )
  .resume();

// output:
// foo
// THIS IS SPARTA!

when using synchronous functions

Note: Just demonstrating here a technique to achieve this using asyncify from the async library, which is also used internally.

const range    = require( 'object-streaming-tools/lib/range' );
const apply    = require( 'object-streaming-tools/lib/apply' );
const asyncify = require( 'async/asyncify' );

range( 1, 3 )
  .pipe( apply( asyncify( console.log ) ) )
  .resume();

// output:
// 1
// 2
// 3

filter

const just     = require( 'object-streaming-tools/lib/just' );
const filter   = require( 'object-streaming-tools/lib/filter' );
const asyncify = require( 'async/asyncify' );

just( ...[ 0, 1, 2, 3 ] )
  .pipe( filter( asyncify( ( x )=>x >= 2 ) ) )
  .on( 'data', console.log );

// output:
// 2
// 3

// Get rejected items
just( ...[ 0, 1, 2, 3 ] )
  .pipe( filter( asyncify( ( x )=>x >= 2 ) ) )
  .on( filter.RejectedEventKey, console.log )
  .resume();

// output:
// 0
// 1

get a range of numbers

const range = require( 'object-streaming-tools/lib/range' );

range( 1, 3 )
  .on( 'data', console.log );

// output:
// 1
// 2
// 3

iterate over object properties

const just  = require( 'object-streaming-tools/lib/just' );
const forIn = require( 'object-streaming-tools/lib/forIn' );

just( { foo: 'bar' } )
  .pipe( forIn() )
  .on( 'data', ( { key, value } )=>console.log( key, value )  );

// output:
// foo bar

group items in a list by a key's values

by using the key's identity

const just     = require( 'object-streaming-tools/lib/just' );
const apply    = require( 'object-streaming-tools/lib/apply' );
const asyncify = require( 'async/asyncify' );
const keyBy    = require( 'object-streaming-tools/lib/keyBy' );

just( ...[ { foo: 'bar' }, { foo: 'baz' } ] )
  .pipe( keyBy( 'foo' ) )
  .pipe( apply( asyncify( JSON.stringify ) ) )
  .pipe( apply( asyncify( console.log ) ) )
  .resume();

// output:
// { "bar": { "foo": "bar" }, "baz": { "foo": "baz" } }

by using a function to calculate the key

const just     = require( 'object-streaming-tools/lib/just' );
const keyBy    = require( 'object-streaming-tools/lib/keyBy' );

just( ...[ { foo: 'bar'  }, { foo: 'baz' } ] )
  .pipe( keyBy( ( { foo } )=>foo ) )
  .on( 'data' , ( result )=>console.log( JSON.stringify( result ) ) );

// output:
// { "bar": { "foo": "bar" }, "baz": { "foo": "baz" } }

Note: Both approaches above demonstrate various techniques to achieve the same result. Neither technique is meant to be prescriptive.


emit values of an object

const just  = require( 'object-streaming-tools/lib/just' );
const values = require( 'object-streaming-tools/lib/forIn' );

just( { foo: 'bar' } )
  .pipe( values() )
  .on( 'data', console.log );

// output:
// bar

switch things up

const range    = require( 'object-streaming-tools/lib/range' );
const switchBy = require( 'object-streaming-tools/lib/switchBy' );
const asyncify = require( 'async/asyncify' );

const lookup = [ 'one', 'three', 'five' ];

const forTrue  = { ifMatches: true,  thenDo: asyncify( x=>lookup[ x ] ) };
const forFalse = { ifMatches: false, thenDo: asyncify( x=>x ) };

range( 1, 5 )
  .pipe( switchBy( asyncify( x=>!!x % 2 ), [ forTrue, forFalse ] ) )
  .on( 'data', console.log );

// output:
// one
// 2
// three
// 4
// five

start from a callback

const fs           = require( 'fs-extra' );
const fromCallback = require( 'object-streaming-tools/lib/fromCallback' );

fromCallback( fs.readJson.bind( null, 'list.json') )
  .pipe( flatten() )
  .on( 'data', console.log;

// output, given the file contents of 'list.json' => [ "foo", "bar", "baz" ]:
// foo bar baz

emit arrays of a specified length

const items = [1, 2, 3, 4, 5, 6];

just(...items)
  .pipe( asLengthLimitedArrays( 4 ) )
  .on( 'data', console.log );

// output:
// [ 1, 2, 3, 4 ]
// [ 5, 6 ]

creates a slice of the stream starting from start index and up to, but not including, end index

end defaults to Infinity

const items = ['val1', 'val2', 'val3', 'val4', 'val5', 'val6'];
const start = 2;
just(...items)
  .pipe( emitRange( start ) )
  .on( 'data', console.log );

// output:
// val3
// val4
// val5
// val6

from start to end

const items = ['val1', 'val2', 'val3', 'val4', 'val5', 'val6'];
const start = 2;
const end = 5;
just(...items)
  .pipe( emitRange( start, end ) )
  .on( 'data', console.log );

// output:
// val3
// val4

emit only unique items in a stream

unique

const items = [1, 2, 3, 4, 4, 5];
just(...items)
  .pipe( unique() )
  .on( 'data', console.log );

// output:
// 1
// 2
// 3
// 4
// 5

uniqueBy

via a string iteratee
const items = [{id: 'foo'}, {id: 'bar'}, {id: 'foo'}];
const attributeName = 'id'
just(...items)
  .pipe( uniqueBy( attributeName ) )
  .on( 'data', console.log );

// output:
// {id: 'foo'}
// {id: 'bar'}
via an iteratee function
const items = [2.1, 1.2, 2.3];
just(...items)
  .pipe( uniqueBy( Math.floor ) )
  .on( 'data', console.log );

// output:
// 2.1
// 1.2

TBC