npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@jetit/publisher

v5.1.1

Published

`@jetit/publisher` is a robust and feature-rich library for implementing an event-driven architecture using Redis PUB/SUB and Redis Streams. It provides a scalable mechanism for publishing and consuming events in real-time, with support for advanced featu

Downloads

5,540

Readme

@jetit/publisher

@jetit/publisher is a robust and feature-rich library for implementing an event-driven architecture using Redis PUB/SUB and Redis Streams. It provides a scalable mechanism for publishing and consuming events in real-time, with support for advanced features such as message deduplication, consumer group management, scheduled event publishing, and more.

Table of Contents

Installation

npm install @jetit/publisher

Key Features

  • Real-time event publishing and subscribing
  • Configurable Streams class for flexible usage
  • Improved error handling and reliability
  • Performance tracking with Redis time and operation time metrics
  • Dead Letter Queue (DLQ) for handling subscription failures
  • Event filtering for specialized subscriptions
  • Support for multiple event subscriptions from the same service
  • Batch publishing (regular and scheduled)
  • Basic monitoring with Prometheus export support
  • Content-based one-time guarantee (0-1 semantics support)
  • Optimized cleanup processes for improved performance
  • Circuit Breaker pattern for fault tolerance

Usage

Basic Example

import { Publisher, EventData } from '@jetit/publisher';

// Create an instance of the publisher
const publisher = new Publisher('MyService');

// Publish an event
const eventData: EventData<{ message: string }> = {
    eventName: 'my-event',
    data: { message: 'Hello, world!' }
};

await publisher.publish(eventData);

// Subscribe to an event
publisher.listen('my-event').subscribe(event => {
    console.log(`Received event: ${event.eventName}`, event.data);
});

Configuration

The Publisher class can be configured with various options, including Circuit Breaker and Backpressure handling:

import { Publisher, IStreamsConfig } from '@jetit/publisher';

const config: Partial<IStreamsConfig> = {
    cleanUpInterval: 3600000, // 1 hour
    maxRetries: 5,
    initialRetryDelay: 1000,
    immediatePublishThreshold: 500,
    unprocessedMessageThreshold: 25,
    acknowledgedMessageCleanupInterval: 3600000, // 1 hour
    dlqEventThreshold: 2000,
    filterKeepAlive: 86400000, // 24 hours
    duplicationCheckWindow: 86400, // 24 hours
    circuitBreaker: {
        enabled: true,
        errorThreshold: 50,
        errorThresholdPercentage: 50,
        openStateDuration: 30000, // 30s
        halfOpenStateMaxAttempts: 10,
        maxStoredEvents: 5000,
    },
};

const publisher = new Publisher('MyService', config);

Publishing Events

const eventData = {
    eventName: 'user-registered',
    data: { userId: '123', email: '[email protected]' }
};

await publisher.publish(eventData);

Subscribing to Events

publisher.listen('user-registered').subscribe(event => {
    console.log('New user registered:', event.data);
});

Scheduled Publishing

const futureDate = new Date(Date.now() + 60000); // 1 minute from now
await publisher.scheduledPublish(futureDate, eventData);

Batch Publishing

import { publishBatch } from '@jetit/publisher';

const events = [
    { eventName: 'event1', data: { /* ... */ } },
    { eventName: 'event2', data: { /* ... */ } },
    // ...
];

const result = await publishBatch(publisher, events, { batchSize: 100, delayBetweenBatches: 1000 });
console.log('Batch publish result:', result);

Dead Letter Queue (DLQ)

// Retry an event from DLQ
const success = await publisher.retryFromDLQ('eventId');

// Get DLQ stats
const stats = await publisher.getDLQStats();
console.log('DLQ stats:', stats);

Event Filtering

const options = {
    eventFilter: (event) => event.data.userId === '123',
    filterKeepAlive: 3600000 // 1 hour
};

publisher.listen('user-action', options).subscribe(event => {
    console.log('Filtered user action:', event);
});

Performance Monitoring

// Get metrics for a specific time range
const metrics = await publisher.getMetrics(startTime, endTime);
console.log('Performance metrics:', metrics);

// Get latest metrics
const latestMetrics = await publisher.getLatestMetrics();
console.log('Latest metrics:', latestMetrics);

Prometheus Integration

import { PrometheusAdapter } from '@jetit/publisher';
import promClient from 'prom-client';
import express from 'express';

const app = express();
const prometheusAdapter = new PrometheusAdapter(publisher, promClient);

prometheusAdapter.setupEndpoint(app, '/metrics');

app.listen(3000, () => {
    console.log('Metrics server listening on port 3000');
});

Advanced Features

Content-Based Deduplication

The library supports content-based deduplication to ensure that each unique event is processed only once:

const options = {
    publishOnceGuarantee: true
};

publisher.listen('important-event', options).subscribe(event => {
    console.log('Guaranteed unique event:', event);
});

Multiple Event Subscriptions

You can subscribe to multiple events from the same service:

const subscription1 = publisher.listen('event1').subscribe(/* ... */);
const subscription2 = publisher.listen('event2').subscribe(/* ... */);

Circuit Breaker

The Circuit Breaker pattern is implemented to prevent cascading failures in a distributed system. It helps to gracefully handle failures and allows the system to recover without overwhelming failed services.

Configuration options:

  • enabled: Enable or disable the Circuit Breaker.
  • errorThreshold: Number of errors before opening the circuit.
  • errorThresholdPercentage: Percentage of errors to total calls before opening the circuit.
  • timeWindow: Time window for error rate calculation (in milliseconds).
  • openStateDuration: Duration to keep the circuit open before moving to half-open state (in milliseconds).
  • halfOpenStateMaxAttempts: Maximum number of attempts allowed in half-open state.

The Circuit Breaker has three states:

  1. Closed: Normal operation, calls pass through.
  2. Open: Calls are immediately rejected without reaching the service.
  3. Half-Open: A limited number of calls are allowed to test if the service has recovered.

Performance Optimizations

  • Batched xdel operations for improved cleanup performance
  • Configurable cleanup intervals and thresholds
  • Efficient event filtering at the subscription level
  • Retry logic with exponential backoff for failed operations
  • Circuit Breaker to prevent overwhelming failed services
  • Dead Letter Queue (DLQ) for handling subscription failures

Cleanup and Graceful Shutdown

To ensure proper cleanup of resources, implement a graceful shutdown:

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

async function shutdown() {
    console.log('Graceful shutdown initiated.');
    try {
        await publisher.close();
        console.log('Resources and connections successfully closed.');
    } catch (error) {
        console.error('Error during graceful shutdown:', error);
    }
    process.exit(0);
}

Troubleshooting

If you encounter issues:

  1. Check the Redis connection settings
  2. Verify that consumer groups are correctly created
  3. Monitor the DLQ for failed events
  4. Review the performance metrics for any anomalies
  5. Check the logs for detailed error messages