@db3dev/amqp-rxjs

v1.0.6

An RxJS wrapper for amqplib

AMQP-RXJS

This project creates a wrapper around the amqplib library that supports:

  • Listening to queues
  • Sending messages to queues
  • Setting up a remote procedure call (RPC) server
  • Sending a message to an RPC and waiting for a response

Please Observe

Every message is serialized to a JSON string before it is sent, so that the receiver can safely parse the buffer back into a JSON object. As long as you use this library to send messages, you should not have any problem parsing them. If you send a message from outside this library, make sure the payload is the output of JSON.stringify; otherwise the receiver will run into JSON.parse exceptions.
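The round trip described above can be sketched as follows. This is a minimal illustration and not the library's own code: the helper names encodeMessage and decodeMessage are hypothetical, and the only assumption is that messages travel as Node Buffers, as they do with amqplib.

```typescript
// Hypothetical helpers illustrating the JSON round trip; not part of @db3dev/amqp-rxjs.

// Serialize any JSON-compatible value into a Buffer suitable for a queue.
function encodeMessage(message: unknown): Buffer {
  return Buffer.from(JSON.stringify(message));
}

// Parse a received Buffer back into a value of the expected type.
function decodeMessage<T>(content: Buffer): T {
  return JSON.parse(content.toString()) as T;
}

// Objects and plain strings both survive the round trip.
const wrapped = decodeMessage<{ msg: string }>(encodeMessage({ msg: 'hello' }));
const plain = decodeMessage<string>(encodeMessage('hello'));
console.log(wrapped.msg, plain);

// A producer outside the library that sends a raw string (no JSON.stringify)
// produces a payload that JSON.parse rejects.
try {
  decodeMessage<string>(Buffer.from('hello'));
} catch (error) {
  console.error('Raw payload is not valid JSON');
}
```

Note that even a plain string has to go through JSON.stringify, because the raw text hello is not valid JSON while the quoted form "hello" is.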

Creating A Connection

This library contains a class intended to encapsulate a single connection with support for multiple channels. There are two methods for creating a connection:

  • #connect
  • #connectAutoRetry

Connect

Calling this method will return an observable that will emit the first channel created after getting connected. If either connecting or creating the channel fails then an error will be emitted instead.

If you call #connect while a connection is already open, it will first attempt to close the open connection.

Sample TypeScript ('...' represents ambiguous code that will depend on your own usage):

import { AMQP } from '@db3dev/amqp-rxjs';

const amqp = new AMQP({...});
amqp.connect().subscribe(
    // Function called when connection has been successful
    (channel) => console.log('Connected via AMQP!'),

    // Catch function
    (error) => console.error(`Connection Failed...\n${error.stack}`)
);

Connect Auto Retry

#connectAutoRetry performs the same actions as the #connect method above, but attempts to connect again after 10 seconds if the attempt fails.

Sample TypeScript ('...' represents ambiguous code that will depend on your own usage):

import { AMQP } from '@db3dev/amqp-rxjs';

const amqp = new AMQP({...});
amqp.connectAutoRetry().subscribe(
    // Function called when connection has been successful
    (channel) => console.log('Connected via AMQP!'),

    // Catch function
    (error) => console.error(`Failed to create channel...\n${error.stack}`)
);
);
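
For readers curious about the retry behaviour, the general pattern can be sketched with plain promises. This is an assumption-laden illustration, not the library's implementation: connectWithRetry and its parameters are hypothetical names, and the maxAttempts limit is an addition of this sketch (the README only states that a failed attempt is retried after 10 seconds).

```typescript
// Hypothetical sketch of a retry-after-delay pattern; not taken from @db3dev/amqp-rxjs.

// Resolve after the given number of milliseconds.
function delay(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

// Call `connect` until it succeeds, waiting `retryDelayMs` between attempts.
// `maxAttempts` is an assumption of this sketch; it defaults to retrying forever.
async function connectWithRetry<T>(
  connect: () => Promise<T>,
  retryDelayMs: number = 10000,
  maxAttempts: number = Infinity
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await connect();
    } catch (error) {
      // Give up and surface the last error once the attempt budget is used.
      if (attempt >= maxAttempts) {
        throw error;
      }
      await delay(retryDelayMs);
    }
  }
}
```

A caller would pass in whatever function opens the connection, for example connectWithRetry(() => openConnection()), where openConnection is a placeholder for your own code.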

Closing The Connection

You can also close the instance's connection manually with #closeConnection.

import { AMQP } from '@db3dev/amqp-rxjs'

const amqp = new AMQP({...});
amqp.connect().switchMap(() => amqp.closeConnection()).subscribe();

Channels

Each class instance has one connection and a default channel, and also stores any additional channels in a hashmap.
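
The arrangement described above (one default channel plus a keyed collection) might look roughly like the following. This is only a sketch of the data structure: ChannelRegistry and its methods are hypothetical names, and the real class may store and look up channels differently.

```typescript
// Hypothetical sketch of a default channel plus a keyed channel map;
// not the actual internals of @db3dev/amqp-rxjs.
class ChannelRegistry<C> {
  private readonly defaultChannel: C;
  private readonly keyed = new Map<string, C>();

  constructor(defaultChannel: C) {
    this.defaultChannel = defaultChannel;
  }

  // Store an additional channel under a key and return that key.
  add(key: string, channel: C): string {
    this.keyed.set(key, channel);
    return key;
  }

  // With no key, return the default channel; otherwise look the key up.
  get(key?: string): C | undefined {
    return key === undefined ? this.defaultChannel : this.keyed.get(key);
  }

  // Remove a keyed channel, throwing if nothing is stored under the key.
  remove(key: string): C {
    const channel = this.keyed.get(key);
    if (channel === undefined) {
      throw new Error(`No channel stored under key "${key}"`);
    }
    this.keyed.delete(key);
    return channel;
  }
}
```

Keeping the default channel outside the map means a lookup without a key never depends on a reserved key name.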

Create A New Channel

#createChannel uses the current connection to create a channel. An error is thrown if a connection does not already exist. Specifying a key makes the new channel retrievable.

import { AMQP } from '@db3dev/amqp-rxjs'

const amqp = new AMQP({...});
amqp
    .connect()
    .switchMap(() => amqp.createChannel('two'))
    .subscribe(
        (key) => console.log(!!amqp.getChannel(key))
        // calling getChannel with no parameters retrieves the default channel.
    );

Closing A Channel

#closeChannel closes an open channel. It throws an error if the channel could not be retrieved.

import { AMQP } from '@db3dev/amqp-rxjs'

const amqp = new AMQP({...});
amqp
    .connect()
    .switchMap(() => amqp.createChannel('two'))
    .switchMap((key) => amqp.closeChannel(key))
    .subscribe(
        (key) => console.log(!!amqp.getChannel(key))
        // calling getChannel with no parameters retrieves the default channel.
    );

Listening To A Queue

#listenQueue creates a subject that emits each message sent to the specified queue. The queue subjects are ReplaySubjects, so when you subscribe you receive every message the subject has received while it has been listening to the queue, ordered from earliest to latest.

import { AMQP } from '@db3dev/amqp-rxjs'

const amqp = new AMQP({...});
amqp
    .connect()
    .subscribe(() => init());

function init () {
    amqp.listenQueue<string>('SendMeStrings').subscribe(
        (msg) => console.log(msg)
    );

    amqp.listenQueue<{msg: string}>('SendMeWrappedStrings').subscribe(
        (msg) => console.log(msg.msg)
    );
}
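
To make the replay behaviour concrete, here is a tiny stand-in for a replay subject written without RxJS. It is a sketch for illustration only: MiniReplaySubject is a hypothetical class, and the library itself relies on the real RxJS ReplaySubject.

```typescript
// Hypothetical, dependency-free stand-in for a replay subject; illustration only.
type Listener<T> = (value: T) => void;

class MiniReplaySubject<T> {
  private readonly buffer: T[] = [];
  private listeners: Listener<T>[] = [];

  // Record the value, then deliver it to everyone currently subscribed.
  next(value: T): void {
    this.buffer.push(value);
    this.listeners.forEach((listener) => listener(value));
  }

  // Replay everything received so far (earliest first), then stay subscribed.
  // Returns a function that removes the listener again.
  subscribe(listener: Listener<T>): () => void {
    this.buffer.forEach((value) => listener(value));
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }
}

const queueMessages = new MiniReplaySubject<string>();
queueMessages.next('first');
queueMessages.next('second');

// A late subscriber still receives 'first' and 'second' before 'third'.
queueMessages.subscribe((msg) => console.log(msg));
queueMessages.next('third');
```

The practical consequence is that subscribing late does not lose messages, at the cost of the subject holding on to everything it has received.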

Sending To A Queue

#sendToQueue will send a message to a specified queue.

import { AMQP } from '@db3dev/amqp-rxjs'

const amqp = new AMQP({...});
amqp
    .connect()
    .subscribe(() => init());

function init () {
    amqp.sendToQueue('SendMeStrings', 'Here is a string!').subscribe(
        (success) => console.log(success),
        (failure) => console.error(failure)
    );

    amqp.sendToQueue('SendMeWrappedStrings', {msg: 'Here is a wrapped string!'}).subscribe(
        (success) => console.log(success),
        (failure) => console.error(failure)
    );
    
    amqp.listenQueue<string>('SendMeStrings').subscribe(
        (msg) => console.log(msg)
    );

    amqp.listenQueue<{msg: string}>('SendMeWrappedStrings').subscribe(
        (msg) => console.log(msg.msg)
    );
}

Remote Procedure Call (RPC)

The RPC support uses Node event emitters and correlationIds to send and receive messages efficiently and effectively. The Rx library provides a very efficient means of publishing and subscribing asynchronously.
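
The idea of pairing an event emitter with correlation IDs can be sketched as below. This is a generic illustration of the pattern under stated assumptions, not the library's code: RpcCorrelator, expectReply and handleReply are hypothetical names, and replies are assumed to arrive as JSON-encoded Buffers.

```typescript
// Hypothetical sketch of correlation-ID based reply matching; not from @db3dev/amqp-rxjs.
import { EventEmitter } from 'events';
import { randomUUID } from 'crypto';

class RpcCorrelator {
  private readonly replies = new EventEmitter();

  // Register a one-time handler and return the correlation ID that should be
  // attached to the outgoing request.
  expectReply<T>(onReply: (response: T) => void): string {
    const correlationId = randomUUID();
    this.replies.once(correlationId, onReply);
    return correlationId;
  }

  // Call this for every message arriving on the reply queue. Returns true if a
  // pending request was waiting for this correlation ID, false otherwise.
  handleReply(correlationId: string, content: Buffer): boolean {
    return this.replies.emit(correlationId, JSON.parse(content.toString()));
  }
}

const correlator = new RpcCorrelator();
const pendingId = correlator.expectReply<string>((response) => console.log(response));

// Simulate the RPC server answering with the same correlation ID.
correlator.handleReply(pendingId, Buffer.from(JSON.stringify('thank you for the payload')));
```

Because each handler is registered with once, a reply is delivered to exactly one waiting request, and replies with unknown IDs are simply ignored.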

Sending A RPC Message

#sendRPC<T>, where T is the type you expect to get back, sends a message to a designated queue. If that message is received by an RPC listener, the listener has what it needs to send a processed response back.

To send RPC messages, RPC must first be enabled with the #enableRPCSend method.

import { AMQP } from '@db3dev/amqp-rxjs'

const amqp = new AMQP({...});
amqp
    .connect()
    .switchMap(() => amqp.enableRPCSend())
    .subscribe(() => init());

function init () {
    amqp.sendRPC<string>('rpcQueue', {action: 'process', payload: 'aaa'}).subscribe(
        (response) => console.log(response),
        (error) => console.error(error)
    );
}

Setting Up A RPC Listener

#listenRPC receives messages from a specified queue and passes each one to a handler function. The handler must accept the message and a callback function as its parameters. When processing is finished, pass the response to the callback so that it is sent back.

import { AMQP } from '@db3dev/amqp-rxjs'

const amqp = new AMQP({...});
amqp
    .connect()
    .switchMap(() => amqp.enableRPCSend())
    .subscribe(() => init());

function init () {
    amqp.sendRPC<string>('rpcQueue', {action: 'process', payload: 'aaa'}).subscribe(
        (response) => console.log(response),
        (error) => console.error(error)
    );

    amqp.listenRPC('rpcQueue', (message, cb) => {
        console.log(message.payload);
        cb('thank you for the payload');
    });
}