go中控制goroutine数量

小中01 · · 393 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

通过channel+sync

var ( // channel长度

poolCount      = 5

// 复用的goroutine数量

goroutineCount = 10)func pool() {

jobsChan := make(chan int, poolCount) // workers

var wg sync.WaitGroup for i := 0; i < goroutineCount; i++ {

wg.Add(1) go func() { defer wg.Done() for item := range jobsChan { // ...

fmt.Println(item)

}

}()

} // senders

for i := 0; i < 1000; i++ {

jobsChan <- i

} // 关闭channel,上游的goroutine在读完channel的内容,就会通过wg的done退出

close(jobsChan)

wg.Wait()

}

通过WaitGroup启动指定数量的goroutine,监听channel的通知。发送者推送信息到channel,信息处理完了,关闭channel,等待goroutine依次退出。


使用semaphore

package mainimport ( "context"

"fmt"

"sync"

"time"


"golang.org/x/sync/semaphore")const ( // 同时运行的goroutine上限

Limit = 3

// 信号量的权重

Weight = 1)func main() {

names := []string{ "小白", "小红", "小明", "小李", "小花",

}


sem := semaphore.NewWeighted(Limit) var w sync.WaitGroup for _, name := range names {

w.Add(1) go func(name string) {

sem.Acquire(context.Background(), Weight) // ... 具体的业务逻辑

fmt.Println(name, "-吃饭了")

time.Sleep(2 * time.Second)

sem.Release(Weight)

w.Done()

}(name)

}

w.Wait()


fmt.Println("ending--------")

}

借助于x包中的semaphore,也可以进行goroutine的数量限制。


线程池

不过原本go中的协程已经是非常轻量了,对于协程池还是要根据具体的场景分析。


对于小场景使用channel+sync就可以,其他复杂的可以考虑使用第三方的协程池库。


panjf2000/ants


go-playground/pool


Jeffail/tunny


几个开源的线程池的设计

fasthttp中的协程池实现

fasthttp比net/http效率高很多倍的重要原因,就是利用了协程池。来看下大佬的设计思路。


1、按需增长goroutine数量,有一个最大值,同时监听channel,Server会把accept到的connection放入到channel中,这样监听的goroutine就能处理消费。


2、本地维护了一个待使用的channel列表,当本地channel列表拿不到ch,会在sync.pool中取。


3、如果workersCount没达到上限,则从生成一个workerFunc监听workerChan。


4、对于待使用的channel列表,会定期清理掉超过最大空闲时间的workerChan。


看下具体实现


// workerPool通过一组工作池服务传入的连接// 按照FILO(先进后出)的顺序,即最近停止的工作人员将为下一个工作传入的连接。//// 这种方案能够保持cpu的缓存保持高效(理论上)type workerPool struct { // 这个函数用于server的连接

// It must leave c unclosed.

WorkerFunc ServeHandler // 最大的Workers数量

MaxWorkersCount int


LogAllErrors bool


MaxIdleWorkerDuration time.Duration


Logger Logger


lock         sync.Mutex // 当前worker的数量

workersCount int

// worker停止的标识

mustStop     bool


// 等待使用的workerChan

// 可能会被清理

ready []*workerChan // 用来标识start和stop

stopCh chan struct{} // workerChan的缓存池,通过sync.Pool实现

workerChanPool sync.Pool


connState func(net.Conn, ConnState)}// workerChan的结构type workerChan struct {

lastUseTime time.Time

ch          chan net.Conn

}

Start

func (wp *workerPool) Start() { // 判断是否已经Start过了

if wp.stopCh != nil { panic("BUG: workerPool already started")

} // stopCh塞入值

wp.stopCh = make(chan struct{})

stopCh := wp.stopCh

wp.workerChanPool.New = func() interface{} { // 如果单核cpu则让workerChan阻塞

// 否则,使用非阻塞,workerChan的长度为1

return &workerChan{

ch: make(chan net.Conn, workerChanCap),

}

} go func() { var scratch []*workerChan for {

wp.clean(&scratch) select { // 接收到退出信号,退出

case <-stopCh: return

default:

time.Sleep(wp.getMaxIdleWorkerDuration())

}

}

}()

}// 如果单核cpu则让workerChan阻塞// 否则,使用非阻塞,workerChan的长度为1var workerChanCap = func() int { // 如果GOMAXPROCS=1,workerChan的长度为0,变成一个阻塞的channel

