@jetit/publisher (v6.0.0)
@jetit/publisher is a robust and feature-rich library for implementing an event-driven architecture using Redis PUB/SUB and Redis Streams. It provides a scalable mechanism for publishing and consuming events in real-time, with support for advanced features such as message deduplication, consumer group management, scheduled event publishing, and more.
Installation
npm install @jetit/publisher
Key Features
- Real-time event publishing and subscribing
- Configurable Streams class for flexible usage
- Improved error handling and reliability
- Performance tracking with Redis time and operation time metrics
- Dead Letter Queue (DLQ) for handling subscription failures
- Event filtering for specialized subscriptions
- Support for multiple event subscriptions from the same service
- Batch publishing (regular and scheduled)
- Basic monitoring with Prometheus export support
- Content-based one-time guarantee (0-1 semantics support)
- Optimized cleanup processes for improved performance
- Circuit Breaker pattern for fault tolerance
Usage
Basic Example
import { Publisher, EventData } from '@jetit/publisher';
// Create an instance of the publisher
const publisher = new Publisher('MyService');
// Publish an event
const eventData: EventData<{ message: string }> = {
eventName: 'my-event',
data: { message: 'Hello, world!' }
};
await publisher.publish(eventData);
// Subscribe to an event
publisher.listen('my-event').subscribe(event => {
console.log(`Received event: ${event.eventName}`, event.data);
});
PublisherLite Usage
For scenarios where a single stream per event type is preferred (allowing multiple consumer groups on that single stream), use PublisherLite:
import { PublisherLite, EventData } from '@jetit/publisher'; // Import PublisherLite
// Create an instance of PublisherLite
const publisherLite = new PublisherLite('MyServiceLite');
// Publish an event (same as Publisher)
const eventData: EventData<{ message: string }> = {
eventName: 'my-lite-event',
data: { message: 'Hello, Lite world!' }
};
await publisherLite.publish(eventData);
// Subscribe to an event (same as Publisher)
publisherLite.listen('my-lite-event').subscribe(event => {
console.log(`Received lite event: ${event.eventName}`, event.data);
});
// Configuration is also similar, just pass it to PublisherLite constructor
const liteConfig: Partial<IStreamsConfig> = { /* ... your config ... */ };
const configuredPublisherLite = new PublisherLite('MyConfiguredServiceLite', liteConfig, 'my-redis-connection');
Configuration
The Publisher class can be configured with various options, including Circuit Breaker and Backpressure handling:
import { Publisher, IStreamsConfig } from '@jetit/publisher';
const config: Partial<IStreamsConfig> = {
cleanUpInterval: 3600000, // 1 hour
maxRetries: 5,
initialRetryDelay: 1000,
immediatePublishThreshold: 500,
unprocessedMessageThreshold: 25,
acknowledgedMessageCleanupInterval: 3600000, // 1 hour
dlqEventThreshold: 2000,
filterKeepAlive: 86400000, // 24 hours
duplicationCheckWindow: 86400, // 24 hours
circuitBreaker: {
enabled: true,
errorThreshold: 50,
errorThresholdPercentage: 50,
openStateDuration: 30000, // 30s
halfOpenStateMaxAttempts: 10,
maxStoredEvents: 5000,
},
};
const publisher = new Publisher('MyService', config);
Additionally, the Publisher constructor accepts a redisConnectionId parameter, which identifies the Redis connection used by the publisher. This is useful when working with multiple Redis connections in the same environment.
setRedisConfig(options1, 'redis-connection-id');
const publisher1 = new Publisher('MyService', config, 'redis-connection-id'); // <-- use this connection (options1) for publishing
setRedisConfig(options2, 'another-redis-connection-id');
const publisher2 = new Publisher('MyService', config, 'another-redis-connection-id'); // <-- use this connection (options2) for publishing
Publishing Events
const eventData = {
eventName: 'user-registered',
data: { userId: '123', email: '[email protected]' }
};
await publisher.publish(eventData);
Subscribing to Events
// Basic subscription with automatic acknowledgment
publisher.listen('user-registered').subscribe(event => {
console.log('New user registered:', event.data);
});
// Subscription with external acknowledgment
const options = {
externalAcknowledgement: true
};
publisher.listen('user-registered', options).subscribe(async event => {
try {
console.log('New user registered:', event.data);
// Process the event
await processUserRegistration(event.data);
// Manually acknowledge the message after successful processing
await publisher.acknowledgeMessage(event.ackKey);
} catch (error) {
// Handle error - message will not be acknowledged and will be reprocessed
console.error('Failed to process user registration:', error);
}
});
The externalAcknowledgement option allows you to manually control when messages are acknowledged. This is useful when:
- You need to ensure message processing is complete before acknowledgment
- You want to implement custom retry logic
- You need to coordinate acknowledgment with other operations
- You want to implement transaction-like behavior
When externalAcknowledgement is set to true:
- Messages won't be automatically acknowledged after delivery
- Each message contains an ackKey that must be used to acknowledge it
- Unacknowledged messages will be redelivered to other consumers
- You must explicitly call acknowledgeMessage(event.ackKey) after successful processing
Note: Be careful with external acknowledgment as failing to acknowledge messages can lead to message redelivery and potential duplicate processing.
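Because unacknowledged messages are redelivered, handlers used with externalAcknowledgement should be idempotent. A minimal sketch of one approach, using an invented IdempotencyGuard helper that is not part of this library (in production the seen-set would live in Redis, e.g. SET with NX and a TTL, rather than in process memory):

```typescript
// Hypothetical idempotency guard: remembers which event IDs have
// already been handled so a redelivered message becomes a no-op.
class IdempotencyGuard {
  private seen = new Set<string>();

  // Returns true the first time an ID is seen, false on redelivery.
  markIfFirst(eventId: string): boolean {
    if (this.seen.has(eventId)) return false;
    this.seen.add(eventId);
    return true;
  }
}
```

A handler would call markIfFirst with a stable event identifier before processing, and simply acknowledge and skip the work when it returns false.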
Scheduled Publishing
const futureDate = new Date(Date.now() + 60000); // 1 minute from now
await publisher.scheduledPublish(futureDate, eventData);
Batch Publishing
import { publishBatch } from '@jetit/publisher';
const events = [
{ eventName: 'event1', data: { /* ... */ } },
{ eventName: 'event2', data: { /* ... */ } },
// ...
];
const result = await publishBatch(publisher, events, { batchSize: 100, delayBetweenBatches: 1000 });
console.log('Batch publish result:', result);
Dead Letter Queue (DLQ)
// Retry an event from DLQ
const success = await publisher.retryFromDLQ('eventId');
// Get DLQ stats
const stats = await publisher.getDLQStats();
console.log('DLQ stats:', stats);
Event Filtering
const options = {
eventFilter: (event) => event.data.userId === '123',
filterKeepAlive: 3600000 // 1 hour
};
publisher.listen('user-action', options).subscribe(event => {
console.log('Filtered user action:', event);
});
Performance Monitoring
// Get metrics for a specific time range
const metrics = await publisher.getMetrics(startTime, endTime);
console.log('Performance metrics:', metrics);
// Get latest metrics
const latestMetrics = await publisher.getLatestMetrics();
console.log('Latest metrics:', latestMetrics);
Prometheus Integration
import { PrometheusAdapter } from '@jetit/publisher';
import promClient from 'prom-client';
import express from 'express';
const app = express();
const prometheusAdapter = new PrometheusAdapter(publisher, promClient);
prometheusAdapter.setupEndpoint(app, '/metrics');
app.listen(3000, () => {
console.log('Metrics server listening on port 3000');
});
Publisher vs PublisherLite
This library offers two main publisher implementations: Publisher and PublisherLite. Choose the one that best fits your architecture and scaling needs.
Publisher (Default - Multi-Stream)
- Mechanism: Creates a separate Redis Stream for each consumer group subscribing to an event type (e.g., my-event:cg-serviceA, my-event:cg-serviceB).
- Pros:
- Provides strong isolation between consumer groups. Issues or heavy load in one group's stream don't directly impact others.
- Potentially simpler cleanup logic per stream.
- May offer better performance distribution if consumer groups have vastly different processing speeds or volumes.
- Cons:
- Can lead to a large number of streams in Redis, especially with many event types and consumer groups, increasing memory usage and management overhead.
- Requires more complex ID generation (like the adaptive Lua script) to handle potential ID collisions across streams when publishing.
- Use When:
- You have a manageable number of consumer groups per event type.
- Strong isolation between consumer groups is critical.
- You anticipate significant differences in load or processing characteristics between consumer groups for the same event.
PublisherLite (Single-Stream)
- Mechanism: Uses a single Redis Stream per event type (prefixed with sl:, e.g., sl:my-event). All consumer groups for that event read from this single stream.
- Pros:
- Significantly reduces the number of streams in Redis, lowering memory usage and simplifying management.
- Simplifies the publishing logic (no need for complex multi-stream ID generation).
- Aligns more closely with the standard Redis Streams consumer group model.
- Cons:
- Less isolation between consumer groups; a very slow or problematic consumer group could potentially impact the processing lag for others on the same stream (though Redis handles much of this internally).
- Cleanup logic (XTRIM) is slightly more complex, as it needs to consider the progress of all consumer groups on the stream before trimming messages.
- Use When:
- You have a large number of event types or expect many consumer groups per event.
- Reducing Redis resource consumption (memory, keyspace) is a priority.
- You prefer a simpler publishing mechanism and are comfortable with the standard Redis consumer group behavior on a shared stream.
- Important: PublisherLite currently focuses on the core publish/listen flow and does not support scheduled publishing (the scheduledPublish method). Use the standard Publisher if you require scheduled events.
Note: Both Publisher and PublisherLite share the same configuration options (IStreamsConfig) and core features like DLQ, Circuit Breaker, Metrics, etc. The primary difference lies in the underlying Redis Stream structure they use and the support for scheduled publishing.
Advanced Features
Content-Based Deduplication
The library supports content-based deduplication to ensure that each unique event is processed only once:
const options = {
publishOnceGuarantee: true
};
publisher.listen('important-event', options).subscribe(event => {
console.log('Guaranteed unique event:', event);
});
Multiple Event Subscriptions
You can subscribe to multiple events from the same service:
const subscription1 = publisher.listen('event1').subscribe(/* ... */);
const subscription2 = publisher.listen('event2').subscribe(/* ... */);
Circuit Breaker
The Circuit Breaker pattern is implemented to prevent cascading failures in a distributed system. It helps to gracefully handle failures and allows the system to recover without overwhelming failed services.
Configuration options:
- enabled: Enable or disable the Circuit Breaker.
- errorThreshold: Number of errors before opening the circuit.
- errorThresholdPercentage: Percentage of errors to total calls before opening the circuit.
- timeWindow: Time window for error rate calculation (in milliseconds).
- openStateDuration: Duration to keep the circuit open before moving to half-open state (in milliseconds).
- halfOpenStateMaxAttempts: Maximum number of attempts allowed in half-open state.
The Circuit Breaker has three states:
- Closed: Normal operation, calls pass through.
- Open: Calls are immediately rejected without reaching the service.
- Half-Open: A limited number of calls are allowed to test if the service has recovered.
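The three states above can be sketched as a minimal, generic state machine. This is an illustration only, not the library's internal implementation; the class name and defaults here are invented for the example:

```typescript
type State = 'closed' | 'open' | 'half-open';

// Minimal illustrative circuit breaker: opens after `errorThreshold`
// consecutive failures, stays open for `openStateDuration` ms, then
// allows a limited number of trial calls in the half-open state.
class SimpleCircuitBreaker {
  private state: State = 'closed';
  private failures = 0;
  private openedAt = 0;
  private halfOpenAttempts = 0;

  constructor(
    private errorThreshold = 5,
    private openStateDuration = 30_000,
    private halfOpenStateMaxAttempts = 3,
  ) {}

  currentState(now = Date.now()): State {
    // Transition open -> half-open once the cool-down has elapsed.
    if (this.state === 'open' && now - this.openedAt >= this.openStateDuration) {
      this.state = 'half-open';
      this.halfOpenAttempts = 0;
    }
    return this.state;
  }

  async exec<T>(fn: () => Promise<T>, now = Date.now()): Promise<T> {
    const state = this.currentState(now);
    if (state === 'open') {
      throw new Error('circuit open: call rejected');
    }
    if (state === 'half-open' && this.halfOpenAttempts >= this.halfOpenStateMaxAttempts) {
      throw new Error('circuit half-open: trial budget exhausted');
    }
    if (state === 'half-open') this.halfOpenAttempts++;
    try {
      const result = await fn();
      // A success (including in half-open state) closes the circuit.
      this.state = 'closed';
      this.failures = 0;
      return result;
    } catch (err) {
      this.failures++;
      if (this.state === 'half-open' || this.failures >= this.errorThreshold) {
        this.state = 'open';
        this.openedAt = now;
      }
      throw err;
    }
  }
}
```

The real implementation also tracks an error percentage over a time window (errorThresholdPercentage and timeWindow); this sketch only models the consecutive-error path to keep the state transitions visible.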
Performance Optimizations
- Batched xdel operations for improved cleanup performance
- Configurable cleanup intervals and thresholds
- Efficient event filtering at the subscription level
- Retry logic with exponential backoff for failed operations
- Circuit Breaker to prevent overwhelming failed services
- Dead Letter Queue (DLQ) for handling subscription failures
- Adaptive Redis Stream ID Generation (Publisher only): The default Publisher automatically switches to an optimized ID generation strategy using a Lua script when publishing to many consumer groups (>10 by default) or when ID conflicts are detected. This prevents XADD errors related to non-monotonic IDs in high-throughput scenarios. Configurable via optimizationThreshold and optimizationDurationMs. (PublisherLite does not include this logic.)
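The retry-with-exponential-backoff behavior listed above can be illustrated with a small generic helper. This is a sketch only; retryWithBackoff is an invented name, not part of the library's API:

```typescript
// Retry an async operation with exponential backoff: wait
// baseDelayMs, 2*baseDelayMs, 4*baseDelayMs, ... between attempts.
async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  maxRetries = 5,
  baseDelayMs = 1000,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxRetries) {
        // Double the delay on every failed attempt.
        const delay = baseDelayMs * 2 ** attempt;
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  }
  throw lastError;
}
```

With maxRetries: 5 and initialRetryDelay: 1000, as in the configuration example earlier, a persistently failing operation would be retried after roughly 1s, 2s, 4s, 8s, and 16s before the error is surfaced.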
Cleanup and Graceful Shutdown
To ensure proper cleanup of resources, implement a graceful shutdown:
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
async function shutdown() {
console.log('Graceful shutdown initiated.');
try {
await publisher.close();
console.log('Resources and connections successfully closed.');
} catch (error) {
console.error('Error during graceful shutdown:', error);
}
process.exit(0);
}
Troubleshooting
If you encounter issues:
- Check the Redis connection settings
- Verify that consumer groups are correctly created
- Monitor the DLQ for failed events
- Review the performance metrics for any anomalies
- Check the logs for detailed error messages
