npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

pipedreams-3b7b

v6.3.2

Published

This is a transitional package used to provide backward compatibility; use pipedreams instead

Downloads

43

Readme

Disclaimer PipeDreams v4 should be considered alpha-level software—expect bugs and API changes.

PipeDreams

npm version Build Status stability-almost stable slogan-NodeJS streams made rad easy

Install as npm install --save pipedreams.

Der Pfeifenraucher

Table of Contents generated with DocToc

Caveat Below examples are all written in CoffeeScript.

PipeDreams

Require Statement

The suggested way to require the PipeDreams library itself and to factor out the most important methods for convenience is as follows:

D               = require 'pipedreams'
{ $, $async }   = D

If you don't like dollar signs in your code or already use $ for something else, you're of course free to D.$ or fall back to the long name, remit:

D                       = require 'pipedreams'
{ remit, remit_async }  = D

In the below, I will assume you required PipeDreams the first way, above.

Streams are Transforms, Transforms are Streams

In the PipeDreams world, write streams can appear anywhere in the pipeline, just like read streams; also, streams and transforms are no different.

How can read streams, write streams and stream transforms be the same? After all, you'd write your typical streamy app approximately like this:

input   = fs.createReadStream   'foo.txt'
output  = fs.createWriteStream  'bar.txt'
input
  .pipe get_transform_A()
  .pipe get_transform_B()
  .pipe get_transform_C()
  .pipe output

In this view, clearly, a read-stream is a source of data—something that pushes data into the stream; a write-stream is a sink—something that accepts data from the stream; and a transform—well, a transform takes data, does something with it, and passes it on. In other words, a transform acts like a write-stream on its 'upper' end (), and acts like a read-stream on its lower end.

In the typical NodeJS way of doing things, you can't just go on with the pipeline after a write-stream; this would be illegal:

# won't work
input   = fs.createReadStream 'foo.txt'
input
  .pipe get_transform_A()
  .pipe fs.createWriteStream 'bar.txt'
  .pipe get_transform_B()

Trying to read from a NodeJS write-stream will elicit a dry Cannot pipe. Not readable. complaint from the engine. After all, this is a write-stream, right, so what should you want to read from it, right? Wrong! Consider this simple setup:

# won't work
fs.createReadStream 'foo.txt'
  .pipe fs.createWriteStream 'copy-1.txt'
  .pipe fs.createWriteStream 'copy-2.txt'
  .pipe fs.createWriteStream 'copy-3.txt'

Isn't it quite obvious that the only sensible course of action here is to A) read from foo.txt and B) copy those bytes to all of copy-1.txt, copy-2.txt, copy-3.txt? Why not? Turns out you can easily achieve the above with PipeDreams:

# works!
D = require 'pipedreams'
D.new_stream 'read', file: 'foo.txt'
  .pipe D.new_stream 'write', file: 'copy-1.txt'
  .pipe D.new_stream 'write', file: 'copy-2.txt'
  .pipe D.new_stream 'write', file: 'copy-3.txt'

When to Call it a Day: Always Use $ 'finish'

Given the asynchronous nature of NodeJS' I/O handling, stream end detection can be a fickle thing and hard to get right. For example, when writing into a file, one might be tempted to wait for an end event:

### TAINT Counter-example; don't do it this way ###
write_sample = ( handler ) =>
  input     = D.new_stream()
  output    = D.new_stream 'write', 'lines', { file: path_1, }
  pipeline  = input
    .pipe D.$show()
    .pipe output
  #.......................................................................................................
  pipeline.on 'end', handler
  #.......................................................................................................
  D.send input, data for data in [ 'foo', 'bar', 'baz', ]
  D.end input

Stress tests have shown this pattern to produce a certain percentage of failures (1 in 10, but that might depend on details of the writing process).

On the other hand, the pattern below passes tests; here, we use the PipeDreams $ 'finish' transform:

write_sample = ( handler ) =>
  input   = D.new_stream()
  output  = D.new_stream 'write', 'lines', { file: path_1, }
  input
    .pipe D.$show()
    .pipe output
    .pipe $ 'finish', handler
  #.......................................................................................................
  D.send input, data for data in [ 'foo', 'bar', 'baz', ]
  D.end input

Never Assume Your Streams to be Synchronous

As a general note that users should keep in mind, please observe that no guarantee is made that any given stream works in a synchronous manner. More specifically and with regard to the most typical usage pattern: never deal with pipelined data 'right below' the pipeline definition, always do that from inside a stream transform.

Here's an example from src/tests.coffee: we create a stream, define a pipeline to split the text into lines and collect those lines into a list; then, we write a multi-line string to it and end the stream. When we now look at what's ended up in the collector, we find that the last line is missing. This may come as a surprise, since nothing in the code suggests that the thing should not work in a simple top-down manner:

@[ "(v4) new_stream_from_text doesn't work synchronously" ] = ( T, done ) ->
  collector = []
  input     = D.new_stream()
  input
    .pipe D.$split()
    .pipe $ ( line, send ) =>
      send line
      collector.push line
  input.write "first line\nsecond line"
  input.end()
  T.eq collector, [ "first line", ] # <-- we're missing the last line here
  done()

Update: The above code is no longer valid and has been removed; however, the following code is still valid

In order for the code to meet expectations, remember to always grab your results from within a stream transform; commonly, this is done with $ 'finish':

@[ "(v4) new_stream_from_text (2)" ] = ( T, done ) ->
  collector = []
  input     = D.new_stream()
  input
    .pipe D.$split()
    .pipe $ ( line, send, end ) =>
      if line?
        send line
        collector.push line
      if end?
        T.eq collector, [ "first line", "second line", ]
        end()
    .pipe $ 'finish', handler
  input.write "first line\nsecond line"
  input.end()

Stream Creation

PipeDreams simplifies and unifies most common stream creation tasks by providing a (fairly) easy-to-use and flexible interface via its new_stream methods. For the future, it is planned to make that API extensible with plug-ins.

The simplest possible use is to call s = D.new_stream() without any arguments, which will give you a through2 stream that you can use as source, as sink or as transform. Here is a very simple example, a function that accepts a callback handler (because all streams are assumed to be asynchronous); it constructs a strem named input, writes two strings to it, and ends it. The pipeline is set up with three PipeDreams API calls: the first to split (and, actually re-join) the data events into lines (sans newline characters); the second to print out data events, and the last one to detect the stream's finish event and call the callback:

