

@gtdudu/kafkajs-batcher

Control batch size for kafkajs consumers.
Takes care of committing offsets properly.
Written in TypeScript.

Usage

Install

npm install @gtdudu/kafkajs-batcher --save

Initialize

import { Batcher } from '@gtdudu/kafkajs-batcher'

// ...
// set up the kafka client and consumer, then:

const batcher = new Batcher({
  // the consumer returned by kafka.consumer(config)
  consumer,
  // function executed whenever a batch is flushed
  handler: ({ messages }) => {
    // your code here
  },
  // how many messages to collect before flushing a batch
  batchSize: 100,
  // an incomplete batch is flushed after this many ms of inactivity;
  // the timer is reset every time a message is pushed
  maxIdleMs: 2000,
})

await consumer.run({
  eachBatchAutoResolve: false, // this must be false
  autoCommit: false, // this must be false
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    const { topic, partition, messages } = batch;

    // keep the consumer session alive; most likely needed if the handler takes a while
    const interval = setInterval(async () => {
      try {
        await heartbeat();
      } catch (error) {
        console.log('heartbeat error', error);
      }
    }, 1_000);

    try {
      await batcher.ingestBatch({
        messages,
        topic,
        partition,
        resolveOffset,
      });
    } catch (error) {
      // handle error
    }

    clearInterval(interval);
  },
});


// down the road, when shutting down:
// best to stop feeding new messages to the batcher before stopping it
consumer.pause([{ topic: TOPIC }]);
// any handler already in progress will be awaited;
// further messages will be discarded until the module is restarted
await batcher.stop();
// more clean up here

Default behavior

  1. Messages sharing the same topic/partition will be batched together.
    The storeKey used is ${topic}-${partition}.

  2. One message will count as one element in the batch.

Both of these behaviors can be changed by providing your own getMessageInfo function to the Batcher constructor.

getMessageInfo

Receives message, topic, and partition, and must return an object with the following properties:

  • storeKey (string): grouping is based on this.
  • count (number): any integer > 0.

When the sum of count for a given storeKey reaches batchSize, or maxIdleMs has elapsed, the batch is flushed and the handler is called.
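
For illustration, here is a hypothetical getMessageInfo that groups messages per topic (ignoring the partition) and weights each message by its payload size, so a batch flushes once roughly batchSize bytes have accumulated. The destructured call signature and the weighting scheme are assumptions for this sketch, not part of the library's documented API:

// hypothetical: one batch per topic (ignoring partition), weighted by payload size
const getMessageInfo = ({ message, topic, partition }) => ({
  storeKey: topic,
  count: message.value ? message.value.length : 1,
})

const batcher = new Batcher({
  consumer,
  handler: ({ messages }) => {
    // your code here
  },
  // with a byte-based count, batchSize acts as a rough byte budget per batch
  batchSize: 1024 * 1024,
  maxIdleMs: 2000,
  getMessageInfo,
})

Note that grouping by topic only, as in this sketch, is exactly the kind of custom storeKey the warning below is about.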

WARNING

Providing your own getMessageInfo is an advanced use case.

Depending on how your storeKey groups messages within a partition, offsets may not always be committed after a batch is flushed if there are still lower offsets pending. To prevent re-consumption in case of a module restart, consumed offsets must be tracked somehow.

To do this, you can pass an offsetDeduper to the Batcher constructor.

Two kinds of deduper are exported by this module:

  • const { FsOffsetDeduper } = require('@gtdudu/kafkajs-batcher')
    Saves consumed offsets to a file in the OS temp directory.
  • const { RedisOffsetDeduper } = require('@gtdudu/kafkajs-batcher')
    Stores consumed offsets in Redis sorted sets.
    The RedisOffsetDeduper constructor expects a redisClient, which can come from either ioredis or node-redis.
    Use this when deploying on a Kubernetes cluster with multiple replicas.

Both dedupers store as little data as possible.
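
As a rough sketch of how this could be wired up with ioredis (the exact RedisOffsetDeduper constructor shape shown here is an assumption; check the library's typings for the real signature):

import Redis from 'ioredis'
import { Batcher, RedisOffsetDeduper } from '@gtdudu/kafkajs-batcher'

// connection details are placeholders; adjust for your environment
const redisClient = new Redis({ host: '127.0.0.1', port: 6379 })

const batcher = new Batcher({
  // consumer as returned by kafka.consumer(config), as in the Initialize example above
  consumer,
  handler: ({ messages }) => {
    // your code here
  },
  batchSize: 100,
  maxIdleMs: 2000,
  // track consumed offsets in redis so a restart (or another replica)
  // does not re-consume offsets that were already handled
  offsetDeduper: new RedisOffsetDeduper({ redisClient }),
})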

Logs

To get extensive logs, run your project with: DEBUG=gtdudu:*
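
For example, combining this with the example consumer below:

DEBUG=gtdudu:* node examples/consumer.mjs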

Examples

  • consumer: node examples/consumer.mjs
  • producer: node examples/producer.mjs

Develop

Prerequisites

Install

nvm use
npm install

Tests

Integration tests

docker-compose up
npm test