egg-yanxin-kafkajs
v1.0.1
Published
egg kafka plugin base kafkajs module
Downloads
5
Maintainers
Readme
egg-yanxin-kafkajs
用于egg.js
的kafka消息队列插件,依赖于kafkajs
,相关文档详情:kafkajs
依赖说明
依赖的 egg 版本
enn-egg-kafka 版本 | egg 1.x --- | --- 1.x | 😁 0.x | ❌
依赖的插件
- kafkajs
开启插件
// config/plugin.js
exports.ennEggKafka = {
enable: true,
package: 'kafkajs',
};
使用场景
- 对kafkajs模块封装的eggjs插件
配置
- 配置
config/config.default.js
// 普通配置
config.ennEggKafka = {
client: {
clientId: 'my-app',
brokers: ['1.1.1.1:9092'],
},
consumer: {
groupId: 'group-test',
fetchLogger: false, // 是否开启消费记录日志
},
producer: {
allowAutoTopicCreation: true,
},
};
// 阿里云kafka服务的SASL_SSL协议配置,仅作参考
config.ennEggKafka = {
client: {
clientId: 'my-app',
// 可配置多个地址
brokers: ['1.1.1.1:9093'],
ssl: {
rejectUnauthorized: false,
// 阿里云ca文件
ca: [path.join(__dirname, './config/ca-cert')],
},
sasl: {
mechanism: 'plain',
username: 'your username',
password: 'your password',
},
},
consumer: {
groupId: 'group-test',
fetchLogger: false, // 是否开启消费记录日志
},
producer: {
allowAutoTopicCreation: true,
}
}
其中fetchLogger为新增字段,其他配置字段详见 https://kafka.js.org/docs/getting-started
使用
// 生产消息
await app.kafka.producer.sendMessage('test_topic', [{ key: 'key2111', value: 'hey hey!1111' }])
// 单条信息消费
await app.kafka.consumer.initSubscribe({ topics: ['test_topic'] });
app.kafka.consumer.run({
eachMessage: async ({
topic,
partition,
message,
heartbeat
}) => {
const data = {
partition,
topic,
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
timestamp: message.timestamp,
offset: message.offset,
};
if (message.extra) {
data.extra = message.extra.toString();
}
console.log(message)
}
});
// 批量消费信息
await app.kafka.consumer.initConsumer({ topic: 'test_topic' });
app.kafka.consumer.run({
eachBatchAutoResolve: true,
eachBatch: async ({
batch,
resolveOffset,
heartbeat,
commitOffsetsIfNecessary,
uncommittedOffsets,
isRunning,
isStale,
}) => {
const datas = [];
for (let message of batch.messages) {
console.log(message)
const data = {
topic: batch.topic,
partition: batch.partition,
highWatermark: batch.highWatermark,
message: {
offset: message.offset,
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
}
};
if (message.extra) {
data.extra = message.extra.toString();
}
resolveOffset(message.offset);
await heartbeat();
}
},
})