曾经用 Java 实现过一个简易的数据同步工具(https://www.jianshu.com/p/e1fd181e0b42),这几天用 Golang 重写了一遍。下面的示例把 goods、user 两个库中的部分表同步到 stats 库中,用于聚合分析。
多数据源配置
# Multi-datasource configuration: one entry per database engine. According to
# the article text, tables from goods and user are synced into stats.
# NOTE(review): the nesting indentation appears to have been lost in this
# listing; presumably stats/goods/user nest under spring.datasource and the
# url/maxIdleConns/... keys nest under each of them — confirm against the
# real file.
spring:
datasource:
# Destination datasource (the Go code looks it up under the key "stats").
stats:
# Connection URL, left blank in the article.
url:
maxIdleConns: 10
maxOpenConns: 10
# NOTE(review): unit is not shown here; presumably milliseconds (30 min) — confirm.
connMaxLifetime: 1800000
# Source datasource "goods".
goods:
url:
maxIdleConns: 10
maxOpenConns: 10
connMaxLifetime: 1800000
# Source datasource "user".
user:
url:
maxIdleConns: 10
maxOpenConns: 10
connMaxLifetime: 1800000
同步表信息
package model
// SyncTable identifies one source table that takes part in synchronization.
type SyncTable struct {
	// Database is the key of the source engine (not necessarily the schema name).
	Database string
	// Table is the table name inside the source database, without the
	// "<Database>_" prefix that its copy carries in stats.
	Table string
}
表列信息
package model
// TableColumn describes one column of a source table. The sync service fills
// it from information_schema.columns.
type TableColumn struct {
	// ColumnName is the column's name as used in the generated SQL.
	ColumnName string
}
同步结果
package model
// TableSyncResult summarizes one synchronization run of a single table.
type TableSyncResult struct {
	// Database is the source engine key the run was started for.
	Database string `json:"database"`
	// Table is the source table name.
	Table string `json:"table"`
	// StartTime is the formatted time at which the run began.
	StartTime string `json:"startTime"`
	// EndTime is the formatted time at which the run finished; it stays empty
	// when the run fails.
	EndTime string `json:"endTime"`
	// Total is the number of source rows counted for this run.
	Total int64 `json:"total"`
	// Limit is the page size used while copying rows.
	Limit int `json:"limit"`
	// SrcUpdateTime is the newest update_time found in the source table.
	SrcUpdateTime string `json:"srcUpdateTime"`
	// DestUpdateTime is the newest update_time found in the stats copy.
	DestUpdateTime string `json:"destUpdateTime"`
}
同步服务
package service
import (
"fmt"
"sort"
"strings"
"time"
"xorm.io/xorm"
"model"
"utility"
)
// TableSyncService copies tables from the source databases into stats.
type TableSyncService interface {
	// SyncTables lists the source tables that have a "<database>_<table>"
	// counterpart in stats and still exist in their source database.
	SyncTables() ([]model.SyncTable, error)
	// Sync copies the rows of one source table into its stats counterpart,
	// reading limit rows per page, and reports what was done.
	Sync(database, table string, limit int) (model.TableSyncResult, error)
}
// NewTableSyncService builds a TableSyncService on top of the given engines.
// The map is keyed by datasource name; it is stored as-is, not copied.
func NewTableSyncService(engines map[string]*xorm.Engine) TableSyncService {
	svc := new(tableSyncService)
	svc.engines = engines
	return svc
}
// tableSyncService is the xorm-backed implementation of TableSyncService.
type tableSyncService struct {
	// engines maps a datasource name to its engine; the "stats" entry is the
	// destination the other entries are synced into.
	engines map[string]*xorm.Engine
}
// SyncTables discovers which tables should be synchronized.
//
// A stats table named "<key>_<table>" is considered a copy of <table> in the
// source engine registered under <key>; it is reported only when that source
// table actually exists. The first error encountered is returned.
func (s *tableSyncService) SyncTables() ([]model.SyncTable, error) {
	sources, err := s.databases()
	if err != nil {
		return nil, err
	}
	destTables, err := s.statsTables()
	if err != nil {
		return nil, err
	}

	found := make([]model.SyncTable, 0, len(destTables))
	for _, destTable := range destTables {
		for key, schemaName := range sources {
			prefix := key + "_"
			if !strings.HasPrefix(destTable, prefix) {
				continue
			}
			srcTable := strings.TrimPrefix(destTable, prefix)
			ok, err := s.tableExist(key, schemaName, srcTable)
			if err != nil {
				return nil, err
			}
			if ok {
				found = append(found, model.SyncTable{Database: key, Table: srcTable})
			}
		}
	}
	return found, nil
}
// databases returns the source engines as a map from engine key to the schema
// name that engine is connected to (as reported by SELECT DATABASE()).
//
// The destination engine is excluded. Everywhere else in this service the
// destination is identified by the engine key "stats", so it is skipped by
// key here as well; otherwise a stats engine whose schema is not literally
// named "stats" would be treated as a source. The schema-name check is kept
// so that previously excluded engines stay excluded.
func (s *tableSyncService) databases() (map[string]string, error) {
	databases := make(map[string]string, len(s.engines))
	for k, engine := range s.engines {
		if k == "stats" {
			continue
		}
		database := ""
		if _, err := engine.SQL("SELECT DATABASE()").Get(&database); err != nil {
			return nil, fmt.Errorf("resolving schema name of engine %q: %w", k, err)
		}
		if database != "stats" {
			databases[k] = database
		}
	}
	return databases, nil
}
// statsTables returns the names of all tables in the stats database, sorted
// ascending.
//
// It returns an error instead of dereferencing a nil engine when no "stats"
// engine has been configured.
func (s *tableSyncService) statsTables() ([]string, error) {
	stats, ok := s.engines["stats"]
	if !ok || stats == nil {
		return nil, fmt.Errorf("stats engine is not configured")
	}
	tables := make([]string, 0)
	if err := stats.SQL("SHOW TABLES").Find(&tables); err != nil {
		return nil, err
	}
	sort.Strings(tables)
	return tables, nil
}
// tableExist reports whether table exists in schema database, asking the
// engine registered under key k via information_schema.TABLES.
func (s *tableSyncService) tableExist(k string, database, table string) (bool, error) {
	const query = "SELECT * FROM information_schema.TABLES WHERE TABLE_SCHEMA=? AND TABLE_NAME=?"
	lookup := s.engines[k].SQL(query, database, table)
	return lookup.Exist()
}
// Sync copies rows of one source table into its stats counterpart, the table
// named "<database>_<table>".
//
// database is the key of the source engine, table the source table name and
// limit the number of rows read per page. The newest update_time on both
// sides bounds the rows to copy; rows are then upserted page by page.
//
// The returned result is filled in as far as the run got, also on error, so
// callers can log it together with the error. EndTime is set only on success.
// An error is returned when the source engine is unknown, when any query
// fails, when rows need copying but limit is not positive, or when no columns
// can be found for the table.
func (s *tableSyncService) Sync(database string, table string, limit int) (model.TableSyncResult, error) {
	result := model.TableSyncResult{
		StartTime: time.Now().Format(utility.DATE_TIME_PATTERN),
		Database:  database,
		Table:     table,
		Limit:     limit,
	}

	// A missing key yields a nil engine, which would panic on first use.
	srcEngine, ok := s.engines[database]
	if !ok || srcEngine == nil {
		return result, fmt.Errorf("sync %s.%s: no engine configured for database", database, table)
	}

	destUpdateTime, err := s.getDestUpdateTime(database, table)
	if err != nil {
		return result, err
	}
	result.DestUpdateTime = destUpdateTime

	srcUpdateTime, err := s.getSrcUpdateTime(srcEngine, table)
	if err != nil {
		return result, err
	}
	result.SrcUpdateTime = srcUpdateTime

	total, err := s.getTotal(srcEngine, table, srcUpdateTime, destUpdateTime)
	if err != nil {
		return result, err
	}
	result.Total = total

	if total > 0 {
		// A non-positive page size would never advance the offset below.
		if limit <= 0 {
			return result, fmt.Errorf("sync %s.%s: limit must be positive, got %d", database, table, limit)
		}
		schema, err := s.getTableSchema(srcEngine, table)
		if err != nil {
			return result, err
		}
		if len(schema) == 0 {
			return result, fmt.Errorf("sync %s.%s: no columns found for table", database, table)
		}

		upsertSQL := buildUpsertSQL(database, table, schema)
		for start := int64(0); start < total; start += int64(limit) {
			if _, err := s.getData(srcEngine, table, schema, srcUpdateTime, destUpdateTime, start, limit, upsertSQL); err != nil {
				return result, err
			}
		}
	}

	result.EndTime = time.Now().Format(utility.DATE_TIME_PATTERN)
	return result, nil
}

// buildUpsertSQL renders the "INSERT ... ON DUPLICATE KEY UPDATE" statement
// for the stats table "<database>_<table>": one placeholder per column in the
// VALUES list, followed by one "col=?" assignment per column other than "id".
func buildUpsertSQL(database, table string, schema []model.TableColumn) string {
	names := make([]string, 0, len(schema))
	updates := make([]string, 0, len(schema))
	for _, c := range schema {
		names = append(names, c.ColumnName)
		if c.ColumnName != "id" {
			updates = append(updates, c.ColumnName+"=?")
		}
	}
	if len(updates) == 0 {
		// Only "id" columns: keep the statement valid with a no-op assignment
		// that takes no bound parameter.
		updates = append(updates, "id=id")
	}
	placeholders := strings.TrimSuffix(strings.Repeat("?,", len(names)), ",")

	var sb strings.Builder
	sb.WriteString("INSERT INTO ")
	sb.WriteString(database)
	sb.WriteString("_")
	sb.WriteString(table)
	sb.WriteString("(")
	sb.WriteString(strings.Join(names, ","))
	sb.WriteString(") VALUES(")
	sb.WriteString(placeholders)
	sb.WriteString(") ON DUPLICATE KEY UPDATE ")
	sb.WriteString(strings.Join(updates, ","))
	return sb.String()
}
// getDestUpdateTime returns the newest update_time stored in the stats table
// "<database>_<table>", or "" when that table has no rows.
//
// It returns an error instead of dereferencing a nil engine when no "stats"
// engine has been configured.
func (s *tableSyncService) getDestUpdateTime(database string, table string) (string, error) {
	stats, ok := s.engines["stats"]
	if !ok || stats == nil {
		return "", fmt.Errorf("stats engine is not configured")
	}
	updateTime := ""
	query := fmt.Sprintf("SELECT update_time FROM %s_%s ORDER BY update_time DESC LIMIT 1", database, table)
	_, err := stats.SQL(query).Get(&updateTime)
	return updateTime, err
}
// getSrcUpdateTime returns the newest update_time found in table on the given
// source engine; the value stays "" when the query yields no row.
func (s *tableSyncService) getSrcUpdateTime(engine *xorm.Engine, table string) (string, error) {
	var latest string
	query := fmt.Sprintf("SELECT update_time FROM %s ORDER BY update_time DESC LIMIT 1", table)
	_, err := engine.SQL(query).Get(&latest)
	return latest, err
}
// getTotal counts the source rows that a sync run has to copy.
//
// When destUpdateTime is empty every row of table is counted; otherwise only
// rows whose update_time lies in [destUpdateTime, srcUpdateTime].
func (s *tableSyncService) getTotal(engine *xorm.Engine, table string, srcUpdateTime string, destUpdateTime string) (int64, error) {
	query := "SELECT COUNT(id) AS count FROM " + table
	var args []interface{}
	if destUpdateTime != "" {
		query += " WHERE update_time >=? AND update_time <=?"
		args = []interface{}{destUpdateTime, srcUpdateTime}
	}

	var count int64
	_, err := engine.SQL(query, args...).Get(&count)
	return count, err
}
// getData reads one page of rows from the source table and upserts each row
// into the stats database.
//
// engine is the source engine and table the source table. schema lists the
// columns whose values are bound to updateSql: first every column in order
// (the VALUES list), then every column except "id" (the UPDATE list).
// start and limit select the page. The boolean result is true on success and
// false whenever an error is returned.
func (s *tableSyncService) getData(
	engine *xorm.Engine,
	table string,
	schema []model.TableColumn,
	srcUpdateTime string,
	destUpdateTime string,
	start int64, limit int,
	updateSql string) (bool, error) {
	dest, ok := s.engines["stats"]
	if !ok || dest == nil {
		return false, fmt.Errorf("stats engine is not configured")
	}

	query := new(strings.Builder)
	query.WriteString("SELECT * FROM ")
	query.WriteString(table)
	// xorm takes the statement as the first variadic argument.
	args := make([]interface{}, 0, 5)
	args = append(args, "")
	// Use the same filter as getTotal so the pages add up to the counted
	// total: on the first sync (empty destUpdateTime) all rows are read
	// instead of comparing update_time against an empty string.
	if destUpdateTime != "" {
		query.WriteString(" WHERE update_time >=? AND update_time <=?")
		args = append(args, destUpdateTime, srcUpdateTime)
	}
	// update_time alone is not unique; id is the tie-breaker that keeps
	// LIMIT/OFFSET pages from overlapping or skipping rows.
	query.WriteString(" ORDER BY update_time, id")
	query.WriteString(" LIMIT ?,?")
	args = append(args, start, limit)
	args[0] = query.String()

	rows, err := engine.QueryInterface(args...)
	if err != nil {
		return false, err
	}
	for _, row := range rows {
		params := make([]interface{}, 0, 2*len(schema)+1)
		params = append(params, updateSql)
		for _, c := range schema {
			params = append(params, row[c.ColumnName])
		}
		for _, c := range schema {
			if c.ColumnName != "id" {
				params = append(params, row[c.ColumnName])
			}
		}
		if _, err := dest.Exec(params...); err != nil {
			return false, err
		}
	}
	return true, nil
}
// getTableSchema returns the columns of table in the schema the engine is
// currently connected to, ordered by their ordinal position.
func (s *tableSyncService) getTableSchema(engine *xorm.Engine, table string) ([]model.TableColumn, error) {
	var schemaName string
	_, err := engine.SQL("SELECT DATABASE()").Get(&schemaName)
	if err != nil {
		return nil, err
	}

	const query = "SELECT * FROM information_schema.columns WHERE table_schema =? AND table_name =? ORDER BY ordinal_position ASC"
	columns := make([]model.TableColumn, 0)
	err = engine.SQL(query, schemaName, table).Find(&columns)
	return columns, err
}
主函数
package main
import (
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/kataras/iris/v12/mvc"
"core"
"model"
"service"
)
// Process-wide singletons, assigned during startup in main and initService.
var (
	// springApplication is the application container created in main.
	springApplication *core.SpringApplication
	// tableSyncService runs the periodic table synchronization.
	tableSyncService service.TableSyncService
)
// main creates the application, wires services and MVC routes, then runs it.
func main() {
	// NOTE(review): the second return value is discarded; presumably it is an
	// error that should be checked before the application is used — confirm.
	app, _ := core.NewSpringApplication()
	springApplication = app

	initService()
	mvcApp := springApplication.MvcApplication()
	initMvc(mvcApp)
	springApplication.Run()
}
// initService creates the services and starts one background goroutine per
// table reported by SyncTables. Each goroutine syncs its table, logs a failed
// run, sleeps five minutes and repeats; it never returns.
func initService() {
	orms := springApplication.OrmEngines()
	if orm, ok := orms["stats"]; ok {
		goodsService = service.NewGoodsService(orm)
		userService = service.NewUserService(orm)
	}
	tableSyncService = service.NewTableSyncService(orms)

	syncTables, err := tableSyncService.SyncTables()
	if err != nil {
		springApplication.Application().Logger().Error(err)
		return
	}

	const (
		pageSize = 100             // rows copied per page
		interval = 5 * time.Minute // pause between two runs of the same table
	)
	syncLoop := func(target model.SyncTable) {
		for {
			syncResult, err := tableSyncService.Sync(target.Database, target.Table, pageSize)
			if err != nil {
				springApplication.Application().Logger().Error(syncResult, err)
			}
			time.Sleep(interval)
		}
	}
	for i := range syncTables {
		go syncLoop(syncTables[i])
	}
}
有疑问加站长微信联系(非本文作者)