Jochen的golang小抄-进阶篇-并发编程(实战篇)

mob604756e39ef4 · · 595 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。


目录

  • runtime包
  • 临界资源
    • 什么是临界资源
    • 临界资源安全问题
    • 如何解决临界资源安全问题
  • 同步(sync)包
    • 同步等待组(WaitGroup)
    • 互斥锁
    • 读写锁(RWMutex)
  • 通道(channel)
    • 使用通道(channel)


小抄系列进阶篇涉及的概念较多,知识点重要,故每块知识点独立成篇,方便日后笔记的查询

本篇的主题是:并发编程

在前面我们介绍了go并发模型以及其实现原来,本章要介绍的是go语言中常用的并发编程操作以及要注意的问题

runtime包

runtime包下包含了go运行时系统交互的操作(如控制goroutine函数)、反射包的使用、运行时信息

尽管go编译器产生的是本地可执行的代码,但是这些代码仍然需要运行在go的runtime中,这个runtime就类似于Java和.NET语言中的虚拟机,负责管理内存分配、垃圾回收、堆栈处理、反射等

接下来我们介绍runtime包一些函数

package main

import (
   "fmt"
   "runtime"
   "time"
)

func main() {
   /*
      runtime包常用函数
   */

   //runtime.GOROOT:获取goroot目录
   rootPath := runtime.GOROOT()
   fmt.Println(rootPath) ///usr/local/go

   //runtime.GOOS:获取操作系统
   osName := runtime.GOOS
   fmt.Println(osName) //linux

   //runtime.NumCPU:获取当前计算机逻辑cpu数量(cpu是几核的)
   cpuName := runtime.NumCPU()
   fmt.Println(cpuName) //4

   //runtime.GOMAXPROCS:设置go程序运行期间最大的P数量:[1,256] 如果小于1则为默认的逻辑cpu数
   //一般设置为自己电脑的核心数,GO1.8后默认让程序执行在多核上,所以可以不用设置了
   n := runtime.GOMAXPROCS(runtime.NumCPU()) //返回的是设置之前的数量
   fmt.Println(n)                            //4

   //runtime.Gosched:让当前的goroutine让出当前执行的时间片给其他线程执行
   /*
         当一个goroutine发生阻塞的时候,go会自动的把与该goroutine处于同一系统线程的其他goroutine转移
      转移当另一个系统线程去,使得这些goroutine不会一同被阻塞
   */
   go func() {
      for i := 0; i < 8; i++ {
         fmt.Println("子goroutine")
      }
   }()
   //普通的执行逻辑是,两个goroutine谁先抢到cpu资源就谁先执行
   for i := 0; i < 6; i++ {
      runtime.Gosched() //主协程主动让出自己的执行时间片
      //这边让出完后,已经让出的紧接着又可以去抢占资源
      //所以在这个循环打印中前面让出的时间片后面又可以继续抢占资源,执行的结果就会不一样
      fmt.Println("主goroutine")
   }

   //终止当前的goroutine,但是当前的goroutine中的defer函数还是会被执行
   go func() {
      fmt.Println("goroutine开始")
      //调用f
      f()
      fmt.Println("goroutine结束了")

   }()
   /*
      输出:
         goroutine开始
         我是defer
      可以看到,f()中使用了runtime.Goexit,所以当前协程被终止了,后面也不会输出了

   */
   //为了保证主程序不结束,睡一会
   time.Sleep(2 * time.Second)

}

func f() {
   defer fmt.Println("我是defer")
   runtime.Goexit()
   fmt.Println("我被终止没有输出")
}

临界资源

什么是临界资源

临界资源:

临界资源即共享资源,其主要指的是并发环境中多个进程/线程/协程共享的资源

临界资源安全问题

临界资源安全问题也就是以往我们常听到的线程安全问题

在并发环境下,如果临界资源处理不当可能会导致数据一致性出现问题,也就是说如果多个goroutine在访问同一个数据资源的时候,其中一个线程修改了数据,那么这个数值就被修改了,对于其他的goroutine来讲,这个数值可能是不对的。

如下有一段资源争夺(data race(数据资源竞争))的代码:

package main

import (
	"fmt"
	"time"
)

