PipelineJS
Event Processing via Nats Streaming + Redis Queues (Bull.js)
A simple project to facilitate the following workflow:
- Stream an event
- Queue job(s) related to said event
- Process said job(s) in an efficient, easy, distributed fashion
Rationale
Distributed systems provide amazing value to any long-running technical endeavour: they dramatically reduce coupling and the need for large-scale rewrites/refactors over time. The older and larger the project, the more value a well-distributed system provides. But creating distributed software is difficult and time consuming, if for no other reason than that the problems of a traditional system end up spread across multiple systems, often without good tooling, visibility, and the like. But also for many other reasons :)
While the steps listed in the workflow above are relatively straightforward, accomplishing them efficiently and safely is not. Careful attention must be paid to when and how things are emitted, scheduled, processed, and handled (both on the happy path and otherwise). Many tools exist to make distributing a system easier, but the authors' past experience has shown that leveraging a Stream-and-Queue model can enable a small team of varying skillsets & experience to start distributing a system with minimal overhead and a high degree of confidence in their software.
This library aims to leverage a few great open source projects (BullJS and Nats Streaming) built on top of amazing infrastructure (Nats and Redis) to facilitate simple, scalable, distributed, evented systems.
A Fanbase Example
Let's pretend we've got a fanclub app which needs to send out notifications every time someone follows or unfollows a celebrity. Some app somewhere handles the pairing of "fans" to "talent", and even tracks who is following whom. Whenever someone starts or stops following, said app emits a FanbaseEvent to a stream which we've been lucky enough to get access to.
Our job now is to notify fans & talent accordingly...
The initial notifications
We'll assume we've got a fanbase event stream, which emits events including the fan and talent IDs, along with an action of followed or unfollowed.
We could imagine a protobuf payload like the following to represent the events:
# { fan, talent, action: [ 'followed', 'unfollowed'] }
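Fleshed out as an actual message definition, that might look something like the following sketch (the message and enum names here are illustrative, not taken from the library):

syntax = "proto3";

// illustrative definition matching the payload comment above
message FanbaseEvent {
  enum Action {
    FOLLOWED = 0;
    UNFOLLOWED = 1;
  }

  int64 fan = 1;     // fan's user ID
  int64 talent = 2;  // talent's user ID
  Action action = 3; // followed or unfollowed
}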
We'll also assume a simple config.js configuration like the following:
module.exports = {
  stan: {
    url: 'nats://localhost:4222',
    clusterId: 'test-cluster',
    clientId: 'example-client-id',
  },
  nats: {
    group: 'pipeline-example-group'
  },
  redis: {
    url: 'redis://127.0.0.1:6379'
  }
}
Business Use Cases
Assuming the above, we want to write a fanbase notifications pipeline that notifies fans & talent via email when someone joins or leaves a fanclub. We need to handle the following business use cases:
- When a fan follows a talent, notify the fan to welcome them
- When a fan follows a talent, notify the talent to engage them
- When a fan unfollows a talent, notify the fan (just in case they accidentally clicked the button)
Note that we're not notifying the talent – don't nobody need that negativity in their life.
Configuring the pipeline
In addition to the business logic, we need to configure our pipeline to run. To do so, we'll complete the following functional requirements:
- Define a router function that will take a protobuf and queue jobs to "do the work"
- Configure our pipeline, including the Nats Stream + Redis Queue + event Protobuf definition
- Import some dummy services, for looking up User details and sending Emails
- Define workers that dequeue jobs and send the proper notification
We can define the router easily enough, assuming it'll be passed a simple JavaScript object representing the protobuf:
// Functional Requirement 1: create job(s) from `FanbaseEvent`s
module.exports = async event => {
  // route on the event's action, queueing the appropriate notification job(s)
  const jobs = []
  const { fan, talent } = event
  const data = { fan, talent }
  switch (event.action) {
    case 'followed':
      // Business Logic #1 (welcome the fan)
      jobs.push({ topic: 'notify-fan-following', data })
      // Business Logic #2 (notify the talent)
      jobs.push({ topic: 'notify-talent-following', data })
      break;
    case 'unfollowed':
      // Business Logic #3 (confirm unfollowing with the fan)
      jobs.push({ topic: 'confirm-fan-unfollowing', data })
      break;
    default:
      // if a new action came along that we didn't support, we could throw an
      // exception here to prevent further processing; but given it's likely not
      // a huge problem, we could simply emit a notice and carry on
      // TODO: emit notice, maybe via something like `pipeline.notice(..)`?
      break;
  }
  return { jobs }
}
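Since the router is just a pure async function, we can sanity check it without any infrastructure (a quick sketch with a hand-rolled payload):

// quick sanity check: no Nats or Redis required to exercise the router
const router = require('./router')

router({ fan: 1, talent: 2, action: 'followed' })
  .then(({ jobs }) => console.log(jobs))
  // → [ { topic: 'notify-fan-following', data: { fan: 1, talent: 2 } },
  //     { topic: 'notify-talent-following', data: { fan: 1, talent: 2 } } ]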
Now we just need to configure our pipeline accordingly:
// Functional Requirement 2: configure our pipeline
// (NB: the `await`s assume we're inside an async function, or a module with
// top-level await support)
const Pipeline = require('@hexly/pipeline')
const FanbaseEvent = require('./example_pb') // what type of message comes through Nats
const router = require('./router')
const pipes = await Pipeline.initialize(require('./config.js')) // gives us a factory to create pipelines
const pipe = await pipes.pipe('fanbase', 'fanbase-notifications', FanbaseEvent, router)
Do the work
And just like that, our pipe is ready to attach workers to:
// Functional Requirement 3: import some dummy DAO services
const svc = {
  users: require('./example_users_service'),
  email: require('./example_email_service')
}

// Functional Requirement 4-1: Handle welcoming the fan (Business Logic 1)
pipe.dequeue('notify-fan-following', async job => {
  const fan = await svc.users.findById(job.data.fan)
  job.progress(5)
  const talent = await svc.users.findById(job.data.talent)
  job.progress(10)
  const message = {
    to: fan.email,
    subject: `You've started following ${talent.name}!`,
    message: `Hey ${fan.name}, thanks for following ${talent.name}!`
  }
  job.progress(15)
  const confirmation = await svc.email.send(message)
  job.progress(99)
  return confirmation
})

// Functional Requirement 4-2: Handle notifying the talent (Business Logic 2)
pipe.dequeue('notify-talent-following', async job => {
  const fan = await svc.users.findById(job.data.fan)
  job.progress(5)
  const talent = await svc.users.findById(job.data.talent)
  job.progress(10)
  const message = {
    to: talent.email, // note: this notification goes to the talent, not the fan
    subject: `${fan.name} started following you!`,
    message: `Congratulations ${talent.name}, ${fan.name} just joined your fanclub!`
  }
  job.progress(15)
  const confirmation = await svc.email.send(message)
  job.progress(99)
  return confirmation
})

// Functional Requirement 4-3: Handle confirming to the unfollowing fan (Business Logic 3)
pipe.dequeue('confirm-fan-unfollowing', async job => {
  const fan = await svc.users.findById(job.data.fan)
  job.progress(5)
  const talent = await svc.users.findById(job.data.talent)
  job.progress(10)
  const message = {
    to: fan.email,
    subject: `You've stopped following ${talent.name}!`,
    message: `Hey ${fan.name}, we just wanted to confirm that you've stopped following ${talent.name}.`
  }
  job.progress(15)
  const confirmation = await svc.email.send(message)
  job.progress(99)
  return confirmation
})
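Those three workers share the same lookup/progress/send shape, so the boilerplate factors out nicely. As an optional refactor sketch (notifier below is our own illustrative helper, not part of the library):

// illustrative helper: builds a worker from a message-composing function
const notifier = composeMessage => async job => {
  const fan = await svc.users.findById(job.data.fan)
  job.progress(25)
  const talent = await svc.users.findById(job.data.talent)
  job.progress(50)
  const confirmation = await svc.email.send(composeMessage(fan, talent))
  job.progress(99)
  return confirmation
}

pipe.dequeue('notify-fan-following', notifier((fan, talent) => ({
  to: fan.email,
  subject: `You've started following ${talent.name}!`,
  message: `Hey ${fan.name}, thanks for following ${talent.name}!`
})))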
It's simple, but it allows us to separate the work into the following easily testable pieces:
- Decode protobuf messages from a stream
- Queue isolated units of work (i.e. jobs) to be operated on by simple single-purpose functions
- Specify workers to dequeue jobs, allowing them to be processed in a (potentially parallel) distributed manner, all while updating their progress along the way (amongst other BullJS niceties)
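For instance, if we extract a worker's handler into its own module (a hypothetical notify_fan_following.js here), testing it needs nothing more than a stubbed job:

// hypothetical: the handler from Functional Requirement 4-1, as its own module
const handler = require('./notify_fan_following')

// a plain object standing in for Bull's job is enough to exercise it
const job = { data: { fan: 1, talent: 2 }, progress: () => {} }
handler(job).then(confirmation => console.log('sent:', confirmation))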
Observability: Is it working?
TODO: Listening to events; error handling; health checks; metrics
In the short term, though, we can take advantage of the pipeline's events to at least surface errors:
pipe.on('error', (...args) => console.warn('pipeline error', args))
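The same pattern presumably works for the other Pipeline Events listed in the Reference below (a sketch; the event payloads aren't documented yet, hence the rest args):

pipe.on('job:complete', (...args) => console.log('job complete', args))
pipe.on('job:failed', (...args) => console.error('job failed', args))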
Expanding to accommodate change
TODO: Evolve & add workers to support multiple messaging formats (SMS, Social Notifications)
Workers Streaming Events
TODO: Prune the workers down and create separate notification stream(s?)
Reference
Terms:
- Stream – the Nats Streaming channel that events are emitted to
- Queue – the Redis-backed Bull queue that jobs are pushed onto
- Pipeline – the stream + queue pairing, plus the router connecting them
- Job – an isolated unit of work, queued by the router and processed by a worker
- Router – a function that turns one stream event into N named jobs
- Worker – a function that dequeues jobs for a given topic and does the work
Pipeline Events:
- * – Any event. And I do mean any.
- error – Generally contains at least stage, error, message
- error:<stage> – Similar to the standard error, but includes stage in the event name
- stream –
- cron –
- cron:<topic> –
- job:start –
- job:complete –
- job:failed –
Preferred Pipeline Methods:
- streamAndQueue(stream, queue, protobuf, router) – A convenience method for the ideal case of the following workflow:

Some Service: Emit Protobuf Message
  -> Stream
  -> <Deserialized Protobuf>
  -> Router splits 1 message into N jobs
  -> Queue Named Job(s)
  -> Workers dequeue job and complete work

- dequeue(topic, handler) –
- See also on(event, handler) and once(event, handler) from Node's EventEmitter (to be used in conjunction with the Pipeline Events listed above)
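Given that streamAndQueue's signature lines up with the pipes.pipe(...) call in the fanbase example, usage presumably looks like the following (a sketch; whether it lives on the factory or the pipeline isn't documented here):

// hedged: mirrors the pipes.pipe(...) call from the fanbase example
const pipe = await pipes.streamAndQueue('fanbase', 'fanbase-notifications', FanbaseEvent, router)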
Additional Pipeline Methods
- stream(stream, converter) –
- queue(name) –
- async cron(topic, repeat, onEmit?) –
- enqueue(topic, data) –
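Of these, enqueue suggests a way to queue a job directly, skipping the stream. A sketch using the topics from the fanbase example:

// hedged: queue a job for an existing worker without an upstream event
await pipe.enqueue('notify-fan-following', { fan: 1, talent: 2 })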
Cron: A necessary evil
TODO: Make ourselves feel less terrible about why we support cron in evented systems
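Until then, a sketch of what the documented async cron(topic, repeat, onEmit?) signature suggests, assuming repeat is passed through to Bull's repeatable-job options:

// hedged: schedule a recurring 'digest-email' job every morning at 08:00,
// assuming `repeat` follows Bull's repeat options (e.g. a cron expression)
await pipe.cron('digest-email', { cron: '0 8 * * *' })

pipe.dequeue('digest-email', async job => {
  // ... compose and send the daily digest here
  job.progress(99)
})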