adonis-rabbitmq
v0.1.7
Published
AdonisJS RabbitMQ provider
adonis-rabbit
adonis-rabbit is a RabbitMQ provider for Adonis.
Getting Started
Install adonis-rabbit:
yarn add adonis-rabbit
Then:
node ace invoke adonis-rabbit
This will create config/rabbit.ts and add the following fields to your .env:
RABBITMQ_HOSTNAME=
RABBITMQ_USER=
RABBITMQ_PASSWORD=
RABBITMQ_PORT=
RABBITMQ_PROTOCOL="amqp://" # or "amqps://"
Make sure to set the correct values for these environment variables so that adonis-rabbit can connect.
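As a rough illustration of how these five values fit together, the sketch below joins them into a single AMQP connection URL. The `RabbitSettings` interface and the `buildAmqpUrl` helper are hypothetical names used only for this example; this README does not say how adonis-rabbit composes its connection string internally, so treat it as an assumption. The values shown are RabbitMQ's usual local defaults.

```typescript
// Hypothetical helper for illustration only; not part of adonis-rabbit.
interface RabbitSettings {
  hostname: string // RABBITMQ_HOSTNAME
  user: string // RABBITMQ_USER
  password: string // the password variable from .env
  port: string // RABBITMQ_PORT
  protocol: string // RABBITMQ_PROTOCOL, e.g. "amqp://" or "amqps://"
}

function buildAmqpUrl(settings: RabbitSettings): string {
  // Percent-encode credentials so characters such as "@" or "/" stay valid in a URL.
  const user = encodeURIComponent(settings.user)
  const password = encodeURIComponent(settings.password)
  return `${settings.protocol}${user}:${password}@${settings.hostname}:${settings.port}`
}

const url = buildAmqpUrl({
  hostname: 'localhost',
  user: 'guest',
  password: 'guest',
  port: '5672',
  protocol: 'amqp://',
})

console.log(url) // amqp://guest:guest@localhost:5672
```

If the connection fails, comparing a URL assembled this way against the one that works in the RabbitMQ management tools is a quick way to spot a wrong port or protocol.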
Basic Usage
Sending messages to a queue
import Rabbit from '@ioc:Adonis/Addons/Rabbit'
import Route from '@ioc:Adonis/Core/Route'
Route.get('/', async () => {
  // Ensures the queue exists
  await Rabbit.assertQueue('my_queue')
  // Sends a message to the queue
  await Rabbit.sendToQueue('my_queue', 'This message was sent by adonis-rabbit')
})
Subscribing
Note that it doesn't really make sense to subscribe to a queue inside a controller; this is usually done in a preload file.
Creating a preload file
- In the CLI, type:
node ace make:prldfile rabbit
- Select
( ) During HTTP server
This will create start/rabbit.ts.
Listening to a queue
Inside start/rabbit.ts:
import Rabbit from '@ioc:Adonis/Addons/Rabbit'
async function listen() {
  await Rabbit.assertQueue('my_queue')
  await Rabbit.consumeFrom('my_queue', (message) => {
    console.log(message.content)
  })
}

listen()
This will log every message sent to the my_queue queue.
Documentation
RabbitMQ Manager
Import
import Rabbit from '@ioc:Adonis/Addons/Rabbit'
assertQueue()
await Rabbit.assertQueue('myQueue')
Asserts that the queue exists, creating it if necessary.
Parameters:
- queueName: the name of the queue
- options?: the queue options
assertExchange()
await Rabbit.assertExchange('myExchange', 'type')
Asserts that the exchange exists, creating it if necessary.
Parameters:
- exchangeName: the name of the exchange
- type: the type of the exchange
- options?: the exchange options
bindQueue()
await Rabbit.bindQueue('myQueue', 'myExchange', '')
Binds a queue to an exchange.
Parameters:
- queueName: the name of the queue
- exchangeName: the name of the exchange
- pattern?: the pattern (defaults to '')
sendToQueue()
await Rabbit.sendToQueue('myQueue', 'content')
Parameters:
- queueName: the name of the queue
- content: the content to be sent to the queue
- options: the options
Note that the content parameter doesn't need to be a Buffer; Adonis RabbitMQ will automatically convert it to a Buffer if it isn't one already.
You also don't have to JSON.stringify an object; Adonis RabbitMQ will do that for you (it will be converted to JSON and then to a Buffer).
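The conversion described above can be sketched as follows. This is not the library's actual source; `toBuffer` and `Payload` are hypothetical names, and the sketch assumes a Node.js runtime where `Buffer` is available globally. It only mirrors the documented behaviour: Buffers pass through, strings become Buffers, and objects are serialized to JSON first.

```typescript
// Hypothetical sketch of the documented conversion; not adonis-rabbit's own code.
type Payload = Buffer | string | object

function toBuffer(content: Payload): Buffer {
  // Already a Buffer: use it unchanged.
  if (Buffer.isBuffer(content)) {
    return content
  }
  // Plain strings are encoded directly (UTF-8 by default).
  if (typeof content === 'string') {
    return Buffer.from(content)
  }
  // Anything else is serialized to JSON, then encoded.
  return Buffer.from(JSON.stringify(content))
}

console.log(toBuffer('hello').toString()) // hello
console.log(toBuffer({ orderId: 42 }).toString()) // {"orderId":42}
```

In practice this means a call such as `Rabbit.sendToQueue('myQueue', { orderId: 42 })` should arrive at the consumer as the JSON text `{"orderId":42}`.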
sendToExchange()
await Rabbit.sendToExchange('myExchange', 'myRoutingKey', 'content')
Parameters:
- exchangeName: the name of the exchange
- routingKey: the routing key
- content: the content to send to the exchange
- options: the options
Note that the content parameter doesn't need to be a Buffer; Adonis RabbitMQ will automatically convert it to a Buffer if it isn't one already.
You also don't have to JSON.stringify an object; Adonis RabbitMQ will do that for you (it will be converted to JSON and then to a Buffer).
consumeFrom()
await Rabbit.consumeFrom('myQueue', (message) => {
console.log(message.content)
message.ack()
})
Consumes messages from a queue.
Parameters:
- queueName: the name of the queue
- onMessage: the callback that will be executed when a message is received
The onMessage callback receives a Message instance as its parameter.
ackAll()
await Rabbit.ackAll()
Acknowledges all the messages.
nackAll()
await Rabbit.nackAll()
Rejects all the messages.
Parameters:
- requeue?: adds the rejected messages to the queue again
getConnection()
Retrieves the amqplib Connection instance. If there is no connection yet, one will be created.
await Rabbit.getConnection()
getChannel()
Retrieves the amqplib Channel instance. If there is no connection yet, one will be created; if there is no channel, one will be created too.
await Rabbit.getChannel()
closeChannel()
Closes the channel.
closeConnection()
Closes the connection.
Message
When consuming messages through consumeFrom, the callback receives a Message instance.
This is slightly different from the amqplib approach. For example:
Rabbit.consumeFrom('queue', (message) => {
  // Acknowledges the message
  message.ack()
  // Rejects the message
  message.reject()
  // The message content
  console.log(message.content)
  // If you're expecting JSON, this will return the parsed message
  console.log(message.jsonContent)
})
content
message.content
Returns the message content.
jsonContent
message.jsonContent
If the message is expected to be in JSON format, you can use message.jsonContent to get the message parsed as an object.
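To make the difference between the two accessors concrete, here is a minimal stand-in. `SketchMessage` is a hypothetical class written for this example, not the library's Message; it assumes `content` is the decoded text and `jsonContent` is that text run through `JSON.parse`, which this README does not confirm in detail.

```typescript
// Hypothetical stand-in for illustration; the real Message class may differ.
class SketchMessage {
  private readonly raw: Buffer

  constructor(raw: Buffer) {
    this.raw = raw
  }

  // Assumption: content is exposed as decoded text.
  get content(): string {
    return this.raw.toString()
  }

  // Assumption: jsonContent parses the content as JSON on access.
  get jsonContent(): unknown {
    return JSON.parse(this.content)
  }
}

const message = new SketchMessage(Buffer.from(JSON.stringify({ orderId: 42 })))
const order = message.jsonContent as { orderId: number }

console.log(message.content) // {"orderId":42}
console.log(order.orderId) // 42
```

Under these assumptions, reading `jsonContent` on a message that is not valid JSON would throw, so a consumer that accepts mixed payloads may want to wrap the access in a try/catch.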
fields
message.fields
The message fields.
properties
message.properties
The message properties.
ack()
message.ack()
Acknowledges the message.
Parameters:
- allUpTo?: acknowledges all messages up to and including this one
nack()
message.nack()
Rejects the message.
Parameters:
- allUpTo?: rejects all messages up to and including this one
- requeue?: adds the rejected messages to the queue again
reject()
message.reject()
Rejects the message. Equivalent to nack, but works in older versions of RabbitMQ where nack does not.
Parameters:
- requeue?: adds the rejected message to the queue again
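One common way to combine these methods is to acknowledge a message only after its work succeeds and to nack it otherwise. The sketch below shows that pattern against a minimal interface. `AckableMessage` and `processOrRequeue` are hypothetical names, and the `nack(allUpTo, requeue)` argument order is assumed from the parameter list above rather than taken from the library's type definitions.

```typescript
// Hypothetical minimal shape of a message, based on the methods documented above.
interface AckableMessage {
  jsonContent: unknown
  ack(allUpTo?: boolean): void
  nack(allUpTo?: boolean, requeue?: boolean): void
}

// Runs `work` on the payload; acks on success, nacks with requeue on failure.
function processOrRequeue(
  message: AckableMessage,
  work: (payload: unknown) => void
): boolean {
  try {
    work(message.jsonContent)
    message.ack()
    return true
  } catch {
    // allUpTo = false: reject only this message; requeue = true: put it back on the queue.
    message.nack(false, true)
    return false
  }
}

// A fake message that records which method was called, for demonstration.
const calls: string[] = []
const fake: AckableMessage = {
  jsonContent: { orderId: 42 },
  ack: () => {
    calls.push('ack')
  },
  nack: (allUpTo, requeue) => {
    calls.push(`nack:${allUpTo}:${requeue}`)
  },
}

processOrRequeue(fake, () => {})
processOrRequeue(fake, () => {
  throw new Error('boom')
})

console.log(calls) // [ 'ack', 'nack:false:true' ]
```

Requeueing a message that fails every time leads to an endless redelivery loop, so a real consumer would usually limit retries or reject without requeue after a few attempts.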
Roadmap
- [ ] Add SSL options in config/rabbit.ts
- [ ] Tests