if runtime.GOMAXPROCS(0) == 1 { return 0

} // 如果GOMAXPROCS>1则使用非阻塞的workerChan

return 1}()

梳理下流程:


1、首先判断下stopCh是否为nil,不为nil表示已经started了;


2、初始化wp.stopCh = make(chan struct{}),stopCh是一个标识,用了struct{}不用bool,因为空结构体变量的内存占用大小为0,而bool类型内存占用大小为1,这样可以更加最大化利用我们服务器的内存空间;


3、设置workerChanPool的New函数,然后可以在Get不到东西时,自动创建一个;如果单核cpu则让workerChan阻塞,否则,使用非阻塞,workerChan的长度设置为1;


4、启动一个goroutine,处理clean操作,在接收到退出信号,退出。


Stop

func (wp *workerPool) Stop() { // 同start,stop也只能触发一次

if wp.stopCh == nil { panic("BUG: workerPool wasn't started")

} // 关闭stopCh

close(wp.stopCh) // 将stopCh置为nil

wp.stopCh = nil


// 停止所有的等待获取连接的workers

// 正在运行的workers,不需要等待他们退出,他们会在完成connection或mustStop被设置成true退出

wp.lock.Lock()

ready := wp.ready // 循环将ready的workerChan置为nil

for i := range ready {

ready[i].ch <- nil

ready[i] = nil

}

wp.ready = ready[:0] // 设置mustStop为true

wp.mustStop = true

wp.lock.Unlock()

}

梳理下流程:


1、判断stop只能被关闭一次;


2、关闭stopCh,设置stopCh为nil;


3、停止所有的等待获取连接的workers,正在运行的workers,不需要等待他们退出,他们会在完成connection或mustStop被设置成true退出。


clean

func (wp *workerPool) clean(scratch *[]*workerChan) {

maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() // 清理掉最近最少使用的workers如果他们过了maxIdleWorkerDuration时间没有提供服务

criticalTime := time.Now().Add(-maxIdleWorkerDuration)


wp.lock.Lock()

ready := wp.ready

n := len(ready) // 使用二分搜索算法找出最近可以被清除的worker

// 最后使用的workerChan 一定是放回队列尾部的。

l, r, mid := 0, n-1, 0

for l <= r {

mid = (l + r) / 2

if criticalTime.After(wp.ready[mid].lastUseTime) {

l = mid + 1

} else {

r = mid - 1

}

}

i := r if i == -1 {

wp.lock.Unlock() return

} // 将ready中i之前的的全部清除

*scratch = append((*scratch)[:0], ready[:i+1]...)

m := copy(ready, ready[i+1:]) for i = m; i < n; i++ {

ready[i] = nil

}

wp.ready = ready[:m]

wp.lock.Unlock() // 通知淘汰的workers停止

// 此通知必须位于wp.lock之外,因为ch.ch

// 如果有很多workers,可能会阻塞并且可能会花费大量时间

// 位于非本地CPU上。

tmp := *scratch for i := range tmp {

tmp[i].ch <- nil

tmp[i] = nil

}

}

主要是清理掉最近最少使用的workers如果他们过了maxIdleWorkerDuration时间没有提供服务


getCh

获取一个workerChan


func (wp *workerPool) getCh() *workerChan { var ch *workerChan

createWorker := false


wp.lock.Lock()

ready := wp.ready

n := len(ready) - 1

// 如果ready为空

if n < 0 { if wp.workersCount < wp.MaxWorkersCount {

createWorker = true

wp.workersCount++

}

} else { // 不为空从ready中取一个

ch = ready[n]

ready[n] = nil

wp.ready = ready[:n]

}

wp.lock.Unlock() // 如果没拿到ch

if ch == nil { if !createWorker { return nil

} // 从缓存中获取一个ch

vch := wp.workerChanPool.Get()

ch = vch.(*workerChan) go func() { // 具体的执行函数

wp.workerFunc(ch) // 再放入到pool中

wp.workerChanPool.Put(vch)

}()

} return ch

}

梳理下流程:


1、获取一个可执行的workerChan,如果ready中为空,并且workersCount没有达到最大值,增加workersCount数量,并且设置当前操作createWorker = true;


