golang 数据库批量更新工具备份

ivy19860929 · · 4171 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

使用golang多线程批量更新数据数据。


共4个文件

main.go

package main

import (
	"bufio"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"os"
	. "pt"
)

func main() {
	args := os.Args
	if len(args) == 2 {
		switch args[1] {
		case "1":
			Cu.Run()
		case "2":
			//平台负债表数据静态化
			Fz.Run()
		case "0":
			os.Exit(0)
		default:
		}
	}
	if len(args) == 1 {
		for {
			fmt.Println("操作目录: ")
			fmt.Println("1、平台有效客户更新(202:13306-platform)。 ")
			fmt.Println("2、平台负债数据静态化(202:13306-platform)。")
			fmt.Println("0、退出。 ")
			inputReader := bufio.NewReader(os.Stdin)
			command, _, _ := inputReader.ReadLine()
			code := string(command)
			switch code {
			case "1":
				Cu.Run()
			case "2":
				//平台负债表数据静态化
				Fz.Run()
			case "0":
				os.Exit(0)
			default:
				fmt.Println("default")
			}
			fmt.Println("-------处理完成-------")
		}
	}

}


pt/lib.go      

package pt

import (
	"database/sql"
	_ "github.com/go-sql-driver/mysql"
	"log"
	"os"
	"time"
)

const (
	DSN string = "root:mysqladmin56@tcp(192.168.0.202:13306)/platform?charset=utf8"
	//DSN string = "root:@tcp(127.0.0.1:3306)/db_name?charset=utf8"
)

/**
 * 数据库连接
 */
func Mydb() *sql.DB {
	db, err := sql.Open("mysql", DSN)
	if err != nil {
		log.Fatalf("Open database error: %s\n", err)
	}
	//defer db.Close() //不关闭连接
	err = db.Ping()
	if err != nil {
		log.Fatal(err)
	}
	return db
}

/**
 * 升级日志写入  文件追加参数奇葩 多 要3个
 * @param  {[type]} log string        [description]
 * @return {[type]}     [description]
 */
func writeResult(tag string, data string) {
	str_time := time.Now().Format("2006_01_02")
	filename := tag + "_" + str_time + ".log"
	fl, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		log.Println(err)
	}
	defer fl.Close()
	fl.WriteString(data)
	fl.WriteString("\n")
}


pt/fz.go  

// 负债数据静态化封装
package pt

import (
	"database/sql"
	"fmt"
	"log"
	"time"
)

//门店负债数据
type FzData struct {
	mdid  int
	xjye  float32
	zsye  float32
	lcye  float32
	month string
}

type FzDataMap map[int]FzData

type FzClass struct {
	list FzDataMap
}

//list 使用前需要被初始化  所以直接在new时初始化 以下2种初始化方式都可
func NewFz() *FzClass {
	fz := &FzClass{list: make(FzDataMap)}
	//fz := new(FzClass)
	//fz.list = make(FzDataMap)
	return fz
}

func (fz *FzClass) Show() *FzClass {
	for i, value := range fz.list {
		fmt.Println(i)
		fmt.Println(value)
	}
	return fz
}

//暂无卵用 方便大数据扩展
func (fz *FzClass) Add(row FzData) *FzClass {
	fz.list[row.mdid] = row
	return fz
}

//门店数据静态化 入库
func (fz *FzClass) toDb(db *sql.DB, row FzData) {
	var num int
	one, err := db.Query("SELECT  COUNT(*) AS num  FROM `static_fz` WHERE `md_id` = ? AND `month` = ? ", row.mdid, row.month)
	if err != nil {
		log.Println(err)
	}
	defer one.Close()
	for one.Next() {
		err := one.Scan(&num)
		if err != nil {
			log.Fatal(err)
		}
	}
	if num > 0 {
		//存在更新
		stmt, _ := db.Prepare("UPDATE `static_fz` SET `xjye`=?, `zsye`=?, `lcye`=? WHERE `md_id` = ? AND `month` = ? ")
		defer stmt.Close()
		stmt.Exec(row.xjye, row.zsye, row.lcye, row.mdid, row.month)
	} else {
		//不存在插入
		stmt, _ := db.Prepare("INSERT INTO `static_fz` (`md_id`, `xjye`, `zsye`, `lcye`,`month`) VALUES (?,?,?,?,?)")
		defer stmt.Close()
		stmt.Exec(row.mdid, row.xjye, row.zsye, row.lcye, row.month)
	}
}

