npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

redisrqs

v2.0.0

Published

Reliable queuing system using Redis as the message store.

Downloads

2

Readme

Redis RQS (Reliable Queueing System)

Introduction

A Node.js module designed to build a very fast queuing system that guarantees at least once and at most once delivery. To achieve this objective, RedisRQS uses Redis as the data store and supports a 1:1 Publisher to Subscriber relationship. When a message arrives, RedisRQS guarantees that only one Subscriber will ever receive a message for processing regardless of the number of Subscribers listening for the same topic in the queue. This should allow a system to scale out the number of processors based on work load.

The way this system handles queuing and dequeuing messages is via Lua scripts. This provides an additional guarantee that each method call to RedisRQS will be a single network call to Redis. Furthermore, RedisRQS also guarantees that messages will never be lost due to a Subscriber unable to finish processing or crashing while processing a message. The queue sweeper will periodically put messages back in to the queue for processing when it runs. You can change the frequency of the sweep in the options.

It is important to note that at this time, there is no support for poison messages. If a message is incomplete or will always crash a Subscriber, the message (poison) will be put back in the pending queue for processing by another Subscriber. This is on the roadmap and will be made available at a future date. As all things open source, your contributions are welcome.

When using this node_module, three lists will be created:

  • Pending: named redisrqs:pending
  • Working: named redisrqs:working
  • Values: named redisrqs:values

NOTE: All the publicly exposed methods return a Promise object.

Install

npm install redisrqs --save

Quick start

Presuming that you have an Express application, let's setup a route that will send a message and a listener that will process the message.

var options = {
    redis: { // ioredis options
        port: 6379,
        host: localhost,
        family: 4,
        db: 15
    },
    redisrqs: {
        sweepInterval: 60000    // sweep every 1 minute
    }
};
var RedisRQS = require("redisrqs")
var queue = new RedisRQS(options)

// Add a subscriber for the topic named 'emailWelcomeLetter'
queue.on('emailWelcomeLetter', function(result) {
  var payload, emailBody;
  payload = JSON.parse(result.message);
  emailBody = createWelcomeEmail(payload.email, payload.fullName);
  sendEmail(payload.email, payload.fullName, emailBody);

  // Release the message so we don't process it again.
  queue.release(result.uuid);
});

// Publish the event 'emailWelcomeLetter' to the queue and return
app.post('/register', function(req, res) {}) {
  var email, fullName, message;
  email = req.body.email;
  fullName = req.body.fullName;
  message = {'email': email, 'full_name': fullName};
  queue.enqueue('emailWelcomeLetter', JSON.stringify(message));
  res.redirect('/thank-you');
});

Dependencies

RedisRQS uses ioredis as the Redis client library. Any valid options that ioredis recognizes can be passed in with the redis options key.

Internal Redis Lists

Pending

When a message is queued it is put in this list. This list will only contain the UUID of the message. The actual message value lives in the Values list.

Working

This is where the currently processed message(s) will be stored.

Values

This is where the actual message will live. Each message is identified by a UUID. When a message is de-queued, only the UUID is moved from the Pending list to the Working list.

Events Emitted

In addition to emitting the topic as an event, it also emits the following events:

  • redisrqs:enqueue when a message is queued
  • redisrqs:dequeue when a message is dequeued
  • redisrqs:release when a message is released from the queue
  • redisrqs:requeue when a message is requeued for processing
  • redisrqs:sweep when the sweeper is run

redisrqs:sweep

If a worker processing a message dies while processing, reset the message by placing it back in the pending queue. This is automatically configured when a RedisRQS object is created. The frequency of the sweeps can be controlled using the sweepInterval setting of the redisrqs key in options.

Methods

enqueue

Queue the message up.

enqueue(topic, message);

Example:

var obj = {
    "data": "My data"
};
queue.enqueue('MyTopic', JSON.stringify(obj));

release

Remove the message from the queue.

release(uuid);

Example:

queue.release('3e232cd5-0843-4429-93f0-722b7171263d');

requeue

Requeue the message for processing once more.

requeue(uuid);

Example:

queue.requeue('ef50c241-14e6-4349-ba29-be470b94d41d');

drainQueues

Clear the queue entirely. This is a destructive operation.

drainQueues();

Example:

queue.drainQueues();

getSweepInterval

Get the interval configured for the sweep.

queue.getSweepInterval().then(function(result) {
  console.log('The sweep interval is set to: ' + result);
});

getWorkingQueueSize

Get the number of items in the work queue.

queue.getWorkingQueueSize().then(function(result) {
  console.log('The work queue size is: ' + result);
});

getPendingQueueSize

Get the number of items in the pending queue.

queue.getPendingQueueSize().then(function(result) {
  console.log('The pending queue size is: ' + result);
});

getValuesQueueSize

Get the number of items in the values queue.

queue.getValuesQueueSize().then(function(result) {
  console.log('The values queue size is: ' + result);
});

History

  • 1.0.0: Initial public release.