zkmessage-queue
v1.1.0
Published
A message queue
Downloads
4
Readme
ZKMessage Queue
A distribution message queue based on zookeeper.
How to use.
npm install zkmessage-queue --save
const messageService = new MessageQueue({
servers: '10.138.30.208:2181',
path: '/zkconfig/video_dev',
username: 'zkconfig',
password: 'zkconfig',
handle: (messageBean, done) => {
// handle message
logger.info('Handle message: ', messageBean);
setTimeout(() => {
let random = Math.floor(Math.random() * 100);
if (random < 10) {
done(new Error('Handle fail.'));
} else {
done();
}
}, 1);
},
handleError: (messageBean, done) => {
// 处理错误消息
logger.info('Handle error message: ', messageBean);
done();
}
});
messageService.connect();
PS: call method
handle
to handle message, if an error is thrown,handleError
message will be called. Please make suredone
method must be called after handling operation, or the proess will be truggled in this message forever.
Some things you must be noticed. 1 The username must have the read and write previleges of the path. At this point only
digest
andanonymous
authentication are supported. 2 This path must exist:${path}/message
,the path to store mesage queue,${path}/pedding
,the path to store pedding message,${path}/queue
, the path to queue the register process.
API
- constructor(options)
The options config is as follows:- servers: the servers of zookeeper servers, separated by
,
, such as10.138.30.208:2181
; - path: the path to store message and connection queue;
- username: the username for zookeeper;
- passport: the passport for zookeeper;
- charset: the charset for message content, default
utf-8
; - handleRetryMaxTimes: the max retry times for one message, default is
10
; - registerRetryMaxTimes: the max retry times for register this process to zookeeper queue, after retry for
${registerRetryMaxTimes}
times of retry, this process is discast forever until an restart operation is taken. default is200
- borrowRetryTimes: the max retry times for borrowing a message from message queue to pedding queue. default is
2
; - returnRetryTimes: the max retry times for returning a message from pedding queue to message queue. default is
2
; - messageCacheTriggerCount: the trigger count for message queue to switch cache hint on, which in other words, if the length of message queue is bigger than this value, the cache will switch on automatically. default is
100
, - messageCacheMaxCount: the max size of cache. which means how many messages can cache store. default is
50
; - messageCacheRetryTimes: the max retry times for save cache, default is
2
; - handle: {Function|AsyncFunction} A function to handle the consumed message.
- messageObject, an instance of
MessageBean
; - done, optional, a callback function which must be called after the handing operation. You can pass an error parameter to the function to indicate the a failure. This parameter is ignore.
- messageObject, an instance of
- handleError: {Function|AsyncFunction} A method to handle the error message.
- messageObject, an instance of
MessageBean
; - done, optional, a callback function which must be called after the handling operation. You can pass an error parameter to indicate a failure operation. If the
handleError
function is an AsyncFunction, This parameter is ignore.
- messageObject, an instance of
- servers: the servers of zookeeper servers, separated by
- appendMessage(id, message, callback), append a message to message queue. the done method will be be called while appending success or fail.
- id: String, the id of the message, please make this an uniq.
- message: String, the message content.
- callback: callback function, (Error, messageBean), an error parameter will be taken if an error has been thrown.
push(content, done), append a message content to message queue.
done
method will be called whether success or fail.async
pushPromise(content), append a message content to message queue.returnPeddingMessages(done), return the pedding messages to the message queue.
- callback: callback function, (Error).
- connect(callback)
Connect to a zookeeper server, after connected, the callback method will be called.
- callback(), a method with no parameters.
- close() Close the connection.
eg:
const MessageQueue = require('zkmessage-queue');
const queue = new MessageQueue(options);
queue.push('A test message', (err, messageBean) => {
if (err) {
console.error(err);
} else {
console.info(messageBean);
}
});
// if in a async function
await queue.pushPromise('A test message');
Operations
There are three ways to change the message queue:
- Append message by calling the
push
orpushPromise
method. - Return pedding message to message queue by calling the
returnPeddingMessages
. - Consume message automatically, which the
handle
method will be called.
Events
Some event will be trigged, the event list is as follow:
EVENT_CONNECT_SUCCESS
, fired when connect successfully.EVENT_CONNECT_ERROR
, fired when connect fail.EVENT_ERROR
, fire an error occor.EVENT_INCOMING_MESSAGE
, fired when a message is appended successfully.EVENT_HANDLE_MESSAGE
, fired when a message is handled successfully.EVENT_HANDLE_MESSAGE_ERROR
, fired afterhandleError
has been called.EVENT_REMOVE_MESSAGE
, fired when a message has been removed successfully(delete from pedding queue).
You can bind an event handler like this:
const MessageQueue = require('zkmessage-queue');
let queue = new MessageQueue(...)
queue.on(MessageQueue.EVENT_CONNECT_SUCESS, () => {
// handle an connect success event.
})