@appolo/bus
v8.1.5
Appolo Bus Module
Bus module for appolo, built with rabbot.
Installation
npm i @appolo/bus
Options
| key | Description | Type | Default |
| --- | --- | --- | --- |
| id | injection id | string | busProvider |
| connection | AMQP connection string | string | null |
| auto | true to initialize busProvider automatically and start listening to events | boolean | true |
| listener | true to register queue event handlers | boolean | true |
| exchangeName | name of the exchange | string | |
| queueName | name of the queue | string | |
| appendEnv | append the env name to queueName and exchangeName | boolean | true |
| exchange | exchange options | object | {} |
| queue | queue options | object | {} |
| requestQueue | request queue options | object | {} |
| replayQueue | reply queue options, or false to disable | object | {} |
Exchange Options
| key | Description | Type | Default |
| --- | --- | --- | --- |
| type | exchange type | string | topic |
| autoDelete | delete when consumer count goes to 0 | boolean | false |
| durable | survive broker restarts | boolean | true |
| persistent | persistent delivery, messages saved to disk | boolean | true |
| alternate | define an alternate exchange | string | |
| publishTimeout | timeout in milliseconds for publish calls to this exchange | 2^32 | |
| replyTimeout | timeout in milliseconds to wait for a reply | 2^32 | |
| limit | the number of unpublished messages to cache while waiting on connection | 2^16 | |
| noConfirm | prevents rabbot from creating the exchange in confirm mode | boolean | false |
Queue Options
| key | Description | Type | Default |
| --- | --- | --- | --- |
| autoDelete | delete when consumer count goes to 0 | boolean | false |
| durable | survive broker restarts | boolean | true |
| subscribe | auto-start the subscription | boolean | false |
| limit | max number of unacked messages allowed for consumer | 2^16 | 1 |
| noAck | the server will remove messages from the queue as soon as they are delivered | boolean | false |
| noBatch | causes ack, nack & reject to take place immediately | boolean | false |
| noCacheKeys | disable cache of matched routing keys to prevent unbounded memory growth | boolean | false |
| queueLimit | max number of ready messages a queue can hold | 2^32 | |
| messageTtl | time in ms before a message expires on the queue | 2^32 | |
| expires | time in ms before a queue with 0 consumers expires | 2^32 | |
In config/modules/all.ts:
import {App} from 'appolo';
import {BusModule} from '@appolo/bus';

export = async function (app: App) {
    await app.module(new BusModule({connection: "amqp://connection-string"}));
}
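The option tables above can be combined into one configuration object. The following is a minimal sketch, not the package's own typing: `BusOptionsSketch` is a hypothetical local interface that mirrors only a few rows of the tables, and the connection string, exchange name and queue name are placeholders.

```typescript
// Hypothetical local type that mirrors a few rows of the option tables.
// It is NOT the options type exported by @appolo/bus.
interface BusOptionsSketch {
    connection: string;
    auto?: boolean;
    exchangeName?: string;
    queueName?: string;
    appendEnv?: boolean;
    exchange?: { type?: string; durable?: boolean; autoDelete?: boolean };
    queue?: { durable?: boolean; limit?: number; noAck?: boolean };
}

const busOptions: BusOptionsSketch = {
    connection: "amqp://connection-string", // placeholder connection string
    exchangeName: "orders-exchange",        // placeholder name
    queueName: "orders-queue",              // placeholder name
    appendEnv: false,                       // do not append the env name to the names
    exchange: { type: "topic", durable: true },
    queue: { durable: true, limit: 10 }     // allow up to 10 unacked messages
};

console.log(busOptions);
```

An object of this shape would be passed to `new BusModule(...)` in config/modules/all.ts.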
Usage
Publisher
import {define, singleton} from 'appolo'
import {publisher} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {
    @publisher("test")
    async publish(data: any): Promise<any> {
        return data
    }
}
Or with BusProvider:
import {define, singleton, inject} from 'appolo'
import {BusProvider} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {
    @inject() busProvider: BusProvider

    publish(data: any): Promise<any> {
        return this.busProvider.publish("test", data)
    }
}
Handler
If you do not call msg.ack() or msg.nack() yourself, msg.ack() is called when the handler returns and msg.nack() is called on error.
import {define, singleton} from 'appolo'
import {handler, IMessage} from "@appolo/bus";

@define()
@singleton()
export class SomeHandler {
    // acknowledged automatically when the method returns
    @handler("test")
    handle(msg: IMessage<any>) {
        //do something
    }

    // acknowledged or rejected explicitly
    @handler("someName")
    handleSomeName(msg: IMessage<any>) {
        try {
            //do something
            msg.ack();
        } catch (e) {
            msg.nack();
        }
    }
}
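The automatic acknowledgement rule described above can be pictured with a small simulation. This is a sketch under stated assumptions, not the module's implementation: `FakeMessage` and `dispatch` are hypothetical names, and only synchronous handlers are covered for brevity.

```typescript
// Hypothetical stand-in for a delivered message; it only records how it was settled.
class FakeMessage {
    public settled: "ack" | "nack" | null = null;

    public ack(): void {
        if (this.settled === null) this.settled = "ack";
    }

    public nack(): void {
        if (this.settled === null) this.settled = "nack";
    }
}

// Runs a handler, then applies the documented fallback:
// ack when the handler returns, nack when it throws,
// and nothing extra if the handler already settled the message.
function dispatch(msg: FakeMessage, handler: (msg: FakeMessage) => void): void {
    try {
        handler(msg);
        if (msg.settled === null) msg.ack();
    } catch {
        if (msg.settled === null) msg.nack();
    }
}

const returned = new FakeMessage();
dispatch(returned, () => { /* handler returns without settling */ });

const failed = new FakeMessage();
dispatch(failed, () => { throw new Error("boom"); });

const explicit = new FakeMessage();
dispatch(explicit, (m) => m.nack()); // the handler's own choice is kept

console.log(returned.settled, failed.settled, explicit.settled); // ack nack nack
```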
Request
import {define, singleton} from 'appolo'
import {request} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {
    @request("test")
    async getData(data: any): Promise<any> {
        return data
    }

    public async handleData() {
        let data = await this.getData({userId: 1})
    }
}
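Or with BusProvider:
import {define, singleton, inject} from 'appolo'
import {BusProvider} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {
    @inject() busProvider: BusProvider

    async publish(data: any): Promise<any> {
        // wait for the reply to the "test" request
        let result = await this.busProvider.request("test", data)
        return result;
    }
}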
Reply
import {define, singleton, inject} from 'appolo'
import {reply, BusProvider, IMessage} from "@appolo/bus";

@define()
@singleton()
export class SomeHandler {
    @inject() busProvider: BusProvider

    // the returned value is sent back as the reply
    @reply("test")
    handle(msg: IMessage<any>) {
        return {userId: 1}
    }

    // or use the reply methods
    @reply("someName")
    handleSomeName(msg: IMessage<any>) {
        try {
            //get some data
            msg.replySuccess({userId: 1})
        } catch (e) {
            msg.replyError(e)
        }
    }
}
IMessage
Each handler and reply handler is called with a message object:
{
    // metadata specific to routing & delivery
    fields: {
        consumerTag: "", // identifies the consumer to rabbit
        deliveryTag: #, // identifies the message delivered for rabbit
        redelivered: true|false, // indicates if the message was previously nacked or returned to the queue
        exchange: "", // name of exchange the message was published to
        routingKey: "" // the routing key (if any) used when published
    },
    properties: {
        contentType: "application/json", // see serialization for how defaults are determined
        contentEncoding: "utf8", // rabbot's default
        headers: {}, // any user provided headers
        correlationId: "", // the correlation id if provided
        replyTo: "", // the reply queue would go here
        messageId: "", // message id if provided
        type: "", // the type of the message published
        appId: "" // not used by rabbot
    },
    content: { "type": "Buffer", "data": [ ... ] }, // raw buffer of message body
    body: , // this could be an object, string, etc - whatever was published
    type: "" // this also contains the type of the message published
}
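For type checking, the data fields above can be written as a TypeScript interface. The sketch below is an illustration only: `MessageSketch` is a hypothetical name (the package exports its own `IMessage`), `deliveryTag` is assumed to be numeric, the raw `content` buffer is omitted, and the `ack`/`nack`/`reply` methods are not modelled.

```typescript
// Hypothetical interface covering only the data fields of the message object.
interface MessageSketch<T> {
    fields: {
        consumerTag: string;
        deliveryTag: number; // assumption: numeric tag
        redelivered: boolean;
        exchange: string;
        routingKey: string;
    };
    properties: {
        contentType: string;
        contentEncoding: string;
        headers: Record<string, unknown>;
        correlationId: string;
        replyTo: string;
        messageId: string;
        type: string;
        appId: string;
    };
    body: T;      // whatever was published
    type: string; // the type of the message published
}

// Sample value with placeholder data, useful in unit tests of a handler.
const sample: MessageSketch<{ userId: number }> = {
    fields: { consumerTag: "c1", deliveryTag: 1, redelivered: false, exchange: "ex", routingKey: "test" },
    properties: {
        contentType: "application/json", contentEncoding: "utf8", headers: {},
        correlationId: "", replyTo: "", messageId: "", type: "test", appId: ""
    },
    body: { userId: 1 },
    type: "test"
};

// Builds a short log line from the routing metadata.
function summarize<T>(msg: MessageSketch<T>): string {
    const flag = msg.fields.redelivered ? " (redelivered)" : "";
    return `${msg.type} via ${msg.fields.exchange}/${msg.fields.routingKey}${flag}`;
}

console.log(summarize(sample)); // test via ex/test
```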
message.ack()
Enqueues the message for acknowledgement.
message.nack()
Enqueues the message for rejection. This will re-enqueue the message.
message.reject()
Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.
message.reply( data:any )
Acknowledges the message and sends the reply data back to the requester.
message.replySuccess( data:T )
Replies with the JSON object {success: true, data}.
message.replyError( e: RequestError<T> )
Replies with the JSON object {success: false, message: e.message, data: e.data}.
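The two reply shapes can be sketched as plain functions. This is a minimal illustration of the documented JSON objects, not the module's code: `RequestErrorSketch`, `successEnvelope` and `errorEnvelope` are hypothetical names, and the error class is a local stand-in for `RequestError<T>`.

```typescript
// Hypothetical stand-in for the RequestError<T> named above: a message plus optional data.
class RequestErrorSketch<T> extends Error {
    constructor(message: string, public data?: T) {
        super(message);
    }
}

// Shape described for replySuccess: {success: true, data}
function successEnvelope<T>(data: T): { success: true; data: T } {
    return { success: true, data };
}

// Shape described for replyError: {success: false, message: e.message, data: e.data}
function errorEnvelope<T>(e: RequestErrorSketch<T>): { success: false; message: string; data?: T } {
    return { success: false, message: e.message, data: e.data };
}

const ok = JSON.stringify(successEnvelope({ userId: 1 }));
const failedReply = JSON.stringify(errorEnvelope(new RequestErrorSketch("not found", { userId: 1 })));

console.log(ok);          // {"success":true,"data":{"userId":1}}
console.log(failedReply); // {"success":false,"message":"not found","data":{"userId":1}}
```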
BusProvider
initialize()
Initializes busProvider and starts listening to events. Call it manually when auto mode is disabled.
publish(type: string, data: any, expire?: number): Promise<void>
Publishes an event.
- type - event name
- data - any data
- expire - timeout until the message expires in the queue
request<T>(type: string, data: any, expire?: number): Promise<T>
Requests data by event name and returns a promise that resolves with the event response.
- type - event name
- data - any data
- expire - timeout until the request is rejected
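The `expire` contract of `request` (a pending request is rejected once the timeout passes) can be simulated with a promise and a timer. The sketch below is only an illustration of that contract: `withExpire` is a hypothetical helper, the timeout is assumed to be in milliseconds, and how the module enforces the timeout internally is not shown here.

```typescript
// Hypothetical helper: settles with the work's outcome, or rejects once expireMs has passed.
function withExpire<T>(work: Promise<T>, expireMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const timer = setTimeout(
            () => reject(new Error(`request expired after ${expireMs} ms`)),
            expireMs
        );
        work.then(
            (value) => { clearTimeout(timer); resolve(value); },
            (err) => { clearTimeout(timer); reject(err); }
        );
    });
}

// A reply that arrives in time is passed through unchanged.
withExpire(Promise.resolve({ userId: 1 }), 50)
    .then((reply) => console.log("reply", reply));

// A reply that never arrives ends in a rejection the caller can handle.
withExpire(new Promise<never>(() => { /* never settles */ }), 10)
    .catch((e: Error) => console.log("failed:", e.message));
```

With the real provider, the same handling would wrap `await this.busProvider.request("test", data, expire)` in a try/catch.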
close<T>(): Promise<void>
Closes the connection and removes all handlers.
getQueueMessagesCount(): Promise<number>
Returns the number of pending events in the queue.