eventor-kafka-elastic-bridge
v5.3.2-beta.0
The Eventor Kafka-Elastic-Bridge is an interface from the HIT Kafka infrastructure (Eventor) to DTC's Elasticsearch cluster. It makes the data pumped into the cluster searchable and allows certain Kafka messages to be traced and documented.
Configuration
As with all Eventor bridges, the config (specified in config.json) has three parts:
{
  "sourceConfig": "...",
  "sinkConfig": "...",
  "transformConfig": "..."
}
Examples of configurations can be found in the eventor-bridge-shared repo.
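As a rough illustration of the three-part layout, the sketch below checks that a parsed config object contains all three sections before the bridge is started. The helper name validateBridgeConfig is hypothetical and is not part of the Eventor codebase.

```javascript
// Hypothetical helper (not part of the Eventor codebase): verifies that a
// parsed config.json object contains the three sections described above.
const REQUIRED_SECTIONS = ["sourceConfig", "sinkConfig", "transformConfig"];

function validateBridgeConfig(config) {
  if (config === null || typeof config !== "object") {
    throw new TypeError("config must be an object");
  }
  const missing = REQUIRED_SECTIONS.filter((key) => !(key in config));
  if (missing.length > 0) {
    throw new Error(`config.json is missing: ${missing.join(", ")}`);
  }
  return config;
}

// Usage: parse the file contents, then validate before starting the bridge.
const parsedConfig = JSON.parse(
  '{"sourceConfig": {}, "sinkConfig": {}, "transformConfig": {}}'
);
validateBridgeConfig(parsedConfig);
```

Failing early on a missing section keeps a misconfigured bridge from connecting to Kafka before the problem is noticed.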
sourceConfig
The sourceConfig section configures the Kafka connection string and related properties. See the node-rdkafka page on configuration.
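For orientation, a sourceConfig might look something like the object below. This is only a sketch: it assumes the bridge passes these properties straight through to node-rdkafka, which this README does not confirm. The keys are standard librdkafka property names, and the broker addresses and group id are placeholders.

```javascript
// Assumption: these properties are handed to node-rdkafka as-is.
// Keys are standard librdkafka property names; values are placeholders.
const sourceConfig = {
  // Comma-separated list of Kafka brokers (the "connection string").
  "metadata.broker.list": "kafka-1.example.internal:9092,kafka-2.example.internal:9092",
  // Consumer group used to track offsets for this bridge instance.
  "group.id": "eventor-kafka-elastic-bridge",
  // Commit offsets explicitly rather than on a timer.
  "enable.auto.commit": false,
};
```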
sinkConfig
The sinkConfig configures the Elasticsearch connection string and its options. The common module makes use of the Elasticsearch Node.js client.
A defaultIndex field should be specified. This is the index that all messages are sent to, barring any additional indices specified in the CustomEsIndexer mentioned below. On startup, the sink detects and creates any non-existent indices, using the settings configured in the indexSettings and indexMappings fields of the sinkConfig. Each index has its own entry under the relevant section, and the body of each entry conforms to the Elasticsearch configuration standards. Additional indices can be set under the transformConfig.
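The startup behaviour described above could be sketched roughly as follows. The function ensureIndices and its indexNames parameter are hypothetical, and the per-index layout of indexSettings and indexMappings is inferred from the description rather than taken from the sink's source. The client argument is any object exposing indices.exists() and indices.create(), as the official Elasticsearch Node.js client does; the request shape with a body key follows the 7.x client.

```javascript
// Sketch only, not the sink's actual implementation.
// `client` needs indices.exists() and indices.create(); `indexNames` is the
// list of indices the bridge will write to (default index first).
async function ensureIndices(client, sinkConfig, indexNames) {
  const created = [];
  for (const index of indexNames) {
    const res = await client.indices.exists({ index });
    // Newer clients resolve to a boolean, 7.x clients to { body: boolean }.
    const exists = typeof res === "boolean" ? res : Boolean(res && res.body);
    if (exists) continue;

    await client.indices.create({
      index,
      body: {
        // Each index has its own entry under the relevant section.
        settings: (sinkConfig.indexSettings || {})[index] || {},
        mappings: (sinkConfig.indexMappings || {})[index] || {},
      },
    });
    created.push(index);
  }
  return created; // names of the indices that had to be created
}
```

Returning the list of created indices makes it straightforward to log what changed on startup.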
transformConfig
The current transform class used is the KafkaToElasticTransform class. Users can change this to whichever transform they need. Check the documentation here.
The transformConfig contains a map of additional index names to their settings, if desired:
{
  ...
  "transformConfig": {
    "myExtraIndex": {
      "indexSettings": { ... },
      "indexMappings": { ... }
    },
    "myOtherIndex": {
      "indexSettings": { ... },
      "indexMappings": { ... }
    },
    ...
  }
}
Configuring a Custom Transformation
In the KafkaToElasticTransform class, the user can inject their own custom transformer code. This is exposed via the base class CustomEsIndexer, which has one abstract transform() function that can perform any other transformations the user may need between Kafka and Elasticsearch. To use this feature, extend the base class, implement the transform() function as desired, and pass an instance of the subclass to the constructor of KafkaToElasticTransform. This class can also manage any additional indices that messages will be posted to in Elasticsearch.
N.B.: By default, the KafkaToElasticTransform transform function does not touch the message coming out of Kafka, except for parsing the message body buffer into JSON. The KafkaToElasticTransform.prepareForIndex() function takes the resulting body and builds a proper ES client payload.
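The extension pattern described above might look roughly like this. The two classes at the top are stand-ins written so that the sketch runs on its own; the real CustomEsIndexer and KafkaToElasticTransform come from the Eventor packages, and their constructor argument order and method signatures here are assumptions. TimestampingIndexer and the index name are purely illustrative.

```javascript
// Stand-ins for the real Eventor classes (signatures are assumptions).
class CustomEsIndexer {
  transform(body) {
    throw new Error("transform() must be implemented by a subclass");
  }
}

class KafkaToElasticTransform {
  constructor(defaultIndexName, customIndexer) {
    this.defaultIndexName = defaultIndexName;
    this.customIndexer = customIndexer;
  }

  transform(kafkaMessage) {
    // Default behaviour: only parse the message body buffer into JSON.
    const body = JSON.parse(kafkaMessage.value.toString("utf8"));
    // If a custom indexer was injected, let it transform the parsed body.
    return this.customIndexer ? this.customIndexer.transform(body) : body;
  }
}

// Hypothetical custom transformer: stamps each document with an ingest time.
class TimestampingIndexer extends CustomEsIndexer {
  transform(body) {
    return { ...body, ingestedAt: new Date().toISOString() };
  }
}

const bridgeTransform = new KafkaToElasticTransform(
  "eventor-default",
  new TimestampingIndexer()
);
const exampleDoc = bridgeTransform.transform({
  value: Buffer.from('{"orderId": 42}'),
});
```

Keeping the custom logic in a separate subclass leaves the default JSON parsing untouched, so a transformer can be swapped without changing the bridge itself.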
Error Handling
The custom transformer should handle its own errors. The calling code is nevertheless wrapped in a try...catch, which passes any error raised during the transform step up the call stack.
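The split of responsibilities could be pictured as in the sketch below. Both functions are hypothetical; only the pattern (the transformer recovers from what it can, the caller's try...catch rethrows the rest) comes from the description above.

```javascript
// Hypothetical custom transform: recovers from the failures it understands.
function customTransform(body) {
  if (typeof body.amount !== "number") {
    // Handled locally: flag the document instead of throwing.
    return { ...body, amount: null, amountInvalid: true };
  }
  return body;
}

// Hypothetical caller: mirrors the try...catch around the transform step.
function runTransformStep(rawValue) {
  try {
    const body = JSON.parse(rawValue.toString("utf8"));
    return customTransform(body);
  } catch (err) {
    // Anything the transformer did not handle goes up the call stack.
    throw err;
  }
}
```

With this arrangement a malformed field yields a flagged document, while an unparseable message body surfaces as an error to whatever invoked the transform step.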
Exposed Methods
When constructing the KafkaToElasticTransform, note that the constructor requires a defaultIndexName parameter. This is exposed on the ElasticSink object via a getter, since that class already contains the configuration for the default index.
The ElasticSink class also exposes a static function that constructs the master list of indices to be used. By convention, the first element of the list is the defaultIndex, and additional indices are appended.
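The ordering convention can be sketched as follows. The function buildIndexList is a hypothetical stand-in for the static helper on ElasticSink, whose real name and signature are not given in this README; it assumes the additional indices are the keys of the transformConfig map.

```javascript
// Hypothetical stand-in for the static helper on ElasticSink.
function buildIndexList(defaultIndex, transformConfig = {}) {
  // Additional indices are the keys of the transformConfig map; skip the
  // default index if it is listed there too, so it appears only once.
  const extras = Object.keys(transformConfig).filter(
    (name) => name !== defaultIndex
  );
  // Convention: default index first, additional indices appended.
  return [defaultIndex, ...extras];
}

const indexList = buildIndexList("eventor-default", {
  myExtraIndex: { indexSettings: {}, indexMappings: {} },
  myOtherIndex: { indexSettings: {}, indexMappings: {} },
});
// indexList[0] is the default index; the extras follow in key order.
```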