node-internal-pubsub
v1.0.1
A publish/subscribe API similar to that in node_redis, minus the redis.
Built on pattern-emitter.
Installation
Using npm, you can install node-internal-pubsub with:
npm install node-internal-pubsub
You can also require it as a dependency in your package.json file:
"dependencies": {
  "node-internal-pubsub": "1.0.x"
}
Overview
Quite a few SockJS and Socket.IO examples/tutorials create a redis client in subscriber mode per socket connection. They demonstrate code along the lines of:
sockjs.on('connection', function(conn) {
  var sub = redis.createClient();
  // ...
});
or
io.sockets.on('connection', function(socket) {
  var sub = redis.createClient();
  // ...
});
While the node_redis pubsub API simplifies the management of subscriptions as opposed to listeners (e.g. using EventEmitter), it likely shouldn't be used for internal message routing as seen in the above example. This is mostly due to the possible duplication of data and unnecessary network IO.
As such, this library exists to simplify the transition to using a single redis client for all connections, in addition to an internal pubsub mechanism. Most of the API has been designed to resemble that in node_redis, though a few key differences exist:
- Subscribers and publishers must be created using pubsub.createSubscriber and pubsub.createPublisher, respectively
- Rather than offering glob-based pattern subscriptions, regular expressions are available
- Subscribers do not exit subscriber mode when the subscription count reaches 0
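The idea behind this transition can be sketched without redis at all. The following is only an illustration and not this library's implementation: a plain Node EventEmitter named upstream stands in for the single redis subscriber client, another named internal stands in for the internal pubsub, and the connect helper is hypothetical.

```javascript
var EventEmitter = require('events').EventEmitter;

// Stand-in for the single redis client in subscriber mode (hypothetical).
var upstream = new EventEmitter();
// Stand-in for the internal pubsub shared by all connections (hypothetical).
var internal = new EventEmitter();
internal.setMaxListeners(0); // 0 disables the listener-count warning

// One listener on the upstream client forwards every message internally.
upstream.on('message', function(channel, msg) {
  internal.emit(channel, msg);
});

// Each connection adds an in-process listener instead of a new client.
var received = [];
function connect(id, channel) {
  internal.on(channel, function(msg) {
    received.push(id + ':' + msg);
  });
}

connect('a', 'chat');
connect('b', 'chat');
upstream.emit('message', 'chat', 'hello');
console.log(received); // [ 'a:hello', 'b:hello' ]
```

However many connections are added, the upstream stand-in keeps a single listener, which is the property the library is meant to give a real redis client.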
Performance
A benchmark can be found at benchmarks/singleChannelMultiSubs.js. It tests the performance of sending 10,000,000 messages in a single channel using 2,000 redis clients/subscribers, compared to a single redis client and 2,000 node-internal-pubsub subscribers. Example results can be seen below:
$ node benchmarks/singleChannelMultiSubs.js
Setting up redis suite
Receiving 10000000 messages with 2000 redis subscribers
Setting up pubsub suite
Receiving 10000000 messages with 1 redis sub, 2000 pubsub subscribers
Redis subscribers
Running time: 29735 ms
Avg messages received per second: 336,304
Redis subscriber with pubsub subscribers
Running time: 1203 ms
Avg messages received per second: 8,312,551
This was run with a local redis server on my MacBook Air. A ~24x improvement in performance can be seen in the above output by using the 2,000 internal subscribers as opposed to the same number of redis clients.
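The reported figures can be cross-checked with a few lines of arithmetic. This snippet uses only the numbers printed in the output above; the variable names are illustrative, and truncating the rates is an assumption that happens to reproduce the printed values.

```javascript
var messages = 10000000;
var redisMs = 29735;  // running time with 2,000 redis subscribers
var pubsubMs = 1203;  // running time with 1 redis sub and 2,000 pubsub subscribers

// Messages per second, truncated to a whole number.
var redisRate = Math.floor(messages * 1000 / redisMs);
var pubsubRate = Math.floor(messages * 1000 / pubsubMs);

// Ratio of the two running times.
var speedup = redisMs / pubsubMs;

console.log(redisRate, pubsubRate, speedup.toFixed(1)); // 336304 8312551 24.7
```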
Examples
The following is a brief example showing how to set up the library with an existing websocket server:
// Assumes the redis (node_redis) and node-internal-pubsub packages are installed
var redis = require('redis');
var pubsub = require('node-internal-pubsub');

var redisClient = redis.createClient();
var redisSub = redis.createClient();
var pub = pubsub.createPublisher();

// Subscribe to all incoming messages, and publish them to the internal pubsub
redisSub.psubscribe('*');
redisSub.on('pmessage', function(pattern, channel, msg) {
  pub.publish(channel, msg);
});

// wsServer is the existing websocket server
wsServer.on('connection', function(conn) {
  var sub = pubsub.createSubscriber();

  // Publish messages received from the user to redis
  conn.on('data', function(message) {
    redisClient.publish('chatmessages', message);
  });
});
Basic examples can be found in the examples directory.
Publisher
A publisher in the internal pub sub module. Publishes messages by invoking emit on an instance of PatternEmitter.
createPublisher()
Creates and returns a new Publisher instance.
var pubsub = require('node-internal-pubsub');
var publisher = pubsub.createPublisher();
publisher.publish(channel, message)
Publishes a message to the given channel. Channel is expected to be a string, though message can be any object.
publisher.publish('channel:1', 'A message to send to all channel subscribers');
Subscriber
A subscriber in the internal pub sub module. Each subscriber tracks the number of subscriptions it currently holds. It extends EventEmitter, but rather than emitting newListener and removeListener events, it emits events corresponding to those used in node_redis: message, pmessage, subscribe, psubscribe, unsubscribe, and punsubscribe.
createSubscriber()
Creates and returns a new Subscriber instance.
var pubsub = require('node-internal-pubsub');
var subscriber = pubsub.createSubscriber();
subscriber.subscribe([channel1], [channel2], [...])
Subscribes to all messages published in the given channels. Accepts multiple channels as arguments, and emits a subscribe event for each newly subscribed channel. The event is passed the channel name, and the current subscription count. Ignores channels that are already subscribed to.
subscriber.subscribe('channel:1', 'channel:2');
subscriber.on('message', function(channel, message) {
  console.log('Received:', channel, '-', message);
});
publisher.publish('channel:1', 'Example message');
// 'Received: channel:1 - Example message'
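The bookkeeping described for subscribe (one event per newly subscribed channel, a running count, duplicates ignored) can be sketched with a plain EventEmitter. SketchSubscriber is a hypothetical stand-in written for this illustration; it is not the library's Subscriber and only tracks channel subscriptions.

```javascript
var EventEmitter = require('events').EventEmitter;
var util = require('util');

// Hypothetical stand-in; not the library's Subscriber.
function SketchSubscriber() {
  EventEmitter.call(this);
  this.channels = Object.create(null); // channel name -> true
  this.count = 0;
}
util.inherits(SketchSubscriber, EventEmitter);

SketchSubscriber.prototype.subscribe = function() {
  for (var i = 0; i < arguments.length; i++) {
    var channel = arguments[i];
    if (this.channels[channel]) continue; // already subscribed: ignore
    this.channels[channel] = true;
    this.count++;
    this.emit('subscribe', channel, this.count);
  }
};

var events = [];
var sub = new SketchSubscriber();
sub.on('subscribe', function(channel, count) {
  events.push(channel + ' ' + count);
});
sub.subscribe('channel:1', 'channel:2');
sub.subscribe('channel:1'); // ignored, no event
console.log(events); // [ 'channel:1 1', 'channel:2 2' ]
```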
subscriber.unsubscribe([...channels])
Unsubscribes from messages in each of the provided channels. Accepts multiple channels as arguments, and emits an unsubscribe event for each channel. The event is passed the channel name, and the current subscription count. Ignores channels that are not among the current subscriptions. If no arguments are passed, the subscriber is unsubscribed from all channels.
subscriber.subscribe('channel:1', 'channel:2');
subscriber.unsubscribe('channel:1', 'channel:2');
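The unsubscribe rules above (unknown channels ignored, no arguments meaning every channel) can be illustrated in the same spirit. createSketchSubscriber is a hypothetical helper for this sketch, not part of the library, and its count covers channel subscriptions only.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper that builds a tiny subscriber-like object.
function createSketchSubscriber() {
  var sub = new EventEmitter();
  var channels = [];

  sub.subscribe = function() {
    Array.prototype.slice.call(arguments).forEach(function(channel) {
      if (channels.indexOf(channel) === -1) channels.push(channel);
    });
  };

  sub.unsubscribe = function() {
    // No arguments: unsubscribe from every current channel.
    var targets = arguments.length
      ? Array.prototype.slice.call(arguments)
      : channels.slice();
    targets.forEach(function(channel) {
      var index = channels.indexOf(channel);
      if (index === -1) return; // not subscribed: ignore
      channels.splice(index, 1);
      sub.emit('unsubscribe', channel, channels.length);
    });
  };

  return sub;
}

var log = [];
var sub = createSketchSubscriber();
sub.on('unsubscribe', function(channel, count) {
  log.push(channel + ' ' + count);
});
sub.subscribe('channel:1', 'channel:2', 'channel:3');
sub.unsubscribe('channel:2', 'missing'); // 'missing' is ignored
sub.unsubscribe();                       // removes the remaining two
console.log(log); // [ 'channel:2 2', 'channel:1 1', 'channel:3 0' ]
```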
subscriber.psubscribe([pattern1], [pattern2], [...])
Subscribes to all messages published in channels matching the given regular expressions. Accepts multiple RegExp objects as arguments, and emits a psubscribe event for each newly subscribed pattern. The event is passed the RegExp, and the current subscription count. Any non-RegExp values passed to this function are cast to a string, and used to create a RegExp. Ignores patterns that are already subscribed to.
subscriber.psubscribe(/channel/);
subscriber.on('pmessage', function(pattern, channel, message) {
  console.log('Received:', pattern, '-', channel, '-', message);
});
publisher.publish('channel:1', 'Example message');
// 'Received: /channel/ - channel:1 - Example message'
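Because strings are turned into regular expressions rather than globs, a pattern carried over from node_redis may not match what it used to. The sketch below assumes the cast is equivalent to new RegExp(String(value)), which the description suggests but does not spell out; toPattern is a hypothetical helper, not a library function.

```javascript
// Mirror the documented casting rule: RegExp instances pass through,
// anything else is cast to a string and compiled as a RegExp.
function toPattern(value) {
  return value instanceof RegExp ? value : new RegExp(String(value));
}

// Not a glob: in a regular expression, ':*' means "zero or more colons".
var globLike = toPattern('channel:*');
var anchored = toPattern(/^channel:/);

console.log(globLike.test('channel'));    // true
console.log(globLike.test('xchannel:1')); // true (the pattern is unanchored)
console.log(anchored.test('channel:1'));  // true
console.log(anchored.test('xchannel:1')); // false
```

Under the same assumption, a bare '*' (the usual catch-all glob) is not a valid regular expression and makes the RegExp constructor throw a SyntaxError, so /.*/ or a similar expression would be needed instead.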
subscriber.punsubscribe([...patterns])
Unsubscribes from each of the provided regular expressions. Accepts multiple RegExp objects as arguments, and emits a punsubscribe event for each pattern. The event is passed the RegExp, and the current subscription count. Any non-RegExp values passed to this function are cast to a string, and used to create a RegExp. Ignores patterns that are not among the current subscriptions. If no arguments are passed, the subscriber is unsubscribed from all patterns.
subscriber.psubscribe(/channel/);
subscriber.punsubscribe(/channel/);
Events
Each subscriber instance extends EventEmitter and, like node_redis subscriber clients, emits the following events: message, pmessage, subscribe, psubscribe, unsubscribe, and punsubscribe.
message
{string} channel, {*} message
Emitted when a message is received on a subscribed channel.
subscriber.subscribe('example');
subscriber.on('message', function(channel, message) {
  console.log(channel, message);
});
publisher.publish('example', 'message'); // 'example message'
pmessage
{RegExp} pattern, {string} channel, {*} message
Emitted when a message is received on a channel matching a pattern subscription.
subscriber.psubscribe(/^example/i);
subscriber.on('pmessage', function(pattern, channel, message) {
  console.log(pattern, channel, message);
});
publisher.publish('ExampleSub', 'message'); // '/^example/i ExampleSub message'
subscribe
{string} channel, {int} count
Emitted when a new channel is subscribed to. Both the channel name and the updated subscription count are passed to the listener.
subscriber.on('subscribe', function(channel, count) {
  console.log(channel, count);
});
subscriber.subscribe('example'); // 'example 1'
psubscribe
{RegExp} pattern, {int} count
Emitted when a new pattern is subscribed to. The pattern and the updated subscription count are passed to the listener.
subscriber.on('psubscribe', function(pattern, count) {
  console.log(pattern, count);
});
subscriber.psubscribe(/^\w+$/); // '/^\w+$/ 1'
unsubscribe
{string} channel, {int} count
Emitted in response to unsubscribing from a channel. Both the channel name and the updated subscription count are passed to the listener.
subscriber.subscribe('example');
subscriber.on('unsubscribe', function(channel, count) {
  console.log(channel, count);
});
subscriber.unsubscribe('example'); // 'example 0'
punsubscribe
{RegExp} pattern, {int} count
Emitted in response to unsubscribing from a pattern. The pattern and the updated subscription count are passed to the listener.
subscriber.psubscribe(/\w+/);
subscriber.on('punsubscribe', function(pattern, count) {
  console.log(pattern, count);
});
subscriber.punsubscribe(/\w+/); // '/\w+/ 0'