npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

febs-message

v3.0.0

Published

febs message queue

Downloads

40

Readme

message 库封装了消息队列, 将复杂的调用封装成

  • rpc: 有返回值的双向通信, 只有一个接收者能接收到消息
  • subscribe: 无返回值的订阅发布消息, 所有订阅者都能接收到消息.

目前底层使用 rabbitmq, 系统初始化完成,如果中途发生断线将会自动重连

Example

//
// producer
//
var mq = require('febs-message');

/**
* @desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){
  console.error(msg);
});

/**
* @desc: 初始化rpc通信.
*/
mq.rpc.init({
  url:        'amqp://xxx',
  heartbeat:  10,      // in seconds.
  reconnect:  10000,   // 连接失败后多长时间重连.
  rpcTimout:  5000,    // rpc等待返回消息的超时.
  persistent: false,        // 是否持久消息.
  registerPublisher: true,
  registerSubscriber: false,
  // devSingle: ['live'],
},
null  // 不接收消息.
).then(()=>{
  console.log("Connect Success");

  // 发送消息.
  mq.rpc.request({handler:"handle1", recvSys:'live'}, {message:'hello', data:1})
    .then(ret=>{
      console.log('return message: ');
      console.log(ret);
    });
});
//
// customer
//
var mq = require('febs-message');


/**
* @desc: 消息处理方法
*/

async function handle1(msg) {
  console.log('handle1:');
  console.log(msg);

  return {err:msg.data};  // return to sender.
}

async function handle2(msg) {
  console.log('handle2:');
  console.log(msg);

  return {err:msg.data};
}


/**
* @desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){
  console.error(msg);
});

/**
* @desc: 初始化rpc通信.
*/
mq.rpc.init({
  url:        'amqp://xxx',
  heartbeat:  10,      // in seconds.
  reconnect:  10000,   // 连接失败后多长时间重连.
  rpcTimout:  5000,    // rpc等待返回消息的超时.
  persistent: false,        // 是否持久消息.
  registerSubscriber: true,
  // devSingle: ['live'],
},
'live',  // 接收发送给live系统的消息.
'live2'  // 接收发送给live系统的消息.
).then(()=>{

  console.log('Connect success');

  // 绑定消息处理方法.
  mq.rpc.bind('handle1', handle1);
  mq.rpc.bind('handle2', handle2);
});

Error

系统错误使用 mq.Error 实例抛出异常.

全局配置

/**
* @desc: 初始化系统.
* @param errorLogCB: 错误log回调. function(msg, filename, line)
*/
function init(errorLogCB)
  • 全局初始化后, 对需要用到的rpc, subscribe类型的消息队列再进行各自的初始化.
  • 在开发模式下, 可以进行本机调试而不影响服务器上的服务; 例如: 存在一个名为 sysMain 的系统, 开发者使用 sysMain_xxx 系统名称 进行注册, 在api层进行url的处理, 如, /?sys=xxx, 将消息发送给 sysMain_xxx 系统从而实现本机调试;

rpc

RPC 消息是远程调用消息, 只有一个消费者能接收到消息, 并且有返回值返回给生产者.

/**
* @desc:  连接消息队列.
*         - 使用direct模式,只有一个阅者都能接收到消息.
* @param opt: { // 全局唯一配置, 只有第一次调用有效.
              url:        'amqp://xxxxx',
              heartbeat:  10,      // in seconds.
                reconnect:  10000,   // 连接失败后多长时间重连.
                persistent: false,        // 是否持久消息.
                registerPublisher: false,   // 是否注册发布者.
                registerSubscriber: false,  // 是否注册订阅者.
                errHandleCB: function(e, handleName, recvData:string):data; // 消息处理的错误处理函数. 返回对象将反馈给发送方.
                beforeHandleCB: function(requestData:any):any; // 消息处理前的回调. 返回null则正常处理, 否则将返回数据返回给rpc.
                beforeRequestCB: function(requestData:any):boolean;  // 消息发送前的统一处理, 返回false则不进行发送.
                beforeResponseCB: function(requestData:any, responseData:any);  // 消息返回前的统一处理.          
                beforeReturnCB: function(recvData:any); // 通过request接口发送消息后, 消息已经通过网络接收到, 方法返回前的处理.
                devSingle:   false,         // 在单包模式下开发, 此模式下不使用真实的消息队列, 而在接口一致的情况下使用本地缓存进行开发.
              }
* @param recvFromSys: 接收哪些系统的消息,使用系统的名称 sys.main, ...
*                     如果不指定, 则不接收消息.
* @return: throw in err.
*/
function init(opt:object, ...recvFromSys:string[])

