tunny
github地址:https://github.com/Jeffail/tunny
项目结构
tunny的项目结构非常简单,核心文件就是tunny.go与worker.go
整体分析
tunny主要是通过reqChan管道来联系pool与worker之间的关系,worker的数量与协程池的大小相等,在初始化协程池时决定;各个worker竞争地获取reqChan中的数据,然后处理,最后返回给pool;
代码详解
type Pool struct {
queuedJobs int64
ctor func() Worker
workers []*workerWrapper
reqChan chan workRequest
workerMut sync.Mutex
}
Pool结构体:
- queuedJobs,这个变量代表pool当前积压的job数量
- ctor,这个变量代表worker具体的构造函数
- workers,这个变量代表pool实际拥有的worker
- reqChan,这个变量是pool与所有worker进行通信的管道,所有worker与pool都使用相同的reqChan指针
- workerMut,这个变量是在pool进行SetSize操作时使用的,防止不同协程同时对size进行操作
type Worker interface {
// Process will synchronously perform a job and return the result.
Process(interface{}) interface{}
// BlockUntilReady is called before each job is processed and must block the
// calling goroutine until the Worker is ready to process the next job.
BlockUntilReady()
// Interrupt is called when a job is cancelled. The worker is responsible
// for unblocking the Process implementation.
Interrupt()
// Terminate is called when a Worker is removed from the processing pool
// and is responsible for cleaning up any held resources.
Terminate()
}
worker在tunny中被设计成了一个interface,因为在之后的代码中可以看到,worker可以有许多不同地实现,正如之前一篇整理的博客所说:golang编码技巧总结,我们在写代码时都应该使用interface,来面向接口编程,实现解耦;
两种worker
// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
processor func(interface{}) interface{}
}
闭包worker,这个worker是最常用的一种worker,它主要执行初始化时赋予它的processeor函数来完成工作;
type callbackWorker struct{}
func (w *callbackWorker) Process(payload interface{}) interface{} {
f, ok := payload.(func())
if !ok {
return ErrJobNotFunc
}
f()
return nil
}
回调worker,这种worker处理的数据必须是一个函数,然后调用这个函数;
// NewFunc creates a new Pool of workers where each worker will process using
// the provided func.
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
return New(n, func() Worker {
return &closureWorker{
processor: f,
}
})
}
初始化协程池时需要两个参数,一个是协程池大小n,一个是希望协程池执行的函数,这个函数最终交由闭包worker,运行时由它实际处理数据;
func New(n int, ctor func() Worker) *Pool {
p := &Pool{
ctor: ctor,
reqChan: make(chan workRequest),
}
p.SetSize(n)
return p
}
可以看到,reqChan在这时出现了,这个在之后的代码中将是连接pool与worker的核心;
SetSize会做什么呢?
func (p *Pool) SetSize(n int) {
p.workerMut.Lock()
defer p.workerMut.Unlock()
lWorkers := len(p.workers)
if lWorkers == n {
return
}
// Add extra workers if N > len(workers)
for i := lWorkers; i < n; i++ {
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
}
// Asynchronously stop all workers > N
for i := n; i < lWorkers; i++ {
p.workers[i].stop()
}
// Synchronously wait for all workers > N to stop
for i := n; i < lWorkers; i++ {
p.workers[i].join()
}
// Remove stopped workers from slice
p.workers = p.workers[:n]
}
首先,会对这个函数加锁,这是为了防止在多个协程同时进行SetSize操作;
其次,当worker数量小于需要SetSize的数量,则增加worker的数量;
若worker数量大于SetSize的数量,则减小worker的数量;
增加worker的数量是如何增加呢?newWorkerWrapper
函数有很多值得关注的地方,值得注意的是,pool将它的reqChan传给了这个函数,也就是传给了worker;
func newWorkerWrapper(
reqChan chan<- workRequest,
worker Worker,
) *workerWrapper {
w := workerWrapper{
worker: worker,
interruptChan: make(chan struct{}),
reqChan: reqChan,
closeChan: make(chan struct{}),
closedChan: make(chan struct{}),
}
go w.run()
return &w
}
可以看到,在调用初始化newWorkerWrapper后,go了一个协程,进行w.run()操作,worker在这里是调用的之前传入的闭包worker的构造函数生成的,因此这里的worker是闭包worker;
func (w *workerWrapper) run() {
jobChan, retChan := make(chan interface{}), make(chan interface{})
defer func() {
w.worker.Terminate()
close(retChan)
close(w.closedChan)
}()
for {
// NOTE: Blocking here will prevent the worker from closing down.
w.worker.BlockUntilReady()
select {
case w.reqChan <- workRequest{
jobChan: jobChan,
retChan: retChan,
interruptFunc: w.interrupt,
}:
select {
case payload := <-jobChan:
result := w.worker.Process(payload)
select {
case retChan <- result:
case <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case _, _ = <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case <-w.closeChan:
return
}
}
}
解读这个run函数,这是整个worker的核心;
首先,能看到一个大的for循环,里面嵌套了select;
一进入select,会无脑往reqChan里传入workRequest,这时需要与pool的接收函数对应起来看:
func (p *Pool) Process(payload interface{}) interface{} {
atomic.AddInt64(&p.queuedJobs, 1)
request, open := <-p.reqChan
if !open {
panic(ErrPoolNotRunning)
}
request.jobChan <- payload
payload, open = <-request.retChan
if !open {
panic(ErrWorkerClosed)
}
atomic.AddInt64(&p.queuedJobs, -1)
return payload
}
可以发现,因为worker会无脑往reqChan管道里传入workRequest,因此pool一定会取到塞入的值交给request变量,payload是实际处理的数据,pool将其塞入workRequest的jobChan中,之后阻塞等待从retChan取得结果,由于这个jobChan与worker的jobChan是同一个指针,因此payload能在worker的
select {
case payload := <-jobChan:
result := w.worker.Process(payload)
select {
case retChan <- result:
case <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
...
case语句中被取到,然后进行处理,处理完后进入下一个select语句,无脑将result塞到retChan中;由于worker的retChan与pool的retChan是同一个指针,因此pool取到了retChan的结果,将其返回;
多个worker的情况,则会竞争从reqChan取数据,但是总能保证只有size个worker在工作,达到了限制协程数量的目的。
有疑问加站长微信联系(非本文作者)