[Go] 写一个守护协程的通用套路是什么?

eddix · · 677 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

背景

根据一个 Goroutine 是否直接依赖用户交互,我们可以将 Goroutine 分为两大类,一类是直接依赖用户交互的前台协程,比如 HTTP Server Handler等;另一类是不直接依赖用户交互的后台协程,比如 HTTP Server,定时任务协程等。前台协程随用户的交互开始执行,随交互结束而结束,比较容易设计。本文主要讨论后台协程设计的一些通用套路。

一个良好的后台协程需要至少满足以下两个诉求:

  • 容易控制,尤其是启动、停止、重启等操作。
  • 状态容易被观察,比如是否正在运行中。

针对这两个诉求,我们来寻找一个通用的实现套路。

设计与实现

简陋的后台协程

得益于 Go 从语法上对并发的支持,写一个简陋的后台协程再简单不过了。我们从下面这个 Demo 开始讨论,这个 Demo 的任务很简单,每隔一秒钟将下一个斐波那契数输出在标准输出里面。

package main

type Fibonacci struct {
    a, b int
}

func NewFibonacci() *Fibonacci {
    return &Fibonacci{a:0, b:1}
}

func (f *Fibonacci) Run() {
    go func() {
        for {
            time.Sleep(time.Second)
            fmt.Println(f.b)
            f.a, f.b = f.b, f.a + f.b
        }
    }()
}

func main() {
    NewFibonacci().Run()
}

直接执行这个程序,什么都不会输出,因为主协程里面没有任何逻辑执行,程序启动后直接就退出了,对吧?不过现实中许多后台协程就是这样写的,因为真实世界里很多主协程是有其它任务在执行的,所以 Fibonacci 会一直执行下去,直到程序结束。

入门级的后台协程

观察上面这个 Fibonacci 我们会发现它的一些缺陷:首先我们没法终止它,一旦启动就失控了;其次我们也没法观察它,比如在任何时候去向它要一个当前时间的斐波那契数,是要不到的。

先说控制,我们很容易想到一种方式,就是使用一个bool变量去维护协程是否需要继续运行下去。

然后获取斐波那契数这个事情也很简单,加一个方法就好了。

实际上,这种方案就是我遇到的大多数协程的实现方式。我们在 Fibonacci 上按这个方案写,代码就是这样:

type Fibonacci struct {
    a, b int
    stop bool
    mtx sync.Mutex
}

func NewFibonacci() *Fibonacci {
    return &Fibonacci{a:0, b:1}
}

func (f *Fibonacci) Run() {
    go func() {
        for {
            if f.isStop() {
                break
            }
            time.Sleep(time.Second)
            f.mtx.Lock()
            fmt.Println(f.b)
            f.a, f.b = f.b, f.a + f.b
            f.mtx.Unlock()
        }
    }()
}

// 调用 Stop 结束
func (f *Fibonacci) Stop() {
    f.mtx.Lock()
    defer f.mtx.Unlock()
    f.stop = true
}

func (f *Fibonacci) isStop() {
    f.mtx.Lock()
    defer f.mtx.Unlock()
    return f.stop
}

// Value 获取当前的斐波那契数
func (f *Fibonacci) Value() int {
    f.mtx.Lock()
    defer f.mtx.Unlock()
    return f.b
}

进阶版的后台协程

观察入门版的代码,我们会发现一些潜在的问题。首先,添加bool变量的方法的问题是需要自己维护一把锁,随着程序的升级,这把锁有可能会被用去保护别的变量,比如在代码中我们就用它来保护斐波那契数了。这样的做法可能会带来性能下降,如果逻辑不对甚至可能会出现死锁问题。

另外我们继续观察这段代码还会发现另一个问题,即我们调用Stop后,实际上很可能协程并不会马上结束,它有可能正好处在 Sleep 状态,所以 Stop 调用后,很可能过几秒会再打印一个数,然后协程才结束。

一般做到这一步时,会有人用想到用 channel 来代替bool变量了。我遇到的部分有经验的工程师会用这个办法。用 channel 有一个好处,是可以通过对多个channel同时select监听的方式,达到立马生效的效果。代码如下:

type Fibonacci struct {
    a, b int
    stop chan struct{}
    mtx sync.Mutex
}

func NewFibonacci() *Fibonacci {
    return &Fibonacci{
        a: 0,
        b: 1,
        stop: make(chan struct{}),
    }
}

func (f *Fibonacci) Run() {
    go func() {
        t := time.NewTicker(time.Second)
        for {
            select {
            case <-f.stop:
                t.Stop()
                return
            case <-t.C:
                f.mtx.Lock()
                fmt.Println(f.b)
                f.a, f.b = f.b, f.a + f.b
                f.mtx.Unlock()
            }
        }
    }()
}

// 调用 Stop 结束
func (f *Fibonacci) Stop() {
    close(f.stop)
}

// Value 获取当前的斐波那契数
func (f *Fibonacci) Value() int {
    f.mtx.Lock()
    defer f.mtx.Unlock()
    return f.b
}

这段代码基本上就是比较常见的实现得比较好的后台协程代码了,我们调用Start(),它就执行,调用Stop(),就立马结束,调用Value()就拿到结果。看上去还不错。

更好的后台协程

