@chandre/egg-amqp
v1.1.4
Published
amqplib,rcp,queue
Downloads
11
Readme
@change/egg-amqp
Install
$ npm i @change/egg-amqp --save
Usage
// {app_root}/config/plugin.js
exports.amqp = {
enable: true,
package: '@change/egg-amqp',
};
Configuration
// {app_root}/config/config.default.js
exports.amqp = {
client: {
url: 'amqp://127.0.0.1:5672',
}
};
see config/config.default.js for more detail.
Example
Controller
// {app_root}/app/amqp/controller/queue.js
const Controller = require('egg').Controller;
class QueueController extends Controller {
async test(ctx){
const channel = ctx.channel; //amqp 频道
const message = ctx.message; //amqp 消息
const params = ctx.request.body; //消息内容
ctx.body = 'test'
channel.ack(message); // 队列处理完成
}
}
return QueueController
Middleware
// {app_root}/app/amqp/middleware/handlerQueue.js
module.exports = (app) => {
return async(ctx, next) => {
await next();
return ctx.body; //rpc调用回复结果
}
}
Router
module.exports = app => {
const { middleware, controller } = app.amqp;
// 创建RPC app.amqp.createRpcConsumer("前缀", ['全局中间件']);
const RPC = app.amqp.createRpcConsumer('app.rpc', middleware.handlerQueue(app));
// 添加RPC队列方法
RPC.add({
'test': {
handler: controller.queue.test //控制器
options: { durable: false, autoDelete: true } //队列配置选项
consume: { noAck: false } // 消费者配置选项
}
})
// 创建队列 app.amqp.createQueueConsumer("前缀", ['全局中间件']);
const QUEUE = app.amqp.createQueueConsumer('app.queue');
// 添加队列
QUEUE.add({
'test': {
handler: controller.queue.test //控制器
prefetch: 1, // 预取队列数据
options: { durable: false, autoDelete: true } //队列配置选项
consume: { noAck: false } // 消费者配置选项
middleware: [], //控制器中间件
}
})
// 创建交换机 app.amqp.createExchange("前缀", 交换机配置参数,['交换机队列全局中间件'] )
const EXCHANGE = app.amqp.createExchange('app', {
name: 'fanout', // 交换机名
type: 'fanout', //交换机类型
durable: false,
autoDelete: true,
})
// 添加交换机队列
EXCHANGE.add({
'queue.test': {
handler: controller.queue.test //控制器
prefetch: 1, // 预取队列数据
options: { durable: false, autoDelete: true } //队列配置选项
consume: { noAck: false } // 消费者配置选项
exchange: {}, // 交换机绑定队列参数
routerKey: [], // 交换机队列路由键
middleware: [], //控制器中间件
}
})
}
发送消息到指定队列
// this.app.amqp.send(队列名, 消息内容, [队列属性])
this.app.amqp.send('app.queue.test', {
data: 'hello queue'
}, {
timestamp: 1655360041817,
})
RPC队列调用
// this.app.amqp.call(rpc队列, 消息内容, [队列属性])
this.app.amqp.call('app.rpc.test', {
data: 'hello queue'
} ).then(res => {
console.log(res)
}).catch(err => {
console.log(err)
});
广播消息到交换机中所有队列
// this.app.amqp.publish(交换机, 消息内容, [路由键,队列属性])
this.app.amqp.publish('app.fanout', {
data: 'hello queue'
})
RPC消息到交换机中指定路由键
// this.app.amqp.callTopic(交换机, 消息内容, 路由键, [队列属性])
const result = await this.app.amqp.callTopic('app.fanout', {
data: 'hello queue'
}, 'routerKey')
console.log(result)