@usefultools/rabbit-mq
v4.0.1
Published
A RabbitMq client that's just betterer
Downloads
4
Readme
RabbitMq
A RabbitMq client that's just betterer.
Prereqs & Install
- Node >=9.10.0
- npm >=6.1.0
Please note that the TypeScript target is ES6.
npm install @usefultools/rabbit-mq
npm install @types/amqplib --save-dev # for TypeScript projects
Usage
1) Create a Subscriber ⬇️
import { RabbitMq } from "@usefultools/rabbit-mq"
async function setupSubsriber(): Promise<RabbitMq> {
const subscriber = new RabbitMq({
url,
onConnectionError: (error) => log.error("Subscriber error", error),
onConnectionClose: () => log.error("Subscriber connection closed"),
appId,
name: "subscriber",
log: console
})
await subscriber.assertChannel()
return subscriber
}
export default setupSubsriber
2) Create a Publisher ⬆️
import { RabbitMq } from "@usefultools/rabbit-mq"
async function setupPublisher(): Promise<RabbitMq> {
const publisher = new RabbitMq({
url,
onConnectionError: (error) => log.error("Publisher error", error),
onConnectionClose: () => log.error("Publisher connection closed"),
appId,
name: "publisher",
log: console
})
const opts = {
exchange: "logs",
type: ExchangeType.Topic,
bindings: [
{ queue: "logs_critical", routingKey: "*.critical", dlq: "dead_letter_queue", isDurable: true },
{ queue: "logs_all", routingKey: "#", dlq: "dead_letter_queue", isDurable: true },
],
}
await publisher.assertExchange(opts.exchange, opts.type, { durable: true })
const bindings = opts.bindings.map(async ({ queue, routingKey, dlq, isDurable }) => {
await publisher.assertQueue(queue, {
durable: isDurable,
arguments: {
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": dlq,
},
})
return publisher.bindQueue(queue, opts.exchange, routingKey)
})
await Promise.all(bindings)
return publisher
}
export default setupPublisher
3) Set up both the publisher and the subscriber upon service start 🔌
async function init(): Promise<void | never> {
try {
log.info(`Starting ${appId}...`)
const [subscriber, publisher] = await Promise.all([
setupSubsriber(),
setupPublisher()
])
await receive({ subscriber, publisher })
log.info(`Started ${appId}...`)
} catch (error) {
log.error(`Failed to start ${appId}...`, error)
return process.exit(1)
}
}
4) Receive and process messages 😎
async function receive({ subscriber, publisher }): Promise<void> {
subscriber.subscribe("requests", (msg: Message, channel: ConfirmChannel) => {
const ctx = buildContext(msg, publisher)
switch (msg.properties.type) {
case "request_foo":
return onRequestFoo(msg, channel, ctx)
case "request_bar":
return onRequestBar(msg, channel, ctx)
default:
// do not requeue, this will go straight to dlq
return channel.reject(msg, false)
}
})
}
Contributing
If you have comments, complaints, or ideas for improvements, feel free to open an issue or a pull request! See Contributing guide for details about project setup, testing, etc.
Author and license
This library was created by @LITCHI.IO. Main author and maintainer is Slavo Vojacek.
Contributors: Slavo Vojacek
@usefultools/rabbit-mq
is available under the ISC license. See the LICENSE file for more info.