yodoya-activetick-adapter
v3.0.0
ActiveTick adapter for Yodoya - C&C and CEP
Usage:
bindIncoming
bindIncoming binds the wiring socket (nanomsg or Kafka, for example) to a local observable stream (for example, Kefir). The function also takes an options object with the following members:
messageName is a string naming the event to bind to through the socket's on event handler.
filter is a function that decides whether a decoded message is emitted to the observable stream.
getPayload is a function that extracts the payload from the received data.
For example, with kafka-node and Kefir:
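// Assumes `adapter` is this module, `consumer` is a kafka-node consumer,
// and `done` is the caller's completion callback (for example, a test's).
const messageFilter = function (message) {
  // Only messages on this topic are emitted to the stream.
  return message.topic === 'mytopic'
}
const bindReqOptions = {
  messageName: 'message',
  filter: messageFilter,
  getPayload: function (buf) {
    // kafka-node delivers the message body in `value`.
    return buf.value
  }
}
// Declared in the outer scope so the emitter stays reachable after binding.
let kemitter
let kstream = Kefir.stream(function (_emitter) {
  kemitter = _emitter
  adapter.bindIncoming(consumer, kemitter, bindReqOptions, (err, res) => {
    if (err) return done(err)
  })
})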
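To make the role of each option concrete, here is a minimal sketch of the flow the options describe. It is not the adapter's actual implementation: bindIncomingSketch, fakeConsumer, and fakeEmitter are hypothetical names, the protocol buffer decoding step is left out, and applying filter before getPayload is an assumption based on the example above.

```javascript
const { EventEmitter } = require('events')

// Hypothetical stand-in for bindIncoming; the real adapter also decodes
// the payload with its protocol buffer schema, which is omitted here.
function bindIncomingSketch (socket, emitter, options, callback) {
  const { messageName, filter, getPayload } = options
  socket.on(messageName, function (raw) {
    // Drop anything the caller's filter rejects.
    if (!filter(raw)) return
    // Pull the payload out of the wire-level envelope and push it downstream.
    emitter.emit(getPayload(raw))
  })
  // Report that the handler is attached.
  callback(null, messageName)
}

// A fake consumer and a fake Kefir-style emitter that records what it is given.
const fakeConsumer = new EventEmitter()
const received = []
const fakeEmitter = { emit: (value) => received.push(value) }

bindIncomingSketch(fakeConsumer, fakeEmitter, {
  messageName: 'message',
  filter: (message) => message.topic === 'mytopic',
  getPayload: (message) => message.value
}, (err) => { if (err) throw err })

fakeConsumer.emit('message', { topic: 'mytopic', value: 'tick-1' })
fakeConsumer.emit('message', { topic: 'other', value: 'ignored' })
fakeConsumer.emit('message', { topic: 'mytopic', value: 'tick-2' })

console.log(received)
```

Only the two messages on 'mytopic' reach the emitter; the one on 'other' is filtered out before getPayload is called.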
sendEvent
sendEvent sends ActiveTick data over the wire after encoding it with an ActiveTick-specific protocol buffer schema. The function also takes an options object:
formatPayload is a function that formats the outbound message, since that message may need to wrap the encoded data. For example, in Kafka the outbound message must specify a topic and further encode the protobuf-encoded data into a binary buffer.
sendData is a function that calls the wiring library's send method, whether that method is synchronous or asynchronous.
For example, with kafka-node:
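// Assumes `adapter` is this module, `producer` is a kafka-node producer,
// `response` is the data to send, and `done` is the caller's callback.
const makeOutboundMsg = function (encodedData) {
  // Wrap the protobuf-encoded data in the payload shape kafka-node expects.
  const message = {
    topic: 'mytopic',
    messages: Buffer.from(encodedData)
  }
  return [message]
}
const respOptions = {
  formatPayload: makeOutboundMsg,
  sendData: function (socket, payload, cb) {
    // kafka-node's send is asynchronous and reports through the callback.
    return socket.send(payload, cb)
  }
}
adapter.sendEvent(producer, response, respOptions, (err, res) => {
  if (err) return done(err)
})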
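The outbound path can be sketched in the same spirit. This is an illustration under stated assumptions, not the adapter's code: sendEventSketch and fakeProducer are hypothetical, and JSON stands in for the ActiveTick protocol buffer encoding so that the example runs without a schema.

```javascript
// Hypothetical stand-in for sendEvent. JSON replaces the protobuf encoding.
function sendEventSketch (socket, data, options, callback) {
  let encoded
  try {
    encoded = Buffer.from(JSON.stringify(data))
  } catch (err) {
    return callback(err)
  }
  // Let the caller wrap the encoded bytes in whatever the wire expects.
  const payload = options.formatPayload(encoded)
  // Let the caller decide how the wiring library's send method is invoked.
  return options.sendData(socket, payload, callback)
}

// Fake producer with the send(payloads, cb) shape used in the example above.
const sent = []
const fakeProducer = {
  send (payloads, cb) {
    sent.push(...payloads)
    cb(null, { count: payloads.length })
  }
}

const sketchOptions = {
  formatPayload: (encodedData) => [{ topic: 'mytopic', messages: encodedData }],
  sendData: (socket, payload, cb) => socket.send(payload, cb)
}

let sendResult = null
sendEventSketch(fakeProducer, { symbol: 'AAPL', last: 101.5 }, sketchOptions, (err, res) => {
  if (err) throw err
  sendResult = res
})
```

Keeping formatPayload and sendData as caller-supplied functions is what lets the same send path work with different transports: only these two functions need to change when the transport does.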