redis-streams-nodejs

v1.1.5

Simple NodeJs consumer and producer for Redis Streams

redis-streams-nodejs

Simple node package for easy use of Redis Streams functionality. This package allows for creation of a Redis consumer and producer.

Installation

Make sure you have Node.js installed, then run:

npm install redis-streams-nodejs

Usage

Basic example

import { RedisClient } from 'redis-streams-nodejs';

(async () => {
  // Client name must be unique per client
  const client = new RedisClient({
    groupName: 'mygroup',
    clientName: 'myclient1',
  });

  client.on('error', err => console.log('Redis Client Error', err));

  await client.connect();

  const consumer = client.createConsumer();

  // Redis stream to listen to and processable function
  const stream = {
    name: 'mystream',
    executable: (data, stream) => console.log('Redis message for stream ' + stream, data),
  };

  // Listen for new messages and process them according to the
  // defined executable function
  consumer.listen(stream);
})();

When creating the Redis client, make sure to define a group name and a client name. The client name must be unique so that Redis can distinguish each individual client within the consumer group.

Class RedisClient

Constructor : new RedisClient(options)

The RedisClient is an extension of the original client from the node-redis package. All constructor options within the node-redis package are available to this class as well.

Example

// Connect client to Redis server with TLS enabled
const client = new RedisClient({
  socket: {
    port: 6380,
    host: 'myredis.server.com',
    tls: true,
  },
  password: 'mysupersecurepassword',
  groupName: 'mygroup',
  clientName: 'client1',
});

RedisClientOptions

| Parameters | Description                                   | Required |
| ---------- | --------------------------------------------- | -------- |
| groupName  | Name of the consumer group                    | Yes      |
| clientName | Name of the client, must be unique per client | Yes      |

Other options are documented in the official node-redis GitHub repository.

Methods

For all other available methods, see the official node-redis GitHub repository.

createConsumer(options)

createProducer()

  • Returns a RedisProducer

streamExists(key)

  • key key name of the stream
  • Returns a boolean

groupExists(key)

  • key name of the stream
  • Returns a boolean

createGroup(key)

  • key name of the stream
  • Returns a string

Class RedisConsumer

Constructor : client.createConsumer(options)

The RedisConsumer listens for incoming messages in a stream. You can pass an object or an array of objects, each defining the name of the stream to listen to and the function to execute for processing its messages. The consumer has a built-in retry mechanism, which triggers a retry-failed event if all retries were unsuccessful.

When a message is processed successfully (including during a retry), the consumer sends an acknowledgement to the Redis server. Once the acknowledgement is performed, the message is removed from the pending list for that consumer group.

When the consumer starts, it first processes all remaining pending messages before listening for new incoming messages. However, you can override this behaviour by defining your own starting id.
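The retry and acknowledgement flow described above can be modelled roughly as follows. This is a minimal, synchronous sketch for illustration only and is not the package's implementation: `processWithRetries` and `ProcessResult` are hypothetical names, and the waiting time between retries is left out.

```typescript
// Illustrative model only; these names are hypothetical, not part of the package.
interface ProcessResult {
  acknowledged: boolean; // true when the message would be acknowledged
  retriesUsed: number;   // retries performed after the first attempt
}

function processWithRetries<T>(
  message: T,
  executable: (message: T) => void,
  retries: number,
): ProcessResult {
  // One initial attempt plus up to `retries` further attempts.
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      executable(message);
      // Success, also during a retry: the message would be acknowledged
      // and therefore removed from the group's pending list.
      return { acknowledged: true, retriesUsed: attempt };
    } catch {
      // A real consumer would wait here before the next retry.
    }
  }
  // All attempts failed: the message is not acknowledged. This is the
  // point at which the consumer would emit 'retry-failed'.
  return { acknowledged: false, retriesUsed: retries };
}

// A processing function that fails twice and then succeeds.
let calls = 0;
const flaky = (_msg: { id: number }) => {
  calls += 1;
  if (calls < 3) throw new Error('temporary failure');
};

const result = processWithRetries({ id: 1 }, flaky, 3);
// result.acknowledged is true and result.retriesUsed is 2
```

In this model a message that never succeeds is simply left unacknowledged, which matches the description above that only acknowledged messages leave the pending list.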

Example

  const consumer = client.createConsumer({
    COUNT: 3,
    retries: 1,
    retryTime: ['5s'],
  });

  const streams = [
    {
      name: 'mystream',
      id: '>',
      executable: (data) => console.log('Only listen to new messages', data.message)
    },
    {
      name: 'mysecondstream',
      executable: (data, stream) => console.log('Message for stream ' + stream, data.message)
    }
  ];

  consumer.client.on('retry-failed', (err, data) => {
    console.error('Failed processing message in stream ' + data.stream + '. Amount of retries: ' + data.retries, data.message);
  });

  consumer.client.on('process-error', (err, data) => {
    console.error('An unexpected error occurred for stream ' + data.stream, err.message);
  });

  consumer.listen(streams);

RedisConsumerOptions

| Parameters | Description                                        | Required | Default              |
| ---------- | -------------------------------------------------- | -------- | -------------------- |
| COUNT      | Number of elements to read                         | No       | 1                    |
| BLOCK      | Time in milliseconds to block while reading stream | No       | 0                    |
| retries    | Number of retries for processing messages          | No       | 3                    |
| retryTime  | Time interval between each retry                   | No       | ['15s', '1m', '15m'] |

More information about the BLOCK and COUNT parameters can be found in the official Redis documentation.

The retryTime option is an array of time strings. Seconds, minutes and hours are supported ('s', 'm', 'h'). When the retryTime array has fewer items than the number of retries, the last time string is used for the remaining retries.
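To make the fallback rule concrete, here is a small sketch of how such time strings could be turned into delays. It is an assumption about the behaviour, not the package's own code: `parseTimeString` and `retryDelayMs` are hypothetical helpers, and the sketch only accepts a whole number followed by one unit letter.

```typescript
// Illustrative only; these helpers are hypothetical, not exported by the package.
const UNIT_MS: Record<string, number> = { s: 1000, m: 60000, h: 3600000 };

// Convert a time string such as '15s', '1m' or '2h' to milliseconds.
function parseTimeString(value: string): number {
  const match = /^(\d+)([smh])$/.exec(value.trim());
  if (!match) {
    throw new Error(`Unsupported time string: ${value}`);
  }
  return Number(match[1]) * UNIT_MS[match[2]];
}

// Delay before retry number `attempt` (1-based). When retryTime has fewer
// entries than the attempt number, the last entry is reused.
function retryDelayMs(retryTime: string[], attempt: number): number {
  if (retryTime.length === 0) {
    throw new Error('retryTime must not be empty');
  }
  const index = Math.min(Math.max(attempt, 1), retryTime.length) - 1;
  return parseTimeString(retryTime[index]);
}

// With four retries and the default retryTime, the fourth retry reuses '15m'.
const delays = [1, 2, 3, 4].map((n) => retryDelayMs(['15s', '1m', '15m'], n));
// delays is [15000, 60000, 900000, 900000]
```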

To disable the retry mechanism, set retries to 0.

Methods

listen(streams)

addAckMessage(stream, id)

  • stream key name of the stream
  • id id of the message

Adds the message to the acknowledgement list.

StreamToListen Object

{
  name: 'mystream',                                        // Keyname of the Redis stream
  executable: (message, stream) => console.log(message),   // Message processing function to be executed
  id: '>'                                                  // Optional, start listening from the message id. Defaults to '0-0'
}
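Since the consumer accepts either a single object or an array, and id is optional, the accepted input can be pictured as being normalised like this. The sketch uses a locally declared `StreamToListen` interface and a hypothetical `normalizeStreams` helper purely for illustration; the package ships its own types and may handle this differently.

```typescript
// Hypothetical local types for illustration; the package exports its own.
interface StreamToListen<T = unknown> {
  name: string;
  executable: (message: T, stream: string) => void;
  id?: string;
}

// Accept one object or an array, and fill in the documented default id.
function normalizeStreams<T>(
  input: StreamToListen<T> | StreamToListen<T>[],
): Required<StreamToListen<T>>[] {
  const list = Array.isArray(input) ? input : [input];
  return list.map((s) => ({ ...s, id: s.id ?? '0-0' }));
}

const normalized = normalizeStreams([
  { name: 'mystream', id: '>', executable: () => {} },
  { name: 'mysecondstream', executable: () => {} },
]);
// normalized[0].id is '>' (only new messages)
// normalized[1].id is '0-0' (the documented default)
```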

Class RedisProducer

Constructor : client.createProducer()

The RedisProducer is used to add new messages to the Redis stream.

Example

const message = {
  firstName: 'John',
  lastName: 'Doe',
};

const producer = client.createProducer();
producer.add('mystream', message);

Methods

add(stream, message)

  • stream key name of the stream
  • message object/message to add to the stream

Events

| Event name    | Description |
| ------------- | ----------- |
| process-error | Triggered on the RedisConsumer when an error occurs during execution of the stream's processing function. The first argument is the error object; the second argument is an object containing the stream, message and retries. |
| retry         | Triggered on the RedisConsumer just before a retry is attempted. A data object with the properties stream, message, retries and timestamp is passed to the event. |
| retry-failed  | Triggered on the RedisConsumer when retrying the message has failed or ended. The first argument passed to the event is the error; the second argument is a data object with the properties stream, message, retries and timestamps. |

TypeScript

This package has full TypeScript support. The example below shows how to define a processing function with typed message data.

import { RedisClient, StreamsToListen, StreamMessageReply } from 'redis-streams-nodejs';

const client = new RedisClient({
  groupName: 'mygroup',
  clientName: 'myclient1',
});
await client.connect();

const streams: StreamsToListen = [
  {
    name: 'mystream',
    executable: processing,
  },
];

const consumer = client.createConsumer();
consumer.listen(streams);

// Define interface of your message data
interface MyMessage {
  firstName: string;
  lastName: string;
}

function processing(data: StreamMessageReply<MyMessage>) {
  const message = data.message;
  const fullName = message.firstName + ' ' + message.lastName; // Full typing of message

  console.log('Hello, my name is ' + fullName);
}