npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@plgworks/queue

v1.0.1

Published

Queue helps publish critical events using EventEmitter and RabbitMQ.

Downloads

3

Readme

Queue

npm version

PLG Works Queue helps in publishing and subscribing tasks over RabbitMQ. Internally it uses topic-based exchange.

One use-case is to publish tasks for asynchronous processing. For example, API worker process can publish tasks which will be taken up by asynchronous worker processes which have subscribed for such tasks.

Prerequisites

Install

npm install @plgworks/queue --save

Initialize

configStrategy is passed to initialize PLG Works Queue. configStrategy is an object with rabbitmq as a key. The value of rabbitmq is an object with following keys:

  • username [string] (mandatory) RabbitMQ connection username
  • password [string] (mandatory) RabbitMQ connection password
  • host [string] (mandatory) RabbitMQ host
  • port [string] (mandatory) RabbitMQ port
  • heartbeats [string] (mandatory) heartbeats defines after what period of time the peer TCP connection should be considered unreachable.
  • clusterNodes [Array] (mandatory) - List of RMQ cluster hosts.
  • enableRabbitmq [integer] (optional) 0 if local usage.
  • switchHostAfterSec [integer] (optional) Wait time before switching RMQ host.
  • connectionTimeoutSec [integer] (optional) Wait time for connection to establish.

Following snippet initializes PLG Works Queue Manager:

const Queue = require('@plgworks/queue');

// Config Strategy for PLG Works Queue.
configStrategy = {
	'rabbitmq': {
        'username': 'guest',
        'password': 'guest',
        'host': '127.0.0.1',
        'port': '5672',
        'heartbeats': '30',
        'enableRabbitmq': 1
    }
};

// Create instance
const queueManager = await Queue.getInstance(configStrategy);

queueManager object methods

  • queueManager.subscribeEvent.rabbit(topics, options, readCallback, subscribeCallback) Description: Subscribe to multiple topics over a queue. Parameters:

    • topics [Array] (mandatory) - Array of topics to subscribe to.
    • options [object] (mandatory) Object with following keys:
      • queue [string] (optional) - Name of the queue on which messages with relevant topics will be published. If not passed, a queue with a unique name is created and is deleted when the subscriber gets disconnected.
      • ackRequired [integer] (optional) - The delivered message needs ack if passed 1 ( default 0 ). if 1 passed and ack not done, message will redeliver.
      • broadcastSubscription [integer] (optional) - Set to 1, when queue needs to be subscribed to broadcasting events. Default 0.
      • prefetch [integer] (optional) - The number of messages released from queue in parallel. In case of ackRequired=1, queue will pause unless delivered messages are acknowledged. Default 1.
    • readCallback [function] (mandatory) - Callback method will be invoked whenever there is a new notification.
    • subscribeCallback [function] (optional) - Callback method to get consumerTag.
  • queueManager.publishEvent.perform(params) Description: Publish event to topics. Parameters:

    • params [object] (mandatory) Object with following keys:
      • topics [Array] (mandatory) Array of topics to which message to be publish.
      • broadcast [integer] (optional) Set to 1 for broadcasting. Default 0.
      • publishAfter [integer] (optional) Delay in milli-seconds between publish and being available for consumption. Default 0.
      • publisher [string] (mandatory) Name of publisher
      • message [object] (mandatory) Object with following keys:
        • kind [string] (mandatory) Kind of the message.
        • payload [object] (mandatory) Payload to identify message and extra info.

Examples

Subscribe

Following snippet subscribes to specific topics over a queue.

const Queue = require('@plgworks/queue');

// Config Strategy for PLG Works Queue.
configStrategy = {
	'rabbitmq': {
        'username': 'guest',
        'password': 'guest',
        'host': '127.0.0.1',
        'port': '5672',
        'heartbeats': '30',
        'enableRabbitmq': 1
    }
};

let unAckCount = 0; // Number of unacknowledged messages.

const topics = ["topic.testTopic"];

const options = {
  queue: 'testQueue',
  ackRequired: 1, // When set to 1, all delivered messages MUST get acknowledge.
  broadcastSubscription: 1, // When set to 1, it will subscribe to broadcast channel and receive all broadcast messages. 
  prefetch:10
};

const processMessage = function(msgContent) {
  // Process message code here.
  // msgContent is the message string, which needs to be JSON parsed to get message object.
};

