golang版本的简易数据同步

EasyNetCN · · 452 次点击 · · 开始浏览    
这是一篇创建于较早时间的文章,其中的信息可能已经有所发展或是发生改变。

曾经用 Java 实现过一个简易的数据同步工具(https://www.jianshu.com/p/e1fd181e0b42),这几天用 Go 重写了一下。以下示例把 goods、user 库中的一些表同步到 stats 库中,用于聚合分析。stats 库中的目标表按"数据源名_源表名"命名(例如 goods_xxx),并且参与同步的表都需要有 id 和 update_time 两列。

多数据源配置

spring:
  datasource:
    stats:
      url: 
      maxIdleConns: 10
      maxOpenConns: 10
      connMaxLifetime: 1800000
    goods:
      url: 
      maxIdleConns: 10
      maxOpenConns: 10
      connMaxLifetime: 1800000
    user:
      url: 
      maxIdleConns: 10
      maxOpenConns: 10
      connMaxLifetime: 1800000

同步表信息

package model

// SyncTable identifies one source table that can be synchronised into the
// stats database.
type SyncTable struct {
    // Database is the key of the source engine (datasource) that owns the table.
    Database string
    // Table is the table name in the source database, i.e. the stats table
    // name with the "<Database>_" prefix removed.
    Table    string
}

表列信息

package model

// TableColumn describes one column of a table, as read from
// information_schema.columns by the sync service.
type TableColumn struct {
    // ColumnName is the name of the column.
    // NOTE(review): presumably filled from the column_name field through the
    // ORM's name mapping — confirm against the xorm mapper in use.
    ColumnName string
}

同步结果

package model

// TableSyncResult reports the outcome of one synchronisation run of a single
// table. The sync service fills it in step by step, so after a failed run only
// the fields collected before the failure are set.
type TableSyncResult struct {
    // Database is the key of the source engine the table was read from.
    Database       string `json:"database"`
    // Table is the source table name.
    Table          string `json:"table"`
    // StartTime is the formatted time at which the run started.
    StartTime      string `json:"startTime"`
    // EndTime is the formatted time at which the run finished successfully.
    EndTime        string `json:"endTime"`
    // Total is the number of source rows counted for this run.
    Total          int64  `json:"total"`
    // Limit is the page size used when copying rows.
    Limit          int    `json:"limit"`
    // SrcUpdateTime is the newest update_time found in the source table.
    SrcUpdateTime  string `json:"srcUpdateTime"`
    // DestUpdateTime is the newest update_time found in the destination
    // table; it is empty when no value was read.
    DestUpdateTime string `json:"destUpdateTime"`
}

同步服务

package service

import (
    "fmt"
    "sort"
    "strings"
    "time"

    "xorm.io/xorm"
    "model"
    "utility"
)

// TableSyncService discovers which source tables have a counterpart in the
// stats database and copies changed rows into those counterparts.
type TableSyncService interface {
    // SyncTables returns the source tables that can be synchronised.
    SyncTables() ([]model.SyncTable, error)
    // Sync copies the changed rows of one source table into the stats
    // database, reading and writing limit rows per page.
    Sync(database string, table string, limit int) (model.TableSyncResult, error)
}

// NewTableSyncService builds a TableSyncService on top of the given engines.
// The map is keyed by datasource name; the implementation looks up the
// destination engine under the key "stats".
func NewTableSyncService(engines map[string]*xorm.Engine) TableSyncService {
    svc := new(tableSyncService)
    svc.engines = engines

    return svc
}

// tableSyncService is the default TableSyncService implementation.
type tableSyncService struct {
    // engines maps a datasource key to its engine; the key "stats" is used as
    // the destination, every other entry is treated as a possible source.
    engines map[string]*xorm.Engine
}

// SyncTables lists the source tables that have a counterpart in the stats
// database. A stats table named "<key>_<table>" is matched against every
// source engine key; it is reported when <table> exists in that source
// database. The first error from any lookup aborts the scan.
func (s *tableSyncService) SyncTables() ([]model.SyncTable, error) {
    sources, err := s.databases()
    if err != nil {
        return nil, err
    }

    statsTables, err := s.statsTables()
    if err != nil {
        return nil, err
    }

    found := make([]model.SyncTable, 0, len(statsTables))

    for _, statsTable := range statsTables {
        for key, dbName := range sources {
            prefix := key + "_"
            if !strings.HasPrefix(statsTable, prefix) {
                continue
            }

            // The remainder after the prefix is the table name on the source side.
            srcTable := statsTable[len(prefix):]

            exists, err := s.tableExist(key, dbName, srcTable)
            if err != nil {
                return nil, err
            }

            if exists {
                found = append(found, model.SyncTable{Database: key, Table: srcTable})
            }
        }
    }

    return found, nil
}

// databases asks every engine for its current database name and returns a map
// from engine key to that name. Engines whose current database is named
// "stats" are left out, since that database is the sync destination.
// NOTE(review): the destination is recognised here by database name, while the
// rest of the service looks it up by the engine key "stats" — confirm both
// always agree.
func (s *tableSyncService) databases() (map[string]string, error) {
    names := make(map[string]string)

    for key, eng := range s.engines {
        var current string

        _, err := eng.SQL("SELECT DATABASE()").Get(&current)
        if err != nil {
            return nil, err
        }

        if current == "stats" {
            continue
        }

        names[key] = current
    }

    return names, nil
}

// statsTables returns the names of all tables visible through the "stats"
// engine, sorted alphabetically.
//
// It returns an error when no engine is registered under the key "stats";
// without that check the lookup yields a nil engine and the query panics.
func (s *tableSyncService) statsTables() ([]string, error) {
    engine, ok := s.engines["stats"]
    if !ok || engine == nil {
        return nil, fmt.Errorf("list stats tables: no engine registered under key %q", "stats")
    }

    tables := make([]string, 0)

    if err := engine.SQL("SHOW TABLES").Find(&tables); err != nil {
        return nil, err
    }

    sort.Strings(tables)

    return tables, nil
}

// tableExist reports whether information_schema lists a table named table in
// the schema named database, queried through the engine registered under k.
func (s *tableSyncService) tableExist(k string, database, table string) (bool, error) {
    const query = "SELECT * FROM information_schema.TABLES WHERE TABLE_SCHEMA=? AND TABLE_NAME=?"

    engine := s.engines[k]

    return engine.SQL(query, database, table).Exist()
}

// Sync copies the rows of one source table that changed since the previous run
// into the "<database>_<table>" table reached through the "stats" engine.
//
// It reads the newest update_time on both sides, counts the source rows inside
// that window and, when there are any, upserts them page by page.
//
// database is the key of the source engine, table is the source table name and
// limit is the page size; limit must be positive because the paging loop
// advances by limit and would otherwise never terminate.
//
// The result is filled in progressively: on error it still carries whatever was
// collected before the failing step, and EndTime is only set on success.
func (s *tableSyncService) Sync(database string, table string, limit int) (model.TableSyncResult, error) {
    result := model.TableSyncResult{
        StartTime: time.Now().Format(utility.DATE_TIME_PATTERN),
        Database:  database,
        Table:     table,
        Limit:     limit,
    }

    if limit <= 0 {
        return result, fmt.Errorf("sync %s.%s: limit must be positive, got %d", database, table, limit)
    }

    // Both engines are dereferenced below; fail with a clear error instead of
    // a nil-pointer panic when one of them is not configured.
    src := s.engines[database]
    if src == nil {
        return result, fmt.Errorf("sync %s.%s: no engine registered under key %q", database, table, database)
    }

    if s.engines["stats"] == nil {
        return result, fmt.Errorf("sync %s.%s: no engine registered under key %q", database, table, "stats")
    }

    destUpdateTime, err := s.getDestUpdateTime(database, table)
    if err != nil {
        return result, err
    }

    result.DestUpdateTime = destUpdateTime

    srcUpdateTime, err := s.getSrcUpdateTime(src, table)
    if err != nil {
        return result, err
    }

    result.SrcUpdateTime = srcUpdateTime

    total, err := s.getTotal(src, table, srcUpdateTime, destUpdateTime)
    if err != nil {
        return result, err
    }

    result.Total = total

    if total > 0 {
        schema, err := s.getTableSchema(src, table)
        if err != nil {
            return result, err
        }

        // Without column metadata no valid INSERT statement can be built.
        if len(schema) == 0 {
            return result, fmt.Errorf("sync %s.%s: no columns found for table", database, table)
        }

        columns := make([]string, 0, len(schema))
        for _, c := range schema {
            columns = append(columns, c.ColumnName)
        }

        // The statement is the same for every page, so build it once.
        upsertSQL := buildUpsertSQL(database+"_"+table, columns)

        for start := int64(0); start < total; start += int64(limit) {
            if _, err := s.getData(src, table, schema, srcUpdateTime, destUpdateTime, start, limit, upsertSQL); err != nil {
                return result, err
            }
        }
    }

    result.EndTime = time.Now().Format(utility.DATE_TIME_PATTERN)

    return result, nil
}

// buildUpsertSQL renders the "INSERT ... ON DUPLICATE KEY UPDATE" statement for
// destTable: one placeholder per column in the VALUES list, followed by one
// "col=?" assignment for every column except "id".
func buildUpsertSQL(destTable string, columns []string) string {
    placeholders := make([]string, len(columns))
    assignments := make([]string, 0, len(columns))

    for i, column := range columns {
        placeholders[i] = "?"

        if column != "id" {
            assignments = append(assignments, column+"=?")
        }
    }

    // A table whose only column is id would leave the UPDATE list empty, which
    // is not valid SQL; a self-assignment keeps the statement well-formed
    // without adding a placeholder.
    if len(assignments) == 0 {
        assignments = append(assignments, "id=id")
    }

    return "INSERT INTO " + destTable +
        "(" + strings.Join(columns, ",") + ")" +
        " VALUES(" + strings.Join(placeholders, ",") + ")" +
        " ON DUPLICATE KEY UPDATE " + strings.Join(assignments, ",")
}

// getDestUpdateTime returns the newest update_time stored in the stats table
// "<database>_<table>". The string keeps its initial empty value when the
// query does not fill it in.
func (s *tableSyncService) getDestUpdateTime(database string, table string) (string, error) {
    query := fmt.Sprintf("SELECT update_time FROM %s_%s ORDER BY update_time DESC LIMIT 1", database, table)

    var latest string

    _, err := s.engines["stats"].SQL(query).Get(&latest)

    return latest, err
}

// getSrcUpdateTime returns the newest update_time stored in table, read through
// the given source engine. The string keeps its initial empty value when the
// query does not fill it in.
func (s *tableSyncService) getSrcUpdateTime(engine *xorm.Engine, table string) (string, error) {
    query := fmt.Sprintf("SELECT update_time FROM %s ORDER BY update_time DESC LIMIT 1", table)

    var latest string

    _, err := engine.SQL(query).Get(&latest)

    return latest, err
}

// getTotal counts the rows of table that fall inside the sync window. When
// destUpdateTime is empty every row is counted; otherwise only rows whose
// update_time lies between destUpdateTime and srcUpdateTime, both inclusive.
func (s *tableSyncService) getTotal(engine *xorm.Engine, table string, srcUpdateTime string, destUpdateTime string) (int64, error) {
    query := "SELECT COUNT(id) AS count FROM " + table
    args := make([]interface{}, 0, 2)

    if destUpdateTime != "" {
        query += " WHERE update_time >=? AND update_time <=?"
        args = append(args, destUpdateTime, srcUpdateTime)
    }

    var count int64

    _, err := engine.SQL(query, args...).Get(&count)

    return count, err
}

// getData reads one page of rows from the source table and writes each row to
// the stats engine with updateSql.
//
// The page covers the same window that getTotal counts: all rows when
// destUpdateTime is empty, otherwise rows whose update_time lies between
// destUpdateTime and srcUpdateTime (inclusive). Applying the range filter with
// an empty lower bound would compare update_time against '' and make the page
// contents disagree with the counted total.
//
// Rows are ordered by update_time and then id. update_time alone is not unique,
// and without a tie-breaker rows sharing a timestamp may be returned in a
// different order on each page query, so offset paging could skip or repeat
// them.
//
// updateSql must take one parameter per schema column followed by one parameter
// per schema column other than "id", in schema order.
//
// It returns true when the whole page was written, or false and the first error.
func (s *tableSyncService) getData(
    engine *xorm.Engine,
    table string,
    schema []model.TableColumn,
    srcUpdateTime string,
    destUpdateTime string,
    start int64, limit int,
    updateSql string) (bool, error) {
    query := new(strings.Builder)

    // QueryInterface takes the SQL text as its first argument, followed by the
    // bind parameters.
    queryArgs := make([]interface{}, 0, 5)
    queryArgs = append(queryArgs, "")

    query.WriteString("SELECT * FROM ")
    query.WriteString(table)

    if destUpdateTime != "" {
        query.WriteString(" WHERE update_time >=? AND update_time <=?")

        queryArgs = append(queryArgs, destUpdateTime, srcUpdateTime)
    }

    query.WriteString(" ORDER BY update_time, id")
    query.WriteString(" LIMIT ?,?")

    queryArgs = append(queryArgs, start, limit)
    queryArgs[0] = query.String()

    rows, err := engine.QueryInterface(queryArgs...)
    if err != nil {
        return false, err
    }

    dest := s.engines["stats"]

    for _, row := range rows {
        // Exec also takes the SQL text first: statement, INSERT values, then
        // the UPDATE values.
        params := make([]interface{}, 0, 1+len(schema)*2)

        params = append(params, updateSql)

        for _, c := range schema {
            params = append(params, row[c.ColumnName])
        }

        for _, c := range schema {
            if c.ColumnName != "id" {
                params = append(params, row[c.ColumnName])
            }
        }

        if _, err := dest.Exec(params...); err != nil {
            return false, err
        }
    }

    return true, nil
}

// getTableSchema returns the columns of table in the engine's current database,
// read from information_schema.columns in ordinal position order.
func (s *tableSyncService) getTableSchema(engine *xorm.Engine, table string) ([]model.TableColumn, error) {
    var currentDB string

    if _, err := engine.SQL("SELECT DATABASE()").Get(&currentDB); err != nil {
        return nil, err
    }

    const query = "SELECT * FROM information_schema.columns WHERE table_schema =? AND table_name =? ORDER BY ordinal_position ASC"

    columns := make([]model.TableColumn, 0)
    err := engine.SQL(query, currentDB, table).Find(&columns)

    return columns, err
}

主函数

package main

import (
    "time"

    _ "github.com/go-sql-driver/mysql"
    "github.com/kataras/iris/v12/mvc"

    "core"
    "model"
    "service"
)

// Package-level singletons shared by main and the init helpers.
var (
    // springApplication is the application container created in main.
    springApplication *core.SpringApplication

    // tableSyncService is assigned by initService and used by the background
    // sync loops.
    tableSyncService service.TableSyncService
)

// main creates the application container, wires the services and MVC layer,
// and then runs the application.
func main() {
    app, err := core.NewSpringApplication()
    if err != nil {
        // Every later step dereferences the application, so a failed start-up
        // must stop here rather than surface as a nil-pointer panic further on.
        panic(err)
    }

    springApplication = app

    initService()
    initMvc(springApplication.MvcApplication())

    springApplication.Run()
}

// initService wires the services and starts the background synchronisation.
//
// The goods and user services are only created when a "stats" engine exists.
// The table sync service is always created; when the list of sync tables cannot
// be loaded the error is logged and no background work is started. Otherwise
// one goroutine per table runs Sync with a page size of 100, logs failures and
// sleeps five minutes between runs. These goroutines never exit.
func initService() {
    engines := springApplication.OrmEngines()

    if statsEngine, found := engines["stats"]; found {
        goodsService = service.NewGoodsService(statsEngine)
        userService = service.NewUserService(statsEngine)
    }

    tableSyncService = service.NewTableSyncService(engines)

    syncTables, err := tableSyncService.SyncTables()
    if err != nil {
        springApplication.Application().Logger().Error(err)

        return
    }

    for _, syncTable := range syncTables {
        // The table is passed as an argument so each goroutine works on its
        // own copy.
        go func(target model.SyncTable) {
            for {
                syncResult, syncErr := tableSyncService.Sync(target.Database, target.Table, 100)
                if syncErr != nil {
                    springApplication.Application().Logger().Error(syncResult, syncErr)
                }

                time.Sleep(time.Minute * 5)
            }
        }(syncTable)
    }
}

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:EasyNetCN

查看原文:golang版本的简易数据同步

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

452 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传