/**
 * 某一公司负债处理
 * @param  {[type]} c chan int            [日志管道]
 * @param  {[type]} comp_id int           [公司ID]
 * @param  {[type]} fz_month int          [负债月份]
 */
func (fz *FzClass) oneComp(c chan string, comp_id int, fz_month string) {
	db := Mydb()
	defer db.Close()
	sql := "SELECT ed.`id`, SUM(IF(cc.type = 1, t.balance, 0)) AS xjye, SUM(IF(cc.type = 2, t.balance, 0)) AS zsye, IFNULL(tt.`lcye`,0) AS lcye FROM `customer_capital` `t` LEFT JOIN company_capital cc ON cc.id = t.capital_id LEFT JOIN customer_info ci ON ci.id = t.cu_id LEFT JOIN employ_dept ed ON ed.id = ci.store_id LEFT JOIN (SELECT ed.`id`,SUM(TRUNCATE(osd.pay_price / num * t.re_num, 1)) AS lcye FROM `customer_re_project` `t` LEFT JOIN customer_info ci ON ci.id = t.cu_id LEFT JOIN employ_dept ed ON ed.id = ci.store_id LEFT JOIN order_sale_detail osd ON osd.id = t.detail_id WHERE (ed.comp_id = ?) GROUP BY ed.id) tt ON tt.id = ed.id WHERE (ed.comp_id = ?) GROUP BY ed.id"
	rows, err := db.Query(sql, comp_id, comp_id)
	if err != nil {
		log.Println(err)
	}
	defer rows.Close()
	var rowData FzData
	rowData.month = fz_month
	for rows.Next() {
		err := rows.Scan(&rowData.mdid, &rowData.xjye, &rowData.zsye, &rowData.lcye)
		if err != nil {
			log.Fatal(err)
		}
		//fz.Add(rowData)
		fz.toDb(db, rowData)
		//返回管道信息写入
		c <- "公司:" + fmt.Sprintf("%d", comp_id) + "->门店:" + fmt.Sprintf("%d", rowData.mdid) + "(处理完成)"
	}

	err = rows.Err()
	if err != nil {
		log.Fatal(err)
	}
	close(c)
}

func (fz *FzClass) Run() {
	db := Mydb()
	defer db.Close()
	sql := "SELECT id FROM `company_info` WHERE `status` = 1"
	rows, err := db.Query(sql)
	if err != nil {
		log.Println(err)
	}
	defer rows.Close()
	chs := make([]chan string, 0) //开多个管道接受消息
	fz_month := time.Now().Format("200601")
	var i int = 0
	var id int
	for rows.Next() {
		c := make(chan string)
		chs = append(chs, c)
		err := rows.Scan(&id)
		if err != nil {
			log.Fatal(err)
		}
		go fz.oneComp(c, id, fz_month)
		i = i + 1
	}
	err = rows.Err()
	if err != nil {
		log.Fatal(err)
	}

	for _, ch := range chs { //多管道写法
		for {
			x, ok := <-ch
			if ok == false {
				break
			}
			writeResult("fz", x)
			fmt.Println(x) //消息回收处理 可扩展写入文件日志
		}
	}

}

var Fz *FzClass

func init() {
	Fz = NewFz()
}



 pt/custatus.go

// 客户属性自动更新封装
// 需要公司开启自动更新并配置客户过期周期时间
package pt

import (
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"
)

type CuStatusClass struct {
}

func NewCu() *CuStatusClass {
	obj := new(CuStatusClass)
	return obj
}

/**
 *判断客户状态
 **/
func (obj *CuStatusClass) getState(orders int, practs int, state int) int {
	if orders > 0 || practs > 0 {
		return 1 //最近有订单OR有实操 为有效客户
	}
	if state == 3 {
		return 3 //死档客户
	}
	if state == -1 {
		return -1 //无效客户
	}
	//默认返回为久党客户
	return 2
}

