Golang 使用channel作并发访问吞吐量限制

olzhy · 2019-07-02 08:15:42 · 1612 次点击 · 预计阅读时间 12 分钟 · 大约8小时之前 开始浏览    
这是一个创建于 2019-07-02 08:15:42 的文章,其中的信息可能已经有所发展或是发生改变。

golang中可以使用Buffered channel作为信号量来对服务的并发访问作吞吐量限制。 如下代码中,Serve函数遍历请求队列,对每次请求,启动一个goroutine来进行handle,sem的缓冲大小限制了同时调用handle函数的数量,Serve函数虽可保障每一刻最多有MaxOutstanding个goroutine正在调用handle函数,但在请求过频与过多的情况下无法保证goroutine的过度创建以造成资源耗尽的风险。 ServeWithThroughputLimit函数对Serve作了改进,即对给sem发送消息提到了goroutine创建之前,以对goroutine的创建作限制。这样,同一时刻最多有MaxOutstanding个goroutine对请求进行handle。 golang中可以使用Buffered channel作为信号量来对服务的并发访问作吞吐量限制。 如下代码中,Serve函数遍历请求队列,对每次请求,启动一个goroutine来进行handle,sem的缓冲大小限制了同时调用handle函数的数量,Serve函数虽可保障每一刻最多有MaxOutstanding个goroutine正在调用handle函数,但在请求过频与过多的情况下无法保证goroutine的过度创建以造成资源耗尽的风险。 ServeWithThroughputLimit函数对Serve作了改进,即对给sem发送消息提到了goroutine创建之前,以对goroutine的创建作限制。这样,同一时刻最多有MaxOutstanding个goroutine对请求进行handle。

  1. package main  
  2.   
  3. import (  
  4.     "fmt"  
  5.     "sync"  
  6.     "time"  
  7. )  
  8.   
  9. const MaxOutstanding = 2  
  10.   
  11. type Req struct {  
  12.     id int  
  13. }  
  14.   
  15. func handle(req Req) {  
  16.     time.Sleep(time.Second)  
  17.     fmt.Println("handle req", req.id)  
  18. }  
  19.   
  20. func Serve(queue chan Req) {  
  21.     var wg sync.WaitGroup  
  22.     sem := make(chan int, MaxOutstanding)  
  23.     for req := range queue {  
  24.         wg.Add(1)  
  25.         go func(req Req) {  
  26.             fmt.Println("a goroutine launched")  
  27.             defer wg.Done()  
  28.             sem <- 1  
  29.             handle(req)  
  30.             <-sem  
  31.         }(req)  
  32.     }  
  33.     wg.Wait()  
  34. }  
  35.   
  36. func ServeWithThroughputLimit(queue chan Req) {  
  37.     var wg sync.WaitGroup  
  38.     sem := make(chan int, MaxOutstanding)  
  39.     for req := range queue {  
  40.         wg.Add(1)  
  41.         sem <- 1  
  42.         go func(req Req) {  
  43.             fmt.Println("a goroutine launched")  
  44.             defer wg.Done()  
  45.             handle(req)  
  46.             <-sem  
  47.         }(req)  
  48.     }  
  49.     wg.Wait()  
  50. }  
  51.   
  52. func main() {  
  53.     queue := make(chan Req, 5)  
  54.   
  55.     // requests  
  56.     go func() {  
  57.         for i := 0; i < 5; i++ {  
  58.             queue <- &Req{i}  
  59.         }  
  60.         close(queue)  
  61.     }()  
  62.   
  63.     // server  
  64.     // Serve(queue)  
  65.     ServeWithThroughputLimit(queue)  
  66. }  

调用Serve函数的输出为:

  1. a goroutine launched  
  2. a goroutine launched  
  3. a goroutine launched  
  4. a goroutine launched  
  5. a goroutine launched  
  6. handle req 4  
  7. handle req 3  
  8. handle req 1  
  9. handle req 2  
  10. handle req 0  
调用ServeWithThroughputLimit函数的输出为:
  1. a goroutine launched  
  2. a goroutine launched  
  3. handle req 0  
  4. a goroutine launched  
  5. handle req 1  
  6. a goroutine launched  
  7. handle req 2  
  8. a goroutine launched  
  9. handle req 3  
  10. handle req 4  

本文代码托管地址:https://github.com/olzhy/go-excercises/tree/master/throughput_limit

原文地址:https://leileiluoluo.com/posts/golang-throughput-limit.html


有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1612 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传