2、ready中不为空,直接在ready获取一个;


3、如果没有获取到则在sync.pool中获取一个,之后再放回到pool中;


4、拿到了就启动一个workerFunc监听workerChan,处理具体的业务逻辑。


workerFunc

func (wp *workerPool) workerFunc(ch *workerChan) { var c net.Conn var err error // 监听workerChan

for c = range ch.ch { if c == nil { break

} // 具体的业务逻辑

...

c = nil


// 释放workerChan

// 在mustStop的时候将会跳出循环

if !wp.release(ch) { break

}

}


wp.lock.Lock()

wp.workersCount--

wp.lock.Unlock()

}// 把Conn放入到channel中func (wp *workerPool) Serve(c net.Conn) bool {

ch := wp.getCh() if ch == nil { return false

}

ch.ch <- c return true}func (wp *workerPool) release(ch *workerChan) bool { // 修改 ch.lastUseTime

ch.lastUseTime = time.Now()

wp.lock.Lock() // 如果需要停止,直接返回

if wp.mustStop {

wp.lock.Unlock() return false

} // 将ch放到ready中

wp.ready = append(wp.ready, ch)

wp.lock.Unlock() return true}

梳理下流程:


1、workerFunc会监听workerChan,并且在使用完workerChan归还到ready中;


2、Serve会把connection放入到workerChan中,这样workerFunc就能通过workerChan拿到需要处理的连接请求;


3、当workerFunc拿到的workerChan为nil或wp.mustStop被设为了true,就跳出for循环。


panjf2000/ants

先看下示例


示例一


package mainimport ( "fmt"

"sync"

"sync/atomic"

"time"


"github.com/panjf2000/ants")func demoFunc() {

time.Sleep(10 * time.Millisecond)

fmt.Println("Hello World!")

}func main() { defer ants.Release()


runTimes := 1000


var wg sync.WaitGroup

syncCalculateSum := func() {

demoFunc()

wg.Done()

} for i := 0; i < runTimes; i++ {

wg.Add(1)

_ = ants.Submit(syncCalculateSum)

}

wg.Wait()

fmt.Printf("running goroutines: %d\n", ants.Running())

fmt.Printf("finish all tasks.\n")

}

示例二


package mainimport ( "fmt"

"sync"

"sync/atomic"

"time"


"github.com/panjf2000/ants")var sum int32func myFunc(i interface{}) {

n := i.(int32)

atomic.AddInt32(&sum, n)

fmt.Printf("run with %d\n", n)

}func main() { var wg sync.WaitGroup

runTimes := 1000


// Use the pool with a method,

// set 10 to the capacity of goroutine pool and 1 second for expired duration.

p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {

myFunc(i)

wg.Done()

}) defer p.Release() // Submit tasks one by one.

for i := 0; i < runTimes; i++ {

wg.Add(1)

_ = p.Invoke(int32(i))

}

wg.Wait()

fmt.Printf("running goroutines: %d\n", p.Running())

fmt.Printf("finish all tasks, result is %d\n", sum) if sum != 499500 { panic("the final result is wrong!!!")

}

}

设计思路

整体的设计思路


梳理下思路:


1、先初始化缓存池的大小,然后处理任务事件的时候,一个task分配一个goWorker;


2、在拿goWorker的过程中会存在下面集中情况;


本地的缓存中有空闲的goWorker,直接取出;


本地缓存没有就去sync.Pool,拿一个goWorker;


3、如果缓存池满了,非阻塞模式直接返回nil,阻塞模式就循环去拿直到成功拿出一个;


4、同时也会定期清理掉过期的goWorker,通过sync.Cond唤醒其的阻塞等待;


5、对于使用完成的goWorker在使用完成之后重新归还到pool。


具体的设计细节可参考,作者的文章Goroutine 并发调度模型深度解析之手撸一个高性能 goroutine 池


go-playground/pool

go-playground/pool会在一开始就启动


先放几个使用的demo


Per Unit Work


