redis5-stream
v1.1.0
Published
redis-stream curd
Downloads
153
Maintainers
Readme
使用说明
redis包其实对stream已经进行了封装,只是能参考的文献太少,并且传参结构不明确,经过我一天多的试错,把传参及调用都封装了一下,提交上npm供大家使用
安装
npm install redis
npm install redis5-rstream
更新日志
V1.1.0
- xack封装为promise,思考再三此处回调无太大意义(但回调方法依然保留),外层可直接使用await获取结果,具体可见用例
- 增加xreadGroup、xread回调函数的同步阻塞,解决streams时效性内容的处理顺序隐患
- 建议每个xreadGroup监听一个流或有时效性内容的多个streams,这样既能提高并发性能,又能保证数据按正确顺序获取并处理,监听多个streams启用多个数据库连接分别监听,近期会放出demo到github上
V1.0.9
- 解决了新建消费组历史数据无法获取bug,新建分组时不执行回调函数
使用
stream.write
- 输入数据(支持链式操作)
const redis = require('redis');
const streams = require('redis5-stream');
//数据库配置文件
const config = require('./config/config');
client = redis.createClient(config.port,config.host);
if(config.redis.auth) client.auth(config.redis.auth);
//输入数据: client为数据库对象,login为streams命名
var stream = new streams(client, 'login');
//结尾使用.quit()中断操作
stream.write({test: '23'}).write({test: '1231223'}).quit();
stream.xreadGroup()
- xreadGroup监听消费组数据(可监听多个streams,自动对streams创建消费组,可对空streams创建消费组)
const redis = require('redis');
const streams = require('redis5-stream');
//数据库配置文件
const config = require('./config/config');
client = redis.createClient(config.port,config.host);
if(config.redis.auth) client.auth(config.redis.auth);
//阻断监听消费组数据(循环监听,获取1条数据后执行业务代码,自动再次监听):
var group_name = 'g';
var consumer_name = 'c';
var stream = new streams(client, 'common,login');
stream.xreadGroup(group_name, consumer_name, (res) => {
for (let val of res) {
var streamName = val[0];
var id = val[1][0][0];
var valule = val[1][0][1];
console.log('监听数据并执行业务代码',valule);
//xack消费确认
var ret = await stream.xack(streamName, group_name, id);
}
let id = str[0];
let mystream = res[0][0];
});
stream.xack()
- 确认消费组消费(xreadGroup获取streams消息后,必须要执行该命令)
//第四个callback回调为可选参数
stream.xack(mystream, group_name, id,(res) => {
//res返回值为1时,执行成功
});
V1.1.0版本以后可以使用await获取结果
var ret = await stream.xack(mystream, group_name, id);
stream.xread
- xread阻塞监听数据(可监听多个streams)
const redis = require('redis');
const streams = require('redis5-stream');
//数据库配置文件
const config = require('./config/config');
client = redis.createClient(config.port,config.host);
if(config.redis.auth) client.auth(config.redis.auth);
var stream = new streams(client, 'common,login');
stream.xread((res) => {
console.log('监听数据并执行业务代码',res);
})
其他
- 支持streams.del()操作
- 预计2020年1月23日提供程序DEMO