Go语言同步和异步执行多个任务封装

chenqionghe · · 1156 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

同步适合多个连续执行的,每一步的执行依赖于上一步操作,异步执行则和任务执行顺序无关(如从10个站点抓取数据)

同步执行类RunnerAsync

支持返回超时检测,系统中断检测

错误常量定义

//超时错误
var ErrTimeout = errors.New("received timeout")

//操作系统系统中断错误
var ErrInterrupt = errors.New("received interrupt")

实现代码如下

package task

import (
    "os"
    "time"
    "os/signal"
    "sync"
)

//异步执行任务
type Runner struct {
    //操作系统的信号检测
    interrupt chan os.Signal

    //记录执行完成的状态
    complete chan error

    //超时检测
    timeout <-chan time.Time

    //保存所有要执行的任务,顺序执行
    tasks []func(id int) error

    waitGroup sync.WaitGroup

    lock sync.Mutex

    errs []error
}

//new一个Runner对象
func NewRunner(d time.Duration) *Runner {
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
        waitGroup: sync.WaitGroup{},
        lock:      sync.Mutex{},
    }
}

//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {
    this.tasks = append(this.tasks, tasks...)
}

//启动Runner,监听错误信息
func (this *Runner) Start() error {

    //接收操作系统信号
    signal.Notify(this.interrupt, os.Interrupt)

    //并发执行任务
    go func() {
        this.complete <- this.Run()
    }()

    select {
    //返回执行结果
    case err := <-this.complete:
        return err
        //超时返回
    case <-this.timeout:
        return ErrTimeout
    }
}

//异步执行所有的任务
func (this *Runner) Run() error {
    for id, task := range this.tasks {
        if this.gotInterrupt() {
            return ErrInterrupt
        }

        this.waitGroup.Add(1)
        go func(id int) {
            this.lock.Lock()

            //执行任务
            err := task(id)
            //加锁保存到结果集中
            this.errs = append(this.errs, err)

            this.lock.Unlock()
            this.waitGroup.Done()
        }(id)
    }
    this.waitGroup.Wait()

    return nil
}

//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {
    select {
    case <-this.interrupt:
        //停止接收别的信号
        signal.Stop(this.interrupt)
        return true
        //正常执行
    default:
        return false
    }
}

//获取执行完的error
func (this *Runner) GetErrs() []error {
    return this.errs
}

  

使用方法    

Add添加一个任务,任务为接收int类型的一个闭包

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

 

测试示例代码

package task

import (
    "testing"
    "time"
    "fmt"
    "os"
    "runtime"
)

func TestRunnerAsync_Start(t *testing.T) {

    //开启多核
    runtime.GOMAXPROCS(runtime.NumCPU())

    //创建runner对象,设置超时时间
    runner := NewRunnerAsync(8 * time.Second)
    //添加运行的任务
    runner.Add(
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
    )

    fmt.Println("同步执行任务")

    //开始执行任务
    if err := runner.Start(); err != nil {
        switch err {
        case ErrTimeout:
            fmt.Println("执行超时")
            os.Exit(1)
        case ErrInterrupt:
            fmt.Println("任务被中断")
            os.Exit(2)
        }
    }

    t.Log("执行结束")

}

//创建要执行的任务
func createTaskAsync() func(id int) {
    return func(id int) {
        fmt.Printf("正在执行%v个任务\n", id)
        //模拟任务执行,sleep两秒
        //time.Sleep(1 * time.Second)
    }
}

执行结果

同步执行任务
正在执行0个任务
正在执行1个任务
正在执行2个任务
正在执行3个任务
正在执行4个任务
正在执行5个任务
正在执行6个任务
正在执行7个任务
正在执行8个任务
正在执行9个任务
正在执行10个任务
正在执行11个任务
正在执行12个任务
	runnerAsync_test.go:49: 执行结束

  

 

异步执行类Runner

支持返回超时检测,系统中断检测

实现代码如下

package task

import (
    "os"
    "time"
    "os/signal"
    "sync"
)

//异步执行任务
type Runner struct {
    //操作系统的信号检测
    interrupt chan os.Signal

    //记录执行完成的状态
    complete chan error

    //超时检测
    timeout <-chan time.Time

    //保存所有要执行的任务,顺序执行
    tasks []func(id int) error

    waitGroup sync.WaitGroup

    lock sync.Mutex

    errs []error
}

//new一个Runner对象
func NewRunner(d time.Duration) *Runner {
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
        waitGroup: sync.WaitGroup{},
        lock:      sync.Mutex{},
    }
}

//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {
    this.tasks = append(this.tasks, tasks...)
}

//启动Runner,监听错误信息
func (this *Runner) Start() error {

    //接收操作系统信号
    signal.Notify(this.interrupt, os.Interrupt)

    //并发执行任务
    go func() {
        this.complete <- this.Run()
    }()

    select {
    //返回执行结果
    case err := <-this.complete:
        return err
        //超时返回
    case <-this.timeout:
        return ErrTimeout
    }
}

//异步执行所有的任务
func (this *Runner) Run() error {
    for id, task := range this.tasks {
        if this.gotInterrupt() {
            return ErrInterrupt
        }

        this.waitGroup.Add(1)
        go func(id int) {
            this.lock.Lock()

            //执行任务
            err := task(id)
            //加锁保存到结果集中
            this.errs = append(this.errs, err)

            this.lock.Unlock()
            this.waitGroup.Done()
        }(id)
    }
    this.waitGroup.Wait()

    return nil
}

//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {
    select {
    case <-this.interrupt:
        //停止接收别的信号
        signal.Stop(this.interrupt)
        return true
        //正常执行
    default:
        return false
    }
}

//获取执行完的error
func (this *Runner) GetErrs() []error {
    return this.errs
}

  

使用方法    

Add添加一个任务,任务为接收int类型,返回类型error的一个闭包

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

getErrs获取所有的任务执行结果

 

测试示例代码

package task

import (
    "testing"
    "time"
    "fmt"
    "os"
    "runtime"
)

func TestRunner_Start(t *testing.T) {
    //开启多核心
    runtime.GOMAXPROCS(runtime.NumCPU())

    //创建runner对象,设置超时时间
    runner := NewRunner(18 * time.Second)
    //添加运行的任务
    runner.Add(
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
    )

    fmt.Println("异步执行任务")

    //开始执行任务
    if err := runner.Start(); err != nil {
        switch err {
        case ErrTimeout:
            fmt.Println("执行超时")
            os.Exit(1)
        case ErrInterrupt:
            fmt.Println("任务被中断")
            os.Exit(2)
        }
    }

    t.Log("执行结束")

    t.Log(runner.GetErrs())

}

//创建要执行的任务
func createTask() func(id int) error {
    return func(id int) error {
        fmt.Printf("正在执行%v个任务\n", id)
        //模拟任务执行,sleep
        //time.Sleep(1 * time.Second)
        return nil
    }
}

执行结果

异步执行任务
正在执行2个任务
正在执行1个任务
正在执行4个任务
正在执行3个任务
正在执行6个任务
正在执行5个任务
正在执行9个任务
正在执行7个任务
正在执行10个任务
正在执行13个任务
正在执行8个任务
正在执行11个任务
正在执行12个任务
正在执行0个任务
	runner_test.go:49: 执行结束
	runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]

  

 


有疑问加站长微信联系(非本文作者)

本文来自:博客园

感谢作者:chenqionghe

查看原文:Go语言同步和异步执行多个任务封装

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1156 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传