主要用于mysql数据同步,与可以跳转到某个事务开始读取。
配置mysql支持GTID同步
server-id=12
binlog_format = ROW
log_bin=D:/dev_tool/mysql-5.7.22-winx64/log_bin/binlog-bin
log_bin_index=D:/dev_tool/mysql-5.7.22-winx64/log_bin/binlog
secure-file-priv=D:/backup
gtid-mode = ON
enforce_gtid_consistency = 1
log-slave-updates= 1
开启binlog模式row,以及开启gitid
示例
使用github.com/siddontang/go-mysql库进行监听binlog日志
package main
import (
"github.com/satori/go.uuid"
"github.com/siddontang/go-mysql/canal"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"log"
)
type MyEventHandler struct {
Start bool
}
func (h *MyEventHandler) OnRotate(roateEvent *replication.RotateEvent) error {
log.Println("OnRotate")
log.Printf("%d", roateEvent.Position)
h.Start = true
return nil
}
// OnTableChanged is called when the table is created, altered, renamed or dropped.
// You need to clear the associated data like cache with the table.
// It will be called before OnDDL.
func (h *MyEventHandler) OnTableChanged(schema string, table string) error {
log.Println("OnTableChanged")
return nil
}
func (h *MyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
log.Println("OnDDL")
return nil
}
func (h *MyEventHandler) OnXID(nextPos mysql.Position) error {
log.Println("OnXID")
return nil
}
func (h *MyEventHandler) OnGTID(gtid mysql.GTIDSet) error {
log.Println("OnGTID")
log.Println(gtid.String())
return nil
}
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
func (h *MyEventHandler) OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error {
log.Println("OnPosSynced")
log.Printf("%s %d", pos.Name, pos.Pos)
return nil
}
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
log.Printf("%s %v\n", e.Action, e.Rows)
for _, v := range e.Table.Columns {
log.Printf("%s\n", v.Name)
}
return nil
}
func (h *MyEventHandler) String() string {
return "MyEventHandler"
}
func main() {
cfg := canal.NewDefaultConfig()
cfg.Addr = "127.0.0.1:3306"
cfg.User = "root"
cfg.Password = "123456"
// We only care table canal_test in test db
cfg.Dump.TableDB = "test"
cfg.Dump.Tables = []string{"t_user"}
c, _ := canal.NewCanal(cfg)
uuid := uuid.Must(uuid.FromString("a615ea0a-6fb4-11e8-a87e-509a4c0ef59e"))
it := mysql.Interval{}
//默认基本上是1
it.Start = 1
it.Stop = 9
set := mysql.NewUUIDSet(uuid, it)
mset := &mysql.MysqlGTIDSet{}
//key测试可以随意
m := map[string]*mysql.UUIDSet{"12aaa": set}
mset.Sets = m
// Register a handler to handle RowsEvent
c.SetEventHandler(&MyEventHandler{})
c.StartFromGTID(mset)
// Start canal
c.Run()
}
uuid的查询可以通过mysql命令
SHOW GLOBAL VARIABLES LIKE 'server_uuid';
stop的位置可以使用mysql命令
SHOW GLOBAL VARIABLES LIKE '%gtid%';
读取binlog可以用改变stop的值来读取从什么事件开始读取,默认这块配置id是自增1的,所以下一条也就是加1
有疑问加站长微信联系(非本文作者)