amqplib-envelop
v1.1.2
Published
A wrapper for working with RabbitMQ using the amqplib npm package
Downloads
250
Maintainers
Readme
A wrapper for working with RabbitMQ using the amqplib npm package
Content
Connection configs
Setup configs
Create connection
Using (server)
Using (worker)
Connection configs
// config/default.js
const pkg = require('../package.json');
module.exports = {
rabbitmq: {
protocol: 'amqp:',
hostname: '127.0.0.1',
port: 5672,
user: 'guest',
password: 'guest',
connectionName: `${pkg.name}-${pkg.version}`,
vhost: '',
}
};
Setup configs
// constants/rabbitmq.js
const EXCHANGE_NAME = 'films';
const QUEUE_NAME = 'marvel';
module.exports = {
exchanges: {
films: {
name: EXCHANGE_NAME,
type: 'topic',
options: {
durable: true,
},
},
},
queues: {
marvel: {
name: QUEUE_NAME,
durable: true,
arguments: {
'x-dead-letter-exchange': EXCHANGE_NAME,
'x-dead-letter-routing-key': `${QUEUE_NAME}.dlx`,
'x-queue-type': 'quorum',
'x-delivery-limit': 2,
},
},
marvelDlx: {
name: `${QUEUE_NAME}.dlx`,
durable: true,
arguments: {
'x-dead-letter-exchange': EXCHANGE_NAME,
'x-dead-letter-routing-key': QUEUE_NAME,
'x-message-ttl': 5000,
},
},
},
bindings: {
marvel: {
queue: QUEUE_NAME,
source: EXCHANGE_NAME,
pattern: QUEUE_NAME,
},
smarvelDlx: {
queue: `${QUEUE_NAME}.dlx`,
source: EXCHANGE_NAME,
pattern: `${QUEUE_NAME}.dlx`,
},
},
};
Create connection
// helpers/rabbitmq.js
const RabbitMQ = require('amqplib-envelop');
const config = require('config');
module.exports = async () => RabbitMQ.initAndGetInstance(config.rabbitmq);
Using (server)
// server/index.js
const rabbitMqInstance = require('./helpers/rabbitmq');
const { exchanges, queues, bindings } = require('./constants/rabbitmq');
// chain of promises
/**
 * Declares the films exchange, the marvel queue and their binding, then sends
 * `msg` to the marvel queue as a persistent message (promise-chain variant).
 *
 * @param {*} msg - Payload handed unchanged to `rabbitmq.sendToQueue`.
 * @returns {Promise<*>} Resolves with the result of `rabbitmq.sendToQueue`.
 * @throws {Error} On any failure, including obtaining the connection. The
 *   error carries `status` 500, the original message, and the original error
 *   as `cause`.
 */
async function sendToMarvelQueue(msg) {
  // The connection is obtained inside the chain so that connection failures
  // reach the same .catch handler as declaration/publish failures.
  return rabbitMqInstance()
    .then((rabbitmq) => rabbitmq.assertExchange(exchanges.films)
      .then(() => rabbitmq.assertQueue(queues.marvel))
      .then(() => rabbitmq.bindQueue(bindings.marvel))
      .then(() => rabbitmq.sendToQueue(queues.marvel.name, msg, { persistent: true })))
    .catch((err) => {
      console.error('sendToMarvelQueue error: ', err);
      // A standard Error is thrown so the handler does not depend on a
      // non-built-in `Exception` class being in scope.
      const error = new Error(err?.message, { cause: err });
      error.status = 500;
      throw error;
    });
}
// async/await
/**
 * Declares the films exchange, the marvel queue and their binding, then sends
 * `msg` to the marvel queue as a persistent message (async/await variant).
 *
 * @param {*} msg - Payload handed unchanged to `rabbitmq.sendToQueue`.
 * @returns {Promise<*>} Resolves with the result of `rabbitmq.sendToQueue`.
 * @throws {Error} On any failure. The error carries `status` 500, the
 *   original message, and the original error as `cause`.
 */
async function sendToMarvelQueue(msg) {
  try {
    const rabbitmq = await rabbitMqInstance();
    await rabbitmq.assertExchange(exchanges.films);
    await rabbitmq.assertQueue(queues.marvel);
    await rabbitmq.bindQueue(bindings.marvel);
    // `return await` keeps a rejected send inside this try/catch while still
    // handing the send result back to the caller.
    return await rabbitmq.sendToQueue(queues.marvel.name, msg, { persistent: true });
  } catch (err) {
    console.error('sendToMarvelQueue error: ', err);
    // A standard Error is thrown so the handler does not depend on a
    // non-built-in `Exception` class being in scope.
    const error = new Error(err?.message, { cause: err });
    error.status = 500;
    throw error;
  }
}
Using (worker)
// worker/index.js
const RabbitMQ = require('amqplib-envelop');
const rabbitmq = require('./helpers/rabbitmq');
const marvelWorker = require('./marvelWorker');
module.exports = {
start: async () => {
const rabbitInstance = await rabbitmq();
await marvelWorker(rabbitInstance);
},
stop: async () => {
const rabbitInstance = await RabbitMQ.getActiveInstance();
await rabbitInstance.closeConnection();
process.exit(0);
},
};
// worker/marvelWorker.js
const { exchanges, queues, bindings } = require('./constants/rabbitmq');
/**
 * Builds the message handler for the marvel queue.
 *
 * Each delivery is settled exactly once: it is acknowledged when processing
 * succeeds and negatively acknowledged when processing throws. Settling the
 * same delivery with both `nack` and `ack` is avoided on purpose.
 *
 * @param {object} rabbitmq - Instance exposing `getMsgObj`, `ack` and `nack`.
 * @returns {(msg: object) => Promise<void>} Handler to pass to `rabbitmq.consume`.
 */
const consumer = (rabbitmq) => async (msg) => {
  try {
    const msgObj = rabbitmq.getMsgObj(msg);
    // some logic with message
    console.log('message from queue: ', msgObj);
  } catch (err) {
    // Processing failed: reject the delivery and stop, so it is not acked too.
    console.error('message processing error: ', err);
    rabbitmq.nack(msg);
    return;
  }
  // Processing succeeded: confirm the delivery.
  rabbitmq.ack(msg);
};
module.exports = async (rabbitmq) => {
await rabbitmq.assertExchange(exchanges.films);
await Promise.all([
rabbitmq.assertQueue(queues.marvel),
rabbitmq.assertQueue(queues.marvelDlx),
]);
await Promise.all([
rabbitmq.bindQueue(bindings.marvel),
rabbitmq.bindQueue(bindings.marvelDlx),
]);
await rabbitmq.consume(queues.marvel, consumer(rabbitmq), {
consumerTag: 'marvelApp:1.1.2'
});
};