f = ( done ) ->
  input = D.new_stream()
  input
    .pipe D.$split()
    .pipe D.$show()
    .pipe $ 'finish', done
  input.write "helo\nworld"
  input.write "!"
  input.end()
  return null

This ultra-simple streaming will do nothing but print out:

'helo'
'world!'

Throughstreams can appear in any role in a stream pipeline: as sources, throughputs, or sinks of data. To demonstrate that, let's modify the above a little (rpr being a helper derived from util.inspect):

f = ( done ) ->
  input   = D.new_stream()
  thruput = D.new_stream()
  output  = D.new_stream()
  #.........................................................................................................
  input
    .pipe D.$split()
    .pipe thruput
    .pipe output
    .pipe $ 'finish', done
  #.........................................................................................................
  thruput
    .pipe $ ( data ) -> log 'thruput', rpr data
  #.........................................................................................................
  output
    .pipe $ ( data ) -> log 'output', rpr data
  #.........................................................................................................
  input.write "helo\nworld"
  input.write "!"
  input.end()
  return null

The only surprise here is the somewhat weird ordering of what lines this function prints to the console:

output 'helo'
thruput 'helo'
output 'world!'
thruput 'world!'
thruput null
output null

One might have expected the text from thruput to come before that from ouput, but the general rule is: do not rely on a specific ordering of events across different stream transform; only the preservation of order within a given transform is guaranteed.

Stream Creation API

D.new_stream has a somewhat unusual call signature; its general format is (if you excuse my failed attempt at writing a sort-of BNF):

D.new_stream [ hints... ], [ kind: seed ], [ settings ]

All parts are optional. hints consists of zero or more 'keywords', 'tags' or 'flags'—single-word strings that switch various behaviors of the stream on or off. hints are followed by an (also optional) single-element key/value object whose key specifies the kind of stream to be built; settings is another object that provides space for more stream-specific settings.

There are currently 6 'kinds' of streams that new_stream can return; the simplest one—a featureless throughstream returned when new_stream is called without arguments—we have already encountered.

The most useful ones are file and pipeline streams. A file stream reads from or writes to a file on disk, and PipeDreams new_stream acts as an interface to NodeJS' fs.createReadStream and fs.createWriteStream, as the case may be. The minimal file stream is created with a 'kind' of 'file' (or, equivalently, 'path'—pick one), and a 'seed' that specifies the file's location. Without any hints, you get a file read stream that emits buffers, so for example

input = D.new_stream file: '/tmp/foo.txt'
input.pipe D.$show()

might spit out <Buffer 68 65 6c 6f 20 77 6f 72 6c 64 0a c3 a4 ...> or something similar (should the file indeed exists). If you

  • 'ascii'—for 7-bit ASCII data only. This encoding method is very fast and will strip the high bit if set.

  • 'utf-8', 'utf8'—Multibyte encoded Unicode characters.

  • 'utf16le'—2 or 4 bytes, little-endian encoded Unicode characters. Surrogate pairs (U+10000 to U+10FFFF) are supported.

  • 'ucs2'—Alias of 'utf16le'.

  • 'base64'—Base64 string encoding. When creating a buffer from a string, this encoding will also correctly accept "URL and Filename Safe Alphabet" as specified in RFC 4648, Section 5.

  • 'binary'—A way of encoding the buffer into a one-byte (latin-1) encoded string. The string 'latin-1' is not supported. Instead, pass 'binary' to use 'latin-1' encoding.

  • 'hex'—Encode each byte as two hexadecimal characters.

path = '/tmp/foo.txt'
D.new_stream file: path
D.new_stream 'read', file: path
D.new_stream 'utf-8', { path, }
D.new_stream 'read', 'lines', { path, }
D.new_stream { path, }
D.new_stream 'write', file: path
D.new_stream 'write', 'lines', { file: '/tmp/foo.txt', }
  • 'file' / 'path'
  • 'pipeline'
  • 'text'
  • 'url'
  • 'transform'
  • 'duplex'
@new_stream = ( P... ) ->

D.new_stream()
D.new_stream pipeline: [ transform, transform, ..., ]
D.new_stream text: "helo world"

D.new_stream           file: "/tmp/foo.txt"
D.new_stream 'read',   file: "/tmp/foo.txt"
D.new_stream 'write',  file: "/tmp/foo.txt"
D.new_stream 'append', file: "/tmp/foo.txt"

D.new_stream           path: "/tmp/foo.txt"
D.new_stream 'read',   path: "/tmp/foo.txt"
D.new_stream 'write',  path: "/tmp/foo.txt"
D.new_stream 'append', path: "/tmp/foo.txt"

D.new_stream           url: "/tmp/foo.txt"
D.new_stream 'read',   url: "/tmp/foo.txt"
D.new_stream 'write',  url: "/tmp/foo.txt"
D.new_stream 'append', url: "/tmp/foo.txt"

D.new_stream 'devnull'

D.new_stream hints* { kind : locator, }

Examples:

source = D.new_stream()

source = D.new_stream pipeline: [ $square(), ( $multiply 2 ), ]

pipeline  = [ $square(), ( $multiply 2 ), ]
source    = D.new_stream { pipeline, }

path    = '../package.json'
input   = D.new_stream { path, }
output  = D.new_stream 'write', path: '/tmp/foo.txt'

Kinds:

text
pipeline
file
path
url

Hints:

'utf-8' / 'utf8'
'binary'
'read'
'write'
'append'
'devnull'

The Remit and Remit-Async Methods

The remit method (as well as its asynchronous companion, remit_async) is very much the centerpiece of the PipeDreams API¹. It accepts a function (call it a 'transformation') and returns a stream transform. In case you're familiar with the event-stream way of doing things, then PipeDreams' remit f is roughly equivalent to event-stream's through on_data, on_end, except you can handle both the on_data and on_end parts in a single function f. remit_async f is roughly equivalent to event-stream's map f.

The behavior of a stream transform is governed by the number of arguments of the transformation function and the optional 'null' tag.

You can call remit ($) with a function that takes one, two, or three arguments, and remit_async ($async) with a function that takes two, or three arguments. The suggested names of the arguments are data (or whatever fits the purpose best) for the first, send for the second, and end for the third argument, where present.

You use send to send data down the stream; when not present in the signature, the data is sent for you; where present, you may call it any number of times to send an arbitrary number of items.

Where the end parameter is present, you must explicitly call end() to end the stream; otherwise, it will remain open indefinitely. Either data or end—but not both—will be null on any given event. More precisely, the stream transform will be called once for each upstream data event (if any) with data != null, and then once with data == null and end != null upon stream end. The pattern to use is always:

stream
  .pipe <...>
  .pipe $ ( data, send, end ) =>
    if data?
      <...>
    if end? # you could use a simple `else` here, but explicit is better than implicit
      <...>
      end()
  .pipe <...>

Where end is not given, data will never be null, except where the 'null' tag is used, as in $ 'null', ( data ) =>. Checking data for null in this kind of transforms allows you to observe the end-of-stream event without having to bother calling it yourself.

Where $async is used, you may call send data any number of times, but you must call send.done() (or send.done data) exactly once to signal completion of your asynchronous transform.

In a nutshell, you have the following options:

┌─────────────────────────────────────────┬─────────────┬─────────────┬─────────────┐
│  signature                              │ data  may   │ must call   │  must call  │
│                                         │  be null    │   end()     │ send.done() │
├─────────────────────────────────────────┼─────────────┼─────────────┼─────────────┼─────────────┐
│  $              ( data            ) ->  │             │             │             │ observers   │
│  $      'null', ( data            ) ->  │      ●      │             │             │             │
├─────────────────────────────────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│  $              ( data, send      ) ->  │             │             │             │             │
│  $      'null', ( data, send      ) ->  │      ●      │             │             │             │
│  $              ( data, send, end ) ->  │      ●      │      ●      │             │ transforms  │
├─────────────────────────────────────────┼─────────────┼─────────────┼─────────────┤             │
│  $async         ( data, send, end ) ->  │      ●      │      ●      │      ●      │             │
└─────────────────────────────────────────┴─────────────┴─────────────┴─────────────┴─────────────┘

You can also use send.end() to end the stream at any point in time and send.error new Error "message" to signal an error.

(Synchronous) Stream Observer

When calling $ with a function that takes a single argument, you get back an Observer, that is, a transform that gets all the data events passed in, but can't send any; in a manner of speaking, an observer is a 'transformation-less transform' (note, however, that in case events are implemented as a mutable object, such as a list or a plain old object, an observer can still mutate that event). The observer will be called once more with data set to null when the stream is about to end:

$ ( data ) ->

You can use that idiom 'inline', i.e. right within the pipeline notation:

input = ( require 'fs' ).createReadStream()
input
  .pipe D.$split()      # convert buffer chunks into single-line strings
  .pipe $ 'null', ( data ) ->
    if data? then console.log "received event:", data
    else          console.log "stream has ended"
  .pipe output

However, for any but the most one-off, simple purposes, you'll probably want a named function; it is customary to write the transform as a factory function that must get called once when being entered into the pipeline.

