Go协程池,通俗易懂的解释,还请大佬们多多指正

sunshinev123 · · 1049 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

协程池主要是为了减少 go 协程频繁创建、销毁带来的性能损耗,虽然可以忽略不计,但是网上说特殊情况还是有用的。 另外一个就是限制协程的数量,避免无限制的协程创建 那这个协程池通俗易懂来讲,比如老板给员工分配任务: 老板领了一堆任务,得找工人干活呀, 那领导就拿出一个任务,给一个空闲的员工 A,再把下一个任务,给另外一个空闲的员工 B 。 问题在于: 1. 不能招无限个员工 2. 员工是可以重复处理多个任务的 这时候 A 或者 B,指不定谁先忙完了 如果有人忙完了,领导就把下一个任务,给先忙完的人。A/B 就是协程池里面的两个协程 下面这段代码,完成了如下功能 - 协程池数量上限控制 - 协程空闲清理,释放内存 - 协程复用 使用协程池如下 ``` // 启动api服务 SafeGo(func() { //_ = http.ListenAndServe(":8081", nil) }) ``` 协程池代码 ``` package core import ( "context" "log" "sync" "time" ) type Task func() // boss 老板 type GoPool struct { MaxWorkerIdleTime time.Duration // worker 最大空闲时间 MaxWorkerNum int32 // 协程最大数量 TaskEntryChan chan *Task // 任务入列 Workers []*worker // 已创建worker FreeWorkerChan chan *worker // 空闲worker Lock sync.Mutex } const ( WorkerStatusStop = 1 WorkerStatusLive = 0 ) // 干活的人 type worker struct { Pool *GoPool StartTime time.Time // 开始时间 TaskChan chan *Task // 执行队列 LastWorkTime time.Time // 最后执行时间 Ctx context.Context Cancel context.CancelFunc Status int32 // 被过期删掉的标记 } var defaultPool = func() *GoPool { return NewPool() }() // 初始化 func NewPool() *GoPool { g := &GoPool{ MaxWorkerIdleTime: 10 * time.Second, MaxWorkerNum: 20, TaskEntryChan: make(chan *Task, 2000), FreeWorkerChan: make(chan *worker, 2000), } // 分发任务 go g.dispatchTask() //清理空闲worker go g.fireWorker() return g } // 定期清理空闲worker func (g *GoPool) fireWorker() { for { select { // 10秒执行一次 case <-time.After(10 * time.Second): for k, w := range g.Workers { if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime { log.Printf("overtime %v %p", k, w) // 终止协程 w.Cancel() // 清理Free w.Status = WorkerStatusStop } } g.Lock.Lock() g.Workers = g.cleanWorker(g.Workers) g.Lock.Unlock() } } } // 递归清理无用worker func (g *GoPool) cleanWorker(workers []*worker) []*worker { for k, w := range workers { if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime { workers = append(workers[:k], workers[k+1:]...) // 删除中间1个元素 return g.cleanWorker(workers) } } return workers } // 分发任务 func (g *GoPool) dispatchTask() { for { select { case t := <-g.TaskEntryChan: log.Printf("dispatch task %p", t) // 获取worker w := g.fetchWorker() // 将任务扔给worker w.accept(t) } } } // 获取可用worker func (g *GoPool) fetchWorker() *worker { for { select { // 获取空闲worker case w := <-g.FreeWorkerChan: if w.Status == WorkerStatusLive { return w } default: // 创建新的worker if int32(len(g.Workers)) < g.MaxWorkerNum { w := &worker{ Pool: g, StartTime: time.Now(), LastWorkTime: time.Now(), TaskChan: make(chan *Task, 1), Ctx: context.Background(), Status: WorkerStatusLive, } ctx, cancel := context.WithCancel(w.Ctx) w.Cancel = cancel // 接到任务自己去执行吧 go w.execute(ctx) g.Lock.Lock() g.Workers = append(g.Workers, w) g.Lock.Unlock() g.FreeWorkerChan <- w log.Printf("worker create %p", w) } } } } // 添加任务 func (g *GoPool) addTask(t Task) { // 将任务放到入口任务队列 g.TaskEntryChan <- &t } // 接受任务 func (w *worker) accept(t *Task) { // 每个worker自己的工作队列 w.TaskChan <- t } // 执行任务 func (w *worker) execute(ctx context.Context) { for { select { case t := <-w.TaskChan: // 执行 (*t)() // 记录工作状态 w.LastWorkTime = time.Now() w.Pool.FreeWorkerChan <- w case <-ctx.Done(): log.Printf("worker done %p", w) return } } } // 执行 func SafeGo(t Task) { defaultPool.addTask(t) } ```

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1049 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传