func main() {
	/*
		临界资源
	*/
	n := 1 //在主协程和子协程都可以共同访问,这就是一个临界资源
	go func() {
		n = 2
		fmt.Println("子goroutine中,n的值为:", n)
	}()
	n = 3
	time.Sleep(2 * time.Second)
	fmt.Println("主goroutine中,n的值为:", n)
	/*
		输出:
			子goroutine中,n的值为: 2
			主goroutine中,n的值为: 2
			分析可以得出:主协程先持有资源,将n改为3,然后进入睡眠 此时子协程抢占cpu执行时间片,
		将n改为2,所以最终输出的结果都是n=2

			按正常的顺序执行的逻辑分析,我们想要的效果是子协程中打印n=2,主协程打印n=3,但是,
		在主协程中先修改了n=3,再要打印却未答应的时候,被子协程抢占了cpu资源,改变了n的数值,
		导致了我们数据出现了不一致的情况
	*/

}

拓展:使用go的CLI命令编译或者执行go源文件的时候,添加-race选项可以看到竞争资源的分析情况:

[jochen@jochen-inspiron7559 concurrency_goroutine]$ go run -race race1.go 
==================
WARNING: DATA RACE  #表示发现数据竞争问题
Write at 0x00c000134010 by goroutine 7: #在编号为7的子协程中访问了变量n
  main.main.func1()
      /home/GoWorkSpace/src/concurrency_goroutine/race1.go:14 +0x3c

Previous write at 0x00c000134010 by main goroutine: #在主协程中又访问了变量n
  main.main()
      /home/GoWorkSpace/src/concurrency_goroutine/race1.go:17 +0x88

Goroutine 7 (running) created at:
  main.main()
      /home/GoWorkSpace/src/concurrency_goroutine/race1.go:13 +0x7a
==================
子goroutine中,n的值为: 2
主goroutine中,n的值为: 2
Found 1 data race(s) #表示发现了一个数据竞争(race是竞争的意思)
exit status 66

再看一个卖票的例子,熟悉下data race(数据竞争)问题,即共享资源在并发状态的不安全问题

package main

import (
	"fmt"
	"math/rand"
	"time"
)

//全局变量,票的数量
var ticketNums = 10

func main() {
	/*
		模拟高铁售票:
			启用4个goroutine模拟四个售票口,四个售票口同时执行,表示四个售票口同时卖票
	*/
	go saleTickets("售票口1")
	go saleTickets("售票口2")
	go saleTickets("售票口3")
	saleTickets("售票口4") //保留一个在主goroutine调用,避免主goroutine提前结婚苏
	/*
		多次执行发现输出结果发现余票居然有时候会出现负数,为什么会这样呢?在下面的卖票函数中有过程分析和解释
	*/
}

//卖票
func saleTickets(name string) {
	rand.Seed(time.Now().UnixNano()) //随机数睡眠
	for {
		//1.为什么余票可能为负数?
		if ticketNums > 0 {
			//1.1协程执行到此处后进入睡眠,让出自己的时间片,其他的协程就会来抢占cpu的是时间片
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)//睡眠,模拟被其他协程抢占cpu时间片
			/*
				1.2当抢占到时间片后,恢复执行,此时即使票已经卖光了,但是由于该协程会保留了休眠前的执行上下文,
			协程还是会恢复休眠前位置继续往下执行,所以导致了可能会出现票数为负数的情况出现
			*/
			fmt.Println("剩余票数:", ticketNums)
			fmt.Println(name, "售票一张")
			ticketNums-- //1.3可能此时的ticketNum已经被别的协程置为0,或者-x
		} else {
			fmt.Println(name, "没票卖啦")
			break
		}
	}
}

如何解决临界资源安全问题

临界资源安全问题其实就是平时我们经常说到的线程安全问题,即共享资源在并发环境下出现 data race进而引发数据不一致的问题

线程安全问题的一般都是使用同步的解决方案,实现同步一般就是通过把共享资源加锁的方式

把一个资源加锁后,在某一时间段内,该资源只能允许一个goroutine访问。当前goroutine访问完毕,则释放锁,此时资源才可被其他goroutine访问

在go语言中,可以通过sync包下的锁操作给资源上锁

对于锁、原子操作等其他语言中常见的为了解决线程安全问题所使用的同步机制,go语言也有相应的实现库,下面我们逐一了解诶它们

同步(sync)包

