npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@trusk/amqp-connector

v1.6.3

Published

[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/facebook/react/blob/master/LICENSE) [![CircleCI](https://circleci.com/gh/trusk-official/amqp-connector.svg?style=svg)](https://circleci.com/gh/trusk-official/amqp-co

Downloads

813

Readme

AMQP connector

GitHub license CircleCI npm version PRs Welcome

An middle level amqp.node wrapper for every day use. It requires node.js >= 6.

Features:

  • Promise based
  • Compatible with direct/topic/fanout/headers exchanges and send to queue
  • Built-in RPC
  • Based on node-amqp-connection-manager
    • Automatically reconnect when your amqplib broker dies in a fire
    • Round-robin connections between multiple brokers in a cluster
    • If messages are sent while the broker is unavailable, queues messages in memory until we reconnect
    • Queued message are persisted to disk in case of unexpected crash/reboot, and recovered in memory
  • Built-in distributed tracing
  • Joi message structure validation on message reception (listen, subscribeToMessage)
  • Provide your own transport to log every microservice message
  • Automatic retry with dead-letter
  • Stream over amqp (alpha)

Install

npm install @trusk/amqp-connector

Run tests

# Launch RabbitMQ with docker (guest:guest)
docker run -d -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management

# Launch tests
npm test

Usage

Connection

// see http://www.squaremobius.net/amqp.node/channel_api.html#connect
const connection = amqpconnector({
  urls: ["amqp://localhost:5672"],
  serviceName: "default",
  serviceVersion: "1.2.3",
  transport: console // or any logger instance ex https://github.com/winstonjs/winston
}).connect();

Channel

const connection = amqpconnector().connect();

// see https://github.com/trusk-official/node-amqp-connection-manager#amqpconnectionmanagercreatechanneloptions
const channel = connection.buildChannelIfNotExists({
  name: "default",
  json: true, // use false to work with Buffers
  swap_path: path.resolve("./swap/my_channel"),
  swap_size: 50000,
  prefetchCount: 5,
  prefetchGlobal: true,
  rejectTimeout: 0 // The timeout before a message is rejected (defaults to 0)
  realm: "my_realm." // scopes every exchange/queue/routing key by adding the realm as prefix
});

Direct Exchange (with message validation)

const connection = amqpconnector({
  serviceVersion: "1.2.3"
}).connect();

const channel = connection.buildChannelIfNotExists({ json: true });

/**
 * @callback subscribeCallback
 * @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 * @param {function} invoke - a contextual invoke function
 * @param {function} publishMessage - a contextual publishMessage function

 * Subscribes an exchange
 * @param {string} qualifier - the subscription string, see Subscribe qualifier
 * @param {subscribeCallback} callback - The message handler
 * @param {object} options - The subscribe options
 * @param {object} options.exchange - The exchange parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
 * @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
 * @param {object} options.headers - The subscribe headers, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue
 * @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
 * @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
 * @param {object} options.nack - a message nack arguments object
 * @param {bool} options.nack.allUpTo - defaults to false, see allUpTo https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
 * @param {bool} options.nack.requeue - defaults to false, see requeue https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
 * @param {number} options.retry - the ttl for retrying the message on top of the queue. Will create a dead letter exchange and no consumer queue to enable it
 * @param {number} options.dlPrefix - the prefix for the dead letter exchange and no consumer queue
 * @param {number} options.maxTries - The number of tries when sent to deadletter (defaults to none (unlimited retries))
 * @param {string} options.dumpQueue - the queue where to send the message when the maxTries is reached (defaults to none (discards if maxTries is set))
 * @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 */
channel.subscribeToMessages(
  "direct/my-direct-exchange/my.routing.key/my-queue",
  async ({ message }) => {
    // handle message
  },
  {
    schema: Joi.object({
      content: Joi.object(),
      properties: Joi.object({
        headers: Joi.object({
          "x-service-version": Joi.string().regex(/^1.2.\d$/) // validates every 1.2 patches
        }).unknown()
      }).unknown()
    }).unknown()
  }
);

// or use a text schema definition
channel.subscribeToMessages(
  "direct/my-direct-exchange/my.routing.key/my-queue",
  async ({ message }) => {
    // handle message
  },
  {
    schema: {
      type: "object",
      flags: {
        unknown: true
      },
      keys: {
        content: {
          type: "object"
        },
        properties: {
          type: "object",
          flags: {
            unknown: true
          },
          keys: {
            headers: {
              type: "object",
              flags: {
                unknown: true
              },
              keys: {
                "x-service-version": {
                  type: "string",
                  rules: [
                    {
                      name: "pattern",
                      args: {
                        regex: "/^1.2.\\d$/"
                      }
                    }
                  ]
                }
              }
            }
          }
        }
      }
    }
  }
);

// or use a validator function
// The validator function must return an { error, value } object
// schema takes precedence over validator
const schema = Joi.object({
  content: Joi.object(),
  properties: Joi.object({
    headers: Joi.object({
      "x-service-version": Joi.string().regex(/^1.2.\d$/) // validates every 1.2 patches
    }).unknown()
  }).unknown()
}).unknown();

channel.subscribeToMessages(
  "direct/my-direct-exchange/my.routing.key/my-queue",
  async ({ message }) => {
    // handle message
  },
  {
    validator: schema.validator.bind(schema)
  }
);

/**
 * Publish to an exchange
 * @param {string} qualifier - the publish string, see Publish qualifier
 * @param {object|Buffer} message - The message
 * @param {object} options - The subscribe options
 * @param {object} options.headers - Additional headers for the message
 * @return {Promise<Bool>} A promise that resolves Bool, see http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
 */
channel.publishMessage("direct/my-direct-exchange/my.routing.key", {
  foo: "bar"
});

Topic Exchange

const connection = amqpconnector().connect();
const channel = connection.buildChannelIfNotExists({ json: true });

/**
 * @callback subscribeCallback
 * @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 * @param {function} invoke - a contextual invoke function
 * @param {function} publishMessage - a contextual publishMessage function

 * Subscribes an exchange
 * @param {string} qualifier - the subscription string, see Subscribe qualifier
 * @param {subscribeCallback} callback - The message handler
 * @param {object} options - The subscribe options
 * @param {object} options.exchange - The exchange parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
 * @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
 * @param {object} options.headers - The subscribe headers, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue
 * @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
 * @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
 * @param {object} options.nack - a message nack arguments object
 * @param {bool} options.nack.allUpTo - defaults to false, see allUpTo https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
 * @param {bool} options.nack.requeue - defaults to false, see requeue https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
 * @param {number} options.retry - the ttl for retrying the message on top of the queue. Will create a dead letter exchange and no consumer queue to enable it
 * @param {number} options.dlPrefix - the prefix for the dead letter exchange and no consumer queue
 * @param {number} options.maxTries - The number of tries when sent to deadletter (defaults to none (unlimited retries))
 * @param {string} options.dumpQueue - the queue where to send the message when the maxTries is reached (defaults to none (discards if maxTries is set))
 * @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 */
channel.subscribeToMessages(
  "topic/my-topic-exchange/my.routing.key/my-queue",
  async ({ message }) => {
    // handle message
  }
);

/**
 * Publish to an exchange
 * @param {string} qualifier - the publish string, see Publish qualifier
 * @param {object|Buffer} message - The message
 * @param {object} options - The subscribe options
 * @param {object} options.headers - Additional headers for the message
 * @return {Promise<Bool>} A promise that resolves Bool, see http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
 */
channel.publishMessage("topic/my-topic-exchange/my.routing.*", {
  foo: "bar"
});

Fanout Exchange

const connection = amqpconnector().connect();
const channel = connection.buildChannelIfNotExists({ json: true });

/**
 * @callback subscribeCallback
 * @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 * @param {function} invoke - a contextual invoke function
 * @param {function} publishMessage - a contextual publishMessage function

 * Subscribes an exchange
 * @param {string} qualifier - the subscription string, see Subscribe qualifier
 * @param {subscribeCallback} callback - The message handler
 * @param {object} options - The subscribe options
 * @param {object} options.exchange - The exchange parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
 * @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
 * @param {object} options.headers - The subscribe headers, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue
 * @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
 * @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
 * @param {object} options.nack - a message nack arguments object
 * @param {bool} options.nack.allUpTo - defaults to false, see allUpTo https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
 * @param {bool} options.nack.requeue - defaults to false, see requeue https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
 * @param {number} options.retry - the ttl for retrying the message on top of the queue. Will create a dead letter exchange and no consumer queue to enable it
 * @param {number} options.dlPrefix - the prefix for the dead letter exchange and no consumer queue
 * @param {number} options.maxTries - The number of tries when sent to deadletter (defaults to none (unlimited retries))
 * @param {string} options.dumpQueue - the queue where to send the message when the maxTries is reached (defaults to none (discards if maxTries is set))
 * @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 */
channel.subscribeToMessages(
  "fanout/my-fanout-exchange/my-queue",
  async ({ message }) => {
    // handle message
  }
);

/**
 * Publish to an exchange
 * @param {string} qualifier - the publish string, see Publish qualifier
 * @param {object|Buffer} message - The message
 * @param {object} options - The subscribe options
 * @param {object} options.headers - Additional headers for the message
 * @return {Promise<Bool>} A promise that resolves Bool, see http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
 */
channel.publishMessage("fanout/my-fanout-exchange", {
  foo: "bar"
});

Headers Exchange

const connection = amqpconnector().connect();
const channel = connection.buildChannelIfNotExists({ json: true });

/**
 * @callback subscribeCallback
 * @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 * @param {function} invoke - a contextual invoke function
 * @param {function} publishMessage - a contextual publishMessage function

 * Subscribes an exchange
 * @param {string} qualifier - the subscription string, see Subscribe qualifier
 * @param {subscribeCallback} callback - The message handler
 * @param {object} options - The subscribe options
 * @param {object} options.exchange - The exchange parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
 * @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
 * @param {object} options.headers - The subscribe headers, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue
 * @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
 * @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
 * @param {object} options.nack - a message nack arguments object
 * @param {bool} options.nack.allUpTo - defaults to false, see allUpTo https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
 * @param {bool} options.nack.requeue - defaults to false, see requeue https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
 * @param {number} options.retry - the ttl for retrying the message on top of the queue. Will create a dead letter exchange and no consumer queue to enable it
 * @param {number} options.dlPrefix - the prefix for the dead letter exchange and no consumer queue
 * @param {number} options.maxTries - The number of tries when sent to deadletter (defaults to none (unlimited retries))
 * @param {string} options.dumpQueue - the queue where to send the message when the maxTries is reached (defaults to none (discards if maxTries is set))
 * @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 */
channel.subscribeToMessages(
  "headers/my-headers-exchange/my-queue",
  async ({ message }) => {
    // handle message
  },
  {
    headers: {
      customheader: "my-header",
      "x-match": "any"
    }
  }
);

/**
 * Publish to an exchange
 * @param {string} qualifier - the publish string, see Publish qualifier
 * @param {object|Buffer} message - The message
 * @param {object} options - The subscribe options
 * @param {object} options.headers - Additional headers for the message
 * @return {Promise<Bool>} A promise that resolves Bool, see http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
 */
channel.publishMessage(
  "headers/my-headers-exchange",
  {
    foo: "bar"
  },
  {
    headers: {
      customheader: "my-header"
    }
  }
);

Send To Queue

const connection = amqpconnector().connect();
const channel = connection.buildChannelIfNotExists({ json: true });

/**
 * @callback subscribeCallback
 * @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 * @param {function} invoke - a contextual invoke function
 * @param {function} publishMessage - a contextual publishMessage function

 * Subscribes an exchange
 * @param {string} qualifier - the subscription string, see Subscribe qualifier
 * @param {subscribeCallback} callback - The message handler
 * @param {object} options - The subscribe options
 * @param {object} options.exchange - The exchange parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
 * @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
 * @param {object} options.headers - The subscribe headers, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue
 * @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
 * @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
 * @param {object} options.nack - a message nack arguments object
 * @param {bool} options.nack.allUpTo - defaults to false, see allUpTo https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
 * @param {bool} options.nack.requeue - defaults to false, see requeue https://www.squaremobius.net/amqp.node/channel_api.html#channel_nack
 * @param {number} options.retry - the ttl for retrying the message on top of the queue. Will create a dead letter exchange and no consumer queue to enable it
 * @param {number} options.dlPrefix - the prefix for the dead letter exchange and no consumer queue
 * @param {number} options.maxTries - The number of tries when sent to deadletter (defaults to none (unlimited retries))
 * @param {string} options.dumpQueue - the queue where to send the message when the maxTries is reached (defaults to none (discards if maxTries is set))
 * @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 */
channel.subscribeToMessages(
  "direct/any-exchange//my-queue",
  async ({ message }) => {
    // handle message
  }
);

/**
 * Publish to a queue
 * @param {string} qualifier - the publish string, see Publish qualifier
 * @param {object|Buffer} message - The message
 * @param {object} options - The subscribe options
 * @param {object} options.headers - Additional headers for the message
 * @return {Promise<Bool>} A promise that resolves Bool, see http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
 */
channel.publishMessage("q/my-queue", {
  foo: "bar"
});

RPC

const connection = amqpconnector().connect();
const channel = connection.buildChannelIfNotExists({ json: true });

/**
 * @callback listenCallback
 * @param {object} message - an amqp message, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 * @param {function} invoke - a contextual invoke function
 * @param {function} publishMessage - a contextual publishMessage function

 * Listens to an RPC queue
 * @param {string} qualifier - the subscription string, see Subscribe qualifier
 * @param {listenCallback} callback - The message handler
 * @param {object} options - The subscribe options
 * @param {object} options.queue - The queue parameters, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
 * @param {object} options.schema - a Joi validation schema, see https://github.com/hapijs/joi/blob/v16.0.0-rc2/API.md#object---inherits-from-any
 * @param {object} options.validator - a validation function. Must return an { error, value } object. Schema takes precedence over validator.
 * @return {Promise<object>} A promise that resolves { consumerTag }, see https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
 */
channel.listen("my-rpc-function", async ({ message }) => {
  return { value: message.content.value * 42 };
});

/**
 * Invoke an RPC function
 * @param {string} qualifier - the queue string
 * @param {object|Buffer} message - The message
 * @param {object} options - The subscribe options
 * @param {integer} options.timeout - the timeout (ms) for the call
 * @param {object} options.headers - Additional headers for the message
 * @return {Promise} A promise that resolves the function response
 */
const result = await channel.invoke("my-rpc-function", { value: 1337 }); // { value: 57491 }

Distributed tracing

  • By using the contextual invoke and publishMessage functions you can easily trace the journey of a message
channel.listen("my-traced-rpc-function-4", async ({ message }) => {
  // message.properties.headers["x-transaction-stack"] === ["qSdeF", "hYud7", "GTynl", "zQwfG"]
  return { value: 5 * message.content.value };
});

channel.listen("my-traced-rpc-function-3", async ({ message, invoke }) => {
  // message.properties.headers["x-transaction-stack"] === ["qSdeF", "hYud7", "GTynl"]
  const mess = await invoke("my-traced-rpc-function-4", {
    value: 4 * message.content.value
  });
  return mess.content;
});

channel.listen("my-traced-rpc-function-2", async ({ message, invoke }) => {
  // message.properties.headers["x-transaction-stack"] === ["qSdeF", "hYud7"]
  const mess = await invoke("my-traced-rpc-function-3", {
    value: 3 * message.content.value
  });
  return mess.content;
});

channel.listen("my-traced-rpc-function-1", async ({ message, invoke }) => {
  // message.properties.headers["x-transaction-stack"] === ["qSdeF"]
  const mess = await invoke("my-traced-rpc-function-2", {
    value: 2 * message.content.value
  });
  return mess.content;
});

const result = await channel
  .invoke("my-traced-rpc-function-1", { value: 42 })
  .then(response => {
    // response.content.value === 42 * 2 * 3 * 4 * 5;
  });

Dead letter

  • this will create a dead letter exchange my_dl_5000 and a no consumer queue my_dl_5000 with adequate arguments to enable automatic retry on top of the queue every 5000ms.
  • providing a maxTries value will make the broker retest the message maxTries - 1 (otherwise it is endless retries), then it will either ack the message (which basically discards it) or send it to the dumpQueue.
channel.subscribeToMessages(
  "direct/my-direct-exchange/my.routing.key/my-queue",
  async ({ message }) => {
    // handle
  },
  {
    retry: 5000,
    dlPrefix: "my_dl_", // defaults to dl_
    maxTries: 10, // defaults to none (endless)
    dumpQueue: "my-dump-queue" // defaults to none (discard if maxTries is set)
  }
);

Stream over AMQP (alpha)

  • channels must be raw
channel.listen("my-rpc-function-stream-1", async () => {
  return fs.createReadStream("/path/to/file");
});

const stream = channel.invoke(
  "stream/my-rpc-function-stream-1",
  Buffer.from("")
);

stream.pipe(fs.createWriteStream("/path/to/other_file"));

Qualifier structure

Publish qualifiers

direct/exchange/routingkey
direct// => type: direct, exchange: amqp.direct, routing key: ""

topic/exchange/routingkey
topic// => type: topic, exchange: amqp.topic, routing key: ""

fanout/exchange
fanout/ => type: fanout, exchange: amqp.fanout

headers/exchange
headers/ => type: headers, exchange: amqp.headers

q/queue
q/ => type: queue, queue: anonymous

Subscribe qualifiers

direct/exchange/routingkey/queue
direct/// => type: direct, exchange: amqp.direct, routing key: "", queue: anonymous

topic/exchange/routingkey/queue
topic/// => type: topic, exchange: amqp.topic, routing key: "", queue: anonymous

fanout/exchange/queue
fanout// => type: fanout, exchange: amqp.fanout, queue: anonymous

headers/exchange/queue
headers// => type: headers, exchange: amqp.headers, queue: anonymous