一种 golang 实现 多协程任务处理的套路

Bulesxz · · 5837 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

# 一种 golang 实现 多协程任务处理的套路 ## 那么是什么样的任务呢,一般是在生产者-消费者模式的消费者进程 ,举几个例子 1. 消费kafka 数据 2. 消费redis 数据 3. 轮询处理数据库数据 4. ... ## 下面来分析一下 1. 业务逻辑处理协程 到底多少个呢 ?处理一个数据 就 go 一个吗,也可以不过有点粗暴,协程也不是越多越好,调度也是要好性能的 所以还是控制一下,一般吧 弄个cpu * 2 就差不多了 (runtime.NumCPU() *2) 2. 获取数据协程 由于我要分析的例子 都是一个 for 循环 不停读取数据 交个任务处理协程,所以这里就 用一个协程 3. 进程如何关闭 总不能kill -9 粗暴处理吧,这样容易造成数据异常或者丢数据,一般都是 捕捉 信号 signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) 直接上代码 ```golang package main import ( "fmt" "os" "os/signal" "runtime" "sync/atomic" "syscall" "time" ) type TaskData struct { } type Service struct { capacity int tasks chan *TaskData numThread int closeChans chan struct{} stopFlag int32 loopStopChan chan struct{} } func NewService(capacity int) *Service { service := &Service{} service.capacity = capacity service.numThread = runtime.NumCPU() * 2 service.tasks = make(chan *TaskData, capacity) service.stopFlag = 0 service.closeChans = make(chan struct{}, service.numThread) service.loopStopChan = make(chan struct{}) return service } func (this *Service) Stop() { atomic.StoreInt32(&this.stopFlag, 1) <-this.loopStopChan close(this.tasks) for i := 0; i < this.numThread; i++ { <-this.closeChans } } func (this *Service) Run() { for i := 0; i < this.numThread; i++ { go this.run(i) } go this.LoopConsume() } func (this *Service) run(i int) { fmt.Println("go run:", i) loop: for { select { case task, ok := <-this.tasks: if ok { //#TODO process fmt.Println("process", task) } else { break loop } } } this.closeChans <- struct{}{} } func (this *Service) LoopConsume() { fmt.Println("loop") for atomic.LoadInt32(&this.stopFlag) == 0 { //TODO ReadData task := &TaskData{} this.tasks <- task fmt.Println("consume.") time.Sleep(time.Second * 2) } this.loopStopChan <- struct{}{} } func main() { service := NewService(100) go service.Run() //启动程序处理 c := make(chan os.Signal) signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) s := <-c //等待关闭信号 fmt.Println(s) service.Stop() //关闭service fmt.Println("exit :D") } ``` ## 思考 1: ``` golang service.stopFlag service.closeChans service.loopStopChan ``` 这几个变量干什么用的,是为了安全退出程序用的 ### 1. stopFlag > 首先 要退出 LoopConsume 大循环 用什么去通知呢,用channel 也可以,就是要配合select 使用,但是用原子标记是不是更简洁呢? 所以 stopFlag 是为了退出 LoopConsume 用的 ### 2. closeChans > 由于我们go 了很多个协程,那么要监听每一个协程退出,就需要多个channel 去接收 ``` for i := 0; i < this.numThread; i++ { <-this.closeChans } ``` > 这段代码的意思就是等待所有 处理协成退出 ### 3. loopStopChan > 这个又是干什么的呢,同样也是处理协程退出的 只不过是 LoopConsume,为什么不 closeChans 大小再加一个 而变成这样呢 ``` service.closeChans = make(chan struct{}, service.numThread+1) ``` ``` func (this *Service) Stop() { atomic.StoreInt32(&this.stopFlag, 1) close(this.closeChans) for i := 0; i < this.numThread+1; i++ { <-this.closeChans } } ``` 这么做会发生什么呢?,假如这样 一旦执行了stop 那么 this.stopFlag = 1,但是 LoopConsume 可能还在从 //TODO ReadData 获取数据阶段 当执行了 close(this.tasks) ,此时 恰好又要执行 this.tasks <- task,但是此时 tasks 已经关闭,那么就会panic 其实在整个例子里 LoopConsume 就相当于一个生产者,而run 相当于一个消费者,我们是不是应该先关不生产者 等待 消费者 消费完了 再退出呢,毫无疑问 肯定是的,所以就要有一个channel 等 生产者 退出了 再发送 channel 去 让消费者退出,所以单独用一个 loopStopChan ## 思考2: ``` func (this *Service) LoopConsume() { fmt.Println("loop") for atomic.LoadInt32(&this.stopFlag) == 0 { //TODO ReadData task := &TaskData{} this.tasks <- task fmt.Println("consume.") time.Sleep(time.Second * 2) } this.loopStopChan <- struct{}{} } ``` 这段代码其实就是不停的获取数据,我这里没有写获取数据的部分,因为这个是和业务相关的,举个实际点的例子 比如 比如 读取mysql SELECT ID, * FROM DATA WHERE ID > OFFSET LIMIT N; 每次 从OFFSET 位置读取 N 条数据,读取后 如果获取的条数 为 num ,若num等于 N , 那么 OFFSET += N 继续 read,否则 说明数据不够了 ,则,OFFSET += num,并且 sleep n 秒 (避免没有数据的时候空跑) 上伪代码 ``` func (this *Service) LoopConsume() { fmt.Println("loop") for atomic.LoadInt32(&this.stopFlag) == 0 { rows:= Read(offset) if rows 行数 == N { task := &TaskData{} this.tasks <- task offset + = N }else{ time.Sleep(time.Second * 20) offset += rows 行数 } } this.loopStopChan <- struct{}{} } ``` 这里有没有发现问题呢,假如程序刚好进入了time.Sleep(time.Second * 20) 这里呢,此时stop 岂不是 要等待20s 可是其实进程已经很闲了,有什么办法解决 呢,还是可以用标记的方法,一段程序进入sleep 可以设置一个标记 上伪代码 ``` func (this *Service) LoopConsume() { fmt.Println("loop") for atomic.LoadInt32(&this.stopFlag) == 0 { atomic.StoreInt32(&this.forcestopFlag, 0) rows:= Read(offset) if rows 行数 == N { task := &TaskData{} this.tasks <- task offset + = N }else{ atomic.StoreInt32(&this.forcestopFlag, 1) time.Sleep(time.Second * 20) offset += rows 行数 } } this.loopStopChan <- struct{}{} } func (this *Service) Stop() { atomic.StoreInt32(&this.stopFlag, 1) if atomic.LoadInt32(&this.forcestopFlag) == 0{ <-this.loopStopChan //只有当forcestopFlag = 0 的时候才需要等待 LoopConsume退出 } close(this.tasks) for i := 0; i < this.numThread; i++ { <-this.closeChans } } ``` ### 思考3: 此模型 run 里面 或者 LoopConsume 还可以 go 协程出来吗,显然不行,因为一旦go 出来 了,那么现有的 stop 就失效了,因为无法获取这些协程是否退出。 其实我觉得也没有必要 再go 一个出来,因为LoopConsume 一般是读 ,速度比 业务处理的要高, 如果这个不满足你的实际业务需求,你可以 go 多个 LoopConsume ,同样把 loopStopChan 也弄成 长度为 N 的channel <-this.loopStopChan 变成这样 ``` for i := 0; i <N ; i++ { <-this.loopStopChan } ``` 再极端 你的业务非得 在 LoopConsume go 一个或者多个协程,那么你得思考 该怎么同步了,至于用什么方法,得好好思考了,可以提出来大伙一起讨论讨论 同理在 run 里面你也想 go 一个或者多个协程, 还是一样得想办法考虑同步问题 ## 总结 这个小service 只是一种多协程处理任务的套路,常用在生产者消费者模型的消费者进程。 对于对性能要求比较高的可能不适合,比如这里的 协程数是固定的,可以改进成伸缩的动态变化, 代码写的比较简单,一些错误之处 还望各位 大神多多指正,欢迎讨论。

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

5837 次点击  ∙  1 赞  
加入收藏 微博
1 回复  |  直到 2018-05-02 00:21:09
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传