npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@jetit/publisher

v6.0.0

Published

`@jetit/publisher` is a robust and feature-rich library for implementing an event-driven architecture using Redis PUB/SUB and Redis Streams. It provides a scalable mechanism for publishing and consuming events in real-time, with support for advanced featu

Readme

@jetit/publisher

@jetit/publisher is a robust and feature-rich library for implementing an event-driven architecture using Redis PUB/SUB and Redis Streams. It provides a scalable mechanism for publishing and consuming events in real-time, with support for advanced features such as message deduplication, consumer group management, scheduled event publishing, and more.

Table of Contents

Installation

npm install @jetit/publisher

Key Features

  • Real-time event publishing and subscribing
  • Configurable Streams class for flexible usage
  • Improved error handling and reliability
  • Performance tracking with Redis time and operation time metrics
  • Dead Letter Queue (DLQ) for handling subscription failures
  • Event filtering for specialized subscriptions
  • Support for multiple event subscriptions from the same service
  • Batch publishing (regular and scheduled)
  • Basic monitoring with Prometheus export support
  • Content-based one-time guarantee (0-1 semantics support)
  • Optimized cleanup processes for improved performance
  • Circuit Breaker pattern for fault tolerance

Usage

Basic Example

import { Publisher, EventData } from '@jetit/publisher';

// Create an instance of the publisher
const publisher = new Publisher('MyService');

// Publish an event
const eventData: EventData<{ message: string }> = {
    eventName: 'my-event',
    data: { message: 'Hello, world!' }
};

await publisher.publish(eventData);

// Subscribe to an event
publisher.listen('my-event').subscribe(event => {
    console.log(`Received event: ${event.eventName}`, event.data);
});

PublisherLite Usage

For scenarios where a single stream per event type is preferred (allowing multiple consumer groups on that single stream), use PublisherLite:

import { PublisherLite, EventData } from '@jetit/publisher'; // Import PublisherLite

// Create an instance of PublisherLite
const publisherLite = new PublisherLite('MyServiceLite');

// Publish an event (same as Publisher)
const eventData: EventData<{ message: string }> = {
    eventName: 'my-lite-event',
    data: { message: 'Hello, Lite world!' }
};
await publisherLite.publish(eventData);

// Subscribe to an event (same as Publisher)
publisherLite.listen('my-lite-event').subscribe(event => {
    console.log(`Received lite event: ${event.eventName}`, event.data);
});

// Configuration is also similar, just pass it to PublisherLite constructor
const liteConfig: Partial<IStreamsConfig> = { /* ... your config ... */ };
const configuredPublisherLite = new PublisherLite('MyConfiguredServiceLite', liteConfig, 'my-redis-connection');

Configuration

The Publisher class can be configured with various options, including Circuit Breaker and Backpressure handling:

import { Publisher, IStreamsConfig } from '@jetit/publisher';

const config: Partial<IStreamsConfig> = {
    cleanUpInterval: 3600000, // 1 hour
    maxRetries: 5,
    initialRetryDelay: 1000,
    immediatePublishThreshold: 500,
    unprocessedMessageThreshold: 25,
    acknowledgedMessageCleanupInterval: 3600000, // 1 hour
    dlqEventThreshold: 2000,
    filterKeepAlive: 86400000, // 24 hours
    duplicationCheckWindow: 86400, // 24 hours
    circuitBreaker: {
        enabled: true,
        errorThreshold: 50,
        errorThresholdPercentage: 50,
        openStateDuration: 30000, // 30s
        halfOpenStateMaxAttempts: 10,
        maxStoredEvents: 5000,
    },
};

const publisher = new Publisher('MyService', config);

Additionally, the Publisher constructor accepts a redisConnectionId parameter, which is used to identify the connection used by the publisher. This is useful when using multiple connections in a environment.

setRedisConfig(options1, 'redis-connection-id');
const publisher1 = new Publisher('MyService', config, 'redis-connection-id');  // <-- use this connection (options1) for publishing

setRedisConfig(options2, 'another-redis-connection-id');
const publisher2 = new Publisher('MyService', config, 'another-redis-connection-id');  // <-- use this connection (options2) for publishing

Publishing Events

