exframe-mq
v5.1.2
Messaging framework module
exframe Messaging Framework Module
A simplified abstraction for RabbitMQ.
Features and Assumptions
- Persistent and shared connections
- Assumes JSON payload
- Default connection settings
Available Topologies
- Publish/Subscribe
Usage
const mq = require("exframe-mq");
const rabbitmq = mq.create({
  logger,
  url: 'amqp://localhost',
  heartbeat: 30,
  baseTimeout: 500,
  maxAttempts: 5,
  responseTimeout: 60 * 1000
});
Configuration Settings
logger
- See winston logger for the interface of the logger. A default implementation will write to stdout.
url
- default: 'amqp://localhost' The URL of the rabbitmq server.
heartbeat
- default: 30 Time in seconds for a heartbeat between the client code and the rabbitmq server. This keeps the connection alive during inactivity, particularly when the connection would otherwise be closed by external mechanisms like Amazon's ELB.
baseTimeout
- default: 500 Time in milliseconds to wait between connection failures.
maxAttempts
- default: 190 Max number of attempts to try connecting to the rabbitmq server.
responseTimeout
- default: 60000 Time in milliseconds for re-establishing a connection with the rabbitmq server after inactivity.
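How baseTimeout and maxAttempts interact can be illustrated with a small retry loop. This is a minimal sketch, not the module's actual implementation: connectWithRetry and its connect argument are hypothetical names, and the constant wait between attempts is an assumption (the module may well grow the delay from one attempt to the next).

```javascript
// Hypothetical helper, not part of exframe-mq.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Calls an async connect function until it succeeds or maxAttempts is reached.
// Assumption: a constant baseTimeout (ms) is waited after every failure.
async function connectWithRetry(connect, { baseTimeout = 500, maxAttempts = 190 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await connect(attempt);
    } catch (error) {
      lastError = error;
      // Do not sleep after the final failed attempt.
      if (attempt < maxAttempts) await sleep(baseTimeout);
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}
```

With the defaults above, a server that never comes up would be retried 190 times before the last error is thrown.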
Environment Variable Configuration Settings
MQ_MAX_ATTEMPTS
- default: 190 Max number of attempts to try and connect to the rabbitmq server.
MQ_EXIT_ON_FAILURE
- default: false Whether or not the application should fail on the FIRST attempt at a message queue connection.
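For illustration, the two variables could be read along the following lines. This is a sketch only: readMqEnv is a hypothetical helper, and the parsing rules (base-10 integer, the literal string 'true') are assumptions rather than the module's documented behaviour.

```javascript
// Hypothetical helper, not part of exframe-mq.
// Falls back to the documented defaults (190 and false) when a variable is unset.
function readMqEnv(env = process.env) {
  const parsed = Number.parseInt(env.MQ_MAX_ATTEMPTS, 10);
  return {
    maxAttempts: Number.isNaN(parsed) ? 190 : parsed,
    // Assumption: only the string 'true' (any letter case) enables the flag.
    exitOnFailure: String(env.MQ_EXIT_ON_FAILURE).toLowerCase() === 'true',
  };
}
```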
Methods
getConnection
Gets the connection used to communicate with the rabbitmq server.
const connection = await mq.getConnection();
client
Creates a client for the configured rabbitmq server, used for subscribing to and publishing messages.
const client = mq.client(options);
Arguments:
options
- optional object
  eventing
  - optional object The configuration for publishing events via rpc to the eventing microservice.
    exchangeName
    - string The exchange used to publish events to the eventing microservice
    routingKey
    - string The routing key used to route events to the eventing microservice
Returns:
Client
Client
subscribe
Subscribes to a given exchange and routing key pattern. Will ensure the existence of the exchange and the queue matching on the routing key pattern with the given options.
The subscribe method will take a series of middleware for error handling and message processing. Middleware are expected to return promises that resolve with or without values.
client.subscribe(exchangeName, routingKey, async (context, message) => {
// do something with the message
});
To receive extra parameters in the callback, declare them with default values. The default can be null, an empty object, or any other value.
client.subscribe(exchangeName, routingKey, async (context, message, extraParams = {}) => {
// do something with the message
});
Arguments:
exchangeName
- string The exchange to subscribe to
routingKey
- string The routing key for the exchange, governs which subscription to send the message to
message
- object The message passed from the server. If a context field was part of the message, the contents of that field will have been merged into the context object and the field will have been removed from the message.
...middleware
- One or more message or error handling functions. See middleware functions
options
- optional, object
  exchangeType
  - string, default: 'topic' The type of exchange the message will be published to; see rabbitmq documentation. Valid values: 'topic', 'fanout', 'direct'
  exchangeOptions
  - object Options for the creation of the exchange; see amqplib documentation
  - Default Options
    durable
    - boolean, default: true
  publishOptions
  - object Options for the publishing of the message; see amqplib documentation
  - Default Options
    mandatory
    - boolean, default: true
  exclusiveQueue
  - boolean, default: false If true, the created queue will be exclusive to that connection / channel. This is generally used for short-lived temporary queues that need to handle an ephemeral set of messages. Closing the subscription will close this queue.
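With the default 'topic' exchange type, RabbitMQ matches routing keys against the subscription's pattern word by word: `*` stands for exactly one word and `#` for zero or more words, with words separated by dots. The sketch below only illustrates that broker-side rule; topicMatches is a hypothetical helper and not part of exframe-mq, and the routing keys in the usage note are made up.

```javascript
// Hypothetical helper illustrating topic-exchange pattern matching.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  // Tries to match pattern words from index i against key words from index j.
  function match(i, j) {
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // '#' may consume zero or more key words.
      for (let next = j; next <= k.length; next++) {
        if (match(i + 1, next)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    // '*' consumes exactly one word; anything else must match literally.
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  }

  return match(0, 0);
}
```

Under these rules, a subscription on `orders.*` would receive `orders.created` but not `orders.created.eu`, whereas `orders.#` would receive both.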
publish
Publishes a message to a given exchange and routing key. If the exchange does not exist, will create the exchange with the given options.
await client.publish(context, exchangeName, routingKey, message, options)
Arguments:
context
- See context object
- expects
  log
exchangeName
- string The exchange to publish the message to
routingKey
- string The routing key for the exchange, governs which subscription to send the message to
message
- object The message passed from the server. If a context field was part of the message, the contents of the object will have been merged into the context object and the field will be removed from the message.
options
- optional, object
  exchangeType
  - string, default: 'topic' The type of exchange the message will be published to; see rabbitmq documentation. Valid values: 'topic', 'fanout', 'direct'
  exchangeOptions
  - object Options for the creation of the exchange; see amqplib documentation
  - Default Options
    durable
    - boolean, default: true
  publishOptions
  - object Options for the publishing of the message; see amqplib documentation
  - Default Options
    mandatory
    - boolean, default: true
rpc
RPCs to a given exchange and routing key. If the exchange does not exist, will create the exchange with the given options. Will resolve or reject with the response from the subscribed client. If the subscribing service takes too long, rpc will reject with a timeout error.
const response = await client.rpc(context, exchangeName, routingKey, message, options)
Arguments:
context
- See context object
- expects
  log
exchangeName
- string The exchange to publish the message to
routingKey
- string The routing key for the exchange, governs which subscription to send the message to
message
- object The message passed from the server. If a context field was part of the message, the contents of the object will have been merged into the context object and the field will be removed from the message.
options
- optional, object
  exchangeType
  - string, default: 'topic' The type of exchange the message will be published to; see rabbitmq documentation. Valid values: 'topic', 'fanout', 'direct'
  exchangeOptions
  - object Options for the creation of the exchange; see amqplib documentation
  - Default Options
    durable
    - boolean, default: true
  publishOptions
  - object Options for the publishing of the message; see amqplib documentation
  - Default Options
    mandatory
    - boolean, default: true
  timeout
  - integer, default: 60000 Time to wait for a response from the server.
Returns:
Promise<Response>
- object The response from the server
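The timeout behaviour described for rpc (reject when the subscribing service takes too long) follows a common pattern: race the pending response against a timer. The sketch below shows that pattern in isolation; withTimeout is a hypothetical helper, and the error message is an assumption, not the text exframe-mq produces.

```javascript
// Hypothetical helper, not part of exframe-mq.
// Resolves or rejects with the given promise, unless timeoutMs elapses first.
function withTimeout(promise, timeoutMs = 60000) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`timed out after ${timeoutMs} ms`)),
      timeoutMs
    );
  });
  // Whichever settles first wins; the timer is always cleared afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```

Calling code would typically wrap the rpc call in try/catch and treat a timeout like any other rejected response.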
event
Sends an event via RPC to the eventing microservice, which adds the event to an event log and then publishes the event for consumption.
const response = await client.event(context, documentId, routingKey, message, options)
Arguments:
context
- See context object
- expects
  log
documentId
- string The document id associated with the event
routingKey
- string The routing key used to route the event to subscribers
message
- object The event data to pass to the consumer of the event.
options
- optional, object
  exchangeType
  - string, default: 'topic' The type of exchange the message will be published to; see rabbitmq documentation. Valid values: 'topic', 'fanout', 'direct'
  exchangeOptions
  - object Options for the creation of the exchange; see amqplib documentation
  - Default Options
    durable
    - boolean, default: true
  publishOptions
  - object Options for the publishing of the message; see amqplib documentation
  - Default Options
    mandatory
    - boolean, default: true
  timeout
  - integer, default: 60000 Time to wait for a response from the server.
Returns:
Promise<Response>
- object The response from the server
use
Sets up global middleware for subscriptions. Middleware are expected to return promises that resolve with or without values.
client.use(async (context, message) => {
context.user = await authorizeUser(message.token);
});
client.use(async (error, context, message) => {
context.log.error('error occurred', { errorData: error });
});
Arguments:
...middleware
- One or more message or error handling functions. See middleware functions
Context Objects
Context objects are used to pass information from middleware to middleware. Just add additional fields to the context object as necessary. The context object is pre-populated upon the handling of a message or passed in by the calling code.
Subscription Generated Fields
requestId
- string The request id associated with the message. If a request id is not passed with the message context, a request id will be generated, e.g., A1B2C3E4
log
- object A logger that follows the interface of the winston logger. If a logger was not passed, a default logger that outputs to stdout will be used.
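The pre-population described here can be pictured as follows. This is a minimal sketch under stated assumptions, not the module's code: buildContext is a hypothetical helper, console stands in for the default logger, and the generated id format (eight uppercase hex characters) is only inferred from the A1B2C3E4 example.

```javascript
const crypto = require('crypto');

// Hypothetical helper, not part of exframe-mq.
// Splits an incoming payload into a context object and the remaining message.
function buildContext(rawMessage, logger = console) {
  // A `context` field on the message is merged into the context object
  // and removed from the message that the middleware will see.
  const { context: passed = {}, ...message } = rawMessage;
  const context = { log: logger, ...passed };
  if (!context.requestId) {
    // Assumption: ids look like the README's example, e.g. A1B2C3E4.
    context.requestId = crypto.randomBytes(4).toString('hex').toUpperCase();
  }
  return { context, message };
}
```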
Middleware Functions
There are two types of middleware used by subscriptions: an error handling function that takes 3 arguments and a message processing function that takes 2.
To finish processing when the current middleware function is not the last in the chain, set a done flag on the context object (context.done = true)
and resolve the returned promise.
If the function resolves with an object, the object is returned to the calling client code if available.
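One way such a chain could be driven is sketched below. It is not the module's implementation: runChain is a hypothetical helper, telling the two middleware types apart by declared parameter count (Function.length) is an assumption, and so is rethrowing the error after the error handlers have run. Note that JavaScript does not count parameters with default values in Function.length, so a handler declared as (context, message, extraParams = {}) still reports two parameters.

```javascript
// Hypothetical helper, not part of exframe-mq.
async function runChain(middleware, context, message) {
  // Assumption: three declared parameters mark an error handler.
  const handlers = middleware.filter((fn) => fn.length < 3);
  const errorHandlers = middleware.filter((fn) => fn.length === 3);
  let result;
  try {
    for (const handler of handlers) {
      const value = await handler(context, message);
      // The last resolved value is what the calling client would receive.
      if (value !== undefined) result = value;
      // A handler can stop the chain early by setting context.done.
      if (context.done) break;
    }
  } catch (error) {
    for (const onError of errorHandlers) {
      await onError(error, context, message);
    }
    throw error;
  }
  return result;
}
```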
Message Handling
context
- See context object
message
- object The message passed to the subscribing middleware
Returns:
Promise<Any>
Error Handling
error
- The error that has occurred, either on the subscription or during message processing
context
- See context object
message
- object The message passed to the subscribing middleware (if available; may be null/undefined)
Returns:
Promise<Any>