@gtdudu/kafkajs-batcher
v1.0.2
Published
Control batch size for kafkajs consumers
Downloads
7
Maintainers
Readme
@gtdudu/kafkajs-batcher
Control batch size for kafkajs consumers.
Takes care of commiting offsets properly.
Written in typescript.
Table of contents
Usage
Install
npm install @gtdudu/kafkajs-batcher --save
Initialize
import { Batcher } from '@gtdudu/kafkajs-batcher'
// ...
// init everything then
const batcher = new Batcher({
// return of kafka.consumer(config)
consumer,
// function executed when a batch is flushed
handler: ({ messages }) => {
// your code here
},
// how many messages before flushing batch
batchSize: 100,
// after this, an incomplete batch will be flushed anyway
// counter is reset every time a message is pushed
maxIdleMs: 2000,
})
await consumer.run({
eachBatchAutoResolve: false, // this must be false
autoCommit: false, // this must be false
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
const { topic, partition, messages } = batch;
// most likely needed if handler takes a while
const interval = setInterval(async () => {
try {
await heartbeat();
} catch (error) {
console.log('heatbeat error', error);
}
}, 1_000);
try {
await batcher.ingestBatch({
messages,
topic,
partition,
resolveOffset,
});
} catch (error) {
// handle error
}
clearInterval(interval);
},
});
// down the road
// best to stop feeding new messages to batcher if we're stopping
consumer.pause([{ topic: TOPIC }]);
// any handler already in progress will be awaited
// other messages will be discarded.. until module is restarted
await batcher.stop();
// more clean up here
Default behavior
Messages sharing the same topic/partition will be batched together.
ThestoreKey
used is${topic}-${partition}
.One message will count as one element in the batch.
Both those behaviours can be changed by providing your own getMessageInfo
function to Batcher constructor
getMessageInfo
Receives message, topic and partition and must return an object with the following properties
storeKey: string
: grouping is based on this.count: number
: any integer > 0
When the sum of count
for a given storeKey
reaches batchSize
or maxIdleMs
has elapsed, batch is flushed and handler
is called
WARNING
Providing your own getMessageInfo
is an adavanced use case.
Depending on your storeKey
for a given partition, offsets may not always be committed after batch is flushed if there are still lower offsets pending. To prevent re consumption in case of module restart, consumed offsets must be tracked somehow.
In order to do this, you can pass an offsetDeduper
to Batcher
constructor.
Two kind of dedupers are exported by this module:
const { FsOffsetDeduper } = require('@gtdudu/kafkajs-batcher')
Saves consumed offsets to a file in tmp os directory.const { RedisOffsetDeduper } = require('@gtdudu/kafkajs-batcher')
Stores consumed offsets in redis sorted sets.RedisOffsetDeduper
constructor expects aredisClient
which can come from eitherioredis
or noderedis
Use this when deploying on kubernetes cluster with multiple replicas.
Both dedupers store as little as possible.
Logs
To get extensive logs run your project with: DEBUG=gtdudu:*
Examples
- consumer:
node examples/consumer.mjs
- producer
node examples/producer.mjs
Develop
Prerequisites
Install
nvm use
npm install
Tests
Integration tests
docker-compose up
npm test