To denote the special status of a stream transform factory—a function that is a 'factory for potentially stateful transforms that only makes sense when being called as argument to a .pipe call within a stream pipeline' (iknowiknow, that's a mouthful)—it is also customary to prefix the name with a $ (dollar sign).

Since $show, below, is such a factory function, the transform that it returns may hold private state within the closure, an immensely useful technique:

f = ( done ) ->
  $show = ->
    return $ ( data ) ->
      console.log "received data:", data

  $count = ->
    count = 0
    return $ 'null', ( data ) ->
      if data? then count += +1
      else          console.log "stream has ended; read #{count} events"

  input = D.new_stream()
  input
    .pipe D.$split()      # Convert buffer chunks into single-line strings.
    .pipe $show()
    .pipe $count()
    .pipe $ 'finish', done

  D.send input, """
    Here we write
    some lines of text
    into the stream.
    """
  D.end  input # don't forget to end the input stream

In case you were wondering, $split() is a useful convenience method to turn a file readstream—which, in the absence of an encoding argument, will consist of a series of NodeJS Buffer objects–into a series of strings, each one representing one line (without the trailing newline) of the source. UTF-8 encoding is silently assumed.

Note: If you inadvertently forget to stick that remit call in front of your transformation function, you'll get an obscure error message: Cannot read property 'on' of undefined. Just try to remember that this symptom is (often) caused by an omitted remit / $.

Synchronous Transform, No Stream End Detection

When calling $ with a function that takes two arguments, you get back a Synchronous Transform. A synchronous transform receives data events and may send on as many events as it wants—zero or a thousand. The next transform in the pipeline will be called no sooner than the transform exits, whether it has called send in the process or not. In this variant, you can rely on data to never be null:

$ ( data, send ) -> ...

An example for this form is shown in the upcoming section.

Synchronous Transform With Stream End Detection

A Synchronous Transform with End Detection will be called once for each data item and once when the stream is about to end. It is returned by $ when being called with a function that takes three arguments:

$ ( data, send, end ) -> ...

When the transformation eventually gets called from within the pipeline, its third argument (call it end) will be null, except when the the stream is about to end. When that happens, end is a function that must be called to end the stream. In other words, when you have end in your signature but forget to call it, the stream will hang on indefinitely. This can be useful in cases where used wisely, but has the power to bring down empires when done out of neglect.

Use synchronous transforms when you want to both mangle data as it passes by and aggregate data across the entire stream.

f = ( done ) ->
  $as_number = ->
    return $ ( data, send ) ->
      send parseFloat data

  $add = ( increment = 1 ) ->
    return $ ( n, send ) ->
      send n
      send n + increment

  $show = ->
    return $ ( data ) ->
      console.log "received data:", data

  input = D.new_stream()
  input
    .pipe D.$split()      # Convert into single-line strings.
    .pipe $as_number()
    .pipe $add 12
    .pipe D.$sort()
    .pipe $show()
    .pipe $ 'finish', done

  D.send input, "20\n10\n50\n40\n30\n"
  D.end  input

Output:

received data: 10
received data: 20
received data: 22
received data: 30
received data: 32
received data: 40
received data: 42
received data: 50
received data: 52
received data: 62

Asynchronous Transforms

Asynchronous Transforms are constructed in a very similar fashion to their synchronous counterparts, except you use $async (or, remit_async) in place of $ (or, remit); furthermore, there is no counterpart to the 'observer' call signature, so $async has to be called with a stream transformation that accepts eiter two or three arguments.

Asynchronous Transforms are suited for intermittent file and network reads. Since those can happen at an arbitrary time in the future, async stream transforms must always notify the pipeline when they've finished; to do this, there's a callback method tacked unto the send argument called send.done.

You can call send data as often as you like to, but you must call send.done() (or send.done data) whenever you're finished—otherwise the pipeline will hang on indefinitely:

$async ( data, send ) -> ...

An Asynchronous Transform with End Detection will be called once for each data item and once when the stream is ended, so again, be prepared for an empty stream where it is called once with data being null:

$async ( data, send, end ) -> ...

One-Off Tramsforms to Run at the Beginning or the Ending of a Stream

Often you need to perform some action exactly once when a stream has just started or just ended. In PipeDreams, this can be done by using one of the tags 'start', 'stop', 'first', 'last' or 'finish' with a remit call.

When you look at the many ways to 'remit with a tag' listed below, you may feel a little overwhelmed, but the API is in fact fairly straightforward:

  • with 'first' and 'last', you listen in for the first or the last data event of the stream (if any), respectively.

  • when a stream ends before any data item is sent (i.o.w. when you get an empty stream), then 'first' and 'last' transforms are only called (with null for the data argument) when the 'null' tag is present; when no 'null' tag has been given, data will never be null or undefined.

  • with 'start' and 'stop', you listen in for the start and the stop of the stream; a transform with 'start' will be called right before the first data event comes down the stream, and a transform with 'stop' will be called right after the last data event has come down the stream. Both transforms will be called once in any event, even when the stream happens to be empty.

  • $ 'finish', -> is much like $ 'stop', ( send ) ->, except that $ 'finish', -> is called after the stream has finished (at which time all participating transforms / streams in the pipeline should be positively done). Naturally, sending data into the stream makes no sense at that point in time, so a 'finish' transform must not accept any arguments.

  • Observe that the 'null' tag can only be used with 'first' and 'last'; since 'start', 'stop' and 'finish' transforms do never receive data events, using 'null' makes no sense there.

Call signatures:

  • $ 'first', ( data, send ) -> ...
  • $ 'first', 'null', ( data, send ) -> ...
  • $ 'last', ( data, send ) -> ...
  • $ 'last', 'null', ( data, send ) -> ...
  • $ 'start', ( send ) -> ...
  • $ 'stop', ( send ) -> ...
  • $ 'finish', -> ...

These methods are great to sneak in additional data right in front or right behind the first or last events. The null tag signals that whatever you have to add to the stream should be there in any event; as an example, imagine you want to build a JSON serialization of a list of items. In that case, you'd sneak in a left square bracket [ before the first and a right square bracket ] after the last list element, and in case no elements are passed through the stream at all, you still want the serialization to contain both brackets to obtain [], so clearly you'd go with the 'null' tag in this case.

@$ 'finish', @on_finish = ( stream, handler ) ->

The recommended way to detect write completion in a piped stream is to tack a .pipe $ 'finish', handler transform unto the end of your pipeline:

f = ( handler ) ->
  ...
  input
    .pipe $do_this()
    .pipe $do_that()
    .pipe output
    .pipe $ 'finish', handler

Alternatively, if you have an explicit ouput stream (say, output) in your pipeline, you can also call D.on_finish output, handler. Terminating stream processing from handlers for other events (e.g. 'end') and/or of other parts of the pipeline may lead to hard-to-find bugs. Observe that on_finish and $ 'finish' call handler only upon the following turn of the JavaScript event loop.

Note You should not attach anything in the pipeline after a $ 'finish' transform, since the behavior of such a transform is not well defined. When the finish event is fired, then all the stream components have already packed their bags and are ready to return home. The very moment that handler is called, the show is over, and the last batch of events may or may not make it to any given transform below $ 'finish'.

How to Send Null Without Ending the Stream

Had NodeJS streams be conceived at a point in time where JavaScript had already had Symbols, and if the more modern, broader view of 'streams of anything' (as opposed to the more narrow view that 'all streams are bytestreams'), chances are that using send null from within a stream transform would be just like sending any other value, and something like send Symbol.for 'end' would've been used to signal the stream's end.

But this is not how it happened; streams were conceived as a bytes-only thing, and JavaScript had no symbols at the time streams were added to NodeJS. As a result, some value other than a buffer had to be used, and the most natural choice was null—after all, null represents 'no value', and when you send data, you will never want to 'send no value' when you can instead 'not send a value', right? Turns out that's wrong. nulls are everywhere, like it or not: they appear in databases; they are a legal JSON value (I'd even claim: the root value of JSON); they appear in configuration files—nulls are everybody's favorite Nothing-Be-Here placeholders.

Let's say you have a stream of values that you want to construct a JSON file from (demo code; consider to use $as_json_list and/or $intersperse instead):

f = ( path, handler ) ->
  source  = D.new_stream()
  output  = D.new_stream 'write', { path, }
  source
    .pipe $ ( data, send ) => send ( JSON.stringify data ); send ','
    .pipe D.$on_start ( send ) => send '['
    .pipe D.$on_last ( data, send ) => send ']\n'
    .pipe output
    .pipe $ 'finish', handler
  #.........................................................................................................
  D.send  source, 42
  D.send  source, 'a string'
  # D.send  source, null # uncomment to test
  D.send  source, false
  D.end   source

calling f will duly print [42,"a string",false] into the file specified. However, trying the same after uncommenting the line that sends null into the stream will break with stream.push() after EOF; this is because sending null causes the stream to close down. Don't do that unless you want to end the stream.

There are two ways to tunnel null values through stream pipelines: One is to move on from simple data items to events; the other, to use a symbolic value in place of null data.

Yet another conceivable solution lies in rewriting a number of PipeDreams methods so they write a special value whenever they see null, and send null whenever they see the special value for the 'end stream' signal. At the time of this writing, I believe it's better to stick to established conventions; after all, PipeDreams mission is to lower the threshold, avoid surprises, and level the learning curve.

Using Events instead of Data Items

In my experience, writing 'something streamy' to process whatever data often starts out as an idea how to send transform_this data, then send transform_that data—as a sketch where those stream transforms all receive bits of raw business source data in a piecemeal fashion, and emit bits of business data in the targetted format. Quite often the result will look like the last code snippet, above: lots of simple, unnamed stream transforms whose definitions are put right in the middle of the pipeline.

There's nothing wrong with that approach, and the simplicity of it sure helps to get started. On the downside, sending raw business data doesn't scale very far; it tends to break down the very moment you realize that at some point in your stream you want to communicate facts that are not part of the business data itself, but belong to a meta level. It is then that moving from 'data items' to 'events' is appropriate.

'Using events' just means wrapping each piece of data into a container object. JavaScript's simplest container is the list (a.k.a. the Array type), so one way of wrapping data is to always send pairs [ event_name, event_value, ]. Here's what a rewritten stream transform might look like:

f = ( path, handler ) ->
  #.......................................................................................................
  $serialize = =>
    return $ ( event, send ) =>
      [ kind, value, ] = event
      return send event unless kind is 'data'
      send [ 'json', ( JSON.stringify value ), ]
  #.......................................................................................................
  $insert_delimiters = =>
    return $ ( event, send ) =>
      [ kind, value, ] = event
      send event
      return unless kind is 'json'
      send [ 'command', 'delimiter', ]
  #.......................................................................................................
  $start_list = => D.$on_start (        send ) => send [ 'command', 'start-list', ]
  $stop_list  = => D.$on_last  ( event, send ) => send [ 'command', 'stop-list',  ]
  #.......................................................................................................
  $as_text = =>
    return $ ( event, send ) =>
      [ kind, value, ] = event
      return send value     if kind is 'json'
      return send event unless kind is 'command'
      ### Here I take the liberty to insert newlines so as to render multi-line JSON: ###
      switch command = value
        when 'delimiter' then send ',\n'
        when 'start-list' then send '[\n'
        when 'stop-list'  then send '\n]\n'
        else send.error new Error "unknown command #{rpr command}"
      return null
  #.......................................................................................................
  source  = D.new_stream()
  output  = D.new_stream 'write', { path, }
  source
    .pipe $serialize()
    .pipe $insert_delimiters()
    .pipe $start_list()
    .pipe $stop_list()
    .pipe $as_text()
    .pipe output
    .pipe $ 'finish', handler
  #.........................................................................................................
  D.send  source, [ 'data', 42,         ]
  D.send  source, [ 'data', 'a string', ]
  D.send  source, [ 'data', null,       ]
  D.send  source, [ 'data', false,      ]
  D.end   source
#.........................................................................................................
f '/tmp/foo.json', ( error ) =>
  throw error if error?
  done()
#.........................................................................................................
return null

This is, admittedly, a lot of code for such a simple task; however, when applications get more complex, it often pays to deal with slightly more abstracted objects that carry richer semantics. Depending on circumstances, one might opt for { named: 'values', } instead of modelling events as [ 'tuples', ], as PODs afford more code self-documentation and extensibility. But even within the limits of this small example, the added complexity of the event-based approach does have some justification: nowhere in the code did we have to pay attention whether or not some piece of data is or is not null—wrapping all the data freed us from having to treat this special value in any special ways (we traded that with the obligation to deal with different kinds of events, to be sure).

Using a Symbolic Value for Null

If you don't want to wrap your data into event objects, an easy way to tunnel nulls through a pipeline is to replace it with some other value. If you know all your business data consists of wither chunks of text or nulls, you can any time just send a 0 (number zero) and still keep the distinction clear. Keeping things that simple, however, quickly breaks down as soon as the first user of your code does send series of numbers into the stream.

A much more robust solution is offered by JavaScript symbols. Symbols are a new primitive data type in JS; they have been specifically designed to be used everywhere where single values are needed that do not conflict with existing values or APIs. They come in two flavors: private and global.

A private symbol is created as d = Symbol 'xy', where 'xy' is a text of your own choosing; it is just used to identify the symbol, to give it some human-readable semantics. Each private symbol has its own identity, even when the same string is used for its creation, so ( Symbol 'A' ) != ( Symbol 'A' ) always holds. This makes private symbols great for lots of uses where a value of distinct identity—a singleton—is needed, a value that can not (or, depending on use, is pretty hard to) be reproduced from any part of code outside the very place where the original was instantiated.

On the other hand, a global symbol is created as d = Symbol.for 'xy'; the difference to private symbols is in the availability, as it were, of the corresponding value. For all code running within the same code context (i.e. normally the same process), ( Symbol a ) == ( Symbol b ) holds exactly when a === b (and both values are strings).

In other words, a stream transform that wants to check for the occurrence of a global symbol for a given string 'foo' can just compare stream data items by saying if data is Symbol.for 'foo'. If it wants to check for a private symbol 'foo', on the other hand, it must have access to some property of some object that holds a reference to that symbol.

In an effort to establish standard procedures to make dealing with null data items easier, the PipeDreams library provides a reference the global symbol for the string 'null' as D.NULL = Symbol.for 'null'; therefore, stream transforms are free to check for either if data is D.NULL or if data is Symbol.for 'null' with no difference in semantics whatsoever. The beauty of this approach: no 'ordinary / business data type' is used, but a 'meta data type' (so chances of accidental collisions are minimized), and that client code can insert and check for 'meta nulls' without having to specifically reference D.NULL. What's more, patterns like

if data is Symbol.for 'null' then send "found a null!"

are not only memorable to the writer, they're about as readable as programming languages can get.

Let's have a look at how to use the null symbol:

f = ( path, handler ) ->
  source  = D.new_stream()
  output  = D.new_stream 'write', { path, }
  source
    .pipe $ ( data, send ) => if data is Symbol.for 'null' then send 'null' else send JSON.stringify data
    .pipe $ ( data, send ) => send data; send ','
    .pipe D.$on_start (       send ) => send '['
    .pipe D.$on_last  ( data, send ) => send ']\n'
    .pipe output
    .pipe $ 'finish', handler
  #.........................................................................................................
  data_items = [ 42, 'a string', null, false, ]
  for data in data_items
    D.send source, if data is null then Symbol.for 'null' else data
  D.end source

