@goparrot/pubsub-event-bus
v5.0.0
Published
NestJS EventBus extension for RabbitMQ PubSub
Downloads
542
Readme
PubSub Event Bus
PubSub Event Bus is built on top of NestJS CQRS module.
It gives the ability to use NestJS Cqrs Module across microservice architecture, using RabbitMQ message broker.
Table of Contents
Installation
First install the required package:
npm install --save @goparrot/pubsub-event-bus
It is highly recommended installing peerDependencies
by yourself.
Import module
Import module & configure it by providing the connection string.
import { CqrsModule } from "@goparrot/pubsub-event-bus";
export const connections: string[] = ["amqp://username:[email protected]/virtualhost"];
@Module({
imports: [CqrsModule.forRoot({ connections })],
})
export class AppModule {}
Full list of the PubSub CQRS Module options:
| Options | Description |
|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connections | Array of connection strings |
| config | AMQP connection options |
| isGlobal | Should the module be registered as global |
| logger | Logger service to be used |
| connectionName | Name of the connection to be displayed in the server logs and management UI. Final name will have a suffix :producer
or :consumer
depending on the connection purpose |
| retryOptions | Global options for the retry mechanism. Read more in the Retry Mechanism section |
Note: The CqrsModule
class should be imported from @goparrot/pubsub-event-bus
library.
Usage
Create event
Event is a simple class with message payload.
export class StoreCreated implements IEvent {
constructor(private readonly storeId: string) {
}
}
This is a fully compatible event class that can be used with NestJS EventBus.
In order to make it PubSub ready, it should extend the AbstractPubsubEvent
class and be decorated with PubsubEvent
(
both imported from @goparrot/pubsub-event-bus
).
import { AbstractPubsubEvent, PubsubEvent } from "@goparrot/pubsub-event-bus";
export interface IStoreCreatedPayload {
storeId: string;
}
@PubsubEvent({ exchange: "store" })
export class StoreCreated extends AbstractPubsubEvent<IStoreCreatedPayload> {}
Publish event
Inject EventBus
into the service in order to emit the event (imported from @goparrot/pubsub-event-bus
).
import { EventBus } from "@goparrot/pubsub-event-bus";
import { Injectable } from "@nestjs/common";
@Injectable()
class SomeService {
constructor(private readonly eventBus: EventBus) {
}
async doCoolStuff() {
// create item
await this.eventBus.publish(new StoreCreated({ storeId }));
// return item
}
}
Consuming events
Create event handler
Create a simple class which extends AbstractPubsubHandler
and is decorated with PubsubEventHandler
(both imported
from @goparrot/pubsub-event-bus
).
import { AbstractPubsubHandler, PubsubEventHandler } from "@goparrot/pubsub-event-bus";
@PubsubEventHandler(StoreCreated)
export class StoreCreatedHandler extends AbstractPubsubHandler<StoreCreated> {
handle(event: StoreCreated) {
console.log(`[${this.constructor.name}] ->`, event.payload);
}
}
Notice, Unlike regular Cqrs events handlers, PubSub EventHandler uses its own
decorator @PubsubEventHandler(StoreCreated)
@PubsubEventHandler
decorator accepts a list of Events it is listening for, like:
@PubsubEventHandler(StoreCreated, UserCreated)
Implement required methods:
handle
- central point where event payload will come
Register event handler
Register the event handler as provider:
@Module({
providers: [StoreCreatedHandler],
})
export class AppModule {}
Once registered, event handler will start listening for incoming events.
Configuration
Event Configuration
In order to emit an event with extra headers, just call the withOptions({})
method and provide required configuration:
await this.eventBus.publish(
new StoreCreated({ storeId: "storeId" }).withOptions({
persistent: false,
priority: 100,
headers: ["..."],
}),
);
Handler Configuration
PubsubEventHandler
decorator accepts handler options as the last argument. List of available options
| Options | Description |
|---------------------|--------------------------------------------------------------------------------------------------------------|
| autoAck | Event acknowledge mode. Default ALWAYS_ACK
. Read more in the Acknowledge Mode section |
| queue | Custom queue name |
| bindingQueueOptions | Queue binding options from the amqplib
|
| retryOptions | Handler specific retry options. Read more in the Retry Mechanism section |
Acknowledge Mode
By default, library creates queues without automatic delivery acknowledgement, therefore, messages should be acknowledged by the client. There are several acknowledge modes provided by the library:
ALWAYS_ACK
(default)
Positive acknowledge in case of success or failure
ACK_AND_NACK
Automatic positive ack in case of success and automatic negative acknowledge in case of error
NEVER
Acknowledge should be performed manually. Message can be manually positively or negatively acknowledged
using AbstractPubsubHandler.ack
and AbstractPubsubHandler.nack
methods respectively
AUTO_RETRY
Automatic positive ack in case of success and automatic retry attempt in case of error. Read more in the Retry Mechanism section
Retry Mechanism
PubSub Event Bus supports automatic event processing retries with static or dynamic backoff. It can be enabled by
setting acknowledge mode to AUTO_RETRY
. In case of any unhandled error library will publish the event to the delayed
exchange to return it back the queue with a delay.
Retry mechanism can be configured both on module and handler levels. Handler specific options are merged with the module ones.
Available options:
| Options | Description | Default value |
|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------|
| maxRetryAttempts | Maximum number of retry attempts | 3 |
| delay | Delay between retry attempts in milliseconds. Can be a fixed positive number or a function that receives current retry attempt count and returns delay | Math.floor(1000 * Math.exp(retryCount - 1))
|
| strategy | Retry strategy to be used. Read more in the Retry Strategies section | DEAD_LETTER_TTL
|
When number of retry attempts is exceeded handler method onRetryAttemptsExceeded
is called with the event and last
error as arguments. Then message is discarded.
Example:
// app.module.ts
import { CqrsModule, RetryStrategyEnum } from "@goparrot/pubsub-event-bus";
export const connections: string[] = ["amqp://username:[email protected]/virtualhost"];
@Module({
imports: [
CqrsModule.forRoot({
connections,
retryOptions: {
maxRetryAttempts: 5,
delay: (retryCount: number) => retryCount * 1000,
strategy: RetryStrategyEnum.DELAYED_MESSAGE_EXCHANGE,
},
}),
],
})
export class AppModule {}
// store-created.handler.ts
import { AbstractPubsubHandler, PubsubEventHandler, RetryStrategyEnum } from "@goparrot/pubsub-event-bus";
@PubsubEventHandler(StoreCreated, {
autoAck: AutoAckEnum.AUTO_RETRY,
retryOptions: {
maxRetryAttempts: 10,
delay: (retryCount: number) => retryCount ** 2 * 1000,
strategy: RetryStrategyEnum.DEAD_LETTER_TTL,
},
})
export class StoreCreatedHandler extends AbstractPubsubHandler<StoreCreated> {
async handle(event: StoreCreated) {
// process the event
}
async onRetryAttemptsExceeded(event: StoreCreated, error: Error) {
// log the event processing failure
}
}
Retry Strategies
This library provides two different strategies for retry mechanism implementation. The main differences are requirements and performance.
Dead Letter Message and Per-Message TTL Strategy
This strategy has no additional requirements and therefore is the default one.
Library creates several RabbitMQ components:
- Waiting queues, one for each waiting time
- Exchange to route messages to the corresponding waiting queue
- Exchange to route messages back to source queue
Example:
There are two PubSub handlers:
The first one with static delay 1000 ms and 5 maximum retry attempts. Only one queue is required with waiting time 1000
ms.
The second one with delay function 1000*2^x
ms and 3 maximum retry attempts. Several queues are required with waiting
time 1000, 2000 and 4000 ms.
Therefore, library will create 3 queues with 1000, 2000 and 4000 ms waiting time. Queue with waiting time 1000 ms will be used for both handlers.
Delayed Message Exchange Strategy
This strategy requires RabbitMQ Delayed Message Plugin to be installed and enabled on the RabbitMQ server.
Library creates a delayed message exchange to route messages back to the source queue with a set delay.
Known Issues
Several handlers listening to the same event
Problem: When several handlers listen to the same event, each handler receives the same event several times (equal to number of listeners)
Workaround: You can create one "proxy" pub-sub event listener that will listen to the required event. Then there are two options available:
- It will publish a local event with the same content. Then you can create as many event listeners to this local event as you need. Main disadvantage of this approach is that the pub-sub event is acknowledged in this "proxy" event listener. Therefore, if something goes wrong in the actual event listeners, the library won't handle the error.
- It will execute all the required commands. Main disadvantage of this approach is that the pub-sub event acknowledgement is shared to all commands. Therefore, you should handle the double event processing in each command handler.