@itentialopensource/adapter-kafkav2
v0.23.2
Itential adapter to connect to kafka
Kafkav2 Adapter
This adapter is used to integrate the Itential Automation Platform (IAP) with the Kafka System. The documentation for kafkajs is available at https://kafka.js.org/docs/getting-started. The adapter utilizes the kafkajs library to provide the integrations that are deemed pertinent to IAP. This Readme file is intended to provide information on this adapter.
Note: It is possible that some integrations will be supported through the Kafka adapter while other integrations will not.
Itential provides information on all of its product adapters in the Customer Knowledge Base. Information in the Customer Knowledge Base is consistently maintained and goes through documentation reviews. As a result, it should be the first place to go for information.
For custom built adapters, it is a starting point to understand what you have built, provide the information for you to be able to update the adapter, and assist you with deploying the adapter into IAP.
Versioning
Itential Product adapters utilize SemVer for versioning. The current version of the adapter can be found in the package.json file or viewed in the IAP GUI on the System page. For Open Source Adapters, the versions available can be found in the Itential OpenSource Repository.
Release History
Any release prior to 1.0.0 is a pre-release. Initial builds of adapters are generally set up as pre-releases as there is often work that needs to be done to configure the adapter and make sure the authentication process to Kafka works appropriately.
Release notes can be viewed in CHANGELOG.md or in the Customer Knowledge Base for Itential adapters.
Getting Started
These instructions will help you get a copy of the project on your local machine for development and testing. Reading this section is also helpful for deployments as it provides you with pertinent information on prerequisites and properties.
Environment Prerequisites
The following is a list of required packages for an adapter.
Node.js
Git
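To confirm these prerequisites are available, you can check the installed versions (any recent versions should print here; no specific minimums are mandated by this Readme):
node --version
npm --version
git --version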
Adapter Prerequisites
The following list of packages are required for Itential product adapters or custom adapters that have been built utilizing the Itential Adapter Builder.
| Package | Description |
| ------- | ------- |
| @itentialopensource/adapter-utils | Runtime library classes for all adapters; includes request handling, connection, throttling, and translation. |
| ajv | Required for validation of adapter properties to integrate with Kafka. |
| fs-extra | Utilized by the node scripts that are included with the adapter; helps to build and extend the functionality. |
| readline-sync | Utilized by the testRunner script that comes with the adapter; helps to test unit and integration functionality. |
Additional Prerequisites for Development and Testing
If you are developing and testing a custom adapter, or have testing capabilities on an Itential product adapter, you will need to install these packages as well.
chai
eslint
eslint-config-airbnb-base
eslint-plugin-import
eslint-plugin-json
mocha
nyc
testdouble
winston
Specific Prerequisites
At the current time the Kafka adapter does not utilize the adapter utilities as it makes use of the following library instead.
| Package | Description |
| ------- | ------- |
| kafkajs | Library that provides Kafka connectivity through Node.js. |
Creating a Workspace
The following commands provide a local copy of the repository along with the adapter dependencies.
git clone [email protected]:@itentialopensource/adapters/adapter-Kafkav2
npm install
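With the dependencies installed, the lint and test tooling listed under the development prerequisites can usually be run through npm scripts; the exact script names below are assumptions and should be verified against the scripts section of package.json:
npm run lint
npm test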
Installing an Itential Product Adapter
- Set up the name space location in your IAP node_modules.
cd /opt/pronghorn/current/node_modules
if the @itentialopensource directory does not exist, create it:
mkdir @itentialopensource
- Clone the adapter into your IAP environment.
cd @itentialopensource
git clone [email protected]:@itentialopensource/adapters/adapter-Kafkav2
- Install the dependencies for the adapter.
cd adapter-Kafkav2
npm install
- Add the adapter properties for Kafka (created from the Adapter Builder) to the properties.json file for your Itential build. You will need to change the credentials and possibly the host information. A sample service entry is sketched after these steps, and the full property set is documented in the Adapter Properties and Descriptions section below.
- Restart IAP
systemctl restart pronghorn
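The exact shape of the service entry in properties.json depends on your IAP version; the following is only a minimal sketch assuming the common id/type/properties layout for adapter service configs (the id, type, and connection values are placeholders to be replaced with your own):
{
  "id": "Kafkav2",
  "type": "Kafkav2",
  "properties": {
    "host": "localhost",
    "port": 9092,
    "stub": false,
    "client": {
      "brokers": [
        "broker:9093"
      ],
      "clientId": "my-app",
      "logLevel": "INFO"
    },
    "consumer": {
      "groupId": "kafka-node-group"
    }
  }
}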
Adapter Properties and Descriptions
This section defines all the properties that are available for the adapter, including detailed information on what each property is for. If you are not using certain capabilities with this adapter, you do not need to define all of the properties. An example of how the properties for this adapter can be used with tests or IAP is provided in the Installation section.
{
"host": "localhost",
"port": 9092,
"interval_time": 5000,
"stub": false,
"parseMessage": true,
"wrapMessage": "myKey",
"check_iap_apps": true,
"check_wfe_status": true,
"check_ops_manager_status": false,
"fromBeginning": false,
"iap_apps_check_interval": 15000,
"client": {
"brokers": [
"broker:9093"
],
"clientId": "my-app",
"logLevel": "INFO"
},
"producer": {
"requireAcks": 1,
"ackTimeoutMs": 100,
"partitionerType": 0
},
"consumer": {
"groupId": "kafka-node-group"
}
}
Sample SSL and SASL properties that go under the client properties. In the example below, SSL is enabled and requires a CA file, and SASL is set to scram-sha-512.
"ssl": {
"enableTrace": true,
"ca": "/path/to/crt.pem",
"rejectUnauthorized": true
},
"sasl": {
"username": "my-user",
"password": "my-password",
"mechanism": "scram-sha-512"
}
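For clarity, these blocks sit inside the client object alongside the broker settings; a minimal sketch combining them with the sample client properties above (paths and credentials are placeholders):
"client": {
  "brokers": [
    "broker:9093"
  ],
  "clientId": "my-app",
  "logLevel": "INFO",
  "ssl": {
    "enableTrace": true,
    "ca": "/path/to/crt.pem",
    "rejectUnauthorized": true
  },
  "sasl": {
    "username": "my-user",
    "password": "my-password",
    "mechanism": "scram-sha-512"
  }
}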
Topic Properties
An operator can configure message filtering per topic by providing filters in the topics[*].subscriberInfo[*].filters property.
Regular expressions are accepted; a guide on how to build regular expressions is available at https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions.
Operators should stick to simple regular expressions, as complex ones can cause a catastrophic backtracking problem (https://www.regular-expressions.info/catastrophic.html) that leaves the adapter unresponsive.
Example of adapter's configuration with filtering applied:
"topics": [
{
"name": "t1",
"always": true,
"subscriberInfo": [
{
"subname": "default",
"filters": [
"PIXI",
"DRED",
"[abc]",
"F: (\\w+), L: (\\w+)",
"\\d{3,4}"
],
"rabbit": "kafka",
"throttle": {}
}
]
}
],
When the operator does not provide the filters property, or the property contains no filters, all messages are passed.
Example of the adapter's configuration with multiple partitions and subscribers per topic:
"topics": [
{
"name": "topic1",
"always": true,
"partition": 0,
"subscriberInfo": [
{
"subname": "default",
"filters": [
"PIXI",
"DRED",
"F: (\\w+), L: (\\w+)",
"\\d{3,4}"
],
"rabbit": "topic1",
"throttle": {}
},
{
"subname": "sub2",
"filters": [
"[abc]",
"F: (\\w+), L: (\\w+)",
"\\d{3,4}"
],
"rabbit": "topic1-s2",
"throttle": {}
}
]
},
{
"name": "topic1",
"always": true,
"partition": 1,
"subscriberInfo": [
{
"subname": "default",
"filters": [
"PIXI",
"DRED",
"[abc]",
"F: (\\w+), L: (\\w+)",
"\\d{3,4}"
],
"rabbit": "topic1-p1",
"throttle": {}
}
]
},
{
"name": "test-6",
"always": true
}
],
Note that if no partition is supplied, only messages on partition 0 will be consumed. Additionally, if no rabbit topic is supplied, events will be published to a topic with the same name as the Kafka topic. For example, topic test-6 above will be published to the test-6 rabbit topic.
Example of adapter configuration passing a partitions array for one subscriber:
Note that you can pass multiple partitions to be consumed under the same subscriber, as shown below.
"topics": [
{
"name": "test_topic",
"always": true,
"partitions": [
1,
2
],
"subscriberInfo": [
{
"subname": "default",
"filters": [],
"rabbit": "test_topic",
"throttle": {}
}
]
}
]
Connection Properties
These base properties are used to connect to Kafka upon the adapter initially coming up. It is important to set these properties appropriately.
| Property | Description |
| ------- | ------- |
| host | Optional. A fully qualified domain name or IP address. This is not needed for the latest version of the adapter as the broker is set under client properties. |
| port | Required if host is set. Used to connect to the server. This is not needed for the latest version of the adapter as the port is set under client properties. |
| interval_time | Optional. The Kafka adapter keeps information about topics and offsets in memory in order to be more efficient. In order to work across restarts the adapter must persist the data. So the data is written into the .topics.json file. This write time defines how often to write the file.|
| stub | Optional. Slightly different meaning than normal, this is just telling the adapter to not actually send any request to kafka and mock a healthy connection. Should be set to false unless you are wanting to force the adapter to be green without an actual connection.|
Important
When the adapter works with the consumer configuration autoCommit:false, fromOffset:true, the current offset setting on adapter startup is loaded from the .topics.json file for each (topic:partition).
The setting of the interval_time property therefore affects the adapter's behaviour after a restart.
If the adapter is restarted before .topics.json is updated with the latest offset for a given (topic:partition), then after the adapter comes back up, the consumer offsets are set based on the .topics.json content or the outOfRange resolution. Depending on the offset that is set, all messages present on the Kafka server starting from offset (consumer offset + 1) will be read and emitted to the subscriber (e.g. Operations Manager) after the restart. This can trigger duplicated jobs run by Operations Manager. To avoid duplicated jobs, the operator should wait interval_time after the last message is read by the consumer before restarting the adapter.
Client Properties
The following properties are used to define the Kafka Client. These properties all have default values in the adapter and in Kafka. Definitions are taken from kafka-node page as these properties are directly passed to Kafka.
| Property | Description |
| ------- | ------- |
| consumersasl | Object, SASL authentication configuration for the consumer. This is optional if you provide sasl. ex. { mechanism: 'plain', username: 'foo', password: 'bar' } |
| producersasl | Object, SASL authentication configuration for the producer. This is optional if you provide sasl. ex. { mechanism: 'plain', username: 'foo', password: 'bar' } |
| brokers | Kafka broker:port list |
| clientId | Client-id is a logical grouping of clients with a meaningful name chosen by the client application |
| logLevel | There are 5 log levels available: NOTHING, ERROR, WARN, INFO, and DEBUG. INFO is configured by default |
| ssl | Object, options to be passed to the tls broker sockets, ex. { rejectUnauthorized: false } |
| sasl | Object, SASL authentication configuration (currently supports PLAIN, SCRAM-SHA-256, SCRAM-SHA-512), ex. { mechanism: 'plain', username: 'foo', password: 'bar' } |
For all client config options see Client Config
Producer Properties
The following properties are used to define the Kafka Producer. These properties all have default values in Kafka. Definitions are taken from kafka-node page as these properties are directly passed to Kafka.
| Property | Description |
| ------- | ------- |
| requireAcks | Configuration for when to consider a message as acknowledged, default 1. |
| ackTimeoutMs | The amount of time in milliseconds to wait for all acks before considered, default 100ms. |
| partitionerType | Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0. |
Consumer Properties
The following properties are used to define the Kafka Consumer. These properties all have default values in Kafka. Definitions are taken from kafka-node page as these properties are directly passed to Kafka.
| Property | Description |
| ------- | ------- |
| groupId | Consumer group id, default kafka-node-group. |
Refer to the kafkajs Consumer documentation for all consumer options.
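If additional kafkajs consumer options are supported by your adapter version, they would sit alongside groupId in the consumer object; the extra options below (sessionTimeout and heartbeatInterval) are taken from the kafkajs Consumer documentation and are assumptions here, so verify them against the adapter's property schema:
"consumer": {
  "groupId": "kafka-node-group",
  "sessionTimeout": 30000,
  "heartbeatInterval": 3000
}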
Parsing Properties
The parseMessage property allows the user to define how they want the Kafka message to be published to IAP's event system. If parseMessage is set to true or omitted, the value of the Kafka message will be parsed as either an object or string and wrapped in an outer object. The wrapper object's key can be defined with the wrapMessage property, or the default value payload is used if omitted. If parseMessage is set to false, the entire Kafka payload, including metadata, is returned and the message itself would need to be transformed at a later point.
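For illustration, assuming parseMessage is true and wrapMessage is set to "myKey" as in the sample properties above, a Kafka message whose value is {"status": "up"} would be published to the event system wrapped roughly as follows (the exact event envelope may carry additional metadata):
{
  "myKey": {
    "status": "up"
  }
}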
Turning off stream if WorkflowEngine or OperationsManager is down
check_iap_apps (boolean) and iap_apps_check_interval (integer) are used when the user wants to turn off the stream if WorkflowEngine or OperationsManager is down. If check_iap_apps is set to true, by default the adapter will check the status of both WorkflowEngine and OperationsManager at a defined interval. If any of these IAP apps are down, the consumer will be paused until the apps are active again. If the user does not want to healthcheck OperationsManager, set check_ops_manager_status to false. Similarly, if the user does not want to check WorkflowEngine status, set check_wfe_status to false.
iap_apps_check_interval (default 30000ms - 30s) allows the user to set the frequency at which to run the IAP app healthcheck.
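Putting these together, a fragment of the adapter properties that checks only WorkflowEngine every 15 seconds would look like this (values are illustrative and mirror the sample properties above):
"check_iap_apps": true,
"check_wfe_status": true,
"check_ops_manager_status": false,
"iap_apps_check_interval": 15000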
Send message sample payload
Note that this is an array, so it can have multiple topics.
[{
"topic": "topic-name",
"messages": [
{ "value": "hello world" },
{ "value": "hey hey!" }
]
}]
For all options of the messages array, please follow the kafkajs documentation.
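For illustration, kafkajs messages may also carry optional fields such as key, partition, and headers; whether the adapter forwards every field should be verified against the kafkajs documentation (the values below are placeholders):
[{
  "topic": "topic-name",
  "messages": [
    {
      "key": "order-1",
      "value": "hello world",
      "partition": 0,
      "headers": { "source": "iap" }
    }
  ]
}]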
Triggering jobs in IAP from a kafka message
Once you have the adapter configured and online in an IAP instance, you can create triggers in Operations Manager to kick off jobs when a kafka message is consumed. The adapter listens for messages on the subscribed topics and publishes to an event. This event is picked up by the Operations Manager. If you have created triggers for this specific topic, a job will get triggered. For a step-by-step example follow the next sub-section.
Listen to a topic called test-topic and trigger a workflow called test
- Subscribe to partition 0 of test-topic. Add this to your adapter service config. To see different options of topic properties see Topic Properties.
"topics": [
{
"name": "test-topic",
"always": true,
"partition": 0,
"subscriberInfo": [
{
"subname": "default",
"filters": [],
"rabbit": "test-topic-rabbit",
"throttle": {}
}
]
}
],
Note the rabbit key in the config is set to test-topic-rabbit. The adapter will send any message on test-topic to a Rabbit queue named test-topic-rabbit. If no rabbit topic is supplied, events will be published to a topic with the same name as the Kafka topic. Also note that the filter array can be left empty to consume all messages on partition 0 of test-topic.
- Create an automation in Operations Manager
Create a new automation, and add a trigger.
When selecting the Event for your trigger, make sure to select test-topic-rabbit from the drop down list. This is critical as the adapter will be sending the message on this queue. You can add filtering in the trigger by providing a Payload Schema Filter.
After saving the trigger, select the workflow you want to trigger. Save the Automation and view all jobs to see a job getting triggered as soon as a message is published on test-topic partition 0.
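As an illustration only, a Payload Schema Filter is typically expressed as a JSON Schema evaluated against the published event payload; the sketch below assumes the default payload wrapper key from the Parsing Properties section and matches only messages whose value contains "status": "up" (the property names are hypothetical for this example):
{
  "type": "object",
  "properties": {
    "payload": {
      "type": "object",
      "properties": {
        "status": { "const": "up" }
      },
      "required": ["status"]
    }
  },
  "required": ["payload"]
}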
Troubleshooting the Adapter
Connectivity Issues
- Verify the adapter properties are set up correctly.
Go into the Itential Platform GUI and verify/update the properties
- Verify there is connectivity between the Itential Platform Server and Kafka Server.
ping the ip address of Kafka server
try telnet to the IP address and port of Kafka (see the example after this list)
- Verify the credentials provided for Kafka.
login to Kafka using the provided credentials
- Verify the API of the call utilized for Kafka Healthcheck.
Go into the Itential Platform GUI and verify/update the properties
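The ping and telnet checks mentioned above can be run directly from the IAP server; the IP address and port below are placeholders for your Kafka broker:
ping 192.0.2.10
telnet 192.0.2.10 9092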
Functional Issues
Adapter logs are located in /var/log/pronghorn. In older releases of the Itential Platform, there is a pronghorn.log file which contains logs for all of the Itential Platform. In newer versions, adapters log to their own files.
Contributing to Kafka
Please check out the Contributing Guidelines.
License & Maintainers
Maintained By
Itential Product Adapters are maintained by the Itential Adapter Team.
Itential OpenSource Adapters are maintained by the community at large.
Custom Adapters are maintained by other sources.