nestjs-rivulex

v1.0.0-beta.25

Rivulex is a high-performance messaging library leveraging Redis Streams for reliable event-driven communication within distributed systems.

Rivulex NestJS provides custom transport, publisher, and trimmer abstractions for integrating Rivulex with NestJS applications. It leverages Rivulex’s Redis Streams-based messaging system and offers an easy-to-use abstraction for event-driven communication in NestJS.

Key Features

  • At-Least-Once Delivery: Rivulex ensures that every message is delivered at least once, making it suitable for scenarios where message loss is unacceptable.
  • FIFO Messaging: Leveraging Redis Streams, Rivulex provides a FIFO (First-In-First-Out) order for message processing, ensuring predictable and reliable message handling.
  • Distributed and Scalable: Built to handle horizontal scaling, Rivulex supports the creation of consumer groups, allowing you to efficiently scale out your messaging system across multiple instances.
  • Flexible Configuration: Easily configure timeouts, blocking behavior, retries, and more to tailor the system to your specific needs.
  • Error Handling and Logging: Integrates customizable error handling and logging, providing insights into message processing and failures.
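
Because delivery is at-least-once, a handler can receive the same event more than once, so handlers are usually written to be idempotent. The following is a minimal sketch of that idea, assuming each event carries a unique ID. The `IncomingEvent` shape and the `IdempotentProcessor` class are hypothetical and are not part of Rivulex.

```typescript
// Hypothetical event shape for illustration; not the Rivulex Event type.
interface IncomingEvent<P> {
  id: string;
  payload: P;
}

// Remembers which event IDs were already applied so that a redelivered
// event does not trigger its side effect a second time.
class IdempotentProcessor<P> {
  private readonly seen = new Set<string>();
  private readonly apply: (payload: P) => void;

  constructor(apply: (payload: P) => void) {
    this.apply = apply;
  }

  // Returns true if the event was applied, false if it was a duplicate.
  process(event: IncomingEvent<P>): boolean {
    if (this.seen.has(event.id)) {
      return false;
    }
    this.apply(event.payload);
    this.seen.add(event.id);
    return true;
  }
}

// Usage: the second delivery of the same ID is skipped.
const createdEmails: string[] = [];
const processor = new IdempotentProcessor<{ email: string }>((p) => {
  createdEmails.push(p.email);
});
const firstDelivery = processor.process({ id: '1-0', payload: { email: 'a@example.com' } });
const redelivery = processor.process({ id: '1-0', payload: { email: 'a@example.com' } });
```

The in-memory `Set` only deduplicates within one process. With several instances, the processed IDs would have to live in shared storage.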

Use Cases:

  • Event-Driven Architectures: Perfect for building systems that rely on events and need reliable message delivery.
  • Microservices: Facilitates communication between microservices in distributed systems.
  • Real-Time Data Processing: Suitable for applications that require real-time processing and streaming of data.

With Rivulex, you can build scalable, reliable, and efficient messaging systems that are well-suited for modern distributed environments.

Rivulex

For more details about Rivulex, including its features and API documentation, visit the Rivulex GitHub repository.

Installation

To install nestjs-rivulex, use npm or yarn:

npm install nestjs-rivulex
# or
yarn add nestjs-rivulex

Rivulex Transport

The RivulexTransport class is a custom transport strategy for NestJS that integrates with the Rivulex messaging system. This transport layer allows NestJS applications to use Rivulex for message handling and processing, leveraging the high-performance capabilities of Redis Streams.

Configuration

RivulexSubscriberConfig

  • redis: Configuration for the Redis connection, including options such as host, port, and authentication details.
  • Additional Options: Customize settings like clientId, group, processTimeout, processConcurrency, fetchBatchSize, blockTime, and retries according to your application's needs.

For a complete list of additional settings and configuration details, visit the Rivulex documentation.

Configure in main.ts

Configure the custom transport strategy in your NestJS application's main.ts:

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { RivulexTransport, RivulexSubscriberConfig } from 'nestjs-rivulex';
import { AppModule } from './app.module';

async function bootstrap() {
  const rivulexConfig: RivulexSubscriberConfig = {
    redis: {
      host: 'localhost',
      port: 6379,
    },
    // Additional configuration options if needed
    group: 'group',
  };

  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      strategy: new RivulexTransport(rivulexConfig),
    },
  );

  await app.listen();
}

bootstrap();

Custom Logger

You can pass a custom logger to the RivulexTransport constructor. The logger should implement NestJS's LoggerService interface (the interface behind the built-in Logger) or be a custom logger service that exposes the same methods.
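
A custom logger along these lines could look like the sketch below. The `LoggerLike` interface is a local stand-in that mirrors the `log`, `error`, `warn`, `debug`, and `verbose` methods of NestJS's `LoggerService`, so the example compiles without `@nestjs/common`. `JsonLinesLogger` is a hypothetical name. This README does not show where the logger goes in the `RivulexTransport` constructor, so that wiring is left out.

```typescript
// Local stand-in for the method shape of NestJS's LoggerService.
interface LoggerLike {
  log(message: unknown, ...optionalParams: unknown[]): void;
  error(message: unknown, ...optionalParams: unknown[]): void;
  warn(message: unknown, ...optionalParams: unknown[]): void;
  debug(message: unknown, ...optionalParams: unknown[]): void;
  verbose(message: unknown, ...optionalParams: unknown[]): void;
}

type Sink = (line: string) => void;

// Hypothetical logger that emits one JSON object per log call.
class JsonLinesLogger implements LoggerLike {
  private readonly sink: Sink;

  constructor(sink: Sink = (line) => console.log(line)) {
    this.sink = sink;
  }

  log(message: unknown, ...optionalParams: unknown[]): void {
    this.write('log', message, optionalParams);
  }
  error(message: unknown, ...optionalParams: unknown[]): void {
    this.write('error', message, optionalParams);
  }
  warn(message: unknown, ...optionalParams: unknown[]): void {
    this.write('warn', message, optionalParams);
  }
  debug(message: unknown, ...optionalParams: unknown[]): void {
    this.write('debug', message, optionalParams);
  }
  verbose(message: unknown, ...optionalParams: unknown[]): void {
    this.write('verbose', message, optionalParams);
  }

  private write(level: string, message: unknown, params: unknown[]): void {
    this.sink(JSON.stringify({ level, message: String(message), params }));
  }
}

// Usage: collect lines in memory instead of writing to stdout.
const lines: string[] = [];
const logger = new JsonLinesLogger((line) => {
  lines.push(line);
});
logger.warn('retrying event', { attempt: 2 });
```

Injecting the sink keeps the logger easy to test and lets you redirect output to a file or a log shipper.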

Examples

This section shows two ways to define handlers in nestjs-rivulex: a class bound to one stream with @Stream and @Action, and a class whose methods use @StreamAction to handle actions from different streams.

Single Abstraction for a Specific Stream

In this example, we use the @Stream decorator to define a single abstraction that handles multiple actions within a specific stream with @Action decorators. This approach is ideal when you want to organize event handlers for all actions associated with a particular stream in one place.

import { Event, Action, Stream } from 'nestjs-rivulex';

interface CustomHeaders {
    requestId: string;
    userId: string;
}

interface UserCreatedPayload {
    id: string;
    email: string;
}

interface UserDeletedPayload {
    id: string;
    email: string;
}

@Stream('users')
export class UsersHandlers {

    @Action('user_created')
    async handleUserCreated(event: Event<UserCreatedPayload, CustomHeaders>) {
        const { action, headers, payload, attempt, ack, channel } = event;
        // Handle 'user_created' event
        await ack();
    }

    @Action('user_deleted')
    async handleUserDeleted(event: Event<UserDeletedPayload, CustomHeaders>) {
        const { action, headers, payload, attempt, ack, channel } = event;
        // Handle 'user_deleted' event
        await ack();
    }
}

Single Abstraction Handling Actions for Different Streams

In this example, we use the @StreamAction decorator to define a single abstraction that handles actions for different streams. This approach is useful when you need to manage event handlers for various streams in a single class, avoiding the need to create separate layers for each stream.

import { Event, StreamAction } from 'nestjs-rivulex';

interface CustomHeaders {
    requestId: string;
    userId: string;
}

interface OrderCreatedPayload {
    orderId: string;
    userId: string;
}

interface PaymentProcessedPayload {
    paymentId: string;
    orderId: string;
}

export class EventHandlers {

    @StreamAction('orders', 'order_created')
    async handleOrderCreated(event: Event<OrderCreatedPayload, CustomHeaders>) {
        // Handle 'order_created' event
        await event.ack();
    }

    @StreamAction('payments', 'payment_processed')
    async handlePaymentProcessed(event: Event<PaymentProcessedPayload, CustomHeaders>) {
        // Handle 'payment_processed' event
        await event.ack();
    }
}

Integration with Trimmer

You can configure the Trimmer to start together with the RivulexTransport custom transport strategy. Old messages are then trimmed automatically while the application subscribes to events.

Usage

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { RivulexTransport, RivulexSubscriberConfig } from 'nestjs-rivulex';
import { AppModule } from './app.module';

async function bootstrap() {
  const rivulexConfig: RivulexSubscriberConfig = {
    redis: {
      host: 'localhost',
      port: 6379,
    },
    group: 'group',
    trimmer: { // 👈🏻 trimmer settings
      streams: ['users'],
      group: 'group',
      intervalTime: 43200000, // 12 hours
      retentionPeriod: 604800000, // 7 days
    },
  };

  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      strategy: new RivulexTransport(rivulexConfig),
    },
  );

  await app.listen();
}

bootstrap();
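
The intervalTime and retentionPeriod values above are plain millisecond counts. Small helpers can make them easier to read and check. The `hours` and `days` helper names below are hypothetical and are not exported by nestjs-rivulex.

```typescript
// Millisecond constants built up step by step.
const SECOND_MS = 1000;
const MINUTE_MS = 60 * SECOND_MS;
const HOUR_MS = 60 * MINUTE_MS;
const DAY_MS = 24 * HOUR_MS;

// Hypothetical helpers that convert human-friendly durations to milliseconds.
const hours = (n: number): number => n * HOUR_MS;
const days = (n: number): number => n * DAY_MS;

const intervalTime = hours(12);   // 43200000
const retentionPeriod = days(7);  // 604800000
```

The results can be passed directly as the `intervalTime` and `retentionPeriod` fields of the trimmer configuration.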

Decorators

nestjs-rivulex provides a comprehensive set of decorators to simplify and abstract the management of stream subscriptions, associating events with specific actions, and working with event parameters. These decorators help you organize your code in a clean and intuitive way, making it easier to define and handle events in your NestJS applications.

Class Decorators

@Stream(streamName: string) Decorate a class to specify the Redis stream name. This decorator indicates that the class contains methods to handle events from the specified Redis stream.

Example:

@Stream('users')
export class UsersHandlers {
  // Method handlers
}

Recommendation: Use @Stream together with @Action decorators when you want a single abstraction to handle events from a specific stream. This keeps the event handlers for all actions within that stream in one cohesive place.

Method Decorators

@Action(actionName: string) Decorate a method to handle a specific action within the stream.

Example:

@Action('user_created')
async handleUserCreated(event: Event<UserCreatedPayload, CustomHeaders>) {
    // Handle 'user_created' event
    await event.ack();
}

@StreamAction(stream: string, action: string) Decorate a method to handle a specific action from a specific Redis stream.

Example:

@StreamAction('users', 'user_created')
async handleUserCreated(event: Event<UserCreatedPayload, CustomHeaders>) {
    // Handle 'user_created' event
    await event.ack();
}

Recommendation: Use the @StreamAction decorator when you want a single abstraction to handle actions from different streams. This is particularly useful when you need to handle a few events from various streams. By grouping them together under the same abstraction, you avoid the need to create a separate layer for each stream, leading to a more streamlined and efficient event handling architecture.
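
One way to picture what a (stream, action) pair expresses is a two-level lookup table from stream name to action name to handler. The sketch below is plain TypeScript for illustration only and is not how nestjs-rivulex is implemented. `HandlerRegistry` and its methods are hypothetical.

```typescript
type Handler = (payload: unknown) => string;

class HandlerRegistry {
  // stream name -> action name -> handler
  private readonly byStream = new Map<string, Map<string, Handler>>();

  register(stream: string, action: string, handler: Handler): void {
    let actions = this.byStream.get(stream);
    if (!actions) {
      actions = new Map<string, Handler>();
      this.byStream.set(stream, actions);
    }
    actions.set(action, handler);
  }

  // Returns the handler's result, or undefined when nothing is registered.
  dispatch(stream: string, action: string, payload: unknown): string | undefined {
    const handler = this.byStream.get(stream)?.get(action);
    return handler ? handler(payload) : undefined;
  }
}

// Usage: one registry holds handlers for several streams.
const registry = new HandlerRegistry();
registry.register('orders', 'order_created', (p) => `order:${JSON.stringify(p)}`);
registry.register('payments', 'payment_processed', (p) => `payment:${JSON.stringify(p)}`);

const handled = registry.dispatch('orders', 'order_created', { orderId: 'o-1' });
const unhandled = registry.dispatch('orders', 'order_cancelled', {});
```

Events with no registered handler fall through as `undefined` here. A real consumer would decide whether to acknowledge, log, or retry them.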

Parameter Decorators

Parameter decorators are used to extract specific parts of the event object and inject them as parameters into your method. If no parameter decorator is used, the entire event object is provided as the first argument.

@FullEvent() Decorate a method parameter to extract the entire event object. Note that if no parameter decorator is used, the method will receive the full event object as the first argument by default.

Example:

async handleUserCreated(@FullEvent() event: Event<UserCreatedPayload, CustomHeaders>) {
    // Handle event
    await event.ack();
}

@EventPayload() Decorate a method parameter to extract the payload from the event.

Example:

async handleUserCreated(@EventPayload() payload: UserCreatedPayload) {
    // Handle payload
}

@EventId() Decorate a method parameter to extract the event ID.

Example:

async handleUserCreated(@EventId() eventId: string) {
    // Handle event ID
}

@EventHeaders() Decorate a method parameter to extract the headers from the event.

Example:

async handleUserCreated(@EventHeaders() headers: Headers<CustomHeaders>) {
    // Handle headers
}

@EventAttempt() Decorate a method parameter to extract the attempt number from the event.

Example:

async handleUserCreated(@EventAttempt() attempt: number) {
    // Handle attempt number
}

@EventAck() Decorate a method parameter to extract the ack function from the event.

Example:

async handleUserCreated(@EventAck() ack: Ack) {
    // Acknowledge event
    await ack();
}

Advanced Decorators Example

In this section, we provide advanced examples demonstrating different combinations of class, method, and parameter decorators.

import { Event, Action, Stream, StreamAction, Ack } from 'nestjs-rivulex';
import { FullEvent, EventPayload, EventId, EventHeaders, EventAttempt, EventAck } from 'nestjs-rivulex';

interface CustomHeaders {
    requestId: string;
    userId: string;
}

interface UserCreatedPayload {
    id: string;
    email: string;
}

interface UserUpdatedPayload {
    id: string;
    email: string;
    changes: Record<string, any>;
}

interface UserDeletedPayload {
    id: string;
    email: string;
}

@Stream('users')
export class UsersHandlers {

    // Using @FullEvent to handle the entire event
    @Action('user_created')
    async handleUserCreated(
        @FullEvent() event: Event<UserCreatedPayload, CustomHeaders>
    ) {
        const { action, headers, payload, attempt, ack, channel } = event;
        await ack();
    }

    // Extracting specific parts of the event using parameter decorators
    @Action('user_updated')
    async handleUserUpdated(
        @EventPayload() payload: UserUpdatedPayload,
        @EventId() eventId: string,
        @EventHeaders() headers: CustomHeaders,
        @EventAttempt() attempt: number,
        @EventAck() ack: Ack
    ) {
        // Process the update
        await ack();
    }

    // Handling actions for specific stream using @StreamAction
    @StreamAction('users', 'user_deleted')
    async handleUserDeleted(
        @FullEvent() event: Event<UserDeletedPayload, CustomHeaders>,
        @EventPayload() payload: UserDeletedPayload,
        @EventId() eventId: string,
        @EventHeaders() headers: CustomHeaders,
        @EventAttempt() attempt: number,
        @EventAck() ack: Ack
    ) {
        // Process the deletion
        await ack();
    }

    // Handling a complex event with different payload and headers types
    @StreamAction('users', 'user_updated')
    async handleComplexUserUpdate(
        @FullEvent() event: Event<UserUpdatedPayload, CustomHeaders>,
        @EventPayload() payload: UserUpdatedPayload,
        @EventId() eventId: string,
        @EventHeaders() headers: CustomHeaders,
        @EventAttempt() attempt: number,
        @EventAck() ack: Ack
    ) {
        // Handle the update with complex logic
        await ack();
    }
}

Rivulex Publisher

The RivulexPublisherModule provides a NestJS abstraction over the Rivulex Publisher, making it easy to integrate the Rivulex messaging system into your NestJS applications. It supports both synchronous and asynchronous configuration and exposes the RivulexPublisherService for publishing events.

Overview

The RivulexPublisherModule allows you to configure the Rivulex Publisher within your NestJS application using either the forRoot or forRootAsync methods. It provides seamless integration with NestJS's dependency injection system and supports various configuration options to customize the behavior of the Rivulex Publisher.

Configuration

Synchronous Configuration

Use the forRoot method to configure the Rivulex Publisher with static configuration options.

import { Module } from '@nestjs/common';
import { RivulexModule } from 'nestjs-rivulex';

@Module({
  imports: [
    RivulexModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
      group: 'my-group',
      defaultStream: 'my-default-stream',
    }),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}

Asynchronous Configuration

Use the forRootAsync method to configure the Rivulex Publisher with dynamic configuration options, such as those provided by a configuration service.

import { Module } from '@nestjs/common';
import { RivulexModule } from 'nestjs-rivulex';

@Module({
  imports: [
    RivulexModule.forRootAsync({
      useFactory: async () => ({
        redis: { host: 'localhost', port: 6379 },
        group: 'my-group',
        defaultStream: 'my-default-stream',
      }),
      inject: [],
    }),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}
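
In practice the factory often reads its values from the environment instead of hard-coding them. Here is a minimal sketch of such a factory body. The environment variable names, the `PublisherOptionsSketch` type, and `publisherOptionsFromEnv` are assumptions made for illustration. Only the `redis`, `group`, and `defaultStream` fields come from the examples above.

```typescript
// Local type covering only the fields shown in this README.
interface PublisherOptionsSketch {
  redis: { host: string; port: number };
  group: string;
  defaultStream: string;
}

type Env = Record<string, string | undefined>;

// Builds the options from environment variables (hypothetical names),
// falling back to the defaults used in the examples above.
function publisherOptionsFromEnv(env: Env): PublisherOptionsSketch {
  const rawPort = env['REDIS_PORT'] ?? '6379';
  const port = Number(rawPort);
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${rawPort}`);
  }
  return {
    redis: { host: env['REDIS_HOST'] ?? 'localhost', port },
    group: env['RIVULEX_GROUP'] ?? 'my-group',
    defaultStream: env['RIVULEX_DEFAULT_STREAM'] ?? 'my-default-stream',
  };
}

// Usage: inside forRootAsync this could be called as
//   useFactory: async () => publisherOptionsFromEnv(process.env)
const options = publisherOptionsFromEnv({ REDIS_HOST: 'redis', REDIS_PORT: '6380' });
```

Validating the port up front makes a misconfigured deployment fail at startup instead of at the first publish.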

Usage

Once the RivulexPublisherModule is configured, you can inject the RivulexPublisherService into your services or controllers to publish events.

import { Injectable } from '@nestjs/common';
import { RivulexPublisherService } from 'nestjs-rivulex';

@Injectable()
export class MyService {
  constructor(private readonly publisher: RivulexPublisherService) {}

  async publishEvent() {
    await this.publisher.publish('my-event', { id: '123', data: 'example' });
  }
}
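
If you publish many event types, a thin typed wrapper can catch mismatched payloads at compile time. The sketch below assumes only the two-argument `publish(action, payload)` call shown above. `PublisherLike`, `TypedPublisher`, `RecordingPublisher`, and `UserEvents` are hypothetical names. The real `RivulexPublisherService` may accept more arguments or return a different value.

```typescript
// Minimal shape of the call used in this README.
interface PublisherLike {
  publish(action: string, payload: unknown): Promise<unknown>;
}

// Map from action name to the payload type that action carries.
type UserEvents = {
  user_created: { id: string; email: string };
  user_deleted: { id: string };
};

// Wrapper that only accepts known actions with matching payloads.
class TypedPublisher<Events> {
  private readonly inner: PublisherLike;

  constructor(inner: PublisherLike) {
    this.inner = inner;
  }

  publish<K extends keyof Events & string>(action: K, payload: Events[K]): Promise<unknown> {
    return this.inner.publish(action, payload);
  }
}

// In-memory stand-in for the real publisher, useful in unit tests.
class RecordingPublisher implements PublisherLike {
  readonly sent: Array<{ action: string; payload: unknown }> = [];

  async publish(action: string, payload: unknown): Promise<unknown> {
    this.sent.push({ action, payload });
    return undefined;
  }
}

// Usage: a typo in the action name or a missing field fails to compile.
const recording = new RecordingPublisher();
const users = new TypedPublisher<UserEvents>(recording);
const pending = users.publish('user_created', { id: '123', email: 'a@example.com' });
```

In a NestJS service, the injected publisher would take the place of `RecordingPublisher`.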

Integration with Trimmer

You can configure the Trimmer to be initiated with the RivulexPublisherModule. This ensures that old messages are automatically trimmed while publishing events.

Usage

To configure the Trimmer within the RivulexPublisherModule, include the trimmer configuration in the forRoot or forRootAsync methods:

import { Module } from '@nestjs/common';
import { RivulexModule } from 'nestjs-rivulex';

@Module({
  imports: [
    RivulexModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
      group: 'my-group',
      defaultStream: 'my-default-stream',
      trimmer: { // 👈🏻 trimmer settings
        streams: ['my-default-stream'],
        group: 'my-group',
        intervalTime: 86400000, // 24 hours
        retentionPeriod: 604800000, // 7 days
      },
    }),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}

Or with forRootAsync:

import { Module } from '@nestjs/common';
import { RivulexModule } from 'nestjs-rivulex';

@Module({
  imports: [
    RivulexModule.forRootAsync({
      useFactory: async () => ({
        redis: { host: 'localhost', port: 6379 },
        group: 'my-group',
        trimmer: { // 👈🏻 trimmer settings
          channels: ['users'],
          intervalTime: 43200000, // 12 hours
          retentionPeriod: 604800000, // 7 days
        },
      }),
      inject: [],
    }),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}

In this example, the Trimmer is configured as part of the Publisher configuration. When the Publisher module starts, it also starts the trimming process for the specified channels.

For More Details

For more detailed information about the Rivulex Publisher, please refer to the original Rivulex repository.

Rivulex Trimmer

The RivulexTrimmerModule provides a NestJS abstraction over the Rivulex Trimmer, allowing you to manage the trimming of old messages from Redis streams as a standalone service. This ensures that messages older than a specified retention period are removed at regular intervals. The trimming process is distributed and coordinated using Redis to avoid conflicts between multiple instances.
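
Time-based retention works naturally with Redis Streams because auto-generated entry IDs have the form `<milliseconds>-<sequence>`. The sketch below computes the smallest ID to keep for a given retention period, a value that could be passed to Redis's `XTRIM ... MINID`. This README does not state whether the Rivulex Trimmer uses that command internally, so treat this as an illustration only. `retentionCutoffId` and `isOlderThan` are hypothetical helpers, and the example assumes auto-generated IDs.

```typescript
// Smallest stream ID to keep: entries with a lower ID are older than the
// retention period.
function retentionCutoffId(nowMs: number, retentionPeriodMs: number): string {
  const cutoffMs = Math.max(0, nowMs - retentionPeriodMs);
  return `${cutoffMs}-0`;
}

// Splits "<ms>-<seq>" into its numeric parts; a missing sequence counts as 0.
function parseStreamId(id: string): [number, number] {
  const [ms, seq = '0'] = id.split('-');
  return [Number(ms), Number(seq)];
}

// True when entryId sorts strictly before cutoffId, that is, when a
// MINID-style trim with that cutoff would evict the entry.
function isOlderThan(entryId: string, cutoffId: string): boolean {
  const [entryMs, entrySeq] = parseStreamId(entryId);
  const [cutoffMs, cutoffSeq] = parseStreamId(cutoffId);
  return entryMs < cutoffMs || (entryMs === cutoffMs && entrySeq < cutoffSeq);
}

// Usage: a 7-day retention period measured from a fixed "now".
const cutoff = retentionCutoffId(1700000000000, 604800000); // "1699395200000-0"
const expired = isOlderThan('1699395199999-7', cutoff);
const kept = isOlderThan('1699395200000-0', cutoff);
```

An entry whose ID equals the cutoff is kept, which matches the "lower than threshold" semantics of `MINID`.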

Overview

The RivulexTrimmerModule can be configured and started independently of the Publisher and Subscriber modules. It provides seamless integration with NestJS's dependency injection system and supports both synchronous and asynchronous configuration. The trimming process starts automatically and does not require any direct interaction from the developer.

Configuration

Synchronous Configuration

Use the forRoot method to configure the RivulexTrimmerModule with static configuration options.

import { Module } from '@nestjs/common';
import { RivulexTrimmerModule } from 'nestjs-rivulex';

@Module({
  imports: [
    RivulexTrimmerModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
      group: 'my-group',
      streams: ['my-default-stream'],
      intervalTime: 86400000, // 24 hours
      retentionPeriod: 604800000, // 7 days
    }),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}

Asynchronous Configuration

Use the forRootAsync method to configure the RivulexTrimmerModule with dynamic configuration options, such as those provided by a configuration service.

import { Module } from '@nestjs/common';
import { RivulexTrimmerModule } from 'nestjs-rivulex';

@Module({
  imports: [
    RivulexTrimmerModule.forRootAsync({
      useFactory: async () => ({
        redis: { host: 'localhost', port: 6379 },
        group: 'my-group',
        streams: ['my-default-stream'],
        intervalTime: 86400000, // 24 hours
        retentionPeriod: 604800000, // 7 days
      }),
      inject: [],
    }),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}

Usage

Once the RivulexTrimmerModule is configured, the RivulexTrimmerService will manage the trimming process automatically. The service starts the trimming process when the application starts and stops it when the application shuts down. There is no need to interact with the RivulexTrimmerService directly.

Contributing

We welcome contributions! Please see our CONTRIBUTING.md for guidelines on how to contribute.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

For any issues or support, please open an issue on the GitHub repository.