const readCallback = function(msgContent) {
  // Please make sure to return promise in callback function. 
  // On resolving the promise, the message will get acknowledged.
  // On rejecting the promise, the message will be re-queued (noAck)
  return new Promise(async function(onResolve, onReject) {
    // Incrementing unacknowledged message count.
    unAckCount++;

    // Process the message. Following is a 
    response = await processMessage(msgContent);
  
    // Complete the task and in the end of all tasks done
    if(response == success){
      // The message MUST be acknowledged here.
      // To acknowledge the message, call onResolve
      // Decrementing unacknowledged message count.
      unAckCount--;
      onResolve();   
    } else {
      //in case of failure to requeue same message.
      onReject();
    }
  })    
};

const subscription = {}; // object to store the consumer tag
const subscribeCallback = function(consumerTag) {
  subscription.consumerTag = consumerTag;
};

const subscribe = async function() {
  const queueManager = await Queue.getInstance(configStrategy);
  queueManager.subscribeEvent.rabbit(
    topics, // List of topics
    options,
    readCallback,
    subscribeCallback
   );
};

// Gracefully handle SIGINT, SIGTERM signals.
// Once SIGINT/SIGTERM signal is received, programme will stop consuming new messages. 
// But, the current process MUST handle unacknowledged queued messages.
process.on('SIGINT', function () {
  // Stop the consumption of messages
  process.emit('CANCEL_CONSUME', subscription.consumerTag);

  console.log('Received SIGINT, checking unAckCount.');
  const f = function(){
    if (unAckCount === 0) {
      process.exit(1);
    } else {
      console.log('waiting for open tasks to be done.');
      setTimeout(f, 1000);
    }
  };
  // Wait for open tasks to be done.
  setTimeout(f, 1000);
});

function rmqError(err) {
  console.log('rmqError occured.', err);
  process.emit('SIGINT');
}
// Event published from package in case of internal error.
process.on('rmq_error', rmqError);

subscribe();

Publish

Following snippet publishes a task for specific topics.

// Config Strategy for PLG Works Queue.
configStrategy = {
	'rabbitmq': {
        'username': 'guest',
        'password': 'guest',
        'host': '127.0.0.1',
        'port': '5672',
        'heartbeats': '30',
        'enableRabbitmq': 1
    }
};

const topics = ["topic.testTopic"];

const message = {
   kind: 'testMessageKind',
   payload: {
      // Custom payload for message
   }
};

// Import the Queue module.
const Queue = require('@plgworks/queue');
const publish = async function() {
  const queueManager = await Queue.getInstance(configStrategy);
  queueManager.publishEvent.perform(
    {
      topics: topics,
      publisher: 'MyPublisher',
      publishAfter: 30*1000, // delay in milli-seconds
      message: message
    });
};

publish();

Publish with delay

In some use cases, it is required to process certain task with a delay. For example, after one hour of user sign-up, we need to send an email. Such tasks can be published by using the publishAfter parameter. Internally, we use dead letter exchange for achieving this functionality.

Important Note: Do not use arbitrary values of delays. Internally, the message is stored in a delay specific queue for the waiting duration. As the number of allowed delays increases, so do the number of waiting queues. Having too many queues, can hamper RabbitMQ performance.

// Config Strategy for PLG Works Queue.
configStrategy = {
	'rabbitmq': {
        'username': 'guest',
        'password': 'guest',
        'host': '127.0.0.1',
        'port': '5672',
        'heartbeats': '30',
        'enableRabbitmq': 1
    }
};

const topics = ["topic.testTopic"];

const message = {
   kind: 'testMessageKind',
   payload: {
      // Custom payload for message
   }
};

// Import the Queue module.
const Queue = require('@plgworks/queue');
const publish = async function() {
  const queueManager = await Queue.getInstance(configStrategy);
  queueManager.publishEvent.perform(
    {
      topics: topics,
      publisher: 'MyPublisher',
      message: message
    });
};

publish();

Cancel and Resume message consumption

As seen in the subscribe snippet, cancelling consumption is the first step in SIGINT handling. For cancelling the consumption, consumerTag is needed, which is obtained in subscribeCallback. See subscribe snippet above for more details.

For cancelling the consumption, emit CANCEL_CONSUME event with consumerTag info.

process.emit('CANCEL_CONSUME', consumerTag);

For resuming the consumption, emit RESUME_CONSUME event with consumerTag info.

process.emit('RESUME_CONSUME', consumerTag);

Running test cases

Run following command to execute test cases.

./node_modules/.bin/mocha --recursive "./test/**/*.js"