npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

sofa-bolt-node

v2.0.1

Published

bolt protocol nodejs implementation

Downloads

468

Readme

sofa-bolt-node

Bolt 协议 Nodejs 实现版本

NPM version build status Test coverage David deps Known Vulnerabilities npm download

一、简介

SOFABoltNode 是 SOFABolt 的 Nodejs 实现,它包含了 Bolt 通讯层协议框架,以及 RPC 应用层协议定制。和 Java 版本略有不同的是,它并不包含基础通讯功能(连接管理、心跳、自动重连等等),这些功能会放到专门的 RPC 模块里实现。

二、Bolt 通信层协议设计

Bolt 协议是一个标准的通讯层协议,目前包含两个大版本,定义如下:

V1 版本

Request command protocol for v1
0     1     2           4           6           8          10           12          14         16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|proto| type| cmdcode   |ver2 |   requestId           |codec|        timeout        |  classLen |
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|headerLen  | contentLen            |                             ... ...                       |
+-----------+-----------+-----------+                                                                                               +
|               className + header  + content  bytes                                            |
+                                                                                               +
|                               ... ...                                                         |
+-----------------------------------------------------------------------------------------------+

Response command protocol for v1
0     1     2     3     4           6           8          10           12          14         16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|proto| type| cmdcode   |ver2 |   requestId           |codec|respstatus |  classLen |headerLen  |
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
| contentLen            |                  ... ...                                              |
+-----------------------+                                                                       +
|                          header  + content  bytes                                             |
+                                                                                               +
|                               ... ...                                                         |
+-----------------------------------------------------------------------------------------------+

V2 版本

Request command protocol for v2
0     1     2           4           6           8          10     11     12          14         16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
|proto| ver1|type | cmdcode   |ver2 |   requestId           |codec|switch|   timeout             |
+-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
|classLen   |headerLen  |contentLen             |           ...                                  |
+-----------+-----------+-----------+-----------+                                                +
|               className + header  + content  bytes                                             |
+                                                                                                +
|                               ... ...                                  | CRC32(optional)       |
+------------------------------------------------------------------------------------------------+

Response command protocol for v2
0     1     2     3     4           6           8          10     11    12          14          16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
|proto| ver1| type| cmdcode   |ver2 |   requestId           |codec|switch|respstatus |  classLen |
+-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
|headerLen  | contentLen            |                      ...                                   |
+-----------------------------------+                                                            +
|               className + header  + content  bytes                                             |
+                                                                                                +
|                               ... ...                                  | CRC32(optional)       |
+------------------------------------------------------------------------------------------------+

V2 相比 V1 版本,主要两点改进:

  1. 增加了协议版本号(ver1)
  2. 协议层面支持了数据包的 CRC32 校验(后面详细介绍)

主要字段介绍:

  • proto: 协议标识位,bolt v1 是 0x01,bolt v2 是 0x02
  • ver1: bolt 协议版本,从 v2 开始 proto 不会再变,升级只变这个版本号
  • type: request/response/request oneway
  • cmdcode: request/response/heartbeat,和 type 有交叉
  • ver2: 应用层协议的版本(暂时没用)
  • requestId: 数据包唯一标识 id
  • codec: body 序列化方式,目前支持 hessian/hessian2/protobuf
  • switch: 是否开启 crc32 校验
  • headerLen: 自定义头部长度
  • contentLen: 内容长度
  • CRC32: 整个数据包通过计算出的 crc32 值(ver1 > 1 时支持)

三、功能介绍

基本 RPC 调用功能

客户端示例

'use strict';

const net = require('net');
const pump = require('pump');
const protocol = require('sofa-bolt-node');

const options = {
  sentReqs: new Map(),
};
const socket = net.connect(12200, '127.0.0.1');
const encoder = protocol.encoder(options);
const decoder = protocol.decoder(options);

socket.once('connect', () => {
  console.log('connected');
});
socket.once('close', () => {
  console.log('close');
});
socket.once('error', err => {
  console.log(err);
});

// 流式 API
pump(encoder, socket, decoder, err => {
  console.log(err);
});

// 监听 response / heartbeat_acl
decoder.on('response', res => {
  console.log(res);
});
decoder.on('heartbeat_ack', res => {
  console.log(res);
});

