你真的了解 sync.Once 吗

Remember · · 543 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

> 转型做go大概一个多月了吧,工作中也是边写边学,最近也是在极客时间学习一些go相关课程,现学现用,源码在我 github 上:https://github.com/wuqinqiang/Go_Concurrency ### 是什么 引用官方描述的一段话,`Once is a object that will perform exactly one action`,即它是一个对象,它提供了保证某个动作只被执行一次的功能。最典型的场景当然就是单例对象的初始化操作。 ### 咋么做 `Once` 的代码很简洁,从头到尾加注释不超过 70 行代码。对外暴露了一个唯一接口 `Do(f func())` ,使用起来也是非常简单。 ```go package main import ( "fmt" "sync" ) func main() { var once sync.Once fun1 := func() { fmt.Println("第一次打印") } once.Do(fun1) fun2 := func() { fmt.Println("第二次打印") } once.Do(fun2) } ``` 在运行上面这段代码之后,从结果中你会发现只运行了 `fun1`。这样看好像没什么问题,但是这段代码并不是并发的调用 `Do()` ,那就稍微调整一下代码: ```go package main import ( "fmt" "sync" "time" ) func main() { var once sync.Once for i := 0; i < 5; i++ { go func(i int) { fun1 := func() { fmt.Printf("i:=%d\n", i) } once.Do(fun1) }(i) } // 为了防止主goroutine直接运行完了,啥都看不到 time.Sleep(50 * time.Millisecond) } ``` 我们开启了5个并发的 `goroutine` ,不管你咋么运行,始终只打印一次,至于 `i` 是多少,就看先执行的是哪个 `g` 了。`Once` 保证只有第一次调用 `Do()` 方法时,传递的 `f` (无参数无返回值的函数) 才会执行,并且之后不管调用的参数是否改变了,也不再执行。 ### 咋么实现 在看一个功能的同时,其实我们本身也可以站在技术的角度上来思考,如果是你,你会咋么实现这个 `Once`。我觉得这是件很有意思的事情。 第一时间想到的就是 `go` 中开箱即用的 sync.Mutex 的 `Lock()` 方法的第一段: ```go // Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { ...... return } ...... } ``` 利用 `atomic` 的原子操作来实现这个需求。这确实可以保证只执行一次。但是也存在一个巨大的坑,我们来验证下: ```go package main import ( "fmt" "net" "sync/atomic" "time" ) type OnceA struct { done uint32 } func (o *OnceA) Do(f func()) { if !atomic.CompareAndSwapUint32(&o.done, 0, 1) { return } f() } func main() { var once OnceA var conn net.Conn go func() { fun1 := func() { time.Sleep(5 * time.Second) //模拟初始化的速度很慢 conn, _ = net.DialTimeout("tcp", "baidu.com:80", time.Second) } once.Do(fun1) }() time.Sleep(500 * time.Millisecond) fun2 := func() { fmt.Println("执行fun2") conn, _ = net.DialTimeout("tcp", "baidu.com:80", time.Second) } //再调用do已经检查到done为1了 once.Do(fun2) _, err := conn.Write([]byte("\"GET / HTTP/1.1\\r\\nHost: baidu.com\\r\\n Accept: */*\\r\\n\\r\\n\"")) if err != nil { fmt.Println("err:", err) } } ``` `conn` 是一个 `net.Conn` 的接口类型变量,这里为了达到效果,通过 `sleep` 模拟了初始化资源的耗时 ,当 `fun2()` 想要进行初始化的时候,已然发现 `done` 的值是 1 了,但是 `fun1` 初始化速度很慢,导致接下来操作 `conn.Write` 的时候,因为此时 `conn` 还是一个空资源,最终运行时抛出空指针的 `panic` 了。 这个问题的原因在于真正使用资源的时候,资源初始化还没到位,真是尴尬????。 那么 Go 是如何避免这种问题的呢? ```go // Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package sync import ( "sync/atomic" ) // Once is an object that will perform exactly one action. type Once struct { done uint32 m Mutex } func (o *Once) Do(f func()) { // Note: Here is an incorrect implementation of Do: // // if atomic.CompareAndSwapUint32(&o.done, 0, 1) { // f() // } // // Do guarantees that when it returns, f has finished. // This implementation would not implement that guarantee: // given two simultaneous calls, the winner of the cas would // call f, and the second would return immediately, without // waiting for the first's call to f to complete. // This is why the slow path falls back to a mutex, and why // the atomic.StoreUint32 must be delayed until after f returns. if atomic.LoadUint32(&o.done) == 0 { // Outlined slow-path to allow inlining of the fast-path. o.doSlow(f) } } func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } } ``` 你看大佬都直接注释贴心的告诉你 `if atomic.CompareAndSwapUint32(&o.done, 0, 1)` 这个不是正确的实现。并发的情况下,胜者获得调用 f ,但是第二个会直接返回,没有等待第一个初始化结束。 所以 `Once` 实现使用了一个互斥锁,互斥锁保证了只有一个 `g` 初始化,同时采取的是双检查的机制,再次判断 `Once.done` 是否为 0,如果为 0,代表第一次初始化,等到初始化结束之后,再释放锁。并发情况下,其他的 `g` 就会被阻塞在 `o.m.Lock()`。 ### 如何避坑 说是避坑,但是绝大多数的坑都是由于程序员自身代码问题所导致的,虽然有点尴尬,但确实如此。 `Once` 的“坑” 还算少的,不像 `sync.Mutex` 和 `Channel` 那样,稍微姿势不注意点就 `panic` 了。这一块后续再写文章介绍下。除了上面需要注意的使用资源的时候资源还未初始化完成的问题,在 `Once` 中还需要避免的是死锁问题。 ```go // 由于嵌套调用 Do 里面的 lock导致死锁 func ErrOne() { var o sync.Once o.Do(func() { o.Do(func() { fmt.Println("初始化") }) }) } ``` 这里 `Do` 调用了 `f`,`f` 里面又调用了 `Do`,最终导致死锁。我把上面的代码简化成下面这样 ```go package main import "sync" func main() { var mu sync.Mutex mu.Lock() mu.Lock() } ``` 避免这种错误也很简单,不要在 `f` 函数中再次调用当前的 `Once` 即可。 ### 延伸 上面有提到过,`Once.Do` 由于某些原因导致初始化失败,但是原生的问题在于,后续再也没有机会执行同一个 `Once.Do` 了,发生这样的情况,理想的处理是,只有真正初始化成功,才设置 `Done` 的值,并且如果初始化失败,理应通知到上游服务,这样上游服务可以做一些重试机制或者异常处理等操作。 ```go package main ​ import ( "fmt" "io" "net" "os" "sync" "sync/atomic" "time" ) ​ type Once struct { done uint32 m sync.Mutex } // 传入的f 有返回值,如果初始化失败,返回对应error, // Do方法再把这个err返回给上游服务 func (o *Once) Do(f func() error) error { if atomic.LoadUint32(&o.done) == 1 { //fast path return nil } return o.doSlow(f) } ​ func (o *Once) doSlow(f func() error) error { o.m.Lock() defer o.m.Unlock() var err error if o.done == 0 { //双检查,还没有初始化 err = f() if err == nil { // 只有真正初始化成功才把 done 的值改成1 atomic.StoreUint32(&o.done, 1) } } return err } ``` 我们改变了 `f` 函数,增加了一个返回值,在初始化失败之后返回给 `Do` 函数,由 `Do` 函数再把错误返回给上游的调用方,把控制权交还给调用方做失败的处理。另外改动的一点是,只有真正初始化成功之后才把 `Done` 的值改成 1。那么我们可以简单的把上面的业务代码改造一下: ```go package main import ( "fmt" "io" "net" "os" "sync" "sync/atomic" "time" ) type Once struct { done uint32 m sync.Mutex } // 传入的f 有返回值,如果初始化失败,返回对应error, // Do方法再把这个err返回给上游服务 func (o *Once) Do(fn func() error) error { if atomic.LoadUint32(&o.done) == 1 { return nil } return o.doSlow(fn) } func (o *Once) doSlow(fn func() error) error { o.m.Lock() defer o.m.Unlock() var err error if o.done == 0 { /双检查,还没有初始化 err = fn() if err == nil { // 只有真正初始化成功才把 done 的值改成1 atomic.StoreUint32(&o.done, 1) } } return err } func main() { urls := []string{ "127.0.0.1:3453", "127.0.0.1:9002", "127.0.0.1:9003", "baidu.com:80", } var conn net.Conn var o Once count := 0 var err error for _, url := range urls { err := o.Do(func() error { count++ fmt.Printf("初始化%d次\n", count) conn, err = net.DialTimeout("tcp", url, time.Second) fmt.Println(err) return err }) if err == nil { break } if count == 3 { fmt.Println("初始化失败,不再重试") break } } if conn != nil { _, _ = conn.Write([]byte("GET / HTTP/1.1\r\nHost: google.com\r\n Accept: */*\r\n\r\n")) _, _ = io.Copy(os.Stdout, conn) } } ``` 当我们在使用一些开源工具时,只要业务需要,你可以改造各种你想要的东西。有时候,阻塞住你的,往往就是一身空想罢了。共勉.

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

543 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传