npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

zkmessage-queue

v1.1.0

Published

A message queue

Downloads

4

Readme

ZKMessage Queue

A distribution message queue based on zookeeper.

How to use.

npm install zkmessage-queue --save
const messageService = new MessageQueue({
  servers: '10.138.30.208:2181',
  path: '/zkconfig/video_dev',
  username: 'zkconfig',
  password: 'zkconfig',
  handle: (messageBean, done) => {
    // handle message
    logger.info('Handle message: ', messageBean);
    setTimeout(() => {
      let random = Math.floor(Math.random() * 100);
      if (random < 10) {
        done(new Error('Handle fail.'));
      } else {
        done();
      }
    }, 1);
  },
  handleError: (messageBean, done) => {
    // 处理错误消息
    logger.info('Handle error message: ', messageBean);
    done();
  }
});
messageService.connect();

PS: call method handle to handle message, if an error is thrown, handleError message will be called. Please make sure done method must be called after handling operation, or the proess will be truggled in this message forever.

Some things you must be noticed. 1 The username must have the read and write previleges of the path. At this point only digest and anonymous authentication are supported. 2 This path must exist: ${path}/message,the path to store mesage queue, ${path}/pedding,the path to store pedding message, ${path}/queue, the path to queue the register process.

API

  • constructor(options)
    The options config is as follows:
    • servers: the servers of zookeeper servers, separated by ,, such as 10.138.30.208:2181;
    • path: the path to store message and connection queue;
    • username: the username for zookeeper;
    • passport: the passport for zookeeper;
    • charset: the charset for message content, default utf-8;
    • handleRetryMaxTimes: the max retry times for one message, default is 10;
    • registerRetryMaxTimes: the max retry times for register this process to zookeeper queue, after retry for ${registerRetryMaxTimes} times of retry, this process is discast forever until an restart operation is taken. default is 200
    • borrowRetryTimes: the max retry times for borrowing a message from message queue to pedding queue. default is 2;
    • returnRetryTimes: the max retry times for returning a message from pedding queue to message queue. default is 2;
    • messageCacheTriggerCount: the trigger count for message queue to switch cache hint on, which in other words, if the length of message queue is bigger than this value, the cache will switch on automatically. default is 100,
    • messageCacheMaxCount: the max size of cache. which means how many messages can cache store. default is 50;
    • messageCacheRetryTimes: the max retry times for save cache, default is 2;
    • handle: {Function|AsyncFunction} A function to handle the consumed message.
      • messageObject, an instance of MessageBean;
      • done, optional, a callback function which must be called after the handing operation. You can pass an error parameter to the function to indicate the a failure. This parameter is ignore.
    • handleError: {Function|AsyncFunction} A method to handle the error message.
      • messageObject, an instance of MessageBean;
      • done, optional, a callback function which must be called after the handling operation. You can pass an error parameter to indicate a failure operation. If the handleError function is an AsyncFunction, This parameter is ignore.
  • appendMessage(id, message, callback), append a message to message queue. the done method will be be called while appending success or fail.
    • id: String, the id of the message, please make this an uniq.
    • message: String, the message content.
    • callback: callback function, (Error, messageBean), an error parameter will be taken if an error has been thrown.
  • push(content, done), append a message content to message queue. done method will be called whether success or fail.

  • async pushPromise(content), append a message content to message queue.

  • returnPeddingMessages(done), return the pedding messages to the message queue.

    • callback: callback function, (Error).
  • connect(callback) Connect to a zookeeper server, after connected, the callback method will be called.
    • callback(), a method with no parameters.
  • close() Close the connection.

eg:

const MessageQueue = require('zkmessage-queue');
const queue = new MessageQueue(options);

queue.push('A test message', (err, messageBean) => {
  if (err) {
    console.error(err);
  } else {
    console.info(messageBean);
  }
});

// if in a async function
await queue.pushPromise('A test message');

Operations

There are three ways to change the message queue:

  1. Append message by calling the push or pushPromise method.
  2. Return pedding message to message queue by calling the returnPeddingMessages.
  3. Consume message automatically, which the handle method will be called.

Events

Some event will be trigged, the event list is as follow:

  • EVENT_CONNECT_SUCCESS, fired when connect successfully.
  • EVENT_CONNECT_ERROR, fired when connect fail.
  • EVENT_ERROR, fire an error occor.
  • EVENT_INCOMING_MESSAGE, fired when a message is appended successfully.
  • EVENT_HANDLE_MESSAGE, fired when a message is handled successfully.
  • EVENT_HANDLE_MESSAGE_ERROR, fired after handleError has been called.
  • EVENT_REMOVE_MESSAGE, fired when a message has been removed successfully(delete from pedding queue).

You can bind an event handler like this:

const MessageQueue = require('zkmessage-queue');

let queue = new MessageQueue(...)
queue.on(MessageQueue.EVENT_CONNECT_SUCESS, () => {
  // handle an connect success event.
})