

kafka-node-topic-consumer

A wrapper around kafka-node's HighLevelConsumer that provides error handling and message-processing concurrency control via fastq (a lightweight, fast queue modeled after async's queue).

Installing

npm install --save kafka-node kafka-node-topic-consumer

Purpose

There are two main motivations for this module:

  1. There are known issues with the high-level consumer API in Kafka 0.8. Starting consumers too quickly after a failure, or too close in time to another member of the same group, often triggers rebalancing problems. To help alleviate these issues, the TopicConsumer self-heals when the underlying HighLevelConsumer encounters an error: it first attempts to close the existing consumer, then removes it and schedules a rebuild at a random time in the near future (30-90 seconds). The rebuild process repeats indefinitely; if a rebuild fails, the healing process starts over.
  2. Although Kafka guarantees ordering within a partition, kafka-node's HighLevelConsumer resembles a firehose, emitting messages as soon as they arrive, regardless of how fast the application can process them. To control this, the TopicConsumer implements an in-memory queue that processes a single batch of messages at a time. As soon as the underlying consumer emits the first message of a newly received batch, the TopicConsumer pauses the consumer and pushes all messages into the queue. Once the last message has been processed, it resumes consuming.
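The two behaviors above can be sketched in a few lines. Note that `randomRebuildDelay` and `processBatch` are hypothetical helper names for illustration only, not part of the module's API; the real module delegates the queueing to fastq.

```javascript
// Hypothetical sketch of the self-healing delay: a uniformly random wait so
// that consumers in the same group don't all re-register at the same instant.
function randomRebuildDelay(minMs = 30000, maxMs = 90000) {
  return minMs + Math.floor(Math.random() * (maxMs - minMs + 1));
}

// Hypothetical sketch of the batch-at-a-time flow: pause the consumer on the
// first message of a batch, drain the batch serially, then resume.
async function processBatch(messages, worker, { pause, resume }) {
  pause(); // stop the firehose as soon as the batch arrives
  for (const msg of messages) {
    await worker(msg); // one task at a time (concurrency: 1)
  }
  resume(); // last message processed: resume consuming
}
```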

Getting Started

import TopicConsumer from 'kafka-node-topic-consumer';

// create a new TopicConsumer
const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: { groupId: 'my-consumer-group' },
  topic: 'my-topic',
});

consumer.registerWorker((msg) => {
  console.log(msg);
  return Promise.resolve();
});

consumer.on('message:error', (err, msg) => {
  console.error(err, msg);
});

consumer.connect();

API

constructor(options) => TopicConsumer

instantiate a new topic consumer

Params

| name | type | description |
| --- | --- | --- |
| options | Object | constructor options |
| [options.concurrency] | Number | number of tasks to be processed at any given time; default is 1 |
| options.consumer | Object | consumer options |
| options.consumer.groupId | String | consumer group id |
| options.host | String | zookeeper connection string |
| [options.parse] | Function | a function `(raw) => Promise` for parsing raw kafka messages before they are pushed into the queue; the default parse function attempts to parse the raw message's value attribute as utf-8 stringified JSON and adds it as the parsedValue attribute on the message |
| [options.rebuild] | Object | rebuild configuration |
| [options.rebuild.closing] | Object | valid retry options for closing failed consumers |
| [options.rebuild.maxDelay] | Number, String | the maximum time to wait before rebuilding; default is 2m |
| [options.rebuild.minDelay] | Number, String | the minimum time to wait before rebuilding; default is 35s |
| options.topic | String, Object | topic name or payload |
| [options.validate] | Function | a function `(parsed) => Promise` for validating queue messages; messages that fail validation will not be processed by workers |

Example
import Bluebird from 'bluebird';
import joi from 'joi';
import TopicConsumer from 'kafka-node-topic-consumer';

const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: {
    groupId: 'my-group-id'
  },
  topic: 'my-topic',
  parse(raw) {
    return Bluebird.try(() => {
      return JSON.parse(raw.value.toString('utf8'));
    });
  },
  validate(parsed) {
    const schema = joi.object({
      id: joi.string().guid().required(),
      action: joi.string().valid('create', 'destroy', 'update').required(),
      data: joi.object().required(),
    });
    const result = joi.validate(parsed, schema);
    if (result.error) {
      return Promise.reject(result.error);
    }
    return Promise.resolve(result.value);
  },
});
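For comparison, the default parse behavior described in the options table (decode the message value as UTF-8 JSON and attach it as parsedValue) might look roughly like the sketch below. `defaultParse` is an illustrative name, not an exported function, and the actual implementation may differ.

```javascript
// Illustrative sketch of the documented default parse behavior.
function defaultParse(raw) {
  return Promise.resolve().then(() => {
    // decode the raw kafka message value as utf-8 stringified JSON
    const parsedValue = JSON.parse(raw.value.toString('utf8'));
    // attach the parsed payload without mutating the original message
    return Object.assign({}, raw, { parsedValue });
  });
}
```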

connect([done]) => Promise

connect and wait for a new consumer to register

Params

| name | type | description |
| --- | --- | --- |
| done | Function | optional callback |

Example
consumer.connect(err => {});

consumer.connect()
.then(() => {})
.catch(err => {});

consumer

the underlying HighLevelConsumer instance


queue

the underlying queue instance


getStatus() => Object

get current status

Returns
{
  "consumer": {
    "groupId": "my-consumer-group",
    "initialized": false,
    "ready": true,
    "closing": false,
    "paused": false,
    "rebalancing": false,
    "topicPayloads": [
      {
        "topic": "my-topic",
        "partition": "6",
        "offset": 39,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "7",
        "offset": 19,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "8",
        "offset": 16,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "9",
        "offset": 28,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "10",
        "offset": 14,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "11",
        "offset": 33,
        "maxBytes": 1048576,
        "metadata": "m"
      }
    ]
  },
  "queue": {
    "idle": true,
    "length": 0
  },
  "status": "up"
}
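One way to consume this payload, e.g. behind a health endpoint, is to reduce it to a boolean. `isHealthy` below is a hypothetical helper built on the documented status shape; the specific fields it checks are assumptions, not part of the module.

```javascript
// Hypothetical health check over the documented getStatus() shape.
function isHealthy(status) {
  return (
    status.status === 'up' &&
    status.consumer.ready === true &&
    status.consumer.closing === false
  );
}
```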

registerWorker(worker)

register a new worker function

Params

| name | type | description |
| --- | --- | --- |
| worker | Function | a function `worker(parsed) => Promise` that is passed every (valid) message for processing |

Example
consumer.registerWorker(parsed => {
  return Promise.resolve();
});

Events

The TopicConsumer extends the EventEmitter class and emits the following lifecycle events:

| event | description |
| --- | --- |
| consumer:closing-error | fired (err) when all attempts to close a failed consumer have failed |
| consumer:commit-error | fired (err) when an error is encountered committing offsets |
| consumer:connecting | fired when a new consumer instance is waiting to connect/register |
| consumer:error | fired (err) anytime the underlying consumer emits an error |
| consumer:offset-out-of-range | fired when the underlying consumer encounters an OffsetOutOfRangeError |
| consumer:pausing | fired when the first message is pushed into the queue and the underlying consumer is paused |
| consumer:rebuild-initiated | fired when the rebuild process has been initiated |
| consumer:rebuild-scheduled | fired (delayInSeconds) when the rebuild has been scheduled |
| consumer:rebuild-started | fired when the rebuild has started |
| consumer:resuming | fired when the last task in the queue has been processed and the underlying consumer is resuming |
| consumer:starting | fired after a new consumer has registered and is beginning to fetch messages |
| message:processing | fired (parsed) when the queue has started processing a message |
| message:skipped | fired (parsed, reason) when a message fails validation |
| message:success | fired (parsed, results) when a message has been successfully processed |
| message:error | fired (err, parsed) when a worker rejects |

Testing

Requires Docker 1.8+ and docker-compose 1.12+.

docker-compose up

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

License

Copyright (c) 2016 Gaia

Licensed under the MIT license