npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

nsq-seneca

v0.0.1

Published

Use nsq to connect seneca services.

Downloads

1

Readme

nsq-seneca

Use NSQ to connect Seneca services.

This is in principle similar to a Seneca transport, but with a different API, not really idiomatic Seneca. The biggest reason why this is not "idiomatic Seneca" is that message routing is based on explicit topics (NSQ topics) and not arbitrarily complex patterns.

Why nsq-seneca?

Normal Seneca transports try to be network transparent. They try to make so that your code works the same way whether the "remote" service is physically remote or not.

The problem I have with them is that most of the time this abstraction is leaky, and service discovery gets in the way. When I started working on this seneca-balance-client and seneca-mesh were not ready, and I decided that NSQ was a good solution.

Over time I started liking the NSQ approach to building a distributed system more and more, and I wanted to connect my Seneca services using NSQ. The existing Seneca NSQ transport does not really support channels, and it generates topic names that are based on Seneca patterns. In the end I decided to go in a different direction, and build a Seneca plugin that would allow me to connect Seneca services in an NSQ idiomatic way.

This project is this plugin.

Base API

Require the module (FIXME: I should publish it first :-)) and obtain a Seneca instance:

var nsqt = require('seneca-nsq')
var Seneca = require('seneca)
var s = Seneca()

The module provides two main functions as entry point: handle (aliased as listen) and forward (aliased as client). Both of them can be used as Seneca plugins:

  • handle provides the "server" functionality (like listen in Seneca transports), it is called like this because when it is used the Seneca instance will handle those messages, listening to an NSQ topic.
  • forward is the "client" (like client in Seneca transports), it is called like this because when it is used the Seneca instance will forward those messages, publishing to an NSQ topic.

Both of them accept an options object, which can contain every option that can be passed to the Reader constructor from nsqjs plus the following ones:

  • topic: string: the NSQ topic used by this "service" (no default, this option is mandatory)
  • topicProperty: string: the property of the message that will contain the topic, used to build the Seneca pattern handled by this plugin instance (default: 'role')
  • chan: null | string: the NSQ channel (default is null, used for "forward" instances because they only publish, and for the "main" handler instance, see about request-reply below)
  • lookupdHTTPAddresses: Array<string>: the array of nsqlookupd addresses (default ['127.0.0.1:4161'])
  • writerNsqdHost: string: the address of the nsqd used for publishing (default '127.0.0.1')
  • writerNsqdPort: number: the port of the nsqd used for publishing (default 4150)
  • reply: boolean: whether this topic supports the request-reply pattern (see below, default false)
  • replyBy: number: the timeout for replies in milliseconds (default 20000)
  • replyToProperty: string: the message property used to carry the "reply to" NSQ topic (default rt$)
  • replyByProperty: string: the message property used to carry the "reply by" value, which is also the request id (default rb$)
  • sharding: ShardingOptions | undefined: description of how the topic is sharded (see below, default undefined for no sharding)
  • forwardDelay: number: delay in milliseconds for NSQ writer initialization (default 0)
  • handleDelay: number: delay in milliseconds for NSQ readers initialization (default 0)

From experience, the NSQ options you will want to set are:

  • maxInFlight (the default of 1 makes no sense at all for throughput)
  • lookupdPollInterval (too speed up service discovery)

At this point the plugin can be used like this:

// Send all actions matching {role: 'jobs'} to the NSQ topic 'jobs'
s.use(nsqt.forward, {topic: 'jobs'})

// This will go to NSQ topic 'jobs':
s.act({role: 'jobs', data: 'something to do...'})

// Listen to NSQ topic 'news' with rome 'log'
// and "act" all those messages on the local seneca instances
s.use(nsqt.handle, {topic: 'news', chan: 'log'})

// Invoke "callback" for every message received on topic 'news' by channel 'log':
s.add({role: 'news', chan: 'log'}, callback)

The plugin can be used any number of times because it always returns a different name to Seneca. Consider those functions like factories that create client and server plugin instances at will.

A server is always also a client

Note that a Seneca instance that is acting as a handler wor a particular topic will always publish messages to the NSQ topic first, and will handle them after having received them from NSQ.

More concretely, after a statement like s.use(nsqt.handle, {topic: 'news', chan: 'log'}), invoking s.act({role: 'news', data: 'something to say'}) on the same Seneca instance will not act the message immediately on the local Seneca. The message will be published remotely first. This is necessary because otherwise other listeners to the same topic on different channels would not have a chance to consume it. The NSQ semantics mandates this behavior, and this is one way in which nsq-seneca in idiomatic NSQ more than idiomatic Seneca.

When the message will be received, it will be acted on the local Seneca with a 'chan' property that matches the channel that the handler was listening to. In this way it is possible to "host" handlers for different channels on the same Seneca instance, and they will not interfere with each other because each of them can pick its own "version" of the message by channel. What matters is that the 'add' statements specify patterns that include the channel (like in the example above).

nsq-seneca prefers to be explicit about topics and channels instead of trying to be "network transparent" and hide the underlying NSQ. It tries to make the most out of NSQ and its strengths (particularly its broadcast semantics to different channels), but this means that the programmer must be aware of the underlying NSQ.

Request-reply pattern

NSQ does not support replies to messages, but Seneca does and they can be useful.

nsq-seneca creates a unique, ephemeral reply topic for every plugin instance, and implements all the machinery needed so that Seneca handlers can reply in a Seneca idiomatic way (passing an error or a value to the callback) and everything will just work.

However, there's a catch: NSQ supports broadcasts to channels, but in practice Seneca expects a single reply for every "published" message. By convention, we decided that only a handler that is listening on a channel that has the same name as the topic can send a reply. We call it the "main" handler, and it can be created implicitly by passing a 'null' channel (which is internally converted to having the same name as the topic anyway).

The internal implementation cleans up the timed out requests every 5 seconds for efficiency (so that we don't create a timer for every request), and the system is designed to scale to handle a lot of them. Note that "every 5 seconds" does not mean that the timeout is 5 seconds: it is just the granularity of the timeout (TODO: make this granularity configurable). In practice this builds a request-reply pattern on top of NSQ.

Sharding

All the above works perfectly for stateless services. nsq-seneca also implements a form of sharding for stateful services.

TODO: document this :-)