Go读取大量数据,开启多协程,将读取的文件内容导入数据库,出现丢失数据现象

Clouder · 2019-09-09 20:50:01 · 3581 次点击 · 预计阅读时间 2 分钟 · 大约8小时之前 开始浏览    
这是一个创建于 2019-09-09 20:50:01 的文章,其中的信息可能已经有所发展或是发生改变。

各位大佬,Go新人,昨天写了一个数据库导入工具,从本地文件读取,处理完数据格式后,导入DbFileReadWriteChanel的数据是正确无缺少的,就在开启100协程写入数据,执行完毕后,发现导入后的数据表中缺少了100多条。
\ 清空数据表后,又运行,一次还是缺少,然后进行少量的数据测试,发现,还是丢失,但如果只是使用主线程,导入,则无问题。
初步思考,代码出现问题的范围为一下代码块,请各位大佬协助,帮忙找一下问题所在,谢谢帮助。

//将存入chanel中的数据导入到数据库表中
func (this *DbFileRead) ImportTable(DbFileReadWriteChanel chan DbFileRead, exitChan chan bool) error {

    for {
        v, ok := <-DbFileReadWriteChanel

        if !ok {
            break
        }

        //fmt.Println(v)
        _, sqlErr := Sqldb.Exec("INSERT INTO IPINFO_TABLE VALUES(?,?,?,?,?)", v.Id, v.StartIp, v.EndIp, v.County, v.Local)
        //Sqldb.Exec("UPDATE orders SET shipped_at=? WHERE id IN (?)", time.Now, []int64{11,22,33})

        if sqlErr != nil {
            log.Printf("sql insert err is %v \n", sqlErr)
            return sqlErr
        }

        fmt.Printf("%d 当前已经完成 \n",v.Id)

    }

    defer Sqldb.Close()

    exitChan <- true

    close(exitChan)

    return nil
}

func main() {
    var dbfr *DbFileRead

    DbFileReadWriteChanel = make(chan DbFileRead, 600000)

    exitChan = make(chan bool,100)

    res, err := dbfr.ReadFile(FILEPATH)

    if err != nil && err == io.EOF {
        log.Printf("get file res success.")
    }

    handleResSlice := dbfr.HandleStringRes(res)

    dbfr.DbWriteChan(handleResSlice) //将handleResSlice 推入chanel中

    //dbfr.ImportTable(DbFileReadWriteChanel, exitChan)
    for i:=0;i<100 ;i++  {
        go dbfr.ImportTable(DbFileReadWriteChanel, exitChan)
    }


    //time.Sleep(time.Second*10)

    for {
        v, ok := <-exitChan
        fmt.Println("exit ", v)
        if !ok {
            break
        }
    }

}

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

3581 次点击  
加入收藏 微博
16 回复  |  直到 2019-09-16 10:36:16
Clouder
Clouder · #1 · 6年之前

我已经做完了主键的差异对比,发现丢失的数据,并不是有序的。

Clouder
Clouder · #2 · 6年之前

大佬们,能帮看看代码吗?

wangliangdong
wangliangdong · #3 · 6年之前

DbFileReadWriteChanel是你的数据通道吧,不应该是等len(DbFileReadWriteChanel)为0的时候关闭这个通道吗?

1071496910
1071496910 · #4 · 6年之前

exitChan 这个chan是不是关闭的早了, 应该所有的协程都退出了再关闭这边chan, 别再工作协作里边关闭这个chan,你这样只要一个协程工作完,整个就退出了

1071496910
1071496910 · #5 · 6年之前
10714969101071496910 #4 回复

exitChan 这个chan是不是关闭的早了, 应该所有的协程都退出了再关闭这边chan, 别再工作协作里边关闭这个chan,你这样只要一个协程工作完,整个就退出了

这里感觉用sync.WaitGroup 就好了, 这个不需要exitCh这样写

wangliangdong
wangliangdong · #6 · 6年之前

还是用waitGroup来控制协程的结束比较好,而且每个协程里都有死循环,其实100个协程能运行的也就是你CPU数量

wangliangdong
wangliangdong · #7 · 6年之前
package main

import (
    "fmt"
    "runtime"
    "sync"
)

var wg sync.WaitGroup
var list [600]int

func main() {
    s := make(chan int, 600000)
    for j := 0; j < 600; j++ {
        s <- j
    }
    for i := 0; i < runtime.NumCPU(); i++ {
        wg.Add(1)
        go importData(s)
    }
    wg.Wait()
    fmt.Println("完成", list, len(list))

}

func importData(s chan int) {
    defer wg.Done()
    for {
        if len(s) > 0 {
            v, ok := <-s
            if !ok {
                fmt.Println("失败", ok)
            }
            list[v] = v
        } else {
            break
        }
    }
}

你看看你想要的是不是这种效果

tangname
tangname · #8 · 6年之前

总结来说,你没必要用goroutine.

xiaobowen
xiaobowen · #9 · 6年之前

个人感觉是关闭chan 出现的问题

Schr0dingerCat
Schr0dingerCat · #10 · 6年之前

我也觉得为啥不用 sync.waitgroup? 再者,使用chan,waitgroup,那么chan还有必要自己手动close吗?自己写过几个小工具,chan从没关过。。。

fenglangjuxu
fenglangjuxu · #11 · 6年之前

为啥pc版本的评论这看不到,还是我没评论成功?

Clouder
Clouder · #12 · 6年之前
tangnametangname #8 回复

总结来说,你没必要用goroutine.

不行,需要使用goroutine,数据太多,主线程自己工作的话,效率太慢,尝试使用10个协程,一个小时导入不到10W条。。。。

colin321
colin321 · #13 · 6年之前

不应该在ImportTable关闭exitChan 以及Sqldb(如果这个连接不是在ImportTable里建立的话)

jinchunguang
jinchunguang · #14 · 6年之前

image.png

主线程本身是不慢的,你的sql数量多,直接分段事务,比你一条一条执行快多了。

Clouder
Clouder · #15 · 6年之前
jinchunguangjinchunguang #14 回复

![image.png](https://static.studygolang.com/190912/c7855ea6463af35c28a9f84422bb10de.png) 主线程本身是不慢的,你的sql数量多,直接分段事务,比你一条一条执行快多了。

好的,我学习一下

tangname
tangname · #16 · 6年之前
ClouderClouder #12 回复

#8楼 @tangname 不行,需要使用goroutine,数据太多,主线程自己工作的话,效率太慢,尝试使用10个协程,一个小时导入不到10W条。。。。

如果只是导入,你需要做的优化sql使用分段事务或者其他而不是go异步。你开一千个线程,sql的速度就那么快有什么用。

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传