sudo-queue-package
v1.2.2
Published
Queuing system for Sudo Applications
Sudo Queue Publishing and Subscribing Guide
Description
The Sudo Queue package lets your application connect to the Sudo Queue microservice and publish messages through it.
Services
- Node.js version 16+
- RabbitMQ running on Amazon MQ
- MongoDB
Installation
$ npm install sudo-queue-package @golevelup/nestjs-rabbitmq
Installing the Sudo Queue package in your application lets it communicate with the Sudo Queue microservice, making it easier to publish, subscribe to, and acknowledge messages as required.
Usage
Publishing Message
import * as SudoQueue from 'sudo-queue-package';
// Connect to the Sudo Queue microservice
this.sudoQueue = new SudoQueue(
  exchange,
  key,
  rabbitMQUrl,
  mongoDbUrl,
);
// Publish a message
const resp = this.sudoQueue.createMessage(
  message,
  'platform_name',
  'application_name',
  'transactionId',
  messageDelay,
  delayDateTime,
);
exchange
- The exchange you want to publish the message to. Make sure the exchange name is unique.
key
- The routing key, used to perform a simple match against the binding key defined for each queue.
rabbitMQUrl
- The URL used to connect to the RabbitMQ server.
mongoDbUrl
- The MongoDB URL used to connect to MongoDB, where the correlationId is stored.
message
- The message to send to the subscriber. It should be a JSON string.
platform_name
- The name of the platform your message belongs to; you choose this value.
application_name
- Identifies the application the message comes from.
messageDelay
- A boolean. When true, the message is not queued immediately but waits until the time set in delayDateTime.
delayDateTime
- The time at which a delayed message should be sent to the queue for the subscriber.
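The parameters above can be assembled with a small helper before calling createMessage. This is only a sketch: `PublishArgs` and `buildPublishArgs` are hypothetical names that are not part of the package, and representing delayDateTime as a `Date` is an assumption, since the format the package expects is not stated here.

```typescript
// Hypothetical container for the createMessage arguments described above.
// Not part of sudo-queue-package; names and types are illustrative.
interface PublishArgs {
  message: string; // JSON string sent to the subscriber
  platformName: string;
  applicationName: string;
  transactionId: string;
  messageDelay: boolean; // true when the message should be delayed
  delayDateTime: Date | null; // assumption: a Date; the real format may differ
}

// Serialises the payload to a JSON string and derives messageDelay
// from whether a delay time was supplied.
function buildPublishArgs(
  payload: Record<string, unknown>,
  platformName: string,
  applicationName: string,
  transactionId: string,
  delayUntil?: Date,
): PublishArgs {
  return {
    message: JSON.stringify(payload),
    platformName,
    applicationName,
    transactionId,
    messageDelay: delayUntil !== undefined,
    delayDateTime: delayUntil ?? null,
  };
}
```

The resulting fields could then be passed, in order, to createMessage. Deriving messageDelay from the presence of a delay time keeps the two arguments from contradicting each other.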
Subscribing to Message
import * as SudoQueue from 'sudo-queue-package';
import { Nack, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
export class SubscribeService {
  // this.sudoQueue is a SudoQueue instance created as shown under "Publishing Message"
  @RabbitSubscribe({
    exchange: 'exchange_name',
    routingKey: 'key',
    queue: 'term-loan-queue',
  })
  public async debitTermLoan(msg: any) {
    console.log(`Received message for Term Loan: ${JSON.stringify(msg)}`);
    const transaction = JSON.parse(msg.message);
    const correlationId = msg.correlationId;
    try {
      // Check for a running process on the correlation
      const correlationExist = await this.sudoQueue.checkCorrelation(msg);
      if (correlationExist.status === 'Processing') {
        // The message is still being processed elsewhere; write your logic
        return new Nack();
      } else if (correlationExist.status === 'Closed') {
        // The message has already been treated successfully; write your logic
        await this.sudoQueue.closeMessage(correlationId);
        return new Nack();
      } else if (correlationExist.status === 'Renew') {
        // This is a correlation that allows retry; write your logic
      } else if (correlationExist.status === 'New') {
        // This is a new correlation; write your logic
      }
      // Implement your own action on transaction
      await this.sudoQueue.closeCorrelation(correlationId);
      await this.sudoQueue.closeMessage(correlationId);
      return new Nack();
    } catch (error) {
      console.log(error);
      await this.sudoQueue.closeCorrelation(correlationId);
      return new Nack();
    }
  }
}
Explanation
@RabbitSubscribe({
exchange: 'exchange_name',
routingKey: 'key',
queue: 'term-loan-queue',
})
This is the RabbitMQ subscription decorator. It lets the method receive messages from the queue bound to the exchange by the routingKey that was set when the message was created.
const correlationExist = await this.sudoQueue.checkCorrelation(msg);
Pass the received message to the checkCorrelation method to confirm whether it has been received earlier and is still being processed. It returns one of the following statuses:
- Processing - the message has been received earlier and is still being processed
- Renew - the message has been received earlier, but the initial process completed and a retry is allowed
- New - this is the first time the message is being received
- Closed - the message has already been treated successfully
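The four statuses can be mapped to a handling decision in one place, which keeps the subscriber's branching easy to follow. The sketch below is illustrative only: `CorrelationStatus`, `Action`, and `actionFor` are hypothetical names, and the mapping mirrors the sample subscriber rather than anything enforced by the package.

```typescript
// Statuses returned by checkCorrelation, as listed above.
type CorrelationStatus = "Processing" | "Renew" | "New" | "Closed";

// Hypothetical handling decisions; not part of sudo-queue-package.
type Action = "skip" | "close-message" | "process";

// Maps a correlation status to what the subscriber should do next.
function actionFor(status: CorrelationStatus): Action {
  switch (status) {
    case "Processing":
      // Still being processed elsewhere: do nothing and acknowledge.
      return "skip";
    case "Closed":
      // Already treated successfully: close the message so it is not queued again.
      return "close-message";
    case "Renew":
    case "New":
      // First delivery or an allowed retry: run the business logic.
      return "process";
    default: {
      // Compile-time exhaustiveness check; fails at runtime for unexpected values.
      const unreachable: never = status;
      throw new Error(`Unknown status: ${String(unreachable)}`);
    }
  }
}
```

Using a union type means the compiler flags any status that is added later but not handled.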
await this.sudoQueue.closeCorrelation(correlationId)
This method closes the correlation, which means you have finished working on it and do not want it retried.
await this.sudoQueue.allowRetryCorrelation(correlationId)
This method ends processing of the correlation while allowing it to be retried. Use it when the operation being performed failed and should be retried. Note: do not call closeMessage in this case.
await this.sudoQueue.closeMessage(correlationId)
This method closes the entire message on the queue server; the Sudo Queue server will no longer queue the message.
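One way to combine the three methods described above is to close both the correlation and the message on success, and allow a retry on failure. This is a sketch under assumptions: `CorrelationClient` and `finishCorrelation` are hypothetical names, the interface only restates the method names used in this guide (all assumed to return promises), and the retry-on-failure choice differs from the sample subscriber, which closes the correlation in its catch block.

```typescript
// Hypothetical interface restating the methods used in this guide.
// The real SudoQueue class may expose different signatures.
interface CorrelationClient {
  closeCorrelation(correlationId: string): Promise<void>;
  allowRetryCorrelation(correlationId: string): Promise<void>;
  closeMessage(correlationId: string): Promise<void>;
}

// Runs the business logic, then either closes everything (success)
// or releases the correlation for a retry (failure).
async function finishCorrelation(
  client: CorrelationClient,
  correlationId: string,
  work: () => Promise<void>,
): Promise<"closed" | "retry"> {
  try {
    await work();
  } catch {
    // Failure: allow a retry and do NOT call closeMessage.
    await client.allowRetryCorrelation(correlationId);
    return "retry";
  }
  // Success: no retry wanted, and the message should not be queued again.
  await client.closeCorrelation(correlationId);
  await client.closeMessage(correlationId);
  return "closed";
}
```

Inside a subscriber, this could be called as `await finishCorrelation(this.sudoQueue, correlationId, async () => { /* your logic */ })` before returning from the listener, assuming the SudoQueue instance matches the interface above.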
return new Nack();
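This acknowledges receipt of the message on RabbitMQ so that it is removed from the queue. In @golevelup/nestjs-rabbitmq, new Nack() with no argument rejects the message without requeueing it. It should be the return value of every listener.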
Stay in touch
- Author - Michael Ojo
License
Sudo Queue is Sudo Africa.