npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@colu-legacy/osseus-mq

v0.3.17

Published

Osseus pub/sub, bus, queue and more

Downloads

3

Readme

JavaScript Style Guide

Osseus MQ

AmazonMQ / ActiveMQ based osseus queue, topic and bus module

  • WARNING current version 0.1.0 is in alpha stage.

Install

$ npm install osseus-mq

Usage

Configuration

See osseus-config for the configuration details.

  • OSSEUS_MQ_PROTOCOL (string, required) - 'AMQP' for AMQP or 'STOMP' for STOMP

  • OSSEUS_MQ_AMQP_BROKERS (array, required) - array of brokers

  • OSSEUS_MQ_AMQP_CONSUMERS_<broker alias> (array, required) - array of consumers of a certain broker

  • OSSEUS_MQ_AMQP_PRODUCERS_<broker alias> (array, required) - array of producers to a certain broker

  • Example:

	OSSEUS_MQ_AMQP_BROKERS: [
            {
              'alias': 'cluster',
              'connect_options': {
                'username': 'admin',
                'password': 'admin',
                'transport': 'ssl', //ONLY for amqps, remove otherwise
                'connections': [
                  {'host': 'localhost', 'port': 5672},
                  {'host': 'localhost', 'port': 5673}
                ]
              }
            }
        ],
        OSSEUS_MQ_AMQP_CONSUMERS_CLUSTER: [
            {
              name: 'queue://SYSTEM.MESSAGES',
              alias: 'smsg',
              options: ['settle'] // settle option for queue:// will redlive untill message is accepted by a client.
            }
        ],
        OSSEUS_MQ_AMQP_PRODUCERS_CLUSTER: [
            {
              name: 'queue://SYSTEM.MESSAGES',
              alias: 'smsg',
              options: ['settle'] // settle option for queue:// will redlive untill message is accepted by a client.
            }
        ]
Credit

To limit the number of messages handled at once use credit. With credit only a limited number of messages will be handled and the rest will wait at the broker. Upon each accepted/rejected message a new one would dequeue.

  • OSSEUS_MQ_HANDLE_CREDIT (boolean) - Wheather to handle credit manually or not.
  • OSSEUS_MQ_AMQP_INITIAL_CREDIT (number) - How much credit to start with. If set to 0 no message will be dequeue. Only relevant when OSSEUS_MQ_HANLDE_CREDIT is set to true.
  • OSSEUS_MQ_AMQP_CREDIT_LIMIT (number) - Max credit the queue can reach. Only relevant when OSSEUS_MQ_HANLDE_CREDIT is set to true.

These parameters would be set for all queues but can be overriden for each one differently.

  • Example:
	
        OSSEUS_MQ_AMQP_CONSUMERS_CLUSTER: [
            {
              name: 'queue://SYSTEM.MESSAGES',
              alias: 'smsg',
              options: ['settle'], // settle option for queue:// will redlive untill message is accepted by a client.
              credit: {
        		handle: true,
        		initial: 1,
        		limit: 1
      		  }
              }
        ],
        ...
  • OSSEUS_MQ_DB_USAGE (string, required) - what database type to use if send with persist flag set. Options are:
    • MONGODB
    • POSTGRES
    • NONE
  • OSSEUS_MQ_DB_CONNECTION_STRING (string, optional) - connection string for the database that will persist messages.
    • Required if OSSEUS_MQ_DB_USAGE != "NONE"
  • APPLICATION_NAME (string, required) - name of the app to use in logging and messaging.

note: wait for osseus.mq to emit ready event before usage

Protocols

AMQP

AMQP is a specification (not a product) that was created to enable interoperability between multiple integration broker implementations. AMQP is a protocol and a fairly complete specification for the most commonly used middleware functionality.

