ants协程池学习笔记

byte · · 1214 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

ants协程池学习笔记

go

ants是一个高性能的协程池

入口函数

首先看看PoolWithFund结构体定义:

// PoolWithFunc accepts the tasks from client,
// it limits the total of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
    // capacity of the pool.
    capacity int32

    // running is the number of the currently running goroutines.
    running int32

    // workers is a slice that store the available workers.
    workers []*goWorkerWithFunc

    // state is used to notice the pool to closed itself.
    state int32

    // lock for synchronous operation.
    lock sync.Locker

    // cond for waiting to get a idle worker.
    cond *sync.Cond

    // poolFunc is the function for processing tasks.
    poolFunc func(interface{})

    // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
    workerCache sync.Pool

    // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
    blockingNum int

    options *Options
}

NewPoolWithFunc
关键代码

p := &PoolWithFunc{
        capacity: int32(size),
        poolFunc: pf,
        lock:     internal.NewSpinLock(), //自旋锁
        options:  opts,
    }
    p.workerCache.New = func() interface{} {
        return &goWorkerWithFunc{
            pool: p,
            args: make(chan interface{}, workerChanCap),
        }
    }
  • capacity是协程池的容量
  • poolFunc是真正执行工作的函数
  • lock是一个自旋锁

自旋锁的作用是什么呢?

对于锁的竞争不激烈,且占用锁时间非常短的代码块来说性能能大幅度的提升,因为自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗,这些操作会导致线程发生两次上下文切换
  • workerCache是一个sync.Pool类型的临时对象池,它复用的对象是一个goWorkerWithFunc,这个结构体实际是执行工作的worker。它由以下结构组成:
// goWorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type goWorkerWithFunc struct {
    // pool who owns this worker.
    pool *PoolWithFunc

    // args is a job should be done.
    args chan interface{}

    // recycleTime will be update when putting a worker back into queue.
    recycleTime time.Time
}

其中需要交给协程池处理的数据都会发送到args管道,该worker从该管道中取值处理即可

会初始化cond变量:

p.cond = sync.NewCond(p.lock)

这里使用的锁是自旋锁

做完初始化操作后,会启动一个后台协程:

// Start a goroutine to clean up expired workers periodically.
    go p.periodicallyPurge()

从注释来看,它的作用是周期性地清理过期的worker。

提交任务给协程池

// Invoke submits a task to pool.
func (p *PoolWithFunc) Invoke(args interface{}) error {
    if atomic.LoadInt32(&p.state) == CLOSED {
        return ErrPoolClosed
    }
    var w *goWorkerWithFunc
    if w = p.retrieveWorker(); w == nil {
        return ErrPoolOverload
    }
    w.args <- args
    return nil
}
  • 由代码可见,首先会判断当前协程池的状态是否为CLOSED,如果是的话返回一个协程池关闭的错误,提交失败
  • 然后会调用retrieveWorker方法获取一个worker,如果没获取到会返回一个池子已满的错误,提交失败
  • 获取worker成功,将提交的参数发送到worker的args管道中即可

获取worker

生产worker

spawnWorker := func() {
        w = p.workerCache.Get().(*goWorkerWithFunc)
        w.run()
}

首先尝试从临时对象池获取worker,若获取不到则会新建一个worker。然后调用worker的run方法让其运行

1. 首先给协程池加锁
2. 检查协程池的workers数组是否>0,workers数组存储的是所有可用的worker,若有空闲的worker,则取空闲worker数组的最后一个,然后将其从worker数组中移除,然后解锁
3. 若running的worker小于协程池的容量,则解锁协程池,生产一个worker
4. 若无可用的worker,running数量也大于等于协程池的容量,若协程池是非阻塞模式则直接返回;
5. 若有设置最大阻塞任务数,超过这个限制则直接返回
6. 阻塞任务数+1,然后协程池的p.Cond开始Wait,等待被别的协程唤醒
7. 若之后别的协程发出了唤醒信号,那么阻塞任务数-1,此时检查running状态的为0,那么生产一个worker
8. running状态数不为0,那么重新尝试从workers数组获取最后一个worker,若workers长度为0,则再次阻塞住,重新跳到`步骤5`处循环

可以看出,作者对协程池的中心思想是一个worker复用的过程:首先尝试从workers数组中取可用的worker,如果取不到则从临时对象池取一个worker,再取不到才会新建一个worker。这里有几个疑问:

1. worker何时被放回到临时对象池中?
2. workers数组何时会进行插入操作?

我们接着看看worker的run方法

worker运行

  1. 协程池的running状态数量+1

然后会启动一个协程

1. 监听args管道,不断从中取出参数进行处理
2. 如果参数为nil,那么这个协程中止
3. 注册的函数执行完参数后,会将自身插入到workers数组中,并将recycleTime更新为当前时间,然后给cond发一个通知Signal,这是为了告诉阻塞在`无可用worker且running数量大于等于协程池容量的获取worker的协程`,现在有worker可用了
4. 如果协程池的状态已为CLOSED或者running数量的协程池已经大于了协程池的容量,那么这个协程也会中止
5. 协程中止后,会进行一些收尾工作:
    1. running数量-1
    2. 将worker自身放回到临时对象池中

可以看出,worker协程的中止有两种情况会触发:一个是arg参数为nil,另一个是协程池状态相关导致worker中止(closed状态或running数超出限制)
其余情况下,worker协程是常驻的,并且每处理完一个任务,就会将自己放回到workers数组中,这样当下次有向协程池提交任务时,就可以从workers数组中取出可用的worker了;

周期性清理worker

周期性清理worker是一个后台协程,整个操作一个ticker里,ticker的每个周期会做这么几件事:

1. 如果协程池的状态为closed,则跳出ticker
2. 获取workers数组中所有recycleTime已经过期的worker
3. 向这些worker的管道中发送nil,让其退出
4. 如果running状态数为0了,有可能出现一种情况,因为Signal是在worker正常处理完一条数据后才会调用,如果是这种发送nil导致worker协程退出是不会调用Signal的,那么可能还有协程在等待cond的通知信号,但是此时running已经为0,之后没机会再被通知了,程序会死锁,所以在这里会广播Broadcast一个信号给那些阻塞的协程

周期性清理worker的作用主要是有的worker长期未被使用,协程存在会占用资源,因此将其清理掉;


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:byte

查看原文:ants协程池学习笔记

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1214 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传