easy-rmq
v1.1.0
RabbitMQ (amqp) for no-brainer.
Easy RabbitMQ
Want your software to work through a series of jobs, but don't want to use an extensive and complicated queueing system? Easy RabbitMQ gives you a simple, easy way to use RabbitMQ.
Project site: [https://github.com/huferry/easy-rmq]
Quick Start
First, install the package:
npm i -s easy-rmq
Connecting to the queue is as simple as:
const conn = await require('easy-rmq').connect({
  user: 'guest',
  password: 'secret',
  host: '172.134.1.25'
})
const queue = await conn.queue('my-queue')
and publish your message just like this:
queue.publish({
  from: 'Somebody',
  msg: 'Hello, World!'
})
or, if you're on the consuming side:
queue.subscribe(payload => {
  console.log(payload)
  // this will print:
  // { "from": "Somebody", "msg": "Hello, World!" }
})
But if you're not inside an async function:
require('easy-rmq').connect({
  user: 'guest',
  password: 'secret',
  host: '172.134.1.25'
})
  .then(conn => conn.queue('my-queue'))
  .then(queue => {
    // publish message
    queue.publish({msg: 'Hello, World!'})
    // consume message
    queue.subscribe(payload => {
      console.log(payload)
    })
  })
Documentation
- Connecting to Server
- Access to The Queue
- Publish A Message
- Subscribe for Messages
  - 4.1. The handler Function
  - 4.2. Error Handling
1. Connecting to Server
The module exports a single async function, connect, which takes an object containing the properties needed to connect to the AMQP server.
require('easy-rmq').connect({
  user: 'guest', // mandatory
  password: 'guest', // mandatory
  host: '127.0.0.1', // mandatory
  port: '5672' // optional, 5672 by default
})
Note that this function is asynchronous: it returns a Promise that resolves to an object with a single function, queue, which gives access to a queue.
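As an illustration of the mandatory and optional properties listed above, here is a minimal sketch of a helper that assembles the options object before it is passed to connect. The helper name buildConnectOptions and the RMQ_* variable names are assumptions made for this example; they are not part of easy-rmq.

```javascript
// Hypothetical helper, not part of easy-rmq: builds the options object
// for connect() from environment-style variables.
// The RMQ_* names are assumptions made for this sketch.
function buildConnectOptions(env) {
  const options = {
    user: env.RMQ_USER,
    password: env.RMQ_PASSWORD,
    host: env.RMQ_HOST,
    port: env.RMQ_PORT || '5672' // optional, 5672 by default
  }
  // user, password and host are documented as mandatory
  for (const key of ['user', 'password', 'host']) {
    if (!options[key]) {
      throw new Error(`Missing mandatory connection property: ${key}`)
    }
  }
  return options
}

const options = buildConnectOptions({
  RMQ_USER: 'guest',
  RMQ_PASSWORD: 'guest',
  RMQ_HOST: '127.0.0.1'
})
console.log(options)
```

The resulting object can then be handed to require('easy-rmq').connect(options). In a real program you would typically pass process.env instead of a literal object.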
2. Access to The Queue
The queue is accessed through the queue(queueName) function.
require('easy-rmq')
  .connect({ /* fill in with your connection properties */ })
  .then(conn => conn.queue('my-queue'))
Note that you don't have to create the queue on the server. We are basically doing a code-first queueing process. In the example above, the queue 'my-queue' will be created automatically on the server.
The queue function returns a Promise that resolves to an object containing two functions: publish, to publish a message, and subscribe, to receive messages from the queue.
3. Publish A Message
3.1. Simple Message
The publish function takes the payload of the queue message, which can be any JSON serializable object. The payload is carried in the queue message and delivered to the subscribing function.
require('easy-rmq')
  .connect({ /* fill in with your connection properties */ })
  .then(conn => conn.queue('my-queue'))
  .then(queue => {
    // payload can be any JSON serializable object
    const payload = {
      time: new Date(),
      text: 'please send the goods in 2 weeks',
      importance: 5
    }
    queue.publish(payload)
  })
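Because the payload must be JSON serializable, values such as Date objects may not arrive in the same form they were sent in. The sketch below shows what a plain JSON round trip does to a payload like the one above. It assumes the payload is serialized with JSON.stringify and restored with JSON.parse, which this README does not confirm; roundTrip is a hypothetical helper, not an easy-rmq function.

```javascript
// Assumption: the payload travels as JSON text, i.e. something equivalent
// to JSON.stringify on the publishing side and JSON.parse on the
// subscribing side. roundTrip is a hypothetical stand-in for that trip.
function roundTrip(payload) {
  return JSON.parse(JSON.stringify(payload))
}

const sent = {
  time: new Date('2024-01-15T10:30:00.000Z'),
  text: 'please send the goods in 2 weeks',
  importance: 5
}
const received = roundTrip(sent)

console.log(typeof received.time) // string
console.log(received.time) // 2024-01-15T10:30:00.000Z

// If the subscriber needs a Date again, it has to convert explicitly.
const restoredTime = new Date(received.time)
console.log(restoredTime.getTime() === sent.time.getTime()) // true
```

Under that assumption, numbers and strings survive unchanged, while a Date arrives as an ISO 8601 string and must be converted back by the handler.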
3.2. Propagate to Multiple Messages
Normally the payload argument corresponds to a single object. If the payload is an array, it will be split into multiple messages: every object in the array becomes a payload of its own, resulting in multiple queue entries.
Example:
require('easy-rmq')
  .connect({ /* fill in with your connection properties */ })
  .then(conn => conn.queue('my-queue'))
  .then(queue => {
    // payload as array will be propagated
    // as individual messages
    queue.publish([
      { id: 'xxx' },
      { id: 'yyy' }
    ])
    // first message payload (added by program):
    // [ { "id": "xxx" }, { "id": "yyy" } ]
    // second message payload (automatically added):
    // { "id": "xxx" }
    // third message payload (automatically added):
    // { "id": "yyy" }
  })
4. Subscribe for Messages
The last part of this module is subscribing to the queue messages (and then doing some processing). On the queue object, use the subscribe function. It takes two arguments: the handler function and the onError function.
require('easy-rmq')
  .connect({ /* fill in with your connection properties */ })
  .then(conn => conn.queue('my-queue'))
  .then(queue => {
    queue.subscribe(handler, onError)
  })
4.1. The handler Function
The handler function is given the payload object as its argument (see the example).
Example:
require('easy-rmq')
  .connect({ /* fill in with your connection properties */ })
  .then(conn => conn.queue('my-queue'))
  .then(queue => {
    queue.subscribe(
      payload => handler(payload), // the handler function
      (error, requeue) => handleError(error, requeue)) // the onError function
  })
function handler(payload) {
  // Do any processing needed here!
  // Any error thrown within this processing
  // will be forwarded to the error handling function.
  console.log(payload)
}
function handleError(error, requeue) {
  // error is an instance of the JavaScript Error class.
  // Depending on the type/kind of error you can
  // decide to requeue the message.
  // for example:
  if (error.message === 'server is still starting') {
    const timeoutInMs = 5000
    // timeout is optional, it is 1000 milliseconds by default.
    requeue(timeoutInMs)
  }
}
4.2. Error Handling
Any error thrown during processing triggers the error handler (see the example in the previous section). The error handling function, which you provide, is given two arguments: the error and a requeue function. If you decide to retry the processing, invoke the requeue function. You can pass it a delay in milliseconds; if no delay is provided, it defaults to 1000 milliseconds (1 second).