nk-kafka-node
Motivations
Working with Kafka in Node can be difficult. There are plenty of actively maintained libraries to choose from, but they fall short in a couple of places:
In containerized environments where containers come up asynchronously, a few checks need to happen before consumers connect to Kafka and before producers start publishing messages.
In instances where you have consumers subscribed to a topic and a producer publishing to the same topic, you need to be assured that the topic is created ahead of time in case the consumer becomes ready before the producer.
kafka-node simplifies a lot of complexity, but has known issues around the zookeeper client.
Most clients leave you in callback chains as you proactively try to prevent faults.
To address these, this helper library has been created.
Usage
KafkaClient
Internally, nk-kafka-node uses KafkaClient from kafka-node. One client per producer/consumer is made at the time of producer/consumer creation. You can configure clients with the method configureClient(clientConfig) and pass all the same options as you can to KafkaClient.
The default configuration is:
const clientConfig = {
  kafkaHost: process.env.KAFKA_HOST || 'kafka:9092',
  connectRetryOptions: {
    retries: 20,
    factor: 3,
    minTimeout: 1000,
    maxTimeout: 30000
  }
};
If you only need to set the kafkaHost, you can use the environment variable KAFKA_HOST.
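As a quick sketch, assuming configureClient is exposed on the module the same way createConsumer and createProducer are, overriding the client configuration before creating any producers or consumers might look like this (the host and retry values are illustrative, not defaults):
const Kafka = require('nk-kafka-node');

// Illustrative values only; any KafkaClient option can be passed here.
Kafka.configureClient({
  kafkaHost: 'my-kafka:9092',
  connectRetryOptions: {
    retries: 10,
    minTimeout: 500,
    maxTimeout: 10000
  }
});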
Creating Consumers
After setting your client config, you can create consumers with createConsumer(topic, [groupId], [optionsOverride]). The groupId is optional but recommended; without it, the consumer is assigned a UUID. optionsOverride merges with the default options, which are the following:
const consumerOptions = {
  autoCommit: true,
  fromOffset: false,
  encoding: 'utf8'
};
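For instance, a minimal sketch of overriding a single option while keeping the remaining defaults (the encoding override, topic name, and group id here are illustrative, and assume kafka-node accepts encoding: 'buffer' for consumers):
const Kafka = require('nk-kafka-node');

// Only encoding is overridden; autoCommit and fromOffset keep their defaults.
Kafka.createConsumer([{topic: 'my-topic1'}], 'app-my-topics-consumer', {encoding: 'buffer'})
  .then((consumer) => {
    consumer.on('message', (msg) => {
      // msg.value arrives as a Buffer with this override
    });
  })
  .catch((err) => {
    // Handle error connecting consumer
  });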
createConsumer extends kafka-node's consumer. It has the same events, methods, and options available to it. createConsumer checks whether the topics being subscribed to exist, and tries to create them with kafka-node's createTopics.
Note: In order to take advantage of creating topics proactively, set the equivalent environment variable or setting in your Kafka instance, AUTO_CREATE_TOPICS_ENABLE, to true. For "ches/kafka" that is KAFKA_AUTO_CREATE_TOPICS_ENABLE=true.
Example:
const Kafka = require('nk-kafka-node');

Kafka.createConsumer([{topic: 'my-topic1'}, {topic: 'my-topic12'}], 'app-my-topics-consumer')
  .then((consumer) => {
    consumer.on('message', (msg) => {
      // Handle message
    });
    consumer.on('error', (err) => {
      // Handle error from connected consumer
      consumer.close(); // Closing on error is a suggestion
    });
  })
  .catch((err) => {
    // Handle error connecting consumer
  });
Creating and Using Producers
After setting your client config, you can create producers with createProducer(). createProducer extends kafka-node's producer. It has the same events and methods available to it, as well as one extra method.
The producer that is resolved by createProducer has a method called produce on it. produce ensures that a leader node has been elected before it attempts to publish a set of messages, and also sets some options for you.
Example:
const Kafka = require('nk-kafka-node');

Kafka.createProducer()
  .then((producer) => {
    producer.produce('my-topic1', [
      JSON.stringify({
        body: {
          user: 'Jeff',
          rug: null
        }
      }),
      'plain string',
      Buffer.from('boat string')
    ]);
    producer.on('error', (err) => {
      // Handle connected producer error
    });
  })
  .catch((err) => {
    // Handle error creating/connecting producer
  });
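Putting the two pieces together, here is a sketch of a round trip on a single topic (the topic name and group id are hypothetical), creating the consumer first so the topic check described above runs before the producer publishes:
const Kafka = require('nk-kafka-node');

Kafka.createConsumer([{topic: 'events'}], 'app-events-consumer')
  .then((consumer) => {
    consumer.on('message', (msg) => {
      console.log('received', msg.value);
    });
    // Create the producer only after the consumer (and topic) exist
    return Kafka.createProducer();
  })
  .then((producer) => {
    producer.produce('events', [JSON.stringify({body: {user: 'Jeff'}})]);
  })
  .catch((err) => {
    // Handle consumer or producer setup errors
    console.error(err);
  });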