ut-bus
v7.65.2
UT bus module
UT Bus
Transports
ut-bus supports the following transports:
hemera
configuration options
Check hemera config schema for all options.
In addition to all the options, you can also pass a nats property, which will be used for the nats connection. Check the Nats connect options.
Configuration example:
{
  utBus: {
    serviceBus: {
      hemera: {
        ...hemeraOptions,
        nats: {
          ...natsConnectOptions
        }
      }
    }
  }
}
jsonrpc
HTTP transport over the JSON-RPC 2.0 protocol.
jsonrpc configuration options
port (number) [optional] - tcp port. If omitted then a random port will be used.
openId (array) [optional] - a list of openId providers.
maxBytes (number) [optional] - maximum size of the request body (the default is 1MB, coming from hapi).
api (object) [optional] - swagger configuration.
ui (boolean | object) [optional]
  base (string) [optional] - ui path prefix (default '/api')
  initOAuth (object) [optional] - swagger ui OAuth credentials
    clientId (string) - pre-populated in swagger ui auth interface
    clientSecret (string) - pre-populated in swagger ui auth interface
domain (string | boolean) [optional] - enables DNS discovery and uses this property as the top-level domain for records (both regular and multicast discovery mechanisms are supported). If set to true, the machine's hostname will be used as the top-level domain.
consul (object) [optional] - used for configuring a consul client in case Consul service discovery is required. For reference check consul client options.
prefix (string) [optional] - prefix to be used in conjunction with the namespace to construct a host when resolving service locations (e.g. host will become prefix + namespace).
suffix (string) [optional] - suffix to be used in conjunction with the namespace to construct a host when resolving service locations (e.g. host will become namespace + suffix).
capture (string) [optional] - enables capturing of requests and responses in individual <capture>/*.http files for debugging purposes. Note that this is not suitable for use in production and the folder specified in this setting must exist. Also, due to hapi API constraints, some validations are turned off when capture is activated.
gateway (object) [optional] - calls remote methods from a bus running within a different security context. This is an easy way to integrate two separate implementations, where the gateway calls the other side with dedicated credentials for server to server calls.
  <prefix> (object) - configuration to apply for calls with this prefix, i.e. calls like utMethod('prefix/x.x.x')(...params)
    url - specifies the base URL of the remote bus. This is an alternative to specifying the individual configuration options below, as all of them can form a URL of the form https://username:password@host:port
    protocol - remote bus protocol
    host - remote bus host
    port - remote bus port
    username - specifies a username to use for authentication against the remote bus
    password - specifies a password to use for authentication against the remote bus
tls (object) [optional] - can be used to enable TLS. Accepts any of the options passed to node.js https.createServer. Specifically, the following options are interpreted as filesystem paths:
  cert - Server certificate
  key - Server private key
  ca - Trusted root certificate for mutual TLS with self-signed (client) certificates. To enable mutual TLS, use this option together with setting tls.requestCert to true.
  crl - certificate revocation lists
  For more information consult tls.createSecureContext.
cors (object | boolean) [optional] - Enables cors for all routes. Cors can additionally be enabled and/or overridden in validations. For detailed configuration, see the hapi docs for route.options.cors
security (object | boolean) [optional] - Enables security headers on all routes. For detailed configuration, see the hapi docs for route.options.security
client (object) [optional] - HTTP/HTTPS client configuration options:
  client.tls (object) [optional] - use this option to enable TLS or mutual TLS for bus to bus communication. These options are passed to the request module, and the following ones are interpreted as file paths:
    cert - Client certificate
    key - Client private key
    ca - Trusted root certificate for mutual TLS with self-signed (server) certificates.
pre (object | string | array) [optional] - Enables pre hooks
  method - the backend method to be called
  authOnly [optional] - whether the hook should be called for authenticated requests only (default false)
{ "utBus": { "serviceBus": { "jsonrpc": { "pre": { "method": "subject.object.predicate", "authOnly": true } } } } }
Instead of an object, the configuration can be just a string:
{ "utBus": { "serviceBus": { "jsonrpc": { "pre": "subject.object.predicate" } } } }
{"pre": "subject.object.predicate"} is equivalent to {"pre": { "method": "subject.object.predicate" }}
Or it can be an array of objects or strings. For example, if you want 2 methods to be called consecutively:
{ "utBus": { "serviceBus": { "jsonrpc": { "pre": ["subject.object.predicate1", "subject.object.predicate2"] } } } }
or a method to be called first and another 2 methods to be called in parallel after that:
{ "utBus": { "serviceBus": { "jsonrpc": { "pre": [ "subject.object.predicate", ["subject.object.predicate1", "subject.object.predicate2"] ] } } } }
For more information about how to set a mixed array of actions and what logic stands behind that, please check this link
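The accepted shapes of pre (string, object, array, nested array) can be easier to follow in code. The following is a minimal sketch and not the ut-bus implementation: normalizeHook, runPre and the call callback are hypothetical names, and only the two array levels described above are handled (top-level entries run one after another, a nested array runs in parallel).

```javascript
// Hypothetical helper: turns a string or object entry into { method, authOnly }.
function normalizeHook(hook) {
  if (typeof hook === 'string') return { method: hook, authOnly: false };
  return { method: hook.method, authOnly: hook.authOnly === true };
}

// Hypothetical runner: `call` stands in for whatever invokes the backend method.
// Top-level entries are awaited in order; a nested array is awaited in parallel.
async function runPre(pre, call) {
  const steps = Array.isArray(pre) ? pre : [pre];
  for (const step of steps) {
    if (Array.isArray(step)) {
      await Promise.all(step.map(hook => call(normalizeHook(hook))));
    } else {
      await call(normalizeHook(step));
    }
  }
}
```

With this sketch, runPre('subject.object.predicate', call) and runPre({ method: 'subject.object.predicate' }, call) invoke call with the same normalized hook.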
Configuration examples:
{
  utBus: {
    serviceBus: {
      jsonrpc: {
        port: 9876,
        ui: true
      }
    }
  }
}
{
  utBus: {
    serviceBus: {
      jsonrpc: {
        port: 9876,
        openId: [
          'https://accounts.google.com/.well-known/openid-configuration'
        ],
        ui: {
          initOAuth: {
            clientId: 'someClientId',
            clientSecret: 'someClientSecret'
          }
        }
      }
    }
  }
}
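The gateway url option is described above as an alternative to the individual protocol, host, port, username and password options. As a rough illustration of that equivalence, the sketch below splits a URL into those parts with the standard URL class. The helper name gatewayOptionsFromUrl and the example address are made up, and whether ut-bus expects the protocol with or without the trailing colon is an assumption (the colon is stripped here).

```javascript
// Hypothetical helper: decomposes a gateway base URL into the individual options.
function gatewayOptionsFromUrl(url) {
  const parsed = new URL(url);
  return {
    // URL.protocol includes a trailing ':'; stripping it is an assumption
    protocol: parsed.protocol.replace(/:$/, ''),
    host: parsed.hostname,
    port: Number(parsed.port),
    // credentials are percent-encoded inside URLs, so decode them
    username: decodeURIComponent(parsed.username),
    password: decodeURIComponent(parsed.password)
  };
}

// Example with a made-up address
console.log(gatewayOptionsFromUrl('https://alice:s3cret@bus.example.com:8443'));
```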
rabbot
rabbot configuration options
debug (Boolean) - if set to true then an additional debug queue and binding will be created. Also, in debug mode the reply queue will not be subscribed for batch acknowledgement and will get auto deleted upon disconnection.
connection (Object) - see connection options for more info.
exchanges (Array) - exchanges to be auto created upon establishing a connection. If omitted then the following exchange will be created:
  { name: bus.id, type: 'fanout', autoDelete: true }
queues (Array) - queues to be auto created upon establishing a connection. If omitted then the following queue will be created:
  { name: bus.id, subscribe: true, autoDelete: true }
  If omitted and debug is set to true, then the following queue will also be created:
  { name: bus.id + '(debug)', subscribe: false, autoDelete: false }
bindings (Array) - bindings to be auto created upon establishing a connection. If omitted then the following binding will be created:
  { exchange: bus.id, target: bus.id, keys: [] }
  If omitted and debug is set to true, then the following binding will also be created:
  { exchange: bus.id, target: bus.id + '(debug)', keys: [] }
Example:
{
  utBus: {
    serviceBus: {
      rabbot: {
        debug: true,
        connection: {
          ...connectionOptions
        },
        exchanges: [
          ...exchanges
        ],
        queues: [
          ...queues
        ],
        bindings: [
          ...bindings
        ]
      }
    }
  }
}
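The defaults listed for exchanges, queues and bindings can be summarised in a small function. This is only a sketch of the rules as documented above, not code taken from ut-bus; the function name rabbotDefaults and its signature are hypothetical.

```javascript
// Hypothetical helper: fills in the documented defaults for omitted options.
function rabbotDefaults(busId, { debug = false, exchanges, queues, bindings } = {}) {
  const debugName = busId + '(debug)';
  return {
    exchanges: exchanges || [
      { name: busId, type: 'fanout', autoDelete: true }
    ],
    queues: queues || [
      { name: busId, subscribe: true, autoDelete: true },
      // extra queue only when queues are omitted and debug is on
      ...(debug ? [{ name: debugName, subscribe: false, autoDelete: false }] : [])
    ],
    bindings: bindings || [
      { exchange: busId, target: busId, keys: [] },
      // extra binding only when bindings are omitted and debug is on
      ...(debug ? [{ exchange: busId, target: debugName, keys: [] }] : [])
    ]
  };
}
```

Explicitly configured arrays are returned untouched, which mirrors the "if omitted" wording of the option descriptions.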
moleculer
moleculer repo
This transport uses moleculer's ServiceBroker internally.
moleculer configuration options
See Moleculer Broker options for full list.
Example:
{
  utBus: {
    serviceBus: {
      moleculer: {
        ...moleculerOptions
      }
    }
  }
}
If the moleculer property is set to a string instead of an object then it will be used as a transporter. E.g.
{
  utBus: {
    serviceBus: {
      moleculer: 'abc'
    }
  }
}
// is equivalent to
{
  utBus: {
    serviceBus: {
      moleculer: {
        transporter: 'abc'
      }
    }
  }
}
utRpc
This is a transport which relies on ut-rpc for delivering messages over tcp streams.
utRpc configuration options
The utRpc configuration property can be either a string or a number.
If set to a string then the messages will be sent over a domain socket (on Linux) or a named pipe (on Windows).
If set to a number then the messages will be sent over the respective tcp port.
Example:
{
  utBus: {
    serviceBus: {
      utRpc: 9876 // tcp port
    }
  }
}
// or
{
  utBus: {
    serviceBus: {
      utRpc: 'abc'
      // (for windows) named pipe: \\.\pipe\ut5-abc
      // (for linux) domain socket: /tmp/ut5-abc.sock
    }
  }
}
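To make the string versus number distinction concrete, here is a small sketch that maps a utRpc value to a listen target. It only restates the naming shown in the comments above; the function name resolveUtRpcTarget is hypothetical and ut-rpc itself may derive the paths differently.

```javascript
// Hypothetical helper: maps the utRpc setting to a tcp port or a socket/pipe path.
function resolveUtRpcTarget(utRpc, platform = process.platform) {
  if (typeof utRpc === 'number') return { port: utRpc };
  if (typeof utRpc === 'string') {
    return {
      path: platform === 'win32'
        ? `\\\\.\\pipe\\ut5-${utRpc}` // named pipe on Windows
        : `/tmp/ut5-${utRpc}.sock` // domain socket elsewhere
    };
  }
  throw new TypeError('utRpc must be a string or a number');
}
```

The returned object has the same shape that Node's net.Server listen(options) accepts (either port or path), which is why the two cases are kept apart.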
Caching
ut-bus provides a built-in caching mechanism. Two preconditions must be met for caching to work:
1. There should be an instance of ut-port-cache defined on implementation level. This is necessary because ut-bus doesn't do the caching itself but relies on a running instance of ut-port-cache.
E.g:
module.exports = () => () => ({
  adapter: [
    function cache(...params) {
      return class cache extends require('ut-port-cache')(...params) {};
    }
  ]
});
2. Cache configuration must be explicitly provided as part of the options when importing a bus method.
E.g:
const options = {
  cache: {
    // cache options
  }
};
return bus.importMethod('namespace.entity.action', options)(msg);
Where options.cache allows the following configuration:
key - describes the storage options. Can be either an object or a function returning an object.
  object - an object consisting of the following properties:
    id - a string to be used as a storage key
    params - a string or an object used to define the segment by appending these params to the imported method name (not used if a segment is passed). ut-port-cache will use the params to build the segment in the form of a query string.
      string - e.g. if params is segment then the segment is namespace.entity.action?segment
      object - e.g. if params is {x: 1, y: 2} then the segment is namespace.entity.action?x=1&y=2
    segment - a string to bypass the params and define the segment directly
  function - a function returning an object with the same properties as described above. The function accepts the incoming message. Using a function instead of a predefined object provides the convenience of defining dynamic id, params and segment depending on the incoming message. E.g.
    function(msg) { return { id: msg.id, params: msg.params } }
before - string - cache operation before calling the method, can be one of 'get', 'set', 'drop', undefined. This property is optional. Method names usually follow the pattern namespace.entity.action. If the "action" part of the method name matches one of the predefined bindings then the respective cache operation will be applied automatically:
  { "get": "get", "fetch": "get", "add": false, "create": false, "edit": "drop", "update": "drop", "delete": "drop", "remove": "drop" }
instead - string - cache operation called instead of calling the method, can be one of 'get', 'set', 'drop', undefined. This property is optional.
after - string - cache operation after calling the method, can be one of 'get', 'set', 'drop', undefined. This property is optional. Method names usually follow the pattern namespace.entity.action. If the "action" part of the method name matches one of the predefined bindings then the respective cache operation will be applied automatically:
  { "get": "set", "fetch": "set", "add": "set", "create": "set", "edit": "set", "update": "set", "delete": false, "remove": false }
ttl - number, optional - cache duration, default is set in the cache port
port - string, optional - cache port namespace, default is cache
optional - boolean, optional - indicates whether caching itself is optional, i.e. no error will be thrown if caching doesn't succeed. Default is false.
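The way key.params and key.segment determine the cache segment can be sketched as follows. This is an illustration of the rules stated above and not the ut-port-cache code: the function name cacheSegment is hypothetical, falling back to the bare method name when neither option is given is an assumption, and the real query string encoding may differ.

```javascript
// Hypothetical helper: derives the cache segment for an imported method.
function cacheSegment(methodName, { params, segment } = {}) {
  // an explicit segment bypasses params entirely
  if (segment) return segment;
  // assumption: without params the method name alone is the segment
  if (params === undefined) return methodName;
  const query = typeof params === 'string'
    ? params
    : new URLSearchParams(
      Object.entries(params).map(([name, value]) => [name, String(value)])
    ).toString();
  return `${methodName}?${query}`;
}

console.log(cacheSegment('namespace.entity.action', { params: { x: 1, y: 2 } }));
```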
Full example
bus.importMethod('some.method', {
  cache: {
    key: msg => ({
      id: msg.id,
      params: 'op1',
      segment: 'my-segment',
    }),
    before: 'get',
    after: 'set',
    ttl: 5000
  }
})
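The automatic before/after bindings described in the cache options can be pictured as a lookup on the last part of the method name. The sketch below is an assumption-laden illustration: the function name defaultCacheOperations is hypothetical, the action is matched exactly, and an explicitly configured before or after is assumed to take precedence over the predefined binding.

```javascript
// Predefined bindings, copied from the option descriptions
const BEFORE = { get: 'get', fetch: 'get', add: false, create: false, edit: 'drop', update: 'drop', delete: 'drop', remove: 'drop' };
const AFTER = { get: 'set', fetch: 'set', add: 'set', create: 'set', edit: 'set', update: 'set', delete: false, remove: false };

// Hypothetical helper: resolves the cache operations for a method name.
function defaultCacheOperations(methodName, cache = {}) {
  // "action" is the last dot-separated part of namespace.entity.action
  const action = methodName.split('.').pop();
  return {
    // assumption: explicit configuration wins over the predefined binding
    before: 'before' in cache ? cache.before : BEFORE[action],
    after: 'after' in cache ? cache.after : AFTER[action]
  };
}
```

For example, a method ending in .get reads from the cache before the call and stores the result after it, while a method ending in .delete drops the entry before the call and stores nothing.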
Message level encryption
ut-bus can be configured to support message level encryption (mle) when receiving messages from outside and/or when communicating with another instance of ut-bus.
Message level encryption configuration
In order for that to be achieved the following configuration is needed:
{
  utBus: {
    serviceBus: {
      jsonrpc: {
        sign: serverMlsk,
        encrypt: serverMlek,
        client: {
          sign: clientMlsk,
          encrypt: clientMlek
        }
      }
    }
  }
}
where:
utBus.serviceBus.jsonrpc.sign (serverMlsk) - private key used for signing the response
utBus.serviceBus.jsonrpc.encrypt (serverMlek) - private key used for decrypting the request
utBus.serviceBus.jsonrpc.client.sign (clientMlsk) - private key used for signing the request
utBus.serviceBus.jsonrpc.client.encrypt (clientMlek) - private key used for decrypting the response
serverMlsk and serverMlek are used when the bus acts as a server (when receiving requests from outside/another bus).
clientMlsk and clientMlek are used when the bus acts as a gateway (i.e. as a client in bus-2-bus communication). For more info check gateway in jsonrpc configuration options.
If these keys are provided then the bus will try to authenticate using the login.identity.exchange method, otherwise the authentication method will be login.identity.check.
The keys provided in the configuration have to be in JSON web key format. They can be generated by a script, using some JSON Object Signing and Encryption library. For example jose:
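const { generateKeyPair, exportJWK } = require('jose');

// Generates the signing (mlsk) and encryption (mlek) private keys as JWKs
async function getKeys() {
  // extractable: true lets the private keys be exported on WebCrypto-based runtimes
  const {privateKey: mlsk} = await generateKeyPair('ES384', {crv: 'P-384', extractable: true});
  const {privateKey: mlek} = await generateKeyPair('ECDH-ES+A256KW', {crv: 'P-384', extractable: true});
  return {
    mlsk: await exportJWK(mlsk),
    mlek: await exportJWK(mlek)
  };
}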
or by using some online JSON Web Key generator like mkjwk
Full configuration example:
{
  utBus: {
    serviceBus: {
      jsonrpc: {
        sign: {
          kty: 'EC',
          d: 'BYfl8to6zRfjjm7jFYtY5i_BwR2jXspsv1HDN0OLIaz-tUiACKZBeRruaLzBrHXJ',
          use: 'sig',
          crv: 'P-384',
          x: 'pM8gcPvgdKrKaxQmIC7Q67AvV7KteWqU5I4X83ErVinZnAgeT1KwfhCYssD3YNvK',
          y: 'SVsvfEm3CVu2WjOho2frL7LnaXeOQHC1JT856bOH-Vp3E-4_1j2Kp9KHJJf7Qn1v',
          alg: 'ES384'
        },
        encrypt: {
          d: '3UScww8iqdRaBeTraC61WCFoO3fisO9A0p49P_GI6BuZO26-WUyElUWoKyhkcbeI',
          kty: 'EC',
          use: 'enc',
          crv: 'P-384',
          x: 's8uFX_D-Ow5Q6UoRs6tFDBDkpdpcsueSl7-oyPpBFdgY6Co9L2AZknuqA4vDSKe4',
          y: 'IffoB24bdS2nk699nXMB4cVe7LgLdinCKNGgrgcPHlPXnqfdJ7T5DLucLLJP0DQA',
          alg: 'ECDH-ES+A256KW'
        },
        client: {
          sign: {
            kty: 'EC',
            d: 'jo8G0i1lhqRFN9x0Og3XRE-EC0u_ZAUbKcU7SEit6c6tRupqviQ_CmODGyfdDoHY',
            use: 'sig',
            crv: 'P-384',
            x: 'avLDJ42674HZhDoh8--e9S3GDOLZ3l6lc3hRQtcjHQ3zPfeqyyyN2qYhX4TQBhOg',
            y: 'duOXLY-NgzsraPWepv42Z5itp2WQNdmklcQMq-arrFXkJaqjNTZ29GJYehPwMTP_',
            alg: 'ES384'
          },
          encrypt: {
            crv: 'P-384',
            x: 'cm3RKHV8rfSJSoJvpRZTUTYIYDnAcbVf6vjGspJfifyn7NNcmNX4WN7vs-robaBR',
            y: 'iuReGVbQ1aUpnzeXahIFJyTpOXuMZkTKkn-SXCW7xM6DcKascabaKCHlQRVBMmQS',
            d: 'hGbwEuoIBi60isZqHxivhCTE-KgJ59QY4KByRmzjLPxh_Di1eBThyx8l_DT6kARk',
            kty: 'EC',
            kid: 'MQ3uhovLo_AYLF3RQMMwiORwOY5o53_KLAXV4vKdsXc',
            alg: 'ECDH-ES+A256KW',
            use: 'enc'
          }
        }
      }
    }
  }
}
Message level encryption flow
The message level encryption flow consists of 4 main parts:
1. The client sends a request.
Here the payload gets signed with clientPrivateMlsk and encrypted with serverPublicMlek. Pseudo code:
await encrypt(await sign(msg, clientPrivateMlsk), serverPublicMlek);
In the case of login.identity.exchange (in other words, when the server doesn't know the client public keys), the client must send its public mlsk ("sign") and mlek ("encrypt") keys as part of a JWE protected header:
// protectedHeaders: {mlsk: clientPublicMlsk, mlek: clientPublicMlek}
await encrypt(await sign(msg, clientPrivateMlsk), serverPublicMlek, protectedHeaders);
2. The server receives the request.
Here the payload gets decrypted with serverPrivateMlek and verified with clientPublicMlsk. Pseudo code:
await verify(await decrypt(msg, serverPrivateMlek), clientPublicMlsk)
3. The server sends the response.
Here the payload gets signed with serverPrivateMlsk and encrypted with clientPublicMlek. Pseudo code:
await encrypt(await sign(msg, serverPrivateMlsk), clientPublicMlek);
4. The client receives the response.
Here the payload gets decrypted with clientPrivateMlek and verified with serverPublicMlsk. Pseudo code:
await verify(await decrypt(msg, clientPrivateMlek), serverPublicMlsk)
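The sign-then-encrypt ordering of the flow can be tried out with Node's built-in crypto module. Note that this is a deliberately simplified stand-in and not what ut-bus does on the wire: it does not produce JWS/JWE messages, it replaces ECDH-ES+A256KW with an ephemeral ECDH exchange hashed into an AES-256-GCM key, and all function names (newKeyPair, signThenEncrypt, decryptThenVerify) are hypothetical. It only shows which key is used at which step.

```javascript
const crypto = require('crypto');

// P-384 key pair, used for both signing (mlsk) and key agreement (mlek) roles
const newKeyPair = () => crypto.generateKeyPairSync('ec', { namedCurve: 'P-384' });

// Derives an AES-256 key from an ECDH shared secret (simplified KDF, assumption)
const deriveKey = (privateKey, publicKey) =>
  crypto.createHash('sha256').update(crypto.diffieHellman({ privateKey, publicKey })).digest();

// Sender side: sign with own private mlsk, then encrypt for the recipient's public mlek
function signThenEncrypt(msg, senderPrivateMlsk, recipientPublicMlek) {
  const payload = Buffer.from(JSON.stringify(msg));
  const signature = crypto.sign('sha384', payload, senderPrivateMlsk);
  const signed = Buffer.from(JSON.stringify({
    payload: payload.toString('base64'),
    signature: signature.toString('base64')
  }));
  const ephemeral = newKeyPair(); // one-off key pair for this message
  const iv = crypto.randomBytes(12);
  const cipher = crypto.createCipheriv('aes-256-gcm', deriveKey(ephemeral.privateKey, recipientPublicMlek), iv);
  const ciphertext = Buffer.concat([cipher.update(signed), cipher.final()]);
  return {
    epk: ephemeral.publicKey.export({ type: 'spki', format: 'der' }).toString('base64'),
    iv: iv.toString('base64'),
    tag: cipher.getAuthTag().toString('base64'),
    ciphertext: ciphertext.toString('base64')
  };
}

// Recipient side: decrypt with own private mlek, then verify with the sender's public mlsk
function decryptThenVerify(envelope, recipientPrivateMlek, senderPublicMlsk) {
  const epk = crypto.createPublicKey({ key: Buffer.from(envelope.epk, 'base64'), type: 'spki', format: 'der' });
  const decipher = crypto.createDecipheriv('aes-256-gcm', deriveKey(recipientPrivateMlek, epk), Buffer.from(envelope.iv, 'base64'));
  decipher.setAuthTag(Buffer.from(envelope.tag, 'base64'));
  const signed = JSON.parse(Buffer.concat([
    decipher.update(Buffer.from(envelope.ciphertext, 'base64')),
    decipher.final()
  ]).toString());
  const payload = Buffer.from(signed.payload, 'base64');
  if (!crypto.verify('sha384', payload, senderPublicMlsk, Buffer.from(signed.signature, 'base64'))) {
    throw new Error('signature verification failed');
  }
  return JSON.parse(payload.toString());
}
```

A request would use signThenEncrypt(msg, clientMlsk.privateKey, serverMlek.publicKey) on the client and decryptThenVerify(envelope, serverMlek.privateKey, clientMlsk.publicKey) on the server; the response swaps the roles of the two parties.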
Custom error fields
By default (when not in debug mode), ut-bus will preserve the following error fields:
type
message
print
validation
params
In order to add or remove fields, the following configuration can be provided, e.g.:
{
  utBus: {
    serviceBus: {
      errorFields: {
        cause: 'error',
        customField: true,
        params: false
      }
    }
  }
}
The errorFields object specifies how certain fields are to be treated, where:
true means that the key should be preserved
false means that the key should be omitted
'error' means that the key should be preserved and treated as an error object (i.e. formatted recursively)
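The effect of errorFields can be sketched as a filter applied to an error object. This is an illustration of the three rules above rather than the actual ut-bus code: the function name formatError is hypothetical, and merging the configuration over the five default fields is an assumption based on the "add/remove" wording.

```javascript
// Fields preserved by default, as listed above
const DEFAULT_FIELDS = { type: true, message: true, print: true, validation: true, params: true };

// Hypothetical helper: keeps only the configured fields of an error.
function formatError(error, errorFields = {}) {
  const fields = { ...DEFAULT_FIELDS, ...errorFields };
  const result = {};
  for (const [name, mode] of Object.entries(fields)) {
    // false means omit; fields missing on the error are skipped
    if (!mode || error[name] == null) continue;
    // 'error' means the value is itself an error and is formatted recursively
    result[name] = mode === 'error' ? formatError(error[name], errorFields) : error[name];
  }
  return result;
}
```

With the configuration shown above, an error carrying a cause keeps type and message on both levels, keeps customField, and loses params.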