@adwerx/rabbitmq-worker

v1.0.1

rabbitmq-worker

This library provides a high-level API for consuming messages from a rabbitmq exchange with sidekiq-like behavior.

Installation

npm install -S @adwerx/rabbitmq-worker amqplib

Quickstart

Create a worker instance and use the configuration methods to set the exchange to bind to, a perform function to process messages, and an error handler. Also provide a connection factory function.

Call start with an AbortSignal to start consuming messages. Abort the signal to stop the worker.

import Worker from "@adwerx/rabbitmq-worker";
import amqplib from "amqplib";

Worker.toConnect(() => amqplib.connect(process.env.AMQP_URL));

const controller = new AbortController();
process.once("SIGINT", () => controller.abort());

await new Worker()
  .from("events")
  .perform(async (data, { fields, properties }) => {
    // ... do work
    // data is a *parsed* message
  })
  .on("error", (err) => console.error(err))
  .start(controller.signal);

Configuration

.from(exchangeName: string)

A worker must be configured with an exchange to source messages from. Use worker.from(exchangeName) to set the exchange that the worker's queue will be bound to.

.bind(queueName: string, routingKey = "#", args: StringIndexed = {})

By default, starting a worker asserts a queue specifically for that worker, named after the exchange. To use a different queue name, call worker.bind(queueName). As the signature shows, the routing key defaults to "#".

.perform(fn)

A worker must be given a perform function which will be called for each message retrieved. Use worker.perform(fn) to provide a perform function. The perform function can be sync or async.

.toConnect(connectionFactory: ConnectionFactoryFunction)

A worker must have an amqp connection factory function. You can provide this globally by setting a connection factory function to the Worker.connect static property. It is also possible to provide the connection factory to a single worker instance with worker.toConnect(factoryFunc). The factory function must return (or resolve with) an amqplib connection instance. This function will be called any time the worker needs to reconnect to the server.

.parse(contentType: ParserFunction | "json" | "none")

All messages are assumed to be JSON data and will be parsed prior to being passed as an argument to your perform function. You can override this behavior by using .parse("none") to disable parsing or by providing your own parsing function with .parse((buffer: Buffer) => any).
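To illustrate the custom-parser form, here is a minimal sketch of a parser for newline-delimited JSON bodies. The name parseNdjson is hypothetical, and the sketch assumes the parser receives the raw message content as bytes (a Node Buffer is a Uint8Array, so it fits the parameter type used here) and that its return value is what reaches the perform function.

```typescript
// Hypothetical parser for newline-delimited JSON message bodies.
// Assumption: the library calls the parser with the raw message content
// and hands the returned value to the perform function.
function parseNdjson(content: Uint8Array): unknown[] {
  const text = new TextDecoder("utf-8").decode(content);
  return text
    .split("\n")
    .map((line) => line.trim())
    .filter((line) => line.length > 0) // skip blank lines
    .map((line) => JSON.parse(line) as unknown);
}

// Intended wiring, following the .parse signature (not executed here):
//   new Worker().from("events").parse(parseNdjson)
```

Malformed lines make JSON.parse throw, so a real parser may want to decide whether one bad line should fail the whole message.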

.with(options: PartialWorkerOptions = {})

You can configure other worker behavior at the class level or instance level. For global default options, use Worker.defaults = { ... } and the provided settings will be defaults for all instances. For options specific to one instance, use worker.with({ ... }) to set options for only that instance.

WorkerOptions is a type that looks like this:

interface WorkerOptions {
  tag?: string;                  // the consumerTag the worker will use when connecting
  concurrency: number;           // how many jobs may be in progress at any given moment
  retries: number;               // how many times a job may retry before dying
  parser: ParserFunction;        // a function that is used to parse a message before perform
  deadJobRetensionMs: number;    // how long dead jobs remain in the dead queue
}
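The relationship between class-level defaults and per-instance options can be pictured as a shallow merge in which instance options win. The sketch below is an assumption about that behavior, not the library's code: resolveOptions and classDefaults are hypothetical names, and every default value except retries: 25 (documented under "How retries work") is a placeholder.

```typescript
// Mirrors the WorkerOptions shape documented above.
type ParserFunction = (content: Uint8Array) => unknown;

interface WorkerOptions {
  tag?: string;
  concurrency: number;
  retries: number;
  parser: ParserFunction;
  deadJobRetensionMs: number;
}

// Illustrative defaults only. `retries: 25` matches the documented default;
// the other values are placeholders, not the library's real defaults.
const classDefaults: WorkerOptions = {
  concurrency: 1,
  retries: 25,
  parser: (content) => JSON.parse(new TextDecoder().decode(content)),
  deadJobRetensionMs: 7 * 24 * 60 * 60 * 1000,
};

// Hypothetical helper: instance options override class-level defaults.
function resolveOptions(
  defaults: WorkerOptions,
  overrides: Partial<WorkerOptions> = {},
): WorkerOptions {
  return { ...defaults, ...overrides };
}

// Roughly what `worker.with({ concurrency: 5, tag: "billing" })` is assumed to yield.
const resolved = resolveOptions(classDefaults, { concurrency: 5, tag: "billing" });
```

Because the merge builds a new object, the shared defaults are left untouched and other instances keep seeing the original values.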

Design

This library favors convention over configuration. On start, a worker asserts the exchange it binds a queue to, a work queue to hold its messages, a binding between the two, a retry queue to hold messages awaiting retry, a dead queue to hold messages that have exhausted their retries, a requeue exchange, and a binding between the requeue exchange and the work queue, so that messages are automatically requeued once they have waited out their retry period.

Example 1: Default behavior

await new Worker()
  .from("events")
  .perform(() => {})
  .start(signal);

The worker defined above creates a work queue named events; since no queue name was explicitly provided, the exchange name is used. A queue for retryable messages is created, named events.retry. A queue for dead messages is created, named events.dead. An events.requeue exchange is created to re-queue messages that have waited out their retry period. Both the events exchange and the events.requeue exchange are bound to the events work queue.

Example 2: Specified queue name

await new Worker()
  .from("orders")
  .bind("provisioning")
  .perform(() => {})
  .start(signal);

The worker defined above creates a work queue named provisioning. A queue for retryable messages is created, named provisioning.retry. A queue for dead messages is created, named provisioning.dead. A provisioning.requeue exchange is created to re-queue messages that have waited out their retry period. The orders exchange and the provisioning.requeue exchange are both bound to the provisioning work queue.
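The naming convention in the two examples can be summarized as a small function. This is a sketch derived only from the names listed above; describeTopology and the Topology type are hypothetical and not part of the library.

```typescript
interface Topology {
  sourceExchange: string;  // exchange the work queue is bound to
  workQueue: string;       // queue the worker consumes from
  retryQueue: string;      // holds messages awaiting retry
  deadQueue: string;       // holds messages that exhausted their retries
  requeueExchange: string; // routes expired retry messages back to the work queue
}

// Hypothetical helper: the queue name falls back to the exchange name,
// and the retry, dead, and requeue names are derived from the queue name.
function describeTopology(exchange: string, queue: string = exchange): Topology {
  return {
    sourceExchange: exchange,
    workQueue: queue,
    retryQueue: `${queue}.retry`,
    deadQueue: `${queue}.dead`,
    requeueExchange: `${queue}.requeue`,
  };
}

// Example 1: describeTopology("events")
// Example 2: describeTopology("orders", "provisioning")
```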

How retries work

By default, a job is retried up to 25 times with exponential backoff. When all retries are exhausted, the job is sent to the dead queue.
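The exact backoff formula is not documented here, so the following is only a generic sketch of exponential backoff: the delay doubles with each attempt up to a cap. The function name, the 1-second base, and the 1-hour cap are all assumptions chosen for illustration.

```typescript
// Generic exponential backoff, NOT the library's actual formula.
// attempt 0 waits baseMs, attempt 1 waits 2 * baseMs, and so on, capped at maxMs.
function backoffDelayMs(
  attempt: number,
  baseMs: number = 1000,
  maxMs: number = 3600000,
): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Delays for the first five attempts, in milliseconds.
const firstDelays = Array.from({ length: 5 }, (_, attempt) => backoffDelayMs(attempt));
```

A delay computed this way would be a natural fit for the per-message expiration that is set when a message enters the retry queue.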

When a message is delivered to the worker, and the perform function is successful, the message is acknowledged.

When a message is delivered to the worker, and an unexpected error occurs, and the message is unable to be sent to the retry or dead queue, the message is not acknowledged.

When a message is delivered to the worker, and an unexpected error occurs, and the message is sent to the retry or dead queue, the message is acknowledged.

When a message is sent to the retry queue, the message will have an expiration set.

When a message in the retry queue expires, the message is dead-lettered to the requeue exchange.

When a message's death count meets or exceeds the retry limit, the message is sent to the dead queue with an expiration.

When a message in the dead queue expires, the message is discarded.
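The acknowledgement rules above can be condensed into a single decision function. This is a hypothetical model of the described behavior, not the library's implementation; settle, Attempt, and Outcome are invented names, and deaths stands for however many times the message has already died.

```typescript
type Destination = "none" | "retry" | "dead";

interface Attempt {
  performSucceeded: boolean; // did the perform function complete without error?
  deaths: number;            // how many times this message has already died
  retries: number;           // configured retry limit
  republished: boolean;      // could the message be sent to the retry or dead queue?
}

interface Outcome {
  destination: Destination;
  acknowledge: boolean;
}

// Hypothetical model of the rules listed above.
function settle(attempt: Attempt): Outcome {
  if (attempt.performSucceeded) {
    // Success: nothing to republish, acknowledge the original message.
    return { destination: "none", acknowledge: true };
  }
  // Failure: pick the dead queue once deaths meet or exceed the retry limit.
  const destination: Destination =
    attempt.deaths >= attempt.retries ? "dead" : "retry";
  // Acknowledge only if the message was safely handed off to that queue.
  return { destination, acknowledge: attempt.republished };
}
```

The key property is that the original message is acknowledged only after it has either been processed or safely copied elsewhere, so a failed handoff leaves it unacknowledged rather than lost.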

Contributing

Run tests with: npm test

Run build with: npm run build