© 2024 – Pkg Stats / Ryan Hefner

@quansitech/qs-mysql-sync-kafka

v1.2.1

kafka mysql sync consumer

Downloads

14

Readme

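MySQL sync component (Kafka)

Introduction

Syncs MySQL data to other data stores, such as other kinds of relational databases, NoSQL databases, Elasticsearch, etc.

Built on Kafka's MySQL connect component (kafka-connect-mysql), so syncing can be set up quickly.

PS. Currently only MySQL -> Elasticsearch (7.10) sync is implemented, and cluster mode is not supported yet.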

Prerequisites

Kafka version 2.13-2.6.0 (the Scala 2.13 build of Kafka 2.6.0), from the official website

kafka-connect-mysql

MySQL monitoring table

CREATE TABLE `es_sync` (
  `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT primary key,
  `data_type` varchar(30) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'data type',
  `op` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'operation: update, delete, insert, etc.',
  `data_key` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'field value',
  `create_date` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- Covering index over all columns to speed up queries
ALTER TABLE `es_sync` ADD KEY `idx_full_cover` (`id`,`create_date`,`data_type`,`op`,`data_key`);
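Setup step 2 below has kafka-connect-mysql read this table in timestamp+incrementing mode. The README does not show the connector configuration, so the following is a minimal in-memory sketch. It assumes `create_date` is the timestamp column and `id` is the incrementing column. `nextBatch` is a hypothetical helper and is not part of the package or the connector. The sketch shows the rule this mode follows: a row is new if its timestamp is later than the stored offset, or if the timestamp is equal and the id is larger.

```javascript
// Hypothetical illustration of timestamp+incrementing change detection on es_sync.
// Assumption: create_date is the timestamp column and id is the incrementing column.
// The real connector queries MySQL and adds details (such as an upper bound on the
// timestamp) that are left out here.

// Returns rows after the stored offset, ordered by (create_date, id),
// together with the offset to store for the next poll.
function nextBatch(rows, offset) {
  const isAfterOffset = (row) =>
    row.create_date > offset.ts ||
    (row.create_date === offset.ts && row.id > offset.id);

  const batch = rows
    .filter(isAfterOffset)
    .sort((a, b) => a.create_date - b.create_date || a.id - b.id);

  const last = batch[batch.length - 1];
  const newOffset = last ? { ts: last.create_date, id: last.id } : offset;
  return { batch, newOffset };
}

// Sample es_sync rows; create_date is simplified to epoch milliseconds.
const esSyncRows = [
  { id: 1, create_date: 1000, data_type: 'book', op: 'insert', data_key: '7' },
  { id: 2, create_date: 1000, data_type: 'book', op: 'update', data_key: '7' },
  { id: 3, create_date: 1005, data_type: 'book', op: 'delete', data_key: '7' },
];

const first = nextBatch(esSyncRows, { ts: -Infinity, id: -Infinity });
console.log(first.batch.map((r) => r.id)); // [ 1, 2, 3 ]

// A later row with the same timestamp as the stored offset is still picked up,
// because the id breaks the tie.
esSyncRows.push({ id: 4, create_date: 1005, data_type: 'book', op: 'insert', data_key: '8' });
const second = nextBatch(esSyncRows, first.newOffset);
console.log(second.batch.map((r) => r.id)); // [ 4 ]
```

Pairing the two columns lets two rows written in the same millisecond (`create_date` is `timestamp(3)`) both be picked up, without re-reading rows that were already published.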

Installation

npm install @quansitech/qs-mysql-sync-kafka

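Setup

  1. Start Kafka and kafka-connect-mysql (see their own documentation for how).
  2. Create the MySQL monitoring table. kafka-connect-mysql uses timestamp+incrementing mode to publish its rows to a Kafka topic.
  3. Add insert, update and delete triggers to every table that needs syncing, so that each change inserts a row into es_sync.
    • data_type: the name of the corresponding module. The module encapsulates the concrete sync logic for initialization, insert, delete, update and so on. It can also carry more customized logic, set as your needs require.
    • op: the module method to call, such as the insert, delete and update operations mentioned above.
    • data_key: the value passed to the op method.
  4. Write the sync logic for each module.
  5. Run the initialization, then start consumer mode to begin monitoring and syncing.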
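Steps 3 to 5 amount to a simple routing rule: `data_type` picks the module, `op` picks the method, and `data_key` is passed to that method as its argument. Below is a minimal sketch of that routing against in-memory objects. `dispatch` is a hypothetical name, and the real `SyncClient` may differ in details such as error handling and retries.

```javascript
// Hypothetical sketch of how one es_sync record could be routed to a sync module:
// data_type selects the module (by its name), op selects the method,
// and data_key becomes the method's first argument.
async function dispatch(modules, record) {
  const mod = modules.find((m) => m.name === record.data_type);
  if (!mod) {
    throw new Error(`No module registered for data_type "${record.data_type}"`);
  }
  const method = mod[record.op];
  if (typeof method !== 'function') {
    // The README says an op without a matching module method is an error.
    throw new Error(`Module "${mod.name}" has no method "${record.op}"`);
  }
  return method.call(mod, record.data_key);
}

// Stub module that records calls instead of talking to Elasticsearch.
const calls = [];
const bookModule = {
  name: 'book',
  init: async () => { calls.push(['init']); },
  insert: async (id) => { calls.push(['insert', id]); },
  update: async (id) => { calls.push(['update', id]); },
  delete: async (id) => { calls.push(['delete', id]); },
};

// A row that an update trigger on book would write for book id 42.
dispatch([bookModule], { data_type: 'book', op: 'update', data_key: '42' })
  .then(() => console.log(calls)); // [ [ 'update', '42' ] ]
```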

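Configuration and API reference

The component consists of three classes:

  1. MysqlEsSync provides the methods used for MySQL -> ES sync.
  2. SyncClient is the core of the component. It receives topic messages and dispatches them to the matching module.
  3. Utils is the component's utility class.
  • Class API details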
    • MysqlEsSync

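      1. getInstance(opt) Entry point for obtaining the instance (singleton)

        Parameters:

        opt (object)

        const defaultOpt = {
           mysqlConnectionLimit : 10, // maximum number of MySQL connections
           mysqlHost: '', // MySQL server address
           mysqlPort: 3306, // MySQL server port
           mysqlUser: '', // MySQL user name
           mysqlPassword: '', // MySQL user password
           mysqlDatabase: '', // MySQL database to connect to
           esHost: '', // ES server address
           esPort: '', // ES server port
           bulkFetchRow: 500 // number of rows fetched per batch during bulk sync
        };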
      2. deleteEsIndex(index) Deletes an ES index

        Parameters:

        index (string): index name

      3. createEsIndex(index, mapping, settings) Creates an ES index

        Parameters:

        index (string): index name

        mapping (object): the index mapping

        settings (object): the index settings

      4. deleteEsDoc(id, index) Deletes the document with the given id from an ES index

        Parameters:

        id (string): document id

        index (string): index name

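      5. bulkPutEsFromMysql(sql, docKey, index, pipeline) Bulk-syncs data from MySQL into the specified ES index

        Parameters:

        sql (string): SQL query

        docKey (string): database column used as the document's primary key

        index (string): index name

        pipeline (string): name of the ingest pipeline to run when inserting data into the index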

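      6. bulkPutEsFromMysqlById(sql, docKey, index, idColumn, pipeline) Bulk-syncs data from MySQL into the specified ES index (more efficient than bulkPutEsFromMysql for large data sets)

        Parameters:

        sql (string): SQL query

        docKey (string): database column used as the document's primary key

        index (string): index name

        idColumn (string): primary key column of the MySQL table, used for ordering and paging

        pipeline (string): name of the ingest pipeline to run when inserting data into the index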

      7. deletePolicy(name) Deletes an ES enrich policy

        Parameters:

        name (string): enrich policy name

      8. createPolicy(name, body) Creates an ES enrich policy

        Parameters:

        name (string): enrich policy name

        body (object): policy configuration

      9. executePolicy(name, requestTimeout) Executes an ES enrich policy

        Parameters:

        name (string): enrich policy name

        requestTimeout (integer): request timeout in milliseconds, default 30000

      10. createPipeline(id, body) Creates an ES ingest pipeline

        Parameters:

        id (string): pipeline name

        body (object): pipeline configuration

      11. deletePipeline(id) Deletes an ES ingest pipeline

        Parameters:

        id (string): pipeline name

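      12. esUpdateByQuery(index, pipeline, body) Updates the ES documents that match a query

        Parameters:

        index (string): index name

        pipeline (string): name of the pipeline to run for the update

        body (object): search that selects the documents to update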

    • SyncClient

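      1. constructor(opt, modules) Constructor

      Parameters:

      opt (object): configuration

      const defaultOpt = {
         kafkaTopic: '', // name of the topic to listen to
         kafkaHost: '', // Kafka server address
         adminEmail: '', // address(es) to email when an error occurs; separate multiple addresses with commas
         nodemailerOpt: {
             host: '', // mail server
             port: 465, // mail server port
             secure: true, // whether to use a secure connection
             user: '', // mail account
             password: '' // account password
         }
      };

      modules (array): array of modules

      modules = [
         require('./sync-module/book') // load the module's JS file
      ]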
      2. init(param) Data initialization

        Parameters:

        param (object): options

        param = {
            module: '' // module(s) to initialize
        }

        { module: 'all' }      // initialize all modules
        { module: [ 'book' ]}  // initialize only the book module
      3. startConsumer(param) Starts the consumer

        Parameters:

        param (object): options

        param = {
            offset: 100 // topic offset to start consuming from
        }
    • Utils

      1. parseParam() Converts command-line arguments into a config object

        // Invoked as: node index.js init --module=all
        //        or:  node index.js init --module=book,borrow
        const param = Utils.parseParam();

        console.log(param);
        // --module=all         -> { module: 'all' }
        // --module=book,borrow -> { module: ['book', 'borrow'] }
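      2. chunkRun(chunkSize, arr, runFn) By default ES accepts at most 65,536 terms in a terms query (index.max_terms_count). When the SQL data to sync in bulk exceeds that, it has to be split, and this method splits the array automatically.

        Parameters:

        chunkSize (integer): chunk size

        arr (array): the array to split

        runFn (function): each chunk is passed to this function as its argument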

      3. delayRun(runFn, delayMilliSecond) Wraps setTimeout and returns a promise

        Parameters:

        runFn (function): the function whose execution is delayed

        delayMilliSecond (integer): delay in milliseconds

        await utils.delayRun(async () => {
           await doSomeThingAsync();
        }, 10000);
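  • Module interface
    1. name: module name

    2. init(): called when initialization runs

    3. Any other methods can be defined as needed, but each one must match an op value in the es_sync table, otherwise an error is raised. The data_key value from es_sync is passed in as the first argument.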
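The documented behaviour of `Utils.chunkRun` and `Utils.delayRun` is small enough to sketch. The versions below are hypothetical re-implementations for illustration, not the package's source. Two points are assumptions: running chunks one after another (rather than in parallel), and resolving `delayRun`'s promise with `runFn`'s result.

```javascript
// Hypothetical re-implementations of Utils.chunkRun and Utils.delayRun.

// Split arr into slices of at most chunkSize items and await runFn on each slice in turn.
async function chunkRun(chunkSize, arr, runFn) {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError('chunkSize must be a positive integer');
  }
  for (let start = 0; start < arr.length; start += chunkSize) {
    await runFn(arr.slice(start, start + chunkSize));
  }
}

// Wrap setTimeout in a promise that settles with runFn's result after delayMilliSecond.
function delayRun(runFn, delayMilliSecond) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      Promise.resolve()
        .then(runFn) // also turns a synchronous throw in runFn into a rejection
        .then(resolve, reject);
    }, delayMilliSecond);
  });
}

// 130,001 ids in chunks of 60,000 keep every terms query under the 65,536 default.
const ids = Array.from({ length: 130001 }, (_, i) => String(i + 1));
const sizes = [];
chunkRun(60000, ids, async (chunk) => {
  sizes.push(chunk.length);
}).then(() => console.log(sizes)); // [ 60000, 60000, 10001 ]
```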

Basic example

Suppose we want to sync a table of books, book:

CREATE TABLE `book` (
  `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT primary key,
  `book_name` varchar(30) NOT NULL,
  `isbn` varchar(30) NOT NULL,
  `author` varchar(30) NOT NULL,
  `price` varchar(10) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

Create the insert, update and delete triggers:

-- In the mysql client, switch the delimiter so the ';' inside each trigger body does not end the statement early
DELIMITER $$

CREATE TRIGGER `tri_book_delete_after` AFTER DELETE ON `book` FOR EACH ROW BEGIN
    insert into es_sync(`data_type`, `op`, `data_key`) values('book', 'delete', old.id);
END$$

CREATE TRIGGER `tri_book_insert_after` AFTER INSERT ON `book` FOR EACH ROW BEGIN
    insert into es_sync(`data_type`, `op`, `data_key`) values('book', 'insert', new.id);
END$$

CREATE TRIGGER `tri_book_update_after` AFTER UPDATE ON `book` FOR EACH ROW BEGIN
    insert into es_sync(`data_type`, `op`, `data_key`) values('book', 'update', new.id);
END$$

DELIMITER ;

Write the sync module book.js

const MysqlSync = require('@quansitech/qs-mysql-sync-kafka');

const MysqlEsSync = MysqlSync.MysqlEsSync;

const opt = {
    mysqlHost: '127.0.0.1',
    mysqlPort: 3306,
    mysqlUser: 'db_user',
    mysqlPassword: '123456',
    mysqlDatabase: 'demo',
    esHost: '127.0.0.1',
    esPort: '9200'
};
const mesClient = MysqlEsSync.getInstance(opt);

const indexName = 'book';

const indexMapping = {
    properties: {
        id: { type: "keyword" },
        book_name: { type: "keyword" },
        isbn: { type: 'keyword' },
        author: {
            type: 'keyword',
        },
        price: {
            type: 'float'
        }
    }
};

const fetchSql = (id = '') => {
    let sql = `select id,book_name,isbn,author,price from book`;
    if(id){
        sql = `${sql} where id='${id}'`;
    }
    return sql;
};

module.exports = {
    name: 'book',
    init: async function(){
        await mesClient.deleteEsIndex(indexName);

        await mesClient.createEsIndex(indexName, indexMapping);
        
        await mesClient.bulkPutEsFromMysqlById(fetchSql(), 'id', indexName, 'id');
    },
    insert: async function(id){
        try{
            await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName);
        }
        catch(e){
            throw e;
        }
    },
    update: async function(id){
        try{
            await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName);
        }
        catch(e){
            throw e;
        }
    },
    delete: async function(id){
        try{
            await mesClient.deleteEsDoc(id, indexName);
        }
        catch(e){
            throw e;
        }
    }
}
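In `book.js`, the second argument to `bulkPutEsFromMysql` (`'id'`) names the column used as the document's primary key. That is what lets `update` overwrite an existing document and lets `deleteEsDoc(id, ...)` find it later. The README does not show how the package builds its Elasticsearch request. The sketch below assumes it uses the standard bulk API, where each document is preceded by an `index` action line. `toBulkBody` is a hypothetical helper.

```javascript
// Hypothetical: build an Elasticsearch bulk (NDJSON) body from MySQL rows.
// docKey names the column whose value becomes each document's _id.
function toBulkBody(rows, docKey, index) {
  const lines = [];
  for (const row of rows) {
    if (row[docKey] === undefined || row[docKey] === null) {
      throw new Error(`Row is missing the doc key column "${docKey}"`);
    }
    // Action line: "index" creates the document, or replaces one with the same _id.
    lines.push(JSON.stringify({ index: { _index: index, _id: String(row[docKey]) } }));
    // Source line: the row itself becomes the document.
    lines.push(JSON.stringify(row));
  }
  // The bulk API expects newline-delimited JSON ending with a newline.
  return lines.join('\n') + '\n';
}

// A row as fetchSql(id) in book.js might return it (sample values).
const rows = [
  { id: 1, book_name: 'Sample Book', isbn: '0000000000', author: 'A. Author', price: '19.90' },
];
process.stdout.write(toBulkBody(rows, 'id', 'book'));
// {"index":{"_index":"book","_id":"1"}}
// {"id":1,"book_name":"Sample Book","isbn":"0000000000","author":"A. Author","price":"19.90"}
```

`price` is still a string here because the column is `varchar(10)`. Elasticsearch coerces numeric strings into a `float` field by default, which is why the mapping can still declare it as `float`.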

Write the startup script index.js

const MysqlSync = require('@quansitech/qs-mysql-sync-kafka');

const SyncClient = MysqlSync.SyncClient;
const utils = MysqlSync.Utils;

const opt = {
    kafkaTopic: 'demo.es_sync',
    kafkaHost: '127.0.0.1:9092'
};
const syncClient = new SyncClient(opt, [
    require('./book'),
]);


const param = utils.parseParam();
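switch(process.argv[2]){
    case 'init':
        syncClient.init(param)
            .then(() => process.exit(0))
            .catch(err => { console.error(err); process.exit(1); });
        break;
    case 'consumer':
        syncClient.startConsumer(param);
        break;
    default:
        console.error('Usage: node index.js <init|consumer> [--module=...]');
        process.exit(1);
}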
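`index.js` relies on `utils.parseParam()` to turn `--module=...` flags into the `param` object. Based only on the two documented examples, a hypothetical parser could look like the one below. The README does not document how the real `parseParam` handles a single value, numbers (such as an `--offset` for `startConsumer`) or flags without `=`, so those choices here are assumptions.

```javascript
// Hypothetical re-implementation of parseParam, matching the two documented examples:
//   node index.js init --module=all          -> { module: 'all' }
//   node index.js init --module=book,borrow  -> { module: ['book', 'borrow'] }
// argv defaults to process.argv so the function can be tested with a fake argument list.
function parseParam(argv = process.argv) {
  const param = {};
  for (const arg of argv.slice(2)) {
    const match = /^--([^=]+)=(.*)$/.exec(arg);
    if (!match) continue; // skips the sub-command (init / consumer) and bare flags
    const [, key, value] = match;
    // Assumption: a comma-separated value becomes an array, anything else stays a string.
    param[key] = value.includes(',') ? value.split(',').filter(Boolean) : value;
  }
  return param;
}

console.log(parseParam(['node', 'index.js', 'init', '--module=book,borrow']));
// { module: [ 'book', 'borrow' ] }
```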

Run the initialization

node index.js init --module=all
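`--module=all` asks `syncClient.init` to initialize every registered module. The README does not document how `SyncClient` iterates over them. The hypothetical sketch below runs the selected modules one at a time in registration order and rejects unknown module names. Both behaviours are assumptions. Sequential order matters whenever a later module's `init()` reads an index that an earlier module builds.

```javascript
// Hypothetical: choose which modules' init() to run for param.module and run them
// one after another in registration order. Rejecting unknown names is an assumption.
function selectModules(modules, param) {
  if (!param.module) {
    throw new Error('param.module is required, e.g. --module=all');
  }
  if (param.module === 'all') return modules;
  const wanted = Array.isArray(param.module) ? param.module : [param.module];
  const unknown = wanted.filter((name) => !modules.some((m) => m.name === name));
  if (unknown.length > 0) {
    throw new Error(`Unknown module(s): ${unknown.join(', ')}`);
  }
  return modules.filter((m) => wanted.includes(m.name));
}

async function runInit(modules, param) {
  for (const mod of selectModules(modules, param)) {
    await mod.init(); // sequential: a later module may depend on an index built by an earlier one
  }
}

// Stub modules that only record the order in which init() ran.
const order = [];
const modules = [
  { name: 'book', init: async () => { order.push('book'); } },
  { name: 'borrow', init: async () => { order.push('borrow'); } },
];
runInit(modules, { module: 'all' }).then(() => console.log(order)); // [ 'book', 'borrow' ]
```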

Start the Kafka consumer

node index.js consumer

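Advanced usage

  • Deferred execution of module methods

    customOp(id, startSync = false)
    // When customOp returns true, the core module puts the operation into a deferred queue. Once all pending messages in the topic have been processed, the deferred queue is executed, and startSync is passed as true when a deferred method runs.

    This can be hard to follow from the description alone, so here is a concrete example.

    Take two database tables, book and borrow: book holds the book records and borrow holds the borrowing records. Suppose both tables are very large and we want to sync them to ES to count how many times each author's books have been borrowed, as a measure of the author's popularity.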

    book table

    CREATE TABLE `book` (
      `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT primary key,
      `book_name` varchar(30) NOT NULL,
      `isbn` varchar(30) NOT NULL,
      `author` varchar(30) NOT NULL,
      `price` varchar(10) NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

    borrow table

    CREATE TABLE `borrow` (
      `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT primary key,
      `book_id` varchar(30) NOT NULL,
      `reader_id` varchar(30) NOT NULL,
      `borrow_time` timestamp(3) NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

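    (For the triggers, follow the basic example. The code is not repeated here.)

    When you sync these two tables to ES, you will find that ES does not support join queries. Instead, ES provides enrich policies, which, combined with an ingest pipeline, can merge data from two different indices.

    book module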

    const MysqlSync = require('@quansitech/qs-mysql-sync-kafka');
      
    const MysqlEsSync = MysqlSync.MysqlEsSync;
      
    const opt = {
        mysqlHost: '127.0.0.1',
        mysqlPort: 3306,
        mysqlUser: 'db_user',
        mysqlPassword: '123456',
        mysqlDatabase: 'demo',
        esHost: '127.0.0.1',
        esPort: '9200'
    };
    const mesClient = MysqlEsSync.getInstance(opt);
      
    const indexName = 'book';
      
    const indexMapping = {
        properties: {
            id: { type: "keyword" },
            book_name: { type: "keyword" },
            isbn: { type: 'keyword' },
            author: {
                type: 'keyword',
            },
            price: {
                type: 'float'
            }
        }
    };
      
    const fetchSql = (id = '') => {
        let sql = `select id,book_name,isbn,author,price from book`;
        if(id){
            sql = `${sql} where id='${id}'`;
        }
        return sql;
    };
      
    const updateByQueryBody = id => {
        return {
            query: {
                term: {
                    book_id: {
                        value: id
                    }
                }
            }
        };
    };
      
    module.exports = {
        name: 'book',
        init: async function(){
            await mesClient.deleteEsIndex(indexName);
      
            await mesClient.createEsIndex(indexName, indexMapping);
              
            await mesClient.bulkPutEsFromMysqlById(fetchSql(), 'id', indexName, 'id');
        },
        insert: async function(id){
            try{
                await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName);
            }
            catch(e){
                throw e;
            }
        },
        update: async function(id){
            try{
                await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName);
                  
                // The book index changed, so the policy must be re-executed to rebuild the enrich index
                await mesClient.executePolicy('book_lookup');
                  
                // Trigger an update of the affected borrow documents
                await mesClient.esUpdateByQuery('borrow', 'borrow_pipeline', updateByQueryBody(id));
            }
            catch(e){
                throw e;
            }
        },
        delete: async function(id){
            try{
                await mesClient.deleteEsDoc(id, indexName);
            }
            catch(e){
                throw e;
            }
        }
    }

    borrow module

    const MysqlSync = require('@quansitech/qs-mysql-sync-kafka');
      
    const MysqlEsSync = MysqlSync.MysqlEsSync;
      
    const indexName = 'borrow';
    const pipelineId = 'borrow_pipeline';
    const policyName = 'book_lookup';
      
    const opt = {
        mysqlHost: '127.0.0.1',
        mysqlPort: 3306,
        mysqlUser: 'db_user',
        mysqlPassword: '123456',
        mysqlDatabase: 'demo',
        esHost: '127.0.0.1',
        esPort: '9200'
    };
    const mesClient = MysqlEsSync.getInstance(opt);
      
    const indexMapping = {
        properties: {
            id: { type: "keyword" },
            reader_id: { type: "keyword" },
            book_id: { type: "keyword" },
            borrow_time: { type: "date" }
        }
    }
      
    const pipelineBody = {
        processors: [
            {
                enrich: {
                    policy_name: policyName,
                    field : "book_id",
                    target_field: "book_data"
                }
            }
        ]
    };
      
    const policyBody = {
        match: {
            indices: 'book',
            match_field: 'id',
            enrich_fields: [ 'isbn', 'book_name','author', 'price']
        }
    };
      
    const fetchSql = (id = '', bookId = '' ) => {
        let sql = `SELECT
        id, 
        reader_id, 
        book_id, 
        borrow_time
        FROM borrow where 1=1`;
        if(id){
            sql = `${sql} and id='${id}'`;
        }
        if(bookId){
            sql = `${sql} and book_id='${bookId}'`;
        }
        return sql;
    };
      
    module.exports = {
        name: 'borrow',
        init: async function(){
            await mesClient.deleteEsIndex(indexName);
      
            await mesClient.createEsIndex(indexName, indexMapping);
      
            await mesClient.deletePipeline(pipelineId);
      
            await mesClient.deletePolicy(policyName);
      
            await mesClient.createPolicy(policyName, policyBody);
      
            await mesClient.executePolicy(policyName);
      
            await mesClient.createPipeline(pipelineId, pipelineBody);
              
            await mesClient.bulkPutEsFromMysqlById(fetchSql(), 'id', indexName, 'id', pipelineId);
        },
        insert: async function(id){
            try{
                await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName, pipelineId);
            }
            catch(e){
                throw e;
            }
        },
        update: async function(id){
            try{
                await mesClient.bulkPutEsFromMysql(fetchSql(id), 'id', indexName, pipelineId);
            }
            catch(e){
                throw e;
            }
        },
        delete: async function(id){
            try{
                await mesClient.deleteEsDoc(id, indexName);
            }
            catch(e){
                throw e;
            }
        }
    }

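    The book module's update operation re-executes the enrich policy, which is slow. With thousands of book updates arriving, it cannot keep up. This is where deferred execution helps, so let's rework the book module.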

    .
    .
    .
    // chunkRun is used in update below, so import Utils as well
    const utils = MysqlSync.Utils;

    // Reworked to support batch updates
    const fetchSql = (id = '') => {
        let sql = `select id,book_name,isbn,author,price from book`;
        if(id && typeof id == 'string'){
            sql = `${sql} where id='${id}'`;
        }
        else if(Array.isArray(id)){
            sql = `${sql} where id in ('${id.join("','")}')`;
        }
        return sql;
    };
      
    // Reworked to support batch updates
    const updateByQueryBody = id => {
        if(id && typeof id == 'string'){
            return {
                query: {
                    term: {
                        book_id: {
                            value: id
                        }
                    }
                }
            };
        }
        else if(id && typeof id == 'object'){
            return {
                query: {
                    terms: {
                        book_id: id
                    }
                }
            };
        }
    };
      
    // Collects the ids whose update is deferred
    let updateIds = [];
    module.exports = {
    .
    .
    .
        update: async function(id, startSync = false){
            try{
                // First pass: run nothing yet, just collect the id
                if(!startSync){
                    if(!updateIds.includes(id)){
                        updateIds.push(id);
                    }
                    // Returning true tells the core module to defer this method
                    return true;
                }
                // startSync === true: all messages have been collected, run the batch once
                if(updateIds.length === 0){
                    return;
                }
                // The book index is written without a pipeline (pipelineId exists only in the borrow module)
                await mesClient.bulkPutEsFromMysql(fetchSql(updateIds), 'id', indexName);

                await mesClient.executePolicy('book_lookup');

                // An ES terms query accepts at most 65,536 terms by default, so update in chunks
                await utils.chunkRun(60000, updateIds, async chunkIds => {
                    await mesClient.esUpdateByQuery('borrow', 'borrow_pipeline', updateByQueryBody(chunkIds));
                });

                updateIds = [];
            }
            catch(e){
                throw e;
            }
        },
    .
    .
    .
    }

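    After this change, the expensive part of update runs only once, after all messages have been collected. This greatly improves the program's efficiency and stability.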

  • Handling index mappings of type object

    Normally an index mapping is flat, like this:

    // Flat mapping
    const indexMapping = {
        properties: {
            id: { type: "keyword" },
            reader_id: { type: "keyword" },
            book_id: { type: "keyword" },
            borrow_time: { type: "date" }
        }
    }
      
    // As long as the SQL column names match the mapping property names, the rows import directly
    const fetchSql = (id = '', bookId = '' ) => {
        let sql = `SELECT
        id, 
        reader_id, 
        book_id, 
        borrow_time
        FROM borrow where 1=1`;
        if(id){
            sql = `${sql} and id='${id}'`;
        }
        if(bookId){
            sql = `${sql} and book_id='${bookId}'`;
        }
        return sql;
    };

    But if the index mapping is nested (type object):

    // Nested (object) mapping
    const indexMapping = {
        properties: {
            id: { type: "keyword" },
            reader_id: { type: "keyword" },
            book_id: { type: "keyword" },
            book:{
                properties: {
                    isbn: { type: "keyword" },
                    book_name: { type: "keyword" }
                }
            },
            borrow_time: { type: "date" }
        }
    }
      
    // The SQL column names need special aliases
    const fetchSql = (id = '', bookId = '' ) => {
        let sql = `SELECT
        b.id,
        b.reader_id,
        b.book_id,
        bk.book_name as "book.book_name",
        bk.isbn as "book.isbn",
        b.borrow_time
        FROM borrow b,book bk where bk.id=b.book_id`;
        if(id){
            // Qualify the column: both tables have an id column
            sql = `${sql} and b.id='${id}'`;
        }
        if(bookId){
            sql = `${sql} and b.book_id='${bookId}'`;
        }
        return sql;
    };
      
    // In the SQL above, book_name is aliased as book.book_name to express the nested relationship, using "." as the separator
    // isbn works the same way
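The README says that aliasing a column as `"book.book_name"` is enough for the value to land inside the `book` object. It does not document how the package does this. One plausible approach is to split each column name on `.` before indexing, sketched below with a hypothetical `expandDottedKeys` helper.

```javascript
// Hypothetical: expand dotted column aliases such as "book.book_name" into nested
// objects, so a flat SQL result row matches an object-type mapping.
function expandDottedKeys(row) {
  const doc = {};
  for (const [column, value] of Object.entries(row)) {
    const path = column.split('.');
    let target = doc;
    // Walk (and create) every parent object except the last path segment.
    for (const key of path.slice(0, -1)) {
      if (typeof target[key] !== 'object' || target[key] === null) {
        target[key] = {};
      }
      target = target[key];
    }
    target[path[path.length - 1]] = value;
  }
  return doc;
}

// A row as the join query above might return it (sample values).
const flatRow = {
  id: 5,
  reader_id: 'r-01',
  book_id: '7',
  'book.book_name': 'Sample Book',
  'book.isbn': '0000000000',
  borrow_time: '2021-01-01 10:00:00.000',
};

console.log(JSON.stringify(expandDottedKeys(flatRow)));
// {"id":5,"reader_id":"r-01","book_id":"7","book":{"book_name":"Sample Book","isbn":"0000000000"},"borrow_time":"2021-01-01 10:00:00.000"}
```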
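The deferred-execution contract in the advanced usage section can be modelled in a few lines. An op returns `true` to postpone itself, and once all pending messages have been processed, the core calls it again with `startSync = true`. The sketch below is a hypothetical in-memory model of the core's side, not `SyncClient`'s actual code, and `processMessages` is an invented name. The README does not document two details, so both are assumptions: what the first argument is on the final call (here it is the last `data_key` seen), and whether deferral is tracked per module/op pair.

```javascript
// Hypothetical model of the core's deferred queue (not SyncClient's actual code).
// An op that returns true is queued; once all records are processed, each queued
// op runs one more time with startSync = true.
async function processMessages(modules, records) {
  const deferred = new Map(); // "module.op" -> { mod, op, lastKey }

  for (const record of records) {
    const mod = modules.find((m) => m.name === record.data_type);
    if (!mod || typeof mod[record.op] !== 'function') {
      throw new Error(`Cannot handle ${record.data_type}.${record.op}`);
    }
    const wantsDeferral = await mod[record.op](record.data_key, false);
    if (wantsDeferral === true) {
      // Assumption: the last data_key seen is passed again on the final call.
      deferred.set(`${mod.name}.${record.op}`, { mod, op: record.op, lastKey: record.data_key });
    }
  }

  // Every record has been handled: run each deferred op once.
  for (const { mod, op, lastKey } of deferred.values()) {
    await mod[op](lastKey, true);
  }
}

// A stripped-down version of the reworked book module: update only collects ids
// until startSync is true, then handles them as one batch.
const flushed = [];
let updateIds = [];
const book = {
  name: 'book',
  update: async (id, startSync = false) => {
    if (!startSync) {
      if (!updateIds.includes(id)) updateIds.push(id);
      return true; // ask the core to call update again later
    }
    flushed.push([...updateIds]); // stands in for the bulk ES update and policy run
    updateIds = [];
  },
};

processMessages([book], [
  { data_type: 'book', op: 'update', data_key: '1' },
  { data_type: 'book', op: 'update', data_key: '2' },
  { data_type: 'book', op: 'update', data_key: '1' },
]).then(() => console.log(flushed)); // [ [ '1', '2' ] ]
```

However many update messages arrive, the expensive batch work runs once per pass, which is the effect the reworked book module relies on.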