gustav

v0.7.2

Framework for building realtime processing flows in Node.js

Downloads: 68

Gustav

Stream processing construction kit


Status: Alpha. Most major functionality is in place, but details are subject to change.

Gustav makes realtime data processing simple and modular. Individual units of processing (let's call them nodes) can be strung together into a workflow. That probably sounds really complicated, so let's look at some code:

'use strict';
import {gustav} from 'gustav';
import {twitSearch, twitSend} from 'gustav-twitter';
import {sentiment, findAngry} from 'gustav-sentiment';

// Find angry people on Twitter and cheer them up with a walrus
let walrusTweeter = gustav.createWorkflow()
  .source(twitSearch, {query: 'bad'})
  .transf(sentiment, {prop: 'text'})
  .transf(findAngry)
  .sink(twitSend, {
    message: (tweet) =>
      `@${tweet.user.screen_name} Have a happy walrus: http://i.imgur.com/T9kVIAq.jpg`
  });

walrusTweeter.start();

Phew. 16 lines of code, not too many. Let's talk about what's happening here:

Concepts

With Gustav, individual nodes are strung together into a workflow (if you're a computer science-y person, a workflow is a Directed, Acyclic Graph).

Node

There are three types of nodes in Gustav:

Source

Source nodes create a stream from some external source to be passed on to further Gustav nodes for processing. Almost always, a source node will be calling new Observable. In the walrus example, we used twitSearch, so let's look at that:

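// Assumes `Observable` (e.g. from RxJS) and an authenticated Twitter `client` are in scope.
export let twitSearch = config => {
  return new Observable(o => {
    client.stream('statuses/filter', {
      // Reads the `query` key passed in via .source(twitSearch, {query: 'bad'})
      track: config.query,
      language: 'en'
    }, stream => {
      console.log('stream', stream);
      // Forward every incoming tweet to subscribers
      stream.on('data', function(tweet) {
        console.log(tweet.text);
        o.next(tweet);
      });

      // Surface stream failures as Observable errors
      stream.on('error', function(error) {
        o.error(error);
      });
    });
  });
};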

config is passed in as the second argument to .source:

.source(twitSearch, {query: 'bad'})

You don't need to know too much about the Twitter API to recognize that this creates an Observable, gets a stream, and then emits whenever that stream has data/error events. This'll be even simpler when Observable.fromCallback gets merged.
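
To see the shape of a source node without a Twitter client, here is a minimal sketch. `TinyObservable` is a hypothetical stand-in written for this example (it is not RxJS and not part of Gustav), and `fromArray` is likewise an invented source that emits each element of `config.items`.

```javascript
// Hypothetical stand-in for an Observable: it only stores the subscribe
// function and calls it with an observer. Not RxJS, not part of Gustav.
class TinyObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }

  subscribe(next, error = () => {}) {
    this.subscribeFn({ next, error });
  }
}

// A source node in the same style as twitSearch: it takes a config object
// and returns an observable that pushes values to the observer.
const fromArray = config => new TinyObservable(o => {
  try {
    config.items.forEach(item => o.next(item));
  } catch (err) {
    o.error(err);
  }
});

// Subscribing triggers the source and collects what it emits.
const received = [];
fromArray({ items: ['a', 'b', 'c'] }).subscribe(item => received.push(item));
console.log(received);
```

Nothing is emitted until `subscribe` is called, which is the same lazy behavior the walrus example relies on when it waits for `start()`.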

Alright, now we've got an observable sequence of data, what shall we do with it?

Transform

Transformer nodes take a stream of data and manipulate it in some way. For example, here are the two transformer nodes we used above:


// `senti` is assumed to be a sentiment-scoring function imported elsewhere
export let sentiment = (config, iO) => {
  return iO.map(item => {
    let sentiValue = senti(item[config.prop]);

    item.sentiment = sentiValue;
    return item;
  });
};

export let findAngry = iO => iO.filter(item => item.sentiment.comparative < 0);

The first (sentiment) maps over all values, calculates the sentiment of each tweet, and adds that new data to the value before passing it on. The second (findAngry) filters the stream, only letting tweets with a negative comparative value through.
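
Because both transformers only call `.map` and `.filter` on their input, a plain array can stand in for the observable when trying them out. The sketch below assumes that substitution; `fakeSenti` is a hypothetical word-counting stub, not the real sentiment library.

```javascript
// Hypothetical scorer standing in for the sentiment library: +1 for each
// "great", -1 for each "awful", divided by the word count.
const fakeSenti = text => {
  const words = text.toLowerCase().split(/\s+/);
  const score = words.reduce((sum, w) =>
    sum + (w === 'great' ? 1 : 0) - (w === 'awful' ? 1 : 0), 0);
  return { score, comparative: score / words.length };
};

// Same shape as the transformers above, with fakeSenti swapped in.
const sentiment = (config, iO) => iO.map(item => {
  item.sentiment = fakeSenti(item[config.prop]);
  return item;
});
const findAngry = iO => iO.filter(item => item.sentiment.comparative < 0);

// A plain array plays the role of the input stream here.
const tweets = [
  { text: 'what a great day' },
  { text: 'awful traffic again' },
  { text: 'lunch time' }
];

const angry = findAngry(sentiment({ prop: 'text' }, tweets));
console.log(angry.map(t => t.text));
```

Only the tweet with a negative comparative score survives the filter; neutral and positive tweets are dropped.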

We've got our data, we've shaped it into what we want, now what?

Sink

A sink node takes our data and outputs it somewhere. It can be as simple as the console:

export let consoleNode = iO => iO.subscribe(console.log);

or more complicated, like sending tweets:

export let twitSend = (config, iO) => {
  return iO.subscribe(item => {
    client.post('statuses/update', {status: config.message(item)}, function(error, tweet, response){
      if(error) throw error;
      console.log(tweet);  // Tweet body.
      console.log(response);  // Raw response object.
    });
  });
};

Any time you call .subscribe, it should be in a sink node.
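
A sink can also be exercised without an external service. In this sketch, `fakeStream` is a hypothetical object exposing only `subscribe`, and `collectSink` is an invented sink that appends each formatted item to an array supplied through its config.

```javascript
// Hypothetical input stream: exposes only subscribe and pushes three
// values synchronously. A real Gustav node would receive an Observable.
const fakeStream = {
  subscribe(onNext) {
    [1, 2, 3].forEach(item => onNext(item));
  }
};

// A sink in the same (config, iO) shape as twitSend: it subscribes and
// writes each item somewhere, here into config.target.
const collectSink = (config, iO) =>
  iO.subscribe(item => config.target.push(config.format(item)));

const output = [];
collectSink({ target: output, format: n => `item ${n}` }, fakeStream);
console.log(output);
```

As with `twitSend`, the config carries everything the sink needs to know about its destination, so the same node can be reused with different targets.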

Sweet! We have the ability to grab data, change it in some way, and send that data elsewhere. How do we string everything together?

Workflow

A Gustav workflow is a directed acyclic graph composed of the node types covered above. Gustav supports multiple workflows, so your app may look something like:

// Workflow 0
s1 --- t2 --- o2

// Workflow 1
s2 -- \
       t1 --- t2 --- o1
s3 -- /
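
To make the graph structure concrete, Workflow 1 from the diagram can be written as a list of edges and checked for cycles with a topological sort (Kahn's algorithm). This is an illustrative sketch only: the edge-list representation and the `topoSort` helper are assumptions for this example, not how Gustav stores workflows internally.

```javascript
// Edges of Workflow 1 from the diagram, as [from, to] pairs.
const edges = [
  ['s2', 't1'],
  ['s3', 't1'],
  ['t1', 't2'],
  ['t2', 'o1']
];

// Kahn's algorithm: repeatedly remove nodes with no incoming edges.
// Returns the nodes in processing order, or throws if a cycle exists.
function topoSort(edgeList) {
  const inDegree = new Map();
  const outgoing = new Map();
  for (const [from, to] of edgeList) {
    if (!inDegree.has(from)) inDegree.set(from, 0);
    inDegree.set(to, (inDegree.get(to) || 0) + 1);
    if (!outgoing.has(from)) outgoing.set(from, []);
    outgoing.get(from).push(to);
  }

  const ready = [...inDegree.keys()].filter(n => inDegree.get(n) === 0);
  const order = [];
  while (ready.length > 0) {
    const node = ready.shift();
    order.push(node);
    for (const next of outgoing.get(node) || []) {
      inDegree.set(next, inDegree.get(next) - 1);
      if (inDegree.get(next) === 0) ready.push(next);
    }
  }

  if (order.length !== inDegree.size) {
    throw new Error('Cycle detected: not a valid workflow graph');
  }
  return order;
}

const order = topoSort(edges);
console.log(order.join(' -> '));
```

Both sources come first in the resulting order and the sink comes last, which matches the left-to-right reading of the diagram.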

We create a workflow with the intuitive .createWorkflow() method, which can be passed an optional name:

let myWorkflow = gustav.createWorkflow('myWorkflow');

This workflow is empty and lonely - let's add some nodes to it. You always need to start with a source node:

let mySource = gustav.createWorkflow()
    .source('stocks', {symbol: 'KITTN'});

Further nodes can be chained onto it, using the following methods:

  • .transf: Attach a transformer
  • .sink: Attach a sink (terminal, returns the workflow itself)
  • .merge(...nodes): Merge several nodes into a single stream
  • .tap: Attach a sink but return the previous node for chaining

Couplers

Gustav is designed to work with an external tool that manages where all of the processed items go, while Gustav workflows manage the actual processing. Redis, Kafka and RabbitMQ are currently supported as well as an in-memory processor that provides no guarantees and is best used for testing.

Couplers are registered and used like this:


// Assumes `gustav` and `GustavRedis` have already been imported.
// The second (string) param names the coupler. It is optional,
// and the coupler does come with a default.
gustav.coupler(new GustavRedis(), 'myRedis');

gustav.createWorkflow()
  // Since we've specified a Redis coupler, the following automagically knows to listen to the `demo-in` channel and push any events down the workflow
  .from('demo-in')
  // Vanilla transformer node
  .transf('someTransfNode')
  // Anything hitting this sink publishes a message to the `demo-out` channel
  .to('demo-out')
  .start();
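
The channel idea behind `.from` and `.to` can be illustrated with a toy in-memory message bus. Everything in this sketch (the `ToyBus` class and its `publish` and `subscribe` methods) is hypothetical and is not Gustav's coupler interface; like the in-memory option described above, it offers no delivery guarantees.

```javascript
// Hypothetical in-memory bus: maps channel names to listener lists.
// Messages published before anyone subscribes are simply dropped.
class ToyBus {
  constructor() {
    this.channels = new Map();
  }

  subscribe(channel, listener) {
    if (!this.channels.has(channel)) this.channels.set(channel, []);
    this.channels.get(channel).push(listener);
  }

  publish(channel, message) {
    for (const listener of this.channels.get(channel) || []) {
      listener(message);
    }
  }
}

const bus = new ToyBus();
const delivered = [];

// Mimics from('demo-in') -> transform -> to('demo-out').
bus.subscribe('demo-in', msg => bus.publish('demo-out', msg.toUpperCase()));
bus.subscribe('demo-out', msg => delivered.push(msg));

bus.publish('demo-in', 'hello');
console.log(delivered);
```

A real coupler such as Redis or Kafka replaces the in-process `Map` with an external broker, which is what lets separate workflows, or separate processes, exchange items.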