@hexly/pipeline

v0.0.0-b

Event Processing via Nats Streaming + Redis Queues (Bull.js)

PipelineJS

A simple project to facilitate the following workflow:

  1. Stream an event
  2. Queue job(s) related to said event
  3. Process said job(s) in an efficient, easy, distributed fashion

Rationale

Distributed systems provide enormous value to any long-running technical endeavour: they reduce coupling and, in the long run, dramatically reduce the need for large-scale rewrites and refactors. The older and larger the project, the more value a well-distributed system provides. But creating distributed software is difficult and time-consuming, if for no other reason than that the problems of a traditional system end up spread across multiple systems, often without good tooling or visibility. But also for many other reasons :)

While the steps listed in the workflow above are relatively straightforward, accomplishing them efficiently and safely is not. Careful attention must be paid to when and how things are emitted, scheduled, processed, and handled (both on the happy path and otherwise). Many tools exist to make distributing a system easier, but the authors' experience has shown that a Stream-and-Queue model can enable a small team of varying skillsets & experience to start distributing a system with minimal overhead and a high degree of confidence in their software.

This library aims to leverage a few great open source projects (BullJS and Nats Streaming) built on top of amazing infrastructure (Nats and Redis) to facilitate simple, scalable, distributed evented systems.

A Fanbase Example

Let's pretend we've got a fanclub app which needs to send out notifications every time someone follows or unfollows a celebrity. Some app somewhere handles the pairing of "fans" to "talent", and even tracks who is following whom. Whenever someone starts or stops following, said app emits a FanbaseEvent to a stream which we've been lucky enough to get access to.

Our job now is to notify fans & talent accordingly...

The initial notifications

We'll assume we've got a fanbase event stream, which emits events including the fan and talent ID, along with an action of followed or unfollowed.

We could imagine a protobuf payload like the following to represent the events:

# { fan, talent, action: [ 'followed', 'unfollowed'] }

and a simple config.js configuration like the following:

module.exports = {
  stan: {
    url: 'nats://localhost:4222',
    clusterId: 'test-cluster',
    clientId: `example-client-id`,
  },
  nats: {
    group: 'pipeline-example-group'
  },
  redis: {
    url: 'redis://127.0.0.1:6379'
  }
}
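Before routing, it can help to confirm that a decoded event actually has the shape described above. The following is a minimal sketch, assuming the event arrives as a plain JavaScript object; `isFanbaseEvent` and `ACTIONS` are hypothetical names and not part of @hexly/pipeline.

```javascript
// Hypothetical helper, not part of @hexly/pipeline: checks that a decoded
// event carries the fields the rest of this example relies on.
const ACTIONS = ['followed', 'unfollowed']

function isFanbaseEvent (event) {
  return event != null &&
    typeof event === 'object' &&
    event.fan != null &&
    event.talent != null &&
    ACTIONS.includes(event.action)
}

// Made-up IDs, for illustration only
console.log(isFanbaseEvent({ fan: 1, talent: 2, action: 'followed' })) // true
console.log(isFanbaseEvent({ fan: 1, talent: 2, action: 'blocked' }))  // false
console.log(isFanbaseEvent({ talent: 2, action: 'followed' }))         // false
```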

Business Use Cases

Assuming the above, we want to write a fanbase notifications pipeline that notifies fans & talent via email when someone joins or leaves a fanclub. We need to handle the following business use cases:

  1. When a fan follows a talent, notify the fan to welcome them
  2. When a fan follows a talent, notify the talent to engage them
  3. When a fan unfollows a talent, notify the fan (just in case they accidentally clicked the button)

Note that we're not notifying the talent – don't nobody need that negativity in their life.

Configuring the pipeline

In addition to the business logic, we need to configure our pipeline to run. To do so, we'll complete the following functional requirements:

  1. Define a router function, that will take a protobuf and queue jobs to "do the work"
  2. Configure our pipeline, including the Nats Stream + Redis Queue + event Protobuf definition
  3. Import some dummy services, for looking up User details and sending Emails
  4. Define workers that dequeue jobs and send the proper notification

We can define the router easily enough, assuming it will be passed a simple JavaScript object representing the protobuf:


// Functional Requirement 1: create job(s) from `FanbaseEvent`s
module.exports = async event => {
  // build one job per notification; unsupported actions produce no jobs

  const jobs = []
  const { fan, talent } = event
  
  const data = { fan, talent }
  switch( event.action ){    
    case 'followed':
      // Business Logic #1 (welcome the fan)
      jobs.push({topic: 'notify-fan-following', data })
      // Business Logic #2 (notify the talent)
      jobs.push({topic: 'notify-talent-following', data })
      break;

    case 'unfollowed':
      // Business Logic #3 (confirm unfollowing with the fan)
      jobs.push({topic: 'confirm-fan-unfollowing', data })
      break;

    default: 
      // if a new action came along that we didn't support, we could throw an
      // exception here to prevent further processing; but given it's likely not
      // a huge problem, we could simply emit a notice and carry on

      // TODO: emit notice, maybe via something like `pipeline.notice(..)`?
      break;
      
  }

  return { jobs } 
}
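Because the router is a plain async function, it can be exercised without Nats or Redis. The sketch below inlines the same routing rules so it runs on its own; `topicsFor` and the sample IDs are made up for illustration.

```javascript
// Same routing rules as the router above, inlined so this snippet is self-contained.
const router = async event => {
  const { fan, talent, action } = event
  const data = { fan, talent }
  const jobs = []
  if (action === 'followed') {
    jobs.push({ topic: 'notify-fan-following', data })
    jobs.push({ topic: 'notify-talent-following', data })
  } else if (action === 'unfollowed') {
    jobs.push({ topic: 'confirm-fan-unfollowing', data })
  }
  return { jobs }
}

// Hypothetical helper: route a sample event and list the topics it produced.
const topicsFor = async action => {
  const { jobs } = await router({ fan: 'fan-1', talent: 'talent-9', action })
  return jobs.map(job => job.topic)
}

topicsFor('followed').then(topics => console.log(topics))
// logs [ 'notify-fan-following', 'notify-talent-following' ]
topicsFor('unfollowed').then(topics => console.log(topics))
// logs [ 'confirm-fan-unfollowing' ]
topicsFor('blocked').then(topics => console.log(topics))
// logs []
```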

Now we just need to configure our pipeline accordingly:


// Functional Requirement 2: configure our pipeline
const Pipeline = require('@hexly/pipeline')

const FanbaseEvent = require('./example_pb') // what type of message comes through Nats
const router = require('./router')

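// NOTE: `await` needs an async context; wrap these lines in an async function
// or run them from an ES module that supports top-level await
const pipes = await Pipeline.initialize(require('./config.js')) // give us a factory to create pipelines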
const pipe = await pipes.pipe('fanbase', 'fanbase-notifications', FanbaseEvent, router)

Do the work

And just like that, our pipe is ready to attach workers to:

// Functional Requirement 3: import some dummy DAO services
const svc = {
  users: require('./example_users_service'),
  email: require('./example_email_service')
}

// Functional Requirement 4-1: Handle welcoming the fan (Business Logic 1)
pipe.dequeue('notify-fan-following', async job => {
  const fan = await svc.users.findById( job.data.fan )
  job.progress(5)
  const talent = await svc.users.findById( job.data.talent )
  job.progress(10)

  const message = {
    to: fan.email,
    subject: `You've started following ${talent.name}!`,
    message: `Hey ${fan.name}, thanks for following ${talent.name}!`
  }
  job.progress(15)

  const confirmation = await svc.email.send(message)
  job.progress(99)
  return confirmation  
})

// Functional Requirement 4-2: Handle notifying the talent (Business Logic 2)
pipe.dequeue('notify-talent-following', async job => {
  const fan = await svc.users.findById( job.data.fan )
  job.progress(5)
  const talent = await svc.users.findById( job.data.talent )
  job.progress(10)

  const message = {
    to: talent.email, // this notification goes to the talent, not the fan
    subject: `${fan.name} started following you!`,
    message: `Congratulations ${talent.name}, ${fan.name} just joined your fanclub!`
  }
  job.progress(15)

  const confirmation = await svc.email.send(message)
  job.progress(99)
  return confirmation  
})

// Functional Requirement 4-3: Handle confirming to the unfollowing fan (Business Logic 3)
pipe.dequeue('confirm-fan-unfollowing', async job => {
  const fan = await svc.users.findById( job.data.fan )
  job.progress(5)
  const talent = await svc.users.findById( job.data.talent )
  job.progress(10)

  const message = {
    to: fan.email,
    subject: `You've stopped following ${talent.name}!`,
    message: `Hey ${fan.name}, we just wanted to confirm that you've stopped following ${talent.name}.`
  }
  job.progress(15)

  const confirmation = await svc.email.send(message)
  job.progress(99)
  return confirmation  
})

It's simple, but allows us to separate the work into the following easily testable pieces:

  1. Decode protobuf messages from a stream
  2. Queue isolated units of work (i.e. jobs) to be operated on by simple single-purpose functions
  3. Specify workers to dequeue jobs, allowing them to be processed in a (potentially parallel) distributed manner, all while updating their progress along the way (amongst other BullJS niceties)
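To illustrate the "easily testable" claim, here is a minimal sketch of testing one worker in isolation. It assumes the handler is built by a factory that receives its services as an argument; `makeWelcomeFan`, the stub services, and the sample users are all hypothetical and not part of the library.

```javascript
// Hypothetical factory: takes the services as an argument so a test can pass stubs.
const makeWelcomeFan = svc => async job => {
  const fan = await svc.users.findById(job.data.fan)
  const talent = await svc.users.findById(job.data.talent)
  job.progress(10)
  return svc.email.send({
    to: fan.email,
    subject: `You've started following ${talent.name}!`,
    message: `Hey ${fan.name}, thanks for following ${talent.name}!`
  })
}

// Stubs standing in for the user and email services (made-up data)
const users = {
  'fan-1': { name: 'Sam', email: 'sam@example.com' },
  'talent-9': { name: 'Riley', email: 'riley@example.com' }
}
const sent = []
const stubSvc = {
  users: { findById: async id => users[id] },
  email: { send: async message => { sent.push(message); return { ok: true } } }
}

// A fake job exposing only what the handler touches: `data` and `progress`
const progress = []
const fakeJob = {
  data: { fan: 'fan-1', talent: 'talent-9' },
  progress: value => progress.push(value)
}

const run = makeWelcomeFan(stubSvc)(fakeJob)
run.then(confirmation => console.log(confirmation, sent[0].to, progress))
```

Passing the services in, rather than requiring them at module scope, is one design choice among several; it keeps the handler free of real network calls during tests.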

Observability: Is it working?

TODO: Listening to events; error handling; health checks; metrics

In the short term, though, you can at least listen for the error event and log whatever arrives:

pipe.on('error', (...args) => console.warn('pipeline error', args))
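The Reference section lists both error and error:<stage> events. The sketch below shows how listening to both might look, using Node's EventEmitter as a stand-in for a real pipe. The `fakePipe` object, the `emitError` helper, and the stage names `router` and `worker` are assumptions made for illustration; the library's actual stage names and emit order are not documented here.

```javascript
const { EventEmitter } = require('node:events')

// Stand-in for a pipeline; the real object is assumed to expose `on` and `once`
const fakePipe = new EventEmitter()

const seen = []
fakePipe.on('error', info => seen.push(`any: ${info.message}`))
fakePipe.on('error:router', info => seen.push(`router: ${info.message}`))

// Hypothetical helper mimicking how both event names might be emitted
function emitError (stage, error) {
  const info = { stage, error, message: error.message }
  fakePipe.emit(`error:${stage}`, info)
  fakePipe.emit('error', info)
}

emitError('router', new Error('unknown action'))
emitError('worker', new Error('email failed'))

console.log(seen)
// [ 'router: unknown action', 'any: unknown action', 'any: email failed' ]
```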

Expanding to accommodate change

TODO: Evolve & add workers to support multiple messaging formats (SMS, Social Notifications)

Workers Streaming Events

TODO: Prune the workers down and create separate notification stream(s?)

Reference

Terms:

  • Stream
  • Queue
  • Pipeline
  • Job
  • Router
  • Worker

Pipeline Events:

  • * – Any event. And I do mean any.
  • error - Generally contains at least stage, error, message
  • error:<stage> - Similar to standard error, but includes stage in event name
  • stream -
  • cron -
  • cron:<topic> -
  • job:start -
  • job:complete -
  • job:failed -

Preferred Pipeline Methods:

  • streamAndQueue(stream, queue, protobuf, router) – A convenience method for the ideal case of the following workflow:
Some Service: Emit Protobuf Message 
-> Stream 
  -> <Deserialized Protobuf> 
    -> Router splits 1 message into N jobs
      -> Queue Named Job(s) 
        -> Workers dequeue job and complete work 
  • dequeue(topic, handler)
  • See also on(event, handler) and once(event, handler) from Node's EventEmitter (to be used in conjunction with the Pipeline Events listed above)
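The stream-to-router-to-queue-to-worker flow above can be sketched entirely in memory. This is not how the library is implemented (it is described as building on Nats Streaming and BullJS); `createInMemoryPipe`, `publish`, and `drain` are hypothetical names used only to show how one message fans out into N jobs that are handled per topic.

```javascript
// In-memory stand-in for the workflow; no Nats or Redis involved.
function createInMemoryPipe (router) {
  const handlers = new Map() // topic -> worker function
  const queue = []           // pending jobs

  return {
    // Register a worker for a topic
    dequeue (topic, handler) { handlers.set(topic, handler) },

    // "Stream" step: route one message into N jobs and queue them
    async publish (message) {
      const { jobs } = await router(message)
      queue.push(...jobs)
    },

    // "Worker" step: drain the queue, handing each job to its topic's handler
    async drain () {
      const results = []
      while (queue.length > 0) {
        const job = queue.shift()
        const handler = handlers.get(job.topic)
        if (handler) results.push(await handler(job))
      }
      return results
    }
  }
}

// A toy router: one message becomes one job per recipient
const demoRouter = async message => ({
  jobs: message.recipients.map(to => ({ topic: 'greet', data: { to } }))
})

const demo = createInMemoryPipe(demoRouter)
demo.dequeue('greet', async job => `hello ${job.data.to}`)

const demoResults = demo.publish({ recipients: ['ana', 'bo'] }).then(() => demo.drain())
demoResults.then(results => console.log(results)) // [ 'hello ana', 'hello bo' ]
```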

Additional Pipeline Methods

  • stream(stream, converter)

  • queue(name)

  • async cron(topic, repeat, onEmit?)

  • enqueue(topic, data)

Cron: A necessary evil

TODO: Make ourselves feel less terrible about why we support cron in evented systems