go程序开发过程中,通过简单的调用go func 函数来开启协程,容易导致程序死锁,并且会无限制的开启groutine,groutine数量激增的情况下并发性能会明显下降,所以需要考虑使用工作池来控制协程数量,以达到高并发的效果.
直接上代码(JobPool.go)
package utils
import (
"fmt"
)
// 定义任务接口,所有实现该接口的均实现工作池
type Task interface {
DoTask() error
}
// 定义工作结构体
type Job struct {
Task Task
}
// 定义全部的工作队列
var JobQueue chan Job
// 定义工作者
type Worker struct {
WorkerPool chan chan Job // 工人对象池
JobChannel chan Job // 管道里面拿Job
quit chan bool
}
// 新建一个工作者
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool, // 工人对象池
JobChannel: make(chan Job), //工人的任务
quit: make(chan bool),
}
}
// 工作池启动主函数
func(w *Worker)Start(){
// 开一个新的协程
go func() {
for{
// 注册任务到工作池
w.WorkerPool <- w.JobChannel
select {
// 接收到任务
case job := <- w.JobChannel:
// 执行任务
err := job.Task.DoTask()
if err != nil {
fmt.Println("任务执行失败")
}
// 接收退出的任务, 停止任务
case <- w.quit:
return
}
}
}()
}
// 退出执行工作
func (w *Worker) Stop(){
go func(){
w.quit <- true
}()
}
// 定义任务发送者
type Sender struct {
maxWorkers int // 最大工人数
WorkerPool chan chan Job // 注册工作通道
quit chan bool // 退出信号
}
// 注册新发送者
func NewSender(maxWorkers int) *Sender{
Pool := make(chan chan Job, maxWorkers)
return &Sender{
WorkerPool: Pool, // 将工作者放到一个工作池中
maxWorkers: maxWorkers, // 最大工作者数量
quit: make(chan bool),
}
}
// 工作分发器
func(s *Sender)Run(){
for i:=0; i<s.maxWorkers; i++{
worker := NewWorker(s.WorkerPool)
// 执行任务
worker.Start()
}
// 监控任务发送
go s.Send()
}
// 退出发放工作
func (s *Sender) Quit(){
go func(){
s.quit <- true
}()
}
func(s *Sender)Send(){
for {
select {
// 接收到任务
case job :=<- JobQueue:
go func(job Job) {
jobChan := <- s.WorkerPool
jobChan <- job
}(job)
// 退出任务分发
case <- s.quit:
return
}
}
}
// 初始化对象池
func InitPool() {
maxWorkers := 4
maxQueue := 20
// 初始化一个任务发送者,指定工作者数量
send := NewSender(maxWorkers)
// 指定任务的队列长度
JobQueue = make(chan Job,maxQueue)
// 一直运行任务发送
send.Run()
}
使用方法
package main
import (
"fmt"
"os"
"test/utils" //引用: JobPool是放在test项目的utils包下
"time"
)
type Test struct {
num int
}
// 任务,实现JobPool的Task接口
func(t *Test)DoTask() error{
f, err := os.OpenFile("log.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0777)
if err != nil {
return err
}
defer f.Close()
f.WriteString(fmt.Sprintf("这是任务:%d号,执行时间为:%s \n", t.num, fmt.Sprintf("%s", time.Now())))
return nil
}
func main(){
// 初始化对象池
utils.InitPool()
for i:=1;i<40 ;i++{
// 注册任务到Job队列中
task := &Test{i}
utils.JobQueue <- utils.Job{
Task:task,
}
}
// time.Sleep(180 * time.Second)
// 执行结束,关闭管道
close(utils.JobQueue)
}
参考文章: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
有疑问加站长微信联系(非本文作者)