使用golang多线程批量更新数据数据。
共4个文件
main.go
package main import ( "bufio" "fmt" _ "github.com/go-sql-driver/mysql" "os" . "pt" ) func main() { args := os.Args if len(args) == 2 { switch args[1] { case "1": Cu.Run() case "2": //平台负债表数据静态化 Fz.Run() case "0": os.Exit(0) default: } } if len(args) == 1 { for { fmt.Println("操作目录: ") fmt.Println("1、平台有效客户更新(202:13306-platform)。 ") fmt.Println("2、平台负债数据静态化(202:13306-platform)。") fmt.Println("0、退出。 ") inputReader := bufio.NewReader(os.Stdin) command, _, _ := inputReader.ReadLine() code := string(command) switch code { case "1": Cu.Run() case "2": //平台负债表数据静态化 Fz.Run() case "0": os.Exit(0) default: fmt.Println("default") } fmt.Println("-------处理完成-------") } } }
pt/lib.go
package pt import ( "database/sql" _ "github.com/go-sql-driver/mysql" "log" "os" "time" ) const ( DSN string = "root:mysqladmin56@tcp(192.168.0.202:13306)/platform?charset=utf8" //DSN string = "root:@tcp(127.0.0.1:3306)/db_name?charset=utf8" ) /** * 数据库连接 */ func Mydb() *sql.DB { db, err := sql.Open("mysql", DSN) if err != nil { log.Fatalf("Open database error: %s\n", err) } //defer db.Close() //不关闭连接 err = db.Ping() if err != nil { log.Fatal(err) } return db } /** * 升级日志写入 文件追加参数奇葩 多 要3个 * @param {[type]} log string [description] * @return {[type]} [description] */ func writeResult(tag string, data string) { str_time := time.Now().Format("2006_01_02") filename := tag + "_" + str_time + ".log" fl, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644) if err != nil { log.Println(err) } defer fl.Close() fl.WriteString(data) fl.WriteString("\n") }
pt/fz.go
// 负债数据静态化封装 package pt import ( "database/sql" "fmt" "log" "time" ) //门店负债数据 type FzData struct { mdid int xjye float32 zsye float32 lcye float32 month string } type FzDataMap map[int]FzData type FzClass struct { list FzDataMap } //list 使用前需要被初始化 所以直接在new时初始化 以下2种初始化方式都可 func NewFz() *FzClass { fz := &FzClass{list: make(FzDataMap)} //fz := new(FzClass) //fz.list = make(FzDataMap) return fz } func (fz *FzClass) Show() *FzClass { for i, value := range fz.list { fmt.Println(i) fmt.Println(value) } return fz } //暂无卵用 方便大数据扩展 func (fz *FzClass) Add(row FzData) *FzClass { fz.list[row.mdid] = row return fz } //门店数据静态化 入库 func (fz *FzClass) toDb(db *sql.DB, row FzData) { var num int one, err := db.Query("SELECT COUNT(*) AS num FROM `static_fz` WHERE `md_id` = ? AND `month` = ? ", row.mdid, row.month) if err != nil { log.Println(err) } defer one.Close() for one.Next() { err := one.Scan(&num) if err != nil { log.Fatal(err) } } if num > 0 { //存在更新 stmt, _ := db.Prepare("UPDATE `static_fz` SET `xjye`=?, `zsye`=?, `lcye`=? WHERE `md_id` = ? AND `month` = ? ") defer stmt.Close() stmt.Exec(row.xjye, row.zsye, row.lcye, row.mdid, row.month) } else { //不存在插入 stmt, _ := db.Prepare("INSERT INTO `static_fz` (`md_id`, `xjye`, `zsye`, `lcye`,`month`) VALUES (?,?,?,?,?)") defer stmt.Close() stmt.Exec(row.mdid, row.xjye, row.zsye, row.lcye, row.month) } } /** * 某一公司负债处理 * @param {[type]} c chan int [日志管道] * @param {[type]} comp_id int [公司ID] * @param {[type]} fz_month int [负债月份] */ func (fz *FzClass) oneComp(c chan string, comp_id int, fz_month string) { db := Mydb() defer db.Close() sql := "SELECT ed.`id`, SUM(IF(cc.type = 1, t.balance, 0)) AS xjye, SUM(IF(cc.type = 2, t.balance, 0)) AS zsye, IFNULL(tt.`lcye`,0) AS lcye FROM `customer_capital` `t` LEFT JOIN company_capital cc ON cc.id = t.capital_id LEFT JOIN customer_info ci ON ci.id = t.cu_id LEFT JOIN employ_dept ed ON ed.id = ci.store_id LEFT JOIN (SELECT ed.`id`,SUM(TRUNCATE(osd.pay_price / num * t.re_num, 1)) AS lcye FROM `customer_re_project` `t` LEFT JOIN customer_info ci ON ci.id = t.cu_id LEFT JOIN employ_dept ed ON ed.id = ci.store_id LEFT JOIN order_sale_detail osd ON osd.id = t.detail_id WHERE (ed.comp_id = ?) GROUP BY ed.id) tt ON tt.id = ed.id WHERE (ed.comp_id = ?) GROUP BY ed.id" rows, err := db.Query(sql, comp_id, comp_id) if err != nil { log.Println(err) } defer rows.Close() var rowData FzData rowData.month = fz_month for rows.Next() { err := rows.Scan(&rowData.mdid, &rowData.xjye, &rowData.zsye, &rowData.lcye) if err != nil { log.Fatal(err) } //fz.Add(rowData) fz.toDb(db, rowData) //返回管道信息写入 c <- "公司:" + fmt.Sprintf("%d", comp_id) + "->门店:" + fmt.Sprintf("%d", rowData.mdid) + "(处理完成)" } err = rows.Err() if err != nil { log.Fatal(err) } close(c) } func (fz *FzClass) Run() { db := Mydb() defer db.Close() sql := "SELECT id FROM `company_info` WHERE `status` = 1" rows, err := db.Query(sql) if err != nil { log.Println(err) } defer rows.Close() chs := make([]chan string, 0) //开多个管道接受消息 fz_month := time.Now().Format("200601") var i int = 0 var id int for rows.Next() { c := make(chan string) chs = append(chs, c) err := rows.Scan(&id) if err != nil { log.Fatal(err) } go fz.oneComp(c, id, fz_month) i = i + 1 } err = rows.Err() if err != nil { log.Fatal(err) } for _, ch := range chs { //多管道写法 for { x, ok := <-ch if ok == false { break } writeResult("fz", x) fmt.Println(x) //消息回收处理 可扩展写入文件日志 } } } var Fz *FzClass func init() { Fz = NewFz() }
pt/custatus.go
// 客户属性自动更新封装 // 需要公司开启自动更新并配置客户过期周期时间 package pt import ( "fmt" "log" "strconv" "strings" "time" ) type CuStatusClass struct { } func NewCu() *CuStatusClass { obj := new(CuStatusClass) return obj } /** *判断客户状态 **/ func (obj *CuStatusClass) getState(orders int, practs int, state int) int { if orders > 0 || practs > 0 { return 1 //最近有订单OR有实操 为有效客户 } if state == 3 { return 3 //死档客户 } if state == -1 { return -1 //无效客户 } //默认返回为久党客户 return 2 } /* * 获取公司关于有效客户的配置天数 默认30天 */ func (obj *CuStatusClass) getConfig(str string) int { var num int n := strings.Index(str, "member_config") if n == -1 { num = 30 } else { start := n + 20 end := n + 22 num2 := string([]byte(str)[start:end]) num, _ = strconv.Atoi(num2) } return num } /** * 更新一个公司的客户状态 (PT) 考虑新建数据库连接 提高效率 * @param {[type]} db *sql.DB [description] * @param {[type]} c chan int [description] * @param {[type]} comp_id int [公司ID] * @param {[type]} num int [有效期天数] * @return {[type]} [description] */ func (obj *CuStatusClass) updateOneComp(c chan string, comp_id int, num int) { db := Mydb() defer db.Close() end := time.Now().Unix() start := end - 3600*24*int64(num) //前推num天 sql := "SELECT a.id,a.name,a.status,(SELECT COUNT(*) FROM `order_sale` WHERE `cu_id` = a.id AND `pay_time` > ? AND `pay_time` < ? AND `type` IN (1,2)) AS orders, (SELECT COUNT(*) FROM `practice_order` WHERE `cu_id` = a.id AND `pay_time` > ? AND `pay_time` < ?) AS practs FROM `customer_info` AS a LEFT JOIN `config_membership` AS m ON a.membership_id = m.id WHERE m.`is_member` = 1 AND a.`company_id` = ?" rows, err := db.Query(sql, start, end, start, end, comp_id) if err != nil { log.Println(err) } defer rows.Close() var id int var orders int var practs int var name string var status int for rows.Next() { err := rows.Scan(&id, &name, &status, &orders, &practs) if err != nil { log.Fatal(err) } new_status := obj.getState(orders, practs, status) if status != new_status { stmt, err := db.Prepare("UPDATE `customer_info` SET `status`=? WHERE `id`=?") defer stmt.Close() if err != nil { log.Println(err) return } stmt.Exec(new_status, id) //返回管道信息写入 c <- fmt.Sprintf("%d", comp_id) + ":" + fmt.Sprintf("%d", id) + " " + name + " " + fmt.Sprintf("%d", status) + "->" + fmt.Sprintf("%d", new_status) } } err = rows.Err() if err != nil { log.Fatal(err) } close(c) } /** * 多公司并发处理 (PT) * @param {[type]} db *sql.DB [description] * @return {[type]} [description] */ func (obj *CuStatusClass) Run() { db := Mydb() defer db.Close() sql := "SELECT id , auto_cu_status, config FROM `company_info` WHERE `status` = 1" rows, err := db.Query(sql) if err != nil { log.Println(err) } defer rows.Close() chs := make([]chan string, 0) //开多个管道接受消息 var id int var auto int var config string for rows.Next() { err := rows.Scan(&id, &auto, &config) if err != nil { log.Fatal(err) } if auto == 1 { num := obj.getConfig(config) //客户有效期设置 c := make(chan string) chs = append(chs, c) go obj.updateOneComp(c, id, num) } } err = rows.Err() if err != nil { log.Fatal(err) } for _, ch := range chs { //多管道写法 for { x, ok := <-ch if ok == false { break } writeResult("cu_status", x) fmt.Println(x) //消息回收处理 可扩展写入文件日志 } } } var Cu *CuStatusClass func init() { Cu = NewCu() }
有疑问加站长微信联系(非本文作者)