cloudevents-kafka
v0.2.1
Kafka transport plugin for CloudEvents JS SDK
CloudEvents JS SDK Kafka
Kafka transport extension for CloudEvents JS SDK
Description
Serialises and deserialises CloudEvents for the Kafka protocol.
- Based on the Kafka Protocol Binding for CloudEvents, version 1.0.1
- Supports partitioning
- Tested with KafkaJS, but will probably work with any other client
- Currently supports structured mode only
- Supports CloudEvents spec versions 0.3 and 1.0
- Works with the official CloudEvents JavaScript SDK starting from v5
- Has zero dependencies
- Uses strict TypeScript
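To make "structured mode" concrete: in the Kafka protocol binding, a structured-mode message carries the whole event (attributes and data) as a single JSON document in the message value, and a content-type header names the event format. The following is a minimal sketch of that layout, not this library's implementation. The names SketchEvent, SketchMessage and toStructuredSketch are hypothetical, and plain strings are used instead of Buffers to keep the sketch dependency-free.

```typescript
// Hypothetical types for illustration only; not exported by cloudevents-kafka.
interface SketchEvent {
  specversion: string
  id: string
  source: string
  type: string
  data?: unknown
}

interface SketchMessage {
  value: string
  headers: Record<string, string>
}

// Structured mode: the entire event is encoded as one JSON document in the
// message value, and the content-type header identifies the event format.
function toStructuredSketch(input: SketchEvent): SketchMessage {
  return {
    value: JSON.stringify(input),
    headers: { 'content-type': 'application/cloudevents+json; charset=UTF-8' },
  }
}
```

In binary mode, by contrast, the attributes would travel as individual headers and only the data would go in the value; the list above notes that this library does not support that mode yet.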
Installation
npm install cloudevents cloudevents-kafka kafkajs
# or
yarn add cloudevents cloudevents-kafka kafkajs
Note: The examples use kafkajs, but you can use any other client library.
Usage
Strict CloudEvent
The default CloudEvent constructor does not strictly check the input object.
To enable strict checking, this library provides two classes: CloudEventStrict and CloudEventStrictV1.
import { Version, CloudEvent } from 'cloudevents'
import { CloudEventStrict, CloudEventStrictV1 } from 'cloudevents-kafka'

// Will throw runtime exceptions about missing `id` field
const ce = new CloudEvent({
  specversion: Version.V1,
  source: 'some-source',
  // id: 'some-id',
  type: 'message.send'
})

// Will show typescript error during compilation about missing `id` field
const ces = new CloudEventStrict({
  specversion: Version.V1,
  source: 'some-source',
  // id: 'some-id',
  type: 'message.send'
})

// Will show typescript error during compilation about missing `id` field
const cev1 = new CloudEventStrictV1({
  source: 'some-source',
  // id: 'some-id',
  type: 'message.send'
})
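The compile-time check described above can be pictured as an input type whose required attributes are not optional. The sketch below is an assumption about the general technique, not the library's actual declarations; StrictV1Attributes and missingRequired are hypothetical names. It pairs the type with a small runtime check for input that arrives untyped, such as parsed JSON.

```typescript
// Hypothetical shape of a strictly typed v1.0 event input;
// not the actual declaration used by cloudevents-kafka.
interface StrictV1Attributes {
  id: string
  source: string
  type: string
  data?: unknown
}

// Runtime counterpart of the compile-time check: returns the names of
// required attributes that are absent or not non-empty strings.
function missingRequired(input: Record<string, unknown>): string[] {
  const required = ['id', 'source', 'type']
  return required.filter((attr) => typeof input[attr] !== 'string' || input[attr] === '')
}

// Compiles: every required attribute is present.
const completeAttrs: StrictV1Attributes = {
  id: 'some-id',
  source: 'some-source',
  type: 'message.send',
}

// Uncommenting the next line fails compilation because `id` is missing.
// const incompleteAttrs: StrictV1Attributes = { source: 'some-source', type: 'message.send' }
```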
Receiving Events
A valid KafkaMessage is deserialized into a CloudEvent.
import { EachMessagePayload, Kafka } from 'kafkajs'
import * as CeKafka from 'cloudevents-kafka'

const kafka = new Kafka({
  clientId: 'test-app',
  brokers: ['kafka:9092']
})
const consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
  eachMessage: async ({ message }: EachMessagePayload) => {
    const receivedEvent = CeKafka.deserialize(message)
    console.log(receivedEvent) // will be valid CloudEvent
  }
})
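For intuition about what deserializing a structured-mode message involves, here is a minimal, dependency-free sketch: check the content-type header, then parse the value as JSON. It is not the implementation behind CeKafka.deserialize; IncomingSketch and parseStructuredSketch are hypothetical names, and the error behaviour shown is an assumption. KafkaJS delivers values as Buffers, so a real consumer would decode them to strings first.

```typescript
// Hypothetical, simplified message shape using strings instead of Buffers.
interface IncomingSketch {
  value: string | null
  headers?: Record<string, string | undefined>
}

// Validates the content-type header and parses the JSON-encoded event.
function parseStructuredSketch(incoming: IncomingSketch): Record<string, unknown> {
  const contentType = incoming.headers ? incoming.headers['content-type'] : undefined
  if (contentType === undefined || contentType.indexOf('application/cloudevents+json') !== 0) {
    throw new Error('not a structured-mode CloudEvent: unexpected content-type')
  }
  if (incoming.value === null) {
    throw new Error('message has no value')
  }
  const parsed: unknown = JSON.parse(incoming.value)
  if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
    throw new Error('event payload must be a JSON object')
  }
  return parsed as Record<string, unknown>
}
```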
Emitting Events
A CloudEvent is serialised as a KafkaMessage object containing key, value, header and timestamp fields, which you can send using any client.
import { Version } from 'cloudevents'
import { Kafka } from 'kafkajs'
import * as CeKafka from 'cloudevents-kafka'

const { CloudEventStrict } = CeKafka

const kafka = new Kafka({
  clientId: 'test-app',
  brokers: ['kafka:9092']
})
const producer = kafka.producer()
await producer.connect()

const ce = new CloudEventStrict({
  specversion: Version.V1,
  source: 'some-source',
  id: 'some-id',
  type: 'message.send'
})
const message = CeKafka.structured(ce)
await producer.send({
  topic: 'test-topic',
  messages: [
    message,
  ],
})
Partition Key
To set the key property of the Kafka message, add a partitionkey field to your event.
const ce = new CloudEvent({
  specversion: Version.V1,
  source: 'some-source',
  id: 'some-id',
  type: 'message.send',
  partitionkey: 'some-partitionkey'
})
const message = CeKafka.structured(ce)
console.log(message.key) // some-partitionkey
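Why the key matters: Kafka producers typically pick a partition by hashing the message key, so events that share a partitionkey land on the same partition and keep their relative order. The toy partitioner below only illustrates that property. It is not the hash that Kafka clients actually use, and toyPartition is a hypothetical helper.

```typescript
// Toy partitioner for illustration only; real Kafka clients use their own
// hash function, so the partition numbers here will not match a real cluster.
function toyPartition(key: string, partitionCount: number): number {
  let sum = 0
  for (let i = 0; i < key.length; i++) {
    sum += key.charCodeAt(i)
  }
  return sum % partitionCount
}

// Two events with the same partition key map to the same partition.
const firstPartition = toyPartition('some-partitionkey', 3)
const secondPartition = toyPartition('some-partitionkey', 3)
const samePartition = firstPartition === secondPartition // true: the mapping is deterministic
```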
Development
First, add an alias to /etc/hosts
127.0.0.1 localhost kafka
Start Kafka
docker-compose up kafka
Install dependencies
yarn
Run tests
yarn test