npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

ddq

v2.0.2

Published

DeDuplicated Queue

Downloads

2

Readme

DeDuplicated Queue (DDQ)

Build Status Dependencies Dev Dependencies codecov.io

About

This module was created because of a need for a system which acts like a queue for tasks but doesn't allow for duplication. DDQ uses an async methodology to complete certain tasks, sending and receiving messages, then indicating the message has been fully processed. We used callbacks instead of Promises to make the process faster and easier to integrate with other libraries.

DDQ also extends the Event Emitter module so it's very easy to use events for when there is a message or an error has occured and inform the software of these events.

DDQ is split into this core library and a backend. The core exposes the public interface, initializes the backend, then encapsulates some of the logic (such as message heartbeats) so the backends have less duplicated code. The backend is responsible for implementation-level details, such as how to store and retrieve messages and the queue cleanup jobs. The backend is specific to a data source, such as MySQL. Also, when a message is emitted to be processed, the backend should lock the message so no other instance may pick it up to be processed simultaneously.

Setting Up

Setting up DDQ is pretty straight forward. It does require some configuration to get everything working together. Your project would need to include DDQ in it's package.json and also include a backend module or write one yourself. In the config value for backend this will set what module you're using. In the code this looks for ddq-backend-whatYouHaveInConfigValueForBackend. This should also be in your node_modules/ directory, so it's easily found.

Another config value is the heartbeatDelayMs. DDQ uses a method on the wrapped message, which we'll get to later. A heartbeat routine is called every so often to update the task/job in the queue currently being processed. The heartbeatDelayMs is configured in milliseconds, so a value of "1000" would have the heartbeat execute every second.

Also, servers might not be able to handle a certain number of processes, or you might only want to handle a few at a time, so setting the maxProcessingMessages to a number will make it so only up to that number of processes are created. DDQ will automatically tell the backend to pause the polling when this limit is reached. It will resume polling once the number of processing messages is lower than the max.

Other config values would be more specific for what your backend needs, like table name, how often to poll the storage mechanism and so on. These are passed to the backend using the backendConfig values.

var DeDuplicatedQueue, instance;

DeDuplicatedQueue = require("ddq");

/* Pass in the config, including which backend to use and DDQ will find it
 * and use it.
 */
instance = new DeDuplicatedQueue({
    backend: "mock",
    backendConfig: {
        host: "localhost",
        password: "someReallyNiceLongSecurePassword",
        pollingDelayMs: 5000,
        port: 3306,
        table: "query",
        user: "hopefullyNotRoot"
    },
    heartbeatDelayMs: 1000,
    maxProcessingMessages: 10
});

Sending a Message

To send a message into the queue, use instance.sendMessage(). You pass in the message and an optional callback which be executed once the message has been sent. The message can be anything you want as the backend will be the one to process it and put it into its storage mechanism.

instance.sendMessage("sample message", (err) => {
    if (err) {
        // Take action if there is an error.
    }
});

Listening

In order to receive messages from the queue, call instance.listen() and the instance will start to emit messages as they are found. DDQ will send two types of events: data and error. When data is emitted, you'll receive a message from the queue and a callback from DDQ. Once the process is complete you'll need to call the callback with an argument whether there was an error when processing.

When the data event is triggered from the backend, DDQ will use methods on the data received from the backend. These methods include: heartbeat, requeue, and remove. The only piece of information the software running DDQ will receive is the message and the callback from DDQ in order to say whether the processing of the message was successful or not. When the message is being processed, DDQ will call the heartbeat method on the wrappedMessage using heartbeatDelayMs from the config to tell the backend to update the heartbeat at that interval.

// Starts listening for events.
instance.listen();

/* DDQ uses event emitters and you'll need to listen for them so your
 * applications can take actions.
 */
instance.on("data", (message, callback) => {
    // Take action on the data.
    if (somethingBadHappened) {

        // Requeued for later processing.
        callback(new Error("Something bad happened"));
    } else {

        // Removed as message is no longer needed.
        callback();
    }
});

The other event DDQ will emit is error. This alerts the code listening there was an error either coming from DDQ or passed back from the backend.

instance.on("error", () => {
    // Take action on the error.
});

Pausing and Resuming Polling

At some point your consumer might want to pause the polling of messages so it can do a task without having messages coming at it. You can do this by simply calling the DDQ method instance.pausePolling(). This doesn't use a callback, for if there is an error it should be emitted and picked up by the listener.

// Tells the backend to stop polling.
instance.pausePolling();

Once you feel you should resume polling you can call the instance.resumePolling() method. We don't want to resume polling if the limit has been reached. This also doesn't use a callback, for if there is an error it should be emitted and picked up by the listener.

// Tells the backend to resume polling.
instance.resumePolling();

Closing

At some time you'll want to stop listening to DDQ which you will call close. This will call the backend to close its connections, stop polling for messages and stop emitting messages..

// Stop listening to events and close the connection to the database.
instance.close((err) => {
    if (err) {
        // Take action if there is an error.
    }
});