
kafka-consumer-manager

v5.0.1

Published

Kafka consumer manager that handles retries and offset issues

Downloads: 16

kafka-consumer-manager


This package simplifies common uses of a Kafka consumer:

  • Supports multiple kafka-consumer-managers per process by instance creation
  • Provides support for autoCommit: false and throttling, by saving messages to queues and processing one message at a time per partition (concurrency level equals the number of partitions)
  • Provides an API for checking that the Kafka consumer offset is in sync, by verifying that the offset of each partition is synced to ZooKeeper
  • Accepts a promise-returning function with the business logic each consumed message should go through
  • Accepts a promise-returning function with the business logic of when to pause and when to resume consuming
  • Provides an API for sending a message back to its topic (usually for retries)
  • Prometheus integration for reporting statistics about message handling and the offset diff between ZooKeeper and the consumer group

Install

npm install --save kafka-consumer-manager

API

How to use

let KafkaConsumerManager = require('kafka-consumer-manager');

let configuration = {
    KafkaUrl: "localhost:9092",
    GroupId: "some-group-id",
    KafkaConnectionTimeout: 10000,
    KafkaRequestTimeout: 30000,
    KafkaOffsetDiffThreshold: 3,
    Topics: ["TOPIC-A", "TOPIC-B"],
    ResumePauseIntervalMs: 30000,
    ResumePauseCheckFunction: (consumer) => {
        return shouldPauseConsuming(consumer);
    },
    MessageFunction: (msg) => { return handleMessage(msg); },
    MaxMessagesInMemory: 100,
    ResumeMaxMessagesRatio: 0.25,
    CreateProducer: false,
    StartOffset: "earliest"
};

(async () => {
    let kafkaConsumerManager = new KafkaConsumerManager();
    await kafkaConsumerManager.init(configuration);
})();
Configuration
  • KafkaUrl – URL of Kafka.

  • GroupId – Defines the Consumer Group this process is consuming on behalf of.

  • KafkaConnectionTimeout – Max wait time for kafka to connect. (default: 10000ms)

  • KafkaRequestTimeout – Max wait time for a kafka request. (default: 30000ms)

  • KafkaOffsetDiffThreshold – Tolerance for how far the consumer's partition offset can be from the real offset; the health check uses this value to reject when the offset is out of sync.

  • Topics – Array of topics that should be consumed.

  • ResumePauseIntervalMs – Interval of when to run the ResumePauseCheckFunction (Optional).

  • ResumePauseCheckFunction – A function returning a promise that should always resolve. If it resolves with true, the consumer is resumed; if false, it is paused (Mandatory if ResumePauseIntervalMs is provided). This function accepts one param (consumer).

  • MessageFunction – A function returning a promise that should always resolve. This function is applied to each consumed message and accepts one param (message). Don't change the original message, as it may cause unstable behaviour in the getLastMessage function.

  • ErrorMessageFunction(message, err) – Optional; a function called once MessageFunction rejects.

  • FetchMaxBytes – The maximum bytes to include in the message set for this partition. This helps bound the size of the response. (Default 1024^2).

  • WriteBackDelay – Delay, in ms, applied to produced messages (Optional).

  • AutoCommit – Boolean. If AutoCommit is false, the consumer queues messages from each partition into a dedicated queue, handles them in order, and commits the offset when done.

  • LoggerName – String; the value of the consumer_name field of the internal logger. If empty, this field will not exist.

  • CreateProducer – Boolean. If CreateProducer is true, a Producer instance is created. (Default true)

  • ExposePrometheusMetrics – Boolean. If true, Prometheus metrics are collected and registered in prom-client. Please note that prom-client is a peer dependency, meaning it has to exist in your application.

  • PrometheusHistogramBuckets – Optional; an array of doubles defining the bucket sizes, in seconds, for kafka_request_duration_seconds_bucket, into which the Kafka message processing time of your service is written. Default values: [0.001, 0.003, 0.005, 0.015, 0.03, 0.05, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5]

  • ConsumerGroupOffsetCheckerInterval – Optional; the interval, in milliseconds, at which the consumer group offset diff is checked. Default is 5000.

  • StartOffset – Optional; a string specifying the offset from which the consumer will start reading messages. Default is 'latest'.
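As a sketch of the MessageFunction / ErrorMessageFunction contract described above, consider the following hypothetical dispatcher (this is an illustration of the documented behaviour, not the package's internal code):

```javascript
// Hypothetical dispatcher illustrating the contract described above:
// MessageFunction is awaited per consumed message; if it rejects,
// ErrorMessageFunction(message, err) is called with the same message.
async function dispatchMessage(config, message) {
    try {
        await config.MessageFunction(message);
        return 'handled';
    } catch (err) {
        if (config.ErrorMessageFunction) {
            config.ErrorMessageFunction(message, err);
        }
        return 'failed';
    }
}

// Illustration with a stub handler:
const okConfig = { MessageFunction: async (msg) => msg };
dispatchMessage(okConfig, { value: 'hello' }).then(r => console.log(r)); // handled
```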

AutoCommit: true settings
  • MaxMessagesInMemory – If enabled, the consumer pauses after holding this number of messages in memory; to lower the counter, call the finishedHandlingMessage function (Optional).
  • ResumeMaxMessagesRatio – If enabled, a paused consumer resumes only when CurrentMessagesInMemory drops below MaxMessagesInMemory * ResumeMaxMessagesRatio; the ratio should be below 1 (Optional).
AutoCommit: false settings
  • ThrottlingThreshold – If the consumer holds more messages than this value it pauses, and resumes consuming once the count drops below the threshold.
  • ThrottlingCheckIntervalMs – The interval in ms at which to check whether the number of messages is above or below the threshold.
  • CommitEachMessage – Boolean. If CommitEachMessage is false, commits happen every AutoCommitIntervalMs. (Default true)
  • AutoCommitIntervalMs – The interval in ms at which to commit to the broker; relevant only if CommitEachMessage is false. (Default 5000)
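The throttling settings above boil down to a simple check, sketched here as a minimal illustration (not the package's internal implementation):

```javascript
// Sketch of the AutoCommit: false throttling described above: pause while
// the queued-message count exceeds ThrottlingThreshold, resume once it
// drops back below it. The check runs every ThrottlingCheckIntervalMs.
function throttlingAction(queuedMessages, throttlingThreshold) {
    return queuedMessages > throttlingThreshold ? 'pause' : 'resume';
}

console.log(throttlingAction(150, 100)); // pause
console.log(throttlingAction(40, 100));  // resume
```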

await kafka-consumer-manager.init(configuration)

Init the consumer and the producer. Make sure to pass the full configuration object, otherwise you will get exceptions.

The function returns a Promise.

kafka-consumer-manager.validateOffsetsAreSynced()

Runs a check that the offsets of the partitions are synced and moving as expected, by checking progress against the ZooKeeper offsets.

The function returns a Promise.
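This call is typically wired into a service health check. A minimal sketch, where the healthCheck wrapper and the stubbed manager are assumptions for illustration (any HTTP framework can serve the result):

```javascript
// Hypothetical health-check wrapper: report unhealthy when
// validateOffsetsAreSynced() rejects (offsets out of sync).
async function healthCheck(kafkaConsumerManager) {
    try {
        await kafkaConsumerManager.validateOffsetsAreSynced();
        return { status: 200, body: 'OK' };
    } catch (err) {
        return { status: 500, body: err.message };
    }
}

// Illustration with a stubbed manager:
const healthyStub = { validateOffsetsAreSynced: async () => {} };
healthCheck(healthyStub).then(res => console.log(res.status)); // 200
```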

await kafka-consumer-manager.closeConnection()

Closes the connection to Kafka. Returns a Promise.

kafka-consumer-manager.pause()

Pause the consuming of new messages.

kafka-consumer-manager.resume()

Resume the consuming of new messages.

kafka-consumer-manager.send(message, topic)

Sends a message back to a topic. Returns a Promise.
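A common use of send is re-queueing a message whose handling failed. A hedged sketch (handleWithRetry and the msg.value / msg.topic field names are assumptions for illustration):

```javascript
// Hypothetical retry flow: if handling a message fails, send it back
// to its topic so it can be consumed again later.
async function handleWithRetry(manager, handle, msg) {
    try {
        await handle(msg);
        return 'handled';
    } catch (err) {
        await manager.send(msg.value, msg.topic);
        return 'requeued';
    }
}
```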

kafka-consumer-manager.finishedHandlingMessage()

Decreases the counter of how many messages are currently being processed in the service. Used in combination with the config params ResumeMaxMessagesRatio and MaxMessagesInMemory. Only relevant for autoCommit: true.
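To keep the counter accurate, finishedHandlingMessage should run whether processing succeeds or fails. A sketch of that pairing (makeMessageFunction is a hypothetical helper, not part of the package):

```javascript
// Hypothetical wrapper: release the in-memory message counter in a
// finally block so it is decremented on both success and failure
// (autoCommit: true with MaxMessagesInMemory configured).
function makeMessageFunction(manager, process) {
    return async (msg) => {
        try {
            return await process(msg);
        } finally {
            manager.finishedHandlingMessage();
        }
    };
}
```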

kafka-consumer-manager.getLastMessage()

Gets the last message that the consumer received. Don't change the original message, as it may cause unstable behaviour in the MessageFunction.

kafka-consumer-manager.on(eventName, eventHandler)

Listens to the chosen consumer events.

Running Tests

Using mocha and nyc

npm test