要点: 封装了协程模型基于select模型的通道传递; 支持同步和异步添加任务;由于golang无函数指针,任务函数利用了go 反射机制支持可变参的入参 开发者可以在高处理性能前提下,只专注业务开发,往任务池添加任务即可。 实例: //taskpool.go package taskpool import ( "reflect" "time" ) type Task struct { M_func interface{} M_args []interface{} } func (task *Task) Run() { go func() { f := reflect.ValueOf(task.M_func) if len(task.M_args) != f.Type().NumIn() { return } in := make([]reflect.Value, len(task.M_args)) for k, param := range task.M_args { in[k] = reflect.ValueOf(param) } f.Call(in) }() } type WorkPool struct { TaskChannel chan Task QuitChan chan int //终止通道 } //size 设置缓存大 func (pool *WorkPool) InitPool(size int) { pool.TaskChannel = make(chan Task, size) pool.QuitChan = make(chan int) go func() { DONE: for { select { case task := <-pool.TaskChannel: task.Run() case <-pool.QuitChan: break DONE } } }() } func (pool *WorkPool) ClosePool() { pool.QuitChan <- 1 } //同步阻塞方式添加任务 func (pool *WorkPool) AddTask(task Task) { pool.TaskChannel <- task } //非阻塞方式添加任务 time 超时时间 单位毫秒 func (pool *WorkPool) AddTaskSync(task Task, millitime int) bool { res := false go func(res bool) { select { case pool.TaskChannel <- task: res = true case <-time.After(time.Millisecond * time.Duration(millitime)): res = false } }(res) return res } //test_main.go package main import ( "fmt" "ms_lib/ms_taskpool" "time" ) func test(i int, test string) { fmt.Println("hahaha", i, test) } func main() { task_pool := ms_taskpool.WorkPool{} task_pool.InitPool(5) for i := 0; i < 1000; i++ { task := ms_taskpool.Task{M_func: test} task.M_args = append(task.M_args, i) task.M_args = append(task.M_args, "test") task_pool.AddTask(task) } //task_pool.ClosePool() //可强制主动关闭任务池 time.Sleep(5 * time.Second) fmt.Println("test done!") }
有疑问加站长微信联系(非本文作者)