@akenzy/kafka
v0.0.4
Published
Kafka module for NestJS
Downloads
7
Readme
NestJS + KafkaJS
Setup KafkaJS Module
Import and add the KafkaModule to the imports array of the module for which you would like to use Kafka.
Synchronous Module Initialization
Register the KafkaModule synchronous with the forRoot() method:
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { KafkaModule } from '@kenzy/kafka';
import { ConfigModule } from '@nestjs/config';
import { TestConsumer } from './test.consumer';
@Module({
imports: [
KafkaModule.forRoot({
broker: ['localhost:9092', 'localhost:9093', 'localhost:9094'],
}),
ConfigModule.forRoot({ isGlobal: true }),
],
controllers: [AppController],
providers: [AppService, TestConsumer],
})
export class AppModule {}
Handle Produce KafkaJS
EX: using class ProducerService
import { ProducerService } from '@kenzy/kafka';
import { Injectable } from '@nestjs/common';
@Injectable()
export class AppService {
constructor(private readonly producerService: ProducerService) {}
async getHello(): Promise<string> {
await this.producerService.produce('test', {
value: 'Kenzy test message Kafka',
});
return 'Hello World!';
}
}
Handle Consume KafkaJS
EX: using class ConsumerService PS: Create file {name}.consumer.ts import init Service Module Example My App
import { ConsumerService } from '@kenzy/kafka/dist';
import { Injectable, OnModuleInit } from '@nestjs/common';
@Injectable()
export class TestConsumer implements OnModuleInit {
constructor(private readonly consumerService: ConsumerService) {}
async onModuleInit() {
await this.consumerService.consume({
topic: { topic: 'test' },
config: { groupId: 'test-consumer' },
onMessage: async (message) => {
console.log({
value: message.value.toString(),
});
// throw new Error('Test error!');
},
});
}
}
Create By AKenzy Email: [email protected] Date: 16/01/2023