@tawk.to/nestjs-batch-kafka
v0.1.1
NestJS Kafka Batch Processor
Description
Process and publish Kafka messages in batches in NestJS. Cross-compatible with ServerKafka and ClientKafka from the @nestjs/microservices package.
Installation
$ npm i --save @tawk.to/nestjs-batch-kafka
Overview
To use the batch Kafka consumer, initialize KafkaBatchServer in your main.ts file and pass it as the strategy when creating the microservice.
    const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
      // The config is the same as the KafkaOptions from the @nestjs/microservices package
      strategy: new KafkaBatchServer({
        client: {
          brokers: ['localhost:52800', 'localhost:52801'],
        },
        consumer: {
          groupId: 'test',
          heartbeatInterval: 5000,
          sessionTimeout: 30000,
        },
        run: {
          autoCommitInterval: 5000,
          autoCommitThreshold: 100,
          partitionsConsumedConcurrently: 4,
        },
      }),
    });
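The run options above control when resolved offsets are committed. As a rough mental model, offsets are committed once either the interval has elapsed or the threshold of resolved messages has been reached, whichever happens first. The sketch below is a simplified illustration of that rule, not kafkajs's actual implementation; the shouldCommit helper, the AutoCommitOptions interface, and the parameter names are hypothetical.

```typescript
// Hypothetical model of the auto-commit decision; all names are illustrative.
interface AutoCommitOptions {
  autoCommitInterval: number; // milliseconds between time-based commits
  autoCommitThreshold: number; // number of resolved messages that triggers a commit
}

function shouldCommit(
  options: AutoCommitOptions,
  resolvedSinceLastCommit: number,
  msSinceLastCommit: number,
): boolean {
  // Nothing new has been resolved, so there is nothing to commit.
  if (resolvedSinceLastCommit <= 0) {
    return false;
  }
  // Either condition on its own is enough to commit.
  return (
    resolvedSinceLastCommit >= options.autoCommitThreshold ||
    msSinceLastCommit >= options.autoCommitInterval
  );
}

// Same values as the run block in the configuration above.
const runOptions: AutoCommitOptions = {
  autoCommitInterval: 5000,
  autoCommitThreshold: 100,
};

console.log(shouldCommit(runOptions, 100, 1200)); // threshold reached
console.log(shouldCommit(runOptions, 10, 5000)); // interval elapsed
console.log(shouldCommit(runOptions, 10, 1200)); // neither condition met
```

Under this model, a handler that resolves offsets and then calls commitOffsetsIfNecessary will only trigger a real commit when one of the two limits has been crossed.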
Then you can start consuming the events in batches as follows:
    @BatchProcessor('test')
    async test(
      @Payload() data: any[],
      @Ctx() context: KafkaBatchContext,
    ) {
      const heartbeat = context.getHeartbeat();
      const resolveOffset = context.getResolveOffset();
      const commitOffsetsIfNecessary = context.getCommitOffsetsIfNecessary();

      await heartbeat();
      for (const message of data) {
        console.log(message);
      }

      // Mark the whole batch as processed by resolving the last offset
      resolveOffset(context.getMessages().at(-1).offset);
      console.log("Batch resolved");

      await heartbeat();
      await commitOffsetsIfNecessary();
    }
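The handler above resolves the whole batch at once. An alternative, following the per-message pattern shown in the kafkajs eachBatch documentation, is to resolve each offset right after its message is handled and to send a heartbeat as you go. The sketch below illustrates that loop against an in-memory fake so it runs without a broker. MessageLike, BatchContextLike, processBatch, and createFakeContext are hypothetical names that only mirror the accessors used in the example above; they are not the package's real type declarations.

```typescript
// Hypothetical stand-ins that mirror only the accessors used in the example
// above; they are not the package's real type declarations.
interface MessageLike {
  offset: string;
  value: unknown;
}

interface BatchContextLike {
  getMessages(): MessageLike[];
  getHeartbeat(): () => Promise<void>;
  getResolveOffset(): (offset: string) => void;
  getCommitOffsetsIfNecessary(): () => Promise<void>;
}

// Resolve each offset right after its message is handled, so an error thrown
// partway through leaves the remaining messages unresolved.
async function processBatch(
  context: BatchContextLike,
  handle: (message: MessageLike) => Promise<void>,
): Promise<number> {
  const heartbeat = context.getHeartbeat();
  const resolveOffset = context.getResolveOffset();
  const commitOffsetsIfNecessary = context.getCommitOffsetsIfNecessary();

  let handled = 0;
  for (const message of context.getMessages()) {
    await handle(message);
    resolveOffset(message.offset);
    handled += 1;
    await heartbeat(); // keep the consumer session alive during long batches
  }
  await commitOffsetsIfNecessary();
  return handled;
}

// In-memory fake used to exercise the sketch without a broker.
function createFakeContext(messages: MessageLike[]) {
  const log = { resolved: [] as string[], heartbeats: 0, commits: 0 };
  const context: BatchContextLike = {
    getMessages: () => messages,
    getHeartbeat: () => async () => {
      log.heartbeats += 1;
    },
    getResolveOffset: () => (offset: string) => {
      log.resolved.push(offset);
    },
    getCommitOffsetsIfNecessary: () => async () => {
      log.commits += 1;
    },
  };
  return { context, log };
}
```

Whether to resolve per message or per batch is a trade-off: per-message resolution limits how much work is repeated after a failure, while resolving once at the end keeps the handler shorter.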
Context
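The KafkaBatchContext object provides the necessary components from kafkajs's EachBatchPayload. The example above uses four of its accessors: getMessages, getHeartbeat, getResolveOffset, and getCommitOffsetsIfNecessary.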
Client
The KafkaBatchClient is the same as ClientKafka from the @nestjs/microservices package, except that the client.send method is removed, because batch messages should not be used for request-response communication. In addition, KafkaBatchClient can publish batch messages or publish to multiple topics, just like kafkajs.
    @Module({
      imports: [
        ClientsModule.register([{
          name: 'KAFKA_BATCH_CLIENT',
          customClass: KafkaBatchClient,
          options: {
            client: {
              brokers: ['localhost:52800', 'localhost:52801'],
            },
            consumer: {
              groupId: 'test',
              heartbeatInterval: 5000,
              sessionTimeout: 30000,
            },
          },
        }]),
      ],
    })
    export class AppModule {}
Then you can inject and use the KafkaBatchClient in your service as follows:
    @Injectable()
    export class AppService {
      constructor(
        @Inject('KAFKA_BATCH_CLIENT')
        private kafkaClient: KafkaBatchClient,
      ) {}

      async eventToBatch() {
        this.kafkaClient.emit('test', { example: 'data' });
      }

      async publishBatch() {
        // equivalent to kafkajs producer.send
        this.kafkaClient.emitBatch('test', [{
          example: 'data1'
        }, {
          example: 'data2'
        }]);
      }

      async publishBatchTopics() {
        // will publish to two topics, topic1 and topic2
        // equivalent to kafkajs producer.sendBatch
        this.kafkaClient.emitBatchTopics([{
          pattern: 'topic1',
          data: [{ example: 'data11' }, { example: 'data12' }]
        }, {
          pattern: 'topic2',
          data: [{ example: 'data21' }, { example: 'data22' }]
        }]);
      }
    }
Calling send with the KafkaBatchClient will result in an error.
    this.kafkaClient.send('send', { data: 'data' }); // Error
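To make the comparison with kafkajs concrete, the sketch below converts the payload passed to emitBatchTopics earlier into the topicMessages shape that kafkajs's producer.sendBatch accepts. This is an assumption about how such a mapping could look, not the package's actual implementation: the toTopicMessages helper, the interface names, and the choice to JSON-serialize each item are all hypothetical.

```typescript
// Hypothetical input shape, copied from the emitBatchTopics example.
interface TopicBatch {
  pattern: string;
  data: unknown[];
}

// Minimal subset of the per-topic entry used by kafkajs producer.sendBatch.
interface TopicMessagesLike {
  topic: string;
  messages: { value: string }[];
}

// Map each { pattern, data } entry to one topic with one message per item.
// JSON serialization is an assumption made for this illustration.
function toTopicMessages(batches: TopicBatch[]): TopicMessagesLike[] {
  return batches.map(({ pattern, data }) => ({
    topic: pattern,
    messages: data.map((item) => ({ value: JSON.stringify(item) })),
  }));
}

const topicMessages = toTopicMessages([
  { pattern: 'topic1', data: [{ example: 'data11' }, { example: 'data12' }] },
  { pattern: 'topic2', data: [{ example: 'data21' }, { example: 'data22' }] },
]);

// With a real kafkajs producer this would be passed as
// producer.sendBatch({ topicMessages }).
console.log(JSON.stringify(topicMessages, null, 2));
```

Grouping messages by topic this way lets a single producer request carry messages for several topics, which is what distinguishes sendBatch from send in kafkajs.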