TL;DR: Things to Keep in Mind

Never Assume a Stream to be Synchronous

Always Use $ 'finish' to Detect End of Stream

Never Use Null to Send, Unless You Want the Stream to End

Don't Use a Pass Thru Stream in Front of a Read Stream

Here's a Minimal Working Example, using PipeDreams' underlying mississippi library (assuming its methods are well-tested and reasonably bug-free); our only mistake is that the pipeline has a pass-thru stream in front of a file read stream:

#-----------------------------------------------------------------------------------------------------------
f = ( handler ) ->
  MSP   = require 'mississippi'
  pipeline = [
    ( MSP.through.obj() )
    ( ( require 'fs' ).createReadStream 'foo.txt', encoding: 'utf-8' )
    ]
  input = MSP.pipeline.obj pipeline...
  input
    .pipe D.$show()
  MSP.finished input, ( error ) =>
    return handler error if error
    handler()
  return null

Now it would be great if this code failed on pipeline construction time, preferrably with a sane error message and a helpful pointer into our code. It does not do that; instead, it fails with an obscure message and irrelevant (to the developper) pointers, to wit:

Error: premature close
    at onclose (.../pipedreams/node_modules/end-of-stream/index.js:44:54)
    at emitNone (events.js:85:20)
    at emit (events.js:179:7)
    at Duplexify._destroy (.../pipedreams/node_modules/duplexify/index.js:191:8)
    at .../pipedreams/node_modules/duplexify/index.js:174:10
    at _combinedTickCallback (node.js:370:9)
    at process._tickDomainCallback (node.js:425:11)

Always Use an Output and Wait for it

Beware of Incompatible Libraries

When re-writing the algorithms of PipeDreams for version 4, I wanted not only to weed out some odd bugs that appeared in strange corner cases, I also wanted to make sure that PipeDreams does not inadvertently cause some streams to fall back into pre-Streams-v3 mode. Consequently, I had to say good-bye to e.g. github.com/dominictarr/event-stream and github.com/dominictarr/split, both of which had served their purpose very well so far, but were no more up-to-date with the developement of NodeJS streams.

Happy at first was I when finding github.com/mcollina/split2, a library that says it "is inspired by @dominictarr split module, and it is totally API compatible with it"; further, it promises to be "based on through2 by @rvagg and [to be] fully based on Stream3". That's great! Just swap the one for the other, done!

Sadly, that didn't work out. I'm not claiming split2 has bugs, all I can say is that it did not reliably work within PipeDreams pipelines; the issue seems to be with stream end detection. Maybe there's something wrong with some PipeDreams method; I just don't know. All I do know is that github.com/maxogden/binary-split does work for me as advertised.

I think the takeaway here is that NodeJS streams are pretty complex beasts. I realize that I've put a lot of work into understanding streams and how to use them right, and I still do think that it's a worthwhile effort. But in all that complexity, there's always a chance that one party gets it flat wrong, or has made some as-such-valid, but nevertheless conflicting design decisions—a fault may occur in the PipeDreams code, in the client code (i.e. Your Code), or in some 3rd party module.

When faced with some fault, try to write a minimal test case (also known as Minimal Working Example (MWE)) and cleanly delineate (for example, by switching parts of the code on and off and re-running the test) exactly where and under what conditions the test works and where and when it fails.

Handle Errors Asynchronously

Error handling in NodeJS (and, generally, in any JavaScript VM) can be a bitch. Be it said that this part of the language could be and should be vastly improved. For the time being, consider to use CND.run.

NOTE that running production code with CND.run is not recommended as it does come with a hefty performance penalty.

npm install --save cnd

To run a method, catch all the synchronous and asynchronous errors and print a stacktrace to process.stderr (using console.error):

CND = require `cnd`
CND.run -> f 42

Same as the above, but do the error handling yourself:

CND.run ( -> f 42 ), ( error ) -> foobar()

Same as the last, except have CND output the error's stacktrace and be called back afterwards:

CND.run ( -> f 42 ), null, ( error ) -> foobar()

NB.: CND.run may be made configurable in the future; as of now, it is hardwired to use colors and always provide long (cross-event) stack traces. Colors used are blue for NodeJS VM built-ins, green for errors originating from modules installed under node_modules, and yellow for everything else.

Plugins

PipeDreams Plugin: Tabulate

See the Plugin Tabulate Readme.

PipeDreams Plugin: TSV

See the Plugin TSV Readme.

PipeDreams API

Note In the below, headings show the exact signature of each method as defined in the source. @ is CoffeeScript's symbol for JavaScript's this—replace it with whatever your favorite import symbol for the PipeDreams library may be. In the explanatory texts, that is D as in the rest of this document.

@$

@$as_json_list = ( tags... ) ->

Turn a stream of data into a JSON list. The source of $as_json_list demonstrates the usefulness of transform combinations:

@$as_json_list = ( tags... ) ->
  if ( pretty = 'pretty' in tags ) and ( arity = tags.length ) > 1
    throw new Error "expected at most single tag 'pretty', go #{rpr tags}"
  if pretty then  intersperse = @$intersperse '[\n  ', ',\n  ', '\n  ]\n'
  else            intersperse = @$intersperse '[', ',', ']'
  return @new_stream pipeline: [
    ( @$stringify()   )
    ( intersperse     )
    ( @$join ''       ) ]

$as_json_list accepts a single argument; when present, that must be the string 'pretty' to turn the result from a one-liner (for small amounts of data) to a one-record-per-line JSON representation. 'Pretty-printed' JSON files are great because they lend themselves both to be edited in your favorite text editor and to be efficiently processed in a linewise fashion.

@$as_list = ( names... ) ->

Turn named attributes into list of values.

@$as_text = ( stringify ) ->

Turn all data items into texts using JSON.stringify or a custom stringifier. null and any strings in the data stream is passed through unaffected. Observe that buffers in the stream will very probably not come out the way you'd expect them; this is because there's no way to know for the method what kind of data they represent.

This method is handy to put as a safeguard right in front of a .pipe output_file clause to avoid illegal non-buffer issues.

@$async

@$batch

@$benchmark = ( title = null ) ->

(see tests)

@$benchmark.summarize = =>

(see tests)

