@kimtuan1102/nestjs-kafka
v7.2.35
NestJS Kafka Client
Description
Installation
$ npm i --save @kimtuan1102/nestjs-kafka @kafkajs/confluent-schema-registry @nestjs/microservices kafkajs rxjs
Synchronous Module Initialization
Register the KafkaModule synchronously with the register() method:
@Module({
  imports: [
    KafkaModule.register([
      {
        name: 'HERO_SERVICE',
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  ...
})
Asynchronous Module Initialization
Register the KafkaModule asynchronously with the registerAsync() method:
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { KafkaModule } from '@kimtuan1102/nestjs-kafka';

@Module({
  imports: [
    ConfigModule.forRoot(),
    KafkaModule.registerAsync(['HERO_SERVICE'], {
      // ConfigService is passed in as a parameter via `inject`, so it is not accessed through `this`.
      useFactory: async (configService: ConfigService) => {
        const broker = configService.get('broker');
        return [
          {
            name: 'HERO_SERVICE',
            options: {
              client: {
                clientId: 'hero',
                brokers: [broker],
              },
              consumer: {
                groupId: 'hero-consumer'
              }
            }
          }
        ];
      },
      inject: [ConfigService]
    })
  ]
  ...
})
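The factory above passes a single broker value from configuration into the `brokers` array. If that setting might hold several addresses, a small helper can split it first. The sketch below is illustrative only: `parseBrokers` is a hypothetical function that is not part of this package, and the comma-separated format of the `broker` setting is an assumption.

```typescript
// Hypothetical helper, not part of @kimtuan1102/nestjs-kafka.
// Turns a comma-separated string such as "kafka-1:9092, kafka-2:9092"
// into the string array expected by the `brokers` option.
function parseBrokers(
  raw: string | undefined,
  fallback: string[] = ['localhost:9092'],
): string[] {
  if (!raw) {
    // Nothing configured: use the fallback list.
    return fallback;
  }
  const brokers = raw
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  // A value made only of commas and whitespace also falls back.
  return brokers.length > 0 ? brokers : fallback;
}
```

Inside the factory this could be used as `brokers: parseBrokers(configService.get('broker'))`, assuming the configured value is a string.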
Asynchronous Module Initialization using config service
import { KafkaModuleOption, KafkaOptionsFactory } from '../kafka';

// Example values; in a real application these would typically come from configuration.
const clientId = 'hero';
const brokers = ['localhost:9092'];

export class KafkaConfigService implements KafkaOptionsFactory {
  // Must match the method name declared by KafkaOptionsFactory.
  creatKafkaModuleOptions(): KafkaModuleOption[] {
    return [
      {
        name: 'HERO_SERVICE',
        options: {
          client: {
            clientId: clientId,
            brokers: brokers,
          },
          consumer: {
            groupId: clientId,
          },
        },
      },
    ];
  }
}
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { KafkaModule } from '@kimtuan1102/nestjs-kafka';
// Path assumed for illustration; point it at the file that defines KafkaConfigService.
import { KafkaConfigService } from './kafka-config.service';

@Module({
  imports: [
    KafkaModule.registerAsync(['HERO_SERVICE'], {
      imports: [ConfigModule],
      useClass: KafkaConfigService,
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
The full list of settings for each option is documented at the links below:
| Config | Options |
| ------ | ------- |
| client | https://kafka.js.org/docs/configuration |
| consumer | https://kafka.js.org/docs/consuming#options |
| producer | https://kafka.js.org/docs/producing#options |
| serializer | |
| deserializer | |
| consumeFromBeginning | true/false |
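To show how the settings in the table might sit together in one entry, here is a minimal sketch. The `Sketch...` interfaces are local stand-ins written for this example, not the package's exported types, and they list only a few KafkaJS options (`connectionTimeout`, `sessionTimeout`, `allowAutoTopicCreation`); where exactly `producer` and `consumeFromBeginning` belong in the real option type is an assumption based on the table.

```typescript
// Local stand-in types that mirror the table; the names are illustrative only.
interface SketchKafkaOptions {
  client: { clientId: string; brokers: string[]; connectionTimeout?: number };
  consumer: { groupId: string; sessionTimeout?: number };
  producer?: { allowAutoTopicCreation?: boolean };
  consumeFromBeginning?: boolean;
}

interface SketchKafkaModuleOption {
  name: string;
  options: SketchKafkaOptions;
}

// One entry of the kind passed to KafkaModule.register([...]).
const heroServiceOption: SketchKafkaModuleOption = {
  name: 'HERO_SERVICE',
  options: {
    client: {
      clientId: 'hero',
      brokers: ['localhost:9092'],
      connectionTimeout: 3000, // milliseconds
    },
    consumer: {
      groupId: 'hero-consumer',
      sessionTimeout: 30000, // milliseconds
    },
    producer: {
      allowAutoTopicCreation: false,
    },
    // Assumed placement: read each subscribed topic from its earliest offset.
    consumeFromBeginning: true,
  },
};
```

The `serializer` and `deserializer` settings are left out because the table does not describe their shape.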
Subscribing
Subscribe to a topic to receive its messages.
export class Consumer {
  constructor(
    @Inject('HERO_SERVICE') private client: KafkaService
  ) {}

  onModuleInit(): void {
    this.client.subscribeToResponseOf('hero.kill.dragon', this)
  }

  @SubscribeTo('hero.kill.dragon')
  async getWorld(@Payload() data: KafkaMessage): Promise<void> {
    ...
  }
}
Producing
Send messages to Kafka.
const TOPIC_NAME = 'hero.kill.dragon';

export class Producer {
  constructor(
    @Inject('HERO_SERVICE') private client: KafkaService
  ) {}

  async post(message: string = 'Hello world'): Promise<RecordMetadata[]> {
    const result = await this.client.send({
      topic: TOPIC_NAME,
      messages: [
        {
          key: '1',
          value: message
        }
      ]
    });
    return result;
  }
}
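The producer above sends a plain string. When the payload is an object, one option is to serialize it before building the record. The sketch below assumes the value should be a JSON string; `buildRecord` and the `Sketch...` types are hypothetical names for this example, and whether the package's `serializer` setting already handles objects is not covered here.

```typescript
// Local stand-in shapes matching the object passed to `send` in the example above.
interface SketchMessage {
  key: string;
  value: string;
}

interface SketchRecord {
  topic: string;
  messages: SketchMessage[];
}

// Hypothetical helper: wraps one payload in a single-message record,
// serializing the payload to JSON.
function buildRecord<T>(topic: string, key: string, payload: T): SketchRecord {
  return {
    topic,
    messages: [{ key, value: JSON.stringify(payload) }],
  };
}

const record = buildRecord('hero.kill.dragon', '1', { hero: 'Arthur', target: 'dragon' });
```

The resulting `record` has the same `topic` and `messages` shape as the argument to `this.client.send(...)` in the producer.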
Support
Nest is an MIT-licensed open source project. It can grow thanks to the sponsors and support by the amazing backers. If you'd like to join them, please read more here.
Stay in touch
- Author - Kamil Myśliwiec
- Website - https://nestjs.com
- Twitter - @nestframework
License
Nest is MIT licensed.