primus-backpressure

v3.0.3

Node streams2 over Primus: added back-pressure!

  • Pass in a Primus client or spark, get back a stream.Duplex. Do this on both sides of a Primus connection.
  • write returns false when the receiver is full.
  • Use read and readable to exert back-pressure on the sender.
  • Unit tests with 100% coverage.
  • Browser unit tests using webpack and PhantomJS.

The API is described in the API section below.

Example

Exerting backpressure over Primus:

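var Primus = require('primus'),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex,
    primus = Primus.createServer({ port: 9000 }),
    Socket = primus.Socket,
    assert = require('assert'),
    read_a = false;

// Server side: wrap the incoming spark in a PrimusDuplex.
primus.once('connection', function (spark)
{
    var spark_duplex = new PrimusDuplex(spark,
    {
        highWaterMark: 2
    });

    // Writing two bytes reaches the high-water mark, so write returns false.
    assert.equal(spark_duplex.write('ab'), false);

    // By the time 'drain' fires, the client must already have read 'a'.
    spark_duplex.on('drain', function ()
    {
        assert(read_a);
    });
});

// Client side: wrap the Primus client, with a one-byte read buffer.
var client_duplex = new PrimusDuplex(new Socket('http://localhost:9000'),
{
    highWaterMark: 1
});

client_duplex.once('readable', function ()
{
    // Only the first byte is available; the second has not been sent yet.
    assert.equal(this.read().toString(), 'a');
    read_a = true;
    assert.equal(this.read(), null);

    // Reading 'a' freed space, so the server can now send 'b'.
    this.once('readable', function ()
    {
        assert.equal(this.read().toString(), 'b');
        primus.end();
        console.log('done');
    });
});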
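
The example above checks the return value of write and waits for 'drain'. A minimal sketch of the same pattern as a reusable helper follows. The helper name write_all is hypothetical, and a plain stream.Writable stands in for a PrimusDuplex so that the snippet runs without a Primus server. Because PrimusDuplex inherits from stream.Duplex, the same helper should work when given a PrimusDuplex, but that is an assumption rather than something tested here.

```javascript
var stream = require('stream');

// Hypothetical helper: write each chunk in turn, pausing whenever
// write returns false and resuming on the next 'drain' event.
function write_all(dest, chunks, done)
{
    var i = 0;

    function next()
    {
        while (i < chunks.length)
        {
            if (!dest.write(chunks[i++]))
            {
                // Receiver is full: wait before writing any more.
                dest.once('drain', next);
                return;
            }
        }

        done();
    }

    next();
}

// Stand-in for a PrimusDuplex: a slow sink with a two-byte buffer.
var received = [],
    finished = false,
    sink = new stream.Writable(
    {
        highWaterMark: 2,
        write: function (chunk, encoding, cb)
        {
            received.push(chunk.toString());
            setImmediate(cb);
        }
    });

write_all(sink, ['ab', 'cd', 'ef'], function ()
{
    finished = true;
    console.log(received.join(','));
});
```

Each two-byte chunk fills the sink's buffer, so the helper writes one chunk, waits for 'drain', and then writes the next.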

Another Example

Piping data over Primus:

var Primus = require('primus'),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex,
    primus = Primus.createServer({ port: 9000 }),
    Socket = primus.Socket,
    assert = require('assert'),
    crypto = require('crypto'),
    tmp = require('tmp'),
    fs = require('fs');

primus.once('connection', function (spark)
{
    var spark_duplex = new PrimusDuplex(spark);

    tmp.tmpName(function (err, random_file)
    {
        assert.ifError(err);

        var random_buf = crypto.randomBytes(1024 * 1024);

        fs.writeFile(random_file, random_buf, function (err)
        {
            assert.ifError(err);

            tmp.tmpName(function (err, out_file)
            {
                assert.ifError(err);

                var random_stream = fs.createReadStream(random_file),
                    out_stream = fs.createWriteStream(out_file);

                out_stream.on('finish', function ()
                {
                    fs.readFile(out_file, function (err, out_buf)
                    {
                        assert.ifError(err);
                        assert.deepEqual(out_buf, random_buf);

                        fs.unlink(random_file, function (err)
                        {
                            assert.ifError(err);
                            fs.unlink(out_file, function (err)
                            {
                                assert.ifError(err);
                                primus.end();
                                console.log('done');
                            });
                        });
                    });
                });

                spark_duplex.pipe(out_stream);
                random_stream.pipe(spark_duplex);
            });
        });
    });
});

// Client side: echo everything received back to the server.
var client_duplex = new PrimusDuplex(new Socket('http://localhost:9000'));
client_duplex.pipe(client_duplex);

Installation

npm install primus-backpressure

Licence

MIT

Test

Node client to Node server:

grunt test

Browser client to Node server (requires PhantomJS):

grunt test-browser

Code Coverage

grunt coverage

c8 results are available here.

Coveralls page is here.

Lint

grunt lint

API

PrimusDuplex inherits from stream.Duplex so you can call any method from stream.Readable and stream.Writable.

Extra constructor options and an additional parameter to readable.read are described below.

Source: index.js

PrimusDuplex(msg_stream, [options])

Creates a new PrimusDuplex object which exerts back-pressure over a Primus connection.

Both sides of a Primus connection must use PrimusDuplex — create one for your Primus client and one for your spark as soon as you have them.

Parameters:

  • {Object} msg_stream The Primus client or spark you wish to exert back-pressure over.
  • {Object} [options] Configuration options. This is passed on to stream.Duplex and can contain the following extra properties:
    • {Function} [encode_data(chunk, encoding, start, end, internal)] Optional encoding function for data passed to writable.write. chunk and encoding are as described in the writable.write documentation. The difference is that encode_data is synchronous (it must return the encoded data) and it should only encode data between the start and end positions in chunk. Defaults to a function which does chunk.toString('base64', start, end). Note that PrimusDuplex may also pass some internal data through this function (always with chunk as a Buffer, encoding=null and internal=true).

    • {Function} [decode_data(chunk, internal)] Optional decoding function for data received on the Primus connection. The type of chunk will depend on how the peer PrimusDuplex encoded it. Defaults to a function which does Buffer.from(chunk, 'base64'). If the data can't be decoded, return null (and optionally call this.emit to emit an error). Note that PrimusDuplex may also pass some internal data through this function (always with internal=true) — in which case you must return a Buffer.

    • {Integer} [max_write_size] Maximum number of bytes to write onto the Primus connection at once, regardless of how many bytes the peer is free to receive. Defaults to 0 (no limit).

    • {Boolean} [check_read_overflow] Whether to check if more data than expected is being received. If true and the high-water mark for reading is exceeded then the PrimusDuplex object emits an error event. This should not normally occur unless you add data yourself using readable.unshift — in which case you should set check_read_overflow to false. Defaults to true.
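
As an illustration of the encode_data and decode_data options described above, here is a minimal sketch that swaps the default base64 encoding for hex. The function names encode_hex and decode_hex are hypothetical, and the sketch assumes chunk always arrives at encode_data as a Buffer, as the default implementation implies. Both peers would need to use a matching pair of functions.

```javascript
// Hypothetical encoder: hex-encode only the bytes between start and end.
// Assumes chunk is a Buffer. Must be synchronous and return the result.
function encode_hex(chunk, encoding, start, end, internal)
{
    return chunk.toString('hex', start, end);
}

// Hypothetical decoder: return a Buffer, or null if the data isn't valid hex.
// Buffer.from silently truncates invalid hex, so validate explicitly first.
function decode_hex(chunk, internal)
{
    if ((typeof chunk !== 'string') ||
        (chunk.length % 2 !== 0) ||
        !/^[0-9a-f]*$/i.test(chunk))
    {
        return null;
    }

    return Buffer.from(chunk, 'hex');
}

// These would be passed as the second argument to the constructor on both
// sides of the connection, for example: new PrimusDuplex(spark, options)
var options = {
    encode_data: encode_hex,
    decode_data: decode_hex
};

var encoded = encode_hex(Buffer.from('hello'), null, 1, 3);
console.log(encoded, decode_hex(encoded).toString());
```

The decoder always returns a Buffer for valid input, which also satisfies the requirement for internal data.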

PrimusDuplex.prototype.read([size], [send_status])

See readable.read. PrimusDuplex adds an extra optional parameter, send_status.

Parameters:

  • {Number} [size] Optional argument to specify how much data to read. Defaults to undefined (you can also specify null) which means return all the data available.
  • {Boolean} [send_status] Every time you call read, a status message is sent to the peer PrimusDuplex indicating how much space is left in the internal buffer to receive new data. To prevent deadlock, these status messages are always sent — they aren't subject to back-pressure. Normally this is fine because status messages are small. However, if your application reads data one byte at a time, for example, you may wish to control when status messages are sent. To stop a status message being sent when you call read, pass send_status as false. send_status defaults to true. To force a status message to be sent without reading any data, call read(0).
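
The send_status behaviour described above can be sketched as follows: read one byte at a time without sending a status message per byte, then send a single status message with read(0) once the buffer has been emptied. The helper name read_bytewise is hypothetical. A plain stream.Readable stands in for a PrimusDuplex here so that the snippet runs on its own; a plain Readable ignores the second argument to read, so the status messages themselves are not exercised.

```javascript
var stream = require('stream');

// Hypothetical helper: consume all buffered data one byte at a time.
function read_bytewise(duplex, on_byte)
{
    var chunk,
        count = 0;

    // Pass send_status as false so no status message is sent per byte.
    while ((chunk = duplex.read(1, false)) !== null)
    {
        on_byte(chunk[0]);
        count += 1;
    }

    if (count > 0)
    {
        // Send one status message for everything read in this pass.
        duplex.read(0);
    }

    return count;
}

// Stand-in for a PrimusDuplex with three bytes already buffered.
var source = new stream.Readable({ read: function () {} }),
    seen = [];

source.push(Buffer.from('abc'));

var count = read_bytewise(source, function (b)
{
    seen.push(b);
});

console.log(count, seen);
```

With a real PrimusDuplex this helper would typically be called from a 'readable' handler.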

—generated by apidox