npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

amqp.channel

v1.0.2

Published

A simplified way to setup an AMQP connection/channel with amqplib

Downloads

4

Readme

amqp.channel

Build Status Test Coverage

A simplified way to set up an AMQP connection/channel with amqplib. It's a function that takes an AMQP url as the first parameter and an optional second parameter that defines which methods and arguments should be called on the channel. The function returns a Promise that will resolve with a channel object once all the method invocations defined in the second parameter have been resolved. Please see amqplib's documentation for the channel API.

Simplified Configuration

amqplib syntax:

// Plain amqplib: open a connection, create a confirm channel, then issue
// every exchange/queue setup call before handing the channel on.
require('amqplib').connect(url).then(function(connection){
  return connection.createConfirmChannel();
}).then(function(channel){
  return require('bluebird').all([
    channel.assertExchange('exchange', 'fanout', { durable: true }),
    channel.checkExchange('exchange'),
    channel.bindExchange('alt.exchange', 'exchange', ''),
    channel.unbindExchange('alt.exchange', 'exchange', ''),
    channel.deleteExchange('alt.exchange', { ifEmpty: true }),
    channel.assertQueue('first', { durable: true }),
    channel.assertQueue('second'),
    channel.checkQueue('first'),
    channel.bindQueue('first', 'exchange', ''),
    channel.unbindQueue('first', 'exchange', ''),
    channel.purgeQueue('first'),
    channel.deleteQueue('first', { ifEmpty: true }),
    channel.deleteQueue('second')
  ]).then(function(){
    // `all` resolves with the array of replies, not the channel, so pass
    // the channel itself along to the next step.
    return channel;
  });
}).then(function(channel){
  // Do stuff with the channel
});

amqp.channel syntax:

// Each key names a channel method; each inner array is the argument list
// for one invocation of that method.
var channelSetup = {
  assertExchange: [['exchange', 'fanout', { durable: true }]],
  checkExchange: [['exchange']],
  bindExchange: [['alt.exchange', 'exchange', '']],
  unbindExchange: [['alt.exchange', 'exchange', '']],
  deleteExchange: [['alt.exchange', { ifEmpty: true }]],
  assertQueue: [['first', { durable: true }], ['second']],
  checkQueue: [['first']],
  bindQueue: [['first', 'exchange', '']],
  unbindQueue: [['first', 'exchange', '']],
  purgeQueue: [['first']],
  deleteQueue: [['first', { ifEmpty: true }], ['second']]
};

require('amqp.channel')(url, channelSetup).then(function(channel){
  // Do stuff with the channel
});

Simplified Usage

The channel object resolved by the returned Promise will behave differently from a normal channel object returned by the amqplib library in a few (hopefully convenient) ways:

  1. The consume, publish, and sendToQueue channel methods have been changed to explicitly handle JSON.
  2. The publish and sendToQueue methods have been "promisified" in a way that will still provide information to know whether or not the write buffer is full (and therefore, whether or not you should continue writing to it) by adding an additional ok boolean property to the promise.
  3. A channel consumer callback will no longer receive null when that consumer has been cancelled by Rabbit MQ. Instead, the channel object will emit a 'cancelled' event with all the arguments passed to the channel.consume() call for the consumer that was cancelled.

Examples of Modified Usage:

Automatic translation of JS object to JSON string to Buffer for sending/publishing:

// Plain objects are passed directly; no manual JSON/Buffer conversion.
var queuePayload = { hello: 'world' };
var exchangePayload = { hello: 'world' };

channel.sendToQueue('someQueue', queuePayload);
channel.publish('someExchange', 'routingKey', exchangePayload);

Promisification of sendToQueue and publish methods:

return channel.sendToQueue('someQueue', { hello: 'world' }).then(function(){
  return channel.publish('someExchange', 'routingKey', { hello: 'world' });
});

Automatic translation of message Buffer to JSON string to JS object for consuming:

// Receives the already-parsed message body plus the original message,
// which is what gets acknowledged.
function onHello(parsedMessage, originalMessage){
  console.log('hello', parsedMessage.hello); // => hello world
  channel.ack(originalMessage);
}

channel.sendToQueue('someQueue', { hello: 'world' });
channel.consume('someQueue', onHello);

Handling a consumer getting cancelled by Rabbit MQ:

// Message handler; its function name is what `callback.name` reports in
// the 'cancelled' listener.
function onMessage(parsedMessage, originalMessage){
  console.log(parsedMessage);
}

// Called with the arguments originally given to channel.consume() when
// Rabbit MQ cancels that consumer.
function onCancelled(queue, callback, options){
  console.log(queue, callback.name, options); // 'someQueue', 'onMessage', { noAck: true }
}

channel.on('cancelled', onCancelled);
channel.consume('someQueue', onMessage, { noAck: true });

The ok property on the promises returned by the sendToQueue and publish methods:

