@hongfangze/mq
v0.1.0
Published
comm.mq
Downloads
9
Readme
@hongfangze/mq 消息队列
介绍
消息队列的使用,目前支持:RabbitMQ,后期增加其他MQ的支持。
开始使用:
npm install @hongfangze/mq
import { AMQP } from '@hongfangze/mq';
import { RabbitMQOptions } from '@hongfangze/mq/OptionsDefine';
RabbirMQ API
/**
* 发布一个数据到指定的队列
* @param {string} queueName 队列名称
* @param {string} msg 需要添加到队列的数据
* @return {*} {Promise<void>}
* @memberof IRabbitMQ
*/
publish(queueName: string, msg: string): Promise<void>;
/**
* 订阅一个指定的队列
* @param {Function} handle 一个async函数,参数为当前的消息
* @param {string} queueName 队列名称
* @return {*} {Promise<string>} 一个GUID标识符,用于取消订阅
* @memberof IRabbitMQ
*/
subscription(handle: Function, queueName: string): Promise<string>;
/**
* 取消订阅一个指定的队列
* @param {string} uuid 一个GUID标识符
* @return {*} {Promise<void>}
* @memberof IRabbitMQ
*/
unsubscription(uuid: string): Promise<void>;
/**
* 发送通知数据(一般用于基础数据维护项目的基础数据变化后,通知需要接收变化的项目)
* @param {string} msg 通知的消息
* @param {string} exchangeName 交换机名称
* @param {string} routingKey 路由键,#代表任意
* @return {*} {Promise<{ result: boolean, msg?: string }>} result:成功或失败,msg:失败原因
* @memberof IRabbitMQ
*/
sendNotify(exchangeName: string, routingKey: string, msg: string): Promise<{ result: boolean, msg?: string }>;
/**
* 接收通知数据
* @param {Function} handle 一个async函数,参数为当前的消息
* @param {string} exchangeName 交换机名称
* @param {string} routingKey 路由键,#代表任意
* @param {string} queueName 队列名称,可以不传使用非持久队列
* @return {*} {Promise<string>} 一个GUID标识符,用于停止接收
* @memberof IRabbitMQ
*/
receiveNotify(handle: Function, exchangeName: string, routingKey: string, queueName?: string): Promise<string>;
/**
* 停止接收通知数据
* @param {*} uuid 一个GUID标识符
* @return {*} {Promise<void>}
* @memberof IRabbitMQ
*/
unreceiveNotify(uuid: string): Promise<void>;
RabbitMQ 发布/订阅Demo
const rabbitmqConf: RabbitMQOptions = {
hostname: '10.2.103.36',
username: "guest",
password: 'guest',
};
const pub = async () => {
const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
for (let i = 0; i < 10; i++) {
console.log(`pub:${i}`);
await rabbitmq.publish("q", i.toString());
await sleep(100);
}
}
const sub1 = async () => {
const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
let i = 1;
const sub = await rabbitmq.subscription((async (msg) => {
console.log(`sub1收到第${i}次消息:`, msg);
await sleep(500);
i++;
}), "q");
// setTimeout(async () => {
// await rabbitmq.unsubscription(sub);
// }, 1000);
}
const sub2 = async () => {
const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
let i = 1;
const sub = await rabbitmq.subscription((async (msg) => {
console.log(`sub2收到第${i}次消息:`, msg);
await sleep(1000);
i++;
}), "q");
// setTimeout(async () => {
// await rabbitmq.unsubscription(sub);
// }, 1000);
}
export default async () => {
sub1();
sub2();
pub();
}
RabbitMQ 通知Demo
const rabbitmqConf: RabbitMQOptions = {
hostname: '10.2.103.36',
username: "guest",
password: 'guest',
};
// 模拟基础数据的生产者,当基础数据发生改变时,进行广播
// 且每个客户端的处理时长不一致,防止并发处理而采取的单个订阅模式
// 假设base为基础数据生产者
// product1需要监听user的变化
// product2需要监听role的变化
// product3需要监听user和role的变化
const company = "hongfangze";
const modules = ["user", "role"];
const oper = ["insert", "update", "delete"];
const sendNotify = async () => {
const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
for (let i = 0; i < 10; i++) {
const _module = modules[getRandomNum(0, modules.length - 1)];
const data = {
id: i + 1,
text: `${_module}-${getRandomStr()}`,
oper: oper[getRandomNum(0, oper.length - 1)]
};
const routingKey = `${company}.${_module}`;
console.log(`broadcast:${routingKey}:`, data);
await rabbitmq.sendNotify(company, "userProduct", _module, JSON.stringify(data));
await sleep(100);
}
}
const product1 = async () => {
const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
const notify = await rabbitmq.receiveNotify((async (msg) => {
console.log(`product1收到消息:`, msg);
await sleep(500);
}), "product1", company, "userProduct", "user");
// setTimeout(async () => {
// await rabbitmq.unreceiveNotify(notify);
// }, 1000);
}
const product2 = async () => {
const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
const notify = await rabbitmq.receiveNotify((async (msg) => {
console.log(`product2收到消息:`, msg);
await sleep(1000);
}), "product2", company, "userProduct", "user");
// setTimeout(async () => {
// await rabbitmq.unreceiveNotify(notify);
// }, 1000);
}
const product3 = async () => {
const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
const notify = await rabbitmq.receiveNotify((async (msg) => {
console.log(`product3收到消息:`, msg);
await sleep(1500);
}), "product3", company, "userProduct", null);
// setTimeout(async () => {
// await rabbitmq.unreceiveNotify(notify);
// }, 1000);
}
export default async () => {
await product1();
await product2();
await product3();
sendNotify();
}
版本迭代记录
2024-05-30 v0.0.3
- 基础的发布订阅逻辑实现。
- 消息通知逻辑实现。
2024-07-10 v0.1.0
- sendNotify、receiveNotify这2个函数重构。