npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

bqueue

v0.0.22

Published

Reliable, distributed batch message processing using Redis

Downloads

178

Readme

BQueue

A simple library for reliable, distributed batch message processing using Redis. Distributes the queue across multiple Redis instances in a cluster for virtually limitless scaling.

Requirements

An ioredis (or ioredis API compatible) instance must be passed to BQueue on initialization.

Install

npm install bqueue

Initialize

For distributing the queue across multiple Redis instances, use a queueCount greater than 1.

const ioredis = require('ioredis');
const bqueue = require('bqueue');
const queueName = 'queue';
const queueCount = 10;
let redisClient = new ioredis();
let queue = new bqueue(redisClient, queueName, queueCount);

Add a message to the queue

A message can be anything: a string, number, function, object, etc. A message is randomly assigned to a single queue.

const message = 'Test Message';
try {
  const response = await queue.pushMessage(message);
  console.log('Message ID: ' + response.id);
} catch (err) {
  console.error(err);
}

Grab and process a batch of messages from the queue

Randomly selects a queue and grabs up to maxBatchSize messages from it. Redis blocks during the reading of all of the messages in the batch so limit maxBatchSize to the smallest amount needed for your batch operation. Don't forget to remove the messages from the queue after processing has completed. processingTimeout is the number of milliseconds expected for processing. If the messages have not been removed after this time, they will eventually be reinserted to the queue for future processing. Due to the distributed nature of the queue, this function can only return items on one particular queue. It should be called frequently to ensure quick processing of messages across all of the queues.

const maxBatchSize = 1000;
const processingTimeout = 300000;
try {
  const batch = await queue.getBatch(maxBatchSize, processingTimeout);
  batch.messages.forEach(message => {
    console.log('Message ID: ' + message.id);
    console.log('Message Contents: ' + message.message);
  });
  await batch.remove();
} catch (err) {
  console.error(err);
}

Reinsert unprocessed messages into the queue

Randomly selects a queue and reinserts unprocessed messages back into it for future processing. Due to the distributed nature of the queue, this function should be called frequently to ensure failed messages are retried.

const maxMessages = 1000;
try {
  const response = await queue.reinsertUnprocessed(maxMessages);
  console.log('Unprocessed Message IDs: ' + response.ids.join(','));
} catch (err) {
  console.error(err);
}