npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@hapiness/rabbitmq

v1.7.3

Published

Hapiness module for rabbitmq

Downloads

38

Readme

RabbitMQ Module

RabbitMQ module for the Hapiness framework.

RabbitMQ is a server that implement the AMQP 0-9-1 protocol.

Getting started with AMQP concepts

The module uses amqp.node to connect to RabbitMQ and is architectured arround the channel API provided.

Table of contents

How this module works

Prototyping your AMQP usage

With this module you will be able to configure your AMQP stack easily with the way you prefer.

We provide three decorators, @Exchange, @Queue, @Message that will allow you to quickly getting started.

Configuration

Connection object

Connection & initialization

This module supports only one connection at the same time.

By default the module will retry to connect after a connection error. This behaviour is configurable.

When the connection is ready the extension will find all classes with decorators and do all the work to get everything ready.

Channels

Each connection can open several channels. Every operation on RabbitMQ occurs through channels.

You can create them easily with the ChannelService.

Exchanges

Exchanges needs a name and a type.

Decorator parameters:

  • name: string
  • type: ExchangeType (ExchangeType.Direct, ExchangeType.Topic, ExchangeType.Fanout)
  • options: Object optional see exchange assert options

Queues

Queues only requires a name.

Decorator parameters:

Message and routing

Each message sent on RabbitMQ is consumed by a queue.

You can decide to receive all the messages on your queue onMessage method. That's a good option if you have only one type of message arriving on it. You can also call your own dispatcher there.

It's also possible to receive plenty of different messages on the same queue. Creating one class to handle each message is then a better choice.

This module allow you to link a RabbitMessage to your custom message class. We provide a message router that will load the right message decorator class when receiving new messages. If no message class is found the onMessage method on your queue is used as a fallback. If you did not provide this method an error will be throwned.

Decorator parameters:

  • queue: the queue class where the message is consumed
  • exchange: the exchange class
  • routingKey: string or regex to match the routingKey of the message
  • filter: a simple one level object with keys and values. Keys are the path on the RabbitMQ message and values could be a string, number, boolean or RegExp.

Using your module inside Hapiness application

yarn or npm it in your package.json

$ npm install --save @hapiness/core @hapiness/rabbitmq rxjs

or

$ yarn add @hapiness/core @hapiness/rabbitmq rxjs
"dependencies": {
    "@hapiness/core": "^1.3.0",
    "@hapiness/rabbitmq": "^1.2.3",
    "rxjs": "^5.5.6",
    //...
}
//...

Importing RabbitMQModule from the library

This module provide an Hapiness extension for RabbitMQ. To use it, simply register it during the bootstrap step of your project and provide the RabbitMQExt with its config

import { RabbitMQExt } from '@hapiness/rabbitmq';

@HapinessModule({
    version: '1.0.0',
    providers: [],
    declarations: [],
    imports: [RabbitMQModule]
})
class MyApp implements OnStart {
    constructor() {}
    onStart() {}
}

Hapiness
    .bootstrap(
        MyApp,
        [
            /* ... */
            RabbitMQExt.setConfig(
                {
                    connection: {
                        host: 'localhost',
                        port: 5276,
                        vhost: 'my_vhost',
                        login: 'xxx',
                        password: 'xxxx'
                    }
                }
            )
        ]
    )
    .catch(err => {
        /* ... */
    });

Using RabbitMQ inside your application

Using decorators

@Exchange({
    name: 'user.exchange',
    type: ExchangeType.Topic,
    // See options available: http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
    options: {
        durable: true,
        autoDelete: false
    }
})
export class UserExchange implements ExchangeInterface {}

@Queue({
    name: 'user.queue',
    // See options available: http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
    options: {
        durable: true
    },
    binds: [{
        exchange: UserExchange,
        pattern: 'user.*'
    }]
})
export class UserQueue implements QueueInterface {

    // Inject your services
    constructor(private _myService; MyService) {}

    // Optional
    // Do some action when the queue is asserted
    onAsserted() {
        this._myService.foo();
    }

    // When a message is consumed it will arrives here if no message class has been found
    // by the router
    onMessage(message: RabbitMessage, ch: ChannelInterface) {
        return Observable.of({ ack: true });
    }

}

@Message({
    queue: UserQueue,
    exchange: UserExchange,
    routingKey: 'user.edited'
})
export class UserCreatedMessage implements MessageInterface {

    constructor(private _myService: MyService) {
        super();
    }

    // Executed when a message is consumed and dispatched here
    onMessage(message: RabbitMessage, ch: ChannelInterface): Observable<MessageResult> {
        this._myService.foo();
        // You can return an object to let the consumer know what to do with your message:
        // acknowleding, rejecting it or do nothing
        return Observable.of({ ack: true });
    }

}

This configuration will create:

  • One exchange of type topic named user.exchange.
  • One durable queue named user.queue
    • It will bind this queue to the previously created exchange with the routingKey user.*
  • It will dispatch all messages which are sent to the exchange and have the routingKey user.edited consumed by the previously created queue to the new message we created.
  • All other messages sent to the exchange with a routingKey matching the pattern user.* or sent directly to the queue will be consumed by the onMessage() method defined in the queue.

Integration in your hapiness application

Module

