@syrf/transport-library
v1.0.12
NodeJs Message Queue client library, currently supports STOMP protocol
transport-library
NodeJs Message Queue client library, currently supports STOMP protocol
Stomp
STOMP is the Simple (or Streaming) Text Orientated Messaging Protocol.
STOMP provides an interoperable wire format so that STOMP clients can communicate with any STOMP message broker, giving easy and widespread messaging interoperability among many languages, platforms and brokers.
More about STOMP protocol: https://stomp.github.io/
STOMP supported servers: https://stomp.github.io/implementations.html#STOMP_Servers
Basic Usage
const transport = require('@syrf/transport-library')

// arguments: port, host, user, password
const stompClient = transport.Stomp.create(61613, 'localhost', 'guest', 'guest')
// 'connect' is the event name listed in the Events section
stompClient.on('connect', () => {
  // do something when the connection is established
})
stompClient.connect()
stompClient.subscribe('/topic/sample.topic', (message, header) => {
  console.log(message.text)
})
stompClient.publish('/topic/sample.topic', { text: 'Hello STOMP!' })
// will output:
// Hello STOMP!
connect accepts an optional object whose properties are passed as headers of the CONNECT frame.
const transport = require('@syrf/transport-library')

const stompClient = transport.Stomp.create(61613, 'localhost', 'guest', 'guest')
stompClient.on('connect', () => {
  // do something when the connection is established
})
// every property of this object becomes a header of the CONNECT frame
stompClient.connect({ 'client-id': '123', otherHeader: 'other header value' })
stompClient.subscribe('/topic/sample.topic', (message, header) => {
  console.log(message.text)
})
stompClient.publish('/topic/sample.topic', { text: 'Hello STOMP!' })
// will output:
// Hello STOMP!
TLS
const fs = require('fs')

// options object handed to the Node.js TLS layer
const tls = {
  ca: [fs.readFileSync('server/tls/localhost.key')]
}
const stompClient = transport.Stomp.create(61613, 'localhost', 'guest', 'guest').useTLS(tls)
To use a TLS connection, pass the TLS configuration to useTLS when creating a new instance of the transport lib. The TLS options are passed on to Node.js tls.connect.
Publish and Subscribe
Parsing
Messages passed to publish and received in subscribe are serialized and parsed as JSON by default. If you need a different message format, you can pass your own functions to format and parse messages.
const convertToXML = obj => {
  let result = ''
  // convert object to XML string
  return result
}
const convertFromXML = xml => {
  let result = null
  // convert XML to object
  return result
}
const stompClient = transport.Stomp.create(61613, 'localhost', 'guest', 'guest')
stompClient.useCustomMessageFormat(convertToXML, convertFromXML)
- messageFormatter : formats the object passed as the second argument of publish into a string, using the function you provide
- messageParser : parses an incoming message from the message broker from a string into an object, using the function you provide
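As a rough illustration of such a formatter and parser pair, the sketch below swaps JSON for a URL-encoded string using the built-in URLSearchParams class. The names toQueryString and fromQueryString are hypothetical and not part of the library. The call to useCustomMessageFormat is left as a comment so the snippet runs without a broker.

```javascript
// Hypothetical formatter: flat object -> URL-encoded string.
const toQueryString = obj => new URLSearchParams(obj).toString()

// Hypothetical parser: URL-encoded string -> flat object of strings.
const fromQueryString = text => Object.fromEntries(new URLSearchParams(text))

// Round trip: format as on publish, parse as on subscribe.
const wire = toQueryString({ text: 'Hello STOMP!', id: '42' })
const received = fromQueryString(wire)
console.log(wire)
console.log(received.text) // Hello STOMP!

// With the library, the pair would be registered like this:
// stompClient.useCustomMessageFormat(toQueryString, fromQueryString)
```

Note that this format only round-trips flat objects with string values. Nested objects would need a richer encoding.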
Batch Publish
Batch Publish reduces the number of queued messages by pooling the outgoing messages from the publish() method into a batch. The batch is actually sent to the MQ at the end of every time span, or as soon as it exceeds a configured count or size limit.
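const publisher = Stomp.create(61613, 'localhost', 'guest', 'guest')
publisher.useBatchPublish({
  count: 10, // max messages per batch
  size: 150000, // max batch size in bytes
  span: 1000 // max time in ms before the batch is sent
})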
The sample code above shows how to enable batch publishing. The batch is published every 1000ms, or when it contains 10 messages, or when the total message size exceeds 150KB, whichever comes first.
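The three triggers can be pictured with a small standalone batcher. This is only a sketch of the idea, not the library's implementation: the BatchBuffer class and its method names are hypothetical, and the size is measured on the JSON-serialized message as an assumption.

```javascript
// Hypothetical batcher: collects messages and flushes on count, size or span.
class BatchBuffer {
  constructor ({ count, size, span }, onFlush) {
    this.limits = { count, size, span }
    this.onFlush = onFlush
    this.messages = []
    this.bytes = 0
    this.timer = null
  }

  add (message) {
    this.messages.push(message)
    this.bytes += Buffer.byteLength(JSON.stringify(message))
    // Start the span timer when the first message enters an empty batch.
    if (this.timer === null) {
      this.timer = setTimeout(() => this.flush('span'), this.limits.span)
    }
    if (this.messages.length >= this.limits.count) this.flush('count')
    else if (this.bytes > this.limits.size) this.flush('size')
  }

  flush (triggeredBy) {
    clearTimeout(this.timer)
    this.timer = null
    if (this.messages.length === 0) return
    const batch = { messages: this.messages, triggeredBy }
    this.messages = []
    this.bytes = 0
    this.onFlush(batch)
  }
}

const flushed = []
const batcher = new BatchBuffer(
  { count: 10, size: 150000, span: 1000 },
  batch => flushed.push(batch)
)
for (let i = 0; i < 10; i++) batcher.add({ text: `message ${i}` })
console.log(flushed.length, flushed[0].triggeredBy) // 1 count
```

Here the tenth message reaches the count limit first, so the batch is flushed before the 1000ms span timer fires.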
const processMessage = message => console.log(message)
stompClient.subscribe('/topic/sample.topic', (data, headers) => {
  // batch messages carry the header isbatch set to the string "true"
  if (headers.isbatch === 'true') {
    data.messages.forEach(processMessage)
  } else {
    processMessage(data)
  }
})
The example above shows how to detect whether a received message is a batch message and how to handle it on subscribe.
Currently, batch publish only supports the JSON message format.
Ack and Nack
To acknowledge or reject a message, the library provides the ack and nack methods.
stompClient.subscribe('/topic/sample.topic', (message, header, subscriptionId) => {
  // process the message
  stompClient.ack(header['message-id'], subscriptionId)
})
More about Ack and Nack
Connection Retry
When the connection is broken by a network issue, an MQ server outage or any other cause, the library tries to reconnect at a configurable interval.
While waiting for the connection to be re-established, outgoing messages sent via the publish method are stored in the backoff buffer and are sent immediately after the connection is established.
Any subscription is re-subscribed after the connection is established.
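The backoff buffer behaviour described above can be sketched as a bounded queue that is drained on reconnect. The BackoffBuffer class below is hypothetical, and dropping the oldest message when the buffer is full is an assumption: the README does not say what the library does on overflow.

```javascript
// Hypothetical bounded buffer for messages published while disconnected.
class BackoffBuffer {
  constructor (capacity) {
    this.capacity = capacity
    this.pending = []
  }

  // Store a message; drop the oldest one when full (an assumption).
  push (destination, body) {
    if (this.pending.length >= this.capacity) this.pending.shift()
    this.pending.push({ destination, body })
  }

  // Hand every stored message to `send` in order, then empty the buffer.
  drain (send) {
    const queued = this.pending
    this.pending = []
    queued.forEach(({ destination, body }) => send(destination, body))
  }
}

const buffer = new BackoffBuffer(2)
buffer.push('/topic/sample.topic', { text: 'first' })
buffer.push('/topic/sample.topic', { text: 'second' })
buffer.push('/topic/sample.topic', { text: 'third' }) // 'first' is dropped

// Simulate the reconnect: everything still buffered is sent in order.
const sent = []
buffer.drain((destination, body) => sent.push(body.text))
console.log(sent) // [ 'second', 'third' ]
```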
Fixed Interval
const stompClient = transport.Stomp.create(61613, 'localhost', 'guest', 'guest')
stompClient.retryInterval(3000)
The configuration above sets the connection retry interval to 3s, so when the connection is closed, the transport retries the connection after 3s.
Incremental Interval
const stompClient = transport.Stomp.create(61613, 'localhost', 'guest', 'guest')
stompClient
  .retryInterval(3000)
  .incrementalRetryInterval(3000)
Exponential Interval
const stompClient = transport.Stomp.create(61613, 'localhost', 'guest', 'guest')
stompClient
  .retryInterval(3000)
  .exponentialRetryInterval()
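The README does not spell out how the incremental and exponential delays are calculated. The sketch below shows one plausible reading, where the increment is added after each failed attempt and the exponential mode doubles the interval each time. The retryDelay helper and both formulas are assumptions, not taken from the library source.

```javascript
// Hypothetical helper: delay in ms before retry number `attempt` (1-based).
// Both formulas are assumptions about how the options could be interpreted.
const retryDelay = (attempt, { interval = 3000, increment = 0, exponential = false } = {}) => {
  if (exponential) return interval * 2 ** (attempt - 1)
  return interval + increment * (attempt - 1)
}

const attempts = [1, 2, 3, 4]
const fixed = attempts.map(n => retryDelay(n))
const incremental = attempts.map(n => retryDelay(n, { increment: 3000 }))
const exponential = attempts.map(n => retryDelay(n, { exponential: true }))

console.log(fixed) // [ 3000, 3000, 3000, 3000 ]
console.log(incremental) // [ 3000, 6000, 9000, 12000 ]
console.log(exponential) // [ 3000, 6000, 12000, 24000 ]
```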
Failover
let stompClient = Stomp.createWithFailover([
{
host: 'localhost',
port: 61613,
user: 'guest1',
pass: 'guest1'
},
{
host: 'localhost',
port: 61614,
user: 'guest2',
pass: 'guest2'
},
{
host: 'localhost',
port: 61615,
user: 'guest3',
pass: 'guest3',
tls : {
ca: [fs.readFileSync('server/tls/localhost.key')]
},
},
{
host: 'otherhost.com',
port: 61613,
user: 'guest4',
pass: 'guest4'
}
])
When a failover configuration is provided, the transport lib connects to the first server in the list first. When a connection failure happens, the transport lib immediately switches to the next failover server.
If all servers have been tried and no connection was successful, the retry interval is used.
TLS configuration also works with failover servers.
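The switching order described above can be sketched with a small selector that walks the server list and reports when a full pass has failed. The FailoverSelector class is hypothetical and only illustrates the ordering. It does not open any connection.

```javascript
// Hypothetical selector: walks the failover list and reports when a full
// pass has failed, so the caller can wait for the retry interval.
class FailoverSelector {
  constructor (servers) {
    this.servers = servers
    this.index = 0
  }

  current () {
    return this.servers[this.index]
  }

  // Call on connection failure. Returns true when the list wrapped around,
  // meaning every server has been tried once.
  next () {
    this.index = (this.index + 1) % this.servers.length
    return this.index === 0
  }
}

const selector = new FailoverSelector([
  { host: 'localhost', port: 61613 },
  { host: 'localhost', port: 61614 },
  { host: 'otherhost.com', port: 61613 }
])

// Simulate every server failing once.
const tried = []
let wrapped = false
while (!wrapped) {
  tried.push(`${selector.current().host}:${selector.current().port}`)
  wrapped = selector.next()
}
console.log(tried)
// [ 'localhost:61613', 'localhost:61614', 'otherhost.com:61613' ]
```

Once next() returns true, a caller would wait for the configured retry interval before starting again from the first server.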
Configurations
Available configuration options :
Config | Type | Default | Description
---|---|---|---
port|number|N/A|define port of server
host|string|N/A|define hostname of server
user|string|N/A|username to login to server
pass|string|N/A|password to login to server
tls|object|null|to configure tls connection
messageFormatter|function|body => JSON.stringify(body)|function to format the message to be sent
messageParser|function|body => JSON.parse(body)|function to parse a received message
timeout|number|60000|configure socket timeout in ms
batchPublish|object|null|configure batch publishing
batchSubscribe|object|null|configure batch subscribing
backoffBufferSize|number|1|count of messages to be stored when not connected
failover|array|null|configure failover servers
interval|number|3000|retry interval when connection lost
increment|number|0|retry interval increment
exponential|boolean|false|grow the retry interval exponentially
enableRetry|boolean|true|enable retry when connection lost
useSeparateConnection|boolean|false|use separate socket connection between subscribe and publish
statsInterval|number|5000|interval between stats events in ms
bucketCount|number|60|bucket count of stats data
bucketSpan|number|1000|bucket time span to store stats data in ms
name|string|default|name of the instance for stats monitoring
group|string|default|group of the instance for stats monitoring
Builder Methods
- static create : create a new client object. Accepts port, host, user and password as parameters. Returns a STOMP client object
- static createWithFailover : create a new client object with failover connections. Accepts an array of failover servers as its parameter. Returns a STOMP client object
- useBatchPublish : combine multiple published messages with the same destination into a single STOMP message to reduce queue count. Accepts a batch publish options object with these properties: size (batch size limit in bytes), count (batch size limit in number of messages) and span (batch timeout limit to send batches)
- useBatchSubscribe : same as useBatchPublish, but for subscribe. Calls the message callback when the batch size or time limit is exceeded
- useTLS : use a TLS connection instead of TCP. Accepts a Node.js TLS options object
- setConnectionTimeout : set connection idleness timeout in ms
- retryInterval : set retry interval in ms
- incrementalRetryInterval : set retry interval increment in ms
- exponentialRetryInterval : set exponential retry interval
- useBackoffBuffer : set how many messages the backoff buffer can store
- useSeparateConnection : use separated connection between publish and subscribe
- useCustomMessageFormat : use a custom message format. The default is JSON
- useStatsMonitor : configure stats monitoring options (statsInterval, bucketCount and bucketSpan)
- setName : set instance name to identify instance at stats monitoring
Methods
Method | Argument(s) | Returns | Description
---|---|---|---
connect|[object headers]|void|establish connection
destroy|N/A|void|destroy socket and stomp client
subscribe|string destination, [object headers], [function callback]|subscription Id|subscribe to a queue, topic or exchange
unsubscribe|string destination|void|unsubscribe from a queue, topic or exchange
publish|string destination, any body, object headers|void|publish a message to a destination
ack|object|void|acknowledge a message
nack|object|void|send NACK command
disconnect|function callback|void|send a disconnect message to the server and close the socket
Events
List of events emitted :
- connect : emitted when the socket is connected and the STOMP session is established
- disconnect : emitted when the socket is disconnected from the STOMP server
- error : emitted when a connection error happens
- close : emitted when the connection is closed
- message : emitted when a message arrives
- timeout : emitted when the idleness timeout has passed
- receipt : emitted when a receipt is received
- sendSocketclose : emitted when the send socket is closed, when using a separate connection
- sendSocketConnect : emitted when the send socket is connected, when using a separate connection
- subsSocketclose : emitted when the subscribe socket is closed, when using a separate connection
- subsSocketConnect : emitted when the subscribe socket is connected, when using a separate connection
- sendSocketTimeout : emitted when the send socket is idle, when using a separate connection
- stats : emitted with the stats data of the lib
Stats
Stats monitoring uses a circular buffer to store the latest sets of stats data. By default, stats data is a summary of an array containing 60 buckets (the "bucket count"). The last bucket in the array accumulates stats data for 1s (the "bucket span"). After the bucket span has elapsed, a new bucket is pushed to the array and the bucket at the first index is removed. Every 5s (the "stats interval"), a summary of the 60 buckets is created and emitted through the "stats" event, along with instance data such as name, group, host and port. Here is a sample of the stats data:
{
name: 'default',
group: 'default',
port: 61613,
host: 'localhost',
user: 'guest',
stats: {
publish: { count: 10, avgSize: 360 },
batchPublish: {
count: 1,
avgMessagesContained: 10,
avgSize: 360,
triggeredBy: {
span: 1,
size: 0,
count: 0
}
},
subscribe: { count: 10, avgTransportLatency: 0, avgSize: 365 }
}
}
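To make the bucket rotation more concrete, here is a minimal sketch of a bucket ring that produces a summary shaped like the publish entry of the sample. The StatsBuckets class and its method names are hypothetical. In the library, rotation would be driven by the bucket span timer, whereas here it is called by hand.

```javascript
// Hypothetical ring of stats buckets; names are illustrative only.
class StatsBuckets {
  constructor (bucketCount) {
    this.bucketCount = bucketCount
    this.buckets = [{ count: 0, bytes: 0 }]
  }

  // Record one published message in the newest (last) bucket.
  record (size) {
    const last = this.buckets[this.buckets.length - 1]
    last.count += 1
    last.bytes += size
  }

  // Called once per bucket span: push a new bucket, drop the oldest if full.
  rotate () {
    this.buckets.push({ count: 0, bytes: 0 })
    if (this.buckets.length > this.bucketCount) this.buckets.shift()
  }

  // Summary over all buckets currently in the window.
  summary () {
    const count = this.buckets.reduce((sum, b) => sum + b.count, 0)
    const bytes = this.buckets.reduce((sum, b) => sum + b.bytes, 0)
    return { count, avgSize: count === 0 ? 0 : bytes / count }
  }
}

const stats = new StatsBuckets(3)
stats.record(300)
stats.rotate()
stats.record(420)
console.log(stats.summary()) // { count: 2, avgSize: 360 }
stats.rotate()
stats.rotate() // the first bucket (300 bytes) falls out of the window
console.log(stats.summary()) // { count: 1, avgSize: 420 }
```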
Development
Details about development and testing can be found in testing.md