@avanzu/rhea-composable

v1.8.5

Downloads

445

rhea-composable

Allows bidirectional and unidirectional messaging using the AMQP 1.0 protocol. The heavy lifting regarding the messaging protocol is handled courtesy of rhea.

Usage

Two scenarios are available: unidirectional and bidirectional messaging.

In both cases, you have two independent system components that are unaware of each other. They only share the knowledge of the queue to communicate over and the format of the message(s).

connecting to the broker

Both systems will have to establish a connection to the message broker.

// src/connection.js
const { useConnection } = require('@avanzu/rhea-composable')

const connection = useConnection().connectionOf('my-connection-id', {
    host: process.env.AMQP_HOST,
    port: process.env.AMQP_PORT,
    username: process.env.AMQP_USER,
    password: process.env.AMQP_PASSWD,
    transport: process.env.AMQP_TRANSPORT,
})

module.exports = connection

This will open a connection to your AMQP broker. For a full list of connection options, please refer to the rhea documentation.

Attention

Typically, you can have only one broker connection per IP address. Trying to open more than that will most likely cause an exception.
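The connection options above are read straight from environment variables, which are always strings and may be unset. As a minimal sketch, a helper along the following lines could validate them before connecting. The name connectionOptionsFromEnv, the list of required variables, and the fallback to port 5672 (the registered AMQP port) are assumptions made for this illustration and are not part of @avanzu/rhea-composable.

```javascript
// Hypothetical helper, not part of @avanzu/rhea-composable.
// Builds the options object for connectionOf() from an env-like object.
const REQUIRED = ['AMQP_HOST', 'AMQP_USER', 'AMQP_PASSWD']

function connectionOptionsFromEnv(env) {
    // Fail early with a readable message instead of a connection error.
    const missing = REQUIRED.filter((name) => !env[name])
    if (missing.length > 0) {
        throw new Error(`Missing environment variables: ${missing.join(', ')}`)
    }

    // Environment variables are strings; 5672 is an assumed fallback.
    const port = Number.parseInt(env.AMQP_PORT || '5672', 10)
    if (Number.isNaN(port)) {
        throw new Error(`AMQP_PORT is not a number: ${env.AMQP_PORT}`)
    }

    return {
        host: env.AMQP_HOST,
        port,
        username: env.AMQP_USER,
        password: env.AMQP_PASSWD,
        transport: env.AMQP_TRANSPORT, // passed through unchanged
    }
}
```

The result could then be passed as the second argument to connectionOf, for example connectionOf('my-connection-id', connectionOptionsFromEnv(process.env)).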

unidirectional

Drop a message into a queue to be processed without expecting a reply.

A corresponding example can be executed via npm run examples:uni

In this scenario we will have a worker and a dispatcher.

worker setup

The worker will use the established connection in order to receive and process messages using a map of message handlers.

// src/worker.js
const { useProcessor } = require('@avanzu/rhea-composable')
const connection = require('./connection')


// declare a map of message handlers 
const handlers = {
    mySubject: ({ message }) => {
        console.log('"mySubject" handler received message %o', message)
    },
    default: ({ message }) => {
        console.log('default handler received message %o', message)
    },
}

// attach handlers to the queue
useProcessor(connection).processMessages(
    process.env.WORKER_QUEUE_NAME, 
    handlers
)
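To make the role of the handler map more concrete, here is a rough sketch of how a subject could be resolved to a handler. This is an illustration only and not the library's actual lookup. The function pickHandler is hypothetical, and the fallback to default for an unknown subject is an assumption; the README only states that an empty subject reaches the default handler.

```javascript
// Hypothetical illustration of subject-based routing; the library's
// actual lookup may differ.
function pickHandler(handlers, subject) {
    const hasOwn = Object.prototype.hasOwnProperty.call(handlers, subject)
    if (subject && hasOwn) {
        return handlers[subject]
    }
    // Empty, missing or (by assumption) unknown subjects use the default.
    return handlers.default
}

const exampleHandlers = {
    mySubject: () => 'handled by mySubject',
    default: () => 'handled by default',
}

console.log(pickHandler(exampleHandlers, 'mySubject')()) // handled by mySubject
console.log(pickHandler(exampleHandlers, '')()) // handled by default
```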

dispatcher setup

The dispatcher will use the established connection in order to add messages to the queue that the worker is processing.

// src/dispatcher.js

const { useSender } = require('@avanzu/rhea-composable')
const connection = require('./connection')

const sender = useSender().openSender(
    connection, 
    process.env.WORKER_QUEUE_NAME
)

// triggering the subject based handler 
sender.send({ subject: 'mySubject', body: { foo: 'bar' } })

// triggering the default handler
sender.send({ subject: '', body: { bar: 'baz' } })

bidirectional

Drop a message into a queue to be processed and expect a reply.

A corresponding example can be executed via npm run examples:bi

In this scenario we will have a responder and a requestor.

responder setup

The responder setup is quite similar to the worker setup. The main difference is that the handlers are expected to return a reply message, either directly or as a promise.

// src/responder.js
const { useProcessor } = require('@avanzu/rhea-composable')
const connection = require('./connection')

// declare a map of message handlers 
const handlers = {
    // subject based message handling
    mySubject: ({ message }) => {
        console.log('Received request for "mySubject" %o', message)
        return {
            subject: 'mySubjectReply',
            body: { received: message },
        }
    },
    default: ({ message }) =>
        new Promise((Ok) => {
            console.log('Received request on default handler %o', message)
            Ok({ body: { received: message } })
        }),
}

// attach handlers to the queue
useProcessor(connection).processMessages(
    process.env.DIALOG_QUEUE_NAME, 
    handlers
)
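The two handlers above differ in that one returns a plain reply object and the other returns a promise. A caller can treat both the same way by running the handler inside a promise chain. The following is a minimal sketch of that idea; runHandler is a hypothetical name and this is not taken from the library's source.

```javascript
// Hypothetical wrapper, not the library's code. Calling the handler inside
// .then() turns a plain return value into a resolved promise, adopts a
// returned promise, and converts a thrown error into a rejection.
function runHandler(handler, message) {
    return Promise.resolve().then(() => handler({ message }))
}

const syncHandler = ({ message }) => ({
    subject: 'mySubjectReply',
    body: { received: message },
})

const asyncHandler = ({ message }) =>
    new Promise((resolve) => resolve({ body: { received: message } }))

runHandler(syncHandler, { foo: 'bar' }).then((reply) => console.log(reply))
runHandler(asyncHandler, { bar: 'baz' }).then((reply) => console.log(reply))
```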

requestor setup

The requestor works similarly to the dispatcher. Since you expect a reply, the send method returns a promise that you can await.

You can give each message a ttl in milliseconds. If you omit it, a ttl of 5000 ms is assigned.

// src/requestor.js

const { useDialog } = require('@avanzu/rhea-composable')
const connection = require('./connection')

const sender = useDialog(connection)
    .openDialog(process.env.DIALOG_QUEUE_NAME)


// triggering the subject based handler 
sender
    .send({ subject: 'mySubject',  body: { foo: 'bar' } })
    .then(
        (reply) => { console.log('Received reply %o', reply) },
        (error) => { console.error('Received error %o', error)}
    )


// triggering the default handler
sender
    .send({ ttl: 10000, body: { bar: 'baz' }  })
    .then(
        (reply) => { console.log('Received reply %o', reply) },
        (error) => { console.error('Received error %o', error)}
    )  
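One common way to picture a per-message ttl is as a race between the reply and a timer. The sketch below shows that pattern under the assumption that an expired ttl rejects the promise. The names withTtl and DEFAULT_TTL are hypothetical, and the library may implement its timeout differently.

```javascript
// Hypothetical sketch of a reply timeout; not the library's implementation.
const DEFAULT_TTL = 5000 // the default ttl mentioned in the README

function withTtl(replyPromise, ttl = DEFAULT_TTL) {
    let timer
    const timeout = new Promise((_, reject) => {
        timer = setTimeout(
            () => reject(new Error(`No reply within ${ttl} ms`)),
            ttl
        )
    })
    // Whichever settles first wins; the timer is cleared either way.
    return Promise.race([replyPromise, timeout]).finally(() =>
        clearTimeout(timer)
    )
}

// A reply that never arrives ends up in the error branch.
const neverReplies = new Promise(() => {})
withTtl(neverReplies, 50).catch((error) => console.error(error.message))
```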

messaging strategies

In terms of organizing different message types, there are two ways of dealing with them.

Single message type queues

...will guarantee that there will only ever be one specific type of message on a specific queue.

This strategy is advisable when you expect a high volume of time-critical messages that should be processed in parallel. You will end up with far more queues in the broker, but every queue processor has its own receiver, so the queues are processed in parallel.

Subject based messages over a single queue

...will use a single queue to deliver multiple message types which are distinguished by their subject.

This strategy is advisable when you expect only occasional messages. You will end up with fewer queues in the broker, but processing might take longer under higher load.
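The difference between the two strategies can be sketched in terms of the processMessages(queueName, handlers) call shown earlier. In the sketch below, the processor is passed in as a parameter so the code runs without a broker. The helper names, the "prefix.type" queue naming scheme, and the use of a default handler on single-type queues (assuming messages there are sent without a subject) are all assumptions made for illustration.

```javascript
// `processor` is anything exposing processMessages(queueName, handlers),
// for example the result of useProcessor(connection).

// Strategy 1: one queue per message type. The "<prefix>.<type>" naming
// scheme is an assumption made for this sketch.
function attachPerTypeQueues(processor, prefix, handlersByType) {
    return Object.entries(handlersByType).map(([type, handler]) => {
        const queueName = `${prefix}.${type}`
        processor.processMessages(queueName, { default: handler })
        return queueName
    })
}

// Strategy 2: one shared queue, message types told apart by subject.
function attachSharedQueue(processor, queueName, handlersBySubject) {
    processor.processMessages(queueName, handlersBySubject)
    return [queueName]
}

// Stand-in processor that only records calls.
const calls = []
const fakeProcessor = {
    processMessages: (queueName, handlers) =>
        calls.push({ queueName, subjects: Object.keys(handlers) }),
}

const orderHandlers = { created: () => {}, deleted: () => {} }
attachPerTypeQueues(fakeProcessor, 'orders', orderHandlers)
attachSharedQueue(fakeProcessor, 'orders', orderHandlers)
console.log(calls)
```

With the first strategy the stand-in records one call per message type, each on its own queue; with the second it records a single call carrying every subject.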