lazy-kit-mq
v0.0.3
Published
> A quick toolkit based on Nodejs & AMQP with RabbitMQ & MQTT with Emqx
Downloads
4
Readme
lazy-kit-mq
A quick toolkit based on Nodejs & AMQP with RabbitMQ & MQTT with Emqx
Functions
interface AmqpOpts {
protocol: 'amqp'|'amqps';
hostname: string;
port: number;
username: string;
password: string;
vhost: string;
}
interface MqttOpts {
protocol: 'wss' | 'ws' | 'mqtt' | 'mqtts' | 'tcp' | 'ssl' | 'wx' | 'wxs' | 'ali' | 'alis';
hostname: string;
port: number;
username: string;
password: string;
/**
* 此参数是一个布尔值,用于指定 MQTT 代理是否应保留有关客户端的会话信息。其作用如下:
* 如果 clean 设置为 true,表示清除会话。在这种情况下,代理不会保留任何关于客户端以前会话的信息。通常用于短暂或临时连接。
* 如果 clean 设置为 false,表示非清除会话。在这种情况下,代理会保留会话信息,例如订阅和未确认的 QoS 1 和 QoS 2 消息,
* 即使客户端断开连接后也会保留。当客户端使用相同的 clientId 重新连接时,它会恢复先前的会话状态。
* 这对需要保持状态的场景非常有用,例如持久订阅和可靠消息传递。
*/
clean: boolean;
/**
* clientId 是 MQTT 客户端的唯一标识符。它由代理用于跟踪各个客户端。clientId 必须在连接到同一代理的所有客户端中是唯一的。其作用如下:
* 当客户端使用特定的 clientId 连接到代理时,代理会将该客户端的会话和状态与该 clientId 关联起来。
* 如果使用相同的 clientId 重新连接客户端,并且 clean 会话设置为 false,则代理会恢复该客户端的先前会话和状态。
* 如果使用相同的 clientId 重新连接客户端,并且 clean 会话设置为 true,则代理会为该客户端启动一个新会话,
* 并且会丢弃与相同 clientId 关联的任何先前的会话信息。
*/
clientId: string;
}
type MqType = 'amqp'|'mqtt';
/**
- *(星号)通配符:通配符用于匹配一个单词(单词之间由点号分隔)。例如,如果您使用 *.example.com 作为绑定键,它将匹配任何以 .example.com 结尾的单词。例如,绑定键 *.example.com 可以匹配 www.example.com、mail.example.com 等。
- #(井号)通配符:通配符用于匹配零个或多个单词(单词之间由点号分隔)。例如,如果您使用 #.example.com 作为绑定键,它将匹配任何以 .example.com 结尾的单词,无论有多少单词。例如,绑定键 #.example.com 可以匹配 www.example.com、mail.example.com、sub.www.example.com 等。
*/
enum W_AMQP {
Separator = ".",
SingleWord= "*",
MultiWords= "#",
}
/**
- 单级通配符(+):单级通配符用于匹配单个主题级别(目录)。当你在订阅时使用 +,它将匹配一个主题级别,并且可以与任何文本匹配。例如,如果你订阅了主题 home/+/light,它将匹配 home/livingroom/light 和 home/bedroom/light 等主题。
- 多级通配符(#):多级通配符用于匹配多个主题级别(目录)。当你在订阅时使用 #,它将匹配一个或多个主题级别,并且可以与任何文本匹配。例如,如果你订阅了主题 home/#,它将匹配 home/livingroom/light、home/bedroom/light 以及 home 主题本身。
*/
enum W_MQTT {
Separator = "/",
SingleWord= "+",
MultiWords= "#",
}
/**
* AMQP订阅主题后接收消息的回台哦函数
* msg: Message {
content: Buffer;
fields: MessageFields;
properties: MessageProperties;
},
* ack: 可选函数, 当AUTO_ACK=false时, 客户端主动确认收到消息; AUTO_ACK默认=true;
*/
type AmqpMessage = {
content: Buffer;
fields: {
messageCount?: number | undefined;
consumerTag?: string | undefined;
};
properties: {
contentType: any | undefined;
contentEncoding: any | undefined;
headers: {
"x-first-death-exchange"?: string | undefined;
"x-first-death-queue"?: string | undefined;
"x-first-death-reason"?: string | undefined;
"x-death"?: {
count: number;
reason: "rejected" | "expired" | "maxlen";
queue: string;
time: {
"!": "timestamp";
value: number;
};
exchange: string;
"original-expiration"?: any;
"routing-keys": string[];
}[] | undefined;
[key: string]: any;
};
deliveryMode: any | undefined;
priority: any | undefined;
correlationId: any | undefined;
replyTo: any | undefined;
expiration: any | undefined;
messageId: any | undefined;
timestamp: any | undefined;
type: any | undefined;
userId: any | undefined;
appId: any | undefined;
clusterId: any | undefined;
};
}
type AmqpSubCallback = (msg:AmqpMessage, ack?:()=>void)=>void;
/**
* 接收消息的回调 - 工作模式
*/
export type AmqpWorkGainCallback = (msg:AmqpMessage, ack?:()=>void, nack?:()=>void)=>void;
/**
* MQTT订阅主题后接收消息的回台哦函数
* topic: string 主题
* message: Buffer 消息
*/
type MqttSubCallback = (topic:string, message:Buffer)=>void;
/**
* 根据给定的参数自动连接AMQP或者MQTT服务器
* @param arg 连接URL或者配置参数
* @returns T extends AMQP|MQTT
*/
function toLink<T extends AMQP|MQTT>(arg:string|AmqpOpts|MqttOpts): Promise<T>;
class AMQP {
/**
* 创建连接
* @param arg 可以是url或者是JSON配置
* url: amqp(s)://username:password@hostname:port/vhost
* options: {
hostname: 'rabbitmq-server',
port: 5672,
username: 'myuser',
password: 'mypassword',
vhost: '/myvhost'
}
* @returns this
*/
connect(arg: string | AmqpOpts): Promise<this>;
/**
* 关闭连接
* @returns this
*/
close(): Promise<this>;
/**
* 发布主题
* @param exchangeName 交换机
* @param routingKey 路由
* @param message 消息
* @param options { durable=true } 队列持久化
* @returns this
*/
publish(exchangeName:string, routingKey:string, data:any, options?:{
durable:boolean;
}): Promise<this>;
/**
* 订阅主题
* @param exchangeName 交换机
* @param bindingKey 路由
* @param callback 监听到消息后的回调函数
* @param options {
durable:true; // 持久队列
exclusive:true; // 排他队列
AUTO_ACK:true; // 不需要客户端手动确认消息, AMQP服务器将视为该消息已被处理,然后将其标记为已传递。
}
* @returns
*/
subscribe(exchangeName:string, bindingKey:string, callback:AmqpSubCallback, options?:{
durable:boolean; // 持久队列
exclusive:boolean; // 排他队列
AUTO_ACK:boolean; // 自动回复
}): Promise<this>;
/**
* 取消订阅
* @param queue 要解绑的队列的名称
* @param exchangeName 要解绑的交换机的名称
* @param key 路由键模式,用于匹配绑定时指定的路由键
* @returns
*/
unsubscribe(queue:string, exchangeName:string, key:string): Promise<this>;
/**
* 向队列发送消息 - 工作模式
* @param queue 队列
* @param message 消息
* @param options
* @returns {
durable:true; // 队列持久化
persistent:true; // 消息持久化
headers?: any;
properties?:any;
}
*/
wSend(queue:string, data:any, options?:{
durable:boolean; // 队列持久化
persistent:boolean; // 消息持久化
headers?: any;
properties?:any;
}): Promise<this>;
/**
* 从队列接收消息 - 工作模式
* @param queue 队列
* @param callback 收到消息后的回调
* @param options {
durable:true; 队列持久化
AUTO_ACK:true; 自动回复
}
* @returns
*/
wGain(queue:string, callback: AmqpWorkGainCallback, options?:{
durable:boolean;
AUTO_ACK:boolean;
}): Promise<this>;
/**
* 停止消费者: 通过 channel.cancel 停止消费者,从而终止消息的消费
* @param tag 消费者
* @returns
*/
cancelC(tag:string): Promise<this>;
/**
* 删除队列: 通过 channel.deleteQueue 删除队列名称
* 工作模式(Work Queue):
* 如果删除了一个工作队列,那么与该队列相关的所有消费者将无法继续处理消息,
* 队列中的消息也将被删除。这可以用于清空工作队列或停止队列的消息处理。
* 主题模式(Topic Exchange):
* 删除队列将会导致该队列不再接收任何新的消息,但不会影响其他队列的状态。
* 已经进入队列的消息将继续被消费者处理,直到队列中的消息全部被消费完或者队列被删除。
* @param queue 队列
* @returns
*/
deleteQ(queue:string): Promise<this>;
}
class MQTT {
/**
* 创建连接, 可以是url或者JSON配置
* URL: [协议://][用户名:密码@]主机[:端口号][/路径][?查询字符串][#片段标识符]
- 协议(Protocol):MQTT URL 以协议开头,指定了要使用的协议。在 MQTT 中,通常使用的协议是 mqtt:// 或 mqtts://,分别表示 MQTT 和 MQTT over TLS/SSL。
- 用户名和密码(Optional):可选项,如果 MQTT 服务器要求身份验证,可以在 URL 中包含用户名和密码。格式为 username:password@。例如,user:pass@mqtt://。
- 主机(Host):主机名或 IP 地址,指定 MQTT 服务器的位置。例如,mqtt://mqtt.eclipse.org。
- 端口号(Optional):可选项,指定 MQTT 服务器的端口号。如果不指定,默认端口号是 1883(MQTT)或 8883(MQTT over TLS/SSL)。格式为 :port。例如,mqtt://mqtt.eclipse.org:1883。
- 路径(Optional):可选项,路径部分用于指定 MQTT 主题或其他资源。在 MQTT URL 中,路径部分通常为空或包含主题信息。例如,mqtt://mqtt.eclipse.org/topic.
- 查询字符串(Optional):可选项,用于传递查询参数。在 MQTT URL 中,查询参数通常用于指定连接选项。例如,mqtt://mqtt.eclipse.org?clientId=myClient.
- 片段标识符(Optional):可选项,不常用于 MQTT 连接。它允许你在 URL 中标识资源的特定部分。
* options: {
- clientId:指定客户端的唯一标识符。如果未指定,服务器将生成一个随机的 clientId。通常,每个客户端都应该有一个唯一的 clientId。
- username 和 password:如果 MQTT 服务器要求身份验证,你可以提供用户名和密码。这些选项用于连接到需要身份验证的 MQTT 服务器。
- protocolVersion:指定要使用的 MQTT 协议版本。通常,可以使用 'mqtt'(MQTT 3.1)或 'mqttv3.1'(MQTT 3.1.1)来指定协议版本。
- clean:指定是否应该保持会话状态。如果设置为 true,客户端连接时会创建一个新会话,服务器将不会保留以前的会话信息。如果设置为 false,客户端会恢复以前的会话状态。
- keepalive:指定客户端与服务器之间的保持活动间隔(以秒为单位)。在指定的时间间隔内,客户端必须发送心跳以保持连接。默认值为 60 秒。
- reconnectPeriod:指定重新连接的时间间隔(以毫秒为单位)。如果连接断开,客户端将尝试重新连接服务器。默认值为 1000 毫秒。
- will:设置遗嘱消息。遗嘱消息会在客户端意外断开连接时发送给服务器。你可以指定遗嘱主题、遗嘱消息内容、遗嘱消息的 QoS 和保留标志。
- rejectUnauthorized:指定是否要拒绝未经授权的 TLS/SSL 连接。通常用于安全连接,如果设置为 false,则允许自签名证书。
}
*/
connect(arg: string | MqttOpts): Promise<this>;
/**
* 关闭连接
* @param force
* @returns
*/
close(force?:boolean): this;
/**
* 监听事件
connect:当 MQTT 客户端成功连接到 MQTT 服务器时触发。通常在此事件中订阅主题或执行其他初始化操作。
message:当 MQTT 客户端接收到来自订阅主题的消息时触发。你可以在此事件中处理接收到的消息。
error:当 MQTT 客户端发生错误时触发。你可以在此事件中处理连接或通信错误。
close:当 MQTT 客户端关闭连接时触发。你可以在此事件中进行资源清理或记录连接关闭事件。
offline:当 MQTT 客户端处于离线状态时触发。通常在临时失去连接后触发,用于处理连接断开的情况。
reconnect:当 MQTT 客户端尝试重新连接到 MQTT 服务器时触发。可以用于处理重新连接过程中的事件。
end:当 MQTT 客户端调用 end 方法关闭连接时触发。你可以在此事件中执行额外的清理操作。
*/
on(ev:any, callback: (...args:any[])=>void): this;
onConnect(callback: ()=>void): this;
onMessage(callback: (topic:string, message:Buffer)=>void): this;
onError(callback: (err:any)=>void): this;
onClose(callback: ()=>void): this ;
onEnd(callback: ()=>void): this;
onOffline(callback: ()=>void): this;
onRecon(callback: ()=>void): this;
onDiscon(callback: ()=>void): this;
/**
* 发布主题
* @param topic 主题
* @param msg 消息
* @param options {
qos?: QoS; -- Quality of Service,服务质量): 用于指定消息的服务质量级别。可能的取值包括:
0:最多一次传递(At most once)。
1:至少一次传递(At least once)。
2:仅一次传递(Exactly once)。
默认情况下,qos 的值为 0,表示最多一次传递,消息会被尽力发送,但不保证可靠性。你可以根据需要选择不同的 qos 级别来确保消息传递的可靠性。
retain?: boolean; -- 一个布尔值,用于指定是否在 MQTT 服务器上保留该消息。如果将 retain 设置为 true,则消息将被保留,新订阅该主题的客户端会立即接收到最新的保留消息。如果设置为 false,则不会保留该消息。
dup?: boolean; -- 一个布尔值,用于指定消息是否为重复消息。如果将 dup 设置为 true,表示该消息是之前发布的一条重复消息。这在某些情况下用于处理消息重复。
properties?: IPublishPacket['properties']; -- 一个对象,用于指定其他消息属性。这可以包括消息的生命周期、内容类型、用户属性等。不同 MQTT 客户端库支持的消息属性可能有所不同。
cbStorePut?: StorePutCallback; -- 一个回调函数,用于处理消息存储的相关操作。这通常在自定义存储消息时使用,以实现自定义的消息存储行为。
}
* @returns
*/
publish(topic: string, data: any, options?: any): Promise<this>;
/**
* 订阅主题
* @param topic 主题
* @param options {
qos: QoS;
nl?: boolean; -- 一个布尔值,用于指定是否在本地发布的消息也被传递给订阅者。如果将 nl 设置为 true,则不会将订阅者自己发布的消息传递给他们。
rap?: boolean; -- 一个布尔值,用于指定是否订阅者会接收到已发布的保留消息。如果将 rap 设置为 true,则订阅者会接收到所有已发布的保留消息。
rh?: number; -- 一个数字,用于指定订阅者在收到保留消息时如何处理它们。可能的取值包括:
0:如果订阅者之前没有收到过该主题的消息,它会接收到保留消息。
1:如果订阅者之前没有收到过该主题的消息,它会接收到保留消息,并且如果订阅者之前订阅过该主题,则它会接收到当前发布的消息。
rh 属性用于确定订阅者在连接到主题时是否应该接收保留消息以及如何处理它们。
}
* @returns
*/
subscribe(topic: string|string[], callback:MqttSubCallback, options?: any): Promise<this>;
/**
* 取消订阅主题
* @param topic 主题
* @returns
*/
unsubscribe(topic:string|string[]): Promise<this>;
/**
* 手动清除最后一条消息
* @returns
*/
clear(): this;
}
function toBuffer(data:any): Buffer;