Disclaimer: PipeDreams v4 should be considered alpha-level software—expect bugs and API changes.
PipeDreams
Install with npm install --save pipedreams.
Table of Contents generated with DocToc
- PipeDreams
- TL;DR: Things to Keep in Mind
- Plugins
- PipeDreams API
- @$
- @$as_json_list = ( tags... ) ->
- @$as_list = ( names... ) ->
- @$as_text = ( stringify ) ->
- @$async
- @$batch
- @$benchmark = ( title = null ) ->
- @$benchmark.summarize = =>
- @$bridge = ( stream ) ->
- @$collect
- @$count
- @$decode = ( encoding = 'utf-8' ) ->
- @$drop
- @$filter
- @$intersperse = ( joiners... ) ->
- @$join = ( outer_joiner = '\n', inner_joiner = ', ' ) ->
- @$lockstep
- @$parse_csv
- @$pass
- @$sample = ( p = 0.5, options ) ->
- @$show
- @$select = ( selector, tracks ) ->
- @$sort = ( sorter, settings ) ->
- @$split = ( matcher, mapper, settings ) ->
- @$spread
- @$stop_time
- @$stringify = ( stringify ) ->
- @tap = ( stream, P..., handler ) ->
- @$tap = ( stream, settings ) ->
- @$throttle_bytes
- @$throttle_items
- @$transform = ( method ) ->
- @end = ( me ) ->
- @isa_stream
- @isa_readable_stream = ( x ) ->
- @isa_writable_stream = ( x ) ->
- @isa_duplex_stream = ( x ) ->
- @new_stream
- @remit, @$, @remit_async, @$async
- @send = ( me, data ) ->
- ToDo
Caveat: the examples below are all written in CoffeeScript.
PipeDreams
Require Statement
The suggested way to require the PipeDreams library itself and to factor out the most important methods for convenience is as follows:
D = require 'pipedreams'
{ $, $async } = D
If you don't like dollar signs in your code or already use $ for something else, you're of course free to write D.$ or fall back to the long name, remit:
D = require 'pipedreams'
{ remit, remit_async } = D
In the below, I will assume you required PipeDreams the first way, above.
Streams are Transforms, Transforms are Streams
In the PipeDreams world, write streams can appear anywhere in the pipeline, just like read streams; also, streams and transforms are no different.
How can read streams, write streams and stream transforms be the same? After all, you'd write your typical streamy app approximately like this:
input = fs.createReadStream 'foo.txt'
output = fs.createWriteStream 'bar.txt'
input
.pipe get_transform_A()
.pipe get_transform_B()
.pipe get_transform_C()
.pipe output
In this view, clearly, a read-stream is a source of data—something that pushes data into the stream; a write-stream is a sink—something that accepts data from the stream; and a transform—well, a transform takes data, does something with it, and passes it on. In other words, a transform acts like a write-stream on its 'upper' end, and acts like a read-stream on its 'lower' end.
In the typical NodeJS way of doing things, you can't just go on with the pipeline after a write-stream; this would be illegal:
# won't work
input = fs.createReadStream 'foo.txt'
input
.pipe get_transform_A()
.pipe fs.createWriteStream 'bar.txt'
.pipe get_transform_B()
Trying to read from a NodeJS write-stream will elicit a dry Cannot pipe. Not readable. complaint from the engine. After all, this is a write-stream, right, so why should you want to read from it, right? Wrong!
Consider this simple setup:
# won't work
fs.createReadStream 'foo.txt'
.pipe fs.createWriteStream 'copy-1.txt'
.pipe fs.createWriteStream 'copy-2.txt'
.pipe fs.createWriteStream 'copy-3.txt'
Isn't it quite obvious that the only sensible course of action here is to A) read from foo.txt and B) copy those bytes to all of copy-1.txt, copy-2.txt, copy-3.txt? Why not? Turns out you can easily achieve the above with PipeDreams:
# works!
D = require 'pipedreams'
D.new_stream 'read', file: 'foo.txt'
.pipe D.new_stream 'write', file: 'copy-1.txt'
.pipe D.new_stream 'write', file: 'copy-2.txt'
.pipe D.new_stream 'write', file: 'copy-3.txt'
When to Call it a Day: Always Use $ 'finish'
Given the asynchronous nature of NodeJS' I/O handling, stream end detection can be a fickle thing and hard to get right. For example, when writing into a file, one might be tempted to wait for an end event:
### TAINT Counter-example; don't do it this way ###
write_sample = ( handler ) =>
input = D.new_stream()
output = D.new_stream 'write', 'lines', { file: path_1, }
pipeline = input
.pipe D.$show()
.pipe output
#.......................................................................................................
pipeline.on 'end', handler
#.......................................................................................................
D.send input, data for data in [ 'foo', 'bar', 'baz', ]
D.end input
Stress tests have shown this pattern to produce a certain percentage of failures (1 in 10, but that might depend on details of the writing process). On the other hand, the pattern below passes tests; here, we use the PipeDreams $ 'finish' transform:
write_sample = ( handler ) =>
input = D.new_stream()
output = D.new_stream 'write', 'lines', { file: path_1, }
input
.pipe D.$show()
.pipe output
.pipe $ 'finish', handler
#.......................................................................................................
D.send input, data for data in [ 'foo', 'bar', 'baz', ]
D.end input
Never Assume Your Streams to be Synchronous
As a general rule, keep in mind that no guarantee is made that any given stream works in a synchronous manner. More specifically, and with regard to the most typical usage pattern: never deal with pipelined data 'right below' the pipeline definition; always do that from inside a stream transform.
Here's an example from src/tests.coffee: we create a stream, define a pipeline to split the text into lines and collect those lines into a list; then, we write a multi-line string to it and end the stream. When we now look at what ended up in the collector, we find that the last line is missing. This may come as a surprise, since nothing in the code suggests that the thing should not work in a simple top-down manner:
@[ "(v4) new_stream_from_text doesn't work synchronously" ] = ( T, done ) ->
collector = []
input = D.new_stream()
input
.pipe D.$split()
.pipe $ ( line, send ) =>
send line
collector.push line
input.write "first line\nsecond line"
input.end()
T.eq collector, [ "first line", ] # <-- we're missing the last line here
done()
Update: The above code is no longer valid and has been removed; however, the following code is still valid.
In order for the code to meet expectations, remember to always grab your results from within a stream transform; commonly, this is done with $ 'finish':
@[ "(v4) new_stream_from_text (2)" ] = ( T, done ) ->
collector = []
input = D.new_stream()
input
.pipe D.$split()
.pipe $ ( line, send, end ) =>
if line?
send line
collector.push line
if end?
T.eq collector, [ "first line", "second line", ]
end()
.pipe $ 'finish', done
input.write "first line\nsecond line"
input.end()
Stream Creation
PipeDreams simplifies and unifies the most common stream creation tasks by providing a (fairly) easy-to-use and flexible interface via its new_stream methods. For the future, it is planned to make that API extensible with plug-ins.
The simplest possible use is to call s = D.new_stream() without any arguments, which will give you a through2 stream that you can use as source, as sink or as transform.
Here is a very simple example: a function that accepts a callback handler (because all streams are assumed to be asynchronous); it constructs a stream named input, writes two strings to it, and ends it. The pipeline is set up with three PipeDreams API calls: the first to split (and, actually, re-join) the data events into lines (sans newline characters); the second to print out data events; and the last one to detect the stream's finish event and call the callback:
f = ( done ) ->
input = D.new_stream()
input
.pipe D.$split()
.pipe D.$show()
.pipe $ 'finish', done
input.write "helo\nworld"
input.write "!"
input.end()
return null
This ultra-simple streaming setup will do nothing but print out:
'helo'
'world!'
Throughstreams can appear in any role in a stream pipeline: as sources, throughputs, or sinks of data.
To demonstrate that, let's modify the above a little (rpr being a helper derived from util.inspect):
f = ( done ) ->
input = D.new_stream()
thruput = D.new_stream()
output = D.new_stream()
#.........................................................................................................
input
.pipe D.$split()
.pipe thruput
.pipe output
.pipe $ 'finish', done
#.........................................................................................................
thruput
.pipe $ ( data ) -> log 'thruput', rpr data
#.........................................................................................................
output
.pipe $ ( data ) -> log 'output', rpr data
#.........................................................................................................
input.write "helo\nworld"
input.write "!"
input.end()
return null
The only surprise here is the somewhat weird ordering of the lines this function prints to the console:
output 'helo'
thruput 'helo'
output 'world!'
thruput 'world!'
thruput null
output null
One might have expected the text from thruput to come before that from output, but the general rule is: do not rely on a specific ordering of events across different stream transforms; only the preservation of order within a given transform is guaranteed.
Stream Creation API
D.new_stream has a somewhat unusual call signature; its general format is (if you excuse my failed attempt at writing a sort-of BNF):
D.new_stream [ hints... ], [ kind: seed ], [ settings ]
All parts are optional. hints consists of zero or more 'keywords', 'tags' or 'flags'—single-word strings that switch various behaviors of the stream on or off. hints are followed by an (also optional) single-element key/value object whose key specifies the kind of stream to be built; settings is another object that provides space for more stream-specific settings.
There are currently 6 'kinds' of streams that new_stream can return; the simplest one—a featureless throughstream returned when new_stream is called without arguments—we have already encountered.
The most useful ones are file and pipeline streams. A file stream reads from or writes to a file on disk, and PipeDreams new_stream acts as an interface to NodeJS' fs.createReadStream and fs.createWriteStream, as the case may be. The minimal file stream is created with a 'kind' of 'file' (or, equivalently, 'path'—pick one), and a 'seed' that specifies the file's location. Without any hints, you get a file read stream that emits buffers, so for example
input = D.new_stream file: '/tmp/foo.txt'
input.pipe D.$show()
might spit out <Buffer 68 65 6c 6f 20 77 6f 72 6c 64 0a c3 a4 ...> or something similar (should the file indeed exist). If you add an encoding hint, the stream will emit strings instead of buffers; the encodings NodeJS accepts are:
- 'ascii'—for 7-bit ASCII data only. This encoding method is very fast and will strip the high bit if set.
- 'utf-8', 'utf8'—Multibyte encoded Unicode characters.
- 'utf16le'—2 or 4 bytes, little-endian encoded Unicode characters. Surrogate pairs (U+10000 to U+10FFFF) are supported.
- 'ucs2'—Alias of 'utf16le'.
- 'base64'—Base64 string encoding. When creating a buffer from a string, this encoding will also correctly accept "URL and Filename Safe Alphabet" as specified in RFC 4648, Section 5.
- 'binary'—A way of encoding the buffer into a one-byte (latin-1) encoded string. The string 'latin-1' is not supported. Instead, pass 'binary' to use 'latin-1' encoding.
- 'hex'—Encode each byte as two hexadecimal characters.
path = '/tmp/foo.txt'
D.new_stream file: path
D.new_stream 'read', file: path
D.new_stream 'utf-8', { path, }
D.new_stream 'read', 'lines', { path, }
D.new_stream { path, }
D.new_stream 'write', file: path
D.new_stream 'write', 'lines', { file: '/tmp/foo.txt', }
- 'file' / 'path'
- 'pipeline'
- 'text'
- 'url'
- 'transform'
- 'duplex'
@new_stream = ( P... ) ->
D.new_stream()
D.new_stream pipeline: [ transform, transform, ..., ]
D.new_stream text: "helo world"
D.new_stream file: "/tmp/foo.txt"
D.new_stream 'read', file: "/tmp/foo.txt"
D.new_stream 'write', file: "/tmp/foo.txt"
D.new_stream 'append', file: "/tmp/foo.txt"
D.new_stream path: "/tmp/foo.txt"
D.new_stream 'read', path: "/tmp/foo.txt"
D.new_stream 'write', path: "/tmp/foo.txt"
D.new_stream 'append', path: "/tmp/foo.txt"
D.new_stream url: "/tmp/foo.txt"
D.new_stream 'read', url: "/tmp/foo.txt"
D.new_stream 'write', url: "/tmp/foo.txt"
D.new_stream 'append', url: "/tmp/foo.txt"
D.new_stream 'devnull'
D.new_stream hints..., { kind: locator, }
Examples:
source = D.new_stream()
source = D.new_stream pipeline: [ $square(), ( $multiply 2 ), ]
pipeline = [ $square(), ( $multiply 2 ), ]
source = D.new_stream { pipeline, }
path = '../package.json'
input = D.new_stream { path, }
output = D.new_stream 'write', path: '/tmp/foo.txt'
Kinds:
text
pipeline
file
path
url
Hints:
'utf-8' / 'utf8'
'binary'
'read'
'write'
'append'
'devnull'
The Remit and Remit-Async Methods
The remit method (as well as its asynchronous companion, remit_async) is very much the centerpiece of the PipeDreams API¹. It accepts a function (call it a 'transformation') and returns a stream transform. In case you're familiar with the event-stream way of doing things, PipeDreams' remit f is roughly equivalent to event-stream's through on_data, on_end, except you can handle both the on_data and on_end parts in a single function f. remit_async f is roughly equivalent to event-stream's map f.
The behavior of a stream transform is governed by the number of arguments of the transformation function and the optional 'null' tag.
You can call remit ($) with a function that takes one, two, or three arguments, and remit_async ($async) with a function that takes two or three arguments. The suggested names of the arguments are data (or whatever fits the purpose best) for the first, send for the second, and end for the third argument, where present.
You use send to send data down the stream; when not present in the signature, the data is sent for you; where present, you may call it any number of times to send an arbitrary number of items.
Where the end parameter is present, you must explicitly call end() to end the stream; otherwise, it will remain open indefinitely. Either data or end—but not both—will be null on any given event. More precisely, the stream transform will be called once for each upstream data event (if any) with data != null, and then once with data == null and end != null upon stream end. The pattern to use is always:
stream
.pipe <...>
.pipe $ ( data, send, end ) =>
if data?
<...>
if end? # you could use a simple `else` here, but explicit is better than implicit
<...>
end()
.pipe <...>
Where end is not given, data will never be null, except where the 'null' tag is used, as in $ 'null', ( data ) =>. Checking data for null in this kind of transform allows you to observe the end-of-stream event without having to bother calling it yourself.
Where $async is used, you may call send data any number of times, but you must call send.done() (or send.done data) exactly once to signal completion of your asynchronous transform.
In a nutshell, you have the following options:
┌───────────────────────────────┬──────────┬───────────┬─────────────┐
│ signature                     │ data may │ must call │ must call   │
│                               │ be null  │ end()     │ send.done() │
├───────────────────────────────┼──────────┼───────────┼─────────────┼────────────┐
│ $ ( data ) ->                 │          │           │             │ observers  │
│ $ 'null', ( data ) ->         │ ●        │           │             │            │
├───────────────────────────────┼──────────┼───────────┼─────────────┼────────────┤
│ $ ( data, send ) ->           │          │           │             │            │
│ $ 'null', ( data, send ) ->   │ ●        │           │             │            │
│ $ ( data, send, end ) ->      │ ●        │ ●         │             │ transforms │
├───────────────────────────────┼──────────┼───────────┼─────────────┤            │
│ $async ( data, send, end ) -> │ ●        │ ●         │ ●           │            │
└───────────────────────────────┴──────────┴───────────┴─────────────┴────────────┘
You can also use send.end() to end the stream at any point in time and send.error new Error "message" to signal an error.
(Synchronous) Stream Observer
When calling $ with a function that takes a single argument, you get back an Observer, that is, a transform that gets all the data events passed in, but can't send any; in a manner of speaking, an observer is a 'transformation-less transform' (note, however, that in case events are implemented as a mutable object, such as a list or a plain old object, an observer can still mutate that event). The observer will be called once more with data set to null when the stream is about to end:
$ ( data ) ->
You can use that idiom 'inline', i.e. right within the pipeline notation:
input = ( require 'fs' ).createReadStream 'foo.txt'
input
.pipe D.$split() # convert buffer chunks into single-line strings
.pipe $ 'null', ( data ) ->
if data? then console.log "received event:", data
else console.log "stream has ended"
.pipe output
However, for any but the most one-off, simple purposes, you'll probably want a named function; it is customary to write the transform as a factory function that must get called once when being entered into the pipeline.
To denote the special status of a stream transform factory—a function that is a 'factory for potentially stateful transforms that only makes sense when being called as argument to a .pipe call within a stream pipeline' (iknowiknow, that's a mouthful)—it is also customary to prefix the name with a $ (dollar sign).
Since $show, below, is such a factory function, the transform that it returns may hold private state within the closure, an immensely useful technique:
f = ( done ) ->
$show = ->
return $ ( data ) ->
console.log "received data:", data
$count = ->
count = 0
return $ 'null', ( data ) ->
if data? then count += +1
else console.log "stream has ended; read #{count} events"
input = D.new_stream()
input
.pipe D.$split() # Convert buffer chunks into single-line strings.
.pipe $show()
.pipe $count()
.pipe $ 'finish', done
D.send input, """
Here we write
some lines of text
into the stream.
"""
D.end input # don't forget to end the input stream
In case you were wondering, $split() is a useful convenience method to turn a file readstream—which, in the absence of an encoding argument, will consist of a series of NodeJS Buffer objects—into a series of strings, each one representing one line (without the trailing newline) of the source. UTF-8 encoding is silently assumed.
Note: If you inadvertently forget to stick that remit call in front of your transformation function, you'll get an obscure error message: Cannot read property 'on' of undefined. Just try to remember that this symptom is (often) caused by an omitted remit / $.
Synchronous Transform, No Stream End Detection
When calling $ with a function that takes two arguments, you get back a Synchronous Transform. A synchronous transform receives data events and may send on as many events as it wants—zero or a thousand. The next transform in the pipeline will be called no sooner than the transform exits, whether it has called send in the process or not. In this variant, you can rely on data to never be null:
$ ( data, send ) -> ...
An example for this form is shown in the upcoming section.
Synchronous Transform With Stream End Detection
A Synchronous Transform with End Detection will be called once for each data item and once when the stream is about to end. It is returned by $ when called with a function that takes three arguments:
$ ( data, send, end ) -> ...
When the transformation eventually gets called from within the pipeline, its third argument (call it end) will be null, except when the stream is about to end. When that happens, end is a function that must be called to end the stream. In other words, when you have end in your signature but forget to call it, the stream will hang on indefinitely. This can be useful when used wisely, but has the power to bring down empires when done out of neglect.
Use synchronous transforms when you want to both mangle data as it passes by and aggregate data across the entire stream.
f = ( done ) ->
$as_number = ->
return $ ( data, send ) ->
send parseFloat data
$add = ( increment = 1 ) ->
return $ ( n, send ) ->
send n
send n + increment
$show = ->
return $ ( data ) ->
console.log "received data:", data
input = D.new_stream()
input
.pipe D.$split() # Convert into single-line strings.
.pipe $as_number()
.pipe $add 12
.pipe D.$sort()
.pipe $show()
.pipe $ 'finish', done
D.send input, "20\n10\n50\n40\n30\n"
D.end input
Output:
received data: 10
received data: 20
received data: 22
received data: 30
received data: 32
received data: 40
received data: 42
received data: 50
received data: 52
received data: 62
Asynchronous Transforms
Asynchronous Transforms are constructed in a very similar fashion to their synchronous counterparts, except you use $async (or remit_async) in place of $ (or remit); furthermore, there is no counterpart to the 'observer' call signature, so $async has to be called with a stream transformation that accepts either two or three arguments.
Asynchronous Transforms are suited for intermittent file and network reads. Since those can happen at an arbitrary time in the future, async stream transforms must always notify the pipeline when they've finished; to do this, there's a callback method tacked unto the send argument called send.done.
You can call send data as often as you like to, but you must call send.done() (or send.done data) whenever you're finished—otherwise the pipeline will hang on indefinitely:
$async ( data, send ) -> ...
An Asynchronous Transform with End Detection will be called once for each data item and once when the stream is ended, so again, be prepared for an empty stream where it is called once with data being null:
$async ( data, send, end ) -> ...
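To make this concrete, here is a minimal sketch of an asynchronous transform in the two-argument form; the factory name $file_sizes and the use of fs.stat are mine, not part of the library. It turns incoming file paths into [ path, size, ] pairs and signals completion with send.done():
fs = require 'fs'

#-----------------------------------------------------------------------------------------------------------
$file_sizes = ->
  return $async ( path, send ) ->
    fs.stat path, ( error, stat ) ->
      return send.error error if error?
      send [ path, stat.size, ]   # you may `send` as often as you like ...
      send.done()                 # ... but must call `send.done()` exactly once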
One-Off Transforms to Run at the Beginning or the Ending of a Stream
Often you need to perform some action exactly once when a stream has just started or just ended. In PipeDreams, this can be done by using one of the tags 'start', 'stop', 'first', 'last' or 'finish' with a remit call.
When you look at the many ways to 'remit with a tag' listed below, you may feel a little overwhelmed, but the API is in fact fairly straightforward:
- with 'first' and 'last', you listen in for the first or the last data event of the stream (if any), respectively.
- when a stream ends before any data item is sent (i.o.w. when you get an empty stream), then 'first' and 'last' transforms are only called (with null for the data argument) when the 'null' tag is present; when no 'null' tag has been given, data will never be null or undefined.
- with 'start' and 'stop', you listen in for the start and the stop of the stream; a transform with 'start' will be called right before the first data event comes down the stream, and a transform with 'stop' will be called right after the last data event has come down the stream. Both transforms will be called once in any event, even when the stream happens to be empty.
- $ 'finish', -> is much like $ 'stop', ( send ) ->, except that $ 'finish', -> is called after the stream has finished (at which time all participating transforms / streams in the pipeline should be positively done). Naturally, sending data into the stream makes no sense at that point in time, so a 'finish' transform must not accept any arguments.
- Observe that the 'null' tag can only be used with 'first' and 'last'; since 'start', 'stop' and 'finish' transforms never receive data events, using 'null' makes no sense there.
Call signatures:
$ 'first', ( data, send ) -> ...
$ 'first', 'null', ( data, send ) -> ...
$ 'last', ( data, send ) -> ...
$ 'last', 'null', ( data, send ) -> ...
$ 'start', ( send ) -> ...
$ 'stop', ( send ) -> ...
$ 'finish', -> ...
These methods are great to sneak in additional data right in front of or right behind the first or last events. The 'null' tag signals that whatever you have to add to the stream should be there in any event; as an example, imagine you want to build a JSON serialization of a list of items. In that case, you'd sneak in a left square bracket [ before the first and a right square bracket ] after the last list element; and in case no elements are passed through the stream at all, you still want the serialization to contain both brackets to obtain [], so clearly you'd go with the 'null' tag in this case.
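As a sketch of that idea (the factory name $wrap_in_brackets is made up, and I assume that events which are neither first nor last pass through untouched):
#-----------------------------------------------------------------------------------------------------------
$wrap_in_brackets = ->
  $first = $ 'first', 'null', ( data, send ) ->
    send '['
    send data if data?   # on an empty stream, `data` is `null`
  $last = $ 'last', 'null', ( data, send ) ->
    send data if data?
    send ']'
  return D.new_stream pipeline: [ $first, $last, ]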
@$ 'finish', @on_finish = ( stream, handler ) ->
The recommended way to detect write completion in a piped stream is to tack a .pipe $ 'finish', handler transform unto the end of your pipeline:
f = ( handler ) ->
...
input
.pipe $do_this()
.pipe $do_that()
.pipe output
.pipe $ 'finish', handler
Alternatively, if you have an explicit output stream (say, output) in your pipeline, you can also call D.on_finish output, handler. Terminating stream processing from handlers for other events (e.g. 'end') and/or of other parts of the pipeline may lead to hard-to-find bugs. Observe that on_finish and $ 'finish' call handler only upon the following turn of the JavaScript event loop.
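In other words, the pipeline from the previous snippet could equivalently be finished off like this (same hypothetical $do_this / $do_that transforms):
f = ( handler ) ->
  ...
  input
    .pipe $do_this()
    .pipe $do_that()
    .pipe output
  D.on_finish output, handler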
Note: You should not attach anything in the pipeline after a $ 'finish' transform, since the behavior of such a transform is not well defined. When the finish event is fired, all the stream components have already packed their bags and are ready to return home. The very moment that handler is called, the show is over, and the last batch of events may or may not make it to any given transform below $ 'finish'.
How to Send Null Without Ending the Stream
Had NodeJS streams been conceived at a point in time where JavaScript already had Symbols, and had the more modern, broader view of 'streams of anything' (as opposed to the more narrow view that 'all streams are bytestreams') prevailed, chances are that using send null from within a stream transform would be just like sending any other value, and something like send Symbol.for 'end' would've been used to signal the stream's end. But this is not how it happened; streams were conceived as a bytes-only thing, and JavaScript had no symbols at the time streams were added to NodeJS. As a result, some value other than a buffer had to be used, and the most natural choice was null—after all, null represents 'no value', and when you send data, you will never want to 'send no value' when you can instead 'not send a value', right? Turns out that's wrong.
nulls are everywhere, like it or not: they appear in databases; they are a legal JSON value (I'd even claim: the root value of JSON); they appear in configuration files—nulls are everybody's favorite Nothing-Be-Here placeholders.
Let's say you have a stream of values that you want to construct a JSON file from (demo code; consider using $as_json_list and/or $intersperse instead):
f = ( path, handler ) ->
source = D.new_stream()
output = D.new_stream 'write', { path, }
source
.pipe $ ( data, send ) => send ( JSON.stringify data ); send ','
.pipe D.$on_start ( send ) => send '['
.pipe D.$on_last ( data, send ) => send ']\n'
.pipe output
.pipe $ 'finish', handler
#.........................................................................................................
D.send source, 42
D.send source, 'a string'
# D.send source, null # uncomment to test
D.send source, false
D.end source
Calling f will duly print [42,"a string",false] into the file specified. However, trying the same after uncommenting the line that sends null into the stream will break with stream.push() after EOF; this is because sending null causes the stream to close down. Don't do that unless you want to end the stream.
There are two ways to tunnel null values through stream pipelines: one is to move on from simple data items to events; the other, to use a symbolic value in place of null data.
Yet another conceivable solution lies in rewriting a number of PipeDreams methods so they write a special value whenever they see null, and send null whenever they see the special value for the 'end stream' signal. At the time of this writing, I believe it's better to stick to established conventions; after all, PipeDreams' mission is to lower the threshold, avoid surprises, and level the learning curve.
Using Events instead of Data Items
In my experience, writing 'something streamy' to process whatever data often starts out as an idea how to send transform_this data, then send transform_that data—as a sketch where those stream transforms all receive bits of raw business source data in a piecemeal fashion, and emit bits of business data in the targeted format. Quite often the result will look like the last code snippet, above: lots of simple, unnamed stream transforms whose definitions are put right in the middle of the pipeline.
There's nothing wrong with that approach, and the simplicity of it sure helps to get started. On the downside, sending raw business data doesn't scale very far; it tends to break down the very moment you realize that at some point in your stream you want to communicate facts that are not part of the business data itself, but belong to a meta level. It is then that moving from 'data items' to 'events' is appropriate.
'Using events' just means wrapping each piece of data into a container object. JavaScript's simplest container is the list (a.k.a. the Array type), so one way of wrapping data is to always send pairs [ event_name, event_value, ]. Here's what a rewritten stream transform might look like:
f = ( path, handler ) ->
#.......................................................................................................
$serialize = =>
return $ ( event, send ) =>
[ kind, value, ] = event
return send event unless kind is 'data'
send [ 'json', ( JSON.stringify value ), ]
#.......................................................................................................
$insert_delimiters = =>
return $ ( event, send ) =>
[ kind, value, ] = event
send event
return unless kind is 'json'
send [ 'command', 'delimiter', ]
#.......................................................................................................
$start_list = => D.$on_start ( send ) => send [ 'command', 'start-list', ]
$stop_list = => D.$on_last ( event, send ) => send [ 'command', 'stop-list', ]
#.......................................................................................................
$as_text = =>
return $ ( event, send ) =>
[ kind, value, ] = event
return send value if kind is 'json'
return send event unless kind is 'command'
### Here I take the liberty to insert newlines so as to render multi-line JSON: ###
switch command = value
when 'delimiter' then send ',\n'
when 'start-list' then send '[\n'
when 'stop-list' then send '\n]\n'
else send.error new Error "unknown command #{rpr command}"
return null
#.......................................................................................................
source = D.new_stream()
output = D.new_stream 'write', { path, }
source
.pipe $serialize()
.pipe $insert_delimiters()
.pipe $start_list()
.pipe $stop_list()
.pipe $as_text()
.pipe output
.pipe $ 'finish', handler
#.........................................................................................................
D.send source, [ 'data', 42, ]
D.send source, [ 'data', 'a string', ]
D.send source, [ 'data', null, ]
D.send source, [ 'data', false, ]
D.end source
#.........................................................................................................
f '/tmp/foo.json', ( error ) =>
throw error if error?
done()
#.........................................................................................................
return null
This is, admittedly, a lot of code for such a simple task; however, when applications get more complex, it often pays to deal with slightly more abstracted objects that carry richer semantics. Depending on circumstances, one might opt for { named: 'values', } instead of modelling events as [ 'tuples', ], as PODs afford more code self-documentation and extensibility. But even within the limits of this small example, the added complexity of the event-based approach does have some justification: nowhere in the code did we have to pay attention whether some piece of data is or is not null—wrapping all the data freed us from having to treat this special value in any special way (we traded that for the obligation to deal with different kinds of events, to be sure).
Using a Symbolic Value for Null
If you don't want to wrap your data into event objects, an easy way to tunnel nulls through a pipeline is to replace them with some other value. If you know all your business data consists of either chunks of text or nulls, you can at any time just send a 0 (number zero) and still keep the distinction clear. Keeping things that simple, however, quickly breaks down as soon as the first user of your code does send series of numbers into the stream.
A much more robust solution is offered by JavaScript symbols. Symbols are a new primitive data type in JS; they have been specifically designed to be used everywhere where single values are needed that do not conflict with existing values or APIs. They come in two flavors: private and global.
A private symbol is created as d = Symbol 'xy', where 'xy' is a text of your own choosing; it is just used to identify the symbol, to give it some human-readable semantics. Each private symbol has its own identity, even when the same string is used for its creation, so ( Symbol 'A' ) != ( Symbol 'A' ) always holds. This makes private symbols great for lots of uses where a value of distinct identity—a singleton—is needed, a value that cannot (or, depending on use, is pretty hard to) be reproduced from any part of code outside the very place where the original was instantiated.
On the other hand, a global symbol is created as d = Symbol.for 'xy'; the difference to private symbols is in the availability, as it were, of the corresponding value. For all code running within the same code context (i.e. normally the same process), ( Symbol.for a ) == ( Symbol.for b ) holds exactly when a === b (and both values are strings).
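The difference in a nutshell (plain JavaScript semantics, nothing PipeDreams-specific):
( Symbol 'A'     ) is ( Symbol 'A'     )      # false: private symbols are always distinct
( Symbol.for 'A' ) is ( Symbol.for 'A' )      # true:  global symbols with the same key are identical
( Symbol.for 'A' ) is ( Symbol.for 'B' )      # false: different keys, different symbols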
In other words, a stream transform that wants to check for the occurrence of a global symbol for a given string 'foo' can just compare stream data items by saying if data is Symbol.for 'foo'. If it wants to check for a private symbol 'foo', on the other hand, it must have access to some property of some object that holds a reference to that symbol.
In an effort to establish standard procedures to make dealing with null data items easier, the PipeDreams library provides a reference to the global symbol for the string 'null' as D.NULL = Symbol.for 'null'; therefore, stream transforms are free to check for either if data is D.NULL or if data is Symbol.for 'null' with no difference in semantics whatsoever. The beauty of this approach is that no 'ordinary / business data type' is used, but a 'meta data type' (so chances of accidental collisions are minimized), and that client code can insert and check for 'meta nulls' without having to specifically reference D.NULL.
What's more, patterns like if data is Symbol.for 'null' then send "found a null!" are not only memorable to the writer, they're about as readable as programming languages can get.
Let's have a look at how to use the null symbol:
f = ( path, handler ) ->
source = D.new_stream()
output = D.new_stream 'write', { path, }
source
.pipe $ ( data, send ) => if data is Symbol.for 'null' then send 'null' else send JSON.stringify data
.pipe $ ( data, send ) => send data; send ','
.pipe D.$on_start ( send ) => send '['
.pipe D.$on_last ( data, send ) => send ']\n'
.pipe output
.pipe $ 'finish', handler
#.........................................................................................................
data_items = [ 42, 'a string', null, false, ]
for data in data_items
D.send source, if data is null then Symbol.for 'null' else data
D.end source
TL;DR: Things to Keep in Mind
Never Assume a Stream to be Synchronous
Always Use $ 'finish' to Detect End of Stream
Never Use Null to Send, Unless You Want the Stream to End
Don't Use a Pass Thru Stream in Front of a Read Stream
Here's a Minimal Working Example, using PipeDreams' underlying mississippi library (assuming its methods are well-tested and reasonably bug-free); our only mistake is that the pipeline has a pass-thru stream in front of a file read stream:
#-----------------------------------------------------------------------------------------------------------
f = ( handler ) ->
MSP = require 'mississippi'
pipeline = [
( MSP.through.obj() )
( ( require 'fs' ).createReadStream 'foo.txt', encoding: 'utf-8' )
]
input = MSP.pipeline.obj pipeline...
input
.pipe D.$show()
MSP.finished input, ( error ) =>
return handler error if error
handler()
return null
Now it would be great if this code failed at pipeline construction time, preferably with a sane error message and a helpful pointer into our code. It does not do that; instead, it fails with an obscure message and irrelevant (to the developer) pointers, to wit:
Error: premature close
at onclose (.../pipedreams/node_modules/end-of-stream/index.js:44:54)
at emitNone (events.js:85:20)
at emit (events.js:179:7)
at Duplexify._destroy (.../pipedreams/node_modules/duplexify/index.js:191:8)
at .../pipedreams/node_modules/duplexify/index.js:174:10
at _combinedTickCallback (node.js:370:9)
at process._tickDomainCallback (node.js:425:11)
Always Use an Output and Wait for it
Beware of Incompatible Libraries
When re-writing the algorithms of PipeDreams for version 4, I wanted not only to weed out some odd bugs that appeared in strange corner cases, I also wanted to make sure that PipeDreams does not inadvertently cause some streams to fall back into pre-Streams-v3 mode. Consequently, I had to say good-bye to e.g. github.com/dominictarr/event-stream and github.com/dominictarr/split, both of which had served their purpose very well so far, but were no longer up to date with the development of NodeJS streams.
I was happy at first to find github.com/mcollina/split2, a library that says it "is inspired by @dominictarr split module, and it is totally API compatible with it"; further, it promises to be "based on through2 by @rvagg and [to be] fully based on Stream3". That's great! Just swap the one for the other, done!
Sadly, that didn't work out. I'm not claiming split2 has bugs, all I can say is that it did not reliably work within PipeDreams pipelines; the issue seems to be with stream end detection. Maybe there's something wrong with some PipeDreams method; I just don't know. All I do know is that github.com/maxogden/binary-split does work for me as advertised.
I think the takeaway here is that NodeJS streams are pretty complex beasts. I realize that I've put a lot of work into understanding streams and how to use them right, and I still do think that it's a worthwhile effort. But in all that complexity, there's always a chance that one party gets it flat wrong, or has made some as-such-valid, but nevertheless conflicting design decisions—a fault may occur in the PipeDreams code, in the client code (i.e. Your Code), or in some 3rd party module.
When faced with some fault, try to write a minimal test case (also known as Minimal Working Example (MWE)) and cleanly delineate (for example, by switching parts of the code on and off and re-running the test) exactly where and under what conditions the test works and where and when it fails.
Handle Errors Asynchronously
Error handling in NodeJS (and, generally, in any JavaScript VM) can be a bitch. Be it said that this part of the language could be and should be vastly improved. For the time being, consider using CND.run.
NOTE that running production code with CND.run is not recommended, as it does come with a hefty performance penalty.
npm install --save cnd
To run a method, catch all the synchronous and asynchronous errors and print a stacktrace to process.stderr (using console.error):
CND = require 'cnd'
CND.run -> f 42
Same as the above, but do the error handling yourself:
CND.run ( -> f 42 ), ( error ) -> foobar()
Same as the last, except have CND output the error's stacktrace and be called back afterwards:
CND.run ( -> f 42 ), null, ( error ) -> foobar()
NB.: CND.run may be made configurable in the future; as of now, it is hardwired to use colors and always provide long (cross-event) stack traces. Colors used are blue for NodeJS VM built-ins, green for errors originating from modules installed under node_modules, and yellow for everything else.
Plugins
PipeDreams Plugin: Tabulate
See the Plugin Tabulate Readme.
PipeDreams Plugin: TSV
See the Plugin TSV Readme.
PipeDreams API
Note: In the below, headings show the exact signature of each method as defined in the source. @ is CoffeeScript's symbol for JavaScript's this—replace it with whatever your favorite import symbol for the PipeDreams library may be. In the explanatory texts, that is D as in the rest of this document.
@$
@$as_json_list = ( tags... ) ->
Turn a stream of data into a JSON list. The source of $as_json_list demonstrates the usefulness of transform combinations:
@$as_json_list = ( tags... ) ->
if ( pretty = 'pretty' in tags ) and ( arity = tags.length ) > 1
throw new Error "expected at most single tag 'pretty', go #{rpr tags}"
if pretty then intersperse = @$intersperse '[\n ', ',\n ', '\n ]\n'
else intersperse = @$intersperse '[', ',', ']'
return @new_stream pipeline: [
( @$stringify() )
( intersperse )
( @$join '' ) ]
$as_json_list accepts a single argument; when present, that must be the string 'pretty' to turn the result from a one-liner (for small amounts of data) into a one-record-per-line JSON representation. 'Pretty-printed' JSON files are great because they lend themselves both to being edited in your favorite text editor and to being efficiently processed in a linewise fashion.
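A minimal usage sketch (file path made up):
f = ( handler ) ->
  input = D.new_stream()
  input
    .pipe D.$as_json_list 'pretty'
    .pipe D.new_stream 'write', path: '/tmp/records.json'
    .pipe $ 'finish', handler
  D.send input, { id: 1, }
  D.send input, { id: 2, }
  D.end input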
@$as_list = ( names... ) ->
Turn named attributes into list of values.
@$as_text = ( stringify ) ->
Turn all data items into texts using JSON.stringify
or a custom stringifier. null
and any strings
in the data stream is passed through unaffected. Observe that buffers in the stream will very probably not
come out the way you'd expect them; this is because there's no way to know for the method what kind of
data they represent.
This method is handy to put as a safeguard right in front of a .pipe output_file
clause to avoid
illegal non-buffer
issues.
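A sketch of that safeguard use (path and data made up):
f = ( handler ) ->
  input  = D.new_stream()
  output = D.new_stream 'write', path: '/tmp/objects.txt'
  input
    .pipe D.$as_text()        # objects become JSON texts; strings pass through unchanged
    .pipe output
    .pipe $ 'finish', handler
  D.send input, { answer: 42, }
  D.send input, 'already a string'
  D.end input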
@$async
@$batch
@$benchmark = ( title = null ) ->
(see tests)
@$benchmark.summarize = =>
(see tests)
@$bridge = ( stream ) ->
Make it so that the pipeline may be continued even below a writable but not readable stream. Conceivably, this method could have been named tunnel as well. Something to get you across, you get the meaning. Useful for NodeJS writable streams which do not normally allow you to pipe something out of them—in other words, when you pipe something into, say, fs.createWriteStream '/tmp/foo.txt', you can't take that stream and pipe it somewhere else. This won't work:
input
.pipe ( require 'fs' ).createWriteStream 'foo.txt'
.pipe ( require 'fs' ).createWriteStream 'bar.txt'
... but this works ...
input
.pipe D.$bridge ( require 'fs' ).createWriteStream 'foo.txt'
.pipe D.$bridge ( require 'fs' ).createWriteStream 'bar.txt'
... and this will work, too; all PipeDreams streams allow being piped from:
input
.pipe D.new_stream 'write', path: 'foo.txt'
.pipe D.new_stream 'write', path: 'bar.txt'
@$collect
@$count
@$decode = ( encoding = 'utf-8' ) ->
@$drop
@$filter
@$intersperse = ( joiners... ) ->
Similar to $join, $intersperse allows you to put extra data in between each pair of original data items; in contradistinction to $join, however, $intersperse does not stringify any data, but keeps the insertions as separate events.
$intersperse expects between one and three arguments, each one of which may be a function or an arbitrary piece of data. If it receives a single argument, that value will be used as a 'mid joiner', i.e. it will be emitted in between any two events in the stream.
If $intersperse is called with two arguments, it will insert the value of the first argument before the first and after the last event in the stream. If called with three, the first argument goes to the beginning, the second to all the in-betweens, and the last to the end of the stream. In case one of the joiners is null, it will not be applied; and the same goes for joiners that are functions: if a joiner function returns null, no extra data item will be sent.
This simple test case provides an overview:
demo = ( x..., handler ) =>
input = D.new_stream()
input
.pipe D.$intersperse x...
.pipe D.$collect()
.pipe $ ( data ) ->
if data?
help x, data.join ''
.pipe $ 'finish', handler
D.send input, 'a'
D.send input, 'b'
D.send input, 'c'
D.end input
Here's a rundown of what results to expect with which kinds of signatures; it's probably best to stick to the starred call patterns for clarity:
### 1 way to call with 0 arguments ###
D.$intersperse() # 'abc' — No-Op
#.......................................................................................................
### 2 ways to call with 1 arguments ###
D.$intersperse null # 'abc' — No-Op
D.$intersperse '—' # 'a—b—c' — mid *
#.......................................................................................................
### 4 ways to call with 2 arguments ###
D.$intersperse null, null # 'abc' — No-Op
D.$intersperse null, '—' # 'a—b—c' — mid
D.$intersperse '{', null # '{abc{' — start == stop
D.$intersperse '{', '—' # '{a—b—c{' — mid, start == stop
#.......................................................................................................
### 8 ways to call with 3 arguments ###
D.$intersperse null, null, null # 'abc' — No-Op
D.$intersperse null, null, '}' # 'abc}' — stop *
D.$intersperse null, '—', null # 'a—b—c' — mid
D.$intersperse null, '—', '}' # 'a—b—c}' — mid, stop *
D.$intersperse '{', null, null # '{abc' — start *
D.$intersperse '{', null, '}' # '{abc}' — start, stop *
D.$intersperse '{', '—', null # '{a—b—c' — start, mid *
D.$intersperse '{', '—', '}' # '{a—b—c}' — start, mid, stop *
@$join = ( outer_joiner = '\n', inner_joiner = ', ' ) ->
Join all strings and lists in the stream. $join accepts two arguments, an outer_joiner and an inner_joiner. Joining works in three steps: first, all lists encountered in the stream are joined using the inner_joiner, turning each list into a string as a matter of course. In the second step, the entire stream data is collected into a list (using PipeDreams' $collect). In the last step, that collection is turned into a single string by joining its elements with the outer_joiner. The outer_joiner defaults to a newline, the inner_joiner to a comma and a space.
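A sketch of the expected behavior with the default joiners; the result shown is my reading of the three steps above, not verified output:
input = D.new_stream()
input
  .pipe D.$join()             # same as D.$join '\n', ', '
  .pipe D.$show()
D.send input, [ 'a', 'b', ]
D.send input, 'c'
D.end input
# expected single result: 'a, b\nc'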
@$lockstep
@$parse_csv
@$pass
@$read_file = ( settings ) ->
Transform to turn file paths into file contents. $read_file uses NodeJS' Buffer.concat method, so it should be reasonably performant.
By default, $read_file passes on a list [ path, content, ], because it is expected that most of the time, subsequent transforms will need to know the origins of the contents. If you only want content to be passed on, call $read_file with { bare: no, }. In order to turn the content buffer into a string, add an encoding argument, e.g. { encoding: 'utf-8', }.
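A usage sketch, assuming a stream of file paths comes down the pipe (the report format is made up):
input = D.new_stream()
input
  .pipe D.$read_file encoding: 'utf-8'
  .pipe $ ( [ path, content, ], send ) -> send "#{path}: #{content.length} characters"
  .pipe D.$show()
D.send input, '/tmp/foo.txt'
D.end input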
@$sample = ( p = 0.5, options ) ->
Given a 0 <= p <= 1, interpret p as the probability to pick a given record and otherwise toss it, so that $sample 1 will keep all records, $sample 0 will toss all records, and $sample 0.5 (the default) will toss (on average) every other record.
You can pipe several $sample() calls, reducing the data stream to 50% with each step. If you know your data set has, say, 1000 records, you can cut down to a random sample of 10 by piping the result of calling $sample 1 / 1000 * 10 (or, of course, $sample 0.01).
Tests have shown that a data file with 3'722'578 records (which didn't even fit into memory when parsed) could be perused in a matter of seconds with $sample 1 / 1e4, delivering a sample of around 370 records. Because these records are randomly selected and because the process is so immensely sped up, it becomes possible to develop regular data processing as well as coping strategies for data-overload symptoms with much more ease as compared to a situation where small but realistic data sets are not available or have to be produced in an ad-hoc, non-random manner.
Parsing CSV: There is a slight complication when your data is in a CSV-like format: in that case, there is, with 0 < p < 1, a certain chance that the first line of a file is tossed, but some subsequent lines are kept. If you start to transform the text lines into objects with named values later in the pipe (which makes sense, because you will typically want to thin out largeish streams as early on as feasible), the first line kept will be mis-interpreted as a header line (which must come first in CSV files) and cause all subsequent records to become weirdly malformed. To safeguard against this, use $sample p, headers: true (JS: $sample( p, { headers: true } )) in your code.
Predictable Samples: Sometimes it is important to have randomly selected data where samples are constant across multiple runs:
- once you have seen that a certain record appears on the screen log, you are certain it will be in the database, so you can write a snippet to check for this specific one;
- you have implemented a new feature you want to test with an arbitrary subset of your data. You're still tweaking some parameters and want to see how those affect output and performance. A random sample that is different on each run would be a problem because the number of records and the sheer bytecount of the data may differ from run to run, so you wouldn't be sure which effects are due to which causes.
To obtain predictable samples, use $sample p, seed: 1234 (with a non-zero number of your choice); you will then get the exact same sample whenever you re-run your piping application with the same stream and the same seed. An interesting property of the predictable sample is that—everything else being the same—a sample with a smaller p will always be a subset of a sample with a bigger p, and vice versa.
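Putting the pieces together, a sketch for taking a small, reproducible sample from a big line-oriented file (path made up):
input = D.new_stream 'read', 'lines', path: '/tmp/huge-data.csv'
input
  .pipe D.$sample 1 / 1e4, seed: 1234, headers: true
  .pipe D.$show()
  .pipe $ 'finish', -> console.log 'sampling done'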
@$show
@$select = ( selector, tracks ) ->
Sometimes it is convenient to split up stream processing depending on the 'kind' of data item at hand. For example, imagine you do a typesetting job where each bit coming down the line is either a typesetting command or else a bit of text; imagine you're doing multilingual typesetting in English and Arabic or Chinese and it's easy to see that the processing pipeline could at some point be split up like this:
│
┌──────▼──────┐
│ all events │
└──────┬──────┘
▼
┌─────────────┐
│ $select │
└────▼───▼────┘
┌─────────────────────┘ └─────┐
▼ ▼
┌─────────────┐ ┌─────────────┐
│ commands │ │ text events │
└──────┬──────┘ └──────┬──────┘
│ ▼
│ ┌─────────────┐
│ │ $select │
│ └────▼─▼─▼────┘
│ │ │ │
│ ┌─────────────┘ │ └─────────────┐
│ ▼ ▼ ▼
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ │ English │ │ Arabic │ │ Chinese │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ └─────────────┐ │ ┌─────────────┘
│ ▼ ▼ ▼
│ ┌─────────────┐
│ │ text events │
│ └──────┬──────┘
└─────────────────────┐ ┌─────┘
▼ ▼
┌─────────────┐
│ all events │
└──────┬──────┘
▼
This way, we can have a 'branch' or 'track' of the pipeline that deals exclusively with commands, and another one that deals exclusively with text events; the latter could at some point again be split up to do some kind of text processing that only applies to a certain language or certain script.
The usual alternative to splitting the processing into tracks is to maintain a single chain of stream transforms. This has two disadvantages: for one thing, there will be a lot of code duplication, because almost all transforms will need some kind of almost-identical if statement near the top to decide which events to deal with. On the other hand, transforms lose part of their 'innocence': a transform that, at its heart, only wants to deal with some text snippet has, all of a sudden, been requisitioned to dabble in event sieving as well.
Below we demonstrate a rather simpler example than the one outlined above. In that example, there are three 'translator' stream transforms that turn incoming numbers into the corresponding English, French, and German words. Further, there's a transform that draws a separator line. These four functions are registered in the tracks object. Also, there's a dispatch function that accepts a piece of data and returns an object to indicate where to direct the incoming data.
In general, the dispatch function should return a plain old dictionary (POD) with the obligatory element key (to select the target stream) and the facultative element data (to replace the data originally passed into $select). The key element may be
- Symbol.for 'pass' to make $select pass the data through without submitting it to any kind of processing;
- Symbol.for 'drop' to make $select drop the data;
- a stream or transform to send the value to;
- a valid key of the object or list passed as second argument to $select;
- or a list of zero or more of any of the above to send data to any number of destinations.
#------------------------------------------------------------------------------
say_it_in_english = $ ( n, send ) ->
if n?
switch n
when 1 then send 'one'
when 2 then send 'two'
when 3 then send 'three'
else send 'many'
return null
#------------------------------------------------------------------------------
say_it_in_french = $ ( n, send ) ->
switch n
when 1 then send 'un'
when 2 then send 'deux'
when 3 then send 'troix'
else send 'beaucoup'
return null
#------------------------------------------------------------------------------
say_it_in_german = $ ( n, send ) ->
switch n
when 1 then send 'eins'
when 2 then send 'zwei'
when 3 then send 'drei'
else send 'viele'
return null
#------------------------------------------------------------------------------
draw_a_separator = $ ( ignore, send ) ->
send '—————'
return null
#------------------------------------------------------------------------------
dispatch = ( data ) ->
return key: Symbol.for 'drop' if data is 'drop this one'
return key: Symbol.for 'pass' if data is 'pass this one'
return key: 'SEP' if data is '---'
[ languages, number, ] = data
if languages is '*' then languages = [ 'EN', 'FR', 'DE', ]
else languages = ( language.toUpperCase() for language in languages.split ',' )
return key: languages, data: number
#------------------------------------------------------------------------------
tracks =
EN: say_it_in_english
FR: say_it_in_french
DE: say_it_in_german
SEP: draw_a_separator
#------------------------------------------------------------------------------
probes = [
[ 'fr', 1, ]
[ 'fr', 2, ]
[ 'fr', 3, ]
[ 'fr', 4, ]
'pass this one'
'---'
[ 'en,fr', 1, ]
'---'
'drop this one'
[ '*', 1, ]
'---'
[ 'en', 2, ]
'---'
[ 'de', 3, ]
[ 'de', 4, ]
]
#------------------------------------------------------------------------------
matchers = [
"un"
"deux"
"troix"
"beaucoup"
"pass this one"
"—————"
"one"
"un"
"—————"
"one"
"un"
"eins"
"—————"
"two"
"—————"
"drei"
"viele"
]
#------------------------------------------------------------------------------
my_input = D.new_stream()
my_input
.pipe D.$select dispatch, tracks
.pipe D.$collect()
.pipe $ ( results ) =>
T.eq results.length, matchers.length
T.eq results[ idx ], matcher for matcher, idx in matchers
.pipe $ 'finish', done
#..............................................................................
D.send my_input, probe for probe in probes
D.end my_input