You need to include RabbitMQModule in imports and all your decorated classes in declarations.

@HapinessModule({
            version: '1.0.0',
            declarations: [
                MyQueue,
                MyExchange,
                MyMessage,
                ...
            ],
            providers: [
                MyService
            ],
            exports: [],
            imports: [RabbitMQModule]
        })
Bootstrap

You need to inject the extension in bootstrap using setConfig to instantiate the module.

Hapiness.bootstrap(RabbitMQModuleTest, [
    RabbitMQExt.setConfig({
        connection: {
            host: '....',
            login: '....',
            password: '....'
        }
    })
]).catch(err => done(err));

Using the services

Once the extension is loaded and RabbitMQ is connected you can use the services in your app.

We provide two services:

ConnectionService, ChannelService, MessageService

To send messages you can also use the sendMessage() utility provided.


class FooProvider {

    constructor(private _channelService: ChannelService, private _messageService: MessageService) {}

    bar(): Observable<ChannelManager> {
        // Upsert a channel by specifying a key to identify it
        // one key per channel.
        // The function returns a Observable of ChannelManager instance
    	this._channelService.upsert('publish')
            .subscribe(channelManager => {
                this._myChannelManager = channelManager;
            });
    }


    foo() {
        // Use the created channel
        // Use the manager to retrieve the channel instance
        const ch = this._myChannelManager.getChannel();

        // ... or retrieve it with the shortcut getChannel and your key
        const ch = this._channelService.getChannel('publish');

        // Use any function from amqp.node
        ch.sendToQueue(...);

        this.sendToQueue(ch, { foo: 'bar' }, UserQueue);
        this.publish(ch, { foo: 'bar' }, UserExchange, { routingKey: 'foo.bar' });
    }

}

Back to top

Contributing

To set up your development environment:

  1. clone the repo to your workspace,
  2. in the shell cd to the main folder,
  3. hit npm or yarn install,
  4. run npm or yarn run test.
    • It will lint the code and execute all tests.
    • The test coverage report can be viewed from ./coverage/lcov-report/index.html.

Back to top

Change History

  • v1.7.2 (2019-12-16)
    • Handle all errors when sending a message
    • Fix scope of "this" when sending message
  • v1.7.1 (2019-12-13)
    • Handle channel closed error when sending a message to add a custom code on the thrown error
  • v1.7.0 (2019-02-27)
    • Add method to cancel consuming queue
    • Refactor consume queue to allow easier consume/cancel
    • Add a QueueStore to fetch all the queues manager instances
  • v1.6.2 (2018-11-22)
    • Create DI with providers for queues and exchanges
  • v1.6.1 (2018-11-14)
    • force_json_decode is now true by default
  • v1.6.0 (2018-10-31)
    • Add assert option in Exchange and Queue decorator to allow to disable assert during bootstrap
    • Add check option in Exchange and Queue decorator to verify existence during bootstrap
  • v1.5.1 (2018-09-24)
    • Fix reconnection error: use once instad of on and rebind event correctly
  • v1.5.0 (2018-08-24)
    • Add possibility to provide a custom MessageRouter
  • v1.4.3 (2018-08-20)
    • Emit RETRY_LIMIT_EXCEEDED error on ConnectionManager
  • v1.4.2 (2018-06-11)
    • Do not retry to connect if closing server
  • v1.4.1 (2018-05-31)
    • Fix channel creation after reconnection
  • v1.4.0 (2018-04-24)
    • Refactor channel management to handle connection errors
  • v1.3.0 (2018-03-27)
    • Add shutdown (SIGTERM/SIGINT) support
  • v1.2.3 (2018-02-05)
    • Latest packages' versions.
    • Fix typings
    • Documentation.
  • v1.2.2 (2017-12-20)
    • Latest packages' versions.
    • Fix queue dispatching in routing messages
    • Documentation.
  • v1.2.1 (2017-11-23)
    • Latest packages' versions.
    • Fix routing messages
    • Documentation.
  • v1.2.0 (2017-11-20)
    • Latest packages' versions.
    • Update Module + Tests related to latest core version.
    • Documentation.
    • Change packaging process.
  • v1.1.2 (2017-11-02)
    • Fix decorators prefetch
  • v1.1.1 (2017-10-31)
    • Fix queue binding
  • v1.1.0 (2017-10-27)
    • Allow to define queue binds without pattern
    • Allow to define queue bind pattern as array
    • Add default prefetch that is used for each channel creation if not specified in create() method first argument
    • Rename decodeContent to decodeJSONContent and change logic to not throw if content is not JSON, add force argument to try to decode if headers.json boolean is missing
    • Add force_json_decode option in queue decorator to force JSON decoding of all messages consumed
    • Rework dispatcher logic (1)
    • Add channel option for queue to allow using different channel for each queue with a different prefetch
    • Export a global event object for connection and queueManager events
    • Correct logic behind message routing
    • Add checks and throw if messages do not have all required properties
    • If the message has a filter property and it does not match discard the class from the selection
    • Update tests
    • Update documentation
  • v1.0.0 (2017-10-23)
    • Publish all features of the module
    • Tests
    • Documentation

Back to top

Maintainers

Back to top

License

Copyright (c) 2017 Hapiness Licensed under the MIT license.

Back to top