我们观察进阶版的实现,似乎挑不出什么毛病了。但实际上还有三个问题。

第一个问题是,如果程序中有不定量的类似 Fibonacci 这样的后台协程,如何用一套简单且行之有效的方式统一地控制它们,同时也保留单个控制的能力?

有一种简单的想法是,在程序中声明一个带Stop方法interface,然后用一个slice或map保存所有可以Stop的后台协程,在需要Stop的时候依次调用它们。

第二个问题是,如果连续调用Stop()两次,第二次就会因为关闭一个已经关闭的channel而出现panic。

第三个问题是,在这段代码中我们只是计算一下f.a+f.b并且print出来,不太会panic。在真实的代码中后台协程代码是有可能出现panic的,我们不光要避免这种panic由于未被recover导致整个程序崩溃,还需要在出现panic后自动恢复。

这些问题我们要自己解决起来也不是不行,但是如果自己解决下去的话,会写出很多代码,这不符合我对通用套路的标准:容易理解,实现成本低,不会因为过于复杂而难以在每个地方使用。

那么有没有简单高效的办法做到写出一个优雅的后台协程呢?办法是有的,答案就在标准库的 context 包里面。

下面就是这个套路的代码。

type Fibonacci struct {
    a, b int
    stop func()
    mtx sync.Mutex
}

func NewFibonacci() *Fibonacci {
    return &Fibonacci{a: 0, b: 1}
}

func (f *Fibonacci) Run(ctx context.Context) {
    // 使用WithCancel派生一个可被取消的ctx,用来控制后台
    // 协程。
    ctx, f.stop = context.WithCancel(ctx)
    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            case <-f.loop(ctx):
                // f.loop() 在正常运行时errch是阻塞状态,如果
                // 出错了才有数据,此时select会被唤起,并重新
                // 启动 loop(),实现panic后自动恢复。
            }
        }
    }()
}

func (f *Fibonacci) loop(ctx context.Context) <-chan error {
    errch := make(chan error)
    go func() {
        t := time.NewTicker(time.Second)
        defer func() {
            t.Stop()
            if r := recover(); r != nil {
                errch <- fmt.Errorf("panic with error %v", r)
                close(errch)
            }
        }()
        for {
            select {
            case <-ctx.Done():
                close(errch)
                return
            case <-t.C:
                f.nextFibonacci()
            }
        }
    }()
    return errch
}

func (f *Fibonacci) nextFibonacci() {
    f.mtx.Lock()
    defer f.mtx.Unlock()
    fmt.Println(f.b)
    f.a, f.b = f.b, f.a + f.b
}

// 调用 Stop 结束
func (f *Fibonacci) Stop() {
    if f.stop != nil {
        f.stop()
    }
}

// Value 获取当前的斐波那契数
func (f *Fibonacci) Value() int {
    f.mtx.Lock()
    defer f.mtx.Unlock()
    return f.b
}

我们来简单地看一下这个代码的几个关键点:

  1. Run 方法要求外部传入一个 Context,这样当外部取消这个 Context 时,Fibonacci 实际上也就结束了。
  2. Run 方法内部基于传入的 Context 又派生了一个 Context 出来,这样做的目的是为 stop 方法赋值,调用 f.stop 的时候,实际上就是调用Cancel方法来取消派生出来的 Context。
  3. Run 并不直接执行业务逻辑,而是另起loop协程去执行,Run 本身实际上是监督loop的执行,一旦loop出现panic,及时将其重启。当然,loop协程也是通过Context来控制的。

调用示例

最基本的调用如下:

f := NewFibonacci().Run(context.Background())
// ... 执行一些其它操作
f.Stop()

我们可以创建一大堆类似 Fibonacci 这样用 Context 控制的后台协程,然后很轻松地将他们全部结束。

ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 100; i++ {
    NewFibonacci().Run(ctx)
}
// ... 执行一些其它操作
// 调用cancel,100个后台协程全部结束
cancel()

我们也可以用 context.WithTimeout 创建带超时的 context,让 Fibonacci 后台只执行一小段时间。

ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
for i := 0; i < 100; i++ {
    NewFibonacci().Run(ctx)
}
<-ctx.Done()
cancel()

最重要的是,得益于 Context 在标准库中的广泛支持,我们可以很容易地将 Fibonacci 这种实现与各种控制方法结合起来,例如与 HTTP Request 结合,当一个请求进来时启动一个 Fibonacci,并且在请求结束后自动结束。

func ServeHTTP(w http.ResponseWriter, r *http.Request) {
    NewFibonacci().Run(r.Context())
    // ... 执行 Request 的处理逻辑
}

总结

我们讨论了写后台协程的一个通用套路,在这个套路里面有两个核心点需要遵循。

第一点是后台协程通过监听 Context 而不是自己创建的某个变量去做启停控制,这个 Context 有两个要点:从外部传入,在内部派生。

第二点是后台协程应该考虑实现类似 supervisor 这样的自动重启机制,在任务结束时自动恢复。

以上就是我所总结的写 Go 守护协程的套路,如果你发现我的方法有错误,或者你有更好的套路,欢迎留言讨论。


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:eddix

查看原文:[Go] 写一个守护协程的通用套路是什么?

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

677 次点击  ∙  1 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传