@rob3000/nestjs-kafka
v1.6.0
Published
KafkaJS integration with NestJS
Downloads
1,254
Readme
NestJS + KafkaJS
Integration of KafkaJS with NestJS to build event driven microservices.
Setup
Import and add the KafkaModule
to the imports array of the module for which you would like to use Kafka.
Synchronous Module Initialization
Register the KafkaModule
synchronous with the register()
method:
@Module({
imports: [
KafkaModule.register([
{
name: 'HERO_SERVICE',
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
...
})
Asynchronous Module Initialization
Register the KafkaModule
asynchronous with the registerAsync()
method:
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [
ConfigModule.forRoot(),
KafkaModule.registerAsync(['HERO_SERVICE'], {
useFactory: async (configService: ConfigService) => {
const broker = this.configService.get('broker');
return [
{
name: 'HERO_SERVICE',
options: {
clientId: 'hero',
brokers: [broker],
},
consumer: {
groupId: 'hero-consumer'
}
}
}
];
},
inject: [ConfigService]
})
]
...
})
Full settings can be found:
| Config | Options | | ------ | ------- | | client | https://kafka.js.org/docs/configuration | | consumer | https://kafka.js.org/docs/consuming#options | | producer | https://kafka.js.org/docs/producing#options | | serializer | | | deserializer | | | consumeFromBeginning | true/false | | | |
Subscribing
Subscribing to a topic to accept messages.
export class Consumer {
constructor(
@Inject('HERO_SERVICE') private client: KafkaService
) {}
onModuleInit(): void {
this.client.subscribeToResponseOf('hero.kill.dragon', this)
}
@SubscribeTo('hero.kill.dragon')
async getWorld(data: any, key: any, offset: number, timestamp: number, partition: number, headers: IHeaders): Promise<void> {
...
}
}
Producing
Send messages back to kafka.
const TOPIC_NAME = 'hero.kill.dragon';
export class Producer {
constructor(
@Inject('HERO_SERVICE') private client: KafkaService
) {}
async post(message: string = 'Hello world'): Promise<RecordMetadata[]> {
const result = await this.client.send({
topic: TOPIC_NAME,
messages: [
{
key: '1',
value: message
}
]
});
return result;
}
}
Schema Registry support.
By default messages are converted to JSON objects were possible. If you're using
AVRO you can add the SchemaRegistry
deserializer to convert the messages. This uses the KafkaJS Schema-registry module
In your module.ts
:
@Module({
imports: [
KafkaModule.register([
{
name: 'HERO_SERVICE',
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
},
deserializer: new KafkaAvroResponseDeserializer({
host: 'http://localhost:8081'
}),
serializer: new KafkaAvroRequestSerializer({
config: {
host: 'http://localhost:8081/'
},
schemas: [
{
topic: 'test.topic',
key: join(__dirname, 'key-schema.avsc'),
value: join(__dirname, 'value-schema.avsc')
}
],
}),
},
]),
]
...
})
See the e2e test for example.
TODO
- Tests
PRs Welcome :heart: