作者近期在写一个项目时遇到了这样的需求:调用一个库API函数,函数内部又会拉起若干个后台goroutine。这时后台goroutine如果遇到错误想要及时通知库的使用者将不会是一件容易的事情,因为这是一个异步通知错误的方法。作者最终的解决思路是:使用者另启一个goroutine监听Err channel,库后台goroutine出现的错误将直接发送至Err channel中。
作者以自己项目简单举例:
func DaemonListen(err <-chan Errsocket){
for {
v, ok := <-err
if ok {
fmt.Println(v) /*处理错误*/
} else {
fmt.Println("Listen closed.")/*后台安全退出*/
return
}
}
}
func main(){
x := NewServer(/*......*/)
//后台会启动多个goroutine协同工作,该方法立即返回
x.Listen()
//启动守护goroutine监听error channel
go DaemonListen(x.ErrDone())
}
需要注意的是:1.channel类型不一定只是error。如果你需要更多的信息,完全可以是一个包含error的struct;2.告诉守护goroutine可以安全退出的方法是关闭该channel。若此时有goroutine试图向该channel发送error则会引发panic。
库如何安全关闭Channel
1. 维护一个goroutine注册链表
即确保关闭该channel之前其余所有goroutine都已经安全退出,不会再使用该channel。我们最先容易想到Go中的context标准库解决该问题。该标准库的作用也是维护层层调用的goroutine,并当parentCtx执行关闭操作时,能够顺利通知到所有childrenCtx,让所有childrenCtx安全退出。但遗憾的是,context只负责通知关闭,却不负责goroutine的退出顺序。即依然存在当channel被关闭时仍有子goroutine向channel发送数据的情况,我们仍需手动维护。另外,维护一个goroutine有时可能并不符合业务逻辑,例如:
当使用者调用exposedAPI关闭所有goroutine时,该API需要保存着所有运行着的goroutine信息。而事实上,goroutine并不需要向该API注册自己的信息。另外,当某goroutine异常宕机时,维护信息表也是一件较为复杂的事情。
2. errorDiversion
作者不清楚是否有业界前辈早已使用了类似或更成熟的技术,在这里作者只是提供自己处理该需求的一种方法。errDiversion(下面简称为eD)即另启一个守护goroutine,负责将error信息导流给上游channel或简单丢弃。
只需简单指定upstreamChannel和errChannel即可开启一个eD。eD的工作逻辑如下:判断接收channel(即上图upstreamChan(uC))是否关闭,若已被关闭,则将输送channel(上图errChan(eC))的数据直接丢弃,否则输送给上游。当输送channel关闭时,eD安全退出。判断上游channel的开闭状态可以尝试发送数据并捕获panic;或使用flag记录channel开闭状态即可(注意维护一致性)。
为什么要新创建一个eD goroutine而不是在子goroutine发送error前先作检查:新建eD的过程应该在父goroutine完成的,并只需要传递给子goroutine一个channel(eC)即可。对子goroutine屏蔽细节。
再次使用作者项目作简单演示:
/*uC(s.errDone),eC(s.errDoneTry),flag(s.closed)*/
func (s *server) errDiversion() {
//when upstream channel is closed(detected by closed flag),
//following data will be received but discarded
//when eDT channel has closed, this goroutine will exit
for {
err, ok := <-s.errDoneTry
if !ok { //errDoneTry has closed
return
}
if !s.closed { //errDone has NOT closed
s.errDone <- err
}
}
}
func (s *server) Listen() { //Notifies the consumer when an error occurs ASYNCHRONOUSLY
/*......*/
//父goroutine创建eD,简单为子goroutine传递eC即可。
go s.errDiversion()
go handleListen(listener, s)
return
}
func handleListen(l net.Listener, s *server) {
/*......*/
if err != nil {
s.errDoneTry <- err //s.errDoneTry即父goroutine传递的eC
}
/*......*/
}
数据一致性问题
最后简单提及维护数据一致性的问题。我们需要维护的有1. flag与channel close的关系;2. 确保eD能够及时执行(在uC关闭之前)。【换言之,当eC存有error时,先等待eD处理error再关闭uC】在这里我们使用普通锁来实现:
type somestruct struct {
//under protected data
errDone chan Errsocket
errDoneTry chan Errsocket
closed bool
mu sync.Mutex
/*...other data...*/
}
func xxx(){
/*...*/
s.mu.Lock()
s.closed = true
close(s.errDone)
s.mu.Unlock()
}
func subgoroutine(){
/*...*/
if err != nil{
s.mu.Lock() //Notice
eC <- err
}
}
errDiversion的代码也需要作部分调整:
func (s *server) errDiversion() {
for {
err, ok := <-s.errDoneTry
if !ok {
return
}
if !s.closed {
s.errDone <- err
}
s.mu.Unlock() //Notice
}
}
这样我们就完成了并发流程控制以及数据的一致性。注意不要再eD中上锁,因为eC是一个阻塞过程,会引发死锁。正确的做法是向eC传递error之前上锁。
多eD嵌套的解决方案
简单说明,即某上游eD(下简称为A)的eC是某下游eD(下简称为B)的uC。他们是共享同一个channel而非传递的关系。当B发送error至uC(a.eC)时,需要获得上游的锁并加锁。
有疑问加站长微信联系(非本文作者)