# kafka-node-topic-consumer

A wrapper around kafka-node's `HighLevelConsumer` that provides error handling and message-processing concurrency control via `fastq` (a lightweight, fast queue modeled after `async`'s queue).
## Installing

```sh
npm install --save kafka-node kafka-node-topic-consumer
```
## Purpose

There are two main motivations for this module:

1. There are known issues with the high-level consumer API in kafka 0.8. Rebalancing problems frequently occur when a consumer starts too quickly after a failure, or too near in time to another member of the same group. To help alleviate these issues, the `TopicConsumer` self-heals whenever the underlying `HighLevelConsumer` encounters an error: it first attempts to close the existing consumer, then removes it and schedules a rebuild at a random time in the near future (30-90 seconds). The rebuild process is infinite; if a rebuild fails, the healing process starts over.
2. Although kafka guarantees ordering within a partition, kafka-node's `HighLevelConsumer` resembles a firehose, emitting messages as soon as they arrive regardless of how fast the application can process them. To control this, the `TopicConsumer` implements an in-memory queue that processes a single batch of messages at a time: as soon as the underlying consumer emits the first message of a newly received batch, the `TopicConsumer` pauses the consumer and pushes all messages into the queue; once the last message has been processed, it resumes consuming (see the sketch below).
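A minimal, illustrative sketch of this pause-and-drain pattern, assuming kafka-node's `HighLevelConsumer` (`pause()`/`resume()`) and a `fastq` queue wired together by hand; the `TopicConsumer` handles all of this internally, and the batch bookkeeping here is simplified:

```js
import kafka from 'kafka-node';
import fastq from 'fastq';

const client = new kafka.Client(process.env.ZOOKEEPER_HOST);
const consumer = new kafka.HighLevelConsumer(
  client,
  [{ topic: 'my-topic' }],
  { groupId: 'my-consumer-group' }
);

// process one message at a time (concurrency = 1)
const queue = fastq(function worker(msg, done) {
  // ... process msg, then signal completion
  done(null);
}, 1);

// once the queue drains, resume the firehose
queue.drain = () => consumer.resume();

consumer.on('message', (msg) => {
  // pause as soon as messages start arriving; buffer them in the queue
  consumer.pause();
  queue.push(msg, (err) => {
    if (err) console.error(err);
  });
});
```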
## Getting Started

```js
import TopicConsumer from 'kafka-node-topic-consumer';

// create a new TopicConsumer
const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: { groupId: 'my-consumer-group' },
  topic: 'my-topic',
});

// register a worker to process each message
consumer.registerWorker((msg) => {
  console.log(msg);
  return Promise.resolve();
});

// handle worker failures
consumer.on('message:error', (err, msg) => {
  console.error(err, msg);
});

// wait for the consumer to register
consumer.connect();
```
## API

### constructor(options) => TopicConsumer

Instantiate a new topic consumer.
**Params**

| name | type | description |
| --- | --- | --- |
| options | Object | constructor options |
| [options.concurrency] | Number | number of tasks to process at any given time, default is `1` |
| options.consumer | Object | consumer options |
| options.consumer.groupId | String | consumer group id |
| options.host | String | zookeeper connection string |
| [options.parse] | Function | a function `(raw) => Promise` for parsing raw kafka messages before they are pushed into the queue. The default parse function attempts to parse the raw message's `value` attribute as utf-8 stringified json and adds the result as the `parsedValue` attribute on the message |
| [options.rebuild] | Object | rebuild configuration |
| [options.rebuild.closing] | Object | valid retry options for closing failed consumers |
| [options.rebuild.maxDelay] | Number, String | the maximum time to wait before rebuilding, default is `2m` |
| [options.rebuild.minDelay] | Number, String | the minimum time to wait before rebuilding, default is `35s` |
| options.topic | String, Object | topic name or payload |
| [options.validate] | Function | a function `(parsed) => Promise` for validating queue messages; messages that fail validation will not be processed by workers |
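The default `parse` behavior described above is roughly equivalent to the following (a sketch for illustration, not the actual implementation):

```js
// decode the raw message's `value` as utf-8 JSON and attach it
// as `parsedValue` (approximation of the default parse function)
function defaultParse(raw) {
  return Promise.resolve().then(() => {
    raw.parsedValue = JSON.parse(raw.value.toString('utf8'));
    return raw;
  });
}
```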
**Example**

```js
import Bluebird from 'bluebird';
import joi from 'joi';
import TopicConsumer from 'kafka-node-topic-consumer';

const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: {
    groupId: 'my-group-id',
  },
  topic: 'my-topic',

  // parse the raw kafka message as utf-8 JSON
  parse(raw) {
    return Bluebird.try(() => {
      return JSON.parse(raw.value.toString('utf8'));
    });
  },

  // reject (skip) messages that fail schema validation
  validate(parsed) {
    const schema = joi.object({
      id: joi.string().guid().required(),
      action: joi.string().valid('create', 'destroy', 'update').required(),
      data: joi.object().required(),
    });
    const result = joi.validate(parsed, schema);
    if (result.error) {
      return Promise.reject(result.error);
    }
    return Promise.resolve(result.value);
  },
});
```
### connect([done]) => Promise

Wait for a new consumer to register.

**Params**

| name | type | description |
| --- | --- | --- |
| [done] | Function | optional callback |
**Example**

```js
// callback style
consumer.connect(err => {});

// promise style
consumer.connect()
  .then(() => {})
  .catch(err => {});
```
### consumer

The underlying `HighLevelConsumer` instance.

### queue

The underlying queue (`fastq`) instance.
### getStatus() => Object

Get the current status of the consumer and queue.

**Returns**
```json
{
  "consumer": {
    "groupId": "my-consumer-group",
    "initialized": false,
    "ready": true,
    "closing": false,
    "paused": false,
    "rebalancing": false,
    "topicPayloads": [
      {
        "topic": "my-topic",
        "partition": "6",
        "offset": 39,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "7",
        "offset": 19,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "8",
        "offset": 16,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "9",
        "offset": 28,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "10",
        "offset": 14,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "11",
        "offset": 33,
        "maxBytes": 1048576,
        "metadata": "m"
      }
    ]
  },
  "queue": {
    "idle": true,
    "length": 0
  },
  "status": "up"
}
```
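For example, `getStatus()` can back a simple health check endpoint. A sketch: `express` and the `/health` route are illustrative choices, and treating any value other than `"up"` as unhealthy is an assumption:

```js
import express from 'express';

const app = express();

// report 200 while the consumer is up, 503 otherwise (assumption)
app.get('/health', (req, res) => {
  const status = consumer.getStatus();
  res.status(status.status === 'up' ? 200 : 503).json(status);
});

app.listen(3000);
```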
### registerWorker(worker)

Register a new worker function.

**Params**

| name | type | description |
| --- | --- | --- |
| worker | Function | a function `worker(parsed) => Promise` that is passed every (valid) message for processing |
**Example**

```js
consumer.registerWorker(parsed => {
  return Promise.resolve();
});
```
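A worker that rejects causes the `message:error` event to fire (see Events below). A sketch, where `saveToDatabase` is a hypothetical application function:

```js
consumer.registerWorker((parsed) => {
  // `saveToDatabase` is a hypothetical application function;
  // a rejected promise here emits `message:error`
  return saveToDatabase(parsed)
    .then(record => ({ saved: record.id }));
});
```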
## Events

The `TopicConsumer` extends the `EventEmitter` class and emits the following lifecycle events:
| event | description |
| --- | --- |
| `consumer:closing-error` | fired `(err)` when all attempts to close a failed consumer have failed |
| `consumer:commit-error` | fired `(err)` when an error is encountered committing offsets |
| `consumer:connecting` | fired when a new consumer instance is waiting to connect/register |
| `consumer:error` | fired `(err)` any time the underlying consumer emits an error |
| `consumer:offset-out-of-range` | fired when the underlying consumer encounters an `OffsetOutOfRangeError` |
| `consumer:pausing` | fired when the first message is pushed into the queue and the underlying consumer is paused |
| `consumer:rebuild-initiated` | fired when the rebuild process has been initiated |
| `consumer:rebuild-scheduled` | fired `(delayInSeconds)` when the rebuild has been scheduled |
| `consumer:rebuild-started` | fired when the rebuild has started |
| `consumer:resuming` | fired when the last task in the queue has been processed and the underlying consumer is resuming |
| `consumer:starting` | fired after a new consumer has registered and is beginning to fetch messages |
| `message:processing` | fired `(parsed)` when the queue has started processing a message |
| `message:skipped` | fired `(parsed, reason)` when a message fails validation |
| `message:success` | fired `(parsed, results)` when a message has been successfully processed |
| `message:error` | fired `(err, parsed)` when a worker rejects |
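These events are useful for logging and monitoring; for example (a sketch):

```js
// log rebuild scheduling so operators can see self-healing in action
consumer.on('consumer:rebuild-scheduled', (delayInSeconds) => {
  console.log(`consumer rebuild scheduled in ${delayInSeconds}s`);
});

// surface messages that failed validation
consumer.on('message:skipped', (parsed, reason) => {
  console.warn('message skipped:', reason);
});
```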
## Testing

Requires docker 1.8+ and docker-compose 1.12+.

```sh
docker-compose up
```
## Contributing

1. Fork it
2. Create your feature branch (`git checkout -b my-new-feature`)
3. Commit your changes (`git commit -am 'Add some feature'`)
4. Push to the branch (`git push origin my-new-feature`)
5. Create a new Pull Request
## License

Copyright (c) 2016 Gaia

Licensed under the MIT license.