写了一个web接口,想高并发的请求这个接口,进行压力测试,所以服务端就实现了一个线程池。
代码从网上理解了之后写的。代码实例
简单的介绍:
首先实现一个Job接口,只要有方法实现了Do方法即可
定义个分发器结构体,主要是WorkPool线程池,用于存储Worker的JobChannel
init的时候,先初始化一个JobQueue队列,其他的函数调用这个线程池的时候,把任务放在这个队列即可。
然后Run的时候,创建多个Worker,起初的时候,woker会把自身的JobChannel先注册到线程池workerPool中,
然后worker.start就是for{select } 阻塞等待JobChannel中的job任务。
此时又启一个go d.Dispatcher() ,将JobQueue中的job任务放在worker的Jobchannle中。这样上面的for{select} 就可以拿到任务去执行。
注: maxWorkers 是内核CPU数量,本机4核,就是线程池可以放4个JobChannel,所以,在newWorker的时候,就创建了4个Worker来并发的处理job任务。
任务处理
package workPool import "fmt" type Worker struct { WorkerPool chan chan Job JobChannel chan Job Quit chan bool } func NewWorker(workpool chan chan Job) *Worker { return &Worker{WorkerPool: workpool,JobChannel: make(chan Job),Quit: make(chan bool)} } func (w *Worker) Start() { go func() { for{ w.WorkerPool <-w.JobChannel select { case job := <-w.JobChannel: if err := job.Do();err !=nil{ fmt.Println("exec some failed ....") } case <-w.Quit: return } } }() } func (w *Worker) Stop() { go func() { w.Quit <-true }() }
实现一个分发器
package workPool import "runtime" var( MaxWorkers = runtime.NumCPU() MaxQueue = 512 ) type Job interface { Do() error } var JobQueue chan Job type Dispatcher struct { MaxWorkers int WorkerPool chan chan Job Quit chan bool } func init() { runtime.GOMAXPROCS(MaxWorkers) JobQueue = make(chan Job,MaxQueue) dispatcher := NewDispatcher(MaxWorkers) dispatcher.Run() } func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job,maxWorkers) return &Dispatcher{MaxWorkers: maxWorkers,WorkerPool: pool,Quit: make(chan bool)} } func (d *Dispatcher) Run() { for i:=0;i<d.MaxWorkers;i++{ worker := NewWorker(d.WorkerPool) worker.Start() } go d.Dispatcher() } func (d *Dispatcher) Dispatcher() { for { select { case job := <-JobQueue: jobChannel := <-d.WorkerPool jobChannel <- job case <-d.Quit: return } } }
main函数中可以这样使用
package main import ( "context_http/workPool" "fmt" "net/http" ) type Msg struct { mobile string } func (m *Msg) Do() error { m.mobile = m.mobile+"_test" fmt.Println(m.mobile) return nil } func getMobile(w http.ResponseWriter,r *http.Request) { defer r.Body.Close() r.ParseForm() mobile := r.PostForm.Get("mobile") var work workPool.Job m := Msg{mobile: mobile} work = &m workPool.JobQueue <- work status := `{"status":"ok"}` w.Write([]byte(status)) } func main() { http.HandleFunc("/test",getMobile) err := http.ListenAndServe(":8081",nil) if err !=nil{ fmt.Println("server failure :",err) return } }
有疑问加站长微信联系(非本文作者)