Topic / Queue names {#topicnames}

Topics should be used when a message sould have multiple receivers, order is not an issue and there is no single action that is "atomic" that needs to be taken by the consumers.

Queue should be used when only a single consumer should consume the message and report when it is finished with taking action on that message or return the message to the Queue for another consumer.

Queue can have exclusive mode and both of them can have durable and save historical messages.

In order to use a queue topic name should start with queue:// and to use a topic with topic://, both support wildcards '*' and both support complete will end '>', so topic://A.*.C will match with wildcard whereas topic://A.> will match everything that starts with A.

Methods

All methods are accessable via the mq object which will be added to the base osseus object.

The mq object is an EventEmitter.

send (alias, message, options) ⇒ Promise {#send}

Method to send data to the named queue or topic, promise will be resolved once message is sent (and persisted if specified), promise will contain the base message object which will have a requestId if provided, and the system generated messageId

Kind: function

| Param | Type | Description | | ------------------| ------------------- | ------------------------------------------------------------------------------- | | alias | string | string alias for the topic / queue | | message | object | JSON to pass as the message (will be available in <object>.msg) | | options | object | options object | | options.persist | 'async'/'sync' | persist message synchronously or asynchronously, if not set message not persisted | | options.requestId | string | request id to add to the base message obejct | | options.topic | string | set topic field on message for diffrent message types |

receive (alias, options, callback) ⇒ no return value {#receive}

Method to start getting notifications for incomming messages on the queue / topic. seta up a callback for the queue / topic, the callback is optional and user can instead use the .on('message',(alias, message, done, failed) method of the object to get the emitted events, though receive still needs to be called to set up a listener for the specific queue. queue must call done once it is done proccessing the message, calling failed(string - error) instead will set the message to error, not calling anything will redelive the messgae to this or any other consumer after the broker sees this instance has dissconnected.

Kind: function

| Param | Type | Description | | --------------------| ----------------------------------------------------| ----------------------------------------------- | | alias | string | string alias for the topic / queue | | options | object | options object | | callback (optional) | function(alais, message, done, failed) | called when a message arrives to the queue/topic|

Statistics

osseus-mq allows getting statistics from queues. Getting statistics is done by sending a message to the broker and waiting for it to respond. The number of messages in the response is as the number of queues requested. Since there is no way of telling when the broker is finished answering, gathering the statistics is divided into two parts. The first one is for sending the request to the broker to get statistics for a destination and the second one is to get the last updated statistics received from the broker for the same destination.

sendDestinationStatisticsRequest (destination) ⇒ no return value

| Param | Type | Description | | --------------------| ----------------------------------------------------| ----------------------------------------------- | | destination | string | queue |

retrieveStatisticsResponse (destination) ⇒ no return value

| Param | Type | Description | | --------------------| ----------------------------------------------------| ----------------------------------------------- | | destination | string | queue |

It's best to wait for a few seconds between the steps to give the broker time to respond.

You can also use wildcards in the destination. Examples:

TEST.FOO - Get statistics for TEST.FOO

TEST.> - Get statistics for all queues start with TEST

> - Get statistics for all queues.

Known Issues:

consumer:

  • If setting a retain mode (ActiveMQ retain) then messages are expired from a topic based on ttl as well as retain method.

producer:

  • Topics do not receive expire (adviosry messages) even if ttl expires most of the time, even if there was no consumer or a consumer, should maybe check with delivery count.
  • Queues are automatically retaining all messages based on ttl, while topics fire and forget
  • Settle on disposition don't work, broker sends ack frame always when the message is stored in the broker (both for queues and topics)

Advisory messages:

NoConsumer:
Topic :
  • Works. Message is sent only if publisher connects to a topic and there are no consumers at that time when a consumer goes up or a consumer goes down. Consumer advisory message is sent with applicationProperties: { consumerCount: int }
Queue :
  • Message is never sent.
Consumer:
Topic :
  • Works. Message is sent when a consumer count changes applicationProperties: { consumerCount: int }, not if there are 0 consumers when producer starts
Queue :
  • Works. Message is sent when a consumer count changes applicationProperties: { consumerCount: int }, not if there are 0 consumers when producer starts

License

Code released under the MIT License.