@hcon/rabbit-fire

v0.9.0

Simple fire-and-forget amqplib wrapper with auto-reconnect and round robin support.

rabbit-fire

Simplified connection management for amqplib, originally forked from amqp-connection-manager. Made to fire-and-forget messages for logging purposes or similar use cases.

Changes

  • Removed all logic for confirm channels and confirmations, while keeping the queue logic for retries.
    • Messages are sent at publish time and queued only if conditions mean they cannot be sent.
    • Handling of queued messages now happens at reconnect.
  • Removed callbacks (and promise-breaker) in favor of promises only.
  • Removed the parsing and json options; all clients must pass a Buffer.
  • Removed sendToQueue logic; the library is only used for publishing to an exchange.
  • Removed addSetup: setup on initialization is preferred and should be enforced.
  • Removed fetchServer: server data is known at startup.
  • Builds to CommonJS (target ES2017) uniformly, with no split between CJS and ESM.
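
The send-or-queue behavior described in the list above can be pictured with a minimal sketch. This is not rabbit-fire's implementation: FireAndForgetBuffer, SendFn, onConnect and onDisconnect are hypothetical names invented for illustration, and Uint8Array stands in for Buffer (a Uint8Array subclass in Node.js) so the sketch runs without Node typings.

```typescript
// Hypothetical stand-in for "send on the live channel"; not the rabbit-fire or amqplib API.
type SendFn = (exchange: string, content: Uint8Array) => void;

interface Pending {
  exchange: string;
  content: Uint8Array;
}

class FireAndForgetBuffer {
  private pending: Pending[] = [];
  private send: SendFn | null = null;

  // Send immediately when connected; otherwise keep the message in memory.
  publish(exchange: string, content: Uint8Array): void {
    if (this.send) {
      this.send(exchange, content);
    } else {
      this.pending.push({ exchange, content });
    }
  }

  // Called on (re)connect: drain the backlog in publish order.
  onConnect(send: SendFn): void {
    this.send = send;
    const backlog = this.pending;
    this.pending = [];
    for (const message of backlog) {
      send(message.exchange, message.content);
    }
  }

  // Called when the connection drops: later publishes are queued again.
  onDisconnect(): void {
    this.send = null;
  }

  // Number of messages still waiting in memory.
  queueLength(): number {
    return this.pending.length;
  }
}
```

Since nothing waits for a broker confirmation in this model, a message handed to send counts as done; only messages that could not be sent at all wait in memory for the next reconnect.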

Features

  • Automatically reconnect when your amqplib broker dies in a fire.
  • Round-robin connections between multiple brokers in a cluster.
  • If messages are sent while the broker is unavailable, queues messages in memory until we reconnect.
  • Supports promises only.
  • Very un-opinionated library - a thin wrapper around amqplib.

Installation

npm install --save amqplib @hcon/rabbit-fire

Basics

The basic idea here is that, usually, when you create a new channel, you do some setup work at the beginning (like asserting that various queues or exchanges exist, or binding to queues), and then you send and receive messages and you never touch that stuff again.

rabbit-fire will reconnect to a new broker whenever the broker it is currently connected to dies. When you ask rabbit-fire for a channel, you specify a setup function to run; the setup function will be run every time rabbit-fire reconnects, to make sure your channel and broker are in a sane state.

Before we get into an example, note that this example is written using Promises. Callbacks were removed in this fork (see Changes), so every asynchronous function returns a Promise.

Here's the example:

import * as amqp from '@hcon/rabbit-fire';

// Create a new connection manager
const connection = amqp.connect(['amqp://localhost']);

// Ask the connection manager for a ChannelWrapper.  Specify a setup function to
// run every time we reconnect to the broker.
const channelWrapper = connection.createChannel({
  name: 'chann',
  async setup(channel: amqp.Channel) {
    await channel.assertExchange('chann.ex', 'direct', {
      durable: true,
    });
  },
});

// Publish a message to the exchange.  If we're not currently connected, it will be queued up in memory
// until we connect.  Note that `publish()` returns a Promise which is fulfilled or rejected
// without confirmation from the broker.
channelWrapper
  .publish('chann.ex', Buffer.from('hello world'))
  .then(function () {
    return console.log('Message was sent!  Hooray!');
  })
  .catch(function (err) {
    return console.log('Message was rejected...  Boo!');
  });

API

connect(urls, options)

Creates a new AmqpConnectionManager, which will connect to one of the URLs provided in urls. If a broker is unreachable or dies, then AmqpConnectionManager will try the next available broker, round-robin.
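
As a rough illustration of the round-robin selection described above, the following sketch cycles through a list of broker URLs. RoundRobin is a hypothetical helper, not part of the library's API, and the host names in the usage lines are made up.

```typescript
// Hypothetical helper: hands out broker URLs in round-robin order.
class RoundRobin {
  private readonly urls: readonly string[];
  private index = 0;

  constructor(urls: readonly string[]) {
    if (urls.length === 0) {
      throw new Error('at least one URL is required');
    }
    this.urls = urls;
  }

  // Return the current URL and advance, wrapping around at the end of the list.
  next(): string {
    const url = this.urls[this.index] as string;
    this.index = (this.index + 1) % this.urls.length;
    return url;
  }
}

// Usage with made-up host names: each failed attempt would ask for the next URL.
const brokers = new RoundRobin(['amqp://broker-a', 'amqp://broker-b']);
const attempts = [brokers.next(), brokers.next(), brokers.next()];
```

With two URLs, the third attempt wraps around to the first broker again.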

Options:

  • options.heartbeatIntervalInSeconds - Interval to send heartbeats to broker. Defaults to 5 seconds.
  • options.reconnectTimeInSeconds - The time to wait before trying to reconnect. If not specified, defaults to heartbeatIntervalInSeconds.
  • options.connectionOptions is passed as options to the amqplib connect method.
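
The defaulting rules for the two timing options can be written out as a small sketch. resolveTimings and TimingOptions are hypothetical names used only to make the fallback explicit; how the library resolves these values internally is not shown in this README.

```typescript
// Only the two timing options documented above; both are optional.
interface TimingOptions {
  heartbeatIntervalInSeconds?: number;
  reconnectTimeInSeconds?: number;
}

// Hypothetical helper that applies the documented defaults.
function resolveTimings(options: TimingOptions = {}) {
  // The heartbeat interval defaults to 5 seconds.
  const heartbeatIntervalInSeconds = options.heartbeatIntervalInSeconds ?? 5;
  // The reconnect time falls back to the heartbeat interval when not specified.
  const reconnectTimeInSeconds =
    options.reconnectTimeInSeconds ?? heartbeatIntervalInSeconds;
  return { heartbeatIntervalInSeconds, reconnectTimeInSeconds };
}
```

Under these rules, setting only heartbeatIntervalInSeconds also changes the reconnect delay, while setting only reconnectTimeInSeconds leaves the heartbeat at its default.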

AmqpConnectionManager events

  • connect({connection, url}) - Emitted whenever we successfully connect to a broker.
  • connectFailed({err, url}) - Emitted whenever we attempt to connect to a broker, but fail.
  • disconnect({err}) - Emitted whenever we disconnect from a broker.
  • blocked({reason}) - Emitted whenever a connection is blocked by a broker.
  • unblocked - Emitted whenever a connection is unblocked by a broker.

AmqpConnectionManager#createChannel(options)

Create a new ChannelWrapper. This is a proxy for the actual channel (which may or may not exist at any moment, depending on whether or not we are currently connected.)

Options:

  • options.name - Name for this channel. Used for debugging.
  • options.setup(channel) - A function to call whenever we reconnect to the broker (and therefore create a new underlying channel). This function should return a Promise. Note that the value of this inside the setup function will be the returned ChannelWrapper. The ChannelWrapper has a special context member you can use to store arbitrary data in.
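
To make the setup contract more concrete, here is a minimal sketch of a wrapper that re-runs its setup function for every new underlying channel and binds this to itself. WrapperSketch, FakeChannel and onConnect are hypothetical stand-ins, not the library's classes; only the context member and the this binding come from the description above.

```typescript
// Hypothetical minimal channel; a real one would come from amqplib.
interface FakeChannel {
  id: number;
}

// Setup receives the new channel and is called with the wrapper as `this`.
type Setup = (this: WrapperSketch, channel: FakeChannel) => Promise<void>;

class WrapperSketch {
  // Free-form storage, mirroring the context member described above.
  context: Record<string, unknown> = {};
  private setup: Setup;

  constructor(setup: Setup) {
    this.setup = setup;
  }

  // Called for every new underlying channel, i.e. after each (re)connect.
  onConnect(channel: FakeChannel): Promise<void> {
    return this.setup.call(this, channel);
  }
}

// Usage: a regular function expression (not an arrow function) so `this` is the wrapper.
const seen: number[] = [];
const wrapper = new WrapperSketch(async function (channel) {
  seen.push(channel.id);
  this.context['lastChannelId'] = channel.id;
});
```

An arrow function would not work for the usage above, because arrow functions ignore the this value supplied by the caller.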

AmqpConnectionManager#isConnected()

Returns true if the AmqpConnectionManager is connected to a broker, false otherwise.

AmqpConnectionManager#close()

Close this AmqpConnectionManager and free all associated resources.

ChannelWrapper events

  • connect - emitted every time this channel connects or reconnects.
  • error(err, {name}) - emitted if an error occurs setting up the channel.
  • close - emitted when this channel closes via a call to close().

ChannelWrapper#publish

Works like its counterpart in amqplib's Channel, except that it returns a Promise, which resolves once the message has been sent, without waiting for confirmation from the broker. The promise rejects if there is an unexpected error or if close() is called on the ChannelWrapper before the message can be sent.

ChannelWrapper#ack and ChannelWrapper#nack

These are just aliases for calling ack() and nack() on the underlying channel. They do nothing if the underlying channel is not connected.
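
The "do nothing when disconnected" rule can be sketched as a thin proxy. AckProxy, AckChannel, Msg and setChannel are hypothetical names for illustration and do not reflect the library's internals or amqplib's types.

```typescript
// Hypothetical message and channel shapes, not the amqplib types.
interface Msg {
  deliveryTag: number;
}

interface AckChannel {
  ack(msg: Msg): void;
  nack(msg: Msg): void;
}

class AckProxy {
  private channel: AckChannel | null = null;

  // Set on connect, cleared (null) on disconnect.
  setChannel(channel: AckChannel | null): void {
    this.channel = channel;
  }

  // Forward to the live channel, or silently do nothing when disconnected.
  ack(msg: Msg): void {
    this.channel?.ack(msg);
  }

  nack(msg: Msg): void {
    this.channel?.nack(msg);
  }
}
```

A call made while disconnected is simply dropped in this sketch; it is not queued for later.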

ChannelWrapper#queueLength()

Returns a count of messages currently waiting to be sent to the underlying channel.

ChannelWrapper#close()

Closes the channel and cleans up the resources associated with it.