为了解决共享资源的线程安全问题,一般会使用同步操作

go语言将同步相关操作定义在sync包下

sync包提供了基本的同步基元,如互斥锁。除了Once和WaitGroup类型,大部分都是适用于低水平程序线程,高水平的同步使用channel通信更好一些

同步等待组(WaitGroup)

type WaitGroup struct {
    // 包含隐藏或非导出字段
}

WaitGroup用于等待一组goroutine的结束
在每一个WaitGroup中都维护一个计数器,用以记录要等待执行goroutine的数量,可以通过父线程调用Add方法来设定应等待的线程的数量
每个被等待的线程在结束时应调用Done方法,来表示把计数器的数值-1,当计数器为0时,父goroutine同步阻塞结束
同时,主协程里可以调用Wait方法阻塞至所有线程结束

//Add方法向内部计数加上delta,delta可以是负数;如果内部计数器变为0,Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done() //Done方法减少WaitGroup计数器的值,应在线程的最后执行。
func (wg *WaitGroup) Wait() //Wait方法阻塞直到WaitGroup计数器减为0。
/*
	注意Add加上正数的调用应在Wait之前,否则Wait可能只会等待很少的线程。一般来说本方法应在创建新的线程或者其他应等待的事件之前调用。
*/

牛刀小试

package main

import (
	"fmt"
	"sync"
)

//1.1创建全局同步等待组对象
var wg sync.WaitGroup

func main() {
	/*
		WaitGroup : 同步等待组
			Add()  设置等待组中要执行的子goroutine的数量
			Done() 让等待组计数器-1
			Wait() 让主goroutine等待
	*/
	//1.使用同步等待组同步等待子协程结束
	wg.Add(2) //等待子协程数为2
	go test1()
	go test2() //直接这样执行,主协程结束子goroutine也会结束,所以需要使用同步等待组
	fmt.Println("主goroutine阻塞...等待子goroutine执行结束")

	wg.Wait() //阻塞等待
	fmt.Println("主goroutine解除阻塞")
}

func test1() {
	for i := 1; i < 10; i++ {
		fmt.Println("test1函数打印:A", i)
	}
	wg.Done() //给wg等待组中的计数器-1,实际内部调用的Add(-1)
}

func test2() {
	defer wg.Done() //给wg等待组中的计数器-1
	for i := 1; i < 10; i++ {
		fmt.Println("\ttest2函数打印:B", i)
	}
}

可能遇到的错误:

fatal error: all goroutines are asleep - deadlock!

主线程一直等不到计数器被设置为0,就会一直阻塞等待,造成死锁。一般出现在在于你的子goroutine忘记调用Done方法或者是你的Add方法设置的计数器大于要等待的子协程数量

互斥锁

互斥锁对象也定义在同步包下

互斥锁:也称为同步锁,用以资源访问同步操作。当协程访问某一共享资源时可上锁,上锁后的资源不可被其他的协程访问,待访问结束后,该协程释放锁,此时该资源才能被其他协程访问

在go语言中,互斥锁并非是锁主一个对象,而锁的是一段代码(上锁和解锁之间的代码段)

对于每一个资源,都对应一个互斥锁的标记,该标记用来保证在一个时刻只能有一个goroutine来访问资源,其他goroutine只能等待

互斥锁对象定义如下:

type Mutex struct {
    // 包含隐藏或非导出字段
}

Mutex是一个互斥锁,可以创建为其他结构体的字段;零值为解锁状态。Mutex类型的锁和线程无关,可以由不同的线程加锁和解锁。

Mutex中两个重要的方法为Lock(上锁)和UnLock(释放锁)

func (m *Mutex) Lock() //Lock方法锁住m,如果m已经加锁,则阻塞直到m解锁。
func (m *Mutex) Unlock() //Unlock方法解锁m,如果m未加锁会导致运行时错误。锁和线程无关,可以由不同的线程加锁和解锁

下面使用互斥锁解决上面售票案例中,余票为可能为负数的临界资源安全问题

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

//全局变量,票的数量
var ticketNums = 10