/**
* @desc: 注册消息处理器. 如果在处理消息的过程中发生了异常, 则会调用errHandleCB.
* @param handleName: 处理器名称.
* @param handle: async function(jsonData):data; 返回data给sender.
* @return: 
*/
function bind(handleName:string, handle:func)

/**
* @desc: 发送消息.
* @param receiver: 
    {
      handler: '', // 消息处理器.
      recvSys: '', // 接收的系统.
    }
* @param data: (json). 需要发送的数据.
* @return: Promise.
            - resolve(msg)
            - catch('timeout')
*/
async function request(receiver, data)

subscribe

sbscribe 消息是订阅消息, 所有订阅者都能接收到消息, 无返回值返回给生产者.

/**
* @desc:  连接消息队列.
*         - 使用subscribe模式,所有的订阅者都能接收到消息.
* @param opt: { // 全局唯一配置, 只有第一次调用有效.
              url:        'amqp://xxxxx',
                heartbeat:  10,      // in seconds.
                reconnect:  10000,   // 连接失败后多长时间重连.
                persistent: false,        // 是否持久消息.
                registerPublisher: false,   // 是否注册发布者.
                registerSubscriber: false,  // 是否注册订阅者.
                errHandleCB: function(e, handleName, recvData:string):void; // 消息处理的错误处理函数. 返回true则会从队列中移除消息.
                beforeHandleCB: function(requestData:any); // 消息处理前的回调.
                beforeRequestCB: function(requestData:any):boolean;  // 消息发送前的统一处理, 返回false则不进行发送.
                devSingle:   false,         // 在单包模式下开发, 此模式下不使用真实的消息队列, 而在接口一致的情况下使用本地缓存进行开发.
              }
* @param recvFromSys: 接收哪些系统的消息,使用系统的名称 sys.main, ...
*                     如果不指定, 则不接收消息.
* @return: Promise
*/
function init(opt:object, ...recvFromSys:string[])

/**
* @desc: 注册消息处理器.
* @param handleName: 处理器名称.
* @param handle: function(jsonData)
* @return: 
*/
function bind(handleName:string, handle:func)

/**
* @desc: 发布消息.
* @param receiver: 
    {
      handler: '', // 消息处理器.
      recvSys: '', // 接收的系统.
    }
* @param data: (json). 需要发送的数据.
* @return: boolean.
*/
async function publish(receiver, data)

devSingle example

单包开发模式下, 可以将多个系统统一个入口启动 (单实例), 方便调试.

rpc client.

File: client.js


var mq = require('febs-message');

/**
* @desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){ // log处理方法
  console.log(msg);
});

/**
* @desc: 初始化rpc通信.
*/
mq.rpc.init({
  url:        'amqp://xxxx',
  heartbeat:  10,      // in seconds.
  reconnect:  10000,   // 连接失败后多长时间重连.
  rpcTimout:  5000,    // rpc等待返回消息的超时.
  persistent: false,        // 是否持久消息.
  registerPublisher: true,
  registerSubscriber: false,
  devSingle: ['sysname'],
},
null  // 不接收消息.
).then(()=>{
  console.log("OK");

  // 发送消息.
  mq.rpc.request({handler:"handle1", recvSys:'sysname'}, {message:'hello', data:1})
    .then(ret=>{
      console.log('return message: ');
      console.log(ret);
    });
});

rpc server.

File: server.js


var mq = require('febs-message');


/**
* @desc: 消息处理方法
*/

async function handle1(msg) {
  console.log('handle1:');
  console.log(msg);

  return {err:msg.data};  // return to sender.
}

async function handle2(msg) {
  console.log('handle2:');
  console.log(msg);

  return {err:msg.data};
}


/**
* @desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){
  console.error(msg);
});

/**
* @desc: 初始化rpc通信.
*/
mq.rpc.init({
      url:        'amqp://xxxx',
      heartbeat:  10,      // in seconds.
      reconnect:  10000,   // 连接失败后多长时间重连.
      rpcTimout:  5000,    // rpc等待返回消息的超时.
      persistent: false,        // 是否持久消息.
      registerSubscriber: true,
      devSingle: ['sysname'],
    }, 'sysname'  // 接收发送给live系统的消息.
  )
  .then(()=>{
    console.log('ok');

    // 绑定消息处理方法.
    mq.rpc.bind('handle1', handle1);
    mq.rpc.bind('handle2', handle2);

    require('./client');
  });

仅启动 server.js 进程.