KafkaSSE

Kafka Consumer to HTTP SSE/EventSource

Uses node-rdkafka's KafkaConsumer to stream JSON messages to clients over HTTP in SSE/EventSource format.

The Last-Event-ID header and EventSource id field are used to handle auto-resume across client disconnects. By using EventSource to connect to a KafkaSSE endpoint, your client will automatically resume from where it left off if it is disconnected from the server. Every message sent to the client has an id field containing a JSON array of objects describing the latest topic, partition, and offset (or timestamp) seen by this client. On reconnect, this array is sent back as the Last-Event-ID header, and KafkaSSE uses it to assign a KafkaConsumer to start from those positions.
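
For illustration, a single event on the wire might look something like this (the values here are hypothetical):

id: [{"topic":"topicA","partition":0,"offset":12345}]
data: {"some":"deserialized message object"}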

If the underlying Kafka cluster supports timestamp-based consumption, the Last-Event-ID header may initially provide assignments with timestamp entries instead of offset. If timestamps are provided, Kafka will be queried for the offsets that most closely correspond to the provided timestamps. This allows for historical consumption from Kafka without having to know the logical Kafka partition offsets for particular times in the past.

KafkaSSE can be configured to always use timestamps instead of offsets in the EventSource event id field via the useTimestampForId option. If this option is true, each EventSource event id (which is automatically used for the Last-Event-ID header) will be set from the Kafka message timestamp instead of the offset. This is less precise than using offsets, but is useful if you need to hide the underlying Kafka cluster's message offsets, e.g. to support multi-DC setups. If users want to use offsets, they can manually modify the Last-Event-ID header before reconnecting to contain the latest message offsets instead of timestamps. An offset in the Last-Event-ID will take precedence over a timestamp if both are present.
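
For example, a minimal sketch of enabling this option (the timestamp value below is made up):

kafkaSse(req, res, topics, {
    useTimestampForId: true,
});

// Each event id (and hence Last-Event-ID) now carries timestamps, e.g.:
// id: [{"topic":"topicA","partition":0,"timestamp":1501545600000}]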

See client.js for examples of both offset- and timestamp-based Last-Event-ID assignment.

See also Kasocki.

Usage

HTTP server KafkaSSE setup

The kafka-sse module exports a function that wraps up handling an HTTP SSE request for Kafka topics.

'use strict';
const kafkaSse = require('kafka-sse');
const server = require('http').createServer();

const options = {
    kafkaConfig: {'metadata.broker.list': 'mybroker:9092'}
};

server.on('request', (req, res) => {
    const topics = req.url.replace('/', '').split(',');
    console.log(`Handling SSE request for topics ${topics}`);
    kafkaSse(req, res, topics, options)
    // This promise won't resolve unless the client disconnects or kafkaSse encounters an error.
    .then(() => {
        console.log('Finished handling SSE request.');
    });
});

server.listen(6917);
console.log('Listening for SSE connections at http://localhost:6917/:topics');

Custom deserializer

The default deserializer used for messages returned from node-rdkafka assumes that kafkaMessage.value is a utf-8 byte buffer containing a JSON string. It parses kafkaMessage.value into an object, and then sets it as kafkaMessage.message. kafkaMessage.message is what will be sent to the connected SSE client as an event.
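
In effect, the default behavior amounts to something like this sketch (not necessarily the exact implementation):

function defaultDeserializer(kafkaMessage) {
    // kafkaMessage.value is a utf-8 byte Buffer containing a JSON string.
    kafkaMessage.message = JSON.parse(kafkaMessage.value.toString('utf-8'));
    return kafkaMessage;
}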

You may override this default deserializer. The deserializer is given the kafkaMessage as returned by node-rdkafka consume. You must make sure to set the message field on this object, and you must not modify the other top-level fields such as topic, offset, and partition, since these are used to set the Last-Event-ID header.

function customDeserializer(kafkaMessage) {
    kafkaMessage.message = JSON.parse(kafkaMessage.value.toString());
    kafkaMessage.message.extraInfo = 'I was deserialized by a custom deserializer';
    return kafkaMessage;
}

//...

kafkaSse(req, res, topics, {
    deserializer: customDeserializer,
});

// ...

Server side filtering

By default, all consumed messages are sent to the client. However, you may provide a custom filter function as the filterer option. This function will be given the kafkaMessage as returned by the deserializer function. The message will be kept and sent to the client if your filter function returns true, otherwise it will be skipped.

/**
 * Only send events to SSE clients whose message has a `price` field greater than or equal to `10.0`.
 */
function filterFunction(kafkaMessage) {
    return kafkaMessage.message.price >= 10.0;
}

//...

kafkaSse(req, res, topics, {
    filterer: filterFunction,
});

// ...

NodeJS EventSource usage

'use strict';
const EventSource = require('eventsource');
const topics = process.argv[2];
const port   = 6917;

const url = `http://localhost:${port}/${topics}`;
console.log(`Connecting to Kafka SSE server at ${url}`);
let eventSource = new EventSource(url);

eventSource.onopen = function(event) {
    console.log('--- Opened SSE connection.');
};

eventSource.onerror = function(event) {
    console.log('--- Got SSE error', event);
};

eventSource.onmessage = function(event) {
    // event.data will be a JSON string containing the message event.
    console.log(JSON.parse(event.data));
};

Errors

If an error is encountered during SSE client connection, a normal HTTP error response will be returned, along with JSON information about the error in the response body. However, once the SSE connection has started, the HTTP response status will have already been set to 200. From that point on, errors are delivered to the client as onerror EventSource events. You must register an onerror function on your EventSource object to receive these.
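
A minimal sketch of handling both phases on the client, assuming for illustration that an in-stream error event's data field carries the JSON error information:

eventSource.onerror = function(event) {
    if (event.data) {
        // In-stream error: the SSE connection had already started with a 200.
        console.error('KafkaSSE error:', JSON.parse(event.data));
    } else {
        // Connection-level error, e.g. the initial HTTP request failed.
        console.error('SSE connection error', event);
    }
};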

Notes on Kafka consumer state

In normal use cases, Kafka (and previously Zookeeper) handles consumer state. Kafka keeps track of multiple consumer processes in named consumer groups, and handles rebalancing of those processes as they come and go. Kafka also handles offset commits, keeping track of the high water mark each consumer has reached in each topic and partition.

KafkaSSE is intended to be exposed to the public internet, enabling web-based consumers to consume from Kafka over HTTP. Since the internet at large cannot be trusted, we would prefer to avoid allowing the internet to make any state changes to our Kafka clusters. KafkaSSE pushes as much consumer state management to the connected clients as it can.

Offset commits are not supported. Instead, the latest subscription state is sent as the EventSource id field with each event. This information can be used during connection initialization in the Last-Event-ID header to specify the positions at which KafkaSSE should start consuming from Kafka. Last-Event-ID should be an assignments array, of the form:

[
    { topic: 'topicA', partition: 0, offset: 12345 },
    { topic: 'topicB', partition: 0, offset: 46666 },
    { topic: 'topicB', partition: 1, offset: 45555 },
]
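
To resume from explicit positions, you can set this header yourself when connecting. A sketch using the eventsource package, which accepts custom headers via its second argument:

const assignments = [
    { topic: 'topicA', partition: 0, offset: 12345 },
];

const eventSource = new EventSource(url, {
    headers: { 'Last-Event-ID': JSON.stringify(assignments) },
});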

Consumer group management is also not supported. Each new SSE client corresponds to a new consumer group, and there is no way to parallelize consumption from Kafka for a single connected client. Ideally, we would not register a consumer group with Kafka at all, but as of this writing librdkafka and blizzard/node-rdkafka do not support this. Consumer groups that are registered with Kafka are named after the x-request-id header, or a UUID if that is not set, e.g. KafkaSSE-2a360ded-1da0-4258-bad5-90ce954b7c52.

node-rdkafka consume modes

The node-rdkafka client that KafkaSSE uses has several consume APIs. KafkaSSE uses the standard non-flowing API.
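
For reference, in non-flowing mode the application explicitly polls for batches of messages rather than having them pushed continuously. A rough sketch of that mode in node-rdkafka (illustrative only, not KafkaSSE's internal code):

const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
    'group.id': 'example-group',
    'metadata.broker.list': 'mybroker:9092',
}, {});

consumer.connect();
consumer.on('ready', () => {
    consumer.subscribe(['topicA']);
    // Non-flowing mode: explicitly ask for up to 10 messages per poll.
    setInterval(() => consumer.consume(10), 1000);
});
consumer.on('data', (kafkaMessage) => {
    console.log(kafkaMessage.value.toString());
});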

Testing

On host

Mocha tests require a running 0.11+ Kafka broker at localhost:9092 with delete.topic.enable=true. test/utils/kafka_fixture.sh will prepare topics in Kafka for the tests. npm test will download, install, and run a Kafka broker. If you already have one running locally, npm run test-local will be easier to run. You may set the KAFKA_TOPICS_CMD and KAFKA_CONSOLE_PRODUCER_CMD environment variables if you would like to override the commands used in kafka_fixture.sh.

With Docker

Testing with Docker requires docker-engine >= 1.12.4 and docker-compose >= 1.9.0. All you have to do is run ./test/docker-tests.sh. You shouldn't even need Node installed for this to run.

To Do

  • tests for kafkaEventHandlers