npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

relief-valve

v0.0.7

Published

This is a simple library for Redis Streams data type, which is used to accumulate messages until a specified threshold is reached, post which the same is available to consumer stream.

Downloads

420

Readme

Relief Valve

This package is based on redis stream data type and provides you with following features

  1. It is agnostic to any redis client library can be used with redis-abstraction.
  2. This package acts like a pressure relief value used in plumbing; this pattern is used to batch multiple messages in the stream into a single group and deliver it to consumer, used case for data ingestions and other places where batching is needed.
  3. It can be used as simple que without batching, for sharding messages within multiple consumers for H-Scalling
  4. It provides guarantee for message delivery and or processing with acknowledge facility.
  5. Provides facility to reprocess lost messages(which are not acked over a period of time).
  6. Can used as in fan-out topology(each consumer receives the copy of the message).
  7. Batching of messages with respect to count of messages.
  8. Batching of messages with respect to time elapsed from the last write to the stream.
  9. Its Redis cluster compatible package.
  10. It provides accumalator sharding functionality

Getting Started

  1. Install using npm -i relief-valve
  2. Require in your project. const rvType = require('relief-valve').ReliefValve; or import { IBatchIdentity, IRedisClient, ReliefValve } from 'relief-valve'
  3. Run redis on local docker if required. docker run --name streamz -p 6379:6379 -itd --rm redis:latest
  4. Instantiate with a redis client and name for the stream, group name, client name and thresholds. const publisherInstance = new ReliefValve(client, name, 1, 1, "PubGroup", "Publisher1");
  5. All done, Start using it!!.

Examples/Code snippets

  1. Please find example code usage in component tests
//Count based batching test
async function main(redisPool)
{
    const batchsize = 10;
    const publisherInstance = new ReliefValve(redisPool, name, batchsize, 1, "PubGroup", "Publisher1"); //Check out redis-abstraction package for redis pool creation.
    const consumerInstance1 = new ReliefValve(redisPool, name, batchsize, 1, "ShardGroup1", "Consumer1");

    let payloads = new Map<string, object>();
    for (let counter = 0; counter < 100; counter++) {

        const payload = { "hello": "world1", "A": "1", "Z": "26", "B": "2", "counter": counter.toString() };
        const generatedId = await publisherInstance.publish(payload);
        payloads.set(generatedId, payload);

        //Test
        const consumer1Result = await consumerInstance1.consumeFreshOrStale(3600);

        //Verify
        assert.notStrictEqual(generatedId, undefined);
        assert.notStrictEqual(generatedId, null);
        assert.notStrictEqual(generatedId, "");
        if (payloads.size === batchsize) {
            if (consumer1Result == undefined) throw new Error("Read failed no batch found");
            assert.notStrictEqual(consumer1Result.id, undefined);
            assert.notStrictEqual(consumer1Result.id, null);
            assert.notStrictEqual(consumer1Result.id, "");
            assert.strictEqual(consumer1Result.readsInCurrentGroup, 1);
            assert.strictEqual(consumer1Result.payload.size, batchsize);
            assert.deepStrictEqual(consumer1Result.payload, payloads);
            const ackResult = await consumerInstance1.acknowledge(consumer1Result as IBatchIdentity);
            assert.deepStrictEqual(ackResult, true);
            payloads = new Map<string, object>();
        }
        else {
            assert.deepStrictEqual(consumer1Result, undefined);
        }
    }
    //Validate
    const token = redisPool.generateUniqueToken('Test');
    try {
        await redisPool.acquire(token);
        const keys =  await redisPool.run(token, ["KEYS", "*"]);
        const length = await redisPool.run(token, ["XLEN", name]);
        assert.deepStrictEqual(keys, [name]);
        assert.deepStrictEqual(length, 0);
    }
    finally {
        //Release of connection is important as it makes it available for others to acquire.
        await redisPool.release(token);
    }
}

Built with

  1. Authors :heart: for Open Source.

Contributions

  1. New ideas/techniques are welcomed.
  2. Raise a Pull Request.

License

This project is contrubution to public domain and completely free for use, view LICENSE.md file for details.

Quick Tips & Usage Patterns

  1. The package should be instantiated on publisher and subscriber side with identitcal parameters in constructor apart from groupName and clientName, else will lead to chaotic behaviour of pulling messages.
  2. Count threshold is always evlauted on writes/publish into stream.
  3. Time threshold needs external invocation as redis currently doesnot support cron jobs, either subscriber or publisher can invoke this validation.
  4. Highest accurary of time threshold is limited to one second, but depends heavily on external invocation frequency.

API

  1. Typing info included with the package.
  2. Type doc[W.I.P]

Example impementation of Redis Pool

A pooled implmentation using redis-abstraction

import { IORedisClientPool } from 'redis-abstraction';

//Define the redis connection string
const singleNodeRedisConnectionString = 'rediss://redis.my-service.com';
//Create a injector function for creating redis connection instance.
const connectionInjector = () => IORedisClientPool.IORedisClientClusterFactory([singleNodeRedisConnectionString]);
//Initialize the pool
const pool = new IORedisClientPool(connectionInjector);

//Pass it around in the application.
main(pool)
    .finally(async () => {
        //Remember to call shutdown which closes all connections in pool, else node.js process will not exit.
        await pool.shutdown()
    })