primus-backpressure
v3.0.3
Node streams2 over Primus: added back-pressure!
- Pass in a Primus client or spark, get back a stream.Duplex. Do this on both sides of a Primus connection.
- write returns false when the receiver is full.
- Use read and readable to exert back-pressure on the sender.
- Unit tests with 100% coverage.
- Browser unit tests using webpack and PhantomJS.
The API is described below.
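The write contract in the list above is the standard Node one, so it can be sketched without a Primus connection. The snippet below is only an illustration, assuming a plain stream.Writable as a stand-in for a PrimusDuplex; the names slow_sink and pending are hypothetical and are not part of this module.

```javascript
'use strict';
const { Writable } = require('stream');

// Hold each write callback so the sink only "consumes" data when told to.
const pending = [];
const slow_sink = new Writable({
    highWaterMark: 2,
    write(chunk, encoding, callback) {
        pending.push(callback);
    }
});

slow_sink.on('drain', function () {
    console.log('drain: safe to write again');
});

// One byte is below the 2-byte mark, so the caller may keep writing.
const below_mark = slow_sink.write('a');
// The second byte fills the buffer; false asks the caller to wait for 'drain'.
const at_mark = slow_sink.write('b');
console.log(below_mark, at_mark);

// Simulate the consumer catching up by completing the outstanding writes.
while (pending.length > 0) {
    pending.shift()();
}
```

With PrimusDuplex the buffer that fills up is on the far side of the connection, but the sender sees the same signals: a false return from write followed later by a drain event.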
Example
Exerting backpressure over Primus:
var Primus = require('primus'),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex,
    primus = Primus.createServer({ port: 9000 }),
    Socket = primus.Socket,
    assert = require('assert'),
    read_a = false;

primus.once('connection', function (spark)
{
    var spark_duplex = new PrimusDuplex(spark,
    {
        highWaterMark: 2
    });

    assert.equal(spark_duplex.write('ab'), false);

    spark_duplex.on('drain', function ()
    {
        assert(read_a);
    });
});

var client_duplex = new PrimusDuplex(new Socket('http://localhost:9000'),
{
    highWaterMark: 1
});

client_duplex.once('readable', function ()
{
    assert.equal(this.read().toString(), 'a');
    read_a = true;
    assert.equal(this.read(), null);

    this.once('readable', function ()
    {
        assert.equal(this.read().toString(), 'b');
        primus.end();
        console.log('done');
    });
});
Another Example
Piping data over Primus:
var Primus = require('primus'),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex,
    primus = Primus.createServer({ port: 9000 }),
    Socket = primus.Socket,
    assert = require('assert'),
    crypto = require('crypto'),
    tmp = require('tmp'),
    fs = require('fs');

primus.once('connection', function (spark)
{
    var spark_duplex = new PrimusDuplex(spark);

    tmp.tmpName(function (err, random_file)
    {
        assert.ifError(err);

        var random_buf = crypto.randomBytes(1024 * 1024);

        fs.writeFile(random_file, random_buf, function (err)
        {
            assert.ifError(err);

            tmp.tmpName(function (err, out_file)
            {
                assert.ifError(err);

                var random_stream = fs.createReadStream(random_file),
                    out_stream = fs.createWriteStream(out_file);

                out_stream.on('finish', function ()
                {
                    fs.readFile(out_file, function (err, out_buf)
                    {
                        assert.ifError(err);
                        assert.deepEqual(out_buf, random_buf);

                        fs.unlink(random_file, function (err)
                        {
                            assert.ifError(err);
                            fs.unlink(out_file, function (err)
                            {
                                assert.ifError(err);
                                primus.end();
                                console.log('done');
                            });
                        });
                    });
                });

                spark_duplex.pipe(out_stream);
                random_stream.pipe(spark_duplex);
            });
        });
    });
});

var client_duplex = new PrimusDuplex(new Socket('http://localhost:9000'));
client_duplex.pipe(client_duplex);
Installation
npm install primus-backpressure
Licence
Test
Node client to Node server:
grunt test
Browser client to Node server (requires PhantomJS):
grunt test-browser
Code Coverage
grunt coverage
c8 results are available here.
Coveralls page is here.
Lint
grunt lint
API
PrimusDuplex inherits from stream.Duplex so you can call any method from stream.Readable and stream.Writable.
Extra constructor options and an additional parameter to readable.read are described below.
Source: index.js
PrimusDuplex(msg_stream, [options])
Creates a new PrimusDuplex object which exerts back-pressure over a Primus connection.
Both sides of a Primus connection must use PrimusDuplex: create one for your Primus client and one for your spark as soon as you have them.
Parameters:
- {Object} msg_stream The Primus client or spark you wish to exert back-pressure over.
- {Object} [options] Configuration options. This is passed on to stream.Duplex and can contain the following extra properties:
  - {Function} [encode_data(chunk, encoding, start, end, internal)] Optional encoding function for data passed to writable.write. chunk and encoding are as described in the writable.write documentation. The difference is that encode_data is synchronous (it must return the encoded data) and it should only encode data between the start and end positions in chunk. Defaults to a function which does chunk.toString('base64', start, end). Note that PrimusDuplex may also pass some internal data through this function (always with chunk as a Buffer, encoding=null and internal=true).
  - {Function} [decode_data(chunk, internal)] Optional decoding function for data received on the Primus connection. The type of chunk will depend on how the peer PrimusDuplex encoded it. Defaults to a function which does Buffer.from(chunk, 'base64'). If the data can't be decoded, return null (and optionally call this.emit to emit an error). Note that PrimusDuplex may also pass some internal data through this function (always with internal=true), in which case you must return a Buffer.
  - {Integer} [max_write_size] Maximum number of bytes to write onto the Primus connection at once, regardless of how many bytes the peer is free to receive. Defaults to 0 (no limit).
  - {Boolean} [check_read_overflow] Whether to check if more data than expected is being received. If true and the high-water mark for reading is exceeded then the PrimusDuplex object emits an error event. This should not normally occur unless you add data yourself using readable.unshift, in which case you should set check_read_overflow to false. Defaults to true.
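As an illustration of the encode_data and decode_data options described above, here is a minimal sketch of a hex codec that follows the documented signatures. The function names encode_hex and decode_hex are hypothetical, and the sketch assumes chunk arrives as a Buffer on the encoding side and as a string on the decoding side. It runs standalone; passing the functions to PrimusDuplex is shown only as a comment.

```javascript
'use strict';

// Hypothetical encoder: synchronous, and only encodes bytes in [start, end).
function encode_hex(chunk, encoding, start, end, internal) {
    return chunk.toString('hex', start, end);
}

// Hypothetical decoder: returns null when the data can't be decoded.
function decode_hex(chunk, internal) {
    if (typeof chunk !== 'string' ||
        chunk.length % 2 !== 0 ||
        !/^[0-9a-f]*$/i.test(chunk)) {
        return null;
    }
    return Buffer.from(chunk, 'hex');
}

// Round trip without a Primus connection: encode the first 5 bytes only.
const encoded = encode_hex(Buffer.from('hello world'), null, 0, 5, false);
const decoded = decode_hex(encoded, false);
console.log(encoded, decoded.toString());

// With the module installed, the options would be passed like this:
// new PrimusDuplex(spark, { encode_data: encode_hex, decode_data: decode_hex });
```

Both peers need matching functions, since each side decodes what the other side encoded.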
PrimusDuplex.prototype.read([size], [send_status])
See readable.read. PrimusDuplex adds an extra optional parameter, send_status.
Parameters:
- {Number} [size] Optional argument to specify how much data to read. Defaults to undefined (you can also specify null) which means return all the data available.
- {Boolean} [send_status] Every time you call read, a status message is sent to the peer PrimusDuplex indicating how much space is left in the internal buffer to receive new data. To prevent deadlock, these status messages are always sent; they aren't subject to back-pressure. Normally this is fine because status messages are small. However, if your application reads data one byte at a time, for example, you may wish to control when status messages are sent. To stop a status message being sent when you call read, pass send_status as false. send_status defaults to true. To force a status message to be sent without reading any data, call read(0).
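The byte-at-a-time case mentioned above might be handled along these lines. This is only a sketch: read_bytes_quietly is a hypothetical helper, and a stdlib PassThrough stands in for a PrimusDuplex so the snippet runs without a connection. A plain PassThrough ignores the second argument to read, so no status messages are actually involved here.

```javascript
'use strict';
const { PassThrough } = require('stream');

// Hypothetical helper: read up to n bytes one at a time without a status
// message per byte, then ask for a single status message at the end.
function read_bytes_quietly(duplex, n) {
    const bytes = [];
    for (let i = 0; i < n; i += 1) {
        const byte = duplex.read(1, false); // send_status = false
        if (byte === null) {
            break;
        }
        bytes.push(byte);
    }
    duplex.read(0); // on a PrimusDuplex this forces a status message
    return Buffer.concat(bytes);
}

// Stand-in stream, used here only so the example is self-contained.
const stand_in = new PassThrough();
stand_in.write('abc');
const got = read_bytes_quietly(stand_in, 2);
console.log(got.toString());
```

Batching the status update this way trades a slightly later notification to the peer for far fewer messages on the connection.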
—generated by apidox—