go 实现一个简易的线程池(二)

嘉磊 · · 1338 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

针对(一)中所出现的问题,现在使用一个两级channel系统,一个用来存放任务队列,另一个用来控制任务队列上执行操作的"工人"数量([参考文章]:(http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/))。

  • 先将具体的业务操作抽取出来,里面定义一个具体的业务方法
// MyService 业务接口
type MyService struct {
}

// WriteInfo 写日志
func (s *MyService) WriteInfo() {
    time.Sleep(1 * time.Second)
    t := time.Now()
    logFile, err := os.OpenFile("syslog.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)
    defer logFile.Close()
    if err != nil {
        panic(err)
    }
    infoLog := log.New(logFile, "[INFO]", log.LstdFlags)
    infoLog.Print("time=" + strconv.FormatInt(t.UTC().UnixNano(), 10))
}
  • 定义一个代表工作的结构体Job,和一个代表工人的结构体Worker
// Job 表示要执行的作业
type Job struct {
    MyService MyService
}

// Worker 执行作业的工人
type Worker struct {
    WorkerPool chan chan Job
    JobChannel chan Job
}
// NewWorker 新建一个工人
func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
    }
}
  • 给工人定义一个Start方法,表示监听自己的工作任务,有活儿来了就开始工作
func (w Worker) Start() {
    go func() {
        for {
            w.WorkerPool <- w.JobChannel
            select {
            case job := <-w.JobChannel:
                //有工作任务时,开始执行业务接口的方法
                job.MyService.WriteInfo()
            }
        }
    }()
}
  • 初始化池
var maxWorkers = 20

// JobQueue 作业队列
var JobQueue = make(chan Job, maxWorkers)

// InitPool 给池中初始化一定量的工人,以及开启任务队列的监听
func InitPool() {
    // 创建工作池
    pool := make(chan chan Job, maxWorkers)
    // 创建一定数量的工人(可以看做:创建了N个工人,每个工人能并发处理N件工作)
    for i := 0; i < maxWorkers; i++ {
        worker := NewWorker(pool)
        worker.Start()
    }
    //监听JobQueue上是否有新任务
    go func() {
        for {
            select {
            case job := <-JobQueue:
                go func(job Job) {
                    // 获取可用的工人channel,若没有,则阻塞
                    jobChannel := <-pool
                    jobChannel <- job
                }(job)
            }
        }
    }()
}
  • 提供一个http接口,用于测试
func init() {
    InitPool()
}

func main() {
    http.HandleFunc("/test/pool/", indexHandler)
    http.ListenAndServe(":9000", nil)
}

func indexHandler(w http.ResponseWriter, r *http.Request) {
    service := MyService{}
    //service.WriteInfo()
    work := Job{MyService: service}
    JobQueue <- work
    w.Header().Set("Content-Type", "application/json; charset=UTF-8")
    result := "{\"msg\":\"SUCCESS\",\"code\":0}"
    fmt.Fprintln(w, result)
}
  • 使用jmeter进行测试,和(一)中一样,100个样本,循环10次执行


    聚合报告.png
图形结果.png

可以看到处理请求的能力还是非常不错的。其实这种测试方法并不能和(一)的结果进行横向比对,毕竟(一)中只开启了20个协程,而在(二)中由于双队列的存在,处理请求的协程数量肯定是要比(一)中的多的多。
但(二)的模式肯定是要优于(一)的,这是毋容置疑的。我只是为了给自己做个笔记,加深对go中channel的理解,毕竟,好记性不如烂笔头么 (ง •̀_•́)ง
源码我上传到了github,地址:https://github.com/wleirock/studygo/tree/master/pool


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:嘉磊

查看原文:go 实现一个简易的线程池(二)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1338 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传