carrotstiqs

v1.0.0

A declarative API for asynchronous messaging with RabbitMQ

Downloads

17

CarrotStiqs

A declarative API for asynchronous messaging with RabbitMQ.

$ npm i carrotstiqs

Highlights

  • Two message types:
    • Commands: Each message is processed successfully (at least) once and is load balanced across consumers.
    • Events: Each message is processed successfully (at least) once per consumer group and is load balanced across consumers.
  • Consumer Groups: Each group declares which events and commands it will consume. Any group can send any event or command. If multiple groups subscribe to an event A, each of those groups will receive all future "A" events.
  • Shared topology for all clients. This protects against message loss in the case that one client is publishing before other clients have asserted their queues.
  • Discover common issues quickly during development:
    • Misspellings of consumer group name, events, and commands
    • Forgetting to consume messages for a registered command/event
  • Reliably delivers messages regardless of client crashes, AMQP broker crashes, or network outages

Assumptions

  • Fault-tolerance/resilience over throughput
  • Prefer "at least once delivery"
  • Prefer publish confirmation
  • Prefer ack mode on consumers
  • Using RabbitMQ (Other implementations of AMQP may work, but are not supported)
  • Official support is limited to Node 10+

API Walkthrough

For more detailed information about types or behaviors please review the types in the source code or the tests, respectively.

Topology

The topology object has one key per consumer group, and each group declares the events and commands it will consume.

const topology = {
  email: {
    events: ['userInfoChanged'],
    commands: [],
  },
  user: {
    events: [],
    commands: ['changeContactInfo'],
  },
};
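
To make the routing rules concrete, here is a minimal sketch of how a topology object like the one above could be inspected. The helpers `groupsConsuming` and `isRegistered` are hypothetical names chosen for illustration; they are not part of CarrotStiqs, which performs its own validation internally.

```javascript
// Hypothetical helpers for illustration; these are not CarrotStiqs APIs.
const topology = {
  email: { events: ['userInfoChanged'], commands: [] },
  user: { events: [], commands: ['changeContactInfo'] },
};

// Returns the names of the consumer groups that declare the given message.
// kind is either 'events' or 'commands'.
function groupsConsuming(topology, kind, name) {
  return Object.keys(topology).filter((group) =>
    (topology[group][kind] || []).includes(name)
  );
}

// A message name is only useful if at least one group consumes it.
function isRegistered(topology, kind, name) {
  return groupsConsuming(topology, kind, name).length > 0;
}

console.log(groupsConsuming(topology, 'events', 'userInfoChanged')); // [ 'email' ]
console.log(isRegistered(topology, 'commands', 'changeContactInf')); // false (misspelled name)
```

A check of this kind is what allows misspelled event or command names to surface during development rather than as silently dropped messages.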

Client

The library's main export is a function that receives the topology and connection URLs. The client it returns has methods for sending and consuming messages.

Creating a client may synchronously throw validation errors.

// When installed from npm, require the package by name
const createCarrotStiqsClient = require('carrotstiqs');

const client = createCarrotStiqsClient({
  topology,
  connectionUrls: ['amqp://guest:guest@localhost:5672'],
});

Consume messages

Prefetch count and handlers are set when initializing consumers.

Handlers receive a string representation of the message (message), the original amqplib message object (messageObject), and three functions: acknowledgeMessage (ack), retryMessage (nack with requeue), and discardMessage (nack without requeue). Only the first invocation of any of these functions takes effect; all later invocations are no-ops. This avoids bringing down a channel by acking twice, or by both acking and nacking.

A considerable amount of validation happens when declaring consumers in order to detect problems such as missing handlers or typos.

client.initializeConsumerGroup('email', {
  events: {
    userInfoChanged: {
      prefetch: 1,
      handler: async ({ message, messageObject, acknowledgeMessage, discardMessage, retryMessage }) => {
        try {
          const data = JSON.parse(message);

          // Send email
          await sendUserInfoChangedEmail(data);

          acknowledgeMessage();
        } catch (e) {
          // Discard the message if JSON parsing failed or it has already been redelivered once
          if (e instanceof SyntaxError || messageObject.fields.redelivered) {
            discardMessage();
            return;
          }

          // Otherwise requeue it for one more attempt
          retryMessage();
        }
      }
    }
  }
});
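
The "first invocation wins" behavior of the settle functions can be pictured with a small sketch. This illustrates the general technique and is an assumption, not the library's actual implementation; `createSettleFunctions` and the fake channel object are hypothetical. The `ack(message)` and `nack(message, allUpTo, requeue)` calls mirror the amqplib channel methods of the same names.

```javascript
// Hypothetical sketch: wrap a channel so a message can only be settled once.
function createSettleFunctions(channel, messageObject) {
  let settled = false;

  // Runs the given action the first time only; later calls are no-ops.
  const once = (action) => () => {
    if (settled) return;
    settled = true;
    action();
  };

  return {
    acknowledgeMessage: once(() => channel.ack(messageObject)),
    // nack with requeue = true puts the message back on the queue
    retryMessage: once(() => channel.nack(messageObject, false, true)),
    // nack with requeue = false drops (or dead-letters) the message
    discardMessage: once(() => channel.nack(messageObject, false, false)),
  };
}

// Fake channel that records calls, standing in for an amqplib channel.
const calls = [];
const fakeChannel = {
  ack: () => calls.push('ack'),
  nack: (msg, allUpTo, requeue) => calls.push(requeue ? 'requeue' : 'discard'),
};

const settle = createSettleFunctions(fakeChannel, { fields: {} });
settle.discardMessage();
settle.retryMessage(); // ignored: the message was already settled
console.log(calls); // [ 'discard' ]
```

Sharing one `settled` flag across all three functions is what prevents a double ack or an ack followed by a nack from reaching the channel.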

Send messages

Both sendCommand and sendEvent return promises that resolve when the message is confirmed by RabbitMQ, or reject if the message could not be delivered successfully.

Messages sent while there is no connection to RabbitMQ, or before the topology is fully asserted, are queued in memory.

client.sendCommand('changeContactInfo', JSON.stringify({ fakeData: true }));
client.sendEvent('userInfoChanged', JSON.stringify({ fakeData: true }));
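
Because both methods return promises, callers will usually want to wait for the broker's confirmation and handle a rejection. The following is a minimal sketch under that assumption; `sendCommandSafely` is a hypothetical helper, and `client` is assumed to be any object exposing `sendCommand` as described above.

```javascript
// Hypothetical helper: waits for the publish confirmation and reports the outcome
// instead of letting a rejection go unhandled.
async function sendCommandSafely(client, name, payload) {
  try {
    // Resolves once RabbitMQ has confirmed the message
    await client.sendCommand(name, JSON.stringify(payload));
    return { confirmed: true };
  } catch (error) {
    // The message could not be delivered; the caller decides what to do next
    return { confirmed: false, error };
  }
}
```

For example, `const result = await sendCommandSafely(client, 'changeContactInfo', { fakeData: true });` yields `result.confirmed === true` once the broker has confirmed the command.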

Close client

The close method closes all channels and connections. The promise it returns resolves when everything has been shut down.

client.close();

Best practices

Each process can have multiple clients, though this probably isn't optimal. If you desire more concurrency it probably would be better to tweak the prefetch number instead.

If prefetch is greater than one, or there is more than one consumer for a command or grouped event, message ordering is no longer guaranteed. Because of this, and because the library prefers at-least-once delivery, it is recommended to make your handlers safe for duplicate and out-of-order deliveries.
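
One way to make a handler tolerate duplicates and reordering is to track what has already been applied. The sketch below assumes each message carries an `id` and a per-user, monotonically increasing `version`; those fields, the in-memory stores, and `applyUserInfoChange` are hypothetical and not something the library provides. A real service would keep this state in a durable store.

```javascript
// Hypothetical in-memory state; a real service would persist this.
const processedIds = new Set();  // ids of messages already seen
const latestVersion = new Map(); // userId -> highest version applied so far

// Returns 'applied', 'duplicate', or 'stale' so the caller can decide how to settle.
function applyUserInfoChange(message) {
  const { id, userId, version } = JSON.parse(message);

  // Duplicate delivery: this exact message was already handled
  if (processedIds.has(id)) return 'duplicate';
  processedIds.add(id);

  // Out-of-order delivery: a newer change was already applied for this user
  const current = latestVersion.get(userId) || 0;
  if (version <= current) return 'stale';

  latestVersion.set(userId, version);
  return 'applied';
}
```

In all three cases the handler can safely acknowledge the message, since neither a duplicate nor a stale update needs to be retried.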

Performance

Some benchmarks have been written for this library. They can be run with npm run benchmark, or you can view past results from a local test here

Acknowledgements

This library was inspired in part by rabbot and moleculer. Originally authored by Nathan Schwartz and Taylor King as part of their work on the Merlin Platform.

Enhancement ideas

  • Check for special RabbitMQ characters in event and command names
  • Allow full connection objects (socketOptions) to be passed to amqp-connection-manager
  • Identify and manage slow consumers
  • Streamline testing approach and write more integration tests
  • Delete queues and exchanges between test suites instead of running a temp client
  • Version exchanges and queues
  • Introduce "strict mode", which defaults to true. Opting out would bypass most validations
  • Ack batching like rabbot