bhb-amqp-connection-manager
v0.0.8
Published
Auto-reconnect and round robin support for amqplib.
Downloads
5
Maintainers
Readme
bhb-amqp-connection-manager
安装
npm install --save amqplib [email protected]
注意:该组件在不断完善中,可能引入比较多不兼容的内容,请锁死版本,按需升级
简介
大头兄弟amqp链接管理 基于amqp-connection-manager 包的功能,做部分业务扩展
需要了解的使用教程 包含
发送信息示例
const rabbitMQ = require('bhb-amqp-connection-manager');
const connection = rabbitMQ.connect(process.env.APP_MQ_URL)
.on('connect', () => console.log('Connected!'))
.on('disconnect', params => console.log('Disconnected.', params.err.stack));
const channel = connection.createChannel()
.addSetup(async function(channel) {
// `channel` here is a regular amqplib `ConfirmChannel`.
const prefetch = parseInt(process.env.prefetch) || 1;
await channel.assertQueue('hello', {durable: false});
await channel.prefetch(prefetch);
});
(async function() {
await channel.sendToQueue('hello', {num: 1});
await channel.publish('', 'hello', {num: 10001});
await connection.close();
})();
消费消息示例
if (require('cluster').isMaster) {
return require('../util/master');
}
const bluebird = require('bluebird');
const commonUtil = require('../util/common');
const rabbitMQ = require('bhb-amqp-connection-manager');
const connection = rabbitMQ.connect(process.env.APP_MQ_URL);
const channel = connection.createChannel()
.addSetup(async function(channel) {
// `channel` here is a regular amqplib `ConfirmChannel`.
const prefetch = parseInt(process.env.prefetch) || 5;
await channel.assertQueue('hello', {durable: false});
await channel.prefetch(prefetch);
await channel.consumerQueue('hello', async function({num}) {
console.log(`start ${num}`);
await bluebird.delay(2000);
console.log(`end ${num}`);
});
});
commonUtil.bindGraceExit(async function() {
await channel.cancelConsumers();
await channel.waitMessageEmpty();
await connection.close();
});
版本变更
- v0.0.3 支持node 4.0版本(babel编译)
- v0.0.4 移除babel编译,只支持node 6>0
- v0.0.5 修复json解析错误未捕获的bug。默认忽略推入数据json格式有误的数据,并通过日志输出提醒.
- v0.0.6 修复当连接后未执行完setup的情况下,channel已经失去连接导致忽略处理,并返回null的bug。改为直接抛出一个错误,中断后续没必要的执行。 引入一个不兼容的默认值处理:当queue声明为durable的时候,如果没设置x-queue-mode的情况下,将默认设置x-queue-mode=lazy, 如果需要自定义,需要明确声明该配置,否则assertQueue时导致配置不一致会抛出错误。
- v0.0.7 添加 consumerQueueUseRetry 便捷方法 相比 consumerQueue增加一个option参数 option.count (可选) 设置重试次数 option.failureQueue (可选) 设置失败后推入的队列,如果没设置,则自动创建一个队列,名字为:failure.${queueName} ,持久缓存 option.delay (可选) 设置重试频率的函数 ,设置该值时 count 不生效。 当count和delay都不设置的时候,使用amqplib-retry 的默认行为
(attempts) => {
const delay = Math.pow(2, attempts)
if (delay > 60 * 60 * 24) {
// the delay for the message is longer than 24 hours. Fail the message and never retry again.
return -1
}
return delay * 1000
}
v0.0.8 改写mq默认的失败重试机制
新增setConfig方法,目前可配置项:
{ DINGDING_HOST:'',//失败钉钉通知 DEAD_LETTER_TTLs:[4,20,100]//失败重试间隔 单位 秒 }