pc-seneca-kafka-transport

v1.1.12

A transport plugin for seneca to send and receive messages via kafka topics

seneca-kafka-transport

This plugin lets you send messages to a kafka topic and listen for messages from kafka topics.

Communication is async by design so no response is ever expected.

Under the hood it uses https://github.com/oleksiyk/kafka to communicate with kafka.

Sending messages

Messages will be sent as JSON string to the Kafka topic specified in the client.

Only the actual message is sent, with no seneca metadata!
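
As a rough illustration of what "no seneca metadata" can mean, the sketch below drops every property whose name ends in $ (the convention seneca uses for its own metadata, such as meta$) and serializes the rest as JSON. The helper name toKafkaPayload is hypothetical, and the plugin's actual implementation may differ.

```javascript
// Hypothetical helper, not part of the plugin's API: shows one way a seneca
// message could be reduced to its business payload before being sent.
function toKafkaPayload(msg) {
  const payload = {};
  for (const [key, value] of Object.entries(msg)) {
    // seneca marks its own metadata with a trailing "$" (e.g. meta$, tx$)
    if (!key.endsWith('$')) {
      payload[key] = value;
    }
  }
  return JSON.stringify(payload);
}

const json = toKafkaPayload({
  cmd: 'register',
  type: 'user',
  name: 'alice',
  meta$: { id: 'abc/123' }
});

console.log(json); // {"cmd":"register","type":"user","name":"alice"}
```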

Receiving messages

We define the Kafka topic we are interested in and a groupId, so Kafka can keep track of our service's offset for the specified topic.

We also define a pin to indicate the message pattern we are interested in.

This is because we do not expect a 1:1 mapping between kafka topic and seneca patterns.

Every time a message is received, the plugin first acknowledges receipt to kafka (an at-most-once pattern), which advances the offset for our groupId. Then, if the message content matches the pinned pattern, the plugin calls seneca.act with the message.
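
The acknowledge-first flow can be pictured with the sketch below. It is a simplified model, not the plugin's code: parsePin, matchesPin and handleMessage are hypothetical names, the pin matcher is a minimal stand-in for seneca's pattern matching, and commitOffset and act stand in for the kafka acknowledgement and seneca.act.

```javascript
// Hypothetical stand-ins: none of these functions are exported by the plugin.

// Turn 'cmd:register,type:user' into { cmd: 'register', type: 'user' }.
function parsePin(pin) {
  const pattern = {};
  for (const pair of pin.split(',')) {
    const [key, value] = pair.split(':').map((s) => s.trim());
    pattern[key] = value;
  }
  return pattern;
}

// A message matches when every pinned property has the pinned value;
// '*' accepts any value as long as the property is present.
function matchesPin(pattern, msg) {
  return Object.entries(pattern).every(([key, value]) =>
    value === '*' ? key in msg : String(msg[key]) === value
  );
}

// Acknowledge first, then act only on a match.
function handleMessage(rawJson, pin, commitOffset, act) {
  commitOffset(); // the offset advances whether or not the pin matches
  const msg = JSON.parse(rawJson);
  if (matchesPin(parsePin(pin), msg)) {
    act(msg); // stands in for seneca.act(msg)
    return true;
  }
  return false; // acknowledged but not processed
}

let committed = 0;
const acted = [];
const pin = 'cmd:register,type:user';
const commit = () => { committed += 1; };
const act = (msg) => { acted.push(msg); };

handleMessage('{"cmd":"register","type":"user","name":"alice"}', pin, commit, act);
handleMessage('{"cmd":"delete","type":"user","name":"bob"}', pin, commit, act);

console.log(committed, acted.length); // 2 1
```

Both messages are acknowledged, but only the one matching the pin reaches the action. This is why a message that does not match is gone for that groupId.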

If maxTime

Usage

This is the most basic usage; it expects kafka on localhost on its default port, 9092.

Consumer

require('seneca')()
  .use('pc-seneca-kafka-transport')
  .add('cmd:register,type:user', (msg, reply) => {
    // TRIGGER BUSINESS LOGIC FOR NEW USER REGISTRATION
    reply()
  })
  .listen({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user'
  });

A consumer opens a consumer connection to the specified topic and consumes any message that matches the pin.

Producer

const client = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .client({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user'
  });

A producer sends any message that matches the pin to the configured kafka topic.

Global kafka Options

By default this plugin tries to connect to a kafka broker on localhost:9092. Under the hood it uses https://github.com/oleksiyk/kafka, so please refer to that project for details.

To set global kafka options you can pass an options object to the .use declaration that contains the following properties:

{
  global:{},
  producer:{},
  consumer:{}
}

global can be any property that is common to both producer and consumer!

require('seneca')()
  .use('pc-seneca-kafka-transport', {
    global:{
        connectionString: 'kafka-host1:9092,kafka-host2:9092'
    }
  });

Transport Options

A Kafka producer requires the topic to connect to and the pin pattern of the messages to send to this topic.

You can pass any options supported by the no-kafka library.

The producer object will be merged with the kafka global options provided in the .use call.

const client = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .client({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user',
    producer: {

    }
  });
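
The merge of global and producer options can be sketched as follows. This is an assumption about precedence, not confirmed behavior: the sketch lets more specific options win on conflict (global, then producer from the .use call, then producer from the .client call). mergeKafkaOptions is a hypothetical name, and clientId is used only as an example of a no-kafka option.

```javascript
// Assumption: later (more specific) layers override earlier ones on conflict.
// mergeKafkaOptions is a hypothetical name used only for this sketch.
function mergeKafkaOptions(...layers) {
  return Object.assign({}, ...layers);
}

// Options passed to .use(), in the shape described in this README.
const pluginOptions = {
  global: { connectionString: 'kafka-host1:9092,kafka-host2:9092' },
  producer: {},
  consumer: {}
};

// The producer object passed to .client().
const clientProducer = { clientId: 'user-service' };

const effective = mergeKafkaOptions(
  pluginOptions.global,
  pluginOptions.producer,
  clientProducer
);

console.log(effective.connectionString); // kafka-host1:9092,kafka-host2:9092
console.log(effective.clientId);         // user-service
```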

A Kafka consumer requires the topic to connect to and the pin pattern of the messages it is listening for.

You can pass any options supported by the no-kafka library.

The consumer object will be merged with the kafka global options provided in the .use call.

If discardIfOlderThan is set, then whenever the kafka listener receives a message that has a timestamp property older than the configured value, the message is discarded.

If discardIfOlderThan is not set, or the message does not have a timestamp property, all messages are processed normally.

discardIfOlderThan is parsed with https://www.npmjs.com/package/timestring, so it supports any keyword supported by timestring.

const server = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .listen({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user',
    consumer: {
      groupId: 'userManager'
    },
    discardIfOlderThan: '5 minutes'
  });
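
The discardIfOlderThan rule can be sketched as below. To keep the example self-contained it does not use timestring; durationToMs is a hypothetical stand-in that understands only a few units. The sketch also assumes the timestamp property holds milliseconds since the epoch, which this README does not state.

```javascript
// Hypothetical stand-in for the timestring package: converts strings such as
// '5 minutes' or '30 seconds' to milliseconds. Only a few units are handled.
const UNIT_MS = { second: 1000, minute: 60 * 1000, hour: 60 * 60 * 1000 };

function durationToMs(text) {
  const match = /^(\d+)\s*(second|minute|hour)s?$/.exec(text.trim());
  if (!match) {
    throw new Error(`Unsupported duration: ${text}`);
  }
  return Number(match[1]) * UNIT_MS[match[2]];
}

// Returns true when the message should be discarded.
// Assumption: msg.timestamp is milliseconds since the epoch.
function isTooOld(msg, discardIfOlderThan, now = Date.now()) {
  if (!discardIfOlderThan || msg.timestamp === undefined) {
    return false; // option unset or no timestamp: process normally
  }
  return now - msg.timestamp > durationToMs(discardIfOlderThan);
}

const referenceTime = Date.parse('2024-01-01T12:00:00Z');
const fresh = { cmd: 'register', timestamp: referenceTime - 60 * 1000 };      // 1 minute old
const stale = { cmd: 'register', timestamp: referenceTime - 10 * 60 * 1000 }; // 10 minutes old

console.log(isTooOld(fresh, '5 minutes', referenceTime));               // false
console.log(isTooOld(stale, '5 minutes', referenceTime));               // true
console.log(isTooOld({ cmd: 'register' }, '5 minutes', referenceTime)); // false
```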

It's important to understand that every message received from the specified topic is always acknowledged to kafka, and only a message whose pattern matches the specified pin triggers the corresponding seneca act.

This means that you can't have multiple listen calls with different pins on the same topic, as they will interfere with each other and you can lose messages.

Instead, you should define a wildcard pin that handles all the topics and calls the appropriate act to process each message.

If a wildcard is not enough, you can pass an array of pins. In that case, use the pins property instead of pin.
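
A listen configuration using pins might look like the sketch below. The second pin (cmd:delete,type:user) is an invented example, and the options object is built on its own so the snippet runs without seneca or kafka installed.

```javascript
// Options for .listen() using several pins on one topic.
// The second pin is an invented example; the other values follow the
// examples in this README.
const listenOptions = {
  type: 'kafka',
  pins: ['cmd:register,type:user', 'cmd:delete,type:user'],
  kafkaTopic: 'user',
  consumer: {
    groupId: 'userManager'
  }
};

// With seneca and the plugin installed, this would be passed as:
// require('seneca')().use('pc-seneca-kafka-transport').listen(listenOptions);
console.log(listenOptions.pins.length); // 2
```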

Another option, if you have multiple listen calls, is to make sure each one defines its own groupId.

Please also note that if you intend to run multiple instances of the same consumer service for HA, you need to set the groupId to avoid consuming the same message multiple times.

const server = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .listen({
    type: 'kafka',
    pin: 'cmd:register,type:*',
    kafkaTopic: 'user',
    consumer: {
      groupId: 'userManager'
    }
  });
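
Once a single wildcard pin receives every cmd:register message, the service still has to route each message to the right logic. One way to do that is a small dispatch table, sketched below. The handler names and the admin type are invented for illustration, and wiring onRegister to seneca (for example as the function behind the matching action) is left out.

```javascript
// Hypothetical handlers keyed by the value of msg.type.
const handlers = {
  user: (msg) => `registered user ${msg.name}`,
  admin: (msg) => `registered admin ${msg.name}`
};

// Single entry point for every message matched by 'cmd:register,type:*'.
function onRegister(msg) {
  const known = Object.prototype.hasOwnProperty.call(handlers, msg.type);
  if (!known) {
    return `ignored type ${msg.type}`; // unknown types are skipped explicitly
  }
  return handlers[msg.type](msg);
}

console.log(onRegister({ cmd: 'register', type: 'user', name: 'alice' }));  // registered user alice
console.log(onRegister({ cmd: 'register', type: 'admin', name: 'bob' }));   // registered admin bob
console.log(onRegister({ cmd: 'register', type: 'robot', name: 'r2' }));    // ignored type robot
```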

Issues

At the moment we are using no-kafka, whose producer implementation has a bug and does not set the message timestamp. We work around the issue by adding the timestamp to the payload instead.