cry-ebus
v0.56.5
enterprise bus
Enterprise bus architecture
INSTALLATION
npm install 'cry-ebus' --save
This library is a robust ZMQ-based implementation of a reliable client-broker-worker architecture. Clients request services from workers via a broker. Each worker implements exactly one service, and many workers can implement the same service.
A minimal working configuration has one broker, one worker, and one client.
Clients can also send and receive notifications via a publish/subscribe mechanism. Notifications are organised into channels.
The broker, clients, and workers use ZMQ for communication, which allows several transport options:
- tcp: unicast transport using TCP
- inproc: local in-process (inter-thread) communication transport
- ipc: local inter-process communication transport
- pgm, epgm: reliable multicast transport using PGM
Consult the ZMQ connect documentation for details.
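The endpoint strings used throughout this README (for example tcp://*:5000) follow the ZMQ transport://address form. A minimal sketch of splitting such a string into its parts, which can help validate configuration before passing it on; parseEndpoint is a hypothetical helper and not part of cry-ebus:

```javascript
// Hypothetical helper (not part of cry-ebus): split a ZMQ endpoint
// string such as 'tcp://*:5000' into its transport and address.
const TRANSPORTS = ['tcp', 'inproc', 'ipc', 'pgm', 'epgm'];

function parseEndpoint(endpoint) {
  const match = /^([a-z]+):\/\/(.+)$/.exec(endpoint);
  if (!match || !TRANSPORTS.includes(match[1])) {
    throw new Error('unsupported endpoint: ' + endpoint);
  }
  return { transport: match[1], address: match[2] };
}

console.log(parseEndpoint('tcp://*:5000'));
console.log(parseEndpoint('ipc:///tmp/ebus.sock'));
```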
BROKER
The broker is the central component (the bus of the enterprise bus architecture). It reliably and efficiently:
- Registers and monitors available workers;
- Receives service requests from clients and forwards them to appropriate workers;
- Returns service responses (results) to the requesting client.
const Broker = require('cry-ebus').Broker;
const broker = new Broker();
broker.on('error', (err) => console.log('broker error', err));
broker.start();
process.on('SIGTERM', function () {
  broker.stop();
  process.exit(0);
});
process.on('SIGINT', function () {
  broker.stop();
  process.exit(0);
});
When creating a new Broker(), the following config object can be passed in (shown here with defaults):
let opts = {
  frontend: 'tcp://*:5000',    // ZMQ port for clients
  backend: 'tcp://*:5001',     // ZMQ port for workers
  pub: 'tcp://127.0.0.1:5002', // ZMQ port for publications
  sub: 'tcp://127.0.0.1:5003', // ZMQ port for subscriptions
  heartbeat: 500,              // check workers every 500ms
  liveness: 3,                 // delete a worker that missed 3 heartbeats
  expire: 300000,              // reject a service request if not answered in 300000ms
  resubmitEvery: 3,            // resubmit an unanswered request to a worker every 3 heartbeats
  debug: 1                     // log level 1 (error)
}
const broker = new Broker(opts);
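As a worked example of how the timing options combine: with the default values, a worker that stops heartbeating would be dropped after roughly heartbeat × liveness milliseconds, and an unanswered request would be resubmitted about every heartbeat × resubmitEvery milliseconds until it expires. The sketch below only does this arithmetic. brokerTimings is a hypothetical helper, and the exact moment the broker acts may differ by up to one heartbeat.

```javascript
// Hypothetical helper: derive wall-clock intervals from the broker options.
function brokerTimings(opts) {
  const resubmitIntervalMs = opts.heartbeat * opts.resubmitEvery;
  return {
    // a silent worker is dropped after this many milliseconds
    workerTimeoutMs: opts.heartbeat * opts.liveness,
    // an unanswered request is resubmitted at this interval
    resubmitIntervalMs: resubmitIntervalMs,
    // upper bound on resubmissions before the request expires
    maxResubmits: Math.floor(opts.expire / resubmitIntervalMs)
  };
}

const timings = brokerTimings({ heartbeat: 500, liveness: 3, expire: 300000, resubmitEvery: 3 });
console.log(timings);
// { workerTimeoutMs: 1500, resubmitIntervalMs: 1500, maxResubmits: 200 }
```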
The same configuration can be set using the environment variables EBUS_BROKER_NAME and EBUS_NAME, the first overriding the second, where NAME stands for the option name (for example DEBUG).
For example, export EBUS_BROKER_DEBUG=3 would override export EBUS_DEBUG=2, a global ebus debug setting.
Configuration passed in code overrides environment variables, so avoid hard-coding it.
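The precedence described above can be sketched as a small lookup: a value passed in code wins, then the component-specific variable, then the global variable, then the default. resolveOption is a hypothetical helper, not the library's code; in particular, upper-casing the option name and returning environment values as unparsed strings are assumptions.

```javascript
// Hypothetical sketch of the documented precedence:
// code option > EBUS_<ROLE>_<NAME> > EBUS_<NAME> > built-in default.
function resolveOption(role, name, codeOpts, env, defaults) {
  if (codeOpts && codeOpts[name] !== undefined) return codeOpts[name];
  const key = name.toUpperCase(); // assumption: NAME is the upper-cased option name
  const specific = env['EBUS_' + role.toUpperCase() + '_' + key];
  if (specific !== undefined) return specific;
  const general = env['EBUS_' + key];
  if (general !== undefined) return general;
  return defaults[name];
}

// Sample environment; a real caller would pass process.env.
const sampleEnv = { EBUS_DEBUG: '2', EBUS_BROKER_DEBUG: '3' };
const defaults = { debug: 1 };

console.log(resolveOption('broker', 'debug', {}, sampleEnv, defaults));           // EBUS_BROKER_DEBUG wins
console.log(resolveOption('worker', 'debug', {}, sampleEnv, defaults));           // falls back to EBUS_DEBUG
console.log(resolveOption('broker', 'debug', { debug: 0 }, sampleEnv, defaults)); // code option wins
```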
Available events (subscribe with broker.on('eventname')):
- error(err)
- start, stop
- expired(rid): a request has expired because no worker replied in time; the request will be rejected
- request(service,rid,data): a new request arrived from a client
- reply(service,rid,data): a worker responded to a request
- new worker(service,wid): a new worker registered
- disconnect worker(wid): a worker was deleted from the set of available workers (it disconnected or was disconnected by the broker)
Note: a broker will respond to a client request for service SERVICES by returning a list of services provided by connected workers.
Note: a broker will respond to a client request for service WORKERS by returning a list of attached workers as an array of objects [{ id, service, load }], where load is the number of requests currently assigned to this worker.
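A list in the [{ id, service, load }] shape is easy to summarise on the client side, for example to see how many workers serve each service and how busy they are. The sketch below assumes a reply of that shape; the sample data and the loadByService helper are hypothetical.

```javascript
// Hypothetical sample of a WORKERS reply: [{ id, service, load }]
const workers = [
  { id: 'w1', service: 'hello', load: 2 },
  { id: 'w2', service: 'hello', load: 0 },
  { id: 'w3', service: 'db', load: 5 }
];

// Group workers by service, counting workers and totalling their load.
function loadByService(list) {
  const summary = {};
  for (const w of list) {
    if (!summary[w.service]) summary[w.service] = { workers: 0, load: 0 };
    summary[w.service].workers += 1;
    summary[w.service].load += w.load;
  }
  return summary;
}

console.log(loadByService(workers));
// { hello: { workers: 2, load: 2 }, db: { workers: 1, load: 5 } }
```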
WORKER
Workers are "servers" in the e-bus architecture. Each worker serves exactly one service, which must be registered with the broker when a new worker is created.
Every worker must listen for request(rid,data) events, process these requests, and then call worker.respond(rid,result) with the results. If for whatever reason a worker cannot respond to a request, it can call worker.reject(rid,result) to notify the client that this request cannot be answered.
const Worker = require('cry-ebus').Worker
const worker = new Worker('hello')
worker.on('request', request)
worker.on('error',(err) => console.error(err))
function request(rid, data) {
  // reject requests that carry no name (data itself may be missing)
  if (!data || !data.name) worker.reject(rid, { error: 'name is required' })
  else worker.respond(rid, { answer: 'hello ' + data.name })
}
worker.start()
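One way to keep the respond/reject decision testable without a running broker is to separate the request logic from the worker wiring. The sketch below is only an illustration of that idea: hello, makeRequestHandler, and fakeWorker are hypothetical names, and only respond(rid,result) and reject(rid,result) are taken from the description above.

```javascript
// Pure request logic: decides whether the request can be answered.
function hello(data) {
  if (!data || typeof data.name !== 'string' || data.name === '') {
    return { ok: false, result: { error: 'name is required' } };
  }
  return { ok: true, result: { answer: 'hello ' + data.name } };
}

// Adapter that maps the outcome onto worker.respond / worker.reject.
function makeRequestHandler(worker, logic) {
  return function onRequest(rid, data) {
    const outcome = logic(data);
    if (outcome.ok) worker.respond(rid, outcome.result);
    else worker.reject(rid, outcome.result);
  };
}

// Stand-in for a real Worker that records calls instead of sending them.
const calls = [];
const fakeWorker = {
  respond: (rid, result) => calls.push(['respond', rid, result]),
  reject: (rid, result) => calls.push(['reject', rid, result])
};

const onRequest = makeRequestHandler(fakeWorker, hello);
onRequest('r1', { name: 'Ana' });
onRequest('r2', null);
console.log(calls);
```

With a real worker the wiring would be worker.on('request', makeRequestHandler(worker, hello)).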
Checking the on-line status of the worker (whether the worker is connected to the broker):
- the worker receives online and offline events when the status changes;
- check worker.online() (returns true if the broker is online, false if not).
const Worker = require('cry-ebus').Worker
const worker = new Worker('hello')
worker.on('request', request)
worker.on('online',() => console.log('broker is on-line',worker.online()))
worker.on('offline',() => console.log('broker is off-line',worker.online()))
function request(rid, data) {
  worker.respond(rid, {
    answer: 'hello ' + data.name
  })
}
worker.start()
Available events: a worker can subscribe to the following events:
- start, stop
- online, offline: status of the connection to the broker changed
- request(rid, data): new workload arriving with id 'rid' and data
- error(err): an error occurred
Optionally, a worker can receive configuration, here with defaults:
let opts = {
broker: 'tcp://localhost:5001', // broker's ZMQ backend port
heartbeat: 500, // report to broker every 500ms
cacheRqMs: 3000, // cache each calculated result for 3000ms
identity: 'some-unique-id', // worker's identity
debug: 1 // default level error (1)
}
const worker = new Worker('hello', opts)
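The cacheRqMs option suggests that a worker keeps each computed result for a short time, presumably so that a resubmitted request can be answered without recomputing. How cry-ebus implements this is not described here; the sketch below shows one plausible shape, a small time-limited cache keyed by request id. ResultCache and its injectable clock are hypothetical.

```javascript
// Hypothetical time-limited cache: entries disappear ttlMs after being stored.
class ResultCache {
  constructor(ttlMs, now = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now; // injectable clock, so expiry can be tested without waiting
    this.entries = new Map(); // rid -> { result, expiresAt }
  }

  set(rid, result) {
    this.entries.set(rid, { result, expiresAt: this.now() + this.ttlMs });
  }

  get(rid) {
    const entry = this.entries.get(rid);
    if (!entry) return undefined;
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(rid); // drop the stale entry
      return undefined;
    }
    return entry.result;
  }
}

let clock = 0;
const cache = new ResultCache(3000, () => clock);
cache.set('r1', { answer: 'hello Ana' });
clock = 2999;
console.log(cache.get('r1')); // still cached
clock = 3000;
console.log(cache.get('r1')); // undefined, the entry has expired
```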
The same configuration can be set using the environment variables EBUS_WORKER_NAME and EBUS_NAME, the first overriding the second; for example, export EBUS_WORKER_DEBUG=3 overrides export EBUS_DEBUG=2.
Configuration passed in code overrides environment variables, so avoid hard-coding it.
CLIENT
Clients consume services in one of two ways:
- using events: call client.request(service,data); when the reply arrives, the event reply(rid,data) is triggered;
- using promises: obtain a promise with client.requestPromise(service,data) and await it.
Client using promises:
// create client that expires unanswered requests in 2s
const Client = require('cry-ebus').Client
const client = new Client({ expires: 2000 })
client.start()
// request two services in parallel
let p = []
p.push(client.requestPromise('hello', { name: 'Ana' }))
p.push(client.requestPromise('hello', { name: 'Barbi' }))
// wait until both services are fulfilled
Promise
  .all(p)
  .then((data) => console.log('ALL', data))
  .catch((err) => console.log('ERROR', err))
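Promise.all rejects as soon as any one request fails, which hides the results of the others. When partial results are useful, Promise.allSettled (Node.js 12.9 or later) is an alternative. The sketch below uses a hypothetical stand-in for client.requestPromise so that it runs without a broker; that a failed request rejects with an Error object is an assumption.

```javascript
// Hypothetical stand-in for client.requestPromise(service, data).
function fakeRequestPromise(service, data) {
  return data && data.name
    ? Promise.resolve({ answer: 'hello ' + data.name })
    : Promise.reject(new Error('rejected by ' + service));
}

// Wait for every request to finish and separate replies from failures.
function settleAll(promises) {
  return Promise.allSettled(promises).then((outcomes) => ({
    replies: outcomes.filter((o) => o.status === 'fulfilled').map((o) => o.value),
    failures: outcomes.filter((o) => o.status === 'rejected').map((o) => o.reason)
  }));
}

settleAll([
  fakeRequestPromise('hello', { name: 'Ana' }),
  fakeRequestPromise('hello', {})
]).then(({ replies, failures }) => {
  console.log('replies', replies);
  console.log('failures', failures.map((e) => e.message));
});
```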
Client using events:
// create client that expires unanswered requests in 2s
const Client = require('cry-ebus').Client
const client = new Client({ expires: 2000 })
// subscribe to events before starting the client
client.on('expired', (rid, service, data) => console.error('expired', rid, service, data))
client.on('reply', (rid, data) => console.log('REPLY', rid, data))
client.start()
// send out two requests
client.request('hello', { name: 'Ana' })
client.request('hello', { name: 'Barbi' })
// 'reply' event will be triggered twice when answers arrive
A client may request a list of services attached to the broker:
// create client that expires unanswered requests in 2s
const Client = require('cry-ebus').Client
const client = new Client({ expires: 2000 })
// subscribe to events before starting the client
client.on('expired', (rid, service, data) => console.error('expired', rid, service, data))
client.on('reply', (rid, data) => console.log('REPLY', rid, data))
// the list of services arrives with the 'services' event
client.on('services', (services) => console.log('SERVICES', services))
client.start()
// request a list of services attached to the broker
client.brokerServices()
Checking the on-line status of the client (whether the client is connected to the broker):
- the client receives online and offline events when the status changes;
- check client.online() (returns true if the broker is online, false if not).
const Client = require('cry-ebus').Client
const client = new Client({ expires: 2000 })
// subscribe to events before starting the client
client.on('offline', () => console.log('broker is off-line',client.online()))
client.on('online', () => console.log('broker is on-line',client.online()))
client.start()
Available events: a client receives the following events:
- start, stop
- error(message)
- message(data,channel): a subscription message was received on channel
- services(services): the broker sent an array of services served by attached workers, as requested by client.brokerServices
- workers(workers): the broker sent an array of workers [{id,service,load}], as requested by client.brokerWorkers
pub/sub notifications
Clients can subscribe to notifications via channels, and can publish notifications to channels. A subscription is matched on the start of the channel name: a client subscribing to channel db/changes/ (just a string) would receive all notifications on channels named db/changes/, db/changes/mytable, and db/changes/mytable/12345.
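The matching rule can be sketched as a plain string-prefix test, which is also how ZMQ subscription filters behave. matchesSubscription is a hypothetical helper for illustration, not a cry-ebus function.

```javascript
// A subscription matches every channel whose name starts with the
// subscribed string.
function matchesSubscription(subscription, channel) {
  return channel.startsWith(subscription);
}

const channels = [
  'db/changes/',
  'db/changes/mytable',
  'db/changes/mytable/12345',
  'db/other'
];

console.log(channels.filter((c) => matchesSubscription('db/changes/', c)));
// [ 'db/changes/', 'db/changes/mytable', 'db/changes/mytable/12345' ]
```

One consequence worth keeping in mind: a subscription to 'CH A' also matches a channel named 'CH AAA', so channel names that should stay separate need a distinguishing prefix or delimiter.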
A publishing client
var ebus = require('cry-ebus');
var client = new ebus.Client();
let workload = 30
let sent = 0
let received = 0
let expired = 0
let rejected = 0
client.on('error', (msg) => console.log('error ', msg));
client.start();
// tag publications with the second at which this publisher started
var me = (new Date()).getSeconds()
// four timers share the 'sent' counter and stop publishing once it reaches 'workload'
let timer1 = setInterval(() => {
  if (sent < workload) {
    let msg = {
      msg: 'publication A' + me + ' ' + ++sent
    }
    client.publish('CH A', msg);
    console.log('published on A: ', msg)
  }
}, 100)
let timer2 = setInterval(() => {
  if (sent < workload) {
    let msg = {
      msg: 'publication A.1' + me + ' ' + ++sent
    }
    client.publish('CH AAA', msg);
    console.log('published on A.1: ', msg)
  }
}, 100)
let timer3 = setInterval(() => {
  if (sent < workload) {
    let msg = 'publication B' + me + ' ' + ++sent
    client.publish('CH B', msg);
    console.log('published on B: ', msg)
  }
}, 100)
let timer4 = setInterval(() => {
  if (sent < workload) {
    let msg = 'publication C' + me + ' ' + ++sent
    client.publish('CH C', msg);
    console.log('published on C: ', msg)
  }
}, 100)
process.on('SIGTERM', function () {
  console.log('sigterm')
  client.stop()
  process.exit(0);
});
process.on('SIGINT', function () {
  console.log('SIGINT')
  client.stop()
  process.exit(0);
});
A subscribing client
var ebus = require('cry-ebus');
var client = new ebus.Client();
client.on('message', (msg,channel) => console.log('message on channel',channel, msg));
client.on('error', (msg, more) => console.log('error ', msg, more));
client.subscribe('CH A')
client.subscribe('CH B')
client.start();
process.on('SIGTERM', function () {
  console.log('sigterm')
  client.stop()
  process.exit(0);
});
process.on('SIGINT', function () {
  console.log('SIGINT')
  client.stop()
  process.exit(0);
});
A client accepts the following config on creation:
let opts = {
broker: 'tcp://localhost:5000', // broker's frontend ZMQ port
pub: 'tcp://localhost:5002', // broker's publications ZMQ port
sub: 'tcp://localhost:5003', // broker's subscriptions ZMQ port
heartbeat: 500, // test if broker is present every 500ms
expires: 300000, // expire unanswered request in 300000ms, raising 'expired(rid,service,data)' event
resends: 1000, // resubmit unanswered request to broker every 1000ms
debug: 1, // default log level 1 (error)
noHeartbeat: false // set to true to avoid heartbeating broker (use if same code is both worker and client, to avoid double heartbeat)
}
var client = new ebus.Client(opts);
The same configuration can be set using the environment variables EBUS_CLIENT_NAME and EBUS_NAME, the first overriding the second. For example, export EBUS_CLIENT_DEBUG=3.
Logging
The global environment variable EBUS_DEBUG sets the log level (an integer):
- FATAL = 0
- ERROR = 1
- INFO = 2
- DEBUG = 3
const Worker = require('cry-ebus').Worker
let logs = require('cry-ebus').Utils.Log
let worker = new Worker('hello', { debug: logs.INFO })
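The numeric levels lend themselves to a simple threshold check. The sketch below assumes that a message is emitted when its level is at or below the configured level (so DEBUG shows everything and FATAL shows only fatal errors); that rule, and the shouldLog and levelFromEnv helpers, are assumptions for illustration rather than the library's documented behaviour.

```javascript
const LEVELS = { FATAL: 0, ERROR: 1, INFO: 2, DEBUG: 3 };

// Assumption: a message is logged when its level does not exceed the
// configured level.
function shouldLog(configured, messageLevel) {
  return messageLevel <= configured;
}

// Environment values are strings; fall back to ERROR when the variable
// is unset or not a known level.
function levelFromEnv(env) {
  const n = parseInt(env.EBUS_DEBUG, 10);
  return Number.isInteger(n) && n >= LEVELS.FATAL && n <= LEVELS.DEBUG ? n : LEVELS.ERROR;
}

const configured = levelFromEnv({ EBUS_DEBUG: '2' }); // INFO
console.log(shouldLog(configured, LEVELS.ERROR)); // true
console.log(shouldLog(configured, LEVELS.DEBUG)); // false
```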