Golang非CSP并发模型外的其他并行方法总结

RyuGou · · 847 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

Golang最为让人熟知的并发模型当属CSP并发模型,也就是由goroutine和channel构成的[GMP并发模型](https://i6448038.github.io/2017/12/04/golang-concurrency-principle/),具体内容不在赘述了,可以翻回之前的文章查看。在这里,要讲讲Golang的其他并发方式。 Golang不仅可以使用CSP并发模式,还可以使用传统的共享数据的并发模式。 # 临界区(critical section) 这是传统语言比较常用的的方式,即加锁。加锁使其线程同步,每次只允许一个goroutine进入某个代码块,此代码块区域称之为"*临界区(critical section)*”。 Golang为*临界区(critical section)*提供的是互斥锁的包和条件变量的包。 ## 互斥锁 就是通常使用的锁,用来让线程串行用的。Golang提供了互斥锁``sync.Mutex``和读写互斥锁 ``sync.RWMutex``,用法极其简单: ``` var s sync.Mutex s.Lock() // 这里的代码就是串行了,吼吼吼。。。 s.Unlock() ``` Lock和Unlock ### ``sync.Mutex``和``sync.RWMutex``的区别 没啥大的区别,只不过``sync.RWMutex``更加细腻,可以将“读操作”和“写操作”区别对待。 ``sync.RWMutex``中的Lock和unLock针对写操作 ``` var s sync.RWMutex s.Lock() // 上写锁了,吼吼 s.Unlock() ``` ``sync.RWMutex``中的RLock和RUnLock针对读操作 ``` var s sync.RWMutex s.RLock() // 上读锁了,吼吼.. s.RUnlock() ``` 读写锁有以下规则: + 写锁被锁定,(再试图进行)读锁和写锁都阻塞 + 读锁被锁定,(再试图进行)写锁阻塞,(再试图进行)读锁不阻塞 即:多个写操作不能同时进行,写操作和读操作也不能同时进行,多个读操作可以同时进行 ### 注意事项: + 不要重复锁定互斥锁;因为代码写起来麻烦,容易出错,万一死锁(deadlock)了就废了。Go语言运行时系统自己抛出的panic都属于致命错误,都是无法恢复的,调用``recover``函数对它们起不到任何作用。一旦产生死锁,程序必然崩溃。 + 锁定和解锁一定要成对出现,如果怕忘记解锁,最好是使用``defer``语句来解锁;但是,一定不要对未锁定的或者已经锁定的互斥锁解锁,因为会触发``panic``,而且此``panic``和死锁一样,属于致命错误,程序肯定崩溃 + ``sync.Mutex``是个结构体,尽量不要其当做参数,在多个函数直接传播。因为没啥意义,Golang的参数都是副本,多个副本之间都是相互独立的。 ## 条件变量Cond 互斥锁是用来锁住资源,“创造”临界区的。而条件变量Cond可以认为是用来自行调度线程(在此即为groutine)的,当某个状态时,阻塞等待,当状态改变时,唤醒。 Cond的使用,离不开互斥锁,即离不开``sync.Mutex``和``sync.RWMutex``。 Cond初始化都需要有个互斥锁。(ps:哪怕初始化不需要,就应用场景而言,也得需要个互斥锁) ``Cond``提供Wait、Signal、Broadcast 三种方法。 Wait表示线程(groutine)阻塞等待; Signal表示唤醒等待的groutine; Broadcast表示唤醒等待的所有groutine; 初始化: ``` cond := sync.NewCond(&sync.Mutex{}) ``` 在其中一个groutine中: ``` cond.L.Lock() for status == 0 { cond.Wait() } //状态改变,goroutine被唤醒后,干点啥。。。 cond.L.Unlock() ``` 以上算是模板 在另外一个groutine中: ``` cond.L.Lock() status = 1 cond.Signal() // 或者使用cond.Broadcast()来唤醒以上groutine中沉睡的groutine cond.L.Unlock() ``` # 原子操作(atomicity) 原子操作是硬件芯片级别的支持,所以可以保证绝对的线程安全。而且执行效率比其他方式要高出好几个数量级。 Go语言的原子操作当然也是基于CPU和操作系统的,Go语言提供的原子操作的包是``sync/atomic``,此包提供了加(Add)、CAS(交换并比较 compare and swap)、成对出现的存储(store)和加载(load)以及交换(swap)。 此包提供的大多数函数针对的数据类型也非常的单一:只有整型!使用方式十分的简单,看着函数直接调用就好。 ``` var a int32 a = 1 a = atomic.AddInt32(&a, 2) //此处是原子操作,就这么简单,吼吼 ``` 在此特别强调一下CAS,CAS对应的函数前缀是“CompareAndSwap”,含义和用法正如英文翻译:比较并交换。在进行CAS操作的时候,函数会先判断被操作变量的当前值是否与我们预期的旧值相等,如果相等,它就把新值赋给该变量,并返回true,反之,就忽略此操作,并返回false。 可能是Golang提供的原子操作的数据类型实在是有限,Go又补充了一个结构体``atomic.Value``,此结构体相当于一个小容器,可以提供原子操作的存储``store``和提取``load`` ``` var atomicVal atomic.Value str := "hello" atomicVal.Store(str) //此处是原子操作哦 newStr := atomicVal.Load() //此处是原子操作哦 ``` # 其他 为了能更好的调度goroutine,Go提供了``sync.WaitGroup``、``sync.Once``还有``context`` ## ``sync.WaitGroup`` ``sync.WaitGroup``的作用就是在多goroutine并发程序中,让主goroutine等待所有goroutine执行结束。(直接查看代码注释) ``sync.WaitGroup``提供了三个函数``Add``、``Done``和``Wait``三者用法如下: + Add 写在主goroutine中,参数为将要运行的goroutine的数量 + Done 写在各个非主goroutine中,表示运行结束 + Wait 写在主goroutine中,block主goroutine,等待所有其他goroutine运行结束 ``` var wait sync.WaitGroup wait.Add(2) //必须是运行的goroutine的数量 go func() { //TODO 一顿小操作 defer wait.Done() // done函数用在goroutine中,表示goroutine操作结束 }() go func() { //TODO 一顿小操作 defer wait.Done() // done函数用在goroutine中,表示goroutine操作结束 }() wait.Wait() // block住了,直到所有goroutine都结束 ``` ### 注意 ``sync.WaitGroup``中有一个计数器,记录的是需要等待的goroutine的数量,默认值是0,可以通过Add方法来增加或者减少值,但是切记,千万不能让计数器的值小于零,会触发panic! ``sync.WaitGroup``调用Wait方法的时候,``sync.WaitGroup``中计数器的值一定要为0。因此Add中的值一定要等于非主goroutine的数量! 且不要把Add和Wait方法放到不同的goroutine中执行! ## ``sync.Once`` 真真正正的只执行一次。 ``sync.Once``只要一个方法:``Do``,里面就一个参数:``func``。多说无益,复制下面代码,猜猜执行结果就知道了。 ``` package main import ( "fmt" "sync" ) func main() { var once sync.Once onceBody := func() { fmt.Println("Only once") } done := make(chan bool) for i := 0; i < 10; i++ { go func() { once.Do(onceBody) done <- true }() } for i := 0; i < 10; i++ { <-done } } ``` 执行结果 ``` Only once ``` 没错,只有一行。真只执行了一次。 ## context context可以用来实现一对多的goroutine协作。这个包的应用场景主要是在API中。字面意思也很直接,上下文。当一个请求来时,会产生一个goroutine,但是这个goroutine往往要衍生出许多额外的goroutine去处理操作,例如链接database、请求rpc请求。。等等,这些衍生的goroutine和主goroutine有很多公用数据的,例如同一个请求生命周期、用户认证信息、token等,当这个请求超时或者被取消的时候,这里所有的goroutine都应该结束。context就可以帮助我们达到这个效果。 很显然,主goroutine和衍生的所有子goroutine之间形成了一颗树结构。我们的context可以从根节点遍布整棵树,当然,是线程安全的。 线程之间的基本是这样的: ``` func DoSomething(ctx context.Context, arg Arg) error { // ... use ctx ... } ``` 有两个根context:background和todo;这两个根都是contenxt空的,没有值的。两者也没啥太本质的区别,Background是最常用的,作为Context这个树结构的最顶层的Context,它不能被取消。当不知道用啥context的时候就可以用TODO。 根生成子节点有以下方法: ``` //生成可撤销的Context (手动撤销) func WithCancel(parent Context) (ctx Context, cancel CancelFunc) //生成可定时撤销的Context (定时撤销) func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) //也是生成可定时撤销的Context (定时撤销) func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) //不可撤销的Context,可以存一个kv的值 func WithValue(parent Context, key, val interface{}) Context ``` ### 可撤销的Context 以下是每个方法的调用方式(全都来自godoc,可粘贴复用): 可撤销的``func WithCancel(parent Context) (ctx Context, cancel CancelFunc)`` ``` gen := func(ctx context.Context) <-chan int { dst := make(chan int) n := 1 go func() { for { select { case <-ctx.Done(): //只有撤销函数被调用后,才会触发 return case dst <- n: n++ } } }() return dst } ctx, cancel := context.WithCancel(context.Background()) defer cancel() //调用返回的cancel方法来让 context声明周期结束 for n := range gen(ctx) { fmt.Println(n) if n == 5 { break } } ``` 要想结束所有线程,就调用``ctx, cancel := context.WithCancel(context.Background())``函数返回的cancel函数即可,当撤销函数被调用之后,对应的Context值会先关闭它内部的接收通道,也就是它的Done方法返回的通道。 ``WithDeadline``和``WithTimeout``用法基本类似,而且WithTimeout函数内部调用了WithDeadline函数。两者唯一区别是WithTimeout表示从现在开始xxx超时,而WithDeadline的时间可以是之前的时间:意思是说WithTimeout表示从现在开始, xxx时间后超时。而WithDeadline表示xx时间点,结束!这个时间点可以是昨天,时间点不收任何限制。 以下是godoc给出的列子: ``WithDeadline`` ``` package main import ( "context" "fmt" "time" ) func main() { d := time.Now().Add(50 * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) // Even though ctx will be expired, it is good practice to call its // cancelation function in any case. Failure to do so may keep the // context and its parent alive longer than necessary. defer cancel() //时间超时会自动调用 select { case <-time.After(1 * time.Second): fmt.Println("overslept") case <-ctx.Done(): fmt.Println(ctx.Err()) } } ``` 输出: ``` context deadline exceeded ``` ``WithTimeout`` ``` package main import ( "context" "fmt" "time" ) func main() { // Pass a context with a timeout to tell a blocking function that it // should abandon its work after the timeout elapses. ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() //时间超时会自动调用 select { case <-time.After(1 * time.Second): fmt.Println("overslept") case <-ctx.Done(): fmt.Println(ctx.Err()) // prints "context deadline exceeded" } } ``` 输出: ``` context deadline exceeded ``` ### 不可撤销的context,传递值 ``WithValue``可以用来在传递值的,值的存取是以KV的形式来进行的。直接上例子 ``` type favContextKey string f := func(ctx context.Context, k favContextKey) { if v := ctx.Value(k); v != nil { fmt.Println("found value:", v) return } fmt.Println("key not found:", k) } k := favContextKey("language") k1 := favContextKey("Chinese") ctx := context.WithValue(context.Background(), k, "Go") ctx1 := context.WithValue(ctx, k1, "Go1") f(ctx1, k1) f(ctx1, k) ``` 输出: ``` found value: Go1 found value: Go ``` #### 更多精彩内容,请关注我的微信公众号 ``互联网技术窝`` 或者加微信共同探讨交流: ![](https://i6448038.github.io/img/weichat/qrcode.jpg)

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

847 次点击  ∙  1 赞  
加入收藏 微博
1 回复  |  直到 2019-05-04 17:57:32
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传