协程池的简单设计

.container .card .information strong · · 614 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

微信公众号:LinuGo,欢迎关注

Golang标准库搭建的http服务端会为每一个请求创建一个协程去处理,虽然每个协程占有的栈空间很小,但是如果万一来个数百万千万的请求(当然,这种可能性有点极端),服务端只能对每一条请求乖乖创建一个协程,这时候,该go进程就存在大量的goroutine,占用服务器资源不说,还会增大gc压力。这时候就想给该机制加一个限制,搞一个协程池限制一下最大处理请求的协程数量。

浏览一下标准库该部分的源码实现

func (srv *Server) Serve(l net.Listener) error {
  .....

  ctx := context.WithValue(baseCtx, ServerContextKey, srv)
  for {
    //等待建立连接,没有请求则会阻塞住
    rw, err := l.Accept()
    if err != nil {......}
    connCtx := ctx
    if cc := srv.ConnContext; cc != nil {......}
    ......
    //开启协程处理请求,主要需要改造的就在此处
    go c.serve(connCtx)
  }
}

主要需要针对创建协程加一个限制条件,如果小于协程池规定的数量就允许创建,否则等待协程池有空闲位再创建。


大致思路

总体使用生产者消费者模式。使用两个有缓冲区的channel来实现协程的并发控制,一个sigChannel通过缓冲空间限制最大的协程数量,另一个jobChannel则用于传递请求的数据(包括请求函数以及参数),该jobChannel对于是否缓冲没有要求。

流程

(1)首先当请求到来之后,往sigChannel中写入标志位数据,如果此时有空闲位置,则不会阻塞在此处;

(2)之后往jobChannel中写入要执行的函数以及参数;

(3)后台监听jobChannel的函数worker(该函数要源源不断读取管道数据)则会取出管道中的数据;

(4)worker创建goroutine执行请求函数;

(5)该请求函数执行完成后,goroutine再去取出sigChannel管道中的标志数据,腾出来位置;

注:如果开始时候sigChannel写数据写入不了,则说明该池子满了,则需要阻塞等待。这样就实现了使用sigChannel控制并发量的功能。


代码实现

接下来使用代码实现这种思想

1、首先把net/http包中的代码给保存一份,防止被搞坏。直接在目录下搞了一个git仓库,先把源码commit一次,再搞一个分支自己瞎搞着玩,。在net/http下建了一个放协程池函数的文件夹,创建一个go文件。

2、首先定义两个channel,一个用来存放信号,一个存放函数以及参数,结合到http处理这里

type Info struct {
  //函数名称,对应http中c.serve()函数
  ParamFunc func(ctx context.Context)
  //函数的参数,对应c.serve()的connCtx参数
  Param context.Context
}

type Task struct {
  //用于传递函数以及参数的管道,对应jobChannel
  taskLet chan Info
  //用于传递信号量的管道
  taskCmp chan int64
}

type Pool struct {
  //两个管道对应的结构体
  tasks *Task 
  //协程池容量
  taskNum int64 
}

3、创建一个协程池对象,也就是初始化这两个管道

func NewPool(n int64) *Pool {
  taskc := make(chan Info,n)
  workc := make(chan int64,n)
  return &Pool{
    tasks: &Task{
      taskLet: taskc,
      taskCmp: workc,
    },
    taskNum: n,
  }
}

4、创建一个put函数,用于往两个channel中塞数据,即生产者

func (p *Pool) Put(a Info)  {
  //在sigChannel中塞数据,如果阻塞说明没有空闲
  p.tasks.taskCmp <- 1
  //在jobChannel中塞数据  
  p.tasks.taskLet <- a
}

5、创建一个run函数,用于监听管道并取出数据,即消费者

func (p *Pool) Run()  {
 //持续监听jobChannel管道,只要有数据监听到则说明已经有空闲位了,
 //需要创建goroutine执行传来的函数以及参数
  for {
    select {
    case let := <- p.tasks.taskLet:
      go p.Work(let)
    }
  }
}

func (p *Pool) Work(f Info)  {
  //执行传入的函数
  f.ParamFunc(f.Param)
  //执行完函数后把sigChannel中标志位取出
  <- p.tasks.taskCmp
}

6、修改源码,需要修改的代码加到server.go中

func (srv *Server) Serve(l net.Listener) error {
  .....
  //初始化一个连接池
  po := currencyctl.NewPool(srv.CorrencyNum)
  //异步开启这个池子,否则会阻塞
  go po.Run()
  ctx := context.WithValue(baseCtx, ServerContextKey, srv)
  for {
    //等待建立连接,没有请求则会阻塞住
    rw, err := l.Accept()
    if err != nil {......}
    connCtx := ctx
    if cc := srv.ConnContext; cc != nil {......}
    ......
    //go c.serve(connCtx)
    //改造成协程池
    po.Put(currencyctl.Info{ParamFunc:c.serve,Param:connCtx})
  }
}

我将处理并发数量的参数放到了server结构体中,通过http.ListenAndServe()方法传递并在下一次赋值。


测试阶段

接下来跑一个测试用例:

测试代码很简单,如下:

package main
import (
  "fmt"
  "net/http"
  _"net/http/pprof"
  "time"
)
func main() {
  go func() {//使用pprof跟踪
    http.ListenAndServe(":6060",nil,10)
  }()
  http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
    fmt.Println("收到请求。。。")
    time.Sleep(time.Second*1)
    writer.Write([]byte("hello http"))
  })
  http.ListenAndServe(":8000", nil,100)//限制最大并发量100
}

启动项目,做一个压力测试(这里我是用了go-stress-testing工具):

使用并发请求量为1000时候,查看pprof工具,查看系统协程数,控制在了100左右。

设置协程池协程量为200时候,使用1000并发请求,看到协程量控制在200

经过验证,该协程池在net/http标准库上的应用基本成功了,但是只是测试了一个简单的接口,没有经过复杂的业务验证,可能存在好多未知问题。

所以,又乖乖git checkout切到了原始分支。


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:.container .card .information strong

查看原文:协程池的简单设计

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

614 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传