golang源码阅读---tunny协程池的基本实现原理

mataye · · 1565 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前段时间因为一个爬虫项目,最开始做的时候是无脑的一个下载任务就开一个协程,但是后期出现了比较大的内存问题,并且下载的效果也不是那么的好,后面发现是因为协程开的太多了,并且下行的带宽就只有那么的大,所以并不能和想象中的那样。哎,还是因为too young,too simple,sometimes naive.

这篇主要是讲解的tunny是如何是如何实现并保持一个goroutine pool的。

因为本人是小菜,加上时间仓促,所以要是有什么问题的话希望大佬指正。

1.简介

tunny地址:https://github.com/Jeffail/tunny
这是一个goroutine pool包,可以设置或者动态改变goroutine pool中goroutine的数量,生成一个固定的数量的pool,实现goroutine的重复使用,并且能在一定程度上控制goroutine

2.源码

1.基本的数据类型

通过tunny的源码包文件数量并不多,只有3个文件,tonny.go和worker.go,没有那么多的文件层次结构,所以阅读起来特别的方便。这也是我比较喜欢阅读go语言代码的原因。

tunny.go中

Pool结构
主要是用于对整个pool的管理,其中包括pool

type Pool struct {
    ctor    func() Worker //goroutine中用户的业务逻辑函数
    workers []*workerWrapper //目前已经存在的goroutine信息,workerWrapper结构定义在worker.go的中,
    reqChan chan workRequest //任务调度管道,主要是用户管理当前goroutine是否执行任务,它和workerWrapper中的reqChan 其实是一个,但是workerWrapper的reqChan只是一个发送管道,这个后面会继续讲解

    workerMut  sync.Mutex //锁
    queuedJobs int64 计数,表示当前已经在运行的任务
}

worker接口主要用户包装用户的业务逻辑的func

type Worker interface {
    // Process will synchronously perform a job and return the result.
    //
    Process(interface{}) interface{}

    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()

    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()

    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()
}

closureWorker 顾明思议,主要是用于包装用户的业务逻辑,
并且是Worker的完全接收者

type closureWorker struct {
    processor func(interface{}) interface{}
}

在worker.go中

type workerWrapper struct {
    worker        Worker  //用户存放用户定义的业务逻辑函数
    interruptChan chan struct{} //用于外部干预,使当前goroutine提前终止

    // reqChan is NOT owned by this type, it is used to send requests for work.
    reqChan chan<- workRequest //这个和pool.go中Pool类型中的reqChan是一个,只不过当前这个是一个发送管道

    // closeChan can be closed in order to cleanly shutdown this worker.
    closeChan chan struct{}  //这个是用于传递关闭当前goroutine的消息

    // closedChan is closed by the run() goroutine when it exits.
    closedChan chan struct{} //这个我感觉并没有太大的实际意义
}

这个主要是用于传递任务参数。以及返回任务执行结果的类型

type workRequest struct {
    // jobChan is used to send the payload to this worker.
    jobChan chan<- interface{}

    // retChan is used to read the result from this worker.
    retChan <-chan interface{}

    // interruptFunc can be called to cancel a running job. When called it is no
    // longer necessary to read from retChan.
    interruptFunc func()
}

2.如何创建一个goroutine pool

根据代码的调用步骤,
首先是实例化一个Pool类型的数据,并将用户用户的业务func包装成closureWorker类型并存储在Pool类型实例中的ctor字段中

使用外部调用创建一个Pool对象:

clipboard.png

包中创建一个Pool的逻辑

clipboard.png

clipboard.png

逻辑很简单,一眼就能看明白。
那么在哪里启动一个goroutine,请看下面

clipboard.png

注意这里的参数传递,这里传递了一个channel类型的参数,众所周知,在go中,分为两种类型,一种是值类型,一种是引用类型(map,slice,channel),说这么多有什么用呢,怎么扯到引用类型上面去了呢,但这个很重要

我们接下我们看在newWorkerWrapper中的逻辑

clipboard.png

上面说到,我们传递过去了两个参数,其中一个是一个channel类型的,因为channel引用类型,所以他的传递是地址,所以在最后newWorkerWrapper中赋值的时候workerWrapper.reqChan和pool.reqChan实际指向的是同一个地址,区别就是workerWrapper.reqChan是一个发送管道罢了
我们可以输出看看

clipboard.png

下面是run函数中的代码

clipboard.png

run函数中的代码算是是整个包中最重要的代码了。

他的实现原理是比较简单的,就是采用的是一个for+select+channel来实现的,并且select采用是嵌套的形式,但是其中还是有些比较难以理解的(当然对我小白我来说哈,2333333)

我感觉主要是这两段
clipboard.png

这两段的代码,需要结合到下一个小姐来说,请看下一个。

2.调用goroutine pool

这里调用很简单,只需要ret := pool.Process(参数)就ok了

我们来看看Process中是怎么样的

clipboard.png

Process中逻辑很简单,上一个小姐我们知道,pool的reqChan 和 pool.workers.reqChan 是指向的同一个地址,但是后者为一个发送管道所以,在这样来使用时安全的,数据是不会错误的 。

在前面我的run函数中,有两段代码还没说明意思,现在我就说明一下,第一个就是这段,

clipboard.png

(1)在我们定义reqChan管道的时候,我们定义的是一个没有缓冲区的管道,所以在没有接受操作的情况下,我们向管道里面推送数据是会被阻塞住的。
(2)在go中select是在有IO操作的情况下会被触发,所以要是我们没有在Process函数中调用reqChan接收数据,当前goroutine是会被阻塞住的这样当前select内层的select也会被阻塞住。

clipboard.png

然后我们在来看通过reqChan传递过来的值

clipboard.png

上面讲到,channel是引用类型,所以它在传递的时候是传递的地址,而不是值,所以,我们接收到的jobChan和retChan和传递过来指向的是同样的地址,这样我们就能实现共享通信了。我们可以输出里面两边的地址看看,这里我开了一个容量为2的pool,然后我调用pool里面的其中一个goroutine,我们看打印的地址

clipboard.png

看。。。。没错吧。。。。。

3.Extra

有一个问题,就是当我们的pool有2个goroutinr的时候,但是我们有200个任务需要完成,也就是需要调用200测goroutine,Tunny是怎么样实现调度的呢,这个后面的文章补充吧,下班。。。。。。。。

clipboard.png
看,就是这样的嘛。


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:mataye

查看原文:golang源码阅读---tunny协程池的基本实现原理

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1565 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传