npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

rocketmq-grpc

v0.3.1

Published

基于RocketMQ-gRPC协议的自封装Node.js客户端

Downloads

62

Readme

Apache RocketMQ Node.js 客户端

这是一个自封装的 Apache RocketMQ Node.js 客户端,基于 RocketMQ 5.x版本的gRPC协议实现。

  • 不依赖egg.js、rocketmq-nodejs-sdk等第三方库,它是纯粹的Node.js客户端。
  • 实现了分布式下顺序保障机制。
  • 自定义实现了类似的PushConsumer,通过长轮询的方式来获取消息。
  • 统一的日志记录器注入,方便调试。

注意

对于Macos or Linux,build操作失败,可以尝试使用sudo npm run build

对于Windows,build操作失败,可以尝试使用“以管理员身份运行”命令行。

概述

在开始客户端的部分之前,所需的一些前期工作(或者参照这里):

  1. 准备 Node.js 环境。Node.js 16.19.0 是确保客户端运行的最小版本,Node.js >= 18.17.0 是推荐版本;
  2. 部署 namesrv,broker 以及 proxy 组件。

快速开始

我们使用 npm 作为依赖管理和发布的工具。 你可以在 npm 的官方网站了解到关于它的更多信息。 这里是一些在开发阶段你会使用到的 npm 命令:

# 自动安装工程相关的依赖
npm install
# 初始化 grpc 代码
npm run build
# 安装rocketmq nodejs 客户端
npm i rocketmq-grpc

开启 grpc-js 的调试日志:

GRPC_TRACE=compression GRPC_VERBOSITY=debug GRPC_TRACE=all npm run xxx or node xxx

示例

普通消息

发送消息

import {Producer} from "rocketmq-grpc";

const simpleProducer = new Producer({
  endpoints: 'localhost:8081'
});
console.log('checkout:simpleProducer init success!');

(async () => {
  // 启动生产者
  simpleProducer.startup().then(() => {
    console.log('checkout:simpleProducer startup success!');

    // 发送消息
    simpleProducer
      .send({
        topic: 'checkout-topic',
        tag: 'checkout',
        keys: ['checkout-key'],
        body: Buffer.from('Hello, Checkout OK!')
      })
      .then(() => {
        console.log('checkout:send message success!');
        process.exit(0);
      });
  });
})();

消费消息

import {SimpleConsumer} from "rocketmq-grpc";

const consumer = new SimpleConsumer({
  consumerGroup: 'checkout-group',
  endpoints: '192.168.1.162:8081',
  subscriptions: new Map().set('checkout-topic', '*'),
  requestTimeout: 3000,
  awaitDuration: 30000 // long polling
});

console.log('checkout:consumer init success!');

const isShutdown = false;

async function startAndConsumeMessages() {
  try {
    // 启动消费者
    await consumer.startup();
    console.log('checkout:consumer startup success!');

    async function consumeMessages() {
      try {
        const messages = await consumer.receive(20);

        if (messages.length > 0) {
          console.log('got %d messages', messages.length);

          for (const message of messages) {
            console.log('body=%o', message.body.toString());
            await consumer.ack(message);
            console.log('checkout:ack message success!');
          }
        } else {
          console.log('No messages received, waiting...');
        }
      } catch (error) {
        console.error('An error occurred:', error);
      } finally {
        // // 等待一段时间后递归调用consumeMessages
        // await new Promise(resolve => setTimeout(resolve, 5000));

        console.log('checkout:waiting for messages...');

        if (!isShutdown) {
          await consumeMessages();
        }
      }
    }

    // 开始消费消息
    await consumeMessages();
  } catch (error) {
    console.error('An error occurred:', error);
  } finally {
    // 如果发生错误或者接收消息出现问题,可以选择重新启动消费者
    // 在这里你可以添加相应的逻辑
  }
}

startAndConsumeMessages().catch(console.error);

更多的示例可以参考这里

消息类型

  • [x] NORMAL
  • [x] FIFO
  • [x] DELAY
  • [x] TRANSACTION

消费类型

  • [x] PRODUCER
  • [x] SIMPLE_CONSUMER
  • [ ] PULL_CONSUMER
  • [x] PUSH_CONSUMER

PushConsumer并非RocketMQ官方SDK的实现方式,官方采用本地维持一个内存队列组来不断同步 拉取消息并分发给消费者,这样的实现是复杂的,只能等待官方SDK的实现。

此处的实现是基于5.x版本独有的SimpleConsumer,通过长轮询的方式来获取消息,SimpleConsumer 是RocketMQ在此版本提出用于供消费端开发者做更加自定义的消费者实现的一种方式。