/*
 * 获取公司关于有效客户的配置天数 默认30天
 */
func (obj *CuStatusClass) getConfig(str string) int {
	var num int
	n := strings.Index(str, "member_config")
	if n == -1 {
		num = 30
	} else {
		start := n + 20
		end := n + 22
		num2 := string([]byte(str)[start:end])
		num, _ = strconv.Atoi(num2)
	}
	return num
}

/**
 * 更新一个公司的客户状态 (PT) 考虑新建数据库连接 提高效率
 * @param  {[type]} db      *sql.DB       [description]
 * @param  {[type]} c       chan          int           [description]
 * @param  {[type]} comp_id int           [公司ID]
 * @param  {[type]} num int           	  [有效期天数]
 * @return {[type]}         			  [description]
 */
func (obj *CuStatusClass) updateOneComp(c chan string, comp_id int, num int) {
	db := Mydb()
	defer db.Close()
	end := time.Now().Unix()
	start := end - 3600*24*int64(num) //前推num天
	sql := "SELECT a.id,a.name,a.status,(SELECT COUNT(*) FROM `order_sale` WHERE `cu_id` = a.id AND `pay_time` > ? AND `pay_time` < ? AND `type` IN (1,2)) AS orders, (SELECT COUNT(*) FROM `practice_order` WHERE `cu_id` = a.id AND `pay_time` > ? AND `pay_time` < ?) AS practs FROM `customer_info` AS a LEFT JOIN `config_membership` AS m ON a.membership_id = m.id WHERE m.`is_member` = 1 AND a.`company_id` = ?"
	rows, err := db.Query(sql, start, end, start, end, comp_id)
	if err != nil {
		log.Println(err)
	}
	defer rows.Close()
	var id int
	var orders int
	var practs int
	var name string
	var status int
	for rows.Next() {
		err := rows.Scan(&id, &name, &status, &orders, &practs)
		if err != nil {
			log.Fatal(err)
		}

		new_status := obj.getState(orders, practs, status)
		if status != new_status {
			stmt, err := db.Prepare("UPDATE `customer_info` SET `status`=? WHERE `id`=?")
			defer stmt.Close()
			if err != nil {
				log.Println(err)
				return
			}
			stmt.Exec(new_status, id)
			//返回管道信息写入
			c <- fmt.Sprintf("%d", comp_id) + ":" + fmt.Sprintf("%d", id) + " " + name + " " + fmt.Sprintf("%d", status) + "->" + fmt.Sprintf("%d", new_status)
		}
	}
	err = rows.Err()
	if err != nil {
		log.Fatal(err)
	}
	close(c)
}

/**
 * 多公司并发处理 (PT)
 * @param  {[type]} db *sql.DB       [description]
 * @return {[type]}    [description]
 */
func (obj *CuStatusClass) Run() {
	db := Mydb()
	defer db.Close()
	sql := "SELECT id , auto_cu_status, config FROM `company_info` WHERE `status` = 1"
	rows, err := db.Query(sql)
	if err != nil {
		log.Println(err)
	}
	defer rows.Close()
	chs := make([]chan string, 0) //开多个管道接受消息
	var id int
	var auto int
	var config string
	for rows.Next() {
		err := rows.Scan(&id, &auto, &config)
		if err != nil {
			log.Fatal(err)
		}
		if auto == 1 {
			num := obj.getConfig(config) //客户有效期设置
			c := make(chan string)
			chs = append(chs, c)
			go obj.updateOneComp(c, id, num)
		}

	}
	err = rows.Err()
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range chs { //多管道写法
		for {
			x, ok := <-ch
			if ok == false {
				break
			}
			writeResult("cu_status", x)
			fmt.Println(x) //消息回收处理 可扩展写入文件日志
		}
	}

}

var Cu *CuStatusClass

func init() {
	Cu = NewCu()
}










有疑问加站长微信联系(非本文作者)

本文来自:CSDN博客

感谢作者:ivy19860929

查看原文:golang 数据库批量更新工具备份

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

4171 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传