npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

egg-node-rdkafka

v1.0.5

Published

rdkafka plugin for egg.js

Downloads

28

Readme

egg-rdkafka

中国电信股份有限公司福州分公司 软件开发团队 fzyejy fzliub

该插件是rdkafka 的封装, 方便在egg.js 环境下使用的一个egg风格的插件, 并提供了方便的api发送给kafka消息的方法 部分详细配置请参考 https://github.com/Blizzard/node-rdkafka

目前该版本为项目组内测版本,可用于测试项目与部分生产项目,希望各开发人员在使用过程中,提供宝贵的建议及意见,积极加入该插件的维护工作中

依赖说明

依赖的 egg 版本

--- | --- 2.x | 😁 1.x | 😁

依赖的 Node 版本

node >= 8.0.0 😁

使用方法

npm install egg-node-rdkafka --save

开启插件

// config/plugin.js
exports.rdkafka = {
  enable: true,
  package: 'egg-node-rdkafka',
};

配置

// {app_root}/config/config.default.js
exports.kafka = {
  commonOption: {  //通用配置
    'metadata.broker.list': '',//kafka 集群地址
  },
  consumerOption: { //消费者配置
    'group.id': 'rdkafka',  //设置消费者组id
    'enable.auto.commit': false  //是否自动提交,目前该配置无效,强制在消费时候自动提交
  },
  producerOption: { //生产者配置
    'client.id': 'default-client', // 标记生产者
  },
  topicOption: {
    dir: path.join(process.cwd(), '/app/kafka'), // 消费者订阅目录
    env: '', // 环境变量,会在topic后面自动加上后缀-${env},例:例如env = test,foo文件夹转化成topic时候会变成foo-test
    excludeTopics: [], // 不订阅目录下的指定文件夹(topic)
    rawTopics: [] // 忽略env参数订阅目录下的指定文件夹(topic),如果遇到带env参数的topic和不带env参数的topic冲突,则以不带env参数的topic为主
  },
  messageOption: {
    // 保留
  }
};

详细配置

请到 https://github.com/edenhill/librdkafka/blob/v1.4.2/CONFIGURATION.md 查看详细配置项说明(仅对commonOption,consumerOption,producerOption有效)。

目录结构

egg-project
├── package.json
├── app.js (optional)
├── app
|   ├── router.js
│   ├── controller
│   ├── service
│   └── kafka (optional)  --------> like `controller, service...`
│       ├── someTopic (optional)  -------> 文件夹名称代表kafka主题,如someTopic
│            ├── someKey1.js(optional)  ------> 订阅key为someKey1的消息
|            └── someKey2.js(optional)  ------> 订阅key为someKey2的消息
|            └── __default.js(optional)  ------> 订阅的key都未命中时,默认订阅,包括key为空的情况。
// |    
├── config
|   ├── plugin.js
|   ├── config.default.js
│   ├── config.prod.js
|   ├── config.test.js (optional)
|   ├── config.local.js (optional)
|   └── config.unittest.js (optional)

使用注意

Note: 你必须设置 app.config.topicOption.dir, egg-rdkafka-node 需要基于 这个去加载所订阅的topic

使用案例

// {app_root}/controller/index.js
class IndexController extends Controller {
  async index() {
    await this.ctx.kafka.sendMessage({
      topic: 'someTopic', // 指定 kafka 目录下 的topic 
      key: 'someKey', // 指定 kafka 下的 topic 目录 对应key的consumer
      value: JSON.stringify({
        username: 'JohnApache',
        userId: 10001,
        gender: 0
      })
    });
  }

  async some() {
    this.ctx.kafka.sendMessageSync({
      topic: 'someTopic', // 指定 kafka 目录下的 topic 
      key: 'someKey', // 指定 kafka 下的 topic 目录 对应key 的consumer
      value: JSON.stringify({
        username: 'JohnApache',
        userId: 10001,
        gender: 0
      })
    }, () => {
      // success callback 
    }, () => {
      // error callback 
    })
  }
}

// {app_root}/kafka/someTopic/someKeyConsumer.js
class SomeKeySubscription extends Subscription {
  async subscribe(message) {
    const { topic, key, value, partition, timestamp, offset } = message;
    this.ctx.logger.info(`consume message ${value} by topic ${topic} key ${key} consumer`);
    await asyncTask();
  }
}

提问交流

License

MIT