
asyncapi-pub-middleware

v0.0.13

Add a validating publisher object, from an AsyncAPI file description, to your request

Summary

This module automatically adds a validating publisher object (RabbitMQ, Kafka or HTTP) to the request of your express-like application, based on an AsyncAPI definition file.

Usage

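const fs = require('fs');
const express = require('express');
const getPublisherMiddleware = require('asyncapi-pub-middleware');

const doc = fs.readFileSync('./myAsyncAPIFile.yaml', 'utf8');

const options = {
  tag: 'my-app',
  connections: {},
};
const plugins = {
  mqtt: './plugins/mqtt',
};

// In a CommonJS module, await is only valid inside an async function
async function main() {
  const app = express();

  // Parse the spec and build the middleware that mounts the publisher
  const mountPublisher = await getPublisherMiddleware(doc, options, plugins);
  app.use(mountPublisher);

  // your middlewares

  // Error handler: publish the error, then pass it on
  app.use(async (err, req, res, next) => {
    await req.api.publisher.publish('service.error', err);
    next(err);
  });

  app.listen(3000); // example port, use your own
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});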

This validates and parses your AsyncAPI file (myAsyncAPIFile.yaml), then creates a middleware that mounts a publisher object on the request. The validation and parsing of the file is done by @asyncapi/parser.
The publisher object lets you publish every message defined as a subscribe operation in the AsyncAPI file. On publishing, it validates the parameters, headers and payload being sent. The validation itself is done by ajv.

Important Note: for this module to work, the spec must describe 'servers', and at least one of these servers must be attached to each of the 'channels'.
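The requirement above can be checked before handing the spec to the module. The following is a minimal sketch, not part of this package: it assumes the spec has already been converted to a plain object, and that channels reference servers by name through a channel-level `servers` array (as AsyncAPI 2.2.0 and later allow). The function name `findChannelsWithoutServers` is hypothetical.

```javascript
// Sketch (assumption: AsyncAPI 2.x document as a plain object, with a
// channel-level `servers` array holding server names).
// Returns the names of channels that reference no declared server.
function findChannelsWithoutServers(spec) {
  const declared = new Set(Object.keys(spec.servers || {}));
  const problems = [];
  for (const [name, channel] of Object.entries(spec.channels || {})) {
    const attached = ((channel && channel.servers) || []).filter((s) => declared.has(s));
    if (attached.length === 0) problems.push(name);
  }
  return problems;
}

// Illustrative spec fragment, not taken from a real project
const spec = {
  servers: { rabbit: { url: 'amqp://localhost:5672', protocol: 'amqp' } },
  channels: {
    'service.error': { servers: ['rabbit'], subscribe: {} },
    'service.orphan': { subscribe: {} },
  },
};

console.log(findChannelsWithoutServers(spec)); // [ 'service.orphan' ]
```

An empty result means every channel is attached to at least one declared server.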

Documentation

asyncapi-pub-middleware

The main function simply creates a Publisher object from an AsyncAPI spec file with the options passed, and attaches it to the request object.

Publisher

class Publisher {
  constructor(plugins = {})
  async loadAPI(apiDocument, options = {})
  async publish(topic, msg, headers = {}, options = {})
  async stop(closeConnection = true)
}

The Publisher class itself is also available outside the middleware for your convenience:

const Publisher = require('asyncapi-pub-middleware').Publisher;
const publisher = new Publisher();

constructor(plugins = {})

For the moment, only the amqp (RabbitMQ), kafka and http protocols are supported.

You can add other protocols or override existing ones by passing the path to a protocol plugin file to the Publisher constructor:

const Publisher = require('asyncapi-pub-middleware').Publisher;
const publisher = new Publisher({
  amqp: './plugins/amqp',
  kafka: './plugins/kafka',
});

async loadAPI(apiDocument, options = {})

This function loads the spec file and creates a Channel object for each 'channel' in the spec file that has a 'subscribe' operation. This Channel object is then used to publish messages to the defined 'channel'.
If no connection is provided (see options), it tries to create the connection itself from the 'servers' definitions in the spec file.
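To see in advance which topics would end up with a Channel object, the selection rule described above (channels that have a 'subscribe' operation) can be sketched as follows. This is an illustration of the rule only, not the module's actual code; it assumes the spec is available as a plain object, and `listPublishableChannels` is a hypothetical helper name.

```javascript
// Sketch (assumption: AsyncAPI 2.x document as a plain object).
// Returns the names of channels that declare a `subscribe` operation,
// i.e. the channels the text above says get a Channel object.
function listPublishableChannels(spec) {
  return Object.entries(spec.channels || {})
    .filter(([, channel]) => Boolean(channel && channel.subscribe))
    .map(([name]) => name);
}

// Illustrative spec fragment
const spec = {
  channels: {
    'service.error': { subscribe: { message: {} } },
    'service.command': { publish: { message: {} } },
  },
};

console.log(listPublishableChannels(spec)); // [ 'service.error' ]
```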

apiDocument

The AsyncAPI file. It accepts 3 formats:

string (reading directly from the file)

const document = fs.readFileSync('./lib/myAsyncAPIFile.yaml', 'utf8');

JSON (the file converted to a JSON object)

const yaml = require('js-yaml');
const document = yaml.load(fs.readFileSync('./lib/myAsyncAPIFile.yaml', 'utf8'));

AsyncAPIDocument (the file parsed through @asyncapi/parser)

const { parse } = require('@asyncapi/parser');
const document = await parse(fs.readFileSync('./lib/myAsyncAPIFile.yaml', 'utf8'));

options

An optional object. All properties are optional too.

// these are the default values
const {
  tag = '',
  connections = {},
} = options;

If tag is set, only the subscribe operations carrying that tag will be available for publishing.
connections is an object containing the connections to the servers. It is highly recommended that you create the connections yourself, mainly so that you control the security aspects. The automatic connection creation in the plugins is mostly there for convenience during development.
If your AsyncAPI file defines servers like this:

servers:
  rabbit:
    url: amqp://myuser:mypassword@localhost:5672
    protocol: amqp
    protocolVersion: 0.9.1
  kafkaBroker:
    url: localhost:19092
    protocol: kafka
  RESTServer:
    url: localhost:8080
    protocol: http

Your connections object should look like this:

const { Kafka } = require('kafkajs');
const amqplib = require('amqplib');
const axios = require('axios');

// Actual connection from amqplib for amqp protocol
const rabbitConn = await amqplib.connect('amqp://myuser:mypassword@localhost:5672');

const kafka = new Kafka({
  brokers: ['localhost:19092'],
});

// Producer connection from KafkaJS for kafka protocol
const kafkaProducer = kafka.producer();
await kafkaProducer.connect();

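// Axios instance for http protocol
// (AUTH_TOKEN is a placeholder for your own credential)
const httpInstance = axios.create({
  baseURL: 'http://localhost:8080', // axios needs the scheme in baseURL
  headers: { 'Authorization': AUTH_TOKEN },
});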

const options = {
  connections: {
    rabbit: rabbitConn, // name of the amqp protocol server in the spec
    kafkaBroker: kafkaProducer, // name of the kafka protocol server in the spec
    RESTServer: httpInstance, // name of the http protocol server in the spec
  },
};
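Because the keys of connections must match the server names in the spec, a typo silently leaves a server without a supplied connection, and the module then falls back to creating one itself. A small check can flag this early. This is a sketch under the assumption that the spec is available as a plain object; `findServersWithoutConnection` is a hypothetical helper, not part of this package.

```javascript
// Sketch: list the servers declared in the spec for which no connection
// was supplied in options.connections.
function findServersWithoutConnection(spec, connections = {}) {
  return Object.keys(spec.servers || {}).filter((name) => !(name in connections));
}

// Server names mirror the example above; the connection values are stand-ins
const spec = {
  servers: {
    rabbit: { protocol: 'amqp' },
    kafkaBroker: { protocol: 'kafka' },
    RESTServer: { protocol: 'http' },
  },
};
const connections = { rabbit: {}, kafkaBroker: {} };

console.log(findServersWithoutConnection(spec, connections)); // [ 'RESTServer' ]
```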

async publish(topic, msg, headers = {}, options = {})

This function picks the Channel defined by the topic, validates the parameters, headers and message payload against the schema defined in the spec file, and then asks the plugin to publish the message with the given options.
The publish function returns the result of the publish action where applicable (currently only for the http protocol). The return value is always an array of results.

await publisher.publish('my.amqp.channel.name', { foo: 'bar' }, { 'x-session-id': 'myuuid' }, { priority: 25 });
await publisher.publish('my.kafka.channel.name', { foo: 'bar' }, { 'x-session-id': 'myuuid' }, { key: 'myKafkaKey', partition: 3 });
const [{ data, status, headers }] = await publisher.publish('/ping', { foo: 'bar' }, { 'x-session-id': 'myuuid' });

async stop(closeConnection = true)

This function closes all channels and, if requested and applicable, the underlying connection.

await publisher.stop(false);
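A common place to call stop is a process shutdown hook. The sketch below shows one possible way to wire it, assuming only the stop(closeConnection) signature documented above. `makeShutdown` and `fakePublisher` are hypothetical names; the fake object stands in for a real Publisher so that the example runs on its own.

```javascript
// Sketch: build a shutdown function that calls publisher.stop() at most once,
// even if several signals arrive.
function makeShutdown(publisher, { closeConnection = true } = {}) {
  let stopping = null;
  return function shutdown() {
    if (!stopping) stopping = publisher.stop(closeConnection);
    return stopping;
  };
}

// Stand-in with the same stop() signature as Publisher, for illustration only
const fakePublisher = {
  calls: [],
  async stop(closeConnection = true) {
    this.calls.push(closeConnection);
  },
};

// Keep the connection open here because, in this scenario, we created it
// ourselves and will close it ourselves.
const shutdown = makeShutdown(fakePublisher, { closeConnection: false });
process.once('SIGTERM', () => shutdown().then(() => process.exit(0)));
```

Passing false mirrors the call above: the channels are closed, but a connection you supplied through options.connections is left for your own code to close.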

Plugins

Plugins are used for the actual publishing of the message.
During channel creation, the result of ProtocolConnection.getConnection(serverInfoFromSpec) is passed to the ProtocolConnection constructor, and the resulting instance is then bound by calling protocol.bind(channelBindingsFromSpec, operationBindingsFromSpec).
Plugin prototype:

class ProtocolConnection {
  static async getConnection(serverInfoFromSpec);
  constructor(conn);
  async bind(channelBindingsFromSpec, operationBindingsFromSpec);
  async publish(topic, headers, msg, messageBindingsFromSpec, options = {});
  async stop(closeConnection = true)
}
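As an illustration of the prototype above, here is a minimal in-memory plugin that records messages instead of sending them. It is a sketch, not one of the bundled plugins: the class name `MemoryConnection` is hypothetical, and exporting the class through module.exports is an assumption about how plugin files are loaded. The `demo` function replays the lifecycle described above (getConnection, constructor, bind, publish).

```javascript
// Hypothetical plugin following the documented prototype.
class MemoryConnection {
  // Used when no connection was supplied for the server
  static async getConnection(serverInfoFromSpec) {
    return { server: serverInfoFromSpec, sent: [], closed: false };
  }

  constructor(conn) {
    this.conn = conn;
    this.bindings = null;
  }

  // Keep the bindings from the spec for later use
  async bind(channelBindingsFromSpec, operationBindingsFromSpec) {
    this.bindings = {
      channel: channelBindingsFromSpec,
      operation: operationBindingsFromSpec,
    };
  }

  // Record the message instead of sending it anywhere
  async publish(topic, headers, msg, messageBindingsFromSpec, options = {}) {
    this.conn.sent.push({ topic, headers, msg, options });
  }

  async stop(closeConnection = true) {
    if (closeConnection) this.conn.closed = true;
  }
}

// Assumption: a plugin file exports its class like this
module.exports = MemoryConnection;

// Replay of the lifecycle described above, with illustrative values
async function demo() {
  const conn = await MemoryConnection.getConnection({ protocol: 'memory' });
  const protocol = new MemoryConnection(conn);
  await protocol.bind({}, {});
  await protocol.publish('service.error', { 'x-session-id': 'myuuid' }, { foo: 'bar' }, {});
  return conn.sent;
}
```

Note the argument order of the plugin's publish (topic, headers, msg), which differs from Publisher.publish (topic, msg, headers).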