@colu-legacy/osseus-mq
v0.3.17
Osseus pub/sub, bus, queue and more
Osseus MQ
AmazonMQ / ActiveMQ based osseus queue, topic and bus module
- WARNING: the current version (0.1.0) is in alpha stage.
Install
$ npm install osseus-mq
Usage
Configuration
See osseus-config for the configuration details.
OSSEUS_MQ_PROTOCOL
(string, required) - 'AMQP' for AMQP or 'STOMP' for STOMP
OSSEUS_MQ_AMQP_BROKERS
(array, required) - array of brokers
OSSEUS_MQ_AMQP_CONSUMERS_<broker alias>
(array, required) - array of consumers of a certain broker
OSSEUS_MQ_AMQP_PRODUCERS_<broker alias>
(array, required) - array of producers to a certain broker
Example:
OSSEUS_MQ_AMQP_BROKERS: [
  {
    'alias': 'cluster',
    'connect_options': {
      'username': 'admin',
      'password': 'admin',
      'transport': 'ssl', // ONLY for amqps, remove otherwise
      'connections': [
        {'host': 'localhost', 'port': 5672},
        {'host': 'localhost', 'port': 5673}
      ]
    }
  }
],
OSSEUS_MQ_AMQP_CONSUMERS_CLUSTER: [
  {
    name: 'queue://SYSTEM.MESSAGES',
    alias: 'smsg',
    options: ['settle'] // with 'settle', a queue:// message is redelivered until a client accepts it
  }
],
OSSEUS_MQ_AMQP_PRODUCERS_CLUSTER: [
  {
    name: 'queue://SYSTEM.MESSAGES',
    alias: 'smsg',
    options: ['settle'] // with 'settle', a queue:// message is redelivered until a client accepts it
  }
]
Credit
To limit the number of messages handled at once, use credit. With credit, only a limited number of messages are handled at a time and the rest wait at the broker. Each time a message is accepted or rejected, a new one is dequeued.
OSSEUS_MQ_HANDLE_CREDIT
(boolean) - Whether to handle credit manually or not.
OSSEUS_MQ_AMQP_INITIAL_CREDIT
(number) - How much credit to start with. If set to 0, no messages will be dequeued. Only relevant when OSSEUS_MQ_HANDLE_CREDIT is set to true.
OSSEUS_MQ_AMQP_CREDIT_LIMIT
(number) - Maximum credit the queue can reach. Only relevant when OSSEUS_MQ_HANDLE_CREDIT is set to true.
These parameters apply to all queues but can be overridden for each queue individually.
- Example:
OSSEUS_MQ_AMQP_CONSUMERS_CLUSTER: [
  {
    name: 'queue://SYSTEM.MESSAGES',
    alias: 'smsg',
    options: ['settle'], // with 'settle', a queue:// message is redelivered until a client accepts it
    credit: {
      handle: true,
      initial: 1,
      limit: 1
    }
  }
],
...
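The credit behaviour described above can be pictured with a small in-memory simulation. This is only an illustrative sketch, not the module's implementation: `CreditedQueue`, `push`, `settle` and `drain` are hypothetical names, and the broker is replaced by a plain array.

```javascript
// Hypothetical in-memory model of credit-based flow control.
// Not part of osseus-mq; all names are made up for illustration.
class CreditedQueue {
  constructor ({ initial, limit }) {
    this.credit = Math.min(initial, limit) // messages we may still take
    this.limit = limit
    this.waiting = []  // messages held back "at the broker"
    this.inFlight = [] // messages handed to the consumer
  }

  // A producer adds a message; it is delivered only if credit is available.
  push (message) {
    this.waiting.push(message)
    this.drain()
  }

  // The consumer accepted or rejected a message: return one credit.
  settle (message) {
    const index = this.inFlight.indexOf(message)
    if (index === -1) return
    this.inFlight.splice(index, 1)
    this.credit = Math.min(this.credit + 1, this.limit)
    this.drain()
  }

  // Move waiting messages to the consumer while credit remains.
  drain () {
    while (this.credit > 0 && this.waiting.length > 0) {
      this.inFlight.push(this.waiting.shift())
      this.credit -= 1
    }
  }
}

// Same values as the credit example: initial 1, limit 1.
const queue = new CreditedQueue({ initial: 1, limit: 1 })
queue.push('a')
queue.push('b')
const beforeSettle = queue.inFlight.slice() // only 'a' has been delivered
queue.settle('a')
const afterSettle = queue.inFlight.slice() // 'b' is delivered once 'a' is settled
```

With `initial: 1` and `limit: 1`, at most one message is in flight at any time, which matches the per-queue override shown in the example.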
OSSEUS_MQ_DB_USAGE
(string, required) - which database type to use when a message is sent with the persist flag set. Options are:
- MONGODB
- POSTGRES
- NONE
OSSEUS_MQ_DB_CONNECTION_STRING
(string, optional) - connection string for the database that will persist messages.
- Required if OSSEUS_MQ_DB_USAGE != "NONE"
APPLICATION_NAME
(string, required) - name of the app to use in logging and messaging.
Note: wait for osseus.mq to emit the ready event before usage.
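Waiting for the ready event could look like the sketch below. How the osseus object is initialised is not shown in this README, so `mq` here is a stand-in EventEmitter and `startMessaging` is a hypothetical application function. Only the `ready` event name comes from the note above.

```javascript
const { EventEmitter } = require('events')

// Stand-in for osseus.mq, used only so the sketch runs on its own.
// In an application this would be the real `osseus.mq` object.
const mq = new EventEmitter()

let started = false

// Hypothetical application start-up: runs only after 'ready' is emitted.
function startMessaging () {
  started = true
}

mq.once('ready', startMessaging)

// The real module emits 'ready' itself; the stand-in is triggered by hand.
mq.emit('ready')
```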
Protocols
AMQP
AMQP is a specification (not a product) that was created to enable interoperability between multiple integration broker implementations. AMQP is a protocol and a fairly complete specification for the most commonly used middleware functionality.
Topic / Queue names {#topicnames}
Topics should be used when a message should have multiple receivers, order is not an issue, and the consumers do not need to take any single "atomic" action.
Queues should be used when only a single consumer should consume the message and either report when it has finished acting on that message or return the message to the queue for another consumer.
A queue can have exclusive mode, and both queues and topics can be durable and save historical messages.
To use a queue, the name should start with queue:// and to use a topic, with topic://. Both support the '*' wildcard and the '>' wildcard, which matches everything to the end of the name: topic://A.*.C matches any single element in place of '*', whereas topic://A.> matches everything that starts with A.
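The wildcard rules above can be sketched as a small matcher. This is an illustration of the naming rules only, not code from the module or the broker. `matchesDestination` is a hypothetical helper, and it assumes names are split on '.', '*' matches exactly one element, and a trailing '>' matches one or more remaining elements.

```javascript
// Illustrative matcher for the wildcard rules described above.
// Assumptions: '.' separates elements, '*' matches exactly one element,
// and a trailing '>' matches one or more remaining elements.
function matchesDestination (pattern, name) {
  const patternParts = pattern.split('.')
  const nameParts = name.split('.')

  for (let i = 0; i < patternParts.length; i++) {
    const part = patternParts[i]
    if (part === '>') {
      // '>' must be last and needs at least one element left to match.
      return i === patternParts.length - 1 && nameParts.length > i
    }
    if (i >= nameParts.length) return false
    if (part !== '*' && part !== nameParts[i]) return false
  }
  // Without '>', pattern and name must have the same number of elements.
  return patternParts.length === nameParts.length
}

const singleElement = matchesDestination('topic://A.*.C', 'topic://A.B.C')
const tooDeep = matchesDestination('topic://A.*.C', 'topic://A.B.X.C')
const prefix = matchesDestination('topic://A.>', 'topic://A.B.C')
```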
Methods
All methods are accessible via the mq object, which is added to the base osseus object.
The mq object is an EventEmitter.
send (alias, message, options) ⇒ Promise {#send}
Sends data to the named queue or topic. The promise resolves once the message is sent (and persisted, if specified) with the base message object, which contains the requestId, if one was provided, and the system-generated messageId.
Kind: function
| Param             | Type           | Description |
| ----------------- | -------------- | ----------- |
| alias             | string         | string alias for the topic / queue |
| message           | object         | JSON to pass as the message (will be available in <object>.msg) |
| options           | object         | options object |
| options.persist   | 'async'/'sync' | persist the message synchronously or asynchronously; if not set, the message is not persisted |
| options.requestId | string         | request id to add to the base message object |
| options.topic     | string         | sets the topic field on the message for different message types |
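A call to send might look like the sketch below. Because the real module needs a running broker, `mq` here is a hypothetical stand-in that only mimics the documented signature and the documented fields of the resolved object (`requestId`, `messageId`, `msg`). The alias `smsg` is taken from the configuration example; the payload and the ids are made up.

```javascript
// Hypothetical stand-in for osseus.mq so the sketch runs without a broker.
// It only mirrors the documented shape: send(alias, message, options) => Promise.
const mq = {
  send (alias, message, options = {}) {
    return Promise.resolve({
      messageId: 'generated-id-1', // placeholder; the real id is system generated
      requestId: options.requestId,
      msg: message
    })
  }
}

// Send a message to the destination configured under the alias 'smsg'.
const sent = mq.send('smsg', { text: 'hello' }, {
  persist: 'sync', // or 'async'; omit to skip persistence
  requestId: 'req-42'
})

sent.then((baseMessage) => {
  console.log('sent', baseMessage.messageId, baseMessage.requestId)
})
```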
receive (alias, options, callback) ⇒ no return value {#receive}
Starts notifications for incoming messages on the queue / topic by setting up a callback for it. The callback is optional: the user can instead use the .on('message', ...) method of the object, whose listener receives (alias, message, done, failed), to get the emitted events, though receive still needs to be called to set up a listener for the specific queue. The consumer must call done once it has finished processing the message. Calling failed(string - error) instead sets the message to error. Calling neither causes the message to be redelivered to this or any other consumer after the broker sees that this instance has disconnected.
Kind: function
| Param               | Type                                   | Description |
| ------------------- | -------------------------------------- | ----------- |
| alias               | string                                 | string alias for the topic / queue |
| options             | object                                 | options object |
| callback (optional) | function(alias, message, done, failed) | called when a message arrives to the queue/topic |
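The callback contract above (call done on success, failed with an error string otherwise) can be sketched as follows. `FakeMq` and its `deliver` method are hypothetical and exist only to simulate a message arriving without a broker. The callback signature and the `msg` field follow the descriptions in this README.

```javascript
// Hypothetical stand-in for osseus.mq; `deliver` exists only in this sketch
// to simulate a message arriving from the broker.
class FakeMq {
  constructor () {
    this.handlers = {}
  }

  // Same parameter order as the documented receive method.
  receive (alias, options, callback) {
    this.handlers[alias] = callback
  }

  // Invoke the registered handler and report which callback it used.
  deliver (alias, message) {
    const outcome = {}
    const done = () => { outcome.status = 'done' }
    const failed = (error) => {
      outcome.status = 'failed'
      outcome.error = error
    }
    const handler = this.handlers[alias]
    if (handler) handler(alias, message, done, failed)
    return outcome
  }
}

const mq = new FakeMq()

// Handler with the documented callback signature.
mq.receive('smsg', {}, (alias, message, done, failed) => {
  if (message.msg && message.msg.text) {
    done() // processing finished
  } else {
    failed('missing text') // set the message to error
  }
})

const ok = mq.deliver('smsg', { msg: { text: 'hello' } })
const bad = mq.deliver('smsg', { msg: {} })
```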
Statistics
osseus-mq allows getting statistics from queues. Statistics are gathered by sending a message to the broker and waiting for it to respond. The number of messages in the response equals the number of queues requested.
Since there is no way of telling when the broker has finished answering, gathering the statistics is divided into two steps.
The first step sends the request to the broker to get statistics for a destination, and the second step gets the last updated statistics received from the broker for the same destination.
sendDestinationStatisticsRequest (destination) ⇒ no return value
| Param       | Type   | Description |
| ----------- | ------ | ----------- |
| destination | string | queue       |
retrieveStatisticsResponse (destination) ⇒ no return value
| Param       | Type   | Description |
| ----------- | ------ | ----------- |
| destination | string | queue       |
It's best to wait for a few seconds between the steps to give the broker time to respond.
You can also use wildcards in the destination. Examples:
TEST.FOO
- Get statistics for TEST.FOO
TEST.>
- Get statistics for all queues that start with TEST
>
- Get statistics for all queues.
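The two-step flow with a pause in between could be wrapped as in the sketch below. `refreshStatistics`, `delay` and `waitMs` are hypothetical names, and `fakeMq` is a stand-in that records calls so the sketch runs without a broker. Only the two method names come from this README, and since both are documented as returning no value, the sketch does not assume how the statistics are delivered.

```javascript
// Resolve after `ms` milliseconds.
function delay (ms) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

// Two-step flow: request statistics, wait, then ask for the stored response.
// `mq` is any object exposing the two documented methods; `waitMs` is a
// hypothetical parameter for the pause between the steps.
async function refreshStatistics (mq, destination, waitMs) {
  mq.sendDestinationStatisticsRequest(destination)
  await delay(waitMs)
  mq.retrieveStatisticsResponse(destination)
}

// Stand-in that records calls, so the sketch runs without a broker.
const calls = []
const fakeMq = {
  sendDestinationStatisticsRequest (destination) {
    calls.push(['request', destination])
  },
  retrieveStatisticsResponse (destination) {
    calls.push(['retrieve', destination])
  }
}

// 10 ms keeps the sketch fast; against a real broker use a few seconds.
const finished = refreshStatistics(fakeMq, 'TEST.>', 10)
```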
Known Issues:
consumer:
- If a retain mode is set (ActiveMQ retain), messages expire from a topic based on ttl as well as on the retain method.
producer:
- Topics mostly do not receive expire (advisory) messages even when the ttl expires, whether or not there was a consumer; this should perhaps be checked against the delivery count.
- Queues automatically retain all messages based on ttl, while topics fire and forget.
- Settle on disposition doesn't work: the broker always sends the ack frame when the message is stored in the broker (both for queues and topics).
Advisory messages:
NoConsumer:
Topic :
- Works. Message is sent only if the publisher connects to a topic and there are no consumers at that time. When a consumer goes up or a consumer goes down, a Consumer advisory message is sent with applicationProperties: { consumerCount: int }
Queue :
- Message is never sent.
Consumer:
Topic :
- Works. Message is sent with applicationProperties: { consumerCount: int } when the consumer count changes, but not if there are 0 consumers when the producer starts.
Queue :
- Works. Message is sent with applicationProperties: { consumerCount: int } when the consumer count changes, but not if there are 0 consumers when the producer starts.
License
Code released under the MIT License.