@rjavlonn/custom-rabbit
v1.0.1
Published
description
Readme
NestRabbit
Installation
npm install --save @rjavlonn/custom-rabbit
Usage
Initialisation
NestRabbit is based on AMQPConnectionManager.
To have a detailed understanding about how to configure the library feel free to check it out.
...
@Module({
imports: [
NestRabbitModule
],
})
export class ApplicationModule {}
}
The NestRabbitModule being Global, you need to declare it only once in your application module.
Configuration
To configure the exchanges/queues you need to register your configuration under the NestRabbitConfigurationNamespace.
export default registerAs(NestRabbitConfigurationNamespace, (): NRModuleConfiguration => ({
urls: [
...
],
auto: true,
logging: true,
exchanges: [
...
],
queues: [
...
]
});
Configurations options :
| Configuration | Type | Description | | ------------- | ------------- | ----- | | urls | Array | broker uri to connect to, if one ore more are down, the library will try to connect to at least one. | | auto | boolean | when a consumer is invoked, will ack after the call is done or nack if an exception is raised during the call | | logging | boolean | will automatically log both the notification and the error (if raised during the call) | | exchanges | Array | list of exchanges to assert before starting. if an assertion fails, stop the service. | queues | Array | list of queues to assert before starting. if an assertion fails, stop the service. | options | AmqpConnectionManagerOptions | options related to the root library (AmqpConnectionManager)
For the queues
configuration, the name property is really important. This is the key used to bind your consumers to the configuration.
Subscribing
NestRabbit use @Consume()
decorator to register callbacks for specific queues.
@Controller()
export class FooController {
constructor(
private readonly fooService: FooService,
) {}
@Consume('some.queue')
public async onEvent() {
console.log('A message has been posted on this queue!');
}
...
Important 🚨 :️ Due to the Nest lifecycle, NestRabbit check all methods of all registered controllers.
Do not forget to decorate your class with the @Controller() annotation and register as any HTTP controller.
@Controller()
export class FooConsumer {
private readonly logger: Logger;
constructor(
private readonly medicalDocumentService: MedicalDocumentService,
) {
this.logger = new Logger(this.constructor.name);
}
@Consume('edge-monitoring.document_approved.queue')
@UsePipes(new ValidationPipe({
transform: true,
whitelist: true,
}))
public async onDocumentApproved(
@Context() context: BrokerContext,
@Payload() payload: DocumentMessageDTO,
): Promise<void> {
...
}
Configuration
The configuration is a mandatory step to :
- connect to the broker
- configure the setup of your queues
- configure the setup of your exchanges
Behind the scene NestRabbit uses
@nestjs/config
meaning the configuration is also compliant with
.env configuration system.
To understand how to properly configure your queues and service, feel free to check the documentation of amqplib or rabbitMQ.
export default registerAs(NestRabbitConfigurationNamespace, (): INRModuleConfiguration => ({
urls: [
process.env.RABBIT_MQ_URI || `amqp://cied:cied@localhost:5672`,
],
exchanges: [
{
name: 'demo-service.exchange',
type: 'direct',
options: {
durable: true,
internal: false,
autoDelete: false,
arguments: [],
},
},
],
queues: [
{
name: 'some.queue',
prefetch: 100,
options: {
deadLetterRoutingKey: 'some.error.route',
deadLetterExchange: 'demo-service.exchange',
},
consumption: {
noAck: false,
},
binding: {
exchange: process.env.RABBIT_CORE_TOPIC_EXCHANGE || 'main.exchange',
routingKey: 'some.foo.route',
},
},
{
name: 'some.errors.queue',
binding: {
exchange: 'demo-service.exchange',
routingKey: 'some.error.route',
},
},
],
}));
Acknoledgement
In some context you might want to do manual acknowledgment, to do so you can use the @Context()
param decorator.
....
@Consume('some.other.queue')
public async onEvent(@Context() context: BrokerContext) {
console.log('A message has been posted on this queue!');
context.ack();
}
BrokerContext API
The BrokerContext exposes methods to Ack or Nack messages the same way the the AMQP Channel. Feel free to check the JSDoc of the classe directly from your favorite IDE.
Message Content
By default the message contains lots of information. You can access the raw message from the context
with the method context.getRawMessage()
;
If you want to map the content directly with a class the same way an HTTP Controller does with the @Body()
decorator
you can use the @Payload()
param decorator. This decorator will try to JSON.parse()
the content of your message.
Transformation/Validation
As for HTTP Controllers the NestRabbit analyses the Pipes decorators, meaning you directly validate and instantiate your payload with a ValidationPipe.
@Consume('test.queue')
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
public async onEvent(
@Context() context: BrokerContext,
@Payload() payload: Foo,
) {
...
}