前段时间因为一个爬虫项目,最开始做的时候是无脑的一个下载任务就开一个协程,但是后期出现了比较大的内存问题,并且下载的效果也不是那么的好,后面发现是因为协程开的太多了,并且下行的带宽就只有那么的大,所以并不能和想象中的那样。哎,还是因为too young,too simple,sometimes naive.
这篇主要是讲解的tunny是如何是如何实现并保持一个goroutine pool的。
因为本人是小菜,加上时间仓促,所以要是有什么问题的话希望大佬指正。
1.简介
tunny地址:https://github.com/Jeffail/tunny
这是一个goroutine pool包,可以设置或者动态改变goroutine pool中goroutine的数量,生成一个固定的数量的pool,实现goroutine的重复使用,并且能在一定程度上控制goroutine
2.源码
1.基本的数据类型
通过tunny的源码包文件数量并不多,只有3个文件,tonny.go和worker.go,没有那么多的文件层次结构,所以阅读起来特别的方便。这也是我比较喜欢阅读go语言代码的原因。
tunny.go中
Pool结构
主要是用于对整个pool的管理,其中包括pool
type Pool struct {
ctor func() Worker //goroutine中用户的业务逻辑函数
workers []*workerWrapper //目前已经存在的goroutine信息,workerWrapper结构定义在worker.go的中,
reqChan chan workRequest //任务调度管道,主要是用户管理当前goroutine是否执行任务,它和workerWrapper中的reqChan 其实是一个,但是workerWrapper的reqChan只是一个发送管道,这个后面会继续讲解
workerMut sync.Mutex //锁
queuedJobs int64 计数,表示当前已经在运行的任务
}
worker接口主要用户包装用户的业务逻辑的func
type Worker interface {
// Process will synchronously perform a job and return the result.
//
Process(interface{}) interface{}
// BlockUntilReady is called before each job is processed and must block the
// calling goroutine until the Worker is ready to process the next job.
BlockUntilReady()
// Interrupt is called when a job is cancelled. The worker is responsible
// for unblocking the Process implementation.
Interrupt()
// Terminate is called when a Worker is removed from the processing pool
// and is responsible for cleaning up any held resources.
Terminate()
}
closureWorker 顾明思议,主要是用于包装用户的业务逻辑,
并且是Worker的完全接收者
type closureWorker struct {
processor func(interface{}) interface{}
}
在worker.go中
type workerWrapper struct {
worker Worker //用户存放用户定义的业务逻辑函数
interruptChan chan struct{} //用于外部干预,使当前goroutine提前终止
// reqChan is NOT owned by this type, it is used to send requests for work.
reqChan chan<- workRequest //这个和pool.go中Pool类型中的reqChan是一个,只不过当前这个是一个发送管道
// closeChan can be closed in order to cleanly shutdown this worker.
closeChan chan struct{} //这个是用于传递关闭当前goroutine的消息
// closedChan is closed by the run() goroutine when it exits.
closedChan chan struct{} //这个我感觉并没有太大的实际意义
}
这个主要是用于传递任务参数。以及返回任务执行结果的类型
type workRequest struct {
// jobChan is used to send the payload to this worker.
jobChan chan<- interface{}
// retChan is used to read the result from this worker.
retChan <-chan interface{}
// interruptFunc can be called to cancel a running job. When called it is no
// longer necessary to read from retChan.
interruptFunc func()
}
2.如何创建一个goroutine pool
根据代码的调用步骤,
首先是实例化一个Pool类型的数据,并将用户用户的业务func包装成closureWorker类型并存储在Pool类型实例中的ctor字段中
使用外部调用创建一个Pool对象:
包中创建一个Pool的逻辑
逻辑很简单,一眼就能看明白。
那么在哪里启动一个goroutine,请看下面
注意这里的参数传递,这里传递了一个channel类型的参数,众所周知,在go中,分为两种类型,一种是值类型,一种是引用类型(map,slice,channel),说这么多有什么用呢,怎么扯到引用类型上面去了呢,但这个很重要
我们接下我们看在newWorkerWrapper中的逻辑
上面说到,我们传递过去了两个参数,其中一个是一个channel类型的,因为channel引用类型,所以他的传递是地址,所以在最后newWorkerWrapper中赋值的时候workerWrapper.reqChan和pool.reqChan实际指向的是同一个地址,区别就是workerWrapper.reqChan是一个发送管道罢了
我们可以输出看看
下面是run函数中的代码
run函数中的代码算是是整个包中最重要的代码了。
他的实现原理是比较简单的,就是采用的是一个for+select+channel来实现的,并且select采用是嵌套的形式,但是其中还是有些比较难以理解的(当然对我小白我来说哈,2333333)
我感觉主要是这两段
这两段的代码,需要结合到下一个小姐来说,请看下一个。
2.调用goroutine pool
这里调用很简单,只需要ret := pool.Process(参数)就ok了
我们来看看Process中是怎么样的
Process中逻辑很简单,上一个小姐我们知道,pool的reqChan 和 pool.workers.reqChan 是指向的同一个地址,但是后者为一个发送管道所以,在这样来使用时安全的,数据是不会错误的 。
在前面我的run函数中,有两段代码还没说明意思,现在我就说明一下,第一个就是这段,
(1)在我们定义reqChan管道的时候,我们定义的是一个没有缓冲区的管道,所以在没有接受操作的情况下,我们向管道里面推送数据是会被阻塞住的。
(2)在go中select是在有IO操作的情况下会被触发,所以要是我们没有在Process函数中调用reqChan接收数据,当前goroutine是会被阻塞住的这样当前select内层的select也会被阻塞住。
然后我们在来看通过reqChan传递过来的值
上面讲到,channel是引用类型,所以它在传递的时候是传递的地址,而不是值,所以,我们接收到的jobChan和retChan和传递过来指向的是同样的地址,这样我们就能实现共享通信了。我们可以输出里面两边的地址看看,这里我开了一个容量为2的pool,然后我调用pool里面的其中一个goroutine,我们看打印的地址
看。。。。没错吧。。。。。
3.Extra
有一个问题,就是当我们的pool有2个goroutinr的时候,但是我们有200个任务需要完成,也就是需要调用200测goroutine,Tunny是怎么样实现调度的呢,这个后面的文章补充吧,下班。。。。。。。。
看,就是这样的嘛。
有疑问加站长微信联系(非本文作者)