kafkajs-wrapper
v1.0.7
Published
A Kafka wrapper using kafkajs for producing, consuming, and managing Kafka topics dynamically
Downloads
516
Readme
Kafka Wrapper
A TypeScript wrapper for Kafka built on kafkajs, designed to simplify interaction with Kafka by providing a single, reusable connection instance shared across modules (Producer, Consumer, Admin). The wrapper lets you dynamically configure topics, produce and consume messages, and manage offsets.
Features
- Create and manage Kafka topics
- Produce messages to dedicated partitions or using round-robin distribution
- Consume messages with automatic offset management
- Single connection instance for producers, consumers, and admin tasks
- Configurable for various use cases
Installation
To install the Kafka wrapper, you can use npm:
npm install kafkajs-wrapper
Usage
How to use the Kafka wrapper:
// src/app.ts
import { Producer } from 'kafkajs-wrapper';
import { Consumer } from 'kafkajs-wrapper';
import { Admin } from 'kafkajs-wrapper';
import { KafkaConfig } from 'kafkajs-wrapper/types';
// Kafka configuration
// Shared settings object handed to the Producer, Consumer, and Admin constructors in this example.
const kafkaConfig: KafkaConfig = {
// Identifier this example application presents as its client id.
clientId: 'my-app',
// Broker address list; this example lists a single broker on localhost, port 9092.
brokers: ['localhost:9092'],
};
// Producer function
/**
 * Connects a producer, publishes one greeting to `my-topic`, and disconnects.
 *
 * The disconnect runs in a `finally` block so the connection is released even
 * when producing the message fails; the original error still propagates to
 * the caller.
 */
async function runProducer(): Promise<void> {
  const producer = new Producer(kafkaConfig);
  await producer.connect();
  try {
    const topic = 'my-topic';
    const message = 'Hello Kafka!';
    await producer.produceMessage({ topic, message });
    console.log(`Produced message: ${message} to topic: ${topic}`);
  } finally {
    // Always release the connection, whether or not the send succeeded.
    await producer.disconnect();
  }
}
// Consumer function
/**
 * Connects a consumer in group `my-group`, subscribes to `my-topic`, and
 * registers the message handlers.
 *
 * The consumer is deliberately left connected when this function returns;
 * call `consumer.disconnect()` yourself when it should stop.
 */
async function runConsumer(): Promise<void> {
  const consumer = new Consumer(kafkaConfig, 'my-group');
  await consumer.connect();
  const topic = 'my-topic';
  await consumer.subscribe(topic, false); // Start consuming from the last committed offset
  await consumer.consumeMessages(
    async (message) => {
      console.log(`Consumed message: ${message.message.value} from topic: ${message.topic}`);
      // Here you can process the message
    },
    // NOTE(review): judging by the parameter name, this second callback runs
    // when the wrapper's retry logic has given up on a message — confirm
    // against the wrapper's API.
    async (retrylogicfail) => {
      // Log the callback's own argument: `message` from the first handler is
      // not in scope here, so referencing it would not compile.
      console.error('Failed to process message after retries:', retrylogicfail);
    },
  );
  // Uncomment the next line if you want to manually disconnect the consumer after some time
  // await consumer.disconnect();
}
// Admin function to create topic and partitions
/**
 * Connects an admin client, creates `my-new-topic` with 3 partitions and a
 * replication factor of 1, requests additional partitions, and disconnects.
 *
 * The disconnect runs in a `finally` block so the connection is released even
 * when topic or partition creation fails; the original error still propagates
 * to the caller.
 */
async function runAdmin(): Promise<void> {
  const admin = new Admin(kafkaConfig);
  await admin.connect();
  try {
    const topic = 'my-new-topic';
    const numPartitions = 3;
    const replicationFactor = 1;

    // Create a new topic
    await admin.createTopic(topic, numPartitions, replicationFactor);
    console.log(`Created topic: ${topic} with ${numPartitions} partitions.`);

    // Create additional partitions
    // NOTE(review): kafkajs's own `createPartitions` takes the new TOTAL
    // partition count, not an increment — confirm that this wrapper really
    // treats `2` as "two more", otherwise the call would try to shrink the
    // topic from 3 partitions to 2.
    await admin.createPartitions(topic, 2); // Adding 2 more partitions to the existing topic
    console.log(`Added partitions to topic: ${topic}`);
  } finally {
    // Always release the connection, whether or not the admin calls succeeded.
    await admin.disconnect();
  }
}
// Run the producer, consumer, and admin functions
// Entry point: the steps run strictly in sequence because each one depends on
// the previous one. Any failure is reported and reflected in the exit code
// rather than left as an unhandled promise rejection.
(async () => {
  await runAdmin(); // Create topic and partitions first
  await runProducer(); // Then produce a message
  await runConsumer(); // Finally consume messages
})().catch((error: unknown) => {
  console.error('Kafka example failed:', error);
  process.exitCode = 1;
});