@claudeseo/nest-kafka
v0.0.3
Published
simple integrate kafka.js with nest.js
Downloads
181
Readme
nest-kafka
Description
Simple Integrate Kafka.js With Nest.js
Module Initialization
Register Kafka Module with the registerAsync
method
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
}),
KafkaModule.registerAsync({
inject: [ConfigService],
useFactory: async (configService: ConfigService) => {
const brokers = configService.get<string[]>('kafka.broker', [
'localhost:9092',
]);
return {
consume_method: 'each',
options: {
client: {
brokers,
clientId: 'kafka-sample',
},
consumer: {
groupId: 'kafka-sample',
},
producer: {
allowAutoTopicCreation: false,
},
subscribe: {
fromBeginning: false,
},
run: {
autoCommit: false,
},
},
};
},
}),
],
})
export class SomeModule {}
Configure
| Config | Options | | ------------------------------- | ------------------------------------------------- | | consume_method | 'each' or 'batch' | | options | | | options.client | https://kafka.js.org/docs/configuration | | options.consumer | https://kafka.js.org/docs/consuming#options | | options.producer | https://kafka.js.org/docs/producing#options | | options.run | | | options.run.autoCommit | true or false | | options.subscribe | https://kafka.js.org/docs/consuming#frombeginning | | options.subscribe.fromBeginning | true or false | | options.connect | | | options.connect.consumer | true or false | | options.connect.producer | true or false |
Consumer
const TOPIC_NAME = 'test';
@Controller()
export class AppController implements OnModuleInit {
constructor(@InjectKafka() private readonly kafkaService: KafkaService) {}
onModuleInit() {
this.kafkaService.subscribeOf(TOPIC_NAME, this);
}
@KafkaSubscribeTo(TOPIC_NAME)
async handler(payload: KafkaEachPayload): Promise<void> {
console.log(payload);
await this.kafkaService.commitOffset(
payload.topic,
payload.partition,
payload.message.offset
);
}
}
Producer
@Injectable()
export class AppService {
constructor(@InjectKafka() private readonly kafkaService: KafkaService) {}
async sendMessage(): Promise<void> {
await this.kafkaService.sendMessage({
topic: 'test2',
messages: [
{
key: null,
value: 'Hello World',
},
],
});
}
}