@confluentinc/schemaregistry
v0.4.0
Published
Node.js client for Confluent Schema Registry
Downloads
923
Readme
Confluent's JavaScript Client for Schema RegistryTM
Confluent's JavaScript client for Schema Registry supports Avro, Protobuf and JSON Schema, and is designed to work with Confluent's JavaScript Client for Apache Kafka. This is an Early Availability library. The goal is to provide a highly performant, reliable and easy to use JavaScript client in line with other Schema Registry clients such as our Go, .NET, and Java clients.
Installation
npm install @confluentinc/schemaregistry
Getting Started
Below is a simple example of using Avro serialization with the Schema Registry client and the KafkaJS client.
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
const { SchemaRegistryClient, SerdeType, AvroSerializer, AvroDeserializer} = require('@confluentinc/schemaregistry');
const registry = new SchemaRegistryClient({ baseURLs: ['http://localhost:8081'] })
const kafka = new Kafka({
kafkaJS: {
brokers: ['localhost:9092']
}
});
let consumer = kafka.consumer({
kafkaJS: {
groupId: "test-group",
fromBeginning: true,
},
});
let producer = kafka.producer();
const schema = {
type: 'record',
namespace: 'examples',
name: 'RandomTest',
fields: [
{ name: 'fullName', type: 'string' }
],
};
const topicName = 'test-topic';
const subjectName = topicName + '-value';
const run = async () => {
// Register schema
const id = await registry.register(
subjectName,
{
schemaType: 'AVRO',
schema: JSON.stringify(schema)
}
)
// Create an Avro serializer
const ser = new AvroSerializer(registry, SerdeType.VALUE, { useLatestVersion: true });
// Produce a message with the schema
await producer.connect()
const outgoingMessage = {
key: 'key',
value: await ser.serialize(topicName, { fullName: 'John Doe' }),
}
await producer.send({
topic: topicName,
messages: [outgoingMessage]
});
console.log("Producer sent its message.")
await producer.disconnect();
producer = null;
// Create an Avro deserializer
const deser = new AvroDeserializer(registry, SerdeType.VALUE, {});
await consumer.connect()
await consumer.subscribe({ topic: topicName })
let messageRcvd = false;
await consumer.run({
eachMessage: async ({ message }) => {
const decodedMessage = {
...message,
value: await deser.deserialize(topicName, message.value)
};
console.log("Consumer received message.\nBefore decoding: " + JSON.stringify(message) + "\nAfter decoding: " + JSON.stringify(decodedMessage));
messageRcvd = true;
},
});
// Wait around until we get a message, and then disconnect.
while (!messageRcvd) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
await consumer.disconnect();
consumer = null;
}
run().catch (async e => {
console.error(e);
consumer && await consumer.disconnect();
producer && await producer.disconnect();
process.exit(1);
})
Features and Limitations
- Full Avro and JSON Schema support
- Protobuf support requires Schema Registry in (upcoming) release: CP 7.4.8, 7.5.7, 7.6.4, 7.7.2, 7.8.0
- Support for CSFLE (Client-Side Field Level Encryption)
- Support for schema migration rules for Avro and JSON Schema
- Data quality rules are not yet supported
- Support for OAuth
Contributing
Bug reports and feedback is appreciated in the form of Github Issues. For guidelines on contributing please see CONTRIBUTING.md