goroutine原理
概念介绍
- 并发
⼀个CPU上能同时执⾏多项任务,在很短时间内,CPU来回切换任务执⾏(在某段很短时间内执⾏程序a,然后⼜迅速得切换到程序b去执⾏),有时间上的重叠(宏观上是同时的,微观仍是顺序执⾏),这样看起来多个任务像是同时执⾏,这就是并发。
- 并⾏
当系统有多个CPU时,每个CPU同⼀时刻都运⾏任务,互不抢占⾃⼰所在的CPU资源,同时进⾏,称为并⾏。
- 进程
CPU在切换程序的时候,如果不保存上⼀个程序的状态(context--上下⽂),直接切换下⼀个程序,就会丢失上⼀个程序的⼀系列状态,于是引⼊了进程这个概念,⽤以划分好程序运⾏时所需要的资源。因此进程就是⼀个程序运⾏时候的所需要的基本资源单位(也可以说是程序运⾏的⼀个实体)。
- 线程
CPU切换多个进程的时候,会花费不少的时间,因为切换进程需要切换到内核态,⽽每次调度需要内核态都需要读取⽤户态的数据,进程⼀旦多起来,CPU调度会消耗⼀⼤堆资源,因此引⼊了线程的概念,线程本身⼏乎不占有资源,他们共享进程⾥的资源,内核调度起来不会那么像进程切换那么耗费资源。
- 协程
协程拥有⾃⼰的寄存器上下⽂和栈。协程调度切换时,将寄存器上下⽂和栈保存到其他地⽅,在切回来的时候,恢复先前保存的寄存器上下⽂和栈。因此,协程能保留上⼀次调⽤时的状态(即所有局部状态的⼀个特定组合),每次过程重⼊时,就相当于进⼊上⼀次调⽤的状态,换种说法:进⼊上⼀次离开时所处逻辑流的位置。线程和进程的操作是由程序触发系统接⼝,最后的执⾏者是系统;协程的操作执⾏者则是⽤户⾃身程序,goroutine也是协程。
Go并发模型
Go语⾔的并发处理参考了CSP(Communicating Sequential Process)模型。
CSP并发模型是在1970年左右提出的概念,属于⽐较新的概念,不同于传统的多线程通过共享内存来通信,CSP讲究的是“以通信的⽅式来共享内存”。
Go的CSP模型实现与原始的CSP实现有点差别:原始的CSP中channel⾥的任务都是⽴即执⾏的,⽽go语⾔为其增加了⼀个缓存,即任务可以先暂存起来,等待执⾏线程准备好再顺序执⾏。
Go的CSP并发模型,是通过goroutine和channel来实现的。
- goroutine 是Go语⾔中并发的执⾏单位。有点抽象,其实就是和传统概念上的”线程“类似,可以理解为”线程“。
- channel是Go语⾔中各个并发结构体(goroutine)之前的通信机制。通俗的讲,就是各个goroutine之间通信的”管道“,有点类似于Linux中的管道。
⽣成⼀个goroutine的⽅式⾮常的简单:Go⼀下,就⽣成了。
go f()
通信机制channel也很⽅便,传数据⽤channel <- data,取数据⽤<-channel。
在通信过程中,传数据channel <- data和取数据<-channel必然会成对出现,因为这边传,那边
取,两个goroutine之间才会实现通信。
⽽且不管传还是取,必阻塞,直到另外的goroutine传或者取为⽌。
Go调度器GMP
Go语⾔运⾏时环境提供了⾮常强⼤的管理goroutine和系统内核线程的调度器, 内部提供了三种对象:Goroutine,Machine,Processor
。
Goroutine : 指应⽤创建的goroutine
Machine : 指系统内核线程。
Processor : 指承载多个goroutine的运⾏器
在宏观上说,Goroutine与Machine因为Processor的存在,形成了多对多(M:N)的关系。M个⽤户线程对应N个系统线程,缺点增加了调度器的实现难度
Goroutine是Go语⾔中并发的执⾏单位。Goroutine底层是使⽤协程(coroutine)实现,coroutine是⼀种运⾏在⽤户态的⽤户线程(参考操作系统原理:内核态,⽤户态)它可以由语⾔和框架层调度。Go在语⾔层⾯实现了调度器,同时对⽹络,IO库进⾏了封装处理,屏蔽了操作系统层⾯的复杂的细节,在语⾔层⾯提供统⼀的关键字⽀持。
三者与内核级线程的关系如下所示:
⼀个Machine会对应⼀个内核线程(K),同时会有⼀个Processor与它绑定。⼀个Processor连接⼀个或者多个Goroutine。Processor有⼀个运⾏时的Goroutine(上图中绿⾊的G),其它的Goroutine处于等待状态。
Processor的数量同时可以并发任务的数量,可通过GOMAXPROCS限制同时执⾏⽤户级任务的操作系统线程。GOMAXPROCS值默认是CPU的可⽤核⼼数,但是其数量是可以指定的。在go语⾔运⾏时环境,可以使⽤
runtime.GOMAXPROCS(MaxProcs)
来指定Processor数量。
默认数量为
func schedinit() {
//设置最⼤的M数量
sched.maxmcount = 10000
}
- 当⼀个Goroutine创建被创建时,Goroutine对象被压⼊Processor的本地队列或者Go运⾏时全局Goroutine队列。
- Processor唤醒⼀个Machine,如果Machine的waiting队列没有等待被 唤醒的Machine,则创建⼀个(只要不超过Machine的最⼤值,10000),Processor获取到Machine后,与此Machine绑定,并执⾏此Goroutine。
- Machine执⾏过程中,随时会发⽣上下⽂切换。当发⽣上下⽂切换时,需要对执⾏现场进⾏保护,以便下次被调度执⾏时进⾏现场恢复。Go调度器中Machine的栈保存在Goroutine对象上,只需要将Machine所需要的寄存器(堆栈指针、程序计数器等)保存到Goroutine对象上即可。
- 如果此时Goroutine任务还没有执⾏完,Machine可以将Goroutine重新压⼊Processor的队列,等待下⼀次被调度执⾏。
- 如果执⾏过程遇到阻塞并阻塞超时,Machine会与Processor分离,并等待阻塞结束。此时Processor可以继续唤醒Machine执⾏其它的Goroutine,当阻塞结束时,Machine会尝试”偷取”⼀个Processor,如果失败,这个Goroutine会被加⼊到全局队列中,然后Machine将⾃⼰转⼊Waiting队列,等待被再次唤醒。
channel原理
channel数据结构
channel⼀个类型管道,通过它可以在goroutine之间发送和接收消息。它是Golang在语⾔层⾯提供的goroutine间的通信⽅式。
Go依赖于称为CSP(Communicating Sequential Processes)的并发模型,通过
Channel实现这种同步模式。
通过channel来实现通信:
package main
import (
"fmt"
"time"
)
func goRoutineA(a <-chan int) {
val := <-a
fmt.Println("goRoutineA:", val) }
func goRoutineB(b chan int) {
val := <-b
fmt.Println("goRoutineB:", val) }
func main() {
ch := make(chan int, 3)
go goRoutineA(ch)
go goRoutineB(ch)
ch <- 3
time.Sleep(time.Second)
}
channel结构体:
//path:src/runtime/chan.go
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列⻓度,即可以存放的元素个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的⼤⼩
closed uint32 // 标识关闭状态
elemtype *_type // 元素类型
sendx uint // 队列下标,指示元素写⼊时存放到队列中的位置
recvx uint // 队列下标,指示元素从队列的该位置读出
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
lock mutex // 互斥锁,chan不允许并发读写
}
channel实现⽅式
chan内部实现了⼀个环形队列作为其缓冲区,队列的⻓度是创建chan时指定的。
下面展示了⼀个可缓存6个元素的channel示意图:
- dataqsiz指示了队列⻓度为6,即可缓存6个元素
- buf指向队列的内存,队列中还剩余两个元素
- qcount表示队列中还有两个元素
- sendx指示后续写⼊的数据存储的位置,取值[0, 6]
- recvx指示从该位置读取数据, 取值[0, 6]
等待队列
从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。 向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。
被阻塞的goroutine将会挂在channel的等待队列中:
- 因读阻塞的goroutine会被向channel写⼊数据的goroutine唤醒;
- 因写阻塞的goroutine会被从channel读数据的goroutine唤醒;
以下展示了⼀个没有缓冲区的channel,有⼏个goroutine阻塞等待读数据:
注意,⼀般情况下recvq和sendq⾄少有⼀个为空。只有⼀个例外,那就是同⼀个goroutine使⽤select语句向channel⼀边写数据,⼀边读数据。
channel读写
创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区⻓度由make语句传⼊,buf的⼤⼩则与元素⼤⼩和缓冲区⻓度共同决定。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements
stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be
collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=",
elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
向channel写数据
向⼀个channel中写数据简单过程如下:
- 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq
取出G,并把数据写⼊,最后把该G唤醒,结束发送过程; - 如果缓冲区中有空余位置,将数据写⼊缓冲区,结束发送过程;
- 如果缓冲区中没有空余位置,将待发送数据写⼊G,将当前G加⼊sendq,进⼊睡眠,等待被读goroutine唤醒;
从channel读数据
从⼀个channel读数据简单过程如下:
- 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,
最后把G唤醒,结束读取过程; - 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中⾸部读出数据,把G中
数据写⼊缓冲区尾部,把G唤醒,结束读取过程; - 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
- 将当前goroutine加⼊recvq,进⼊睡眠,等待被写goroutine唤醒;
关闭channel
关闭channel时会把recvq中的G全部唤醒,本该写⼊G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// 释放所有接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 释放所有发送者
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// 垃圾回收
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
} }
除此之外,panic出现的常⻅场景还有:
- 关闭值为nil的channel
- 关闭已经被关闭的channel
- 向已经关闭的channel写数据
有疑问加站长微信联系(非本文作者)