@appolo/bus

v8.1.5


appolo bus module


Appolo Bus Module

Message bus module for appolo, built with rabbot.

Installation

npm i @appolo/bus

Options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| id | injection id | string | busProvider |
| connection | AMQP connection string | string | null |
| auto | true to auto-initialize busProvider and start listening to events | boolean | true |
| listener | true to register queue event handlers | boolean | true |
| exchangeName | name of the exchange | string | |
| queueName | name of the queue | string | |
| appendEnv | append env name to queueName and exchangeName | boolean | true |
| exchange | exchange options | object | {} |
| queue | queue options | object | {} |
| requestQueue | request queue options | object | {} |
| replayQueue | reply queue options, or false to disable | object | {} |

Exchange Options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| type | exchange type | string | topic |
| autoDelete | delete when consumer count goes to 0 | boolean | false |
| durable | survive broker restarts | boolean | true |
| persistent | persistent delivery, messages saved to disk | boolean | true |
| alternate | define an alternate exchange | string | |
| publishTimeout | timeout in milliseconds for publish calls to this exchange | number (up to 2^32) | |
| replyTimeout | timeout in milliseconds to wait for a reply | number (up to 2^32) | |
| limit | the number of unpublished messages to cache while waiting on connection | number (up to 2^16) | |
| noConfirm | prevents rabbot from creating the exchange in confirm mode | boolean | false |

Queue Options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| autoDelete | delete when consumer count goes to 0 | boolean | false |
| durable | survive broker restarts | boolean | true |
| subscribe | auto-start the subscription | boolean | false |
| limit | max number of unacked messages allowed for consumer | number (up to 2^16) | 1 |
| noAck | the server will remove messages from the queue as soon as they are delivered | boolean | false |
| noBatch | causes ack, nack & reject to take place immediately | boolean | false |
| noCacheKeys | disable cache of matched routing keys to prevent unbounded memory growth | boolean | false |
| queueLimit | max number of ready messages a queue can hold | number (up to 2^32) | |
| messageTtl | time in ms before a message expires on the queue | number (up to 2^32) | |
| expires | time in ms before a queue with 0 consumers expires | number (up to 2^32) | |

In config/modules/all.ts:

import {App} from 'appolo';
import {BusModule} from '@appolo/bus';

export = async function (app: App) {
   // "connection" is the AMQP connection string from the options table
   await app.module(new BusModule({connection:"amqp://connection-string"}));
}
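To make the option tables concrete, here is a minimal, self-contained sketch of how a full options object could look once the documented defaults are applied. The `BusOptions` interface, the `DEFAULTS` constant and the `withDefaults` helper are hypothetical names written for this illustration and are not exports of `@appolo/bus`. The exchange and queue names are made up as well.

```typescript
// Hypothetical shape mirroring the option tables above; not exported by @appolo/bus.
interface BusOptions {
  id: string;
  connection: string | null;
  auto: boolean;
  listener: boolean;
  exchangeName?: string;
  queueName?: string;
  appendEnv: boolean;
  exchange: Record<string, unknown>;
  queue: Record<string, unknown>;
  requestQueue: Record<string, unknown>;
  replayQueue: Record<string, unknown> | false;
}

// Defaults copied from the "Default" column of the options table.
const DEFAULTS: BusOptions = {
  id: "busProvider",
  connection: null,
  auto: true,
  listener: true,
  appendEnv: true,
  exchange: {},
  queue: {},
  requestQueue: {},
  replayQueue: {},
};

// Shallow merge: caller-supplied keys win over the documented defaults.
function withDefaults(options: Partial<BusOptions>): BusOptions {
  return { ...DEFAULTS, ...options };
}

const busOptions = withDefaults({
  connection: "amqp://connection-string",
  exchangeName: "events",                     // made-up name
  queueName: "events-queue",                  // made-up name
  exchange: { type: "topic", durable: true }, // keys from "Exchange Options"
  queue: { durable: true, limit: 1 },         // keys from "Queue Options"
  replayQueue: false,                         // false disables the reply queue
});

console.log(busOptions.id, busOptions.auto, busOptions.replayQueue);
```

An object of this shape is what would be passed to `new BusModule(...)` in the config file above. The module's real merging behaviour may differ from this shallow merge.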

Usage

Publisher

import {define, singleton} from 'appolo'
import {publisher} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @publisher("test")
    async publish(data: any): Promise<any> {
        return data
    }
}

Or with BusProvider

import {define, singleton, inject} from 'appolo'
import {BusProvider} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @inject() busProvider: BusProvider

    publish(data: any): Promise<any> {
        return this.busProvider.publish("test", data)
    }
}

Handler

If you do not call msg.ack() or msg.nack() yourself, msg.ack() is called automatically when the handler returns, and msg.nack() is called if the handler throws an error.

import {define, singleton} from 'appolo'
import {handler, IMessage} from "@appolo/bus";

@define()
@singleton()
export class SomeHandler {

    // acked automatically when the handler returns
    @handler("test")
    handle(msg: IMessage<any>) {
       //do something
    }

    // explicit ack / nack
    @handler("someName")
    handleSomeName(msg: IMessage<any>) {

        try {
           //do something

           msg.ack();
        }
        catch (e) {
            msg.nack();
        }
    }
}
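The automatic ack/nack rule described above can be sketched without a broker. The following is only an illustration of the behaviour, not the module's actual implementation: `AckableMessage`, `trackSettlement` and `runHandler` are hypothetical names, and the message is a plain stand-in object.

```typescript
// Minimal stand-in for a bus message; only the parts needed for this sketch.
interface AckableMessage<T> {
  body: T;
  ack(): void;
  nack(): void;
}

// Wraps a message so we can tell whether the handler already settled it.
function trackSettlement<T>(msg: AckableMessage<T>) {
  let settled = false;
  const tracked: AckableMessage<T> = {
    body: msg.body,
    ack: () => { if (!settled) { settled = true; msg.ack(); } },
    nack: () => { if (!settled) { settled = true; msg.nack(); } },
  };
  return { tracked, isSettled: () => settled };
}

// Runs a handler, then acks on normal return or nacks on error,
// but only if the handler did not ack/nack by itself.
async function runHandler<T>(
  msg: AckableMessage<T>,
  handler: (m: AckableMessage<T>) => void | Promise<void>,
): Promise<void> {
  const { tracked, isSettled } = trackSettlement(msg);
  try {
    await handler(tracked);
    if (!isSettled()) tracked.ack();
  } catch (_err) {
    if (!isSettled()) tracked.nack();
  }
}
```

With this wrapper, a handler that returns normally is acked, a handler that throws is nacked, and a handler that settles the message itself is left alone.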

Request

import {define, singleton} from 'appolo'
import {request} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @request("test")
    async getData(data: any): Promise<any> {
        return data
    }

    public async handleData(){
        let data = await this.getData({userId:1})
    }
}

Or with BusProvider

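import {define, singleton, inject} from 'appolo'
import {BusProvider} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @inject() busProvider: BusProvider

    async publish(data: any): Promise<any> {
        // request() resolves with the response of the reply handler
        const result = await this.busProvider.request("test", data)

        return result;
    }
}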

Reply

import {define, singleton, inject} from 'appolo'
import {reply, BusProvider, IMessage} from "@appolo/bus";

@define()
@singleton()
export class SomeHandler {

    @inject() busProvider: BusProvider

    // the returned value is sent back as the reply
    @reply("test")
    handle(msg: IMessage<any>) {
        return {userId:1}
    }

    // or use the reply methods
    @reply("someName")
    handleSomeName(msg: IMessage<any>) {

        try {
            //get some data
            msg.replySuccess({userId:1})
        }
        catch (e) {
            msg.replyError(e)
        }
    }
}

IMessage

Each handler and reply handler is called with a message object:

{
  // metadata specific to routing & delivery
  fields: {
    consumerTag: "", // identifies the consumer to rabbit
    deliveryTag: #, // identifies the message delivered for rabbit
    redelivered: true|false, // indicates if the message was previously nacked or returned to the queue
    exchange: "", // name of exchange the message was published to
    routingKey: "" // the routing key (if any) used when published
  },
  properties:{
    contentType: "application/json", // see serialization for how defaults are determined
    contentEncoding: "utf8", // rabbot's default
    headers: {}, // any user provided headers
    correlationId: "", // the correlation id if provided
    replyTo: "", // the reply queue would go here
    messageId: "", // message id if provided
    type: "", // the type of the message published
    appId: "" // not used by rabbot
  },
  content: { "type": "Buffer", "data": [ ... ] }, // raw buffer of message body
  body: , // this could be an object, string, etc - whatever was published
  type: "" // this also contains the type of the message published
}
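For type-checking purposes, the shape above could be written as a TypeScript interface along the following lines. `MessageShape`, `MessageFields`, `MessageProperties` and `describeDelivery` are hypothetical names used only for this sketch. The `IMessage` type shipped with the package may differ, and it also carries the methods documented below.

```typescript
// Routing and delivery metadata, as listed under "fields" above.
interface MessageFields {
  consumerTag: string;
  deliveryTag: number;
  redelivered: boolean;
  exchange: string;
  routingKey: string;
}

// Message properties, as listed under "properties" above.
interface MessageProperties {
  contentType: string;
  contentEncoding: string;
  headers: Record<string, unknown>;
  correlationId: string;
  replyTo: string;
  messageId: string;
  type: string;
  appId: string;
}

// Data-only view of the message; T is the type of the published body.
interface MessageShape<T> {
  fields: MessageFields;
  properties: MessageProperties;
  content: Uint8Array; // raw bytes of the message body
  body: T;
  type: string;
}

// Small helper showing how the fields can be read in a handler.
function describeDelivery<T>(msg: MessageShape<T>): string {
  const retry = msg.fields.redelivered ? " (redelivered)" : "";
  return `${msg.type} via ${msg.fields.exchange}/${msg.fields.routingKey}${retry}`;
}

// Made-up sample values for illustration.
const sample: MessageShape<{ userId: number }> = {
  fields: { consumerTag: "c1", deliveryTag: 1, redelivered: false, exchange: "events", routingKey: "test" },
  properties: {
    contentType: "application/json", contentEncoding: "utf8", headers: {},
    correlationId: "", replyTo: "", messageId: "", type: "test", appId: "",
  },
  content: new Uint8Array(0),
  body: { userId: 1 },
  type: "test",
};

console.log(describeDelivery(sample));
```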

message.ack()

Enqueues the message for acknowledgement.

message.nack()

Enqueues the message for rejection. This will re-enqueue the message.

message.reject()

Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.

message.reply( data:any )

Acknowledges the message and sends the reply back to the requester.

message.replySuccess( data:T )

Replies to the message with the JSON object {success: true, data}.

message.replyError( e: RequestError<T> )

Replies to the message with the JSON object {success: false, message: e.message, data: e.data}.
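The two reply envelopes can be modelled as a discriminated union. The sketch below is an illustration only: `ReplyEnvelope`, `toSuccess`, `toFailure` and `unwrapReply` are hypothetical names and not part of the package. It also assumes the requester sees the raw envelope, which this README does not state.

```typescript
// Envelope produced by replySuccess, per the description above.
interface ReplySuccess<T> {
  success: true;
  data: T;
}

// Envelope produced by replyError, per the description above.
interface ReplyFailure<E> {
  success: false;
  message: string;
  data: E | undefined;
}

type ReplyEnvelope<T, E = unknown> = ReplySuccess<T> | ReplyFailure<E>;

function toSuccess<T>(data: T): ReplySuccess<T> {
  return { success: true, data };
}

function toFailure<E>(e: { message: string; data?: E }): ReplyFailure<E> {
  return { success: false, message: e.message, data: e.data };
}

// Requester-side helper: returns the data or throws with the error message.
function unwrapReply<T>(reply: ReplyEnvelope<T>): T {
  if (reply.success) {
    return reply.data;
  }
  throw new Error(reply.message);
}

console.log(JSON.stringify(toSuccess({ userId: 1 })));
```

Checking the `success` flag first lets TypeScript narrow the union, so `data` is typed as the payload on the success branch and `message` is available on the failure branch.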

BusProvider

initialize()

Initializes busProvider and starts listening to events. Call this when auto mode is disabled.

publish(type: string, data: any, expire?: number): Promise<void>

Publishes an event.

  • type - event name
  • data - any data
  • expire - time until the message expires in the queue

request<T>(type: string, data: any, expire?: number): Promise<T>

Requests data by event name and returns a promise that resolves with the event response.

  • type - event name
  • data - any data
  • expire - timeout until the request is rejected
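The effect of `expire` on a request can be illustrated with a plain promise timeout. This is a sketch of the general technique, not the module's internals: `withExpire` is a hypothetical helper, and treating `expire` as milliseconds is an assumption, since the unit is not stated here.

```typescript
// Rejects if `pending` has not settled within `expireMs` milliseconds (assumed unit).
function withExpire<T>(pending: Promise<T>, expireMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`request expired after ${expireMs}ms`)),
      expireMs,
    );
    pending.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (err) => { clearTimeout(timer); reject(err); },
    );
  });
}

// Usage: a reply that arrives in time resolves normally.
const quickReply = new Promise<{ userId: number }>((resolve) =>
  setTimeout(() => resolve({ userId: 1 }), 10),
);
withExpire(quickReply, 200).then((data) => console.log("reply", data));
```

On the calling side, a rejected request can be handled with an ordinary `try`/`catch` around `await`.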

close<T>(): Promise<void>

Closes the connection and cleans up all handlers.

getQueueMessagesCount(): Promise<number>

Returns the number of pending events in the queue.