【Golang语言社区投稿】golang高并发基于协程,通道的任务池

mb5fdb0a1b25659 · · 392 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

要点:
封装了协程模型基于select模型的通道传递;
支持同步和异步添加任务;由于golang无函数指针,任务函数利用了go 反射机制支持可变参的入参
开发者可以在高处理性能前提下,只专注业务开发,往任务池添加任务即可。

实例:
//taskpool.go 
package taskpool

import (
    "reflect"
    "time"
)

type Task struct {
    M_func interface{}
    M_args []interface{}
}

func (task *Task) Run() {
    go func() {
        f := reflect.ValueOf(task.M_func)
        if len(task.M_args) != f.Type().NumIn() {
            return
        }
        in := make([]reflect.Value, len(task.M_args))
        for k, param := range task.M_args {
            in[k] = reflect.ValueOf(param)
        }
        f.Call(in)

    }()
}

type WorkPool struct {
    TaskChannel chan Task
    QuitChan    chan int //终止通道
}

//size 设置缓存大
func (pool *WorkPool) InitPool(size int) {
    pool.TaskChannel = make(chan Task, size)
    pool.QuitChan = make(chan int)
    go func() {
    DONE:
        for {
            select {
            case task := <-pool.TaskChannel:
                task.Run()
            case <-pool.QuitChan:
                break DONE
            }
        }
    }()
}
func (pool *WorkPool) ClosePool() {
    pool.QuitChan <- 1
}

//同步阻塞方式添加任务
func (pool *WorkPool) AddTask(task Task) {
    pool.TaskChannel <- task
}

//非阻塞方式添加任务 time 超时时间 单位毫秒
func (pool *WorkPool) AddTaskSync(task Task, millitime int) bool {
    res := false
    go func(res bool) {
        select {
        case pool.TaskChannel <- task:
            res = true
        case <-time.After(time.Millisecond * time.Duration(millitime)):
            res = false
        }
    }(res)
    return res
}

//test_main.go
package main

import (
    "fmt"
    "ms_lib/ms_taskpool"
    "time"
)

func test(i int, test string) {
    fmt.Println("hahaha", i, test)
}

func main() {
    task_pool := ms_taskpool.WorkPool{}
    task_pool.InitPool(5)
    for i := 0; i < 1000; i++ {
        task := ms_taskpool.Task{M_func: test}
        task.M_args = append(task.M_args, i)
        task.M_args = append(task.M_args, "test")
        task_pool.AddTask(task)
    }
    //task_pool.ClosePool() //可强制主动关闭任务池

    time.Sleep(5 * time.Second)
    fmt.Println("test done!")
}



有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:mb5fdb0a1b25659

查看原文:【Golang语言社区投稿】golang高并发基于协程,通道的任务池

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

392 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传