npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

redis-streams-broker

v0.0.15

Published

This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.

Downloads

977

Readme

redis-streams-broker

This package is based on redis stream data type which provides you with following features

  1. Broker to redis stream which can be used as centralized que between microservices. (Using Redis)
  2. Support for injectable redis client ioredisonly
  3. Guarantee of message delivery via consumer acknowledgements.
  4. Consumer Group functionality for scalability. (Just like Kafka)
  5. Option to drop a message when its acked, thus keeping memory footprint in check.

Getting Started

  1. Install using npm -i redis-streams-broker
  2. Require in your project. const brokerType = require('redis-streams-broker').StreamChannelBroker;
  3. Run redis on local docker if required. docker run --name streamz -p 6379:6379 -itd --rm redis:latest
  4. Instantiate with a redis client and name for the stream. const broker = new brokerType(redisClient, name);
  5. All done, Start using it!!.

Examples/Code snippets

  1. Please find example code for injectable ioredis client here
  2. Please find example code for injectable custom client here
  3. Please find multi threading examples here
  4. Please find async processing examples here
const Redis = require("ioredis");
const redisConnectionString = "redis://127.0.0.1:6379/";
const qName = "Queue";
const redisClient = new Redis(redisConnectionString);
const brokerType = require('redis-streams-broker').StreamChannelBroker;
const broker = new brokerType(redisClient, qName);

//Used to publish a paylod on stream.
const payloadId = await broker.publish({ a: "Hello", b: "World" }); 

//Creates a consumer group to receive payload
const consumerGroup = await broker.joinConsumerGroup("MyGroup"); 

//Registers a new consumer with Name and Callback for message handlling.
const subscriptionHandle = await consumerGroup.subscribe("Consumer1", newMessageHandler); 

// Handler for arriving Payload
async function newMessageHandler(payloads) {
    for (let index = 0; index < payloads.length; index++) {
        try {
            const element = payloads[index];
            console.log("Payload Id:", element.id); //Payload Id
            console.log("Payload Received from :", element.channel); //Stream name
            console.log("Actual Payload:", element.payload); //Actual Payload
            await element.markAsRead(); //Payload is marked as delivered or Acked also optionaly the message can be dropped.
        }
        catch (exception) {
            console.error(exception);
        }
    }
}

//Provides summary of payloads which have delivered but not acked yet.
const summary = await consumerGroup.pendingSummary();

//Unsubscribes the consumer from the group.
const sucess = consumerGroup.unsubscribe(subscriptionHandle); 

//Amount of memory consumed by this stream in bytes.
const consumedMem = await broker.memoryFootprint();

Built with

  1. Authors :heart for Open Source.
  2. nanoid for auto generating subscribtion handles.
  3. redis-scripto2 for handling lua scripts.
  4. relief-valve future refactoring to be open to any redis connection lib.

Contributions

  1. New ideas/techniques are welcomed.
  2. Raise a Pull Request.

Current Version:

0.0.15[Beta]

License

This project is contrubution to public domain and completely free for use, view LICENSE.md file for details.

API

Class StreamChannelBroker

  1. constructor(redisClient: any, channelName: string)

    Creates a broker instance.

    redisClient: Injectable redis client which will be used to send commands to redis server.

    channelName: Name of the stream key, if this doesnot exists it will be created on first push or group subscription.

  2. publish(payload: any, maximumApproximateMessages?: number, failOnMaxMessageCount:boolean): Promise<string>;

    Publishes provided message into the stream and returns id generated by server.

    payload: A JS object containing properties which are passed as key values pairs.

    maximumApproximateMessages: Appropiate length of the stream it is equal to ~ MAXLENGTH option in redis. Defaulted to 100, If negative number is passed then it behaves as non capped stream.

    failOnMaxMessageCount: if maximumApproximateMessages is positive number and failOnMaxMessageCount is set to true then it will only publish messages untill it reaches the maximum count post that it will start failling by returning null as message id, default value is false.

  3. joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>

    Creates a consumer group on the given redis stream with information provided, if the group exists does nothing returning a ConsumerGroup object.

    groupName: Name of the group to be created ot joined.

    readFrom: Id of the mesage to start reading from. defaulted to $ to only read new messages recevied on redis, check redis docs for more info.

  4. memoryFootprint(): Promise<number>

    Returns number of bytes consumed by the current stream.

  5. destroy(): Promise<boolean>;

    Starts to unsubscribe all the handles that were subscribed to this instance.

Class ConsumerGroup

  1. subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>

    Subscribes to stream to start receiving events when new payload arrives, this internally creates a polling system to check for new messages in stream. returns subscription name.

    consumerName: Name of the consumer who is subscribing via the consumer group object.

    handler: A callback function which will be invoked when new message a.k.a payload(s) arrive. Should be of signature (payload: Payload[]) => Promise<number> should be async & return from this function is number of messages to fetch from redis(expected +ve number; -ve or 0 will unsubscribe from the group stopping all further reads from stream,if NAN then defaults to number provided when subscribing), look at Payload class below for more details.

    pollSpan: Number of millisecond to wait after completion of handler to check for next available message in stream. Defaulted to 1000 milliseconds.

    payloadsToFetch: Maximum number of messages to fetch in one poll to server this is simillar to COUNT command in redis, this is optional and defaulted to 2.

    subscriptionHandle: Name for subscription handler this is what will be returned from the function, this is defaulted to unique shortid.

    readPending: If set to true will read all messages from start of the stream ie: Id = 0 which are in pending list of this consumer and group, once all pending are read it will automatically switch to latest messages from the stream. If set to false it will always look for new message from the stream, this is defaulted to false.

  2. unsubscribe(subscriptionHandle: string): Promise<boolean>

    Unsubscribes from the stream for the given subscriptionhandle, returns true for sucess and false for failure.

    subscriptionHandle: Name of the subscription handle which was returned by subscribe api.

  3. pendingSummary(): Promise<GroupSummary>

    Returns details of the pending items for the given group by exposing GroupSummary object.

Class Payload

  1. channel: string: Name of the stream key in redis.

  2. id: string: Id of the message being received.

  3. payload: any: Actual payload to processs.

  4. markAsRead(deleteMessage?: boolean): Promise<boolean>

    This function helps to ack the payload as read or processed, returns status of the operation via boolean return type true indicating success.

    deleteMessage: if set to true it will ack & delete the message from the stream if set to false will only ack the message defaulted to false.

Class GroupSummary

  1. total: number: This is the total number of messages in pending list.
  2. firstId: string: Id of the first message which is pending.
  3. lastId: string: Id of the last message which is pending.
  4. consumerStats: any: Extra information provided by XPENDING command.