npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@servicebus/kafkabus

v2.0.5

Published

Simple service bus for sending events between processes using kafka.

Downloads

7

Readme

@servicebus/kafkabus

Build Status codecov

Simple service bus for sending events between processes using kafka.

@servicebus is a collection of components made for building distributed systems, and as such, some of the tools exist to help implement patterns such as CQRS (see more below).

You can use servicebus send/listen for one to one messaging, or publish/subscribe for one to many messaging.

Send / Listen

Servicebus allows simple sending and recieving of messages in a 1:1 sender:listener configuration. The following two processes will send an event message called 'my.event' every second from process A to process B via Kafka.

Calling done will commit the message offset, marking it as processed.

Process A:

var bus = require('servicebus').bus();
bus.listen('my.event', function (event, message, done, fail) {
  console.log(event);
  done() 
});

Process B:

var bus = require('servicebus').bus();

setInterval(async function () {
  bus.send('my.event', { my: 'event' });
}, 1000);

Streaming

Kafkabus uses Transactions by default, but they can be disabled in favor of sending messages as fast as possible. There is also a produceBatch function for publishing many messages at once.

await bus.send('my.event', { my: 'event' }, { transaction: false });
await bus.listen('my.event', { transaction: false }, function (event) {
  console.log(event)
});

Authentication

TODO: Kafka auth config

Publish / Subscribe

Servicebus can also send messages from 1:N processes in a fan-out architecture. In this pattern, one sender publishes a message and any number of subscribers can receive. The pattern for usage looks very similar to send/listen, and under the hood, makes use of the same produce/consume with different options configured:

Process A (can be run any number of times, all will receive the event):

var bus = require('servicebus').bus();
bus.subscribe('my.event', function (event, message, done, fail) {
  console.log(event);
  done()
});

Process B:

var bus = require('servicebus').bus();

setInterval(async function () {
  await bus.publish('my.event', { my: 'event' });
}, 1000);

Middleware

Servicebus allows for middleware packages to enact behavior at the time a message is sent or received. They are very similar to connect middleware in their usage:

  bus.use(bus.package());
  bus.use(bus.correlate());

Middleware may define one or two functions to modify incoming or outgoing messages:

  function logIncoming (queueName, message, options, next) {
    log('received ' + util.inspect(message));
    next(null, queueName, message, options);
  }

  function logOutgoing (queueName, message, options, next) {    
    log('sending ' + util.inspect(message));
    next(null, queueName, message, options);
  }

  return {
    handleIncoming: logIncoming,
    handleOutgoing: logOutgoing
  };

handleIncoming pipelines behavior to be enacted on an incoming message. handleOutgoing pipelines behavior to be enacted on an outgoing message. To say that the behavior is pipelined is to say that each middleware is called in succession, allowing each to enact its behavior before the next. (in from protocol->servicebus->middleware 1->middleware 2->servicebus->user code)

Official Middleware

Correlate

Correlate simply adds a .cid (Correlation Identity) property to any outgoing message that doesn't already have one. This is useful for following messages in logs across services.

Correlate Middleware - View on GitHub

Package

Package repackages outgoing messages, encapsulating the original message as a .data property and adding additional properties for information like message type and datetime sent:

  // bus.publish('my:event', { my: 'event' });
  {
    my: 'event'
  };

becomes

  {
    data: {
      my: 'event'
    }
    , datetime: 'Wed, 04 Sep 2013 19:31:11 GMT'
    , type: 'my:event'
  };

Package Middleware - View on GitHub

CQRS/ES and servicebus-register-handlers

register-handlers is a package that allows you to register and entire folder of handlers at once. It also contains some syntactical sugar for use in CQRS systems, allowing you to easy create command handlers and event handlers by simply creating a folder of handlers, and providing register-handlers with that folder name.

await registerHandlers({
  bus,
  path: path.resolve(process.cwd(), 'handlers')
})

If using ES Modules, set modules: true.

await registerHandlers({
  bus,
  path: path.resolve(process.cwd(), 'handlers'),
  modules: true
})

Example Command Handler

To use a command handler you must export a command string, and a function named listen.

import log from 'llog'
import { TodoList } from '../lib/models/TodoList'
import { todoListRepository } from '../lib/repos/todoListRepository.mjs'

export const command = 'list.item.add'
export const ack = false // kafka has it's own acks - we don't need servicebus's

log.info({ msg: `registering ${command}`, command })

//
// WARNING: You can not use an () => {} function here, because the context
// that contains the bus will not be bound properly!
//
export const listen = async function ({ type, data, datetime }, done, fail) {
  try {
    const { bus } = this
    const { todoListId, item } = data
    const { todo, complete } = item

    if (!todoListId) throw new Error(`${command} - todoListId must be defined!`)

    // JSON logging
    // Great for filtering in Kibana
    log.info({ msg: `executing listen handler for ${command}`, command, todo, complete, type, datetime })

    // do something, if it's a model, probably with a repository pattern
    //  * I'm using sourced, and sourced-repo-mongo in this example

    let todoList

    try {
      todoList = await todoListRepository.getAsync(todoListId)
    } catch (err) {
      log.error('Error calling todoListRepository.getAsync')
      log.error(err)
      throw new Error({ name: 'ERROR_GET_ASYNC' })
    }

    if (!todoList) {
      todoList = new TodoList()

      todoList.initialize({
        id: todoListId
      })
    }

    todoList.addItem(item)
    todoList.on('item.added', () => {
      bus.publish('list.item.added', item)
      log.info({ msg: 'list.item.added', item })
      done()
    })

    await todoListRepository.commitAsync(todoList)

  } catch (err) {
    log.error(err)
    fail(`Command Handler Failed for ${command} - ${err}`)
  }
}

// meanwhile... in another service...
//
// bus.send('list.item.add', {
//   item: {
//     todo: "Make this",
//     complete: false
//   }
// })
//
//

Example Event Handler

To use an event handler you must export an event string, and a function named subscribe.

import log from 'llog'

export const event = 'list.item.added'
export const ack = false

log.info({ msg: `registering ${event}`, event })

export const subscribe = function ({ type, datetime, data }, done) {
  const { item } = data
  const { todo, complete } = item

  log.info({ msg: `executing listen handler for ${event}`, event, todo, complete, type, datetime })
  // do something
  done()
}

// meanwhile... in another service...
//
// bus.publish('list.item.added', item)
//