//互斥锁对象
var mutex sync.Mutex //可以理解创建一把锁头
var wg sync.WaitGroup
func main() {
	/*
		模拟高铁售票:
			启用4个goroutine模拟四个售票口,四个售票口同时执行,表示四个售票口同时卖票
	*/
	wg.Add(4)
	//通过同步锁的方式解决临界资源安全问题
	go saleTickets("售票口1")
	go saleTickets("售票口2")
	go saleTickets("售票口3")
	go saleTickets("售票口4") //保留一个在主goroutine调用,避免主goroutine提前结婚苏

	wg.Wait()
}

//卖票
func saleTickets(name string) {
	rand.Seed(time.Now().UnixNano()) //随机数睡眠
	for {
		//上锁
		mutex.Lock() //锁的是一段代码(上锁和解锁之间的代码段),从该行开始只能有一个goroutine来访问
		if ticketNums > 0 {
			//因为上锁,g1先夺得锁,即使睡眠,g2过来抢占资源,也只能干等到锁释放才能进入上锁后的代码
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)//睡眠,模拟被其他协程抢占cpu时间片
			fmt.Println("剩余票数:", ticketNums)
			fmt.Println(name, "售票一张")
			ticketNums-- //可能此时的ticketNum已经被别的协程置为0,或者-x
		} else {
			//要注意如果某个争夺的协程进来后break,没有释放锁资源,会导致其他goroutine没法正常执行而报错
			mutex.Unlock()
			fmt.Println(name, "没票卖啦")
			break
		}
		//释放锁
		mutex.Unlock()
	}
	wg.Done()
}

ps:切记,在使用互斥锁的时候,对资源操作完,一定要解锁,否则会出现程序异常,死锁的问题,可以使用defer语句确保解锁操作

读写锁(RWMutex)

互斥锁强行同步往往会降低性能,开发中往往是这样的场景:
当某个数据永远不会被修改,是只读的时候,就不会存在资源竞争的问题

所以,读取数据并不是造成线程安全的原因。真正造成临界资源安全问题的原因是在并发条件对数据的写操作

读写锁:

RWMutex读写锁是专门针对于读操作和写操作的互斥锁。它和普通的互斥锁最大的不同在于它可以分别针对“读操作”和"写操作"进行锁定和解锁

读写锁可以让多个读操作同时并发读取,但是对于写操作是完全互斥的(当一个goroutine进行写操作时,其他的goroutine即不能读也不能写)

读写锁对象也定义在同步包下

type RWMutex struct {
    // 包含隐藏或非导出字段
}

RWMutex是读写互斥锁。该锁可以被同时多个读取者持有或唯一个写入者持有。RWMutex可以创建为其他结构体的字段;零值为解锁状态。RWMutex类型的锁也和线程无关,可以由不同的线程加读取锁/写入和解读取锁/写入锁。

RWMutex实现了如下方法

func (rw *RWMutex) Lock() //Lock方法将rw锁定为写入状态,禁止其他线程读取或者写入
func (rw *RWMutex) Unlock() //Unlock方法解除rw的写入锁状态,如果m未加写入锁会导致运行时错误
func (rw *RWMutex) RLock() //RLock方法将rw锁定为读取状态,禁止其他线程写入,但不禁止读取
func (rw *RWMutex) RUnlock() //Runlock方法解除rw的读取锁状态,如果未加读取锁会导致运行时错误
func (rw *RWMutex) RLocker() Locker //Rlocker方法返回一个互斥锁,通过调用rw.Rlock和rw.Runlock实现了Locker接口。

牛刀小试

package main

import (
   "fmt"
   "sync"
   "time"
)

var wg *sync.WaitGroup

var rwMutex *sync.RWMutex //定义读取锁变量
func main() {
   /*
      读写锁
   */
   rwMutex = new(sync.RWMutex) //创建读取锁对象
   wg = new(sync.WaitGroup)
   //wg.Add(3)
   //go readData(333)
   //go readData(666)
   //go readData(888)
   /*
      输出:
         读操作 start...
         reading... 888
         读操作 start...
         reading... 333
         读操作 start...
         reading... 666
         end
         end
         end
      可以发现,读操作是可以同时执行的

   */
   wg.Add(3)
   go writeData(111)
   go readData(222) //当写入锁未释放时,读取锁会阻塞无法上锁,直到锁释放才能上锁
   go writeData(999)
   /*
      输出:
         写操作 start...
         writing... 999
         读操作 start...
         写操作 start...
         reading... 222
         写完over~
         end
         writing... 111
         写完over~

      可以发现,无法同时执行写入操作


   */
   wg.Wait()

}

