avrokado
v0.5.3
A Kafka client and Avro (de)serializer library based on node-rdkafka
Avrokado
:avocado: A Kafka client and Avro (de)serializer library
Installation
To install, use your favourite dependency manager.
The package name is `avrokado`.
npm i avrokado --save
yarn add avrokado --exact
Usage
For examples, please refer to the examples folder.
SchemaRegistry
This will fetch the `key` and `value` schemas for a `topicName`.
Class Signature
new SchemaRegistry(
  endpoint: string,
  topics: ReadonlyArray<string> | string,
  version: number | 'latest' | 'all'
) => SchemaRegistry;
Where:
- endpoint: Endpoint for your Schema Registry;
- topics: Name of the topics (`Array`) or topic (`String`) you want to retrieve the schemas for;
- version: It can be either:
  - A `number`, which will then force the function to only fetch that version;
  - `all`, which means it will fetch `all` versions of the schemas;
  - `latest`, which will fetch only the `latest` schema versions.
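The three accepted forms of `version` can be modelled as a TypeScript union. The sketch below is illustrative only: `SchemaVersion` and `describeVersion` are hypothetical names and are not part of avrokado.

```typescript
// Hypothetical alias mirroring the documented `version` parameter type.
type SchemaVersion = number | 'latest' | 'all';

// Hypothetical helper: states in words what each selector asks for.
function describeVersion(version: SchemaVersion): string {
  if (version === 'all') {
    return 'every registered version of each schema';
  }
  if (version === 'latest') {
    return 'only the latest version of each schema';
  }
  // Remaining case: a specific version number.
  return `only version ${version} of each schema`;
}

console.log(describeVersion('latest'));
```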
Fields
- schemas: Object containing the loaded schemas.
Methods
load
async load() => Promise<void>;
The `load` method loads all the selected schemas into memory. They can then be accessed through the `schemas` field of the class instance.
Best Practices
It is recommended to load the schemas BEFORE creating your Consumer or Producer.
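One way to enforce this ordering is to build the consumer or producer only after `load()` has resolved. The following is a minimal sketch under stated assumptions: `SchemaRegistryLike` is a structural stand-in for the documented `schemas` field and `load()` method, and `createWithSchemas` is a hypothetical helper, not something avrokado exports.

```typescript
// Structural stand-in for the documented SchemaRegistry surface:
// a `schemas` field and an async `load()` method.
interface SchemaRegistryLike<S> {
  schemas: S;
  load(): Promise<void>;
}

// Hypothetical helper: awaits load() first, then hands the loaded
// schemas to a factory that builds the consumer or producer.
async function createWithSchemas<S, C>(
  registry: SchemaRegistryLike<S>,
  factory: (schemas: S) => C
): Promise<C> {
  await registry.load();
  return factory(registry.schemas);
}

// With avrokado this might look like the following (untested; the
// endpoint, topic name and configuration objects are placeholders):
//
//   const registry = new SchemaRegistry('http://localhost:8081', 'my-topic', 'latest');
//   const consumer = await createWithSchemas(
//     registry,
//     (schemas) => new AvroConsumer(conf, topicConf, schemas)
//   );
```

Passing a factory rather than a ready-made instance means the consumer or producer cannot be constructed with schemas that have not been loaded yet.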
AvroConsumer
This will create a consumer stream using node-rdkafka.
Please check its DOCUMENTATION, since most of the options come from that library.
Class Signature
new AvroConsumer(
  conf: Object,
  topicConf: Object,
  schemas: TopicsSchemas
) => AvroConsumer;
Where:
- consumerConfiguration: `librdkafka`'s consumer-specific configuration;
- defaultTopicConfiguration: `librdkafka`'s default topic configuration;
- streamOpts: `librdkafka`'s read stream options;
- schemas: An object with all `key` and `value` schemas (return from `loadSchemas`).
Returns an `AvroConsumer`, which extends the `Readable` stream.
Fields
- stream: This is a `ConsumerStream` object from `node-rdkafka`, which has another field `consumer` for the `KafkaConsumer` itself (yes it's ugly).
Events Emitted
| Event name    | Trigger/Description                                   |
|---------------|-------------------------------------------------------|
| `avro`        | Whenever a message is parsed with Avro                |
| `ready`       | When the Consumer Stream is created                   |
| `event.error` | Wraps `ConsumerStream.consumer`'s `event.error` event |
And any other event emitted by a `ConsumerStream` from `node-rdkafka`.
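Subscribing to these events could be sketched as follows. The event names are the ones listed in the table; `AvroEventSource` and `attachHandlers` are hypothetical names introduced for illustration, and the sketch only assumes the consumer exposes an `on` method, as a `Readable` stream does.

```typescript
// Minimal structural type for anything exposing `on`.
interface AvroEventSource {
  on(event: string, listener: (payload: unknown) => void): unknown;
}

// Hypothetical helper: attaches listeners for the three documented events.
function attachHandlers(
  source: AvroEventSource,
  handlers: {
    onReady: () => void;
    onMessage: (message: unknown) => void;
    onError: (error: unknown) => void;
  }
): void {
  source.on('ready', () => handlers.onReady());
  source.on('avro', (message) => handlers.onMessage(message));
  source.on('event.error', (error) => handlers.onError(error));
}
```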
API
The `avro` event specifically emits an `AvroMessage`, which contains:
| Variable        | Description                             |
|-----------------|-----------------------------------------|
| `value`         | The raw value buffer                    |
| `key`           | The raw key buffer                      |
| `size`          | Size in bytes of the raw message        |
| `topic`         | Name of the topic                       |
| `offset`        | Offset of the message                   |
| `partition`     | Partition of the topic                  |
| `timestamp`     | When the message was retrieved          |
| `valueSchemaId` | Schema ID for the value                 |
| `keySchemaId`   | Schema ID for the key                   |
| `parsedValue`   | Avro-deserialized value (from `value`)  |
| `parsedKey`     | Avro-deserialized key (from `key`)      |
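As a rough guide to working with this shape, the sketch below models the fields from the table as a TypeScript interface. The field names come from the table; the types are assumptions (for instance, the raw buffers are typed as `Uint8Array`, which a Node.js `Buffer` satisfies), and `AvroMessageLike` and `summarize` are hypothetical names rather than avrokado exports.

```typescript
// Field names come from the table above; the TypeScript types are assumptions.
interface AvroMessageLike {
  value: Uint8Array;     // raw value buffer
  key: Uint8Array;       // raw key buffer
  size: number;          // size in bytes of the raw message
  topic: string;
  offset: number;
  partition: number;
  timestamp: number;
  valueSchemaId: number;
  keySchemaId: number;
  parsedValue: unknown;  // Avro-deserialized value
  parsedKey: unknown;    // Avro-deserialized key
}

// Hypothetical helper: one-line summary of a message, e.g. for logging.
function summarize(message: AvroMessageLike): string {
  const position = `${message.topic}[${message.partition}]@${message.offset}`;
  const key = JSON.stringify(message.parsedKey);
  const value = JSON.stringify(message.parsedValue);
  return `${position} key=${key} value=${value}`;
}
```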
Notes
- To use the `KafkaConsumer` methods, for now you will need to go through `AvroConsumer.stream.consumer`.
AvroProducer
This will create a producer using node-rdkafka.
Please check its DOCUMENTATION, since most of the options come from that library.
Class Signature
new AvroProducer(
  conf: Object,
  topicConf: Object,
  schemas: TopicsSchemas
) => AvroProducer;
Where:
- conf: `librdkafka`'s producer-specific configuration;
- topicConf?: `librdkafka`'s default topic configuration;
- schemas: An object with all `key` and `value` schemas (return from `loadSchemas`).
Returns an `AvroProducer`, which extends `Producer`.
Methods
connect
connect(
  metadataOption: Object = {}
) => Promise<true | Error>;
The `connect` method will connect to the Kafka broker and wait until a connection is successfully made or an error is thrown.
produce
produce(
  topic: string,
  partition?: number,
  message?: unknown,
  key?: unknown,
  sendRaw?: boolean,
  timestamp?: number,
  opaque?: unknown
) => void;
The `produce` method will produce a message to Kafka. If `sendRaw` is set to `true`, the message WILL NOT be Avro-encoded.
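Because `produce` takes up to seven positional arguments, a small wrapper with named options can make call sites easier to read. This is only a sketch: `ProducerLike` mirrors the documented signature structurally, while `ProduceOptions` and `produceWith` are hypothetical and not part of avrokado.

```typescript
// Structural stand-in for the documented produce() signature.
interface ProducerLike {
  produce(
    topic: string,
    partition?: number,
    message?: unknown,
    key?: unknown,
    sendRaw?: boolean,
    timestamp?: number,
    opaque?: unknown
  ): void;
}

// Hypothetical named-options shape; not part of avrokado.
interface ProduceOptions {
  topic: string;
  partition?: number;
  message?: unknown;
  key?: unknown;
  sendRaw?: boolean;
  timestamp?: number;
  opaque?: unknown;
}

// Maps named options onto the documented positional argument order.
function produceWith(producer: ProducerLike, options: ProduceOptions): void {
  producer.produce(
    options.topic,
    options.partition,
    options.message,
    options.key,
    options.sendRaw,
    options.timestamp,
    options.opaque
  );
}
```

A call such as `produceWith(producer, { topic: 'orders', message: { id: 1 }, key: 'k1' })` then leaves `partition`, `sendRaw`, `timestamp` and `opaque` as `undefined`, without having to pad the argument list by hand.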
disconnect
disconnect(
  timeout: number = 5000
) => Promise<true | Error>;
The `disconnect` method will disconnect from the Kafka broker and wait until the connection is gracefully closed.
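Taken together, `connect` and `disconnect` suggest a connect, work, disconnect lifecycle. The sketch below shows one way to guarantee the disconnect step even if the work fails. `ConnectableProducer` is a structural stand-in for the documented signatures and `withConnection` is a hypothetical helper. Since the documented return type is `Promise<true | Error>`, the sketch assumes an `Error` may be resolved rather than thrown and checks for it explicitly.

```typescript
// Structural stand-in for the documented connect()/disconnect() signatures.
interface ConnectableProducer {
  connect(metadataOption?: object): Promise<true | Error>;
  disconnect(timeout?: number): Promise<true | Error>;
}

// Hypothetical helper: connects, runs `work`, and always attempts to
// disconnect afterwards, whether `work` succeeded or threw.
async function withConnection<P extends ConnectableProducer>(
  producer: P,
  work: (connected: P) => void | Promise<void>
): Promise<void> {
  const connected = await producer.connect();
  // Assumption: an Error may arrive as a resolved value, so rethrow it.
  if (connected instanceof Error) {
    throw connected;
  }
  try {
    await work(producer);
  } finally {
    // The result of disconnect() is not inspected in this sketch.
    await producer.disconnect();
  }
}
```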
Tests
- Install `Docker`;
- Install `docker-compose`;
- Start up the images with `docker-compose up -d` and make sure zookeeper, kafka and schema-registry are all running;
- Run `npm run test` or `yarn test`.
TODO
- Improve in-code documentation.