ngthminhdev-nestjs-kafka
v0.0.2
Published
$ npm install ngthminhdev-nestjs-kafka
Downloads
10
Maintainers
Readme
Installation
npm
$ npm install ngthminhdev-nestjs-kafka
yarn
$ yarn add ngthminhdev-nestjs-kafka
Kafka Config
import { KafkaOptions, Transport } from '@nestjs/microservices';
import { Partitioners } from 'kafkajs';
/**
 * Kafka transport options for the example application.
 *
 * The same object is passed to `KafkaModule.register(...)` in the app module
 * and to `app.connectMicroservice(...)` in `main.ts`.
 */
export const kafkaConfig: KafkaOptions = {
transport: Transport.KAFKA,
options: {
client: {
// Single broker on the local machine; adjust for your environment.
brokers: ['localhost:9092'],
// Placeholder value; replace with your own client id.
clientId: 'clientId',
},
consumer: {
// Placeholder value; replace with your own consumer group id.
groupId: 'groupId',
// NOTE(review): per the KafkaJS docs this lets the consumer trigger creation
// of topics that do not exist yet; confirm this is wanted outside local development.
allowAutoTopicCreation: true,
},
producer: {
// Explicitly selects the KafkaJS legacy partitioner instead of relying on
// the library default (see the KafkaJS partitioner documentation).
createPartitioner: Partitioners.LegacyPartitioner,
},
},
};
App module
/**
 * Root module of the example application.
 *
 * Registers the Kafka module with the shared `kafkaConfig` and wires up the
 * application's controller and service. The class is exported because
 * `main.ts` imports `AppModule` from `./app.module`.
 *
 * NOTE(review): `KafkaModule.register(...)` presumably provides the client
 * that the controller injects via the `KAFKA_MODULE` token; confirm against
 * the package's exports.
 */
@Module({
  imports: [KafkaModule.register(kafkaConfig)],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
Application main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { kafkaConfig } from './config';
/**
 * Starts the example application.
 *
 * Creates the Nest application from `AppModule`, attaches the Kafka
 * microservice described by `kafkaConfig`, starts all attached microservices,
 * and finally begins listening on the configured port.
 */
async function bootstrap(): Promise<void> {
  const nestApp = await NestFactory.create(AppModule);

  // `||` (not `??`) so that an empty PORT value also falls back to 3000.
  const listenPort = process.env.PORT || 3000;

  // The Kafka transport is attached and started before the app begins listening.
  nestApp.connectMicroservice(kafkaConfig);
  await nestApp.startAllMicroservices();

  await nestApp.listen(listenPort);
}

bootstrap();
App controller
import { Controller, Inject, Post } from '@nestjs/common';
import { ClientKafka, EventPattern } from '@nestjs/microservices';
import { KAFKA_MODULE } from 'ngthminhdev-nestjs-kafka';
/**
 * Example controller that both publishes to and consumes from the
 * `request-pattern` topic through the injected Kafka client.
 */
@Controller()
export class AppController {
  constructor(@Inject(KAFKA_MODULE) private readonly client: ClientKafka) {}

  /**
   * Lifecycle hook: registers every request pattern with the Kafka client
   * via `subscribeToResponseOf`.
   */
  async onModuleInit() {
    for (const topic of ['request-pattern']) {
      this.client.subscribeToResponseOf(topic);
    }
  }

  /**
   * POST handler: emits a `request-pattern` event whose payload is a short
   * string containing the current date.
   */
  @Post()
  async sendPattern() {
    const message = `some entity ${new Date()}`;
    this.client.emit<string>('request-pattern', message);
  }

  /**
   * Event handler for `request-pattern`: logs the JSON-serialized payload
   * followed by " created".
   *
   * @param payload - The event payload delivered for the pattern.
   */
  @EventPattern('request-pattern')
  async handleEntityCreated(payload: any) {
    const serialized = JSON.stringify(payload);
    console.log(`${serialized} created`);
  }
}
Author: [email protected]
License
Nest is MIT licensed.