npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

muxdemux

v1.1.1

Published

multiplex and demultiplex (mux / demux) streams into an single stream (object mode or not)

Downloads

20

Readme

muxdemux

multiplex and demultiplex (mux / demux) streams into an single stream (object mode or not)

Installation

npm i --save muxdemux

Usage

Example: muxdemux substreams

var muxdemux = require('muxdemux')
var mux = muxdemux()
var demux = muxdemux(handleSubstream)
mux.pipe(demux)
mux.substream('foo').write(new Buffer('hello world'))
mux.substream('bar').write(new Buffer('yolo'))
function handleSubstream (substream, name) {
  if (name === 'foo') {
    substream.once('data', function (data) {
      data.toString() // 'hello world'
    })
    substream.pipe(/* any other stream */)
  } else if (name === 'bar') {
    substream.once('data', function (data) {
      data.toString() // 'yolo'
    })
  }
}

Example: muxdemux substream events

Substream events are encoded and sent down the stream as data packets. Demux streams will decode these data packets and emit the events as if they occurred on downstream substreams themselves. All emitted from a mux.substream().emit(...) will be propagated to downstream substreams.

var muxdemux = require('muxdemux')
var mux = muxdemux()
var demux = muxdemux(handleSubstream)
mux.pipe(demux)
mux.substream('foo').emit('buffer-event', new Buffer('🔥'))
mux.substream('bar').emit('error', new Error('boom'))
mux.substream('qux').emit('other-event', { abc: 'hello' })
function handleSubstream (substream, name) {
  if (name === 'foo') {
    // buffers are encoded on json and reparsed as buffers
    substream.once('buffer-event', function (buf, {
      buf instanceof Buffer // true
      buf.toString() // '🔥'
    })
  } else if (name === 'bar') {
    substream.once('error', function (err) {
      // errors are encoded on json and reparsed as errors
      err instanceof Error // true
      err.message // 'boom'
      err.stack // 'Error: boom ...'
    })
  } else if (name === 'qux') {
    substream.once('other-event', function (data) {
      // errors are encoded on json and reparsed as errors
      typeof data // 'object'
      data // { abc: 'hello' }
    })
  }
}

Example: muxdemux object mode streams

For now muxdemux assumes that substreams share the same objectMode as their parents; substreams of objectMode:true mux/demux streams will also be objectMode:true and vice versa.

var muxdemux = require('muxdemux')
var mux = muxdemux.obj()
var demux = muxdemux.obj(handleSubstream)
mux.pipe(demux)
mux.substream('foo').write({ hello: 1 })
mux.substream('bar').write({ world: 2 })
function handleSubstream (substream, name) {
  if (name === 'foo') {
    substream.once('data', function (data) {
      data // { hello: 1 }
    })
    substream.pipe(/* any other stream */)
  } else if (name === 'bar') {
    substream.once('data', function (data) {
      data // { world: 2 }
    })
  }
}

Example: muxdemux substreams end

If all of a mux/demux's substreams end the mux/demux stream will also end. The same is also true for the opposite. If a mux/demux stream ends, it's substreams will be ended.

var muxdemux = require('muxdemux')
var mux = muxdemux.obj()
var demux = muxdemux.obj(function handleSubstream (substream, name) { /* ... */ })
mux.on('finish', handleMuxFinish)
demux.on('finish', handleDemuxFinish)
mux.pipe(demux)
var foo = mux.substream('foo')
foo.write({ hello: 1 })
var bar = mux.substream('bar')
bar.write({ world: 2 })
// end substreams
foo.end()
bar.end()
function handleMuxFinish () {
  // get's called bc all substreams ended (both foo and bar)
}
function handleDemuxFinish () {
  // get's called downstream bc all substreams ended (both foo and bar)
}

Example: unexpected muxdemux finish

If a muxdemux finishes before it's substreams it will emit an error to each unfinished substream. This default behavior can be disabled by passing opts.unexpectedFinishError = false

var muxdemux = require('muxdemux')
var mux = muxdemux.obj()
var foo = mux.substream('foo')
var bar = mux.substream('bar')
foo.on('error', function (err) {
  // called bc mux finished before foo-substream finished
  err // [Error: unexpected muxdemux finish]
})
bar.on('error', function () {
  // not called, bc bar finished before mux
})
bar.end() // bar ends first, hence no error
mux.end()

Example: unexpected muxdemux error

If a muxdemux errors before it's substreams finish it will emit an error to each unfinished substream. This default behavior can be disabled by passing opts.unexpectedFinishError = false

var muxdemux = require('muxdemux')
var mux = muxdemux.obj()
var foo = mux.substream('foo')
var bar = mux.substream('bar')
foo.on('error', function (err) {
  // called bc mux finished before foo-substream finished
  // error message is prepended w/ 'unexpected muxdemux error: '
  err // [Error: unexpected muxdemux error: boom]
})
bar.on('error', function () {
  // not called, bc bar finished before mux
})
bar.end() // bar ends first, hence no error
mux.emit('error', new Error('boom'))

Example: circular streams

Circular substream data is filtered out of muxdemux streams by default. But if you want to be explicit and prevent non-substream data from infinitely circulating through your stream use opts.circular. opts.circular will filter out non-substream data. Circular streams are useful if you want substream events to be emitted "upstream" and "downstream".

// server.js
var muxdemux = require('muxdemux')
var websocket = /* ... */
var mux = muxdemux({ circular: true })
websocket.pipe(mux).pipe(websocket)
var fooSubstream = mux.substream('foo')
fooSubstream.on('custom1', function (data) {
  console.log(data) // "hello"
  fooSubstream.emit('custom2', 'world')
})

// client.js
var muxdemux = require('muxdemux')
var websocket = /* ... */
var mux = muxdemux({ circular: true })
websocket.pipe(demux).pipe(websocket)
var fooSubstream = demux.substream('foo')
fooSubstream.on('custom2', function () {
  console.log(data) // "world"
})
fooSubstream.emit('custom1', 'hello')

License

MIT