Golang 多goroutine的异步通知错误方法

Ovenvan · · 2067 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

作者近期在写一个项目时遇到了这样的需求:调用一个库API函数,函数内部又会拉起若干个后台goroutine。这时后台goroutine如果遇到错误想要及时通知库的使用者将不会是一件容易的事情,因为这是一个异步通知错误的方法。作者最终的解决思路是:使用者另启一个goroutine监听Err channel,库后台goroutine出现的错误将直接发送至Err channel中。
作者以自己项目简单举例:

func DaemonListen(err <-chan Errsocket){
    for {
        v, ok := <-err
        if ok {
            fmt.Println(v) /*处理错误*/
        } else {
            fmt.Println("Listen closed.")/*后台安全退出*/
            return
        }
    }
}

func main(){
    x := NewServer(/*......*/)
    //后台会启动多个goroutine协同工作,该方法立即返回
    x.Listen()
    //启动守护goroutine监听error channel
    go DaemonListen(x.ErrDone())
}

需要注意的是:1.channel类型不一定只是error。如果你需要更多的信息,完全可以是一个包含error的struct;2.告诉守护goroutine可以安全退出的方法是关闭该channel。若此时有goroutine试图向该channel发送error则会引发panic。


库如何安全关闭Channel

1. 维护一个goroutine注册链表

即确保关闭该channel之前其余所有goroutine都已经安全退出,不会再使用该channel。我们最先容易想到Go中的context标准库解决该问题。该标准库的作用也是维护层层调用的goroutine,并当parentCtx执行关闭操作时,能够顺利通知到所有childrenCtx,让所有childrenCtx安全退出。但遗憾的是,context只负责通知关闭,却不负责goroutine的退出顺序。即依然存在当channel被关闭时仍有子goroutine向channel发送数据的情况,我们仍需手动维护。另外,维护一个goroutine有时可能并不符合业务逻辑,例如:


业务需求.jpg

当使用者调用exposedAPI关闭所有goroutine时,该API需要保存着所有运行着的goroutine信息。而事实上,goroutine并不需要向该API注册自己的信息。另外,当某goroutine异常宕机时,维护信息表也是一件较为复杂的事情。

2. errorDiversion

作者不清楚是否有业界前辈早已使用了类似或更成熟的技术,在这里作者只是提供自己处理该需求的一种方法。errDiversion(下面简称为eD)即另启一个守护goroutine,负责将error信息导流给上游channel或简单丢弃。


ErrDiversion.jpg

只需简单指定upstreamChannel和errChannel即可开启一个eD。eD的工作逻辑如下:判断接收channel(即上图upstreamChan(uC))是否关闭,若已被关闭,则将输送channel(上图errChan(eC))的数据直接丢弃,否则输送给上游。当输送channel关闭时,eD安全退出。判断上游channel的开闭状态可以尝试发送数据并捕获panic;或使用flag记录channel开闭状态即可(注意维护一致性)。
为什么要新创建一个eD goroutine而不是在子goroutine发送error前先作检查:新建eD的过程应该在父goroutine完成的,并只需要传递给子goroutine一个channel(eC)即可。对子goroutine屏蔽细节。

再次使用作者项目作简单演示:

/*uC(s.errDone),eC(s.errDoneTry),flag(s.closed)*/
func (s *server) errDiversion() {
    //when upstream channel is closed(detected by closed flag),
    //following data will be received but discarded
    //when eDT channel has closed, this goroutine will exit
    for {
        err, ok := <-s.errDoneTry
        if !ok { //errDoneTry has closed
            return
        }
        if !s.closed { //errDone has NOT closed
            s.errDone <- err
        }
    }
}

func (s *server) Listen() { //Notifies the consumer when an error occurs ASYNCHRONOUSLY
    /*......*/
    //父goroutine创建eD,简单为子goroutine传递eC即可。
    go s.errDiversion()
    go handleListen(listener, s)
    return
}

func handleListen(l net.Listener, s *server) {
    /*......*/
    if err != nil {
        s.errDoneTry <- err    //s.errDoneTry即父goroutine传递的eC
    }
    /*......*/
}

数据一致性问题

最后简单提及维护数据一致性的问题。我们需要维护的有1. flag与channel close的关系;2. 确保eD能够及时执行(在uC关闭之前)。【换言之,当eC存有error时,先等待eD处理error再关闭uC】在这里我们使用普通锁来实现:

type somestruct struct {
    //under protected data
    errDone    chan Errsocket
    errDoneTry chan Errsocket
    closed     bool

    mu         sync.Mutex

    /*...other data...*/
}

func xxx(){
    /*...*/
    s.mu.Lock()
    s.closed = true
    close(s.errDone)
    s.mu.Unlock()
}

func subgoroutine(){
    /*...*/
    if err != nil{
        s.mu.Lock()    //Notice
        eC <- err
    }
}

errDiversion的代码也需要作部分调整:

func (s *server) errDiversion() {
    for {
        err, ok := <-s.errDoneTry
        if !ok {
            return
        }
        if !s.closed {
            s.errDone <- err
        }
        s.mu.Unlock()    //Notice
    }
}

这样我们就完成了并发流程控制以及数据的一致性。注意不要再eD中上锁,因为eC是一个阻塞过程,会引发死锁。正确的做法是向eC传递error之前上锁。

多eD嵌套的解决方案

简单说明,即某上游eD(下简称为A)的eC是某下游eD(下简称为B)的uC。他们是共享同一个channel而非传递的关系。当B发送error至uC(a.eC)时,需要获得上游的锁并加锁。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:Ovenvan

查看原文:Golang 多goroutine的异步通知错误方法

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2067 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传