kafka-executor
v1.0.13
Job executor for Kafka
Listens to Kafka topics and runs asynchronous functions (jobs) against each message, ensuring that every job has succeeded before the corresponding message offset is committed.
Features
- Simple API
- Ensures that all jobs have run successfully before a message is committed
- Retry strategy for jobs that fail
- Graceful shutdown
Installation
yarn add kafka-executor
# or
npm install kafka-executor --save
Usage
Basic
import KafkaExecutor, { Job } from 'kafka-executor';

const executor = new KafkaExecutor({
    brokerList: '0.0.0.0:9092,0.0.0.0:9091',
    topics: ['topic1', 'topic2'],
    groupId: 'groupId',
});

// Connect the consumer to Kafka.
executor.init();

// Register a job; it receives each Kafka message and returns a Promise.
executor.addJob('myJobId', new Job((kafkaMessage) => {
    console.log(kafkaMessage);
    return Promise.resolve();
}));
Documentation
Job
import { Job } from 'kafka-executor';
new Job(() => Promise.resolve(), {
    maxRetries?: number,
    retryDelay?: number | ((retryNumber: number) => number),
    shouldRetry?: boolean | ((err: Error) => boolean),
})
| Name | Required | Default | Description |
| ------------- | ------------- | ------------- | ------------- |
| maxRetries: number | no | 3 | Maximum number of retries before the job is considered failed |
| retryDelay: number \| (retryIndex)=>number | no | 60000 ms | The delay between retries, in ms |
| shouldRetry: boolean \| (error)=>boolean | no | true | Determines whether a job is retried after a failure |
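The function forms of `retryDelay` and `shouldRetry` can be illustrated with a small sketch. Only the three option names come from the table above; the backoff constants, the `JobError` shape, the "4xx is permanent" rule, and the assumption that the retry index starts at 1 are all illustrative and not confirmed by kafka-executor.

```typescript
// Hypothetical error shape: `status` is only present if the failing job sets it.
interface JobError extends Error {
  status?: string;
}

const BASE_DELAY_MS = 1000;  // illustrative starting delay
const MAX_DELAY_MS = 60000;  // cap, equal to the documented default delay

// Exponential backoff: 1s, 2s, 4s, ... capped at MAX_DELAY_MS.
// Assumes the retry index is 1 for the first retry.
function backoffDelay(retryNumber: number): number {
  const exponent = Math.max(0, retryNumber - 1);
  return Math.min(BASE_DELAY_MS * Math.pow(2, exponent), MAX_DELAY_MS);
}

// Retry everything except errors carrying a 4xx status, which this sketch
// treats as permanent failures.
function shouldRetry(err: JobError): boolean {
  const status = Number(err.status); // NaN when no status is set
  const isClientError = status >= 400 && status < 500;
  return !isClientError;
}

const jobOptions = { maxRetries: 5, retryDelay: backoffDelay, shouldRetry };

// Intended use (not executed here):
//   new Job(handler, jobOptions)
```

Passing functions rather than constants lets a job wait longer between later attempts and stop retrying errors that cannot succeed.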
KafkaExecutor
Options
import KafkaExecutor from 'kafka-executor';
new KafkaExecutor({
    brokerList: string;
    groupId: string;
    topics: string[];
    connectionTimeout: string[];
    checkInterval?: number;
    batchSize?: number;
    errorHandler?: (err: Error[], message: KafkaMessage, commit: Function) => void;
    logger?: (message: string, type: LogType, code?: string) => void;
    maxRetries?: number;
    retryDelay?: number;
    consumer?: object;
})
| Name | Required | Default | Description |
| ------------- | ------------- | ------------- | ------------- |
| brokerList: string | yes | - | Initial list of brokers as a CSV list of broker host or host:port |
| topics: string[] | yes | - | The topics that the consumer will listen to |
| groupId: string | yes | - | Client group id string. All clients sharing the same group.id belong to the same group |
| checkInterval: number | no | 2000 | How long to wait, in ms, before checking for new messages again during an idle period |
| batchSize: number | no | 1 | How many messages to process concurrently. Change this according to your error tolerance |
| errorHandler: (error, kafkaMessage, commit:Function)=>void | no | built-in handler | A function responsible for handling job errors. By default the process exits with code 1 |
| logger: (message:string, type:'info'\|'warn'\|'error', code)=>void | no | console | A function responsible for logging |
| consumer: object | no | - | Options for the consumer; see the rdkafka configuration options |
| maxRetries: number | no | 3 | Global configuration for all jobs |
| retryDelay: number | no | 60000 ms | Global configuration for all jobs |
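As a rough illustration of a custom `errorHandler`, the sketch below records the failed message and then commits it so that the consumer can move on, instead of exiting the process. The `DeadLetterRecord` type, the `makeErrorHandler` helper, and the sink are hypothetical. It also assumes that calling `commit()` without arguments commits the failed message, which the option table does not confirm.

```typescript
// Minimal local stand-ins for the documented shapes.
interface KafkaMessageLike {
  topic: string;
  partition: number;
  offset: number;
}

interface JobError extends Error {
  jobId?: string;
  status?: string;
}

// Hypothetical record describing a message that could not be processed.
interface DeadLetterRecord {
  topic: string;
  partition: number;
  offset: number;
  failedJobs: string[];
  reasons: string[];
}

// Builds an errorHandler that reports the failure to `sink` and commits.
function makeErrorHandler(sink: (record: DeadLetterRecord) => void) {
  return (errors: JobError[], message: KafkaMessageLike, commit: Function): void => {
    sink({
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
      failedJobs: errors.map((e) => e.jobId || 'unknown'),
      reasons: errors.map((e) => e.message),
    });
    // Assumption: commit() with no arguments skips past the failed message.
    commit();
  };
}

// Intended use (not executed here):
//   new KafkaExecutor({ /* brokerList, topics, groupId */, errorHandler: makeErrorHandler(save) })
```

Committing after a failure trades the delivery guarantee for availability, so a handler like this only makes sense when the recorded failures are reprocessed elsewhere.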
Functions
import KafkaExecutor, { Job } from 'kafka-executor';

const executor = new KafkaExecutor({
    brokerList: '0.0.0.0:9092',
    groupId: 'group',
    topics: ['topic'],
});

executor.addJob('myJobId', new Job(...))
executor.init()
executor.removeJob('myJobId')
executor.on('event', Function)
executor.shutdown()
| Name | Description |
| ------------- | ------------- |
| init: ()=>Promise | Initializes kafka-executor and connects the consumer to Kafka. |
| addJob: (jobId:string, job:Job)=>void | Adds a job to the processing flow. |
| removeJob: (jobId:string)=>void | Removes a job. |
| on: (event:string, handler:Function)=>void | Listens to any of the events emitted by kafka-executor and rdkafka. |
| shutdown: ()=>Promise | Shuts the process down gracefully, ensuring that pending jobs finish before exit. |
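One common way to use `shutdown()` is to call it from a termination signal handler. The sketch below assumes only that `shutdown()` returns a Promise, as the table states. The `Stoppable` and `SignalSource` interfaces and the `installGracefulShutdown` helper are hypothetical names introduced so the example does not depend on the library or on Node typings.

```typescript
// Anything with a Promise-returning shutdown(), such as a KafkaExecutor instance.
interface Stoppable {
  shutdown(): Promise<unknown>;
}

// Anything that can register one-shot signal handlers, such as Node's `process`.
interface SignalSource {
  once(signal: string, handler: () => void): unknown;
}

function installGracefulShutdown(
  executor: Stoppable,
  signals: SignalSource,
  onDone: (exitCode: number) => void,
): void {
  let stopping = false;

  const stop = (): void => {
    if (stopping) return; // ignore a second signal while already shutting down
    stopping = true;
    executor.shutdown().then(
      () => onDone(0), // pending jobs finished
      () => onDone(1), // shutdown itself failed
    );
  };

  signals.once('SIGTERM', stop);
  signals.once('SIGINT', stop);
}

// Intended use (not executed here):
//   installGracefulShutdown(executor, process, (code) => process.exit(code));
```

The `stopping` flag keeps `shutdown()` from running twice when both signals arrive.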
Events
| Event | Arguments | Description |
| ------------- | ------------- | ------------- |
| message.received | kafkaMessage[] | Fires when the consumer gets a message |
| message.committed | kafkaMessage | Fires when the consumer commits a message |
| processing.error | kafkaMessage, error | Fires when one or more jobs fail |
| shutdown | - | Fires when kafka-executor shuts down |
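The events above can be consumed through `on`. As a minimal sketch, the helper below keeps the latest committed offset per topic and partition by listening to `message.committed`. The `trackCommittedOffsets` name and the two interfaces are hypothetical. The only assumption about the library is that the event passes a single kafkaMessage, as the table states.

```typescript
// Anything exposing on(event, handler), such as a KafkaExecutor instance.
interface EventSource {
  on(event: string, handler: (...args: any[]) => void): unknown;
}

// The subset of kafkaMessage fields this sketch reads.
interface OffsetInfo {
  topic: string;
  partition: number;
  offset: number;
}

// Returns a live object mapping "topic[partition]" to the last committed offset.
function trackCommittedOffsets(executor: EventSource): Record<string, number> {
  const latest: Record<string, number> = {};
  executor.on('message.committed', (message: OffsetInfo) => {
    latest[`${message.topic}[${message.partition}]`] = message.offset;
  });
  return latest;
}

// Intended use (not executed here):
//   const offsets = trackCommittedOffsets(executor);
```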
node-rdkafka events
| Event | Description |
| ------------- | ------------- |
| `data` | When using the Standard API, consumed messages are emitted in this event. |
| `disconnected` | The `disconnected` event is emitted when the broker disconnects. This event is only emitted when `.disconnect` is called. The wrapper will always try to reconnect otherwise. |
| `ready` | The `ready` event is emitted when the `Consumer` is ready to read messages. |
| `event` | The `event` event is emitted when `librdkafka` reports an event (if you opted in via the `event_cb` option). |
| `event.log` | The `event.log` event is emitted when logging events occur (if you opted in for logging via the `event_cb` option). You will need to set a value for `debug` if you want information to be sent. |
| `event.stats` | The `event.stats` event is emitted when `librdkafka` reports stats (if you opted in by setting `statistics.interval.ms` to a non-zero value). |
| `event.throttle` | The `event.throttle` event is emitted when `librdkafka` reports throttling. |
kafkaMessage
{
    value: Buffer,
    size: number,
    topic: string,
    offset: number,
    partition: number,
    key: string,
    timestamp: number
}
| Name | Type | Description |
| ------------- | ------------- | ------------- |
| value | Buffer | message contents as a Buffer |
| size | number | size of the message, in bytes |
| topic | string | topic the message comes from |
| offset | number | offset the message was read from |
| partition | number | partition the message was on |
| key | string | key of the message if present |
| timestamp | number | timestamp of message creation |
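Because `value` arrives as a Buffer, a job usually has to decode it first. The sketch below assumes the payload is UTF-8 JSON, which is an assumption about the producer and not something kafka-executor specifies. `parseJsonValue` and `MessageLike` are hypothetical names, and `value` is typed loosely so the example compiles without Node typings (a Buffer satisfies it).

```typescript
// The subset of kafkaMessage this sketch needs. Buffer#toString() decodes as
// UTF-8 by default, so a real Buffer fits the `value` field.
interface MessageLike {
  value: { toString(): string };
  topic: string;
  partition: number;
  offset: number;
}

// Decodes the message value as JSON and reports where a bad payload came from.
function parseJsonValue<T>(message: MessageLike): T {
  const text = message.value.toString();
  try {
    return JSON.parse(text) as T;
  } catch (err) {
    throw new Error(
      `Invalid JSON at ${message.topic}[${message.partition}]@${message.offset}`,
    );
  }
}

// Intended use inside a job (not executed here):
//   new Job((kafkaMessage) => save(parseJsonValue<Order>(kafkaMessage)))
```

Including the topic, partition and offset in the error makes a malformed message easy to locate once the job has failed.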
error
{
    ...Error,
    jobId: string,
    status?: string,
}
| Name | Type | Description |
| ------------- | ------------- | ------------- |
| jobId | string | the failed job |
| status | string | the HTTP status, if one exists |
codes
| Name | Description |
| ------------- | ------------- |
| kafkaError | Log produced by Kafka |
| connectionError | Log produced when trying to connect to Kafka |
| jobFailed | Log produced by a job |
| jobRetry | Log produced by a job when it retries |
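These codes are passed as the third argument to the `logger` option, so a custom logger can filter or tag output by code. The following is a minimal sketch: `makeLogger`, the `sink` parameter, and the output format are hypothetical, and only the `(message, type, code)` signature and the code names come from this README.

```typescript
type LogType = 'info' | 'warn' | 'error';

// Builds a logger that drops entries whose code is in `mutedCodes` and
// prefixes everything else with its type and code.
function makeLogger(sink: (line: string) => void, mutedCodes: string[] = []) {
  return (message: string, type: LogType, code?: string): void => {
    if (code !== undefined && mutedCodes.indexOf(code) !== -1) {
      return; // e.g. silence noisy 'jobRetry' entries
    }
    const tag = code ? `${type}:${code}` : type;
    sink(`[kafka-executor] [${tag}] ${message}`);
  };
}

// Intended use (not executed here):
//   new KafkaExecutor({ /* brokerList, topics, groupId */, logger: makeLogger(console.log, ['jobRetry']) })
```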