@ksfcore/rpc
v2.0.18
RPC communication framework for ksf4nodejs
@ksfcore/rpc is the Ksf RPC calling framework. It provides the infrastructure for making RPC calls between multiple server processes and offers the following capabilities:
- Use ksf2node to translate a Ksf file into client proxy class code, which the client can use to call any Ksf service.
- Use ksf2node to translate a Ksf file into server code and implement a standard Ksf service, which any client using the Ksf or KUP protocol can call directly.
- Remote logging, dye logging, attribute reporting, alarm reporting, and service communication.
- Create a client proxy class for a custom communication protocol (such as a JSON-based protocol).
- Create a server for a custom communication protocol (such as a JSON-based protocol).
Related module: @ksf/registry, which queries the master for the list of IP addresses available for a service, based on the service's Obj name.
Ksf RPC is divided into two parts, client and server:
The client part provides functions such as RPC proxy generation, message routing and network communication.
The server provides remote service exposure, request dispatching, and network communication functions.
Installation
npm install @ksfcore/rpc
A note on protocols and Ksf files
Before going deeper into Ksf, let's clarify the relationship between the Ksf encoding protocol, the KUP packet protocol, and the KSF packet protocol:
The Ksf encoding protocol is a set of data encoding and decoding rules. It encodes data types such as integers, enumeration values, strings, sequences, dictionaries, and custom structures into a binary data stream according to fixed rules. After receiving the binary data stream, the peer deserializes it according to the corresponding rules to obtain the original values.
The Ksf encoding protocol uses an unsigned char value called a TAG to identify each variable. For example, suppose the TAG value of a variable is 100 (the value is chosen by the developer). When the variable is encoded, its TAG value is encoded along with it. When the peer needs to read the variable, it looks for the data segment with TAG value 100 in the data stream and then reads the data part according to the rules; that data is the value of the variable.
The Ksf encoding protocol is positioned purely as a set of encoding rules. Data serialized with it can be transmitted over the network and can also be stored in a database.
The KUP packet protocol is an upper-layer encapsulation of the Ksf encoding protocol and is positioned as a communication protocol. It uses the variable name as the keyword of the variable. When encoding, the variable name is packed into the data stream; when decoding, the corresponding data area is found according to the variable name, and the data area is deserialized according to the data type to obtain the original value.
The KUP packet protocol has a built-in map type from the Ksf encoding protocol. The key of the map is the variable name, and the value is the binary data produced by serializing the variable value with Ksf encoding.
The data packet encapsulated by the KUP packet protocol can be sent directly to the Ksf server, and the server can directly deserialize to get the original value.
The KSF packet protocol is a communication protocol encapsulated using the Ksf encoding protocol for RequestPacket (request structure) and ResponsePacket (result structure). The structure contains important information such as the request sequence number, protocol type, and binary data after RPC parameter serialization.
For the encoding and decoding rules of the Ksf encoding protocol and for how to write Ksf files, please refer to the @tar/steam documentation.
In practice, you define a communication description file according to your needs and then use the ksf2node tool to generate the required code files. For specific instructions, please refer to the ksf2node documentation.
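The TAG lookup described above can be illustrated with a toy tag-length-value codec. This is only a sketch: the layout (1-byte tag, 2-byte length, then the value as UTF-8 text) and the helper names encodeField and findField are assumptions made for illustration. It is not the real Ksf wire format.

```javascript
// Toy tag-length-value codec. NOT the real Ksf encoding; layout is assumed.
function encodeField(tag, value) {
  const body = Buffer.from(String(value), "utf8");
  const head = Buffer.alloc(3);
  head.writeUInt8(tag, 0);            // 1 byte: TAG
  head.writeUInt16BE(body.length, 1); // 2 bytes: length of the value
  return Buffer.concat([head, body]);
}

// Walk the stream field by field until the requested TAG is found.
function findField(buf, tag) {
  let pos = 0;
  while (pos + 3 <= buf.length) {
    const currentTag = buf.readUInt8(pos);
    const len = buf.readUInt16BE(pos + 1);
    if (currentTag === tag) {
      return buf.slice(pos + 3, pos + 3 + len).toString("utf8");
    }
    pos += 3 + len; // skip this field
  }
  return undefined; // TAG not present in the stream
}

const stream = Buffer.concat([encodeField(1, "alice"), encodeField(100, "42")]);
console.log(findField(stream, 100)); // prints "42"
```

The reader never needs to know the order of the fields; it only needs the TAG value agreed on by both sides.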
Example
Clone this project.
Execute in the root directory of the project:
npm install
Start the RPC server program in the /rpc/examples/rpc-ksf/demo.1/server.node.1 directory:
node main.js
Start the RPC client program in the /rpc/examples/rpc-ksf/demo.1/client.node.proxy directory:
node main.js
Development steps
- Write a Ksf file that defines the constants, enumeration values, structures, functions, and other parts of the communication protocol used between the client and the server. We use the following Ksf file as an example:
Ksf files are usually developed, maintained, and provided by the server-side developers.
module TRom
{
    struct User_t
    {
        0 optional int id = 0;
        1 optional int score = 0;
        2 optional string name = "";
    };
    struct Result_t
    {
        0 optional int id = 0;
        1 optional int iLevel = 0;
    };
    interface NodeJsComm
    {
        int test();
        int getall(User_t stUser, out Result_t stResult);
        int getUsrName(string sUsrName, out string sValue1, out string sValue2);
        int secRequest(vector<byte> binRequest, out vector<byte> binResponse);
    };
};
Save the above as: NodeJsComm.ksf.
- Generate client call code from Ksf file:
ksf2node --client NodeJsComm.ksf
- Writing client programs
// STEP01 Import the system module and the tool-generated code
var Ksf = require("@ksfcore/rpc").client;
var TRom = require("./NodeJsCommProxy.js").TRom;
// STEP02 Initialize the Ksf client
// This step is optional. Later sections describe when the client's initialize function needs to be called and what it does.
// The initialize function only needs to be called once; its settings are then available globally.
// The demo does not need these features, so the call is commented out for now.
// Ksf.initialize("./config.conf");
// STEP03 Create a proxy instance for calling the server
var prx = Ksf.stringToProxy(TRom.NodeJsCommProxy, "TRom.NodeJsTestServer.NodeJsCommObj@tcp -h 127.0.0.1 -p 14002 -t 60000");
// STEP04 Client calls use the Promise mechanism; define the success and failure callbacks here
var success = function (result) {
    console.log("result.response.costtime:", result.response.costtime);
    console.log("result.response.return:", result.response.return);
    console.log("result.response.arguments.stResult:", result.response.arguments.stResult);
};
var error = function (result) {
    console.log("result.response.costtime:", result.response.costtime);
    console.log("result.response.error.code:", result.response.error.code);
    console.log("result.response.error.message:", result.response.error.message);
};
// STEP05 Initialize the interface parameters and call the RPC interface
var stUser = new TRom.User_t();
stUser.name = "tencent-mig";
prx.getall(stUser).then(success, error).done();
Save the above code as client.js and use the following command to call the server.
node client.js
result.response.costtime: 7
result.response.return: 200
result.response.arguments.stResult: { id: 10000, iLevel: 10001 }
As long as the corresponding Ksf file is available, Ksf services implemented in C++, Java, PHP, or Node.js can be called.
- Generate server code based on Ksf file:
ksf2node --server NodeJsComm.ksf
- Writing the server program
The ksf2node tool generates NodeJsComm.js and NodeJsCommImp.js.
Developers do not need to, and should try not to, modify NodeJsComm.js. This file mainly implements structure encoding and decoding, function parameter encoding and decoding, and function dispatch.
Developers only need to fill in the RPC functions defined in NodeJsCommImp.js to implement the business logic.
var TRom = require('./NodeJsComm.js').TRom;
module.exports.TRom = TRom;
TRom.NodeJsCommImp.prototype.initialize = function () {
    //TODO::
};
TRom.NodeJsCommImp.prototype.test = function (current) {
    //TODO::
};
TRom.NodeJsCommImp.prototype.getall = function (current, stUser, stResult) {
    //TODO::
    // Initially every RPC function is empty; developers fill in the missing business logic here.
    // After the business logic completes, call current.sendResponse to return the data to the caller.
    // Note: sendResponse differs per function. Its parameters correspond to the return value and output parameters of the current function.
    // If the function has a return value, it is the first parameter of current.sendResponse. Here the return type is int, and we return 200 as an example.
    // The output parameters follow the return value, in declaration order. The framework handles parameter encoding, decoding, and network transmission.
    stResult.id = 10000;
    stResult.iLevel = 10001;
    current.sendResponse(200, stResult);
};
TRom.NodeJsCommImp.prototype.getUsrName = function (current, sUsrName, sValue1, sValue2) {
    //TODO::
};
TRom.NodeJsCommImp.prototype.secRequest = function (current, binRequest, binResponse) {
    //TODO::
};
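Because sendResponse takes the return value first and then the output parameters in declaration order, a handler such as getall can be exercised without the network by passing in a stand-in for current. The sketch below is an assumption-laden illustration: NodeJsCommImp is re-declared locally instead of being loaded from the generated file, and createMockCurrent is a hypothetical helper, not part of the framework. The real current object is supplied by the framework and has more members.

```javascript
// Local stand-in for the generated implementation class (hypothetical).
function NodeJsCommImp() {}
NodeJsCommImp.prototype.getall = function (current, stUser, stResult) {
  stResult.id = 10000;
  stResult.iLevel = 10001;
  // Return value first, then the output parameters in declaration order.
  current.sendResponse(200, stResult);
};

// Minimal mock of `current` that records what the handler sends back.
function createMockCurrent() {
  const mock = { sent: null };
  mock.sendResponse = function () {
    mock.sent = Array.prototype.slice.call(arguments);
  };
  return mock;
}

const current = createMockCurrent();
new NodeJsCommImp().getall(current, { name: "demo" }, { id: 0, iLevel: 0 });
console.log(current.sent); // [ 200, { id: 10000, iLevel: 10001 } ]
```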
Next, create a service entry file. It is mainly responsible for reading configuration files, configuring ports, setting protocol parsers, starting services, and so on.
var Ksf = require("@ksfcore/rpc").server;
var TRom = require("./NodeJsCommImp.js").TRom;
var svr = Ksf.createServer(TRom.NodeJsCommImp);
svr.start({
    name     : "TRom.NodeJsTestServer.NodeJsCommObjAdapter",
    servant  : "TRom.NodeJsTestServer.NodeJsCommObj",
    endpoint : "tcp -h 127.0.0.1 -p 14002 -t 10000",
    protocol : "ksf",
    maxconns : 200000
});
console.log("server started.");
Save the above code as server.js, and start it with the following command.
node server.js
Client initialization function
In the demo code we mentioned that initialize does not have to be called explicitly; the required parameters can also be set in other ways.
First, let's look at the format and the necessary parameters of the configuration file:
<ksf>
<application>
<client>
locator = ksf.ksfregistry.QueryObj@tcp -h 127.0.0.1 -p 14002 ## define master address
async-invoke-timeout = 60000 ## Timeout of asynchronous call (ms)
</client>
</application>
</ksf>
This configuration file is automatically generated by ksfnode. We mainly use ksf.application.client.locator and ksf.application.client.async-invoke-timeout.
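The dotted names above follow from the nesting of the configuration file: each enclosing section contributes one path segment. A minimal sketch of that mapping is shown below. The parseKsfConfig function is a hypothetical helper written for illustration; it is not the parser the framework uses, and it only handles the simple layout shown in this document.

```javascript
// Hypothetical parser: flattens the nested sections into dotted keys.
function parseKsfConfig(text) {
  const path = [];
  const values = {};
  text.split("\n").forEach(function (raw) {
    const line = raw.split("##")[0].trim(); // drop a trailing "## comment"
    if (line === "") return;
    let m;
    if ((m = /^<\/(.+)>$/.exec(line))) {
      path.pop();                           // closing section tag
    } else if ((m = /^<(.+)>$/.exec(line))) {
      path.push(m[1]);                      // opening section tag
    } else {
      const eq = line.indexOf("=");
      const key = (eq === -1 ? line : line.slice(0, eq)).trim();
      const value = eq === -1 ? "" : line.slice(eq + 1).trim();
      values[path.concat(key).join(".")] = value;
    }
  });
  return values;
}

const sample = [
  "<ksf>",
  "  <application>",
  "    <client>",
  "      locator = ksf.ksfregistry.QueryObj@tcp -h 127.0.0.1 -p 14002 ## define master address",
  "      async-invoke-timeout = 60000 ## Timeout of asynchronous call (ms)",
  "    </client>",
  "  </application>",
  "</ksf>"
].join("\n");

const conf = parseKsfConfig(sample);
console.log(conf["ksf.application.client.locator"]);
console.log(conf["ksf.application.client.async-invoke-timeout"]); // prints "60000"
```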
When can the call to initialize be skipped? If every server proxy we create uses direct-connection mode, that is, the IP address is specified in stringToProxy, initialization is not required.
Besides using the configuration file, these two parameters can also be set through the setProperty method exposed by the client object of @ksfcore/rpc:
var Ksf = require("@ksfcore/rpc").client;
Ksf.setProperty("locator", "ksf.ksfregistry.QueryObj@tcp -h 127.0.0.1 -p 14002");
Ksf.setProperty("timeout", 60000);
These calls are equivalent to using the configuration file.
Ksf service creation method
Ksf provides three ways to create a standard Ksf service:
- Use the configuration file generated by ksfnode:
This method works the same way as in Ksf C++. We configure the Obj for the service on the Ksf management platform, and ksfnode then automatically generates a configuration file containing the listening port when the program starts. The service framework relies on this configuration to bind the port and start the service.
The configuration file generated by ksfnode is similar to the following:
<ksf>
<application>
enableset=n
setdivision=NULL
<server>
node=ksf.ksfnode.ServerObj@tcp -h 127.0.0.1 -p 14002 -t 60000
app=TRom
server=NodeJsTestServer
localip=127.0.0.1
netthread=2
local=tcp -h 127.0.0.1 -p 10002 -t 3000
basepath=/usr/local/app/ksf/ksfnode/data/MTT.NodeJSTest/bin/
datapath=/usr/local/app/ksf/ksfnode/data/MTT.NodeJSTest/data/
logpath=/usr/local/app/ksf/app_log//
logsize=15M
config=ksf.ksfconfig.ConfigObj
notify=ksf.ksfnotify.NotifyObj
log=ksf.ksflog.LogObj
deactivating-timeout=3000
openthreadcontext=0
threadcontextnum=10000
threadcontextstack=32768
closeout=0
<TRom.NodeJsTestServer.NodeJsCommObjAdapter>
allow
endpoint=tcp -h 127.0.0.1 -p 14002 -t 60000
handlegroup=TRom.NodeJsTestServer.NodeJsCommObjAdapter
maxconns=200000
protocol=ksf
queuecap=10000
queuetimeout=60000
servant=TRom.NodeJsTestServer.NodeJsCommObj
shmcap=0
shmkey=0
threads=5
</TRom.NodeJsTestServer.NodeJsCommObjAdapter>
</server>
<client>
locator=ksf.ksfregistry.QueryObj@tcp -h 127.0.0.1 -p 14002:tcp -h 127.0.0.1 -p 14003
refresh-endpoint-interval=60000
stat=ksf.ksfstat.StatObj
property=ksf.ksfproperty.PropertyObj
report-interval=60000
sample-rate=1000
max-sample-count=100
sendthread=1
recvthread=1
asyncthread=3
modulename=TRom.NodeJsTestServer
async-invoke-timeout=60000
sync-invoke-timeout=3000
</client>
</application>
</ksf>
We use this configuration file to create a service:
// STEP01 Import the key modules
var Ksf = require("@ksfcore/rpc");
var TRom = require("./NodeJsCommImp.js").TRom;
// STEP02 Create an instance of the service
// Note the configuration path here. In a production environment, use process.env.KSF_CONFIG as the path of the configuration file
// That is: svr.initialize(process.env.KSF_CONFIG, function (server) {...});
var svr = new Ksf.server();
svr.initialize("./TRom.NodeJsTestServer.config.conf", function (server) {
    server.addServant(TRom.NodeJsCommImp, server.Application + "." + server.ServerName + ".NodeJsCommObj");
});
// STEP03 After the service has been initialized in the previous step, start it
svr.start();
- Explicitly configure the server information
// STEP01 Import the key modules
var Ksf = require("@ksfcore/rpc").server;
var TRom = require("./NodeJsCommImp.js").TRom;
// STEP02 Create an instance of the service
// Note that "endpoint" and "protocol" are mandatory options, and the format must match the following example
var svr = Ksf.createServer(TRom.NodeJsCommImp);
svr.start({
    name: "TRom.NodeJsTestServer.AdminObjAdapter",
    servant: "TRom.NodeJsTestServer.AdminObj",
    endpoint: "tcp -h 127.0.0.1 -p 14002 -t 10000",
    maxconns: 200000,
    protocol: "ksf"
});
console.log("server started.");
- Select some of the services in the configuration file generated by ksfnode and start them
// STEP01 Import the key modules
var Ksf = require("@ksfcore/rpc");
var TRom = require("./NodeJsCommImp.js").TRom;
Ksf.server.getServant("./TRom.NodeJsTestServer.config.conf").forEach(function (config) {
    var svr, map;
    // Map each servant name to the class that implements it
    map = {
        'TRom.NodeJsTestServer.NodeJsCommObj': TRom.NodeJsCommImp
    };
    svr = Ksf.server.createServer(map[config.servant]);
    svr.start(config);
});
Ksf client implementation principle
Ksf server implementation
Ksf calls a third-party protocol service as a client
We assume the communication protocol is in JSON format:
// Client -> Server
{
    "P_RequestId": 0,
    "P_FuncName": "test",
    "P_Arguments": ["aa", "bb"]
}
// Client <- Server
{
    "P_RequestId": 0,
    "P_FuncName": "test",
    "P_Arguments": ["ee", "ff"]
}
Implement the protocol parser class:
// Save the file as Protocol.js
var EventEmitter = require("events").EventEmitter;
var util = require("@ksf/util");
var stream = function () {
    EventEmitter.call(this);
    this._data = undefined;
    this._name = "json";
};
util.inherits(stream, EventEmitter);
stream.prototype.__defineGetter__("name", function () { return this._name; });
module.exports = stream;
/**
 * Packs the outgoing data into a length-prefixed buffer
 * @param data
 * data.iRequestId: the request sequence number generated by the framework
 * data.sFuncName: the function name
 * data.Arguments: the function's argument list
 */
stream.prototype.compose = function (data) {
    var str = JSON.stringify({
        P_RequestId: data.iRequestId,
        P_FuncName: data.sFuncName,
        P_Arguments: data.Arguments
    });
    // 4-byte big-endian total length (including these 4 bytes), then the JSON text
    var len = 4 + Buffer.byteLength(str);
    var buf = Buffer.alloc(len);
    buf.writeUInt32BE(len, 0);
    buf.write(str, 4);
    return buf;
};
/**
 * Called with each chunk received from the network; determines whether a complete packet has arrived
 * @param data The incoming data may be a TCP fragment and not necessarily a complete request. The protocol parser class buffers incomplete data internally.
 *
 * When a complete request is available, the function emits a "message" event whose payload has the following format:
 *
 * {
 *     iRequestId: 0,  // Sequence number of this request
 *     sFuncName: "",  // Function name of this request
 *     Arguments: []   // Argument list of this request
 * }
 *
 */
stream.prototype.feed = function (data) {
    var BinBuffer = data;
    // Prepend any bytes left over from the previous call
    if (this._data !== undefined) {
        BinBuffer = Buffer.concat([this._data, data]);
        this._data = undefined;
    }
    for (var pos = 0; pos < BinBuffer.length; ) {
        if (BinBuffer.length - pos < 4) {
            break; // the length prefix is not complete yet
        }
        var Length = BinBuffer.readUInt32BE(pos);
        if (pos + Length > BinBuffer.length) {
            break; // the packet body is not complete yet
        }
        var result = JSON.parse(BinBuffer.slice(pos + 4, pos + Length).toString());
        var request = {
            iRequestId : result.P_RequestId,
            sFuncName  : result.P_FuncName,
            Arguments  : result.P_Arguments
        };
        this.emit("message", request);
        pos += Length;
    }
    // Keep a copy of the unconsumed tail for the next call
    if (pos !== BinBuffer.length) {
        this._data = Buffer.from(BinBuffer.slice(pos));
    }
};
/**
 * Resets the current protocol parser and discards any buffered data
 */
stream.prototype.reset = function () {
    this._data = undefined;
};
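The framing used by compose and feed (a 4-byte big-endian length that counts itself, followed by the JSON body) can be checked in isolation. The sketch below re-implements only that framing with two hypothetical helpers, frame and createDeframer, so it runs without the framework. It also rejects lengths smaller than 4, an extra guard that the parser class above does not include.

```javascript
// Length-prefixed framing: 4-byte big-endian total length, then a JSON body.
function frame(obj) {
  const body = Buffer.from(JSON.stringify(obj), "utf8");
  const buf = Buffer.alloc(4 + body.length);
  buf.writeUInt32BE(buf.length, 0);
  body.copy(buf, 4);
  return buf;
}

// Hypothetical helper: accumulates chunks and returns every complete message.
function createDeframer() {
  let pending = Buffer.alloc(0);
  return function feed(chunk) {
    pending = Buffer.concat([pending, chunk]);
    const messages = [];
    while (pending.length >= 4) {
      const len = pending.readUInt32BE(0);
      if (len < 4) throw new Error("invalid frame length: " + len);
      if (pending.length < len) break; // wait for the rest of the frame
      messages.push(JSON.parse(pending.slice(4, len).toString("utf8")));
      pending = pending.slice(len);
    }
    return messages;
  };
}

const wire = Buffer.concat([
  frame({ P_RequestId: 1, P_FuncName: "test", P_Arguments: ["aa"] }),
  frame({ P_RequestId: 2, P_FuncName: "test", P_Arguments: ["bb"] })
]);
const feed = createDeframer();
const first = feed(wire.slice(0, 10)); // only part of the first frame
const rest = feed(wire.slice(10));     // remainder of both frames
console.log(first.length, rest.length); // prints "0 2"
```

Splitting the byte stream at an arbitrary offset mimics TCP fragmentation: no message is delivered until its last byte has arrived.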
The client uses the protocol parser to call the server:
var Ksf = require("@ksfcore/rpc").client;
var Protocol = require("./Protocol.js");
var prx = Ksf.stringToProxy(Ksf.ServantProxy, "test@tcp -h 127.0.0.1 -p 14002 -t 60000");
prx.setProtocol(Protocol);
prx.rpc.createFunc("echo");
var success = function (result) {
    console.log("success");
    console.log("result.response.costtime:", result.response.costtime);
    console.log("result.response.arguments:", result.response.arguments);
};
var error = function (result) {
    console.log("error");
    console.log("result.response.error.code:", result.response.error.code);
    console.log("result.response.error.message:", result.response.error.message);
};
prx.rpc.echo("tencent", "mig", "abc").then(success, error);
In addition, if you want requests that share a certain characteristic to always go to the same machine, you can use the following method:
prx.getUsrName(param, {
    hashCode: userId
}).then(success, error).done();
After obtaining the client proxy object, you call the server-side interface function and can pass in the hashCode parameter at that point. Ksf assigns the request to a fixed machine in the connection list according to the hashCode. Note the following:
- The userId here is a number. It can be written in binary, octal, or hexadecimal, but its decimal form should preferably have fewer than 16 digits, because JavaScript loses precision when handling very large numbers.
- While the server machine list stays the same, the same hashCode is always assigned to the same machine. When the server machine list changes, the machine corresponding to a hashCode is reassigned.
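The second point can be made concrete with a small sketch. The document does not say how Ksf maps a hashCode to a machine, so the code below assumes plain modulo selection over the endpoint list. The pickEndpoint function and the addresses are hypothetical, and the real algorithm may differ.

```javascript
// Assumption: plain modulo selection; the real Ksf algorithm may differ.
function pickEndpoint(endpoints, hashCode) {
  // Sketch-only restriction: a negative value would give a negative index.
  if (!Number.isSafeInteger(hashCode) || hashCode < 0) {
    throw new RangeError("hashCode must be a non-negative safe integer");
  }
  return endpoints[hashCode % endpoints.length];
}

const before = ["10.0.0.1:14002", "10.0.0.2:14002", "10.0.0.3:14002"];
const after = before.concat("10.0.0.4:14002"); // one machine added
const userId = 10007;

console.log(pickEndpoint(before, userId)); // prints "10.0.0.3:14002"
console.log(pickEndpoint(before, userId)); // same list, same machine
console.log(pickEndpoint(after, userId));  // prints "10.0.0.4:14002"
```

With an unchanged list the same userId always lands on the same machine; once the list grows, the mapping shifts.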
Ksf as a third-party protocol server
First implement the RPC function handler class, paying attention to the framework's dispatch logic:
A. If the function name passed from the client is a function of the handler class, the framework calls that function first.
B. If the function name passed from the client is not a function of the handler class, the framework calls the handler class's onDispatch function, which is responsible for processing the request.
C. If there is no onDispatch function, an error is reported.
// Save the file as: EchoHandle.js
var Handle = function () {
};
Handle.prototype.initialize = function () { };
Handle.prototype.echo = function (current, v1, v2, v3) {
    console.log("EchoHandle.echo::", v1, v2, v3);
    current.sendResponse("TX", "TX-MIG");
};
Handle.prototype.onDispatch = function (v1, v2, v3) {
    console.log("EchoHandle.onDispatch::", v1, v2, v3);
};
module.exports = Handle;
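Rules A, B, and C of the dispatch logic can be sketched as a single function. This is a hypothetical dispatcher written for illustration, not the framework's code. In particular, the arguments it passes to onDispatch (current, the function name, and the argument list) are an assumption, since the document does not specify them.

```javascript
// Hypothetical dispatcher mirroring rules A, B and C.
function dispatch(handler, current, funcName, args) {
  // Rule A: a function with the requested name takes priority.
  if (funcName !== "onDispatch" && typeof handler[funcName] === "function") {
    return handler[funcName].apply(handler, [current].concat(args));
  }
  // Rule B: otherwise fall back to onDispatch (argument list is assumed).
  if (typeof handler.onDispatch === "function") {
    return handler.onDispatch(current, funcName, args);
  }
  // Rule C: nothing can handle the request.
  throw new Error("no handler for function: " + funcName);
}

const calls = [];
const handler = {
  echo: function (current, a, b) { calls.push(["echo", a, b]); },
  onDispatch: function (current, funcName, args) { calls.push(["onDispatch", funcName]); }
};

dispatch(handler, {}, "echo", ["x", "y"]); // rule A
dispatch(handler, {}, "unknown", []);      // rule B
console.log(calls); // [ [ 'echo', 'x', 'y' ], [ 'onDispatch', 'unknown' ] ]
```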
Example code for starting the server:
var Ksf = require("@ksfcore/rpc").server;
var Protocol = require("./Protocol.js");
var Handle = require("./EchoHandle.js");
var svr = Ksf.createServer(Handle);
svr.start({
    endpoint : "tcp -h 127.0.0.1 -p 14002 -t 10000",
    protocol : Protocol
});
Ksf client related parameters
When a Ksf client proxy object calls a protocol interface function, a configuration object can be passed as the last argument:
- dyeing: the dyeing object; see @ksfcore/dyeing for details.
- context: the context object.
- packetType: the call type; 1 is a one-way request, and any other value is an ordinary request.
- hashCode: the request hash, which must be a number within JavaScript's safe integer range (at most Math.pow(2, 53) - 1).
Examples
prx.getUsrName(param, {
    dyeing: dyeingObj,
    context: {xxx: xxx},
    packetType: 1,
    hashCode: userId
}).then(success, error);
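The safe-range requirement on hashCode can be enforced before the call is made. The sketch below uses only standard JavaScript; toHashCode is a hypothetical helper, not part of the framework.

```javascript
// Hypothetical helper: converts an id to a hashCode that JavaScript can represent exactly.
function toHashCode(id) {
  const n = Number(id); // Number() also accepts "0b...", "0o..." and "0x..." strings
  if (!Number.isSafeInteger(n)) {
    throw new RangeError("hashCode is outside the safe integer range: " + id);
  }
  return n;
}

console.log(Number.MAX_SAFE_INTEGER === Math.pow(2, 53) - 1); // true
console.log(Math.pow(2, 53) + 1 === Math.pow(2, 53));         // true: precision is lost
console.log(toHashCode("0x1F"));                              // 31
```

An id such as "9007199254740993" is rejected, because it cannot be stored exactly and would silently hash as a different number.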
Setting Exception Node Blocking Policy Parameters
The peer node is considered abnormal when either of the following conditions is met:
- Within 60 seconds, the number of timed-out calls is greater than or equal to 2 and the timeout ratio is greater than 0.5.
- The number of consecutive timeouts is greater than 5.
An abnormal node is blocked and retried every 30 seconds; it is restored if a retry succeeds.
If you need to modify the blocking policy, call the setCheckTimeoutInfo method as follows:
proxy._worker.setCheckTimeoutInfo({
    minTimeoutInvoke: 2,          // Strategy 1: minimum number of timeouts
    checkTimeoutInterval: 60000,  // Strategy 1: minimum time interval, in ms
    frequenceFailInvoke: 5,       // Strategy 2: number of consecutive timeouts
    minFrequenceFailTime: 5000,   // Strategy 2: minimum interval (from the 0th to the 5th timeout), in ms
    radio: 0.5,                   // Strategy 1: timeout ratio
    tryTimeInterval: 30000,       // Retry interval for an abnormal node, in ms
    reconnectInterval: 60000      // Interval before closing and reconnecting when the connection to an abnormal node succeeds but the retry fails
});
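The two strategies can be read as a single predicate over per-node call statistics. The sketch below is one interpretation, not the framework's implementation: it assumes that a node is abnormal when either strategy matches, that stats is a snapshot counted within the last checkTimeoutInterval milliseconds, and it ignores minFrequenceFailTime. The isNodeAbnormal function and the stats shape are hypothetical.

```javascript
// Thresholds copied from the setCheckTimeoutInfo example.
const policy = {
  minTimeoutInvoke: 2,
  checkTimeoutInterval: 60000,
  frequenceFailInvoke: 5,
  radio: 0.5
};

// stats: hypothetical counters for one node within the current check window.
function isNodeAbnormal(stats, policy) {
  const ratio = stats.total > 0 ? stats.timeouts / stats.total : 0;
  // Strategy 1: enough timeouts and a high enough timeout ratio.
  const strategy1 = stats.timeouts >= policy.minTimeoutInvoke && ratio > policy.radio;
  // Strategy 2: too many timeouts in a row.
  const strategy2 = stats.consecutiveTimeouts > policy.frequenceFailInvoke;
  return strategy1 || strategy2;
}

console.log(isNodeAbnormal({ total: 10, timeouts: 6, consecutiveTimeouts: 1 }, policy));  // true
console.log(isNodeAbnormal({ total: 10, timeouts: 5, consecutiveTimeouts: 2 }, policy));  // false
console.log(isNodeAbnormal({ total: 100, timeouts: 6, consecutiveTimeouts: 6 }, policy)); // true
```

The second case shows that a ratio of exactly 0.5 does not trigger strategy 1, because the comparison is strict.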