schema-registry-avro
v1.0.1
Use a schema registry to decode avro messages
Schema Registry
What is a Schema Registry?
According to the Confluent documentation (https://docs.confluent.io/current/schema-registry/index.html), a schema registry stores a versioned history of your Avro schemas.
How does it work?
You should have at least a producer and a consumer using Kafka. When you produce a message, you should encode it using Avro.
The first time you produce a message with a given Avro schema, that schema should be posted to its subject in the schema registry. You'll get a unique id back if this is the first schema for the subject, or if the new schema is compatible with the previous one (see https://docs.confluent.io/current/schema-registry/avro.html#compatibility-types).
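As a rough sketch of the registration step, the snippet below builds the request described in the Confluent REST API (POST /subjects/{subject}/versions). The helper name buildRegisterRequest, the subject users-value and the example schema are assumptions made for illustration; they are not part of this package.

```javascript
// Hypothetical helper (not part of schema-registry-avro): builds the HTTP
// request that registers a schema under a subject in the schema registry.
function buildRegisterRequest(baseUrl, subject, avroSchema) {
  return {
    url: `${baseUrl}/subjects/${encodeURIComponent(subject)}/versions`,
    method: 'POST',
    headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' },
    // The registry expects the Avro schema as a JSON string in the "schema" field.
    body: JSON.stringify({ schema: JSON.stringify(avroSchema) }),
  };
}

// Example schema and subject, for illustration only.
const userSchema = {
  type: 'record',
  name: 'User',
  fields: [{ name: 'name', type: 'string' }],
};

const request = buildRegisterRequest('http://schema-registry:8081', 'users-value', userSchema);
console.log(request.method, request.url);
// On success the registry answers with a JSON body such as {"id": 1};
// that id is the schema id to put in front of each encoded message.
```

The request object can be sent with any HTTP client; it is kept separate from the network call so it can be inspected without a running registry.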
When encoding a message for use with a schema registry, you need to prefix it with one byte, called the magic byte, followed by the id of the schema from the schema registry as an integer (4 bytes).
The resulting message looks like this:
|------------|------------|------------|------------|------------| ...................................
| magic byte | ------------------ schema id ------------------- | your encoded message
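The layout above can be sketched with Node's Buffer API. This is a minimal illustration, assuming the Confluent wire format (magic byte 0, schema id as a big-endian 32-bit integer); frameMessage is a hypothetical helper, not a function exported by this package.

```javascript
// Hypothetical helper: prefixes an Avro-encoded payload with the magic byte
// and the schema id.
const MAGIC_BYTE = 0; // the Confluent wire format uses 0 as the magic byte

function frameMessage(schemaId, avroPayload) {
  const header = Buffer.alloc(5);
  header.writeUInt8(MAGIC_BYTE, 0); // byte 0: magic byte
  header.writeInt32BE(schemaId, 1); // bytes 1-4: schema id, big-endian
  return Buffer.concat([header, avroPayload]);
}

// Placeholder payload: the Avro binary encoding of the string "foo".
const avroPayload = Buffer.from([0x06, 0x66, 0x6f, 0x6f]);
const framed = frameMessage(12, avroPayload);
console.log(framed.toString('hex')); // 000000000c06666f6f
```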
When consuming a message from Kafka in your consumer, you first need to read the schema id. For example, you get the id 12.
Then you can fetch the schema that was used to encode the message, by its id, from the schema registry.
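Reading the schema id on the consumer side could look like the sketch below. It assumes the message is framed with magic byte 0 followed by a big-endian 32-bit schema id; parseFrame is a hypothetical helper shown for illustration and is not necessarily how this package does it internally.

```javascript
// Hypothetical helper: splits a framed Kafka message value into the schema id
// and the remaining Avro-encoded payload.
function parseFrame(buffer) {
  if (buffer.length < 5 || buffer.readUInt8(0) !== 0) {
    throw new Error('Message is not framed with a magic byte and a schema id');
  }
  return {
    schemaId: buffer.readInt32BE(1), // bytes 1-4
    payload: buffer.subarray(5),     // everything after the 5-byte header
  };
}

// Example input: magic byte 0, schema id 12, then a 4-byte placeholder payload.
const { schemaId, payload } = parseFrame(Buffer.from('000000000c06666f6f', 'hex'));
console.log(schemaId, payload.length); // 12 4
```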
All routes of the schema registry are documented in the Confluent documentation: https://docs.confluent.io/current/schema-registry/develop/api.html
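For instance, the route used to look up a schema by id is GET /schemas/ids/{id}, which returns the schema as a JSON string inside a "schema" field. The sketch below shows how the URL and the response could be handled; schemaByIdUrl and parseSchemaResponse are hypothetical names, and the sample response body is made up for illustration.

```javascript
// Hypothetical helpers around the GET /schemas/ids/{id} route.
function schemaByIdUrl(baseUrl, schemaId) {
  return `${baseUrl}/schemas/ids/${schemaId}`;
}

// The response body is JSON whose "schema" field is itself a JSON string.
function parseSchemaResponse(bodyText) {
  return JSON.parse(JSON.parse(bodyText).schema);
}

const schemaUrl = schemaByIdUrl('http://schema-registry:8081', 12);
console.log(schemaUrl); // http://schema-registry:8081/schemas/ids/12

// Sample response body, built here so the example runs without a registry.
const sampleBody = JSON.stringify({ schema: JSON.stringify({ type: 'string' }) });
const schema = parseSchemaResponse(sampleBody);
console.log(schema.type); // string
```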
Environment config
Access to the schema registry uses HTTP Basic auth.
SCHEMA_REGISTRY_URL: URL of the schema registry. Default: http://schema-registry:8081
SCHEMA_REGISTRY_USERNAME: Username for HTTP Basic auth. Default: none
SCHEMA_REGISTRY_PASSWORD: Password for HTTP Basic auth. Default: none
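To make the variables concrete, here is a minimal sketch of how they could be turned into a base URL and an Authorization header. loadRegistryConfig is a hypothetical helper written for this example; the package's own handling of these variables may differ.

```javascript
// Hypothetical helper: reads the registry settings from an environment object,
// applying the documented default URL and building a Basic auth header when
// both a username and a password are set.
function loadRegistryConfig(env = process.env) {
  const url = env.SCHEMA_REGISTRY_URL || 'http://schema-registry:8081';
  const username = env.SCHEMA_REGISTRY_USERNAME;
  const password = env.SCHEMA_REGISTRY_PASSWORD;

  const headers = {};
  if (username && password) {
    const token = Buffer.from(`${username}:${password}`).toString('base64');
    headers.Authorization = `Basic ${token}`;
  }
  return { url, headers };
}

// Example with made-up credentials and no URL set, so the default applies.
const config = loadRegistryConfig({
  SCHEMA_REGISTRY_USERNAME: 'user',
  SCHEMA_REGISTRY_PASSWORD: 'secret',
});
console.log(config.url); // http://schema-registry:8081
console.log(config.headers.Authorization); // Basic dXNlcjpzZWNyZXQ=
```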
How to use it?
Decode your message
From your consumer:
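const SchemaRegistry = require('schema-registry-avro');

// Kafka consumer handler: message.value is the raw message buffer
// (magic byte, schema id, then the Avro-encoded payload).
const onMessage = async ({ topic, partition, message }) => {
  try {
    const mess = await SchemaRegistry.decode(message.value);
    // Use the decoded message here.
  } catch (e) {
    console.error(e);
  }
};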