package mainimport ( "fmt"

"time"


"gopkg.in/go-playground/pool.v3")func main() {


p := pool.NewLimited(10) defer p.Close()


user := p.Queue(getUser(13))

other := p.Queue(getOtherInfo(13))


user.Wait() if err := user.Error(); err != nil { // handle error

} // do stuff with user

username := user.Value().(string)

fmt.Println(username)


other.Wait() if err := other.Error(); err != nil { // handle error

} // do stuff with other

otherInfo := other.Value().(string)

fmt.Println(otherInfo)

}func getUser(id int) pool.WorkFunc { return func(wu pool.WorkUnit) (interface{}, error) { // simulate waiting for something, like TCP connection to be established

// or connection from pool grabbed

time.Sleep(time.Second * 1) if wu.IsCancelled() { // return values not used

return nil, nil

} // ready for processing...


return "Joeybloggs", nil

}

}func getOtherInfo(id int) pool.WorkFunc { return func(wu pool.WorkUnit) (interface{}, error) { // simulate waiting for something, like TCP connection to be established

// or connection from pool grabbed

time.Sleep(time.Second * 1) if wu.IsCancelled() { // return values not used

return nil, nil

} // ready for processing...


return "Other Info", nil

}

}

Batch Work


package mainimport ( "fmt"

"time"


"gopkg.in/go-playground/pool.v3")func main() {


p := pool.NewLimited(10) defer p.Close()


batch := p.Batch() // for max speed Queue in another goroutine

// but it is not required, just can't start reading results

// until all items are Queued.


go func() { for i := 0; i < 10; i++ {

batch.Queue(sendEmail("email content"))

} // DO NOT FORGET THIS OR GOROUTINES WILL DEADLOCK

// if calling Cancel() it calles QueueComplete() internally

batch.QueueComplete()

}() for email := range batch.Results() { if err := email.Error(); err != nil { // handle error

// maybe call batch.Cancel()

} // use return value

fmt.Println(email.Value().(bool))

}

}func sendEmail(email string) pool.WorkFunc { return func(wu pool.WorkUnit) (interface{}, error) { // simulate waiting for something, like TCP connection to be established

// or connection from pool grabbed

time.Sleep(time.Second * 1) if wu.IsCancelled() { // return values not used

return nil, nil

} // ready for processing...


return true, nil // everything ok, send nil, error if not

}

}

来看下实现


workUnit

workUnit作为channel信息进行传递,用来给work传递当前需要执行的任务信息。


// WorkUnit contains a single uint of works valuestype WorkUnit interface { // 阻塞直到当前任务被完成或被取消

Wait() // 执行函数返回的结果

Value() interface{} // Error returns the Work Unit's error

Error() error // 取消当前的可执行任务

Cancel() // 判断当前的可执行单元是否被取消了

IsCancelled() bool}var _ WorkUnit = new(workUnit)// workUnit contains a single unit of works valuestype workUnit struct { // 任务执行的结果

value      interface{} // 错误信息

err        error // 通知任务完成

done       chan struct{} // 需要执行的任务函数

fn         WorkFunc // 任务是会否被取消

cancelled  atomic.Value // 是否正在取消任务

cancelling atomic.Value // 任务是否正在执行

writing    atomic.Value

}

limitedPool

var _ Pool = new(limitedPool)// limitedPool contains all information for a limited pool instance.type limitedPool struct { // 并发量

workers uint

// work的channel

work    chan *workUnit // 通知结束的channel

cancel  chan struct{} // 是否关闭的标识

closed  bool

// 读写锁

m       sync.RWMutex

}// 初始化一个poolfunc NewLimited(workers uint) Pool { if workers == 0 { panic("invalid workers '0'")

} // 初始化pool的work数量

p := &limitedPool{

workers: workers,

} // 初始化pool的操作

p.initialize() return p

}func (p *limitedPool) initialize() { // channel的长度为work数量的两倍

p.work = make(chan *workUnit, p.workers*2)

p.cancel = make(chan struct{})

p.closed = false


// fire up workers here

for i := 0; i < int(p.workers); i++ {

p.newWorker(p.work, p.cancel)

}

}// 将工作传递并取消频道到newWorker()以避免任何潜在的竞争状况// 在p.work读写之间func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) { go func(p *limitedPool) { var wu *workUnit defer func(p *limitedPool) { // 捕获异常,结束掉异常的工作单元,并将其再次作为新的任务启动

if err := recover(); err != nil {


trace := make([]byte, 1<<16)

n := runtime.Stack(trace, true)


s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))]))


iwu := wu

iwu.err = &ErrRecovery{s: s} close(iwu.done) // 重新启动

