@quansitech/qs-mysql-sync-kafka
v1.2.1
Published
kafka mysql sync consumer
Downloads
14
Readme
mysql 同步组件(Kafka)
介绍
用于将mysql的数据同步到其他的数据源,如不同类型的关系数据库,nosql数据库,Elasticsearch等。
结合了kafka的mysql-connect组件,可快速实现同步。
PS. 暂时实现了mysql->elasticsearch(7.10)的同步功能,暂不支持集群模式。
准备
kafka 版本 2.13-2.6.0 官网
mysql监控表
CREATE TABLE `es_sync` (
`id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT primary key,
`data_type` varchar(30) COLLATE utf8mb4_general_ci NOT NULL COMMENT '数据类型',
`op` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '操作: update delete insert等',
`data_key` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '字段值',
`create_date` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
//设置全覆盖索引,提升查询性能
ALTER TABLE `kb_es_sync` ADD KEY `idx_full_cover` (`id`,`create_date`,`data_type`,`op`,`data_key`);
安装
npm install @quansitech/qs-mysql-sync-kafka
设置
- 启动kafka及kafka-connect-mysql (方法自行查找相关文档)。
- 创建mysql监控表,kafka-connect-mysql使用timestamp+incrementing模式同步内容到kafka的topic。
- 监控需要同步的表,当表发生变动时利用增删改触发器向es_sync表插入一条数据。
- data_type: 是对应的模块名,封装着初始化、增、删、改等具体的同步逻辑,也可以是更个性化的逻辑,根据具体需求灵活设置。
- op:就是对应的模块具体方法,如上面提到的增、删、改等操作。
- data_key:是传递给op方法的值。
- 编写具体的模块同步逻辑。
- 执行初始化,并启动consumer模式,开始监控同步。
配置及接口说明
组件由三个类组成,分别是
- MysqlEsSync 负责提供mysql->es同步的调用方法。
- SyncClient 组件的核心模块,负责topic信息的接收,根据信息调度不同的处理模块。
- Utils 组件的工具类
类接口详解
MysqlEsSync
getInstance(opt) 对象实例化入口(单例)
参数说明:
opt object类型
const defaultOpt = { mysqlConnectionLimit : 10, //mysql连接数 mysqlHost: '', //mysql服务器地址 mysqlPort: 3306, //mysql服务器端口 mysqlUser: '', //mysql用户名 mysqlPassword: '', //mysql用户密码 mysqlDatabase: '', //要连接的mysql数据库 esHost: '', //es服务器地址 esPort: '', //es服务器端口号 bulkFetchRow: 500 //批量同步每次同步的数据行数 };
deleteEsIndex(index) 删除Es索引
参数说明:
index string类型 索引名称
createEsIndex(index, mapping, settings) 创建Es索引
参数说明:
index string类型 索引名称
mapping object类型 索引的mapping配置
settings object类型 索引的settings配置
deleteEsDoc(id, index) 删除Es索引指定doc记录
参数说明:
id string类型 doc id值
index string类型 索引名称
bulkPutEsFromMysql(sql, docKey, index, pipeline) 从mysql批量同步数据指指定的Es索引
参数说明:
sql string类型 sql查询语句
docKey string类型 作为doc主键的数据库字段
index string类型 索引名称
pipeline string类型 指定向索引插入数据时需要执行的数据处理管道的管道名称
bulkPutEsFromMysqlById(sql, docKey, index, idColumn, pipeline) 从mysql批量同步数据指指定的Es索引(数据量很大情况下用这个方法效率更高)
参数说明:
sql string类型 sql查询语句
docKey string类型 作为doc主键的数据库字段
index string类型 索引名称
idColumn string类型 mysql数据表主键,用于分页排序
pipeline string类型 指定向索引插入数据时需要执行的数据处理管道的管道名称
deletePolicy(name) 删除Es的enrich策略
参数说明:
name string类型 enrich策略名
createPolicy(name, body) 创建Es的enrich策略
参数说明:
name string类型 enrich策略名
body object类型 策略配置
executePolicy(name, requestTimeout) 执行Es的enrich策略
参数说明:
name string类型 enrich策略名 requestTimeout 整数类型 请求超时时间,单位毫秒 默认30000毫秒
createPipeline(id, body) 创建Es 管道
参数说明:
id string类型 管道名
body object类型 管道配置
deletePipeline(id) 删除Es 管道
参数说明:
id string类型 管道名
esUpdateByQuery(index, pipeline, body) 更新Es符合条件的索引记录
参数说明:
index string类型 索引名称
pipeline string类型 更新执行的管道的管道名
body object类型 需要执行更新操作的记录搜索设置
SyncClient
- constructor(opt, modules) 构造函数
参数说明:
opt object类型 配置参数
const defaultOpt = { kafkaTopic: '', //要监听的topic名 kafkaHost: '', //kafka 服务器地址 adminEmail: '', //如发生错误需要发送邮件,则可在此设置邮件,多个用逗号隔开 nodemailerOpt: { host: '', //邮件服务器 port: 465, //邮件服务器端口号 secure: true, //是否使用安全连接 user: '', //邮件账号 password: '' //账号密码 } };
modules array类型 模块数组
modules = [ require('./sync-module/book') //引入模块的js ]
init(param) 数据初始化
参数说明:
param object类型 设置项
param = { module: '' //指定初始化的模块名 } { module: 'all' } //表示初始化所有模块 { module: [ 'book' ]} //表示仅初始化book模块
startConsumer(param) 启动consumer
参数说明:
param object类型 设置项
param = { offset: 100 //指定topic偏移量 }
Utils
parseParam() 将命令行的参数转换成对象配置
//node index.js init --module=all const param = Utils.parseParam(); console.log(param); // { module: 'all' } //node index.js init --module=book,borrow const param = Utils.parseParam(); console.log(param); // { module: ['book', 'borrow'] }
chunkRun(chunkSize, arr, runFn) Es最大支持65536 的terms查询设置,如果需要批量同步的sql数据大于该值,则需要切分,该方法可以自动帮我们完成数组的切分
参数说明:
chunkSize 整数类型 切分大小
arr 数组类型 需要被切分的数组
runFn 匿名函数 切分后的数组会作为参数传入该匿名函数执行
delayRun(runFn, delayMilliSecond) 封装了setTimeout,返回promise
参数说明:
runFn 匿名类型 需要推迟执行的函数
delayMilliSecond 整数类型 延迟执行毫秒
await utils.delayRun(async () => { await doSomeThingAsync(); }, 10000);
模块接口说明
name 模块名称
init() 执行初始化时会调用该方法
其余的方法可根据需要自定义,但必须与 es_sync表里的op字段对应,否则报错。es_sync表的data_key记录会通过第一个参数传入
简单用例
假设我们要同步一张书籍表book
CREATE TABLE `book` (
`id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT primary key,
`book_name` varchar(30) NOT NULL,
`isbn` varchar(30) NOT NULL,
`author` varchar(30) NOT NULL,
`price` varchar(10) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
创建增删改触发器
CREATE TRIGGER `tri_book_delete_after` AFTER DELETE ON `book` FOR EACH ROW BEGIN
insert into es_sync(`data_type`, `op`, `data_key`) values('book', 'delete', old.id);
END;
CREATE TRIGGER `tri_book_insert_after` AFTER INSERT ON `book` FOR EACH ROW BEGIN
insert into es_sync(`data_type`, `op`, `data_key`) values('book', 'insert', new.id);
END
CREATE TRIGGER `tri_book_update_after` AFTER UPDATE ON `book` FOR EACH ROW BEGIN
insert into es_sync(`data_type`, `op`, `data_key`) values('book', 'update', new.id);
END
编写同步模块 book.js
const MysqlSync = require('@quansitech/qs-mysql-sync-kafka');
const MysqlEsSync = MysqlSync.MysqlEsSync;
const opt = {
mysqlHost: '127.0.0.1',
mysqlPort: 3306,
mysqlUser: 'db_user',
mysqlPassword: '123456',
mysqlDatabase: 'demo',
esHost: '127.0.0.1',
esPort: '9200'
};
const mesClient = MysqlEsSync.getInstance(opt);
const indexName = 'book';
const indexMapping = {
properties: {
id: { type: "keyword" },
book_name: { type: "keyword" },
isbn: { type: 'keyword' },
author: {
type: 'keyword',
},
price: {
type: 'float'
}
}
};
const fetchSql = (id = '') => {
let sql = `select id,book_name,isbn,author,price from book`;
if(id){
sql = `${sql} where id='${id}'`;
}
return sql;
};
module.exports = {
name: 'book',
init: async function(){
await mesClient.deleteEsIndex(indexName);
await mesClient.createEsIndex(indexName, indexMapping);
await mesClient.bulkPutEsFromMysqlById(fetchSql(), 'id', indexName, 'id');
},
insert: async function(id){
try{
await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName);
}
catch(e){
throw e;
}
},
update: async function(id){
try{
await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName);
}
catch(e){
throw e;
}
},
delete: async function(id){
try{
await mesClient.deleteEsDoc(id, indexName);
}
catch(e){
throw e;
}
}
}
编写启动代码 index.js
const MysqlSync = require('@quansitech/qs-mysql-sync-kafka');
const SyncClient = MysqlSync.SyncClient;
const utils = MysqlSync.Utils;
const opt = {
kafkaTopic: 'demo.es_sync',
kafkaHost: '127.0.0.1:9092'
};
const syncClient = new SyncClient(opt, [
require('./book'),
]);
const param = utils.parseParam();
switch(process.argv[2]){
case 'init':
syncClient.init(param).then(() => { process.exit(0) });
break;
case 'consumer':
syncClient.startConsumer(param);
break;
}
执行初始化
node index.js init --module=all
启动kafka consumer
node index.js consumer
高级用法
模块方法的延迟执行
customOp(id, startSync = false) //当customOp返回true时,核心获取后会将该操作放入延迟队列。当所有topic的消息都过完一遍后再发起延迟队列的执行。执行延迟队列的方法时,startSync参数会传入true。
看说明可能比较难理解,我们举个具体的例子来说明。
有两张数据库表book, borrow,book是书籍资料表,borrow 是借阅记录表。假设这两张表数据量非常大,我们想将他们同步到ES来统计作者的被借阅次数汇总,借此来分析作者的受欢迎程度。
book表
CREATE TABLE `book` ( `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT primary key, `book_name` varchar(30) NOT NULL, `isbn` varchar(30) NOT NULL, `author` varchar(30) NOT NULL, `price` varchar(10) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
borrow表
CREATE TABLE `borrow` ( `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT primary key, `book_id` varchar(30) NOT NULL, `reader_id` varchar(30) NOT NULL, `borrow_time` timestamp(3) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
(触发器参照简单用例,这里不再提供代码)
将这两张表同步到ES,但你会发现ES不支持关联查询,而ES提供了一种enrich的策略类型结合pipeline来将两个不同的索引组合的方法。
book模块
const MysqlSync = require('@quansitech/qs-mysql-sync-kafka'); const MysqlEsSync = MysqlSync.MysqlEsSync; const opt = { mysqlHost: '127.0.0.1', mysqlPort: 3306, mysqlUser: 'db_user', mysqlPassword: '123456', mysqlDatabase: 'demo', esHost: '127.0.0.1', esPort: '9200' }; const mesClient = MysqlEsSync.getInstance(opt); const indexName = 'book'; const indexMapping = { properties: { id: { type: "keyword" }, book_name: { type: "keyword" }, isbn: { type: 'keyword' }, author: { type: 'keyword', }, price: { type: 'float' } } }; const fetchSql = (id = '') => { let sql = `select id,book_name,isbn,author,price from book`; if(id){ sql = `${sql} where id='${id}'`; } return sql; }; const updateByQueryBody = id => { return { query: { term: { book_id: { value: id } } } }; }; module.exports = { name: 'book', init: async function(){ await mesClient.deleteEsIndex(indexName); await mesClient.createEsIndex(indexName, indexMapping); await mesClient.bulkPutEsFromMysqlById(fetchSql(), 'id', indexName, 'id'); }, insert: async function(id){ try{ await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName); } catch(e){ throw e; } }, update: async function(id){ try{ await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName); //book索引发生变化,必须重启policy来重建enrich索引 await mesClient.executePolicy('book_lookup'); //触发borrow数据的更新 await mesClient.esUpdateByQuery('borrow', 'borrow_pipeline', updateByQueryBody(id)); } catch(e){ throw e; } }, delete: async function(id){ try{ await mesClient.deleteEsDoc(id, indexName); } catch(e){ throw e; } } }
borrow模块
const MysqlSync = require('@quansitech/qs-mysql-sync-kafka'); const MysqlEsSync = MysqlSync.MysqlEsSync; const indexName = 'borrow'; const pipelineId = 'borrow_pipeline'; const policyName = 'book_lookup'; const opt = { mysqlHost: '127.0.0.1', mysqlPort: 3306, mysqlUser: 'db_user', mysqlPassword: '123456', mysqlDatabase: 'demo', esHost: '127.0.0.1', esPort: '9200' }; const mesClient = MysqlEsSync.getInstance(opt); const indexMapping = { properties: { id: { type: "keyword" }, reader_id: { type: "keyword" }, book_id: { type: "keyword" }, borrow_time: { type: "date" } } } const pipelineBody = { processors: [ { enrich: { policy_name: policyName, field : "book_id", target_field: "book_data" } } ] }; const policyBody = { match: { indices: 'book', match_field: 'id', enrich_fields: [ 'isbn', 'book_name','author', 'price'] } }; const fetchSql = (id = '', bookId = '' ) => { let sql = `SELECT id, reader_id, book_id, borrow_time FROM borrow where 1=1`; if(id){ sql = `${sql} and id='${id}'`; } if(bookId){ sql = `${sql} and book_id='${bookId}'`; } return sql; }; module.exports = { name: 'borrow', init: async function(){ await mesClient.deleteEsIndex(indexName); await mesClient.createEsIndex(indexName, indexMapping); await mesClient.deletePipeline(pipelineId); await mesClient.deletePolicy(policyName); await mesClient.createPolicy(policyName, policyBody); await mesClient.executePolicy(policyName); await mesClient.createPipeline(pipelineId, pipelineBody); await mesClient.bulkPutEsFromMysqlById(fetchSql(), 'id', indexName, 'id', pipelineId); }, insert: async function(id){ try{ await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName, pipelineId); } catch(e){ throw e; } }, update: async function(id){ try{ await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName, pipelineId); } catch(e){ throw e; } }, delete: async function(id){ try{ await mesClient.deleteEsDoc(id, indexName); } catch(e){ throw e; } } }
book模块的update操作会触发enrich policy的重启,而这个操作很耗时。如果有成千上万的book数据更新,那肯定是处理不过来的。这种场景就可以使用延迟执行的特性,现在改造下book模块。
. . . //改成可批量更新的模式 const fetchSql = (id = '') => { let sql = `SELECT id, reader_id, book_id, borrow_time FROM borrow where 1=1`; if(id && typeof id == 'string'){ sql = `${sql} where id='${id}'`; } else if(id && typeof id == 'object'){ sql = `${sql} where id in ('${id.join("','")}')`; } return sql; }; //改成可批量更新的模式 const updateByQueryBody = id => { if(id && typeof id == 'string'){ return { query: { term: { book_id: { value: id } } } }; } else if(id && typeof id == 'object'){ return { query: { terms: { book_id: id } } }; } }; //用于收集需要延迟更新的id let updateIds = []; module.exports = { . . . update: async function(id, startSync = false){ try{ //先不执行,仅仅收集id if(startSync == false){ if(!updateIds.includes(id)){ updateIds.push(id); } //返回true 告诉核心模块该方法需要延迟执行 return true; } else{ //startSync=true 所有消息收集完毕,开始执行 await mesClient.bulkPutEsFromMysql(fetchSql(updateIds), 'id', indexName, pipelineId); await mesClient.executePolicy('book_lookup'); //es terms查询最多支持到65536的数据大小,需要切块执行更新 await utils.chunkRun(60000, updateIds, async chunkIds => { await mesClient.esUpdateByQuery('borrow', 'borrow_pipeline', updateByQueryBody(chunkIds)); }); updateIds = []; } } catch(e){ throw e; } }, . . . }
经改造后, update操作仅在消息收集完的最后一刻执行一次,这样就大大提高了程序的效率和稳定性。
索引的映射类型为object时的处理方法
一般情况下索引映射是一维类型如下
//一维的映射 const indexMapping = { properties: { id: { type: "keyword" }, reader_id: { type: "keyword" }, book_id: { type: "keyword" }, borrow_time: { type: "date" } } } //sql查询的字段名称和映射属性名对应即可完成导入 const fetchSql = (id = '', bookId = '' ) => { let sql = `SELECT id, reader_id, book_id, borrow_time FROM borrow where 1=1`; if(id){ sql = `${sql} and id='${id}'`; } if(bookId){ sql = `${sql} and book_id='${bookId}'`; } return sql; };
但如果索引映射是二维类型(object)
//二维的映射 const indexMapping = { properties: { id: { type: "keyword" }, reader_id: { type: "keyword" }, book_id: { type: "keyword" }, book:{ properties: { isbn: { type: "keyword" }, book_name: { type: "keyword" } } } borrow_time: { type: "date" } } } //查询sql的字段名称必须做特殊化的处理 const fetchSql = (id = '', bookId = '' ) => { let sql = `SELECT b.id, b.reader_id, b.book_id, bk.book_name as "book.book_name", bk.isbn as "book.isbn", borrow_time FROM borrow b,book bk where bk.id=b.book_id`; if(id){ sql = `${sql} and id='${id}'`; } if(bookId){ sql = `${sql} and book_id='${bookId}'`; } return sql; }; //上面的sql的book_name要写成book.book_name 以表示二维的关系,中间用 "." 来分隔 //isbn同理