@meinestadt.de/glue-schema-registry

v2.0.0

This is a SerDe library to interact with the AWS Glue Schema Registry. It makes it easy to encode and decode messages with Avro schemas and the AWS wire format.

Downloads

15,936

Readme

glue-schema-registry

This is a SerDe library to interact with the AWS Glue Schema Registry. It makes it easy to encode and decode messages with Avro schemas and the AWS wire format. It is primarily built to produce and consume messages with MSK, Kafka, Kinesis, SNS, and so on. This enables the creation of JavaScript/TypeScript applications that are compatible with messages created with the AWS Glue Java SerDe libraries.

Apache Avro-encoded messages can be created and consumed using this library. Protobuf and JSON Schema are not currently supported. The library supports gzip compression, schema registration, and schema evolution. Avsc (https://github.com/mtth/avsc) is used for Avro encoding and decoding.

This library works well with kafkajs (https://kafka.js.org).

2023-03-24: Breaking changes

The library has been rewritten to use the AWS SDK v3. This is a breaking change. The migration should be relatively easy, but there are some API changes that you need to be aware of:

  1. The props parameter of the constructor is now mandatory. Its type changed from Glue.ClientConfiguration to GlueClientConfig.
  2. The props parameter of updateGlueClient is now mandatory. Its type changed from Glue.ClientConfiguration to GlueClientConfig.
  3. The return type of analyzeMessage has changed. The type of the schema field is now GetSchemaVersionResponse from the SDK v3.

Getting started

Install with npm

npm i @meinestadt.de/glue-schema-registry

Usage

First, create an Avro schema and register it.

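import * as msglue from "@meinestadt.de/glue-schema-registry";
// SchemaType and SchemaCompatibilityType are used below; this import assumes
// that the package exports both enums
import { SchemaCompatibilityType, SchemaType } from "@meinestadt.de/glue-schema-registry";
import * as avro from "avsc";
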
// avro schema
const schema = avro.Type.forSchema({
    type: "record",
    name: "property",
    namespace: "de.meinestadt.demo",
    fields: [
        { name: "demo", type: "string", default: "Hello World" },
    ]
});

const registry = new msglue.GlueSchemaRegistry<SCHEMATYPE>("<GLUE REGISTRY NAME>", {
    region: "<AWS_REGION>",
});

// create a new schema
// throws an error if the schema already exists
try {
    const schemaId = await registry.createSchema({
        type: SchemaType.AVRO,
        schemaName: "Testschema",
        compatibility: SchemaCompatibilityType.BACKWARD,
        schema: JSON.stringify(schema),
    });
} catch (error) {
    ...
}

// or register a version of an existing schema
// creates a new version or returns the id of an existing one, if a similar version already exists
const schemaId = await registry.register({
    schemaName: "Testschema",
    type: SchemaType.AVRO,
    schema: JSON.stringify(schema),
});

// now you can encode an object
const encodedmessage = await registry.encode(schemaId, object);

The encoded message can then be sent to Kafka or MSK, for example:

// import * as kafka from "kafkajs"
// ...
const kc = new kafka.Kafka(); 
const producer = kc.producer();
await producer.connect();
await producer.send({
    topic: "<TOPICNAME>",
    messages: [{ value: encodedmessage }],
});

To decode received messages:

    const schema = avro.Type.forSchema({
        type: "record",
        name: "property",
        namespace: "de.meinestadt.demo",
        fields: [
            { name: "demo", type: "string", default: "Hello World" },
        ]
    });
    const dataset = await registry.decode(message.value, schema);

Note: registry.decode expects the raw message as the first parameter and the target schema as the second parameter. If the message was encoded with a different version of the schema, the encoding schema is loaded from Glue and the record is converted to the target schema. If the schemas are not compatible, an exception is thrown.

Advanced

Type parameter

You can set the type of the object that you encode, and that you receive when you decode a message, as a type parameter when you create the instance.

For example:

interface Address {
  street: string;
  zip: string;
  city: string;
}

const registry = new msglue.GlueSchemaRegistry<Address>("<GLUE REGISTRY NAME>", {
    region: "<AWS_REGION>",
});

The constructor takes a GlueClientConfig object as its second parameter.

Create a new schema

Creates a new schema in the Glue Schema Registry. Throws an error if the schema already exists.

registry.createSchema({
    type: SchemaType, // currently only SchemaType.AVRO
    schemaName: string,
    compatibility: SchemaCompatibilityType, // NONE, BACKWARD, BACKWARD_ALL, DISABLED, FORWARD, FORWARD_ALL, FULL, FULL_ALL
    schema: string, // the schema as a JSON string
})

Register a schema

Registers a schema and makes it available for encoding/decoding. Creates a new version if the schema does not exist yet. Throws an exception if the Glue compatibility check fails. Schemas are cached so that subsequent calls of register do not lead to multiple AWS Glue calls.
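The caching behaviour described above can be illustrated with a small, library-independent sketch. It is not the library's implementation: RegisterFn, withCache, and fakeRegister are hypothetical names, and the fake backend stands in for the AWS Glue call.

```typescript
// Hypothetical stand-in for the remote call; the real library talks to AWS Glue.
type RegisterFn = (schemaName: string, schema: string) => Promise<string>;

// Wraps a register function so that identical (schemaName, schema) pairs
// reach the backend only once; later calls reuse the cached promise.
function withCache(register: RegisterFn): RegisterFn {
  const cache = new Map<string, Promise<string>>();
  return (schemaName, schema) => {
    const key = `${schemaName}\u0000${schema}`;
    let pending = cache.get(key);
    if (pending === undefined) {
      pending = register(schemaName, schema).catch((err) => {
        // Do not keep failed registrations, so that a later call can retry.
        cache.delete(key);
        throw err;
      });
      cache.set(key, pending);
    }
    return pending;
  };
}

// Fake backend that counts how often it is called.
let backendCalls = 0;
const fakeRegister: RegisterFn = async (schemaName) => {
  backendCalls += 1;
  return `id-for-${schemaName}`;
};
const cachedRegister = withCache(fakeRegister);
```

Caching the promise rather than the resolved id means that concurrent calls for the same schema also share a single backend request.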

const schemaId = await registry.register({
    schemaName: string,
    type: SchemaType,  // currently only SchemaType.AVRO
    schema: string, // the schema as a JSON string
});

Encode an object

Encodes the object with the given glueSchemaId and returns a Buffer containing the binary data.

async encode(glueSchemaId: string, object: T, props?: EncodeProps): Promise<Buffer>

Optional properties:

{
  compress: boolean // default: true
}
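To show what the compress option changes on the wire, here is a rough sketch of framing an already Avro-encoded payload with the header described under "Protocol details". It is an illustration under assumptions, not the library's code: frameMessage is a hypothetical helper, the compressed body is assumed to use zlib deflate, and the schema version id is assumed to be stored as the 16 raw UUID bytes in textual order.

```typescript
import * as zlib from "zlib";

const HEADER_VERSION = 3;   // header version named in this README
const COMPRESSION_NONE = 0; // compression byte: no compression
const COMPRESSION_ZLIB = 5; // compression byte: compressed content

// Hypothetical helper: prepends version byte, compression byte and the
// 16-byte schema version id to the payload.
function frameMessage(schemaVersionId: string, payload: Buffer, compress = true): Buffer {
  // Assumption: the UUID is written as its 16 raw bytes, in textual order.
  const id = Buffer.from(schemaVersionId.replace(/-/g, ""), "hex");
  if (id.length !== 16) {
    throw new Error("schema version id must be a UUID");
  }
  // Assumption: compressed content uses the zlib (deflate) format.
  const body = compress ? zlib.deflateSync(payload) : payload;
  const header = Buffer.from([
    HEADER_VERSION,
    compress ? COMPRESSION_ZLIB : COMPRESSION_NONE,
  ]);
  return Buffer.concat([header, id, body]);
}
```

With compress set to false, the content is appended unchanged, which can be useful for very small payloads where compression adds overhead.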

Decode an object

Decodes the binary message with the passed Avro schema. Reads the ID of the producer schema from the message and loads that schema from the Glue Schema Registry. Caches schemas for subsequent use. Converts the incoming message from the producer's schema into the consumer's schema. Throws an error if the producer schema does not exist or cannot be loaded from Glue, or if the producer and consumer schemas are not compatible.

Decodes both uncompressed and gzip compressed messages.
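A minimal sketch of how a consumer might handle both cases, based on the compression byte values listed in this README. decompressPayload is a hypothetical helper, not part of the library. It relies on Node's zlib.unzipSync, which detects gzip and deflate headers automatically, so it does not need to assume which of the two formats the producer used.

```typescript
import * as zlib from "zlib";

// Compression byte values as described in this README.
const COMPRESSION_NONE = 0;
const COMPRESSION_COMPRESSED = 5;

// Hypothetical helper: returns the Avro payload, decompressing it if needed.
function decompressPayload(compression: number, payload: Buffer): Buffer {
  switch (compression) {
    case COMPRESSION_NONE:
      return payload;
    case COMPRESSION_COMPRESSED:
      // unzipSync auto-detects gzip and deflate (zlib) headers.
      return zlib.unzipSync(payload);
    default:
      throw new Error(`unsupported compression byte: ${compression}`);
  }
}
```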

async decode(message: Buffer, consumerschema: avro.Type): Promise<T>

Get metadata for a message

If you need metadata, such as the schema ID and the Glue schema, you can use

async analyzeMessage(message: Buffer): Promise<AnalyzeMessageResult>

to get the metadata for a message without fully decoding it. The result is defined as follows:

export type AnalyzeMessageResult = {
  /**
   * true if the message is valid
   */
  valid: boolean
  /**
   * the error code, if valid is false, otherwise undefined
   */
  error?: ERROR
  /**
   * the header version
   */
  headerversion?: number
  /**
   * the compression type, may be 0 (none) or 5 (gzip)
   */
  compression?: number
  /**
   * the uuid of the schema
   */
  schemaId?: string
  /**
   * the glue schema
   */
  schema?: GetSchemaVersionResponse
}

Update the Glue client

You can refresh the internally cached Glue client by calling updateGlueClient. This is particularly useful when credentials need to be updated.

updateGlueClient(props: GlueClientConfig)

Examples

Node.js Lambda function consuming an MSK/Kafka stream

The following Lambda function consumes messages from Kafka/MSK and prints the deserialized objects to the console. Incoming messages must be encoded with a compatible Avro schema.

import * as avro from "avsc";
import { GlueSchemaRegistry } from "@meinestadt.de/glue-schema-registry";

// define the interface for the deserialized message
interface IHelloWorld {
  message: string;
}

// the avro schema
const schema = avro.Type.forSchema({
  type: "record",
  namespace: "de.meinestadt.glueschema.demo",
  name: "HelloWorld",
  fields: [
    {
      name: "message",
      type: "string",
      default: "",
    },
  ],
});

export const handler = async (event: any) => {
  // create the registry object
  // Note: the lambda function must have the permission to read schemas and
  // schema versions from AWS Glue
  const registry = new GlueSchemaRegistry<IHelloWorld>("MySchemas", {
    region: "eu-central-1",
  });
  // Iterate through keys
  for (const key in event.records) {
    console.log("Key: ", key);
    // Iterate through records
    const p = event.records[key].map((record: any) => {
      console.log(`Message key: ${record.key}`);
      // base64 decode the incoming message
      const msg = Buffer.from(record.value, "base64");
      // decode the avro encoded message
      return registry.decode(msg, schema).then((dataset) => {
        // print the deserialized object to the console
        console.log(dataset);
      });
    });
    await Promise.all(p);
  }
};
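The handler above types the event as any. As a sketch, the part of the event it relies on can be described explicitly: a records map whose values are arrays of records with base64-encoded values. MskRecord, MskEvent, and extractPayloads are hypothetical names, and the fields other than key and value are assumptions about the Lambda event format rather than something this README specifies.

```typescript
// Assumed shape of a single Kafka record in the Lambda event.
interface MskRecord {
  topic: string;
  partition: number;
  offset: number;
  key?: string;  // base64-encoded
  value: string; // base64-encoded
}

// Records are grouped under one key per topic partition.
interface MskEvent {
  records: { [topicPartition: string]: MskRecord[] };
}

// Hypothetical helper: collects the raw message bytes of all records,
// ready to be passed to registry.decode one by one.
function extractPayloads(event: MskEvent): Buffer[] {
  const payloads: Buffer[] = [];
  for (const key of Object.keys(event.records)) {
    for (const record of event.records[key]) {
      payloads.push(Buffer.from(record.value, "base64"));
    }
  }
  return payloads;
}
```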

Protocol details

@meinestadt.de/glue-schema-registry uses the same simple wire protocol as the AWS Java libraries (https://github.com/awslabs/aws-glue-schema-registry). The protocol is defined as follows:

VERSION_HEADER (1 Byte) + COMPRESSION_BYTE (1 Byte) + SCHEMA_VERSION_ID (16 Byte) + CONTENT

The currently supported version is 3. Compression can be 0 (no compression) or 5 (zlib compression).
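The layout above can be read with plain Buffer operations. The following is a minimal sketch, not the library's parser: splitMessage and uuidFromBytes are hypothetical helpers, and the 16 id bytes are assumed to be the UUID in textual (big-endian) order.

```typescript
interface WireMessage {
  version: number;         // VERSION_HEADER
  compression: number;     // COMPRESSION_BYTE
  schemaVersionId: string; // SCHEMA_VERSION_ID, formatted as a UUID
  content: Buffer;         // CONTENT, still compressed if compression is 5
}

const HEADER_LENGTH = 1 + 1 + 16;

// Formats 16 raw bytes as a canonical 8-4-4-4-12 UUID string.
function uuidFromBytes(bytes: Buffer): string {
  const hex = bytes.toString("hex");
  return [
    hex.slice(0, 8),
    hex.slice(8, 12),
    hex.slice(12, 16),
    hex.slice(16, 20),
    hex.slice(20, 32),
  ].join("-");
}

// Splits a raw message into its header fields and content.
function splitMessage(message: Buffer): WireMessage {
  if (message.length < HEADER_LENGTH) {
    throw new Error("message is shorter than the 18-byte header");
  }
  return {
    version: message.readUInt8(0),
    compression: message.readUInt8(1),
    schemaVersionId: uuidFromBytes(message.subarray(2, HEADER_LENGTH)),
    content: message.subarray(HEADER_LENGTH),
  };
}
```

A consumer would typically check that version is 3 before looking up the schema by schemaVersionId.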

Future plans

  • support for Protobuf
  • support for JSON Schema