p.newWorker(p.work, p.cancel)

}

}(p) var value interface{} var err error // 监听channel,读取内容

for { select { // channel中取出数据

case wu = <-work: // 防止channel 被关闭后读取到零值

if wu == nil { continue

} // 单个和批量的cancellation这个都支持

if wu.cancelled.Load() == nil { // 执行我们的业务函数

value, err = wu.fn(wu)


wu.writing.Store(struct{}{}) // 如果WorkFunc取消了此工作单元,则需要再次检查

// 防止产生竞争条件

if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {

wu.value, wu.err = value, err // 执行完成,关闭当前channel

close(wu.done)

}

} // 如果取消了,就退出

case <-cancel: return

}

}


}(p)

}// 放置一个执行的task到channel,并返回channelfunc (p *limitedPool) Queue(fn WorkFunc) WorkUnit { // 初始化一个workUnit类型的channel

w := &workUnit{

done: make(chan struct{}), // 具体的执行函数

fn:   fn,

} go func() {

p.m.RLock() // 如果pool关闭的时候通知channel关闭

if p.closed {

w.err = &ErrPoolClosed{s: errClosed} if w.cancelled.Load() == nil { close(w.done)

}

p.m.RUnlock() return

} // 将channel传递给pool的work

p.work <- w


p.m.RUnlock()

}() return w

}

梳理下流程:


1、首先初始化pool的大小;


2、然后根据pool的大小启动对应数量的worker,阻塞等待channel被塞入可执行函数;


3、然后可执行函数会被放入workUnit,然后通过channel传递给阻塞的worker。


同样这里也提供了批量执行的方法


batch

// batch contains all information for a batch run of WorkUnitstype batch struct {

pool    Pool

m       sync.Mutex // WorkUnit的切片

units   []WorkUnit // 结果集,执行完后的workUnit会更新其value,error,可以从结果集channel中读取

results chan WorkUnit // 通知batch是否完成

done    chan struct{}

closed  bool

wg      *sync.WaitGroup

}// 初始化Batchfunc newBatch(p Pool) Batch { return &batch{

pool:    p,

units:   make([]WorkUnit, 0, 4),

results: make(chan WorkUnit),

done:    make(chan struct{}),

wg:      new(sync.WaitGroup),

}

}// 将WorkFunc放入到WorkUnit中并保留取消和输出结果的参考。func (b *batch) Queue(fn WorkFunc) {


b.m.Lock() if b.closed {

b.m.Unlock() return

} // 返回一个WorkUnit

wu := b.pool.Queue(fn) // 放到WorkUnit的切片中

b.units = append(b.units, wu) // 通过waitgroup进行goroutine的执行控制

b.wg.Add(1)

b.m.Unlock() // 执行任务

go func(b *batch, wu WorkUnit) {

wu.Wait() // 将执行的结果写入到results中

b.results <- wu

b.wg.Done()

}(b, wu)

}// QueueComplete让批处理知道不再有排队的工作单元// 以便在所有工作完成后可以关闭结果渠道。// 警告:如果未调用此函数,则结果通道将永远不会耗尽,// 但会永远阻止以获取更多结果。func (b *batch) QueueComplete() {

b.m.Lock()

b.closed = true

close(b.done)

b.m.Unlock()

}// 取消批次的任务func (b *batch) Cancel() {


b.QueueComplete()


b.m.Lock() // 一个个取消units,倒叙的取消

for i := len(b.units) - 1; i >= 0; i-- {

b.units[i].Cancel()

}


b.m.Unlock()

}// 输出执行完成的结果集func (b *batch) Results() <-chan WorkUnit { // 启动一个协程监听完成的通知

// waitgroup阻塞直到所有的worker都完成退出

// 最后关闭channel

go func(b *batch) {

<-b.done

b.m.Lock() // 阻塞直到上面waitgroup中的goroutine一个个执行完成退出

b.wg.Wait()

b.m.Unlock() // 关闭channel

close(b.results)

}(b) return b.results

}

USB Microphone  https://www.soft-voice.com/

Wooden Speakers  https://www.zeshuiplatform.com/

亚马逊测评 www.yisuping.cn

深圳网站建设www.sz886.com



有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:小中01

查看原文:go中控制goroutine数量

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

393 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传