Go语言基础(3)
Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,可以根据需要创建成千上万个goroutine并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。
Go语言还提供channel在多个goroutine间进行通信。goroutine和channel是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础
Go语言中的goroutine就是这样一种机制,goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。
在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能–goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,就是这么简单粗暴。
goroutine并发
Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。
一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。
如下:
func hello() {
fmt.Println("Hello")
}
func main(){
go hello()
}
在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。
当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束
启动多个goroutine
在Go语言中可以启动多个goroutine
可增长的栈
OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这么大。所以在Go语言中一次创建十万左右的goroutine也是可以的。
goroutine调度
GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。
- G就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
- P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
- M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;
P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。
P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。
单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能
Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。
Go语言中可以通过runtime.GOMAXPROCS()
函数设置当前程序并发时占用的CPU逻辑核心数。
Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。
例:
func main() {
runtime.GOMAXPROCS(1)
go a()
go b()
time.Sleep(time.Second)
}
Go语言中的操作系统线程和goroutine的关系:
- 一个操作系统线程对应用户态多个goroutine。
- go程序可以同时使用多个操作系统线程。
- goroutine和OS线程是多对多的关系,即m:n。
关于sync.WaitGroup的用法
func f1() {
rand.Seed(time.Now().UnixNano())
for i :=0; i<5; i++{
r1 := rand.Int()
r2 := rand.Intn(10)
fmt.Println(r1, r2)
}
}
var wg sync.WaitGroup
func main(){
fmt.Println("start ######")
for i:=0; i<100; i++{
wg.Add(1)
go func(x int) {
fmt.Println(x)
defer wg.Done()
}(i)
}
fmt.Println("main .....")
wg.Wait()
f1()
}
通道channel
函数与函数间需要交换数据才能体现并发执行函数的意义。
Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
channel类型
channel是一种类型,一种引用类型。声明通道类型的格式如下:
var 变量 chan 元素类型
创建channel
<u>通道是引用类型,通道类型的空值是nil</u>。
var ch chan int
fmt.Println(ch) // <nil>
<u>声明的通道后需要使用make函数初始化之后才能使用</u>。
创建channel的格式如下:
make(chan 元素类型, [缓冲大小])
channel的缓冲大小是可选的。
channel操作
通道有发送(send)、接收(receive)和关闭(close)三种操作。
发送和接收都使用<-符号。
现在我们先使用以下语句定义一个通道:
ch := make(chan int)
发送
将一个值发送到通道中。
ch <- 10 // 把10发送到ch中
接收
从一个通道中接收值。
x := <- ch // 从ch中接收值并赋值给变量x
<-ch // 从ch中接收值,忽略结果
关闭
我们通过调用内置的close函数来关闭通道。
close(ch)
关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:
- 对一个关闭的通道再发送值就会导致panic。
- 对一个关闭的通道进行接收会一直获取值直到通道为空。
- 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
- 关闭一个已经关闭的通道会导致panic。
无缓冲的通道
无缓冲的通道又称为阻塞的通道。
例:
func main() {
var ch chan int
ch = make(chan int)
ch <- 10
fmt.Println(ch)
}
在编译时:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
无缓冲的通道只有在有接收值的情况下才可以发送值,否则会产生死锁。所以上例中ch是一个无缓冲区的通道,不能直接把值放入通道。
以上代码需要改为:
var wg sync.WaitGroup
func recv(ch chan int){
fmt.Println( <- ch)
defer wg.Done()
}
func main() {
var ch chan int
ch = make(chan int)
wg.Add(1)
go recv(ch)
ch <- 10
wg.Wait()
}
编译运行并输出结果为: 10
无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
有缓冲的通道
解决上面问题的方法还有一种就是使用有缓冲区的通道。可以在使用make函数初始化通道的时候为其指定通道的容量。只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量。
例:
var ch02 chan int
ch02 = make(chan int, 1)
ch02 <- 20
fmt.Println("发送成功")
tmp := <- ch02
fmt.Println("取值成功, ", tmp)
}
当向通道中发送完数据时,可以通过close函数来关闭通道。
当通道被关闭时,再往该通道发送值会引发panic。从一个已关闭的channel中读取数据不会报错,也不会阻塞接收者。如果一个已被close的通道中的值已经被取完,然后继续取值,取到的值是对应类型的零值。可以通过指定接受状态位来观察接受的数据是否是从一个已关闭的channel所发送出来的数据。例如j, ok := <-c
,则ok为false时,则代表channel已经被关闭。
例:
var ch01 chan int
ch01 = make(chan int, 3)
wg.Add(1)
go func() {
ch01 <- 10
ch01 <- 20
ch01 <- 15
defer wg.Done()
close(ch01)
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
val , ok := <-ch01
if !ok {
break
}
fmt.Println(val)
}
}()
wg.Wait()
}
单向通道
- chan<- int是一个只写单向通道(只能对其写入int类型值),可以对其执行发送操作但是不能执行接收操作;
- <-chan int是一个只读单向通道(只能从其读取int类型值),可以对其执行接收操作但是不能执行发送操作。
例:
func srcNumber(ch chan<- int){
for i:=1;i<=100;i++{
ch <- i
}
close(ch)
defer wg.Done()
}
func destNumber(destCh chan <- int, srcCh <-chan int) {
for i := range srcCh{
destCh <- i * i
}
close(destCh)
defer wg.Done()
}
func output(ch <-chan int) {
for i:=range ch {
fmt.Println(i)
}
}
func main() {
var ch03 chan int
var ch04 chan int
ch03 = make(chan int)
ch04 = make(chan int)
wg.Add(1)
go srcNumber(ch03)
wg.Add(1)
go destNumber(ch04,ch03)
output(ch04)
wg.Wait()
通道总结
channel常见的异常总结,如下图:
关闭已经关闭的channel也会引发panic
work pool
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("worker:%d start job:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end job:%d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 开启3个goroutine
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 5个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 输出结果
for a := 1; a <= 5; a++ {
<-results
}
select多路复用
Go内置了select关键字,可以同时响应多个通道的操作。
select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。具体格式如下:
select{
case <-ch1:
...
case data := <-ch2:
...
case ch3<-data:
...
default:
默认操作
}
例:
ch05 := make(chan int, 1)
for i:=1; i<=10; i++ {
select {
case x:=<-ch05:
fmt.Println(x)
case ch05 <- i:
}
}
}
使用select语句能提高代码的可读性。
1) 可处理一个或多个channel的发送/接收操作。
2) 如果多个case同时满足,select会随机选择一个。
3) 对于没有case的select{}会一直等待,可用于阻塞main函数。
并发安全
例:
var x int64
var wg sync.WaitGroup
func add() {
for i:=0; i<5000; i++ {
x = x+1
}
defer wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
上例代码中开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后的结果与期待的不符。
互斥锁
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。 使用互斥锁来修复上面代码的问题:
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i:=0; i<5000; i++ {
lock.Lock()
x = x+1
lock.Unlock()
}
defer wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。
读写互斥锁
互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。
读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。
读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。
func write() {
//lock.Lock()
rwlock.Lock()
x++
time.Sleep(time.Millisecond*10)
rwlock.Unlock()
//lock.Unlock()
wg.Done()
}
func read(){
//lock.Lock()
rwlock.Lock()
time.Sleep(time.Millisecond*10)
rwlock.Unlock()
//lock.Unlock()
wg.Done()
}
func main() {
start := time.Now()
for i:=0; i<10; i++{
wg.Add(1)
go write()
}
for i:=0;i<1000;i++{
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
sync
sync.WaitGroup
在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:
方法名 | 功能 |
---|---|
(wg * WaitGroup) Add(delta int) | 计数器+delta |
(wg *WaitGroup) Done() | 计数器-1 |
(wg *WaitGroup) Wait() | 阻塞直到计数器变为0 |
sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
sync.Once
在编程的很多场景下需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。
Go语言中的sync包中提供了一个针对只执行一次场景的解决方案–sync.Once。
sync.Once只有一个Do方法,其签名如下:
func (o *Once) Do(f func()) {}
备注:如果要执行的函数f需要传递参数就需要搭配闭包来使用。
例
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}
多个goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个goroutine都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:
func loadIcons() {
icons = make(map[string]image.Image)
icons["left"] = loadIcon("left.png")
icons["up"] = loadIcon("up.png")
icons["right"] = loadIcon("right.png")
icons["down"] = loadIcon("down.png")
}
在这种情况下就会出现即使判断了icons不是nil也不意味着变量初始化完成了。考虑到这种情况,能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其他的goroutine操作,但是这样做又会引发性能问题。
使用sync.Once改造的示例代码如下:
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 是并发安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
再例如:多个函数都要close某个channel时,使用sync.Once处理冲突。
sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。
sync.Map
关于Go语言内置的map类型:
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
for i:=0; i<20; i++{
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n) //把n的值转换字符串传给key
set(key,n)
fmt.Printf("key:=%v,value:=%v\n", key,get(key))
wg.Done()
}(i)
}
wg.Wait()
在执行上述代码时经常出现异常,提示如下:fatal error: concurrent map writes
说明:内置map不是并发安全的。
Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。
使用方法如下例所示:
var m2 = sync.Map{}
func main() {
for i:=0; i<20; i++{
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m2.Store(key,n)
value, _ := m2.Load(key)
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
atomic原子操作
代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。
例:
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i:=0; i<5000; i++ {
//lock.Lock()
//x = x+1
//lock.Unlock()
atomic.AddInt64(&x, 1)
}
defer wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。
atomic包
方法 | 解释 |
---|---|
func LoadInt32(addr int32) (val int32) <br />func LoadInt64(addr int64) (val int64) <br />func LoadUint32(addr uint32) (val uint32) <br />func LoadUint64(addr uint64) (val uint64) <br />func LoadUintptr(addr uintptr) (val uintptr) <br />func LoadPointer(addr unsafe.Pointer) (val unsafe.Pointer) | 读取操作 |
func StoreInt32(addr int32, val int32) <br />func StoreInt64(addr int64, val int64) <br />func StoreUint32(addr uint32, val uint32) <br />func StoreUint64(addr uint64, val uint64) <br />func StoreUintptr(addr uintptr, val uintptr) <br />func StorePointer(addr unsafe.Pointer, val unsafe.Pointer) | 写入操作 |
func AddInt32(addr int32, delta int32) (new int32) <br />func AddInt64(addr int64, delta int64) (new int64) <br />func AddUint32(addr uint32, delta uint32) (new uint32) <br />func AddUint64(addr uint64, delta uint64) (new uint64) <br />func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) | 修改操作 |
func SwapInt32(addr int32, new int32) (old int32) <br />func SwapInt64(addr int64, new int64) (old int64) <br />func SwapUint32(addr uint32, new uint32) (old uint32) <br />func SwapUint64(addr uint64, new uint64) (old uint64) <br />func SwapUintptr(addr uintptr, new uintptr) (old uintptr) <br />func SwapPointer(addr unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) | 交换操作 |
func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool) <br />func CompareAndSwapInt64(addr int64, old, new int64) (swapped bool) <br />func CompareAndSwapUint32(addr uint32, old, new uint32) (swapped bool) <br />func CompareAndSwapUint64(addr uint64, old, new uint64) (swapped bool) <br />func CompareAndSwapUintptr(addr uintptr, old, new uintptr) (swapped bool) <br />func CompareAndSwapPointer(addr unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) | 比较并交换操作 |
有疑问加站长微信联系(非本文作者)