npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@binaryme/picky

v2.0.2

Published

MQTT topic observable

Downloads

2

Readme

Picky

The function topicQualifier is useful for "picking" message interestes from an MQTT message stream by using the RxJs operator interest. As the resulting observable is subscribed to (and unsubscribed from) it will in turn subscribe to (and unsubscribe from) the specified MQTT topic.

Usage

While subscribed to multiple MQTT topics, including, for example 'a/b/c', you receive an MQTT message for the topic messageTopic. topicQualifier will return true if the messageTopic qualifies as a valid subscriptionTopic.

const subscriptionTopic = 'a/b/c';
const pic:boolean = topicQualifier(messageTopic, subscriptionTopic);

The usefulness of topicQualifier is clear when wildcards are used.

topicQualifier(messageTopic, 'a/b/+');
topicQualifier(messageTopic, 'a/b/#');

topicQualifier is also able to return the wildcard matches between the subject topic and specification:

topicQualifier('tele/device0/motion/sensor2', 'tele/+/motion/#', true);
// returns ['device0','sensor2'];

topicQualifier('a/b/c/d/e/f','a/+/+/d/#',true); //-> ['b', 'c', 'e/f']
topicQualifier('a/b/c/d/e/f','a/#',true); //-> ['b/c/d/e/f']
topicQualifier('a/b/c/d/e/f','x/#',true); //-> null

The operator interest acts as an RxJs filter by making use of topicQualifier. The interest operator also manages its own MQTT topic subscription.

const sub = {
  next(topics: string | string[]) {
    // mqttClient.subscribe(topics);
  }
};
const unsub = {
  next(topics: string | string[]) {
    // mqttClient.unsubscribe(topics);
  }
};

// isolate MQTT messages we are interested in
const lights$ = mqttMessageStream$.pipe(
  interest('tele/lights/+', sub, unsub)
)

The interest operator can also act as a filter which instead removes qualified messages from the stream and passes them to a callback.

const iotDevicesExclLights$ = mqttMessageStream$.pipe(
  interest('tele/lights/+', sub, unsub, ({topic, payload}) => {
    // act on light message
  })
)

Using the mqtt client we can use the createMessageStream function to do the following:

const host = 'localhost:1883';
const sub = new Subject();
const unsub = new Subject();
const pub = new Subject();

// create an MQTT message stream
const msg$ = createMessageStream(host, sub, unsub, pub);

// isolate messages for light switches
const light$ = msg$.pipe(interest('tele/switch/light/+', sub, unsub));

// control the power of the lights
// Note that only when subscribed does the operator subscribe
// to the MQTT topic
const subscription = light$.subscribe(({topic, payload}) => {
  const LightPin = topic.split('/').pop();
  // then assuming the payload is correct...
  pub.next({topic:`cmnd/light/${lightPin}`, payload});
});

// the operator in turn unsubscribes from the MQTT topic
subscription.unsubscribe();

To have all communication printed instead of communicating with an MQTT broker use the createMessageStreamDebug function.

const msg$ = createMessageStreamDebug(host, sub, unsub, pub);

The matches operator works exactly the same as the interest operator, but in addition to filtering topic interests it also adds a match property to the value. This value contains the wildcard matches as described previously. With this we don't have to manipulate topic strings to get identifiers from the message topic. The previous code snippet can then be done like this:

const host = 'localhost:1883';
const sub = new Subject();
const unsub = new Subject();
const pub = new Subject();

const msg$ = createMessageStream(host, sub, unsub, pub);

const light$ = msg$.pipe(matches('tele/switch/light/+', sub, unsub));

const subscription = light$.subscribe(({topic, payload, match}) => {
  // one wildcard (`+`) will result in one matched wildcard value
  const [LightPin] = match;
  pub.next({topic:`cmnd/light/${LightPin}`, payload});
});

subscription.unsubscribe();

When using multiple topics with matches the resulting match object becomes a bit more detailed.

// Given the message topic
`tele/room1/device2/sensor1/12`
// `matches` will have the following results:

// single topic
matches('tele/+/+/#', sub, unsub)
// results in
['room1', 'device2', 'sensor1/12']

// multiple topics
matches(['tele/+/+/#','proxied_tele/+/+/#'], sub, unsub)
// results in an array lookup list with wildcard matches for each topic
[
  ['tele/+/+/#', ['room1', 'device2', 'sensor1/12']],
  ['proxied_tele/+/+/#', null],
]