npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

amqpjson

v1.1.1

Published

helpers for JSON messages over AMQP

Downloads

5

Readme

amqpjson

Extends amqplib's Channels (Promise-based API only) to assist in sending and receiving JSON messages via AMQP.

Usage

const amqpjson = require('amqpjson')
const amqplib = require('amqplib')

// Publisher
amqplib.connect().then(function (conn) {
  const template = '{{phylum}}.{{class}}.{{order}}.{{family}}.{{genus}}.{{species}}'
  return amqpjson.publisher(conn, 'Animalia', template, {
    deliveryMode: 2
  })
}).then(function (publisher) {
  let human = {
    phylum: 'Chordata',
    'class': 'Mammalia',
    order: 'Primates',
    family: 'Hominadae',
    genus: 'Homo',
    species: 'sapiens'
  }
  return publisher.publishObject(human, { expiration: 5000 })
}).then(function () {
  // the message was confirmed
})

// Consumer
amqplib.connect().then(function (conn) {
  return amqpjson.consumer(conn, queue, exchange, bindKey, {
    messageTtl: 1000,
    parseDates: true /* not passed to amqplib */
  })
}).then(function (consumer) {
  consumer.consumeStart(handler)
})
function handler (message) {
  console.log(message.json) // set when contentType was 'application/json'
  this.ack(message)         // handler is bound to the channel
}

Consumer

Create a Channel intended for consuming JSON messages.

amqpjson.consumer(connection, [queue, [exchange, bindKey, [options]]])

Returns a Promise for a Channel:

  • connection: AMQP connection
  • queue: queue name
    • empty: declared as durable=false, autoDelete=true, expires=60000
    • non-empty: declared as durable=true, autoDelete=false, no expiration
  • exchange: exchange name, asserted as 'topic' exchange
  • bindKey: used to bind exchange to queue, skipped if empty or no exchange
    • single string is OK
    • array of strings is OK (all will be bound)
  • options: additional options to extend (or override) the defaults which are based on queue name (e.g. set a named queue that expires)
    • parseDates: if this is set to true, the consumer will recognize serialized Date objects and recreate them

The Channel returned is a regular Channel from amqplib. However, it has an additional method:

channel.consumeStart(handler, [options])

This works slightly differently than channel.consume(queue, handler, [options]):

  • you don't include the queue name
  • when your handler is called:
    • the message.content Buffer is parsed into message.json, if published with contentType='application/json'
    • if JSON.parse fails, handler is still called (but no message.json for you)
    • this is set to the Channel, so you can call this.ack(message)
  • options are passed just the same

Also the Channel returned has a property amqpjson, an object with the following properties:

  • queue: queue name (given or generated)
  • exchange: exchange name (if given)
  • bindKeys: list of all binding keys (if given)
  • bindKey: first binding key (if given, for backwards compatibility)
  • consumerTag: consumer tag obtained after calling consumeStart()
  • options: options used in channel.assertQueue() call
  • parseDates: if this was set, the flag is present

Publisher

Create a ConfirmChannel intended for publishing JSON messages.

amqpjson.publisher(connection, exchange, routeTemplate, [options])

Returns a Promise for a Channel:

  • connection: AMQP connection
  • exchange: exchange name, asserted as 'topic' exchange
  • routeTemplate: string, Mustache-based template used to route messages
  • options: additional options to save for each publishObject()

The Channel returned is a ConfirmChannel from amqplib, with the additional following methods:

publishObject(data, [options])

publishObject() takes care of:

  • remembering the exchange given
  • creating a routing key from the Mustache template and the data object itself
    • Mustache.render(routeTemplate, data) is how this is acheived
  • the data object will be serialized via JSON.stringify()
  • message property contentType is set to 'application/json'
  • message property contentEncoding is set to 'UTF-8'
  • any options passed are added to the options from publisher creation
  • returns a Promise which, when resolved, indicates the message was confirmed

publishAsync(exchange, routingKey, buffer, options)

This is just a promisified version of Channel.publish() which returns a Promise rather than accepting a callback.