@flexdemo/connection-service
v0.0.19
Published
Ths Packet contains an API that can be used to connect multiple Services. Currently it uses the Middleware RabbitMQ over an AMQP Connection.
Downloads
1
Readme
#FlexDeMo-Module-ConnectionService Ths Packet contains an API that can be used to connect multiple Services. Currently it uses the Middleware RabbitMQ over an AMQP Connection.
##Getting Started
The First step is, to create a Connection to the RabbitMQ Server. To do thatn create a new ConnectionService instance
let connection = new ConnectionService(url?:string)
. The url
parameter is Optional if a Connection url has
been specified via a Configuration File with the Configurator of the FlexDeMo-Module-Utils
Packet.
The next step should be to initialize the Connection via the connection.init(options)
Method. This Method creates a AMQP
Connection and Channel. init()
returns a Promise<void>
. The Reject will usually be a ConnectionException
.
options
is Optional. options.reconnect
defaults to true
. If options.reconnect
is true, even if no
connection is currently possible, it will retry until a connection can be made. This also applies to Losing the
connection to the Queue Service while running. Note: All listeners will be reallied automatically. But no attempt will
be made to resend or rereceive send data.
Sending Data to a Queue
To send Data to a Queue one can use connection.sendToQueue(queue:string, data:object, options?:Object)
. queue
represents the Queue the data is send to. sendToQueue
will check if the Queue exists, but will not create the Queue.
The data
parameter contains the data that is to be send to to the Queue. The data
object will be serialised as
an JSON String.
The options
parameter can be used to set some additional Options. If options.channel
is set, this channel will
used instead of the internal one of the ConnectionService
object.
If options.requestId
is set, it will set the correlationId to options.requestId
of the Message before sending
it.
If options.replyTo
is set, it will use the Queue specified in options.replyTo
as an Answer channel
(If it is supported be the Receiver).
The options.sendOptions
is an Object that will be passed to the underling sending Function. It can be used to
further specify Options to send the Message. See
here
Returns a Promise
that resolves if the data was send.
Sending Data to an Echange
To send Data to an Exchange, one can use
connection.sendToExchange(exchange:string, routingKey:string, data:object, options?:object
. The Exchange is
specified by the exchange
String. sendToExchange
checks if the Exchange exists, but will not create a missing
Exchange. The String routingKey
can be used by the Middleware to determine the right Queue to send the data to. The
Object data
will be serialised as JSON-String an sent to the Exchange as Data. The options
object contains
further options that can be used.
If options.channel
is specified, that channel will be used instead of the internal one.
If options.requestId
is specified, the correlationId of that message will be set to requestedId
If options.replyTo
is specified, that queue will be used to send the data back. (If supported by the receiver)
options.sendOptions
is an Object that will be passed to the underling sending Function. See
here.
Returns a Promise
that resolves if the data was send.
Sending a Message and receiving a Response
In many cases one wants to get a response after a Message has been send. The
connection.rpcToQueue(queue:string, data:object, options?:object)
and
connection.rpcToExchange(exchange:string, routingKey:string, data:object, options?:object)
Methods can be used for
this. The queue
and exchange
parameters respectively specify the Queue or Exchange the data is send to. The
data
Object will be serialised as JSON String and send to the Target. The routingKey
is used by
rpcToExchange
to select the right target Queue after the Message has been transmitted to the Middleware.
The options
object contains further parameters for the Methods.
If the options.receivingQueue
parameter is set, this queue will be used to send back data instead of the Data Queue.
If the options.channel
parameter is set, this channel is used instead of the internal one.
If options.replyTo
parameter is specified, this queue is used as the reply Queue instead of the Data Queue.
The options.sending
Object will be provided to the sending function.
The options.receiving
Object will be provided to the reply listener.
Returns a Promise<DataObject>
Object that resolves to a Message. See
here. Additionally the contentBody
Property is added, witch contains the parsed content
parameter.
Listen to a Queue
To listen to a Queue one can use connection.listen(queue:string, callback:function(message:object), options?:object)
.
The queue
parameter specifies the Queue that will be listened to. listen
checks if the Queue exists, but will not
create a new Queue. callback
is a function that will be called for every received Message. By default, all messages
need to be acknowledged or rejected. One can use connection.Ack(msg)
and connection.Reject(msg)
for that.
To retrieve the send data, one can use message.contentBody
witch will be a JS Object.
The options
parameter contains more Parameters. If options.channel
is set, this channel will be used instead
of the internal one.
If options.noAck
is set to a truthy value, all Messages are automatically acknowledged.
Returns Promise<object>
that contains the consumerTag
that can be used with connection.unlisten(consumerTag)
to stop listening.
Acknowledge a Message
connection.ack(msg:object, options:object)
can be used to Acknowledge a Message. Returns a Promise<void>
.
Reject a Message
connection.reject(msg:object, options:object)
can be used to Reject and requeue a Message. Returns a Promise<void>
.
Unlisten
To cancel a Listen, one can use connection.unlisten(consumerTag)
. This will cancel the Consumer.
Get a Reply
To receive a Reply one can use the connection.getResponseData(requestId:string, options?:Object)
. The Parameter
requestId
contains a string to identify the message by (correlationId). The Object options
can contain
further parameters. If options.channel
is specified, this channel will be used instead of the internal one.
If options.queue
is supplied, this queue will be used instead of the default data queue.
If options.useTimeout
is supplied and false, the timeout function will be ignored. Default true.
If options.requestTimeout
Numer: If supplied the timeout will use this number (ms). Default: 500.
Returns a Promise<msg>
object, that will resolve to the response message. Does not need to be acknowledged.
Echo Listener
A listener that will echo all Messages to the Queue specified in replyTo
. To create a echo Listener call
connecton.echoListener(queue:string)
. Where queue
specifies the Queue on witch the listener will listen.
Manly used for tests.
Returns a Promise<object>
witch results to an Object with a consumerTag
property.
Clear Queue
While testing it can be useful to clear all messages from a Queue. This can be done by calling
connection.clearQueue(queue:string, options?:object)
.
Where queue
specifies the queue to be cleared. The Object options
can contain additional parameters.
If options.channel
is specified, this channel will be used instead of the internal one.
This Method will clear most messages from a Queue. (Send but unacked Messages remain, and could be requeued). Should be used very cautiously!
Returns a Promise
Creating Queues
To create a Queue, the connection.AssertQueue(queue:string, durable:bool, options?:object)
Method can be called.
The will try to create the Queue queue
witch can be durable if ```durable````is true
.
The Parameter options
can contain more parameters. If options.channel
is set, this channel will be used instead
of the internal one.
If the Queue already exists, the options of the existing queue might be changed to he options specified as the parameters of this function.
Returns a Promise
.
Creating a Exchange
To create a Exchange, the connection.AssertExchange(exchange:string, durable:bool, type?:string, options?:object)
Method can be called.
If the Exchange named exchange
does not exist it will be created.
If durable
is true
the exchange will survive reconnects an can be reused.
The Type of the Exchange is specified in the type
parameter. Allowed Values:
"fanout"
: Broadcast to all bound queues"direct"
: Routs messages to a Queue if the RoutingKey of the Message exactly matches that of the Queue"topic"
: Routs messages if the Queue matches all or a Part of the RoutingKey:Topic1.Topic2.Topic3
"Headers"
: Routs a message if the Header matches (see Documentation)
The parameter options
contains more options that are used to assert a Exchange.
If options.channel
is specified, this channel is used instead of the internal one.
options.assert
is a Object that will be handed over to the assert Method.
Returns a Promise
.
Binding a Queue to an Exchange
To bind a Queue to an Exchange, one can use the connection.bind(queue:string, exchang:string, routingKey:string)
Method.
The Queue queue
will be bound to the Exchange exchange
. The Routing Key used for Routing can be supplied via
routingKey
. The Routing Key will be ignored for fanout
Exchanges.
Returns a Promise
.
Configuration
The Queue Service has some global Configurations that can be used to overwrite Default Values. These Configurations can
set under in the Configuration file (usually appconfig.json
) under the queueService.globalSettings
object.
Timeout
To Change the default Timeout, the timeoutAfter
value can be set. It accepts null
or undefined
to disable this
setting or a number representing the time in milli seconds until the timeout is produced.
{
"default":{
"queueService":{
"globalSettings":{
"timeoutAfter": 500
}
}
}
}