
@heavybit/hb-queue-consumer-package

v1.0.1

Shared queue consumer package that allows microservices to consume messages from RabbitMQ queues

Downloads

124

Introduction

The RabbitMQConsumer is a lightweight Node.js class that simplifies consuming messages from RabbitMQ queues, providing a reliable mechanism for handling message retries, dead-lettering, and graceful connection management. The class abstracts the complexity of RabbitMQ setup, connection management, and message consumption, allowing developers to focus on processing business logic.

Features

  • Persistent Connection: Automatically establishes and maintains a persistent RabbitMQ connection, reconnecting if the connection drops.
  • Retry Logic with Exponential Backoff: Automatically retries failed messages with exponential backoff up to a configurable number of retries.
  • Dead-Letter Queue: Supports dead-lettering for messages that have exceeded the retry limit.
  • Graceful Connection Handling: Manages RabbitMQ connections and channels, including graceful shutdown and error recovery.
  • Durable Queues: Declares queues as durable so that they survive RabbitMQ restarts. Messages survive a restart only if they are also published as persistent.

Technologies Used

  • Node.js: The core runtime environment for building the package.
  • RabbitMQ: A message broker for sending messages between microservices.
  • amqplib: A Node.js library for working with RabbitMQ.

Installation

You can install the package via npm by running the following command:

npm install @heavybit/hb-queue-consumer-package

Requirements

  • Node.js version 12 or higher.
  • RabbitMQ installed locally or accessible via a cloud instance.

Usage

Here’s how to integrate the RabbitMQConsumer package into your project:

  1. Import the Package

To start using RabbitMQConsumer, import it into your project:

const RabbitMQConsumer = require('@heavybit/hb-queue-consumer-package');

  2. Initialize the Consumer

Create an instance of RabbitMQConsumer with the queue name and dead-letter queue, plus the optional maximum retries and retry backoff time:

const consumer = new RabbitMQConsumer({
  queue: 'yourQueue',
  deadLetterQueue: 'yourDeadLetterQueue',
  maxRetries: 5, // optional, default is 3
  retryBackoff: 2000 // optional, default is 1000ms
});

  3. Start Consuming Messages

Start consuming messages from the RabbitMQ queue by providing a callback function that processes each message.

(async () => {
  try {
    await consumer.consume(async (message) => {
      // Business logic to process the message
      console.log('Message received:', message);
    });
  } catch (error) {
    console.error('Failed to start consuming messages:', error);
  }
})();

  4. Configuring RabbitMQ Connection

The consumer automatically connects to RabbitMQ using the following environment variables:

  • RABBITMQ_HOST
  • RABBITMQ_PORT
  • RABBITMQ_USER
  • RABBITMQ_PASSWORD

Here's an example .env file configuration:

# Example environment variables
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
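
The README does not say how these four variables are combined into a connection string. As a rough sketch, assuming the standard AMQP URL form that amqplib accepts, they could be assembled like this. The helper name buildAmqpUrl and the fallback values are hypothetical and are not part of the package:

```javascript
// Sketch only: compose an AMQP URL from the documented environment variables.
// Assumption: the fallbacks below are illustrative, not the package's real defaults.
function buildAmqpUrl(env = process.env) {
  const host = env.RABBITMQ_HOST || 'localhost';
  const port = env.RABBITMQ_PORT || '5672';
  // Credentials are URL-encoded so characters such as "@" or "/" stay valid.
  const user = encodeURIComponent(env.RABBITMQ_USER || 'guest');
  const password = encodeURIComponent(env.RABBITMQ_PASSWORD || 'guest');
  return `amqp://${user}:${password}@${host}:${port}`;
}

console.log(buildAmqpUrl({
  RABBITMQ_HOST: 'localhost',
  RABBITMQ_PORT: '5672',
  RABBITMQ_USER: 'guest',
  RABBITMQ_PASSWORD: 'guest'
}));
```

Encoding the credentials matters mainly when a password contains reserved URL characters.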

Handling Message Retries and Dead-Lettering

If a message fails to be processed, it will be retried according to the retry configuration with exponential backoff. Once the maximum retry attempts are exhausted, the message will be sent to the dead-letter queue.

// Example of message retry logic with dead-lettering
consumer.handleRetry(msg);
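
To make the retry and dead-letter decision concrete, here is a minimal sketch of the rule described above. It assumes the delay doubles on each attempt starting from retryBackoff, which the README does not spell out. The function name nextRetryStep is hypothetical and this is not the package's actual implementation:

```javascript
// Sketch only: decide what happens to a failed message.
// Assumption: delay = retryBackoff * 2^retryCount (the exact formula is not documented).
function nextRetryStep(retryCount, maxRetries = 3, retryBackoff = 1000) {
  if (retryCount >= maxRetries) {
    // Retries are exhausted, so the message goes to the dead-letter queue.
    return { action: 'dead-letter' };
  }
  return { action: 'retry', delayMs: retryBackoff * 2 ** retryCount };
}

// Print the schedule for maxRetries = 5 and retryBackoff = 2000.
for (let retryCount = 0; retryCount <= 5; retryCount++) {
  console.log(retryCount, nextRetryStep(retryCount, 5, 2000));
}
```

Under these assumptions, a message configured with maxRetries 5 and retryBackoff 2000 waits 2, 4, 8, 16, and 32 seconds before it is dead-lettered.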

Graceful Shutdown

Ensure a clean shutdown by closing the RabbitMQ connection and channel when your application stops:

process.on('SIGINT', async () => {
  console.log('Shutting down consumer...');
  await consumer.close();
  process.exit(0);
});
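
Container platforms usually stop a process with SIGTERM rather than SIGINT, and a signal can arrive more than once. The sketch below shows one possible way to cover both cases. The helper createShutdownHandler is hypothetical, and the only thing it assumes about the consumer is the close() method documented in this README:

```javascript
// Sketch only: a shutdown handler that closes the consumer at most once.
// `exit` is injectable so the handler can be exercised without ending the process.
function createShutdownHandler(consumer, exit = (code) => process.exit(code)) {
  let closing = null; // holds the in-flight close so repeated signals reuse it

  return function shutdown() {
    if (closing) return closing;
    closing = (async () => {
      try {
        await consumer.close();
        exit(0);
      } catch (error) {
        console.error('Error while closing consumer:', error);
        exit(1);
      }
    })();
    return closing;
  };
}

// Usage (assumes `consumer` is a RabbitMQConsumer instance):
// const shutdown = createShutdownHandler(consumer);
// process.on('SIGINT', shutdown);
// process.on('SIGTERM', shutdown);
```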

Example: Integrating into a Microservice

Here's an example of how you can integrate the RabbitMQConsumer into a microservice using Express:

const express = require('express');
const RabbitMQConsumer = require('@heavybit/hb-queue-consumer-package');

const app = express();
const consumer = new RabbitMQConsumer({
  queue: 'taskQueue',
  deadLetterQueue: 'taskQueue.deadLetter',
  maxRetries: 5,
  retryBackoff: 2000
});

app.use(express.json());

app.post('/process-task', async (req, res) => {
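  try {
    // Caution (assumption about the package's behavior): if each consume() call
    // registers a new consumer, calling it per request attaches duplicates.
    // Consider calling consume() once at startup instead.
    await consumer.consume(async (message) => {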
      // Process the message
      console.log('Task processed:', message);
    });
    res.status(200).send('Task processing started');
  } catch (error) {
    console.error('Failed to start task processing:', error);
    res.status(500).send('Error occurred');
  }
});

app.listen(3000, () => {
  console.log('Microservice running on port 3000');
});

API Reference

RabbitMQConsumer

constructor(options)

Creates an instance of RabbitMQConsumer.

  • Options
    • queue: The name of the RabbitMQ queue from which to consume messages.
    • deadLetterQueue: The name of the dead-letter queue for failed messages.
    • maxRetries: (Optional) The maximum number of retry attempts for failed messages. Defaults to 3.
    • retryBackoff: (Optional) The base retry backoff time in milliseconds. Defaults to 1000ms.
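
The option handling above can be sketched as a small helper that applies the documented defaults (3 retries, 1000 ms backoff). The README does not say whether the package validates its options, so the checks below are an assumption, and resolveConsumerOptions is a hypothetical name:

```javascript
// Sketch only: apply the documented defaults and reject obviously invalid options.
// Assumption: the validation rules are illustrative; the package may behave differently.
function resolveConsumerOptions(options = {}) {
  const { queue, deadLetterQueue, maxRetries = 3, retryBackoff = 1000 } = options;

  if (typeof queue !== 'string' || queue.length === 0) {
    throw new TypeError('options.queue must be a non-empty string');
  }
  if (typeof deadLetterQueue !== 'string' || deadLetterQueue.length === 0) {
    throw new TypeError('options.deadLetterQueue must be a non-empty string');
  }
  if (!Number.isInteger(maxRetries) || maxRetries < 0) {
    throw new RangeError('options.maxRetries must be a non-negative integer');
  }
  if (typeof retryBackoff !== 'number' || !(retryBackoff > 0)) {
    throw new RangeError('options.retryBackoff must be a positive number of milliseconds');
  }
  return { queue, deadLetterQueue, maxRetries, retryBackoff };
}

console.log(resolveConsumerOptions({ queue: 'taskQueue', deadLetterQueue: 'taskQueue.deadLetter' }));
```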

async connect()

Establishes a connection to RabbitMQ and creates a channel for message consumption. Automatically called during message consumption.

  • Returns: Promise<void>
  • Throws: Error if the connection fails.

async consume(callback)

Begins consuming messages from the queue, calling the provided callback function to process each message.

  • Parameters:
    • callback: A function that processes received messages. Should return a Promise.
  • Returns: Promise
  • Throws: Error if consumption fails.

async handleRetry(msg)

Handles message retries using exponential backoff. If the retry count exceeds the maximum retries, the message is moved to the dead-letter queue.

  • Parameters:
    • msg: The message object to retry.
  • Returns: Promise<void>

async close()

Closes the RabbitMQ connection and channel gracefully.

  • Returns: Promise<void>
  • Throws: Error if closing the connection or channel fails.

RabbitMQConsumer.publishToQueue(queueName, data, retries = 3, initialDelay = 1000)

Publishes a message to the specified RabbitMQ queue with retry logic and exponential backoff.

Parameters:

  • queueName (string): The name of the RabbitMQ queue.
  • data (Object): The message payload to be sent to the queue.
  • retries (number): The maximum number of retry attempts (default is 3).
  • initialDelay (number): The initial delay (in milliseconds) before retrying (default is 1000 ms).

Returns:

  • Promise<void>

Throws:

  • Error if queueName or data is missing.
  • Error after all retry attempts fail.
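
To illustrate what the retries and initialDelay parameters imply, here is a generic retry wrapper. It is a sketch under two assumptions: the delay doubles after each failure, and retries counts attempts made after the first one. The names withRetries and defaultSleep are hypothetical and this is not the package's internal code:

```javascript
// Sketch only: retry an async operation with exponential backoff.
// Assumption: up to `retries` retries follow the first attempt, doubling the delay each time.
const defaultSleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function withRetries(operation, retries = 3, initialDelay = 1000, sleep = defaultSleep) {
  let delay = initialDelay;
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // Give up and surface the last error once every retry has been used.
      if (attempt >= retries) throw error;
      await sleep(delay);
      delay *= 2;
    }
  }
}
```

The sleep function is passed in as a parameter so that the schedule can be checked without waiting on real timers.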

RabbitMQConsumer.close()

Closes the RabbitMQ channel and connection. This method is automatically called after message publishing is completed. You can also call it manually if needed.

Returns:

  • Promise<void>