const eventData = {
    eventName: 'user-registered',
    data: { userId: '123', email: '[email protected]' }
};

await publisher.publish(eventData);

Subscribing to Events

// Basic subscription with automatic acknowledgment
publisher.listen('user-registered').subscribe(event => {
    console.log('New user registered:', event.data);
});

// Subscription with external acknowledgment
const options = {
    externalAcknowledgement: true
};

publisher.listen('user-registered', options).subscribe(async event => {
    try {
        console.log('New user registered:', event.data);
        // Process the event
        await processUserRegistration(event.data);
        
        // Manually acknowledge the message after successful processing
        await publisher.acknowledgeMessage(event.ackKey);
    } catch (error) {
        // Handle error - message will not be acknowledged and will be reprocessed
        console.error('Failed to process user registration:', error);
    }
});

The externalAcknowledgement option allows you to manually control when messages are acknowledged. This is useful when:

  • You need to ensure message processing is complete before acknowledgment
  • You want to implement custom retry logic
  • You need to coordinate acknowledgment with other operations
  • You want to implement transaction-like behavior

When externalAcknowledgement is set to true:

  1. Messages won't be automatically acknowledged after delivery
  2. Each message contains an ackKey that must be used to acknowledge it
  3. Unacknowledged messages will be redelivered to other consumers
  4. You must explicitly call acknowledgeMessage(event.ackKey) after successful processing

Note: Be careful with external acknowledgment as failing to acknowledge messages can lead to message redelivery and potential duplicate processing.

Scheduled Publishing

const futureDate = new Date(Date.now() + 60000); // 1 minute from now
await publisher.scheduledPublish(futureDate, eventData);

Batch Publishing

import { publishBatch } from '@jetit/publisher';

const events = [
    { eventName: 'event1', data: { /* ... */ } },
    { eventName: 'event2', data: { /* ... */ } },
    // ...
];

const result = await publishBatch(publisher, events, { batchSize: 100, delayBetweenBatches: 1000 });
console.log('Batch publish result:', result);

Dead Letter Queue (DLQ)

// Retry an event from DLQ
const success = await publisher.retryFromDLQ('eventId');

// Get DLQ stats
const stats = await publisher.getDLQStats();
console.log('DLQ stats:', stats);

Event Filtering

const options = {
    eventFilter: (event) => event.data.userId === '123',
    filterKeepAlive: 3600000 // 1 hour
};

publisher.listen('user-action', options).subscribe(event => {
    console.log('Filtered user action:', event);
});

Performance Monitoring

// Get metrics for a specific time range
const metrics = await publisher.getMetrics(startTime, endTime);
console.log('Performance metrics:', metrics);

// Get latest metrics
const latestMetrics = await publisher.getLatestMetrics();
console.log('Latest metrics:', latestMetrics);

Prometheus Integration

import { PrometheusAdapter } from '@jetit/publisher';
import promClient from 'prom-client';
import express from 'express';

const app = express();
const prometheusAdapter = new PrometheusAdapter(publisher, promClient);

prometheusAdapter.setupEndpoint(app, '/metrics');

app.listen(3000, () => {
    console.log('Metrics server listening on port 3000');
});

Publisher vs PublisherLite

This library offers two main publisher implementations: Publisher and PublisherLite. Choose the one that best fits your architecture and scaling needs.

Publisher (Default - Multi-Stream)

  • Mechanism: Creates a separate Redis Stream for each consumer group subscribing to an event type (e.g., my-event:cg-serviceA, my-event:cg-serviceB).
  • Pros:
    • Provides strong isolation between consumer groups. Issues or heavy load in one group's stream don't directly impact others.
    • Potentially simpler cleanup logic per stream.
    • May offer better performance distribution if consumer groups have vastly different processing speeds or volumes.
  • Cons:
    • Can lead to a large number of streams in Redis, especially with many event types and consumer groups, increasing memory usage and management overhead.
    • Requires more complex ID generation (like the adaptive Lua script) to handle potential ID collisions across streams when publishing.
  • Use When:
    • You have a manageable number of consumer groups per event type.
    • Strong isolation between consumer groups is critical.
    • You anticipate significant differences in load or processing characteristics between consumer groups for the same event.