//读操作
func readData(i int) {
   fmt.Println("读操作", "start...")

   rwMutex.RLock() //给读操作上锁

   fmt.Println("reading...",i)
   time.Sleep(1 * time.Second) //模拟耗时读取操作

   rwMutex.RUnlock() //释放读取锁
   wg.Done()
   fmt.Println("end")
}

//写操作
func writeData(i int) {
   defer wg.Done()

   fmt.Println("写操作", "start...")

   rwMutex.Lock() //给写操作上锁

   fmt.Println( "writing...",i)
   time.Sleep(1 * time.Second) //模拟耗时写入操作

   rwMutex.Unlock() //释放写入锁
   fmt.Println("写完over~")
}

读写锁总结:

  1. 多个goroutine可以同时读
  2. 写的时候,只能等,读不了也写不了

Go语言中有一句很经典的话:不要以共享内存的方式去通信,而是要以通信的方式去共享内存
即go语言并不鼓励用锁保护的机制去多个goroutine来访问共享的数据,而是应该通过后面介绍到的channel机制实现goroutine之间的共享资源状态变化的传递,这样在goroutine内就可以通过判断资源的状态实现和锁一样效果

通道(channel)

通道是goroutine之间通信的管道,用于实现多个goroutine之间数据的传递

go语言中,可以通过数据结构channel实现某一goroutine中的数据传递给另一goroutine

go语言会在语言层面确保,在某一时间点,只有一个goroutine能访问channel中的数据。为解决线程安全问题提供了极为优雅的方式

使用通道(channel)

每个通道都有其相关的类型,该类型是通道允许传递的类型。通道的零值为nil,其没有任何用处,所以通道必须使用和map、slice的方法来定义

老样子,数据结构的学习还是少说多撸

package main

import (
	"fmt"
	"time"
)

func main() {
	/*
		通道channel
	*/

	//1.声明通道
	var c1 chan int //代表c1通道能传递int类型数据
	//通道默认值为nil, 没有任何卵用,要使用必须先创建通道
	fmt.Printf("%T, %v\n", c1, c1) //chan int, <nil>

	//2.创建通道 和map slice一样
	if c1 == nil {
		c1 = make(chan int)
		//可以看到打印通道的值是内存地址,说明他是一个引用类型的数据。前面证明过很多次这个结论,不搞了
		fmt.Printf("%T, %v\n", c1, c1) //chan int, 0xc000112000

	}

	/*
		3.使用通道
			通过通道发送数据 c <- data
			从通道接收数据  data := <- c    从箭头方向看数据流入流出记忆,很形象
		ps:对于通道来说,从其中发送和接收数据的过程中都是阻塞的

			案例:子协程写数据,主协程读数据
	*/
	go func() {
		for i := 0; i < 10; i++ {
			fmt.Println("子goroutine准备写入数据:", i)
			time.Sleep(2 * time.Second) //模拟延迟效果
			c1 <- i
		}
		fmt.Println("子goroutine end...")
	}()

	data := <-c1 //当通道没数据时,读取操作会发生阻塞,待通道有数据为止
	fmt.Println("从通道c1读取到数据", data)
	fmt.Println("main goroutine end...")

}

总结channel注意事项:

  • 每个通道都要有一个关联的数据类型, 表示的是传递的数据类型
  • nil通道是不能使用的(nil map也无法直接存储键值对),必须要去创建对应对象(一般使用make())
  • chan <- data表示向管道写入数据
    data <- chan表示从通道中读取数据
  • 当从通道中发生或读取数据是阻塞式的
    当发送数据时,直到另一个goroutine读取数据,阻塞才会接触
    当接收数据时,知道另一个goroutine写入数据,阻塞才会接触
  • channel是同步的,意味着同一时间只允许一条goroutine进行操作
  • 通道的发送和接收必须处在不同的goroutine中

ps:当一个goroutine在一个通道上发送数据,而没有其他goroutine去从通道接收数据,或反过来操作,就会发生死锁现象,要千万注意!


有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:mob604756e39ef4

查看原文:Jochen的golang小抄-进阶篇-并发编程(实战篇)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

595 次点击  ∙  2 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传