npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

kafka-java-bridge

v0.2.8

Published

Kafka java client wrapper. Supports kafka version 0.8 and up

Downloads

245

Readme

kafka-java-bridge

Built with Grunt Build Status npm version Dependency Status devDependency Status npm downloads NPM license

Nodejs wrapper for the JAVA kafka 0.8 client API.

Motivation

The need to have a production quality kafka0.8 client implementation in Nodejs. Please see:

Installation

  1. Make sure you have java v7 or higher installed
  2. Run npm install kafka-java-bridge

Consumer Example


var HLConsumer = require("kafka-java-bridge").HLConsumer;

var consumerOptions = {
    zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka",
    groupId: "example-consumer-group-id",
    topics: ["example-topic1","example-topic2"],
    getMetadata: true
};

var consumer = new HLConsumer(consumerOptions);

consumer.start(function (err) {
    if (err) {
        console.log("Error occurred when starting consumer. err:", err);
    } else {
        console.log("Started consumer successfully");
    }
});

consumer.on("message", function (msg, metadata) {
    console.log("On message. message:", msg);
    console.log("On message. metadata:", JSON.stringify(metadata));
});

consumer.on("error", function (err) {
    console.log("On error. err:", err);
});

process.on('SIGINT', function() {
    consumer.stop(function(){
       console.log("consumer stopped");
        // Timeout to allow logs to print
        setTimeout(function(){
            process.exit();
        } , 300);
    });
});

Producer Example

var StringProducer = require('kafka-java-bridge').StringProducer;
var BinaryProducer = require('kafka-java-bridge').BinaryProducer;

var stringProducer = new StringProducer({bootstrapServers: "broker1:9092, broker2:9092"});
var binaryProducer = new BinaryProducer({zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka"});

const buf = new Buffer([0x0, 0x1, 0x2, 0x3, 0x4]);
binaryProducer.send("myBinaryTopic", buf, function(err, msgMetadata){
    console.log('send msg cb. err = ' + err + '. metadata = ' + JSON.stringify(msgMetadata));
});
stringProducer.send("myStringTopic", "testString", function(err, msgMetadata){
    console.log('send msg cb. err = ' + err + '. metadata = ' + JSON.stringify(msgMetadata));
});

process.on('SIGINT', function() {
    stringProducer.close(function(err){
        binaryProducer.close(function(err) {
            process.exit();
        });
    });
});

Performance and stability

Performance

Libraries compared:

  • kafka-java-bridge , this package.
  • kafka-node, available High Level Consumer for kafka0.8.
  1. We show below representative cpu consumption (lower is better) for processing same amount of messages per second(~11K).

image 1.

|Library name |CPU% average| |:----------------:|:------------:| |kafka-java-bridge |11.76 | |kafka-node |73 |

  1. Consumer comparision (number of messages). Tested with 16GB Ram, 4 core machine on Amazon AWS EC2 Instance. (Metircs measured with Newrelic)

|Library name |Rpm Avg|Network Avg|Cpu/System Avg| |:----------------:|:------------:|:------------:|:------------:| |kafka-java-bridge |947K |300 Mb/s |6.2% | |kafka-node |87.5K |75 Mb/s |11.2% |

Kakfa-Java-Bridge RPM Kafka-Node RPM

Stability

Kafka-java-bridge wraps Confluent's official Java High Level Consumer.

While testing kafka-node we encountered multiple issues such as:

Those issues along side with the inadequate performance results where the trigger for developing this library.

API

HLConsumer(options)

Consumer object allows messages fetching from kafka topic. Each consumer can consume messages from multiple topics.


var consumerOptions = {
    zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka",
    groupId: "example-consumer-group-id",
    topic: "example-topic",
    serverPort: 3042,// Optional
    threadCount: 1,// Optional
    properties: {"rebalance.max.retries": "3"}// Optional
};

| Option name |Mandatory |Type |Default value|Description| |:--------------|:-------------:|:--------|:-------------:|:------------| | zookeeperUrl | Yes |String |undefined |Zookeeper connection string.| | groupId | Yes |String |undefined |Kafka consumer groupId. From kafka documentation: groupId is a string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.| | topic | No |String |undefined |Kafka topic name.| | getMetadata | No |boolean|false |If true, message metadata(topic, partition, offset) will be provided with each message. Use false for better performance.| | topics | Yes |Array of String |undefined |Kafka topics names array.| | serverPort | No |Number |3042 |Internal server port used to transfer the messages from the java thread to the node js thread.| |threadCount | No |Number |1 |The threading model revolves around the number of partitions in your topic and there are some very specific rules. For More information: kafka consumer groups| | getMetadata | No |Boolean |false |Get message metadata (contains topic, partition and offset ).|
|properties | No |Object |undefined |Properties names can be found in the following table: high level consumer properties.|

Events emitted by the HLConsumer:

  • message: this event is emitted when a message is consumed from kafka.
  • error: this event is emitted when an error occurs while consuming messages.

hlConsumer.start(cb)

Start consumer messages from kafka topic.

cb - callback is called when the consumer is started.

If callback was called with err it means consumer failed to start.

hlConsumer.stop(cb)

Stop consuming messages.

cb - callback is called when the consumer is stopped.

message/error events can still be emitted until stop callback is called.

StringProducer(options) / BinaryProducer(options)

Producer object produces messages to kafka. With each message topic is specified so one producer can produce messages to multiple topics.

StringProducer should be used to send string messages. BinaryProducer should be used to send binary messages.


var producerOptions = {
    zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka",
    properties: {"client.id": "kafka-java-bridge"}// Optional
};
OR 
var producerOptions = {
    bootstrapServers: "kafka:2181,kafka2:2181,kafka3:2181/kafka",
    properties: {"client.id": "kafka-java-bridge"}// Optional
};

| Option name |Mandatory |Type |Default value|Description| |:--------------|:-------------:|:--------|:-------------:|:------------| | bootstrapServers| NO |String |undefined |Kafka broker connection string.| | zookeeperUrl | No |String |undefined |Zookeeper connection string. If provided, broker list will be retrieved from standard path.| | properties | No |Object |undefined |Properties names can be found in the following table: high level producer properties.|

producer.send(topic, msg, cb)

topic - target topic name String.

msg - message to be sent to kafka String or Buffer.

cb - callback is called when message is sent. with err in case of failure or msg metadata in case of success.

producer.sendWithKey(topic, msg, key, cb)

topic - target topic name String.

msg - message to be sent to kafka String or Buffer.

key - kafka message key String or Buffer.

cb - callback is called when message is sent. with err in case of failure or msg metadata in case of success.

producer.sendWithKeyAndPartition(topic, msg, key, partition, cb)

topic - target topic name String.

msg - message to be sent to kafka String or Buffer.

key - kafka message key String or Buffer.

partition - target partition Integer.

cb - callback is called when message is sent. with err in case of failure or msg metadata in case of success.

Adding Your Own Jars To Classpath

If you wish to add jars to the classpath, it can be done by placing them at:

{app root path}/kafka-java-bridge/java/lib/yourjar.jar

Java Tier Logging

By default, underlying java tier logging is disabled.

If you wish to enable java tier logging you can place your own log4j.properties file at:

{app root path}/kafka-java-bridge/log4j/log4j.properties

Troubleshooting

In case of installation failure, you may want to take a look at our dependency java npm installation and troubleshooting sections.

If you are working on a windows machine, you may want to look at windows-build-tools for native code compilation issues.

Sources

License

MIT