Microfleet Kafka Plugin
Adds Kafka support to Microfleet. Provides a Node.js stream-like API for producing messages to and consuming messages from a Kafka broker.
For more information, see the node-rdkafka documentation.
Install
yarn add @microfleet/plugin-kafka
Configuration
To make use of the plugin, adjust the Microfleet configuration in the following way:
exports.plugins = [
  // ...,
  'kafka',
  // ...
]
exports.kafka = {
  // librdkafka configuration:
  // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  debug: 'consumer,cgrp,topic,fetch',
  'metadata.broker.list': 'kafka:9092',
  'group.id': 'test-group',
}
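For context, a minimal bootstrap sketch showing how this configuration might be wired into a service; the service name and the companion logger plugin are illustrative assumptions, not requirements stated by this README:
const { Microfleet } = require('@microfleet/core')

const service = new Microfleet({
  name: 'kafka-demo', // illustrative name
  plugins: ['logger', 'kafka'], // 'logger' is an assumed companion plugin
  kafka: {
    'metadata.broker.list': 'kafka:9092',
    'group.id': 'test-group',
  },
})

// connect() brings up every registered plugin, including Kafka
await service.connect()
// ... use service.kafka.* (see Interface below) ...
await service.close()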
Interface
Microfleet Kafka Plugin extends the service interface with the following methods:
async service.kafka.createConsumerStream({ streamOptions, conf, topic }): Readable
Initializes a Kafka consumer using the provided params and creates a Readable stream. This is a reimplementation of node-rdkafka's ConsumerStream with some additions.
Extra parameters:
const streamOptions = {
  checkTopicExists: boolean, // Verify that the consumed topics exist before starting.
  stopOnPartitionsEOF: boolean, // End the stream once all assigned partitions have been read to EOF.
  offsetQueryTimeout: number, // Timeout in milliseconds for broker offset requests.
  offsetCommitTimeout: number, // Milliseconds to wait for offset commits to be received on stream close.
}
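A sketch of these options in use, e.g. for draining an existing topic; the topic name and group id are illustrative:
const backfill = await service.kafka.createConsumerStream({
  streamOptions: {
    topics: ['events'], // illustrative topic name
    checkTopicExists: true, // fail fast if the topic is missing
    stopOnPartitionsEOF: true, // end the stream once all assigned partitions hit EOF
    offsetQueryTimeout: 5000,
    offsetCommitTimeout: 5000,
  },
  conf: { 'group.id': 'backfill-group' },
})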
async service.kafka.createProducerStream({ streamOptions, conf, topic }): Writable
Initializes a Kafka producer using the provided params and creates a Writable stream. Detailed docs: https://blizzard.github.io/node-rdkafka/current/ProducerStream.html
Parameter description
For information about the parameters passed to the interface methods:
streamOptions
- see the node-rdkafka ConsumerStream options, or the ProducerStream options (https://blizzard.github.io/node-rdkafka/current/ProducerStream.html)
conf
- see the librdkafka configuration reference (https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
topic
- see the topic configuration properties section of the same librdkafka reference
Example
const producerStream = await service.kafka.createProducerStream({
  streamOptions: { objectMode: true, pollInterval: 10 },
  conf: { 'group.id': 'other-group' },
})

const consumerStream = await service.kafka.createConsumerStream({
  streamOptions: { topics: topic, streamAsBatch: true, fetchSize: 10 },
  conf: {
    debug: 'consumer',
    'enable.auto.commit': false,
    'client.id': 'someid',
    'group.id': 'other-group',
  },
  topic: {
    // 'earliest' | 'latest': where to start when there is no committed offset -
    // 'earliest' reads the partition from the beginning, 'latest' only new messages.
    'auto.offset.reset': 'earliest',
  },
})
// and then
producerStream.write({
  topic,
  value: Buffer.from(`message at ${Date.now()}`),
}, cb)
// or
producerStream.write(Buffer.from(`message at ${Date.now()}`), cb)
for await (const messages of consumerStream) {
  // process messages - with streamAsBatch: true each chunk is an array of messages
}
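Because the example disables enable.auto.commit, offsets should be committed manually. A sketch, assuming the stream exposes the underlying node-rdkafka consumer as consumerStream.consumer (as node-rdkafka's ConsumerStream does):
// assumption: consumerStream.consumer is the underlying node-rdkafka KafkaConsumer
for await (const messages of consumerStream) {
  for (const message of messages) {
    // ... process message ...
    consumerStream.consumer.commitMessage(message) // commit only after successful processing
  }
}

// shut the streams down when finished
producerStream.end()
consumerStream.close()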