npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

worqr

v1.2.3

Published

A distributed, reliable, atomic, work queueing system that only requires redis as the backend.

Downloads

9

Readme

NPM

node

Worqr

A distributed, reliable, atomic, work queueing system that only requires redis as the backend.

Attributes:

  • Ensure a task is performed exactly once.
  • Only Redis is required as a back-end server. No server components to install. Operates purely redis-based, peer-supervised in your application.
  • Every transaction is fully atomic (queue never left in inconsistent state).
  • When workers fail, all tasks are put back into the queue at the front.
  • Queue doesn't keep any item history.
  • As a work queue, each item is only delivered to exactly one worker.
  • Work items are not enveloped, so there is no encoding/decoding overhead or chance for the format to change in future releases.
  • Uses pub/sub for instantaneous new work notifications(no spin locks or blocking calls).
  • When a worker fails, all in-process work is automatically rescheduled to other workers.
  • Worqr is written in TypeScript, so you get automatic typings included.

Is Worqr for me?

  • You need to distribute a task workload
  • You are already using Redis in your application
  • You don't want to run a queue service or pay per-transaction fees

If the answer to all three of those questions is yes, then worqr is for you.

Theory of Operation

The primary unit of work to be done is a Task. Tasks are organized into Queues. Queues are serviced by Workers. When a worker takes a task, the task is assigned to that worker's in-process list. If a worker fails to keep it's active flag from expiring, all its in-process tasks are returned to the work queue.

Worqr can handle one-time tasks as well as persistent tasks. A persistent task is a task that never completes. For example, if your worker needs to keep a set of open connections to other applications, each of those connections can be managed with a task so they are opened exactly once by exactly one worker.

Terminology

Task - something a worker consumes.

Process - a task that is being worked on.

Worker - a distributed agent that consumes tasks.

Queue - a first in, first out list of Tasks.

Getting started

Initializing Worqr

let worqr = new Worqr({ host: <domain/ip>, port: <port #>, password: <pwd>}, {redisKeyPrefix: <unique namespace> });

NOTE: Worqr will open two connections to Redis. One for data and one for subscriptions.

You can also supply an existing data and/or subscription connection to prevent Worqr from opening it's own.

let worqr = new Worqr({ data: <RedisClient instance>, subscribe: <RedisClient instance> });

NOTE: These must be separate Redis connections if supplying both.

Queing work

worqr.enqueue('<queue name>', '<task>');

NOTE: Tasks are strings. Your code is responsible for serializing any complex objects.

Stopping work

worqr.cancelTasks('<queue name>', '<task>');

NOTE: Worqr will cancel all instances of the specified task if more than one exists. Task cancellation must be supported by the worker process as well if the task is already in-process.

Getting setup to do work

If this is a worker process (a process that will perform work), you must start the worker before you can accept work from a queue. Do not do this in a process that only queues tasks.

worqr.startWorker();

startWorker method returns a promise, so it also supports await:

await worqr.startWorker();

NOTE: Once a worker has started, it will periodically refresh a redis key. Make sure your event loop doesn't get CPU starved or your worker will be failed and its work will be recycled back into the queue. If your event loop runs too long, you can use setImmediate() to break your long running process into smaller sections.

Taking work from a queue

// Add event handler for queue name first, so we don't miss any work events
worqr.on('<queue name>', (type: string, message: string) => {
    switch (type) {
        case 'work':
            // do work
            // message: the number of new tasks
            doWork();
            ...
            break;
        case 'cancel':
            // stop work
            // message: task to cancel 
            ...
            break;
        case 'delete':
            // work queue was deleted, no action necessarily needed
            // message: empty string
            ...
            break;
        }       
});

// Ask for work from the specified queue
await worqr.startWork('<queue name>');

Work processing function

Once a task is started, it's called a "process". The process has a processId that must be used to uniquely finish or fail that task instance. This is because a task payload is not necessarily unique.

async function doWork() {
    const process = await worqr.dequeue('<queue name>');
    if (process) {
        // perform the task `process.task`
        ...
        // complete the task
        await worqr.finishProcess(process.id);
        // setImmediate gives the event loop a chance to perform other tasks first
        setImmediate(() => {
            doWork();
        });
    }
}

Stop working on a task/Fail a task

If something goes wrong while processing a task, you can fail the task to send it back to the queue.

await worqr.stopProcess('<processId>');

NOTE: Worqr does not track the historical state of a task. If it fails for some reason that will always fail like invalid input, the task should be marked as finished and that error should be handled in your code.

Fall all tasks

If your process is shutting down and you want to return all work to the queue, you can fail your worker

worqr.failWorker();

Additional command reference

Get all queue names

function getQueues(): Promise<string[]>

Get all current workers

public getWorkers(): Promise<string[]>

Get queues that are being worked on by a particular worker

Omitting the workerId will use the current workerId

private getWorkingQueues(workerId?: string): Promise<string[]>

What's actually happening?

Enqueue

Add a task to a queue.

In the diagram, the queue name is Q1 and the task payload is T1.

enqueue (T1, Q1)

        Q1 : List
        +----------------------+
        | +----+ +----+        |
(1)     | |   *| |    |        |
T1 +--> | |T1  | |T0  |        |
        | +----+ +----+        |
        +----------------------+
         head              tail
 1) LPUSH Q1 T1
 2) PUBLISH Q1 1