PublisherLite (Single-Stream)

  • Mechanism: Uses a single Redis Stream per event type (prefixed with sl:, e.g., sl:my-event). All consumer groups for that event read from this single stream.
  • Pros:
    • Significantly reduces the number of streams in Redis, lowering memory usage and simplifying management.
    • Simplifies the publishing logic (no need for complex multi-stream ID generation).
    • Aligns more closely with the standard Redis Streams consumer group model.
  • Cons:
    • Less isolation between consumer groups; a very slow or problematic consumer group could potentially impact the processing lag for others on the same stream (though Redis handles much of this internally).
    • Cleanup logic (XTRIM) is slightly more complex as it needs to consider the progress of all consumer groups on the stream before trimming messages.
  • Use When:
    • You have a large number of event types or expect many consumer groups per event.
    • Reducing Redis resource consumption (memory, keyspace) is a priority.
    • You prefer a simpler publishing mechanism and are comfortable with the standard Redis consumer group behavior on a shared stream.
    • Important: PublisherLite currently focuses on the core publish/listen flow and does not support scheduled publishing (scheduledPublish method). Use the standard Publisher if you require scheduled events.

Note: Both Publisher and PublisherLite share the same configuration options (IStreamsConfig) and core features like DLQ, Circuit Breaker, Metrics, etc. The primary difference lies in the underlying Redis Stream structure they use and the support for scheduled publishing.

Advanced Features

Content-Based Deduplication

The library supports content-based deduplication to ensure that each unique event is processed only once:

const options = {
    publishOnceGuarantee: true
};

publisher.listen('important-event', options).subscribe(event => {
    console.log('Guaranteed unique event:', event);
});

Multiple Event Subscriptions

You can subscribe to multiple events from the same service:

const subscription1 = publisher.listen('event1').subscribe(/* ... */);
const subscription2 = publisher.listen('event2').subscribe(/* ... */);

Circuit Breaker

The Circuit Breaker pattern is implemented to prevent cascading failures in a distributed system. It helps to gracefully handle failures and allows the system to recover without overwhelming failed services.

Configuration options:

  • enabled: Enable or disable the Circuit Breaker.
  • errorThreshold: Number of errors before opening the circuit.
  • errorThresholdPercentage: Percentage of errors to total calls before opening the circuit.
  • timeWindow: Time window for error rate calculation (in milliseconds).
  • openStateDuration: Duration to keep the circuit open before moving to half-open state (in milliseconds).
  • halfOpenStateMaxAttempts: Maximum number of attempts allowed in half-open state.

The Circuit Breaker has three states:

  1. Closed: Normal operation, calls pass through.
  2. Open: Calls are immediately rejected without reaching the service.
  3. Half-Open: A limited number of calls are allowed to test if the service has recovered.

Performance Optimizations

  • Batched xdel operations for improved cleanup performance
  • Configurable cleanup intervals and thresholds
  • Efficient event filtering at the subscription level
  • Retry logic with exponential backoff for failed operations
  • Circuit Breaker to prevent overwhelming failed services
  • Dead Letter Queue (DLQ) for handling subscription failures
  • Adaptive Redis Stream ID Generation (Publisher only): The default Publisher automatically switches to an optimized ID generation strategy using a Lua script when publishing to many consumer groups (>10 by default) or when ID conflicts are detected. This prevents XADD errors related to non-monotonic IDs in high-throughput scenarios. Configurable via optimizationThreshold and optimizationDurationMs. (PublisherLite does not include this complex logic).

Cleanup and Graceful Shutdown

To ensure proper cleanup of resources, implement a graceful shutdown:

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

async function shutdown() {
    console.log('Graceful shutdown initiated.');
    try {
        await publisher.close();
        console.log('Resources and connections successfully closed.');
    } catch (error) {
        console.error('Error during graceful shutdown:', error);
    }
    process.exit(0);
}

Troubleshooting

If you encounter issues:

  1. Check the Redis connection settings
  2. Verify that consumer groups are correctly created
  3. Monitor the DLQ for failed events
  4. Review the performance metrics for any anomalies
  5. Check the logs for detailed error messages