npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

ease-ali-kafka

v0.1.5

Published

Aliyun MQ(Kafka) Node.js client helper based on node-rdkafka

Downloads

2

Readme

ease-ali-kafka

基于 node-rdkafka 的阿里云MQ(Kafka) Node.js 客户端. 主要是简化配置和使用方式.

npm install ease-ali-kafka

使用方法


引入模块

const Kafka = require('ease-ali-kafka');

初始化

Kafka.init({
  // 参考阿里云的接入点说明. https://help.aliyun.com/document_detail/52376.html
  bootstrap: 'kafka-ons-internet.aliyun.com:8080',  
  username: '你的阿里云帐号 access key',
  password: '你的阿里云帐号 secret 后 10 位',
  // 如果不需要 consumer 则不用配置该参数
  consumerID: '在阿里云上配置的 consumer ID',
  // 发送消息后, 等待回执的超时时间, 默认 3 秒
  reportTimeout: 3000,
  // 自动生成 key 的编码方式, 默认 base64, 支持 base64, hex 等
  keyEncode: 'base64',
  // 生产者轮询本地的时间, 默认 200ms
  producerPollInterval: 200,
  // 发送发生超时后的重试次数, 默认重试 2 次
  retries: 2,
});

发送消息

Kafka.sendKafkaMessage('TOPIC_NAME', {
  id: 123,
  message: 'hello'
}).then(report => console.log('send success, report is ', report))
  .catch(err => console.error('send failed. ', err));

.sendKafkaMessage(topicName, content)

return: Promise<report>

topicName: {String} 目标topic名称
content: {String|Object|Buffer} 要发送的内容, 可以直接写 Object.
report: {Object} 消息回执. 其中主要包含字段 key{Buffer}, topic{String}, partition{Number}, offset{Number}

如因网络问题, 发送后3秒内未收到回执, 则会报 timeout 错误.

订阅并消费消息

Kafka.subscribe('TOPIC_NAME', function (data, commit, next) {
  console.log('consuming data', data);
  commit();
  next();
})

.subscribe(topicName, handler)

return: undefined

topicName: {String} 目标topic名称
handler: {Function|AsyncFunction} 处理消息的函数. 有多种消费方式

Kafka.subscribe(topic, function (data) {
  // 可以获取到 data. 
  // Kafka 的 commit 操作已在内部执行过了.
  // consumer 不会等待该消息处理完毕就将继续处理下一条消息 
});

Kafka.subscribe(topic, function (data, commit) {
  // 可以获取到 data.
  // 可以自己通过调用 commit() 函数控制是否给 kafka 消费完成的通知
  // commit 的同时会结束当前消息的处理, 准备处理下一条消息 
});

Kafka.subscribe(topic, function (data, commit, next) {
  // 可以获取到 data.
  // 可以自己通过调用 commit() 函数控制是否给 kafka 消费完成的通知
  // 可以通过调用 next() 函数来启动下一条消息的处理, 可以自己手动控制消息的并发度 
  // 这种模式通常用于消息处理过程中包含异步IO的时候 
})

data: {Buffer} 收到的消息内容, Buffer 类型. 通过 ._metadata 属性可以获取相关元数据
commit: {Function} 执行该函数则给 Kafka 发送消息成功消费的回执
next: {Function} 执行该函数则开始处理下一个消息. 否则即使有新的消息待处理, 也会等待当前消息处理完成才会处理下一个消息

支持订阅多个不同的 topic.

License

MIT