// `ok` is the extra boolean on the returned promise that reports whether
// the write buffer can take more.
var sent = channel.sendToQueue('someQueue', { hello: 'world' });
if (!sent.ok) {
  // maybe pause sending until unblocked?
  channel.once('drain', function(){
    // continue sending
  });
} else {
  // continue sending
}

Real World Example

Say you wanted to listen to the 'foo' exchange and send a different message to the 'bar' queue every time the message's baz property contained the word 'qux'.

In your config.js:

// Connection settings; each value can be overridden through the named
// environment variable and otherwise falls back to the literal default.
var env = process.env;
var cfg = {
  exchange: env.EXCHANGE_TO_BIND_TO || 'foo',
  queue: {
    toSendTo: env.QUEUE_TO_SEND_TO || 'bar',
    toConsumeFrom: env.QUEUE_TO_CONSUME_FROM || 'baz'
  },
  // NOTE(review): the original default URL was mangled by e-mail
  // obfuscation ("[email protected]"); the password and host below are
  // placeholders -- confirm against the upstream README.
  amqpUrl: env.RABBIT_MQ_URL || 'amqp://test:test@localhost:5672'
};

// One entry per channel.assertQueue(...) invocation.
var queuesToAssert = [
  [ cfg.queue.toConsumeFrom ],              // channel.assertQueue( cfg.queue.toConsumeFrom )
  [ cfg.queue.toSendTo, { durable: true } ] // channel.assertQueue( cfg.queue.toSendTo, { durable: true } )
];

// channel.assertExchange( cfg.exchange, 'fanout' )
var exchangesToAssert = [
  [ cfg.exchange, 'fanout' ]
];

// channel.bindQueue( cfg.queue.toConsumeFrom, cfg.exchange, '' )
var queueBindings = [
  [ cfg.queue.toConsumeFrom, cfg.exchange, '' ]
];

// Keys are the channel methods to invoke; values are the arrays of
// argument lists built above.
cfg.channelMethodsToCall = {
  assertQueue: queuesToAssert,
  assertExchange: exchangesToAssert,
  bindQueue: queueBindings
};

module.exports = cfg;

In your app.js:

var cfg = require('./config');
var amqp = require('amqp.channel');

// Connect and run the configured channel setup calls.
var connected = amqp(cfg.amqpUrl, cfg.channelMethodsToCall);

// Cap in-flight messages at one before attaching the consumer.
var throttled = connected.then(consumeAtMost(1));

// Export the promise chain that ends with the consumer attached.
module.exports = throttled.then(consumeFrom(cfg.queue.toConsumeFrom));
  
/**
 * Build a promise-chain step that sets the channel's prefetch count.
 *
 * @param {number} maxMessages - value passed to `channel.prefetch()`.
 * @returns {function(Object): Promise} step that takes the channel, applies
 *   the prefetch, and resolves with the same channel for further chaining.
 */
function consumeAtMost(maxMessages){
  return function(channel){
    // Only process `maxMessages` at a time and don't consume another
    // message until we've either `ack` or `nack` the current one.
    return channel.prefetch(maxMessages).then(function(){
      return channel;
    });
  };
}

/**
 * Build a promise-chain step that attaches a consumer to `queue`.
 *
 * For every message whose `baz` property contains the word 'qux', a
 * `{ hello: 'world' }` message is sent to `cfg.queue.toSendTo`; every
 * other message is simply acknowledged.
 *
 * @param {string} queue - name of the queue to consume from.
 * @returns {function(Object): Object} step that takes the channel, wires up
 *   the consumer and the 'cancelled' listener, and returns the channel.
 */
function consumeFrom(queue){
  return function(channel){
    channel.consume(queue, function onMessage(parsed, msg){
      // Nothing to forward: acknowledge and move on.
      if (!/qux/.test(parsed.baz)) {
        channel.ack(msg);
        return;
      }

      var msgToSend = { hello: 'world' };
      var options = { persistent: true };
      var sendMsg = channel.sendToQueue(cfg.queue.toSendTo, msgToSend, options);

      function ackOriginal(){
        channel.ack(msg);
      }

      function onSendFailed(e){
        console.error(e);
        // Try to process message again?
        // onMessage(parsed, msg);
      }

      if (sendMsg.ok) {
        // Write buffer has room: acknowledge right away, still log a
        // later send failure.
        ackOriginal();
        sendMsg.catch(onSendFailed);
      } else {
        // Buffer is full: acknowledge only once the send has resolved.
        // The rejection handler sits on the same `then` so a failed send
        // does not leave an unhandled rejection on the derived promise.
        sendMsg.then(ackOriginal, onSendFailed);
      }
    });

    channel.on('cancelled', function onConsumerCancelled(queue, cb, options){
      console.warn('RabbitMQ cancelled your consumer for %s', queue);
      // Try to setup the consumer again?
      // channel.consume(queue, cb, options);
    });

    return channel;
  };
}