maestro-rabbit
v1.0.9
Published
(changed name to http-rabbitmq) Library to seamlessly handle connections with RabbitMQ
Downloads
3
Maintainers
Readme
Hapi Rabbitmq Library
An awesome RabbitMQ Library for the Hapi ecosystem to work seamlessly with rabbit queues as if they were regular HTTP calls.
Lead Maintainer: webnator
This library has changed name and is no longer mantained please go to
Introduction
The idea behind this library is to provide a simple and seamless interface to integrate a HapiJS API project with RabbitMQ based queues.
You won't need to change much in your current API implementation to make it work as an event listener from the Queue, and you can also have dual HTTP/Queue endpoints.
This library also works very well with Maestro A very cool workflow manager that you should also check out now that you're here.
Getting started
Install with
npm i maestro-rabbit
Publish only initialization
For the simplest implementation you can call the init method with your RabbitMQ connection parameters and this will initialize the connection and return the instance.
const RabbitQueue = require('maestro-rabbit');
const queueConfig = {
host: 'localhost',
port: '5672',
user: 'rabb1tAdm1n',
pass: '123',
vhost: '/',
exchange: 'rabbit_exchange',
errorQueue: 'error',
errorTopic: 'error.test',
maxRetries: 3,
reconnectionTime: 2000
}
const myConnection = await RabbitQueue.init({ queueConfig });
Initialization registering handlers
In order to consume the queue in real time, we can set handlers that listen to an specific topic and perform actions with them.
To configure these listeners we need to create a routes file first:
//queueRoutes-1.js
'use strict';
const testController = require('./controllers/testController');
module.exports = function(server) {
server.route({
topic: 'mytopic.test',
handler: testController.handleTopic
});
};
We can have as many routes file as we want, splitting for module or functionality. After created, we just need to group all of them into a single file:
// queueRouter.js
'use strict';
module.exports = function(queueRouter) {
require('./queueRoutes-1')(queueRouter);
require('./queueRoutes-2')(queueRouter);
require('./queueRoutes-3')(queueRouter);
};
Then, instead of initializing as we showed before. We need to pass the routes file as a parameter to the init function.
const queueRouter = require('./queueRouter');
const myConnection = await RabbitQueue.init({ queueConfig, routes: queueRouter });
That's it!
After initialization, we can start working with the methods in the queue instance.
Consuming the queue
If we take a look at the example above, we can see that we set up a handler for the topic mytopic.test and we said that the handler would be testController.handleTopic
So, inside of our testController file, we need to have a function handleTopic that should have more or less the following structure
function handleTopic(request, reply) {
return reply(request.payload).code(200);
}
Queue responses work in the following way:
- 2XX responses: Ack the message in the queue
- 4XX responses: Ack the message in the queue, but also publishes the message to the error queue and topic we defined when we initialized the library
- 5XX responses: Ack the message in the queue, but re-queues the message again in order to be consumed. It only retries X times as defined in the maxRetries properties of the configuration
- Any other: Ack the message in the queue, but also publishes the message to the error queue and topic we defined when we initialized the library
Publishing to the queue
In order to publish into the queue we can use 2 functions:
- publish({key, msg})
- publishHTTP(key, { payload, headers, query, params, traceId })
- publishDelayedHTTP(key, { payload, headers, query, params, traceId }, delay)
- queueManager()
publishHTTP is an implementation of the publish method, that wraps and stringifies the received parameters, and afterwards calls the publish function.
.publishHTTP(String key, { Object payload, Object headers, Object query, Object params, String traceId })
Publishes an stringified HTTP request into the topic.
Parameters:
- key: The routing ke (topic) where the message will be published
- payload: The HTTP payload in JSON format
- headers: The HTTP headers in JSON format
- query: The HTTP query in JSON format
- params: The HTTP params in JSON format
- [traceId]: The trace id for the request. Optional: For logging purposes
Returns:
- A promise
.publishDelayedHTTP(String key, { Object payload, Object headers, Object query, Object params, String traceId }, Int delay)
Same as below, but RabbitMQ queues that implement the delayed plugin.
If your rabbit doesn't have this plugin installed, please don't use this function. It will fail!
Parameters:
- key: The routing ke (topic) where the message will be published
- payload: The HTTP payload in JSON format
- headers: The HTTP headers in JSON format
- query: The HTTP query in JSON format
- params: The HTTP params in JSON format
- [traceId]: The trace id for the request. Optional: For logging purposes
- delay: The delay period in milliseconds
Returns:
- A promise
.publish({String key, String message})
Publishes an string into the topic.
Parameters:
- key: The routing key (topic) where the message will be published
- message: The message that we want to publish into the queue
Returns:
- A promise
.queueManager()
Returns the instance of the queue manager to work with its inner functions
Configuration
.init({ Object queueConfig[, Object extraOptions] [, Function routes]})
The library initialization method, it accepts the following parameters:
- queueConfig: The queue configuration, accepts the following paramenters:
- host: The host to connect to required,
- port: The port to connect to required,
- user: The user to authenticate in the queue required,
- pass: The password to authenticate in the queue required,
- vhost: The rabbitMQ vhost (Usually '/') required,
- exchange: The rabbitMQ exchange required,
- errorQueue: The queue where to output the errors for unprocessed incoming calls (Usually 'test-error') required,
- errorTopic: The topic (routing key) where to output the error (Usually error.MICROSERVICE_NAME) required,
- prefetch: Defaults to 1,
- delaySeconds: Defaults to 3000,
- reconnectionTime: Defaults to 2000,
- maxRetries: Defaults to 5,
- timeBetweenRetries: _Defaults to 3000
- extraOptions: Object that Allows to set some extra options for the queue configuration, all parameters are optional
- exchangeOptions: The settings to assert the rabbitMQ exchange More Options
- durable: Defaults to true
- queueOptions: The settings to assert the rabbitMQ queue More Options
- noAck: Defaults to false
- publisherOptions: The settings to publish into the rabbitMQ queue More Options
- persistent: Defaults to true
- retryOptions:
- retries: The maximum times that the retry policy will try to send a message to the queue if there's an error Defaults to 10
- time: The interval that the retry policy will use when trying to re-send a message to the queue if there's an error. In milliseconds Defaults to 1000
- exchangeOptions: The settings to assert the rabbitMQ exchange More Options
- routes: The router file as described above.