@$bridge = ( stream ) ->

Make it so that the pipeline may be continued even below a writable but not readable stream. Conceivably, this method could have be named tunnel as well. Something to get you across, you get the meaning. Useful for NodeJS writable streams which do not normally allow you to pipe something out of—in other words, when you pipe something into, say, fs.createWriteStream '/tmp/foo.txt', you can't take that stream and pipe it somewhere else. This won't work:

input
  .pipe ( require 'fs' ).createWriteStream 'foo.txt'
  .pipe ( require 'fs' ).createWriteStream 'bar.txt'

... but this works ...

input
  .pipe D.$bridge ( require 'fs' ).createWriteStream 'foo.txt'
  .pipe D.$bridge ( require 'fs' ).createWriteStream 'bar.txt'

... and this will work, too; all PipeDreams streams allow being piped from:

input
  .pipe D.new_stream 'write', path: 'foo.txt'
  .pipe D.new_stream 'write', path: 'bar.txt'

@$collect

@$count

@$decode = ( encoding = 'utf-8' ) ->

@$drop

@$filter

@$intersperse = ( joiners... ) ->

Similar to $join, $intersperse allows to put extra data in between each pair of original data; in contradistinction to $join, however, $intersperse does not stringify any data, but keeps the insertions as separate events.

$intersperse expects between one and three argument, each one of which may be a function or an arbitrary piece of data. If it receives a single argument, that value will be used as a 'mid joiner', i.e. it will be emitted in between any two events in the stream.

If $intersperse is called with two arguments, it will insert the value of the first argument before the first and after the last event in the stream. If called with three, the first argument goes to the beginning, the second to all the in-betweens, and the last to the end of the stream. In case one of the joiners is null, it will not be applied, and the same goes for joiners that are functions: if a joiner function returns null, no extra data item will be sent.

This simple test case provides an overview:

demo = ( x..., handler ) =>
  input = D.new_stream()
  input
    .pipe D.$intersperse x...
    .pipe D.$collect()
    .pipe $ ( data ) ->
      if data?
        help x, data.join ''
    .pipe $ 'finish', handler
  D.send input, 'a'
  D.send input, 'b'
  D.send input, 'c'
  D.end input

Here's a rundown of what resulta to expect with which kinds of signatures; it's probably best to stick to the starred call patterns for clarity:

### 1 way to call with 0 arguments ###
D.$intersperse()                   # 'abc'      — No-Op
#.......................................................................................................
### 2 ways to call with 1 arguments ###
D.$intersperse null                # 'abc'      — No-Op
D.$intersperse '—'                 # 'a—b—c'    — mid                   *
#.......................................................................................................
### 4 ways to call with 2 arguments ###
D.$intersperse null,  null         # 'abc'      — No-Op
D.$intersperse null,  '—'          # 'a—b—c'    — mid
D.$intersperse '{',   null         # '{abc{'    — start == stop
D.$intersperse '{',   '—'          # '{a—b—c{'  — mid, start == stop
#.......................................................................................................
### 8 ways to call with 3 arguments ###
D.$intersperse null,  null, null   # 'abc'      — No-Op
D.$intersperse null,  null, '}'    # 'abc}'     — stop                  *
D.$intersperse null,  '—',  null   # 'a—b—c'    — mid
D.$intersperse null,  '—',  '}'    # 'a—b—c}'   — mid, stop             *
D.$intersperse '{',   null, null   # '{abc'     — start                 *
D.$intersperse '{',   null, '}'    # '{abc}'    — start, stop           *
D.$intersperse '{',   '—',  null   # '{a—b—c'   — start, mid            *
D.$intersperse '{',   '—',  '}'    # '{a—b—c}'  — start, mid, stop      *

@$join = ( outer_joiner = '\n', inner_joiner = ', ' ) ->

Join all strings and lists in the stream. $join accepts two arguments, an outer_joiner and an inner_joiner. Joining works in three steps: First, all list encountered in the stream are joined using the inner_joiner, turning each list into a string as a matter of course. In the second step, the entire stream data is collected into a list (using PipeDreams $collect). In the last step, that collection is turned into a single string by joining them with the outer_joiner. The outer_joiner defaults to a newline, the inner_joiner to a comma and a space.

@$lockstep

@$parse_csv

@$pass

@$read_file = ( settings ) ->

Transform that to turn file paths into file contents. $read_file uses NodeJS' Buffer.concat method so it should be reasonably performant.

By default, $read_file passes on a list [ path, content, ] because it is expected that most of the time, subsequent transforms will need to know the origins of the contents. If you only want content to be passed on, call $read_file with { bare: no, }. In order to turn the content buffer into a string, add an encoding argument, e.g. { encoding: 'utf-8', }

@$sample = ( p = 0.5, options ) ->

Given a 0 <= p <= 1, interpret p as the probability to pick a given record and otherwise toss it, so that $sample 1 will keep all records, $sample 0 will toss all records, and $sample 0.5 (the default) will toss (on average) every other record.

You can pipe several $sample() calls, reducing the data stream to 50% with each step. If you know your data set has, say, 1000 records, you can cut down to a random sample of 10 by piping the result of calling $sample 1 / 1000 * 10 (or, of course, $sample 0.01).

