npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

streamz

v1.8.12

Published

A Swiss-army-knife of a stream

Downloads

49,490

Readme

Streamz - generic all-purpose stream for an object pipeline.

The native stream.Transform does not provide concurrent operations out of the box or multiple incoming pipes going into a single transform. Streamz is a lightweight wrapper around the Transform stream:

  • Executes custom functions (i.e. writes, transforms, filters, pushes) concurrently up to a user defined cap.
  • Tracks outstanding asynchronous functions by waiting for callbacks or resolved promises
  • Keeps count of incoming pipes as they are initiated and reduces count when they end
  • Issues 'end' only when all incoming pipes have ended
  • Issues 'finish' only when all incoming pipes have ended and any concurrent functions have finished
  • Provides downstream pipes in all instances. If nothing is "pushed", the downstream pipe will only receive the end event (at the right time).
  • Passes errors down to the first listener in the chain
  • Provides an easy transition to final promise that is resolved on finish and rejected on error

The stream initiation is as follows (can be called with or without new):

streamz(fn,[options]);

// alternative signature for specifying maximum concurrency:
streamz(concurrency,fn,[options])

The user-supplied function (fn) is executed for each incoming object (either written directly with .write or piped from above). The function must have either one or two arguments. The first argument represents each incoming data packet and the second one is an optional callback. The function can return nothing, a value or a promise.

The function can execute multiple manual pushes (this.push(data)) to pass any objects downstream. If the function returns a value (!= undefined and not a promise), that value is automatically pushed. If the function returns a promise that ultimately returns a non-undefined value, that final value is pushed too. And finally if (optionally) a callback returns a value that gets pushed as well. Basically any non-undefined returns through function results, callbacks or promise results get automatically pushed. Therefore if you want to only manually push from inside the function make sure that any return values (or resolves) show up as undefined.

When defining a custom function you can chose between a static function, a function with a callback and a function that returns a Promise. The static function is considered immediately resolved upon execution, whereas the others are considered asynchronous. However, a custom function can also utilize a callback and a promise at the same time - in which case the callback signals when the stream is ready to receive next packet and the promise fulfillment signals when the processing is done. This allows concurrency management linked to connection pools (i.e. fire cb when a pool issued an available connection) while the promise doesn't get resolved until the corresponding database call has completed. Using this approach, a stream will not finish until outstanding promises have resolved - in addition to any callbacks.

Options are passed on to the parent Transform stream, however objectMode is explicitly set as true. By default, the execution of user-supplied function is sequential (i.e. only one function runs at a time). This can be changed by specifying the maximum concurrent functions through a concurrency property in the options (defaults to 1). Keep in mind that with concurrent cap above one, the order of the outputs might be different from the inputs. Dynamic concurrency can be achieved by defining concurrency as a function that returns an integer. The function will, in this case, be executed every time concurrency has to be evalued.

Concurrency can also be defined through the alternative function signature, where the maximum concurrency is the first argument, fn as the second and options as the third.

If you specify the option keepAlive: true, the streamz object will need an extra .end() to close. This prevents accidental closing when piping multiple streams with uncertain timings (including periods of no open streams) into a streamz object.

You can also specify a catch handler in options that is going to catch any errors from the user-supplied-function. If the handler errors as well (or returns a Promise rejection) the error will be emitted. If the handler does not error the stream will continue as if no error occured. This makes it easy to silence errors or put retry mechanisms in place without having to wrap everything within try-catch (especially when dealing with async functions). In addition to receiving the error, the catch handler receives the data that caused the error as a second argument.

Example of silencing errors from await inside the function

  .pipe(streamz(async function(url) {
    return await requestAsync(url);
  },{
  	catch: (e,url) => console.log(`error ${e.message} fetching url ${url}`);
  }))

Streamz supports self.writes, i.e. if you this.write(data) within the custom function, the data will be processed by the streamz component (recursively) even if the inbound stream has ended. This can be useful for scrapers that need to traverse through pages etc.

As with vanilla node streams, a custom _flush() function can be defined to handle any remaining buffers after all written data has been consumed. The flush can be defined on the fly with flush property in options.

Here is an example of a stream that buffers:

streamz(function(d) {
  this.buffer = this.buffer || [];
  this.buffer.push(d);
},{
  flush : function(cb) {
    this.push(this.buffer);
    cb();
  }
})

Any errors that come up in a streamz object are passed to the children if no custom error listener has been defined. This allows errors to propagate down to the first error listener or to the rejection of a final promise (if the chain ends in .promise())

A chain that ends with .promise() returns a promise that collects any data passed down in an array and resolves when the stream is finished or is rejected if any uncaught error occured. If no data is passed down to the end, the promise simply resolves to an empty array.