ants协程池学习笔记
go
ants是一个高性能的协程池
入口函数
首先看看PoolWithFund结构体定义:
// PoolWithFunc accepts the tasks from client,
// it limits the total of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
// capacity of the pool.
capacity int32
// running is the number of the currently running goroutines.
running int32
// workers is a slice that store the available workers.
workers []*goWorkerWithFunc
// state is used to notice the pool to closed itself.
state int32
// lock for synchronous operation.
lock sync.Locker
// cond for waiting to get a idle worker.
cond *sync.Cond
// poolFunc is the function for processing tasks.
poolFunc func(interface{})
// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
workerCache sync.Pool
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
blockingNum int
options *Options
}
NewPoolWithFunc
关键代码
p := &PoolWithFunc{
capacity: int32(size),
poolFunc: pf,
lock: internal.NewSpinLock(), //自旋锁
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorkerWithFunc{
pool: p,
args: make(chan interface{}, workerChanCap),
}
}
- capacity是协程池的容量
- poolFunc是真正执行工作的函数
- lock是一个自旋锁
自旋锁的作用是什么呢?
对于锁的竞争不激烈,且占用锁时间非常短的代码块来说性能能大幅度的提升,因为自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗,这些操作会导致线程发生两次上下文切换
- workerCache是一个sync.Pool类型的临时对象池,它复用的对象是一个goWorkerWithFunc,这个结构体实际是执行工作的worker。它由以下结构组成:
// goWorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type goWorkerWithFunc struct {
// pool who owns this worker.
pool *PoolWithFunc
// args is a job should be done.
args chan interface{}
// recycleTime will be update when putting a worker back into queue.
recycleTime time.Time
}
其中需要交给协程池处理的数据都会发送到args管道,该worker从该管道中取值处理即可
会初始化cond变量:
p.cond = sync.NewCond(p.lock)
这里使用的锁是自旋锁
做完初始化操作后,会启动一个后台协程:
// Start a goroutine to clean up expired workers periodically.
go p.periodicallyPurge()
从注释来看,它的作用是周期性地清理过期的worker。
提交任务给协程池
// Invoke submits a task to pool.
func (p *PoolWithFunc) Invoke(args interface{}) error {
if atomic.LoadInt32(&p.state) == CLOSED {
return ErrPoolClosed
}
var w *goWorkerWithFunc
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
}
w.args <- args
return nil
}
- 由代码可见,首先会判断当前协程池的状态是否为CLOSED,如果是的话返回一个协程池关闭的错误,提交失败
- 然后会调用retrieveWorker方法获取一个worker,如果没获取到会返回一个池子已满的错误,提交失败
- 获取worker成功,将提交的参数发送到worker的args管道中即可
获取worker
生产worker
spawnWorker := func() {
w = p.workerCache.Get().(*goWorkerWithFunc)
w.run()
}
首先尝试从临时对象池获取worker,若获取不到则会新建一个worker。然后调用worker的run方法让其运行
1. 首先给协程池加锁
2. 检查协程池的workers数组是否>0,workers数组存储的是所有可用的worker,若有空闲的worker,则取空闲worker数组的最后一个,然后将其从worker数组中移除,然后解锁
3. 若running的worker小于协程池的容量,则解锁协程池,生产一个worker
4. 若无可用的worker,running数量也大于等于协程池的容量,若协程池是非阻塞模式则直接返回;
5. 若有设置最大阻塞任务数,超过这个限制则直接返回
6. 阻塞任务数+1,然后协程池的p.Cond开始Wait,等待被别的协程唤醒
7. 若之后别的协程发出了唤醒信号,那么阻塞任务数-1,此时检查running状态的为0,那么生产一个worker
8. running状态数不为0,那么重新尝试从workers数组获取最后一个worker,若workers长度为0,则再次阻塞住,重新跳到`步骤5`处循环
可以看出,作者对协程池的中心思想是一个worker复用的过程:首先尝试从workers数组中取可用的worker,如果取不到则从临时对象池取一个worker,再取不到才会新建一个worker。这里有几个疑问:
1. worker何时被放回到临时对象池中?
2. workers数组何时会进行插入操作?
我们接着看看worker的run方法
worker运行
- 协程池的running状态数量+1
然后会启动一个协程
1. 监听args管道,不断从中取出参数进行处理
2. 如果参数为nil,那么这个协程中止
3. 注册的函数执行完参数后,会将自身插入到workers数组中,并将recycleTime更新为当前时间,然后给cond发一个通知Signal,这是为了告诉阻塞在`无可用worker且running数量大于等于协程池容量的获取worker的协程`,现在有worker可用了
4. 如果协程池的状态已为CLOSED或者running数量的协程池已经大于了协程池的容量,那么这个协程也会中止
5. 协程中止后,会进行一些收尾工作:
1. running数量-1
2. 将worker自身放回到临时对象池中
可以看出,worker协程的中止有两种情况会触发:一个是arg参数为nil,另一个是协程池状态相关导致worker中止(closed状态或running数超出限制)
其余情况下,worker协程是常驻的,并且每处理完一个任务,就会将自己放回到workers数组中,这样当下次有向协程池提交任务时,就可以从workers数组中取出可用的worker了;
周期性清理worker
周期性清理worker是一个后台协程,整个操作一个ticker里,ticker的每个周期会做这么几件事:
1. 如果协程池的状态为closed,则跳出ticker
2. 获取workers数组中所有recycleTime已经过期的worker
3. 向这些worker的管道中发送nil,让其退出
4. 如果running状态数为0了,有可能出现一种情况,因为Signal是在worker正常处理完一条数据后才会调用,如果是这种发送nil导致worker协程退出是不会调用Signal的,那么可能还有协程在等待cond的通知信号,但是此时running已经为0,之后没机会再被通知了,程序会死锁,所以在这里会广播Broadcast一个信号给那些阻塞的协程
周期性清理worker的作用主要是有的worker长期未被使用,协程存在会占用资源,因此将其清理掉;
有疑问加站长微信联系(非本文作者)