Tests have shown that a data file with 3'722'578 records (which didn't even fit into memory when parsed) could be perused in a matter of seconds with $sample 1 / 1e4, delivering a sample of around 370 records. Because these records are randomly selected and because the process is so immensely sped up, it becomes possible to develop regular data processing as well as coping strategies for data-overload symptoms with much more ease as compared to a situation where small but realistic data sets are not available or have to be produced in an ad-hoc, non-random manner.

Parsing CSV: There is a slight complication when your data is in a CSV-like format: in that case, there is, with 0 < p < 1, a certain chance that the first line of a file is tossed, but some subsequent lines are kept. If you start to transform the text line into objects with named values later in the pipe (which makes sense, because you will typically want to thin out largeish streams as early on as feasible), the first line kept will be mis-interpreted as a header line (which must come first in CSV files) and cause all subsequent records to become weirdly malformed. To safeguard against this, use $sample p, headers: true (JS: $sample( p, { headers: true } )) in your code.

Predictable Samples: Sometimes it is important to have randomly selected data where samples are constant across multiple runs:

  • once you have seen that a certain record appears on the screen log, you are certain it will be in the database, so you can write a snippet to check for this specific one;

  • you have implemented a new feature you want to test with an arbitrary subset of your data. You're still tweaking some parameters and want to see how those affect output and performance. A random sample that is different on each run would be a problem because the number of records and the sheer bytecount of the data may differ from run to run, so you wouldn't be sure which effects are due to which causes.

To obtain predictable samples, use $sample p, seed: 1234 (with a non-zero number of your choice); you will then get the exact same sample whenever you re-run your piping application with the same stream and the same seed. An interesting property of the predictable sample is that—everything else being the same—a sample with a smaller p will always be a subset of a sample with a bigger p and vice versa.

@$show

@$select = ( selector, tracks ) ->

Sometimes it is convenient to split up stream processing depending on the 'kind' of data item at hand. For example, imagine you do a typesetting job where each bit coming down the line is either a typesetting command or else a bit of text; imagine you're doing multilingual typesetting in English and Arabic or Chinese and it's easy to see that the processing pipeline could at some point be split up like this:

                               │
                        ┌──────▼──────┐
                        │  all events │
                        └──────┬──────┘
                               ▼
                        ┌─────────────┐
                        │   $select   │
                        └────▼───▼────┘
       ┌─────────────────────┘   └─────┐
       ▼                               ▼
┌─────────────┐                 ┌─────────────┐
│  commands   │                 │ text events │
└──────┬──────┘                 └──────┬──────┘
       │                               ▼
       │                        ┌─────────────┐
       │                        │   $select   │
       │                        └────▼─▼─▼────┘
       │                             │ │ │
       │               ┌─────────────┘ │ └─────────────┐
       │               ▼               ▼               ▼
       │        ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
       │        │   English   │ │   Arabic    │ │   Chinese   │
       │        └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
       │               └─────────────┐ │ ┌─────────────┘
       │                             ▼ ▼ ▼
       │                        ┌─────────────┐
       │                        │ text events │
       │                        └──────┬──────┘
       └─────────────────────┐   ┌─────┘
                             ▼   ▼
                        ┌─────────────┐
                        │  all events │
                        └──────┬──────┘
                               ▼

This way, we can have a 'branch' or 'track' of the pipeline that deals exclusively with commands, and another one that deals exclusively with text events; the latter could at some point again be split up to do some kind of text processing that only applies to a certain language or certain script.

The ususal alternative to splitting the processing into tracks is to maintain a single chain of stream transforms. This has two disadvantages: for one thing, there will be a lot of code duplication because almost all transforms will need some kind of almost-identical if statement near the top to decide which events to deal with. On the other hand, transforms loose part of their 'innocence': a transform that, at its heart, only wants to deal with some text snippet has, all of a sudden, been requisitioned to dabble in event sieving as well.

Below we demonstrate a rather simpler example than the one outlined above. In that example, there are three 'translator' stream transforms that turn incoming numbers into the corresponding English, French, and German words. Further, there's a transform that draws a separator line. These four functions are registered in the tracks object. Also, there's a dispatch function that accepts a piece of data and returns an object to indicate where to direct the incoming data.

In general, the dispatch function should return a plain old dictionary (POD) with the obligatory element key (to select the target stream) and the facultative element data (to replace the data originally passed into $select). The key element may be

  • Symbol.for 'pass' to make $select pass the data through without submitting it to any kind of processing;
  • Symbol.for 'drop to make $select drop the data;
  • a stream or transform to send the value to;
  • a valid key of the object or list passed as second argument to select;
  • or a list of zero or more of any of the above to send data to any number of destinations.
#------------------------------------------------------------------------------
say_it_in_english = $ ( n, send ) ->
  if n?
    switch n
      when 1 then send 'one'
      when 2 then send 'two'
      when 3 then send 'three'
      else send 'many'
  return null

#------------------------------------------------------------------------------
say_it_in_french = $ ( n, send ) ->
  switch n
    when 1 then send 'un'
    when 2 then send 'deux'
    when 3 then send 'troix'
    else send 'beaucoup'
  return null

#------------------------------------------------------------------------------
say_it_in_german = $ ( n, send ) ->
  switch n
    when 1 then send 'eins'
    when 2 then send 'zwei'
    when 3 then send 'drei'
    else send 'viele'
  return null

#------------------------------------------------------------------------------
draw_a_separator = $ ( ignore, send ) ->
  send '—————'
  return null

#------------------------------------------------------------------------------
dispatch = ( data ) ->
  return key: Symbol.for 'drop'  if data is 'drop this one'
  return key: Symbol.for 'pass'  if data is 'pass this one'
  return key: 'SEP'              if data is '---'
  [ languages, number, ] = data
  if languages is '*'   then languages = [ 'EN', 'FR', 'DE', ]
  else                       languages = ( language.toUpperCase() for language in languages.split ',' )
  return key: languages, data: number

#------------------------------------------------------------------------------
tracks =
  EN:   say_it_in_english
  FR:   say_it_in_french
  DE:   say_it_in_german
  SEP:  draw_a_separator

#------------------------------------------------------------------------------
probes = [
  [ 'fr', 1, ]
  [ 'fr', 2, ]
  [ 'fr', 3, ]
  [ 'fr', 4, ]
  'pass this one'
  '---'
  [ 'en,fr',  1, ]
  '---'
  'drop this one'
  [ '*',  1, ]
  '---'
  [ 'en', 2, ]
  '---'
  [ 'de', 3, ]
  [ 'de', 4, ]
  ]

#------------------------------------------------------------------------------
matchers = [
  "un"
  "deux"
  "troix"
  "beaucoup"
  "pass this one"
  "—————"
  "one"
  "un"
  "—————"
  "one"
  "un"
  "eins"
  "—————"
  "two"
  "—————"
  "drei"
  "viele"
  ]

#------------------------------------------------------------------------------
my_input = D.new_stream()
my_input
  .pipe D.$select dispatch, tracks
  .pipe D.$collect()
  .pipe $ ( results ) =>
    T.eq results.length, matchers.length
    T.eq results[ idx ], matcher for matcher, idx in matchers
  .pipe $ 'finish', done
#..............................................................................
D.send  my_input, probe for probe in probes
D.end