新手想请教一个协程批处理任务的问题

jayxtt999 · 2022-07-04 13:51:52 · 2024 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2022-07-04 13:51:52 的主题,其中的信息可能已经有所发展或是发生改变。

package main

import (
    "encoding/json"
    "log"
    "sync"
)

var ResData []ItemData
var ch = make(chan ItemData)


func do(u UserRes,item ItemData,p Param, waitGroup *sync.WaitGroup)  {
    defer waitGroup.Done()
    //do something  5-10s
    item.money_total = 1 
    ch<-item
}

func main()  {
    sql := "SELECT id FROM user limit 10000"
    rows, err := dbCon.Queryx(sql)
    if err != nil {
        log.Fatalf("err: %v", err)
    }
    defer rows.Close()
    var wg sync.WaitGroup
    for rows.Next() {
        wg.Add(1)
        var u UserRes
        var item  ItemData
        err = rows.StructScan(&u)
        if err != nil {
            log.Fatalf("err: %v", err)
        }
        go do(u,item,p,&wg)

        ResData = append(ResData,<- ch)
    }
    json, err := json.Marshal(ResData)
    if err != nil {
        log.Fatalf("JSON ERR: %v", err)
    }
    wg.Wait()
}

这上面是部分代码示意 大概需求就是我需要给每个用户生成一份报表。每个用户生成时间大概就是5-10s,但是用户比较多 然后我发现我这样写好像还是需要很久的时间才能全部处理完 感觉是我对哪块的理解有问题或者是哪里阻塞了。 或者大佬们一般这种任务是怎么处理的。用什么组件


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2024 次点击  
加入收藏 微博
6 回复  |  直到 2022-07-07 10:14:56
guokun1998
guokun1998 · #1 · 3年之前
ch<-item
ResData = append(ResData,<- ch)

这是个无缓存的chan,必须发送/接受结束才会下一步。 可以带个缓存试试。若要存储执行结果,可以用sync.map。

lysShub
lysShub · #2 · 3年之前

绝大部分时间在数据库查询上

slclub
slclub · #3 · 3年之前

// do something 这里5-10 秒

可以优化,尽可能的多的取出数据,然后批量处理,减少db 操作

一个人 5到10 是不是可以优化到 100人用这个时间

或者是 分批次/全部,取出数据,然后用内存数据 去做逻辑。不用db的join ,子查询联合查询等。

xwszt
xwszt · #4 · 3年之前

这段程序是一条一条的处理的逻辑,速度肯定相比于多goroutine是慢的多了。阻塞在channel没有缓存。

1、数据检索可以批量检索出来,一次检索5000条都没问题,放到内存中肯定比多次去数据库里检索来的快。 2、既然用了channel,就用带有缓存的来处理,make(chan ItemData, 500),如果内存足够大,设置5000个都没问题;

johnny_wu
johnny_wu · #5 · 3年之前

ResData = append(ResData,<- ch),这个操作在for里面的,这不就一直阻塞了吗

golang_xuetu
golang_xuetu · #6 · 3年之前

解决办法:将 ResData = append(ResData,<- ch)操作放到for循环外。

原因:ch是无缓存的cahnnel,所以 <- ch操作会阻塞,直到ch中有数据才会执行,所以ResData = append(ResData,<- ch)会阻塞for循环,既这里的go do()并没有实现真正的并发执行,实际是串行执行操作

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传