// 发送 RPC 请求
encoder.writeRequest(1, {
  args: [{
    $class: 'java.lang.String',
    $: 'peter',
  }],
  serverSignature: 'com.alipay.sofa.rpc.quickstart.HelloService:1.0',
  methodName: 'sayHello',
  timeout: 3000,
});

// 发送心跳包
encoder.writeHeartbeat(2, { clientUrl: 'xxx' });

服务端示例

'use strict';

const net = require('net');
const pump = require('pump');
const protocol = require('sofa-bolt-node');

const server = net.createServer(socket => {
  const options = {
    sentReqs: new Map(),
  };
  const encoder = protocol.encoder(options);
  const decoder = protocol.decoder(options);
  pump(encoder, socket, decoder, err => {
    console.log(err);
  });

  decoder.on('request', req => {
    console.log(req);
    encoder.writeResponse(req, {
      isError: false,
      appResponse: {
        $class: 'java.lang.String',
        $: `hello ${req.data.args[0]} !`,
      },
    });
  });
  decoder.on('heartbeat', hb => {
    console.log(hb);
    encoder.writeHeartbeatAck(hb);
  });
});

server.listen(12200);

多种序列化方式支持

目前推荐的序列化方式是 protobuf,因为它跨语言性做得比较好。在蚂蚁内部其实我们主要使用的是 hessian 序列化,后面我们会陆续开源关于它的一系列最佳实践,尽请期待。下面我们演示一个 pb 的 demo

通过 *.proto 文件定义接口

syntax = "proto3";

package com.alipay.sofa.rpc.test;

// 可选
option java_multiple_files = false;

service ProtoService {
  rpc echoObj (EchoRequest) returns (EchoResponse) {}
}

message EchoRequest {
  string name = 1;
  Group group = 2;
}

message EchoResponse {
  int32 code = 1;
  string message = 2;
}

enum Group {
  A = 0;
  B = 1;
}

客户端使用 protobuf

'use strict';

const net = require('net');
const path = require('path');
const pump = require('pump');
const protocol = require('sofa-bolt-node');
const protobuf = require('antpb');

// 存放 *.proto 文件的目录,加载 proto
const protoPath = path.join(__dirname, 'proto');
const proto = protobuf.loadAll(protoPath);

// 将 proto 作为参数传入 encoder/decoder
const sentReqs = new Map();
const encoder = protocol.encoder({ sentReqs, proto });
const decoder = protocol.decoder({ sentReqs, proto });

const socket = net.connect(12200, '127.0.0.1');
socket.once('connect', () => {
  console.log('connected');
});
socket.once('close', () => {
  console.log('close');
});
socket.once('error', err => {
  console.log(err);
});
pump(encoder, socket, decoder, err => {
  console.log(err);
});

// 指定序列化方式为 protobuf
encoder.codecType = 'protobuf';

const req = {
  serverSignature: 'com.alipay.sofa.rpc.test.ProtoService:1.0',
  methodName: 'echoObj',
  args: [{
    name: 'peter',
    group: 'B',
  }],
  timeout: 3000,
};

decoder.on('response', res => {
  console.log(res.data.appResponse);
});

// 记录请求、发送请求
sentReqs.set(1, { req });
encoder.writeRequest(1, req);

服务端使用 protobuf

'use strict';

const net = require('net');
const path = require('path');
const pump = require('pump');
const protocol = require('sofa-bolt-node');
const protobuf = require('antpb');

const protoPath = path.join(__dirname, 'proto');
const proto = protobuf.loadAll(protoPath);

const server = net.createServer(socket => {
  const options = {
    sentReqs: new Map(),
    proto,
  };
  const encoder = protocol.encoder(options);
  const decoder = protocol.decoder(options);
  pump(encoder, socket, decoder, err => {
    console.log(err);
  });

  decoder.on('request', req => {
    const reqData = req.data.args[0].toObject({ enums: String });;
    encoder.writeResponse(req, {
      isError: false,
      appResponse: {
        code: 200,
        message: 'hello ' + reqData.name + ', you are in ' + reqData.group,
      },
    });
  });

  decoder.on('heartbeat', hb => {
    console.log(hb);
    encoder.writeHeartbeatAck(hb);
  });
});

server.listen(12200);

CRC32 校验

RPC 在网络传输过程中可能会遇到各种各样奇葩的问题,导致二进制被篡改,如果这个接口是和钱有关的,就可能导致资损,所以 Bolt 协议层面引入了一个校验功能,当开启时会在整个数据包后面额外传输 4 个 bytes 是数据包计算出来的 CRC32 值,接收端收到数据包以后先在本地重新计算 CRC32 值然后和附带的值比对,一致继续处理,不一致则直接报错