Start Task

Move a task from a "to do" list to a "doing" list.

In the diagram, the worker is W1 and the process is P1.

startTask (Q1, W1, P1)

        Q1 : List                         P1 : List           W1_Q1 : Set
        +----------------------+          +--------+          +---------------------+
        | +----+ +----+        |          | +----+ |          | +----+ +----+       |
        | |    | |   X|        | (1)      | |   *| |  (2)     | |   *| |    |       |
        | |T1  | |T0  |        | -->T0--> | |T0  | |  P1 +--> | |P1  | |P0  |       |
        | +----+ +----+        |          | +----+ |          | +----+ +----+       |
        +----------------------+          +--------+          +---------------------+
         head              tail
 1) RPOPLPUSH Q1 P1
 2) SADD W1_Q1 P1

Stop Task

Removes a task from a worker's personal "to do" list.

stopTask (P1)

        Q1 : List                         P1 : List           W1_Q1 : Set
        +----------------------+          +--------+          +---------------------+
        | +----+ +----+        |          | +----+ |          | +----+ +----+       |
        | |    | |   *|        | (1)      | |   X| |      (2) | |   X| |    |       |
        | |T1  | |T0  |        | <--T0<-- | |T0  | |          | |P1  | |P0  |       |
        | +----+ +----+        |          | +----+ |          | +----+ +----+       |
        +----------------------+          +--------+          +---------------------+
         head              tail
 1) RPOPLPUSH P1 Q1
 2) SREM W1_Q1 P1

Finish Task

Moves a task from a worker's list into a top level "to do" list.

finishTask (P1)

                                          P1 : List           W1_Q1 : Set
                                          +--------+          +---------------------+
                                          | +----+X|          | +----+ +----+       |
                                          | |    | |          | |   X| |    |       |
                                      (1) | |T0  | |      (2) | |P1  | |P0  |       |
                                          | +----+ |          | +----+ +----+       |
                                          +--------+          +---------------------+

 1) DEL P1
 2) SREM W1_Q1 P1

Cancel Tasks

Removes a task from a queue.

Workers listen on Q1 and "finish" all tasks that are cancelled.

cancelTasks (Q1, T1)

        Q1 : List
        +----------------------+
        | +----+ +----+        |
        | |   X| |    |        |
        | |T1  | |T0  |        |
        | +----+ +----+        |
        +----------------------+
         head              tail
 1) LREM Q1 T1 0
 2) PUBLISH Q1_cancel T1

Start Worker

Register a worker as existing and available for work.

startWorker (W1)

        Workers : Set
        +----------------------+          2) W1 = "RUN"
        | +----+ +----+        |             W1.TTL = 3 sec
        | |    | |   *|        |     (1)
        | |W0  | |W1  |        | <--+ W1
        | +----+ +----+        |
        +----------------------+

 1) SADD Workers W1
 2) SET W1 RUN EX 3

Keep Worker Alive

Digest cycle.

keepWorkerAlive (W1)

Every 1 second
        SET W1 RUN EX 3 XX  (XX = set if exists)

    If SET fails, worker has failed to stay alive and must restart

Start Work

Subscribe to task event queue and start work, keeping track of which queues each worker services.

startWork (W1, Q1)

        WorkMap_W1 : Set
        +----------------------+
        | +----+ +----+        |
        | |    | |   *|        | (1)
        | |Q0  | |Q1  |        |
        | +----+ +----+        |
        +----------------------+

 1) SADD WorkMap_W1 Q1

Stop Work

Unsubscribe from new task event queue. In-progress work is returned to the queue.

stopWork (W1, Q1)

        WorkMap_W1 : Set
        +----------------------+
        | +----+ +----+        |
        | |    | |   X|        | (1)
        | |Q0  | |Q1  |        |
        | +----+ +----+        |
        +----------------------+

 1) SREM WorkMap_W1 Q1
 2) perform steps 4-5 of failWorker

Fail Worker

Stop all work and delete the worker. In-progress work is returned to the queue.

failWorker (W1)

        Workers : Set
    (1) +----------------------+  2) W1 = NULL
        | +----+ +----+        |
        | |    | |   X|        |
        | |W0  | |W1  |        |
        | +----+ +----+        |
        +----------------------+

 1) SREM Workers W1
 2) DEL W1
 3) SMEMBERS WorkMap_W1
 4) for each queue serviced by worker W1 (items in WorkMap_W1)
    SMEMBERS W1_Q1  (get all in-process tasks)
    DEL WorkMap_W1
        Q1 : List                         P1 : List
        +----------------------+          +--------+
        | +----+ +----+        |          | +----+X| (4a2)
        | |    | |   *|        | (4a1)    | |   X| |
        | |T1  | |T0  |        | <--T0<-- | |T0  | |
        | +----+ +----+        |          | +----+ |
        +----------------------+          +--------+
         head              tail
  a) for each in-process task (items in W1_Q1):
   1) RPOPLPUSH P1 Q1
   2) DEL P1
        W1_Q1 : Set
        +---------------------+
        | +----+ +----+      X| (4b)
        | |    | |    |       |
        | |P1  | |P0  |       |
        | +----+ +----+       |
        +---------------------+

  b) DEL W1_Q1
  c) if tasks exist for queue
     PUBLISH Q1_work count
  • diagrams made with http://asciiflow.com/