queue-schedule
v3.0.6
Published
Queue producer and consumer tools, with Kafka support.
Downloads
23
Readme
Queue Schedule
Kafka is a highly available message queue, but it lacks support for consuming messages at a slow, controlled pace. Some tasks do not need to finish immediately, and we want to complete them at a small cost. This is exactly why we developed Queue Schedule.
Install
npm install queue-schedule
How to use
Using node-rdkafka
const Kafka = require('node-rdkafka');
const {RdKafkaProducer,RdKafkaConsumer} = require('queue-schedule');

// Example configuration — replace these with your own values.
const KAFKA_HOST = 'localhost:9092';
const SCHEDULE_NAME1 = 'schedule1';
const TOPIC_NAME1 = 'topic.rdkafka';
const FIST_DATA = {a:1,b:2};

// Create the underlying node-rdkafka producer.
const producerRd = new Kafka.HighLevelProducer({
    'metadata.broker.list': KAFKA_HOST,
    'linger.ms':0.1,
    'queue.buffering.max.ms': 500,
    'queue.buffering.max.messages':1000,
    // debug: 'all'
});
producerRd.on('event.error',function(err) {
    console.error('producer error',err);
});
producerRd.on('event.log',function(log) {
    console.debug('producer log',log);
});

// Wrap the raw producer with queue-schedule's RdKafkaProducer.
const producer = new RdKafkaProducer({
    name : SCHEDULE_NAME1,
    topic: TOPIC_NAME1,
    producer:producerRd,
    delayInterval: 500
});
producer.addData(FIST_DATA, {},function(err) {
    if (err) {
        console.error('write to queue error',err);
        return;
    }
    console.info('write to kafka finished');
});

// Create the underlying node-rdkafka consumer.
const consumer = new Kafka.KafkaConsumer({
    'metadata.broker.list': KAFKA_HOST,
    'group.id': 'test-rdkafka-0',
    'auto.offset.reset':'earliest',
    'socket.keepalive.enable': true,
    'socket.nagle.disable': true,
    'enable.auto.commit': true,
    'fetch.wait.max.ms': 5,
    'fetch.error.backoff.ms': 5,
    'queued.max.messages.kbytes': 1024 * 10,
    debug:'all'
});

// Wrap the raw consumer with queue-schedule's RdKafkaConsumer.
new RdKafkaConsumer({
    name: 'kafka',
    consumer,
    topics: [ TOPIC_NAME1],
    doTask:function(messages,callback) {
        console.log(messages);
        callback();// must be invoked to trigger the next consume loop
    },
    readCount : 1,
    pauseTime : 500,
    idleCheckInter: 10 * 1000
}).on(RdKafkaConsumer.EVENT_CONSUMER_ERROR,function(err) {
    console.error('consumer error',err);
}).on(RdKafkaConsumer.EVENT_CLIENT_READY,function() {
    console.log('the consumer client is ready');
}).on(RdKafkaConsumer.EVENT_LOG,function(log) {
    // console.log(JSON.stringify(log));
});
Using kafkajs
const { Kafka } = require('kafkajs');
const {KafkaJsProducer,KafkaJsConsumer} = require('queue-schedule');

// Example configuration — replace the broker list with your own.
const FIST_DATA = {a:1,b:2};
const SCHEDULE_NAME1 = 'schedule1';
const TOPIC_NAME1 = 'topic.kafkajs';
const client = new Kafka({
    brokers: ['xxxx', 'yyyy']
});

// Producer: writes data objects to the configured topic.
const producer = new KafkaJsProducer({
    topic: TOPIC_NAME1,
    client,
});
producer.addData(FIST_DATA, {},function(err) {
    if (err) {
        console.error('write to queue error',err);
        return;
    }
    console.info('write to kafka finished');
});
producer.on(KafkaJsProducer.EVENT_PRODUCER_ERROR, function(err) {
    console.error('error in producer', err);
});

// Consumer: processes messages in batches of `readCount`,
// pausing `pauseTime` ms between batches.
new KafkaJsConsumer({
    name: 'kafka',
    client,
    topic: TOPIC_NAME1,
    consumerOption: {
        groupId: 'kafkajs',
        fromBeginning: true
    },
    doTask:function(messages,callback) {
        console.log(messages);
        const value = messages[0].value;//read the first value
        let data = null;
        try {
            data = JSON.parse(value);
            console.log('receive data',data);
        } catch (e) {
            console.error('parse message error',e);
        }
        callback();//the next loop
    },
    readCount : 1,
    pauseTime : 500,
    idleCheckInter: 10 * 1000
}).on(KafkaJsConsumer.EVENT_CONSUMER_ERROR,function(err) {
    console.error('consumer error',err);
}).on(KafkaJsConsumer.EVENT_CONSUMER_READY,function() {
    console.log('the consumer is ready');
});
API
For detailed usage, see the online documentation here.