egg-amqpx
v1.0.4
Published
amqp plugin for egg
Downloads
4
Maintainers
Readme
egg-amqpx
Install
$ npm i egg-amqpx --save
Usage
// {app_root}/config/plugin.js
exports.amqpx = {
enable: true,
package: 'egg-amqpx',
};
Configuration
// {app_root}/config/config.default.js
exports.amqpx = {
client: {
url: 'amqp://localhost',
consumer: {
// ...
},
producer: {
// ...
},
},
};
Queue Mode
定义了一个名为 task 的消息队列
// {app_root}/config/config.default.js
{
consumer: {
tasks: {
mode: 'queue',
queue: {
name: 'tasks', // 队列名称
options: { durable: true },
prefetch: 1, // 预先取几条消费
noAck: false
}
},
},
producer: {
tasks: {
mode: 'queue',
queue: {
name: 'tasks',
options: { durable: true }
}
},
}
}
// {app_root}/app/controller/test.js
async test() {
const message = { queue: 'tasks', content: JSON.stringify({ now: Date.now() }), option: { persistent: true } };
const result = await this.app.amqpx.send(message); // send to tasks queue
console.log(`send queue ${result} : ${JSON.stringify(message)}`);
}
PubSub Mode
定义一个发布订阅模式的消费者、生产者
{
consumer: {
orders: {
mode: 'pubsub',
exchange: {
name: 'logs', // 交换机名称
type: 'fanout', // 交换机类型
options: { durable: false },
},
queues: [ // 一个队列对应一个消费者,一个队列可以匹配多条规则
{
// name: 'A', // 队列名称
options: { exclusive: true },
},
{
// name: 'B',
options: { exclusive: true },
}
]
}
},
producer: {
orders: {
mode: 'pubsub',
exchange: {
name: 'logs', // 交换机名称
type: 'fanout', // 交换机类型
options: { durable: false },
}
},
}
}
Router Mode
{
consumer: {
logger: {
mode: 'router',
exchange: {
name: 'logger', // 交换机名称
type: 'direct', // 交换机类型
options: { durable: false },
},
queues: [ // 一个队列对应一个消费者,一个队列可以匹配多条规则
{
// name: 'A', // 队列名称
router: ['info', 'warning', 'error', 'debug'], // 匹配规则
options: { exclusive: true },
},
{
// name: 'B',
router: ['error'],
options: { exclusive: true },
}
]
}
},
producer: {
logger: {
mode: 'router',
exchange: {
name: 'logger', // 交换机名称
type: 'direct', // 交换机类型
options: { durable: false },
},
}
}
}
Topic Mode
定义按主题模式模式的消费者、生产者
// {app_root}/config/config.default.js
{
consumer: {
orders: {
mode: 'topic',
exchange: {
name: 'orders', // 交换机名称,主题 topic
type: 'topic', // 交换机类型
options: { durable: false },
},
queue: {
name: 'A', // 队列名称
subscriber: 'orders', // 消费者名称,对应文件名
rules: ['files.cn.hz.#'], // 匹配规则
options: { exclusive: true },
},
}
},
producer: {
orders: {
mode: 'topic',
exchange: {
name: 'orders', // 交换机名称,主题 topic
type: 'topic', // 交换机类型
options: { durable: false },
},
}
}
}
// {app_root}/app/controller/test.js
const message = {
exchange: 'orders',
type: 'topic',
key: 'files.cn.hz.store',
content: JSON.stringify({ now: Date.now() }),
};
const result = await this.app.amqpx.publish(message); // send to orders exchange
console.log(`send exchange ${result} : ${JSON.stringify(message)}`);
Example
Questions & Suggestions
Please open an issue here.