MySQL Binlog 增量同步工具go-mysql-transfer实现详解

wj596 · · 2688 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

一、 概述 ================= 工作需要研究了下阿里开源的MySQL Binlog增量订阅消费组件canal,其功能强大、运行稳定,但是有些方面不是太符合需求,主要有如下三点: 1、需要自己编写客户端来消费canal解析到的数据 2、server-client模式,需要同时部署server和client两个组件,我们的项目中有6个业务数据库要实时同步到redis,意味着要多部署12个组件,硬件和运维成本都会增加。 3、从server端到client端需要经过一次网络传输和序列化反序列化操作,然后再同步到接收端,感觉没有直接怼到接收端更高效。 go-mysql-transfer功能对标canal,使用go语言编写,规避了上述三点。旨在实现一个简洁高效、稳定可靠的Binlog增量同步工具, 具有如下特点: 1、不依赖其它组件,一键部署 2、集成多种接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ,不需要再编写客户端,开箱即用 3、内置丰富的数据解析、消息生成规则;支持Lua脚本,以处理更复杂的数据逻辑 4、支持监控告警,集成Prometheus客户端 5、高可用集群部署 6、数据同步失败重试 7、全量数据初始化 二、 与同类工具比较 ================= ![](https://static.studygolang.com/200904/e2d73ae65a114638d2f03cb306ac2f05.png) 三、 设计实现 ================= 1、实现原理 go-mysql-transfer将自己伪装成MySQL的Slave,向Master发送dump协议获取binlog,解析binlog并生成消息,实时发送给接收端。 ![go-mysql-transfer原理](https://upload-images.jianshu.io/upload_images/2897315-3b0fee8246d16250.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 2、数据转换规则 将从binlog解析出来的数据,经过简单的处理转换发送到接收端。使用内置丰富数数据转换规则,可完成大部分同步工作。 例如将表t_user同步到reids,配置如下规则: ``` rule: - schema: eseap #数据库名称 table: t_user #表名称 column_underscore_to_camel: true #列名称下划线转驼峰,默认为false datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss value_encoder: json #值编码类型,支持json、kv-commas、v-commas redis_structure: string # redis数据类型。 支持string、hash、list、set类型(与redis的数据类型一致) redis_key_prefix: USER_ #key前缀 redis_key_column: USER_NAME #使用哪个列的值作为key,不填写默认使用主键 ``` t_user表,数据如下: ![](https://upload-images.jianshu.io/upload_images/2897315-4791c8c2db44e989.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 同步到Redis后,数据如下: ![](https://upload-images.jianshu.io/upload_images/2897315-9a29cbf41aecbfeb.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 更多规则配置和同步案例 请见后续的"使用说明"章节。 3、数据转换脚本 Lua 是一种轻量小巧的脚本语言, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。开发者只需要花费少量时间就能大致掌握Lua的语法,照虎画猫写出可用的脚本。 基于Lua的高扩展性,可以实现更为复杂的数据解析、消息生成逻辑,定制需要的数据格式。 使用方式: ``` rule: - schema: eseap table: t_user lua_file_path: lua/t_user_string.lua #lua脚本文件 ``` 示例脚本: ``` local json = require("json") -- 加载json模块 local ops = require("redisOps") -- 加载redis操作模块 local row = ops.rawRow() --当前变动的一行数据,table类型,key为列名称 local action = ops.rawAction() --当前数据库的操作事件,包括:insert、updare、delete local id = row["ID"] --获取ID列的值 local userName = row["USER_NAME"] --获取USER_NAME列的值 local key = "user_"..id -- 定义key if action == "delete" -- 删除事件 then ops.DEL(key) -- 删除KEY else local password = row["PASSWORD"] --获取USER_NAME列的值 local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值 local result= {} -- 定义结果 result["id"] = id result["userName"] = userName result["password"] = password result["createTime"] = createTime result["source"] = "binlog" -- 数据来源 local val = json.encode(result) -- 将result转为json ops.SET(key,val) -- 对应Redis的SET命令,第一个参数为key(string类型),第二个参数为value end ``` t_user表,数据如下: ![](https://upload-images.jianshu.io/upload_images/2897315-4791c8c2db44e989.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 同步到Redis后,数据如下: ![](https://upload-images.jianshu.io/upload_images/2897315-6bed01bb9e835eda.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 更多Lua脚本使用说明 和同步案例 请见后续的"使用说明"章节。 4、监控告警 Prometheus是流行开源监控报警系统和TSDB,其指标采集组件被称作exporter。go-mysql-transfer本身就是一个exporter。向Prometheus提供应用状态、接收端状态、insert数量、update数量、delete数量、delay延时等指标。 go-mysql-transfer内置Prometheus exporter可以监控系统的运行状况,并进行健康告警。 相关配置: ``` enable_exporter: true #启用prometheus exporter,默认false exporter_addr: 9595 #prometheus exporter端口,默认9595 ``` 直接访问127.0.0.1:9595可以看到导出的指标值,如何与Prometheus集成,请参见Prometheus相关教程。 指标说明: transfer_leader_state:当前节点是否为leader,0=否、1=是 transfer_destination_state:接收端状态, 0=掉线、1=正常 transfer_inserted_num:插入数据的数量 transfer_updated_num:修改数据的数量 transfer_deleted_num:删除数据的数量 transfer_delay:与MySQL Master的时延 5、高可用 可以选择依赖zookeeper或者etcdr构建高可用集群,一个集群中只存在一个leader节点,其余皆为follower节点。 只有leader节点响应binglog的dump事件,follower节点为蛰伏状态,不发送dump命令,因此多个follower也不会加重Master的负担。 当leader节点出现故障,follower节点迅速替补上去,实现秒级故障切换。 ![go-mysql-transfer高可用集群](https://upload-images.jianshu.io/upload_images/2897315-88c45f4acda0db1d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 相关配置: ``` cluster: # 集群配置 name: myTransfer #集群名称,具有相同name的节点放入同一个集群 # ZooKeeper地址,多个用逗号分隔 zk_addrs: 192.168.1.10:2181,192.168.1.11:2182,192.168.1.12:2183 #zk_authentication: 123456 #digest类型的访问秘钥,如:user:password,默认为空 #etcd_addrs: 192.168.1.10:2379 #etcd连接地址,多个用逗号分隔 #etcd_user: test #etcd用户名 #etcd_password: 123456 #etcd密码 ``` 6、失败重试 网络抖动、接收方故障都会导致数据同步失败,需要有重试机制,才能保证不漏掉数据,使得每一条数据都能送达。 通常有两种重试实现方式,一种方式是记录下故障时刻binglog的position(位移),等故障恢复后,从position处重新dump 数据,发送给接收端。 一种方式是将同步失败的数据在本地落盘,形成队列。当探测到接收端可用时,逐条预出列尝试发送,发送成功最终出列。确保不丢数据,队列先进先出的特性也可保证数据顺序性,正确性。 go-mysql-transfer采用的是后者,目的是减少发送dump命令的次数,减轻Master的负担。因为binglog记录的整个Master数据库的日志,其增长速度很快。如果只需要拿几条数据,而dump很多数据,有点得不偿失。 7、全量数据初始化 如果数据库原本存在无法通过binlog进行增量同步的数据,可以使用命令行工具-stock完成始化同步。 stock基于 SELECT * FROM {table}的方式分批查询出数据,根据规则或者Lua脚本生成指定格式的消息,批量发送到接收端。 执行命令 go-mysql-transfer -stoc,在控制台可以直观的看到数据同步状态,如下: ![全量数据初始化](https://upload-images.jianshu.io/upload_images/2897315-4a16c9934ba84c04.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 四、安装 ================= **二进制安装包** 直接下载编译好的安装包: [点击下载](https://github.com/wj596/go-mysql-transfer/releases) **源码编译** 1、依赖Golang 1.14 及以上版本 2、设置' GO111MODULE=on ' 3、拉取源码 ‘ go get -d github.com/wj596/go-mysql-transfer’ 3、进入目录,执行 ‘ go build ’ 编译 五、部署运行 ================= **开启MySQL的binlog** ``` #Linux在my.cnf文件 #Windows在my.ini文件 log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 go-mysql-transfer 的 slave_id 重复 ``` **命令行运行** 1、修改app.yml 2、Windows直接运行 go-mysql-transfer.exe 3、Linux执行 nohup go-mysql-transfer & **docker运行** 1、拉取源码 ‘ go get -d github.com/wj596/go-mysql-transfer’ 2、修改配置文件 ‘ app.yml ’ 中相关配置 3、构建镜像 ‘ docker image build -t go-mysql-transfer -f Dockerfile . ’ 4、运行 ‘ docker run -d --name go-mysql-transfer -p 9595:9595 go-mysql-transfer:latest ’ 六、使用说明 ================= 1、[同步到Redis操作说明](https://www.jianshu.com/p/c533659a1d83?_blank) 2、[同步到MongoDB操作说明](https://www.jianshu.com/p/51124c9371f9?_blank) 3、[同步到Elasticsearch操作说明](https://www.jianshu.com/p/5a9b6c4f318c?_blank) 4、[同步到RocketMQ操作说明](https://www.jianshu.com/p/18bb121bbf63?_blank) 5、[同步到Kafka操作说明](https://www.jianshu.com/p/aec8e4c28c06?_blank) 6、[同步到RabbitMQ操作说明](https://www.jianshu.com/p/ba5f1d3c75f2?_blank) 七、开源 ================= github:[go-mysql-transfer](https://github.com/wj596/go-mysql-transfer?_blank) 八、性能测试 ================= 1、测试环境 平台:虚拟机 CPU:E7-4890 4核8线程 内存:8G 硬盘:机械硬盘 OS:Windows Sever 2012 R2 MySQL: 5.5 Rides: 4.0.2 2、测试数据 t_user表,14个字段,1个字段包含中文,数据量527206条 3、测试配置 规则: ``` schema: eseap table: t_user order_by_column: id #排序字段,全量数据初始化时不能为空 #column_lower_case:false #列名称转为小写,默认为false #column_upper_case:false#列名称转为大写,默认为false column_underscore_to_camel: true #列名称下划线转驼峰,默认为false # 包含的列,多值逗号分隔,如:id,name,age,area_id 为空时表示包含全部列 #include_column: ID,USER_NAME,PASSWORD date_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dd datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss value_encoder: json #值编码,支持json、kv-commas、v-commas redis_structure: string # 数据类型。 支持string、hash、list、set类型(与redis的数据类型一直) redis_key_prefix: USER_ #key的前缀 redis_key_column: ID #使用哪个列的值作为key,不填写默认使用主键 ``` 脚本: ``` local json = require("json") -- 加载json模块 local ops = require("redisOps") -- 加载redis操作模块 local row = ops.rawRow() --当前变动的一行数据,table类型,key为列名称 local action = ops.rawAction() --当前数据库的操作事件,包括:insert、updare、delete local id = row["ID"] --获取ID列的值 local userName = row["USER_NAME"] --获取USER_NAME列的值 local key = "user_"..id -- 定义key if action == "delete" -- 删除事件 then ops.DEL(key) -- 删除KEY else local password = row["PASSWORD"] --获取USER_NAME列的值 local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值 local result= {} -- 定义结果 result["id"] = id result["userName"] = userName result["password"] = password result["createTime"] = createTime result["source"] = "binlog" -- 数据来源 local val = json.encode(result) -- 将result转为json ops.SET(key,val) -- 对应Redis的SET命令,第一个参数为key(string类型),第二个参数为value end ``` 3、测试用例一 使用规则,将52万条数据全量初始化同步到Redis,结果如下: ![](https://upload-images.jianshu.io/upload_images/2897315-8a6bfe3f75c5c7ad.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 3次运行的中间值为4.6秒 4、测试用例二 使用Lua脚本,将52万条数据全量初始化同步到Redis,结果如下: ![](https://upload-images.jianshu.io/upload_images/2897315-5a12b97f7887da9e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 3次运行的中间值为9.5秒 5、测试用例三 使用规则,将binlog中52万条增量数据同步到Redis。结果如下: ![](https://upload-images.jianshu.io/upload_images/2897315-c3f860e36409c516.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 每秒增量同步(TPS)32950条 6、测试用例四 使用Lua脚本,将binlog中52万条增量数据同步到Redis。结果如下: ![](https://upload-images.jianshu.io/upload_images/2897315-52c6ef411a8f4475.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 每秒增量同步(TPS)15819条 7、测试用例五 100个线程不停向MySQL写数据,使用规则将数据实时增量同步到Redis,TPS保持在4000以上,资源占用情况如下: ![](https://upload-images.jianshu.io/upload_images/2897315-4c00b412b6cf18d3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 100个线程不停向MySQL写数据,使用Lua脚本将数据实时增量同步到Redis,TPS保持在2000以上,资源占用情况如下: ![](https://upload-images.jianshu.io/upload_images/2897315-3aed462011d0f63a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 以上测试结果,会随着测试环境的不同而改变,仅作为参考。

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2688 次点击  ∙  1 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传