@jetit/publisher
v5.1.1
`@jetit/publisher` is a robust, feature-rich library for implementing an event-driven architecture using Redis Pub/Sub and Redis Streams. It provides a scalable mechanism for publishing and consuming events in real time, with support for advanced features such as message deduplication, consumer group management, and scheduled event publishing.
Installation
npm install @jetit/publisher
Key Features
- Real-time event publishing and subscribing
- Configurable Streams class for flexible usage
- Improved error handling and reliability
- Performance tracking with Redis time and operation time metrics
- Dead Letter Queue (DLQ) for handling subscription failures
- Event filtering for specialized subscriptions
- Support for multiple event subscriptions from the same service
- Batch publishing (regular and scheduled)
- Basic monitoring with Prometheus export support
- Content-based publish-once guarantee (at-most-once delivery, i.e. 0-1 semantics)
- Optimized cleanup processes for improved performance
- Circuit Breaker pattern for fault tolerance
Usage
Basic Example
import { Publisher, EventData } from '@jetit/publisher';
// Create an instance of the publisher
const publisher = new Publisher('MyService');
// Publish an event
const eventData: EventData<{ message: string }> = {
    eventName: 'my-event',
    data: { message: 'Hello, world!' }
};
await publisher.publish(eventData);
// Subscribe to an event
publisher.listen('my-event').subscribe(event => {
    console.log(`Received event: ${event.eventName}`, event.data);
});
Configuration
The `Publisher` class can be configured with various options, including Circuit Breaker and Backpressure handling:
import { Publisher, IStreamsConfig } from '@jetit/publisher';
const config: Partial<IStreamsConfig> = {
    cleanUpInterval: 3600000, // 1 hour
    maxRetries: 5,
    initialRetryDelay: 1000,
    immediatePublishThreshold: 500,
    unprocessedMessageThreshold: 25,
    acknowledgedMessageCleanupInterval: 3600000, // 1 hour
    dlqEventThreshold: 2000,
    filterKeepAlive: 86400000, // 24 hours
    duplicationCheckWindow: 86400, // 24 hours (this value is in seconds, unlike the millisecond values above)
    circuitBreaker: {
        enabled: true,
        errorThreshold: 50,
        errorThresholdPercentage: 50,
        openStateDuration: 30000, // 30s
        halfOpenStateMaxAttempts: 10,
        maxStoredEvents: 5000,
    },
};
const publisher = new Publisher('MyService', config);
Publishing Events
const eventData = {
    eventName: 'user-registered',
    data: { userId: '123', email: 'user@example.com' }
};
await publisher.publish(eventData);
Subscribing to Events
publisher.listen('user-registered').subscribe(event => {
    console.log('New user registered:', event.data);
});
Scheduled Publishing
const futureDate = new Date(Date.now() + 60000); // 1 minute from now
await publisher.scheduledPublish(futureDate, eventData);
Batch Publishing
import { publishBatch } from '@jetit/publisher';
const events = [
    { eventName: 'event1', data: { /* ... */ } },
    { eventName: 'event2', data: { /* ... */ } },
    // ...
];
const result = await publishBatch(publisher, events, { batchSize: 100, delayBetweenBatches: 1000 });
console.log('Batch publish result:', result);
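The README does not describe what `batchSize` and `delayBetweenBatches` do internally. The sketch below shows one plausible reading, assuming events are split into fixed-size groups with a pause between groups. The helpers `chunk`, `sleep`, and `publishInBatches`, the `publishOne` callback, and the `{ published, failed }` result shape are all hypothetical and are not part of the library's API.

```typescript
// Hypothetical sketch: NOT the library's publishBatch implementation.

/** Splits `items` into consecutive groups of at most `batchSize` elements. */
function chunk<T>(items: readonly T[], batchSize: number): T[][] {
    if (batchSize <= 0) {
        throw new RangeError('batchSize must be positive');
    }
    const batches: T[][] = [];
    for (let i = 0; i < items.length; i += batchSize) {
        batches.push(items.slice(i, i + batchSize));
    }
    return batches;
}

const sleep = (ms: number): Promise<void> =>
    new Promise<void>(resolve => setTimeout(resolve, ms));

/**
 * Publishes each batch concurrently, then waits before starting the next one.
 * `publishOne` stands in for a call such as publisher.publish(event).
 */
async function publishInBatches<T>(
    items: readonly T[],
    publishOne: (item: T) => Promise<void>,
    options: { batchSize: number; delayBetweenBatches: number },
): Promise<{ published: number; failed: number }> {
    let published = 0;
    let failed = 0;
    const batches = chunk(items, options.batchSize);
    let index = 0;
    for (const batch of batches) {
        await Promise.all(
            batch.map(async item => {
                try {
                    await publishOne(item);
                    published++;
                } catch {
                    failed++; // count the failure and keep going
                }
            }),
        );
        index++;
        if (index < batches.length) {
            await sleep(options.delayBetweenBatches); // no pause after the last batch
        }
    }
    return { published, failed };
}
```

Under this reading, 250 events with `batchSize: 100` would be sent as three batches of 100, 100, and 50 events, with two pauses of `delayBetweenBatches` milliseconds in between.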
Dead Letter Queue (DLQ)
// Retry an event from DLQ
const success = await publisher.retryFromDLQ('eventId');
// Get DLQ stats
const stats = await publisher.getDLQStats();
console.log('DLQ stats:', stats);
Event Filtering
const options = {
    eventFilter: (event) => event.data.userId === '123',
    filterKeepAlive: 3600000 // 1 hour
};
publisher.listen('user-action', options).subscribe(event => {
    console.log('Filtered user action:', event);
});
Performance Monitoring
// Get metrics for a specific time range
const metrics = await publisher.getMetrics(startTime, endTime);
console.log('Performance metrics:', metrics);
// Get latest metrics
const latestMetrics = await publisher.getLatestMetrics();
console.log('Latest metrics:', latestMetrics);
Prometheus Integration
import { PrometheusAdapter } from '@jetit/publisher';
import promClient from 'prom-client';
import express from 'express';
const app = express();
const prometheusAdapter = new PrometheusAdapter(publisher, promClient);
prometheusAdapter.setupEndpoint(app, '/metrics');
app.listen(3000, () => {
    console.log('Metrics server listening on port 3000');
});
Advanced Features
Content-Based Deduplication
The library supports content-based deduplication to ensure that each unique event is processed only once:
const options = {
    publishOnceGuarantee: true
};
publisher.listen('important-event', options).subscribe(event => {
    console.log('Guaranteed unique event:', event);
});
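To illustrate the general idea behind content-based deduplication, here is a minimal in-memory sketch. It is not the library's implementation, which presumably keeps this state in Redis. `DedupWindow`, `contentKey`, and `stableStringify` are hypothetical names, and the sketch assumes the window is given in seconds, like the `duplicationCheckWindow` value in the configuration example.

```typescript
// Illustrative only: an in-memory stand-in for whatever store the library uses.

/** Serializes a value with object keys sorted, so key order does not matter. */
function stableStringify(value: unknown): string {
    if (value === null || typeof value !== 'object') {
        return String(JSON.stringify(value));
    }
    if (Array.isArray(value)) {
        return `[${value.map(item => stableStringify(item)).join(',')}]`;
    }
    const record = value as Record<string, unknown>;
    const keys = Object.keys(record).sort();
    const parts = keys.map(key => `${JSON.stringify(key)}:${stableStringify(record[key])}`);
    return `{${parts.join(',')}}`;
}

/** Builds a key from the event name and its content. */
function contentKey(eventName: string, data: unknown): string {
    return `${eventName}:${stableStringify(data)}`;
}

class DedupWindow {
    // Maps a content key to the timestamp (ms) at which it stops counting as a duplicate.
    private readonly seen = new Map<string, number>();

    constructor(private readonly windowSeconds: number) {}

    /** Returns true the first time a given content is seen within the window. */
    shouldProcess(eventName: string, data: unknown, now: number = Date.now()): boolean {
        const key = contentKey(eventName, data);
        const expiry = this.seen.get(key);
        if (expiry !== undefined && expiry > now) {
            return false; // same content seen recently: treat as a duplicate
        }
        this.seen.set(key, now + this.windowSeconds * 1000);
        return true;
    }
}
```

Because the key is derived from the content rather than from a message ID, two events with the same name and the same payload count as duplicates even if they were published separately.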
Multiple Event Subscriptions
You can subscribe to multiple events from the same service:
const subscription1 = publisher.listen('event1').subscribe(/* ... */);
const subscription2 = publisher.listen('event2').subscribe(/* ... */);
Circuit Breaker
The Circuit Breaker pattern is implemented to prevent cascading failures in a distributed system. It helps to gracefully handle failures and allows the system to recover without overwhelming failed services.
Configuration options:
- `enabled`: Enable or disable the Circuit Breaker.
- `errorThreshold`: Number of errors before opening the circuit.
- `errorThresholdPercentage`: Percentage of errors to total calls before opening the circuit.
- `timeWindow`: Time window for error rate calculation (in milliseconds).
- `openStateDuration`: Duration to keep the circuit open before moving to half-open state (in milliseconds).
- `halfOpenStateMaxAttempts`: Maximum number of attempts allowed in half-open state.
The Circuit Breaker has three states:
- Closed: Normal operation, calls pass through.
- Open: Calls are immediately rejected without reaching the service.
- Half-Open: A limited number of calls are allowed to test if the service has recovered.
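The three states can be sketched as a small state machine. This is an illustrative sketch, not the library's internal code. `SimpleCircuitBreaker` and `BreakerOptions` are hypothetical names, the sketch assumes that both `errorThreshold` and `errorThresholdPercentage` must be reached before the circuit opens, and it omits `timeWindow` for brevity.

```typescript
// Illustrative sketch only; not the library's internal implementation.
type CircuitState = 'closed' | 'open' | 'half-open';

interface BreakerOptions {
    errorThreshold: number;           // minimum number of errors before the circuit may open
    errorThresholdPercentage: number; // error rate (0-100) required to open the circuit
    openStateDuration: number;        // ms to stay open before moving to half-open
    halfOpenStateMaxAttempts: number; // trial calls allowed while half-open
}

class SimpleCircuitBreaker {
    private state: CircuitState = 'closed';
    private errors = 0;
    private calls = 0;
    private openedAt = 0;
    private halfOpenAttempts = 0;

    constructor(private readonly options: BreakerOptions) {}

    /** Returns true if a call may proceed at time `now` (ms). */
    canCall(now: number): boolean {
        if (this.state === 'open' && now - this.openedAt >= this.options.openStateDuration) {
            // The open period has elapsed: allow a limited number of trial calls.
            this.state = 'half-open';
            this.halfOpenAttempts = 0;
        }
        if (this.state === 'open') {
            return false; // reject immediately without reaching the service
        }
        if (this.state === 'half-open') {
            if (this.halfOpenAttempts >= this.options.halfOpenStateMaxAttempts) {
                return false;
            }
            this.halfOpenAttempts++;
        }
        return true;
    }

    /** A success while half-open closes the circuit and resets the counters. */
    recordSuccess(): void {
        this.calls++;
        if (this.state === 'half-open') {
            this.state = 'closed';
            this.errors = 0;
            this.calls = 0;
        }
    }

    /** A failure while half-open, or reaching both thresholds, opens the circuit. */
    recordFailure(now: number): void {
        this.calls++;
        this.errors++;
        const errorRate = (this.errors / this.calls) * 100;
        const tripped =
            this.errors >= this.options.errorThreshold &&
            errorRate >= this.options.errorThresholdPercentage;
        if (this.state === 'half-open' || tripped) {
            this.state = 'open';
            this.openedAt = now;
        }
    }

    getState(): CircuitState {
        return this.state;
    }
}
```

A caller would check `canCall(Date.now())` before each operation and report the outcome with `recordSuccess()` or `recordFailure(Date.now())`. Passing the timestamp in explicitly keeps the sketch easy to test.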
Performance Optimizations
- Batched `xdel` operations for improved cleanup performance
- Configurable cleanup intervals and thresholds
- Efficient event filtering at the subscription level
- Retry logic with exponential backoff for failed operations
- Circuit Breaker to prevent overwhelming failed services
- Dead Letter Queue (DLQ) for handling subscription failures
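As an illustration of retry with exponential backoff, the sketch below doubles the delay after each failed attempt. The README does not document the library's exact backoff formula, so the doubling factor and the helper names `backoffDelays` and `withRetry` are assumptions. Only `maxRetries` and `initialRetryDelay` correspond to options shown in the configuration example.

```typescript
// Hypothetical helpers; the library's actual backoff formula is not documented here.

/** Returns the delay (ms) to wait before each retry: initial, initial * 2, initial * 4, ... */
function backoffDelays(maxRetries: number, initialRetryDelay: number, factor: number = 2): number[] {
    const delays: number[] = [];
    for (let attempt = 0; attempt < maxRetries; attempt++) {
        delays.push(initialRetryDelay * Math.pow(factor, attempt));
    }
    return delays;
}

/** Runs `operation`, retrying up to `maxRetries` times with growing delays. */
async function withRetry<T>(
    operation: () => Promise<T>,
    maxRetries: number,
    initialRetryDelay: number,
): Promise<T> {
    const delays = backoffDelays(maxRetries, initialRetryDelay);
    let lastError: unknown;
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
        try {
            return await operation();
        } catch (error) {
            lastError = error;
            if (attempt < maxRetries) {
                const delay = delays[attempt];
                await new Promise<void>(resolve => setTimeout(resolve, delay));
            }
        }
    }
    throw lastError; // all attempts failed
}
```

With `maxRetries: 5` and `initialRetryDelay: 1000`, this schedule waits 1000, 2000, 4000, 8000, and 16000 ms before the successive retries.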
Cleanup and Graceful Shutdown
To ensure proper cleanup of resources, implement a graceful shutdown:
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
async function shutdown() {
    console.log('Graceful shutdown initiated.');
    let exitCode = 0;
    try {
        await publisher.close();
        console.log('Resources and connections successfully closed.');
    } catch (error) {
        console.error('Error during graceful shutdown:', error);
        exitCode = 1; // report the failed shutdown to the parent process
    }
    process.exit(exitCode);
}
Troubleshooting
If you encounter issues:
- Check the Redis connection settings
- Verify that consumer groups are correctly created
- Monitor the DLQ for failed events
- Review the performance metrics for any anomalies
- Check the logs for detailed error messages