该功能由客户端开启,但是开启之前一般有一个协商的过程,服务端通过协商告诉客户端它支持 crc32 校验

'use strict';

const net = require('net');
const pump = require('pump');
const protocol = require('sofa-bolt-node');

const options = {
  sentReqs: new Map(),
};

const socket = net.connect(12200, '127.0.0.1');
const encoder = protocol.encoder(options);
const decoder = protocol.decoder(options);
pump(encoder, socket, decoder);

// 客户端开启 crc 校验
encoder.protocolType = 'bolt2'; // v2 版本以上才支持 crc 校验
encoder.boltVersion = 2;
encoder.crcEnable = true;

// 发送
encoder.writeRequest(1, {
  args: [{
    $class: 'java.lang.String',
    $: 'peter',
  }],
  serverSignature: 'com.alipay.sofa.rpc.quickstart.HelloService:1.0',
  methodName: 'sayHello',
  timeout: 3000,
});

四、用户接口

全局接口

  • encoder(options) 创建一个 ProtocolEncoder
    • @param {Map} sentReqs - 用于存储发送出去的请求
    • @param {Map} [classCache] - 类定义缓存
    • @param {Object} [classMap] - hessian 序列化的类型定义
    • @param {Object} [proto] - protobuf 序列化的接口定义
    • @param {Url} [address] - TCP socket 地址
    • @param {String} [codecType] - 序列化方式
  • decoder(options) 创建一个 ProtocolDecoder
    • @param {Map} sentReqs - 用于存储发送出去的请求
    • @param {Map} [classCache] - 类定义缓存
    • @param {Object} [classMap] - hessian 序列化的类型定义
    • @param {Object} [proto] - protobuf 序列化的接口定义
  • setOptions(options) 设置一些全局的参数

ProtocolEncoder 接口

  • protocolType 设置协议,bolt/bolt2
  • codecType 设置序列化方式,hessian/hessian2/protobuf
  • boltVersion 设置 bolt 的版本
  • crcEnable 是否开启 crc 校验
  • writeRequest(id, req, [callback]) 发送请求
    • @param {Number} id - 数据包唯一标识
    • @parma {Object} req - 请求对象
      • @param {String} serverSignature - 服务的唯一标识
      • @param {String} methodName - 方法名
      • @param {Array} args - 参数列表
      • @param {Number} timeout - 超时时间
      • @param {Object} requestProps - 额外传递的 kv 参数
  • writeResponse(req, res, [callback]) 发送响应
    • @param {Object} req - 请求对象,有请求才有响应
    • @parma {Object} res - 响应对象
      • @param {Boolean} isError - 是否成功
      • @param {String} errorMsg - 异常信息,isError=false 的话为 null
      • @param {Object} appResponse - 响应对象
      • @param {Object} responseProps - 额外传递的 kv 参数
  • writeHeartbeat(id, hb, [callback]) 发送心跳请求
    • @param {Number} id - 数据包唯一标识
    • @parma {Object} hb - 心跳对象
      • @param {String} clientUrl - 客户端 url
  • writeHeartbeatAck(hb, [callback]) 发送心跳响应
    • @parma {Object} hb - 心跳对象

五、接口设计思想

从上面的介绍和接口定义看,我们对协议的实现核心就是 Encoder 和 Decoder 两个类,并且采用了 Nodejs 里流(Stream)的风格

+---------+  pipe  +---------+  pipe  +---------+    response
| Encoder |  --->  | Socket  |  --->  | Decoder |    ...
+---------+        +---------+        +---------+
                      |  ^
                      |  |
                      |  |
                      v  |
+---------+  pipe  +---------+  pipe  +---------+    request
| Encoder |  --->  | Socket  |  --->  | Decoder |    ...
+---------+        +---------+        +---------+

所有的协议细节,数据的切分都封装在 Encoder/Decoder 两个类中,并且提供标准的 API,所以以后我们要替换其他的通讯层协议(比如:dubbo),那么只需要直接替换就好了

六、如何贡献

请告知我们可以为你做些什么,不过在此之前,请检查一下是否有已经存在的Bug或者意见

如果你是一个代码贡献者,请参考代码贡献规范

七、开源协议

MIT