parallel-transform-stream
v1.0.1
Published
stream.Transform with parallel transforms
Downloads
372
Readme
Parallel Transform Stream
A NodeJS transform stream which runs transformations in parallel and preserves input order.
npm install parallel-transform-stream --save
This module's core is based on parallel-transform.
It was fully rewritten in ES6, and provides a more flexible, inheritance-based interface.
Usage
This module is (almost) a drop-in replacement for standard NodeJS transform streams.
import ParallelTransform from 'parallel-transform-stream';
class MyParallelTransformStream extends ParallelTransform {
constructor() {
super({maxParallel: 50, objectMode: true}); // process up to 50 transforms in parallel
}
_parallelTransform(data, encoding, done) {
// long-running, asynchronous operation
done(null, data);
}
// optional, like in stream.Transform
_parallelFlush(done) {
// finish stuff, if required
done();
}
}
Alternatively, you can use the following shortcut function:
import ParallelTransform from 'parallel-transform-stream';
const MyParallelTransformStream = ParallelTransform.create((data, encoding, done) => {
// long-running, asynchronous operation
done(null, data);
});
## Documentation
All classes extending the ParallelTransform
class must implement the method _parallelTransform
.
They may implement _parallelFlush
, although this is not required.
### API
ParallelTransform.create(transform, flush = function(done) { done(); }, defaultOptions = {})
transform
<Function>
The _transform function of the stream. See below for more detailsflush
<Function>
The _flush function of the stream. See below for more detailsdefaultOptions
<Object>
Default options for the class constructor
API for extending ParallelTransform
The constructor of the ParallelTransform
class accepts all options accepted by stream.Transform
. In addition, it accepts the maxParallel
property, which set the maximum number of parallel transformations.
All classes extending ParallelTransform
must implement the _parallelTransform
method, and may implement the _parallelFlush
method.
ParallelTransform._parallelTransform(chunk, encoding, callback)
chunk
<Buffer>
|<String>
The chunk to be transformed.encoding
<String>
If the chunk is a string, then this is the encoding type. If chunk is a buffer, then this is the special value - 'buffer', ignore it in this case.callback
<Function>
A callback function to be called after the supplied chunk has been processed. The first argument passed to the callback must be an error which has occurred during transformation, ornull
. The second argument is the result. The stream will stop processing transforms and emit anerror
event instantly if the error passed to the callback function was notnull
.
Please note that, as opposed to traditional NodeJS transform streams, you MUST NOT call this.push
directly. Emit values through the callback function instead.
You must not call the callback more than once.
ParallelTransform._parallelFlush(callback)
callback
<Function>
A callback function to be called when the stream has finished flushing.
ParallelTransform
implementations may implement the transform._flush() method. This will be called when there is no more written data to be consumed, but before the 'end' event is emitted signaling the end of the Readable stream.
### Migrating from stream.Transform
- Change
extends stream.Transform
toextends ParallelTransform
- Rename
_transform(data, encoding, done)
to_parallelTransform(data, encoding, done)
- If using
_flush
: Rename_flush(done)
to_parallelFlush(done)
- Replace
this.push(data); done();
withdone(null, data);
Example
import stream from 'stream';
class MyTransformStream extends stream.Transform {
constructor() {
super({objectMode: true});
}
_transform(data, encoding, done) {
// do something with `data`
this.push(data);
done();
}
}
becomes
import ParallelTransform from 'parallel-transform-stream';
class MyTransformStream extends ParallelTransform {
constructor() {
super({objectMode: true});
}
_parallelTransform(data, encoding, done) {
// do something with `data`
done(null, data);
}
}
Gotchas and caveats
- Calling
this.push()
will result in unexpected behaviour. Push results by callingdone(null, result)
. - Calling
done()
more than once will result in unexpected behaviour - By design, you cannot push multiple results from a single transform