今天研究了一下channel的源码,对channel的安全退出有了一些小见解。在此结合实际应用,对select 于channel结合对情况下,安全退出channel做一下记录。
场景1:直接退出(会丢失数据)
因为退出时,直接程序就中断了,channel里存对数据直接丢失。
package main import ( "fmt" "sync" "time" ) var ( wg sync.WaitGroup channel = make(chan int, 10) ) func main() { //先写满一个channel for i := 0; i < 10; i++ { channel <- i } wg.Add(1) go func() { defer wg.Done() for { select { case num := <-channel: fmt.Println("======", num) //每次从channel取值后sleep 1秒,方便我们分析 time.Sleep(time.Duration(num) * time.Second) } } }() wg.Wait() }
场景2:捕捉程序退出信号,然后关闭channel (不丢失数据)
package main import ( "fmt" "log" "os" "os/signal" "sync" "syscall" "time" ) var ( wg sync.WaitGroup channel = make(chan int, 10000) ) func main() { //先写满一个channel for i := 0; i < 10; i++ { channel <- i } wg.Add(1) go HandleSignals() wg.Add(1) go func() { defer wg.Done() for { select { case num, ok := <-channel: if !ok { return } fmt.Println("======", num) //每次从channel取值后sleep 1秒,方便我们分析 time.Sleep(time.Duration(num) * time.Second) } } }() wg.Wait() } func HandleSignals() { defer wg.Done() ch := make(chan os.Signal, 10) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR2) for { sig := <-ch switch sig { case syscall.SIGINT, syscall.SIGTERM: close(channel) log.Println("Exiting, please wait...") return } } }
以上实现是在捕捉到系统退出信号时 执行了 close(channel) 。 从而实现,完全退出前,仍将缓存在channel中到数据,读出并执行。
那是怎么实现的呢? 通过阅读源码 go/src/runtime/chan.go: closechan
看到以下实现,可以看到,在close channel时,仍会将channel中的数据读出来。 因此,我们要使用此特性时,就需要根据系统退出信号,关闭channel。然后判断channel是否关闭,若关闭,再退出for循环。 否则,直接退出的程序,就会直接将channel中的数据抛弃。
func closechan(c *hchan) { ... lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } ... c.closed = 1 var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } ... }
原文地址:https://www.yuanshuli.com/post-68.html
有疑问加站长微信联系(非本文作者)