@imec/digital-twin-kafka-utils
v0.5.0
Published
[![Build Status](https://dev.azure.com/apt-x/Digital%20Twin/_apis/build/status/digital-twin-kafka-utils?branchName=master)](https://dev.azure.com/apt-x/Digital%20Twin/_build/latest?definitionId=128&branchName=master)
Downloads
2
Keywords
Readme
Kafka Connection Manager
Description
Kafka manager is a wrapper around Kafkajs for the Digital Twin project. It's meant to be available publicly from NPM.
How to update this package
First of all you need to be a collaborator of the npm package. Ask [email protected] or [email protected] to get access.
- Create a branch
- Update code
- Run
yarn build
- Run
run npm version <patch,minor,major>
- Commit changes and the version update
- Open PR
- Merge PR
- Checkout to master
- Run
npm publish
on master
How to use it
Example to create and initialize an instance
import KafkaManager, { ProducerRecord, KafkaMessage } from ".";
interface RoadData{
name: string,
age: number
}
interface HobbitData{
name: string,
address: string
}
export default class Example {
kafka!: KafkaManager
async init() {
this.kafka = new KafkaManager({ brokers: ['kafka-server:9092']});
const producerConfig = {} // producer config from kafkajs, optional can be left out
const kafkaProducer = await this.kafka.createProducer(producerConfig);
const consumerConfig = {groupId: "hello"};
const kafkaConsumer = await this.kafka.createConsumer(consumerConfig);
const topic = 'traffic-loop';
const record: ProducerRecord = {
topic,
messages: [
{
headers: {'sequenceId': '2'},
key: 'remove road',
value: {name: 'my awesome name', age: 124}
},
{
headers: {'sequenceId': '2'},
key: 'add hobbit',
value: {name: 'hobbit', address: 'Esgaroth'}
}
]
}
setInterval(() => {
console.log('publishing');
this.kafka.publish(kafkaProducer, record);
}, 5000)
function eachBatchFn(messages: KafkaMessage<RoadData|HobbitData>[]) {
messages.forEach(element => {
switch (element.key) {
case "add hobbit":
console.log("Handle the hobbit message");
break;
case "remove road":
console.log("Handle the road message");
break;
default:
console.log("Handle unexpected default");
}
});
console.log("batch messages", messages)
}
function eachMessageFn(message: KafkaMessage<RoadData|HobbitData>) {
console.log("messages", message)
switch (message.key) {
case "add hobbit":
console.log("Handle the hobbit message");
break;
case "remove road":
console.log("Handle the road message");
break;
default:
console.log("Handle unexpected default");
}
}
// Subscribing to a topic can be done using the two methods below. Note that you cannot have `eachMessage` and `eachBatch` in the same subscription! This will cause kafkajs to only run one of the two.
// Use for each message subscription
this.kafka.subscribe<RoadData|HobbitData>(kafkaConsumer, {
subscribeTopic: {topic},
eachMessage: eachMessageFn
});
// Use batch subscription
this.kafka.subscribe<RoadData|HobbitData>(kafkaConsumer, {
subscribeTopic: {topic},
eachBatch: eachBatchFn,
});
}
}