Goroutines
- 在Go语言中,每一个并发的执行单元叫作goroutine。设想一个程序中有两个函数,假设两个函数没有相互之间的调用关系。一个线性的程序会先调用其中的一个函数,然后再调用另一个。如果程序中包含多个goroutine,对两个函数的调用则可能发生在同一时刻。
- 当一个程序启动时,其main函数即在一个单独的goroutine中运行,我们叫它main goroutine。新的goroutine会用go语句来创建。在语法上,go语句是在一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新创建的goroutine中运行。而go语句本身会迅速地完成。
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
- 主函数返回时,所有的goroutine都会被直接打断,程序退出。除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个goroutine来打断另一个的执行,但是之后可以看到一种方式来实现这个目的,通过goroutine之间的通信来让一个goroutine请求其它的goroutine,并被请求的goroutine自行结束执行。
Channel
- 如果说goroutine是Go语言程序的并发体的话,那么channels它们之间的通信机制。它可以让一个goroutine通过它给另一个goroutine发送值信息。每个channel都有一个特殊的类型,也就是channel可发送数据的类型。一个可以发送int类型数据的channel一般写为chan int。
- 使用内置的make函数,我们可以创建一个channel:
ch := make(chan int)
- 和map类似,channel也一个对make函数创建的底层数据结构的引用。当我们复制一个channel或把 channel用于函数参数传递时,我们只是拷贝了一个channel引用,因此调用者和被调用者将引用同一个channel对象。和其它的引用类型一样,channel的零值也是nil。
- channel有发送和接收两个主要操作,都是通信行为。一个发送语句将一个值从一个goroutine通过channel发送到另一个执行接收操作的goroutine。发送和接收两个操作都是用
<-
运算符。在发送语句中,<-
运算符分割channel和要发送的值。在接收语句中,<-
运算符写在channel对象之前。一个不使用接收结果的接收操作也是合法的。
ch <- x // 发送消息
x = <-ch // 从 channel 中接收消息
<-ch // 从 channel 接收并丢弃消息
- Channel还支持close操作,用于关闭channel,随后对基于该channel的任何发送操作都将导致panic异常。对一个已经被close过的channel执行接收操作依然可以接收到之前已经成功发送的数据;如果channel中已经没有数据的话将产生一个零值的数据。
使用内置的close函数就可以关闭一个channel:
close(ch)
以最简单方式调用make函数创建的时一个无缓冲的channel,但是我们也可以指定第二个整形参数,对应channel的容量。如果channel的容量大于零,那么该channel就是带缓冲的channel。
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
无缓冲 channel
- 一个基于无缓冲Channel的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的Channel上执行接收操作,当发送的值通过Channel成功传输之后,两个goroutine可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者goroutine也将阻塞,直到有另一个goroutine在相同的Channel上执行发送操作。
- 下面的程序在 main 函数的 goroutine 中将标准输入复制到server,因此当客户端程序关闭标准输入时,后台goroutine可能依然在工作。我们需要让主goroutine等待后台goroutine完成工作后再退出,我们使用了一个channel来同步两个goroutine,在后台goroutine返回之前,它先打印一个日志信息,然后向done对应的channel发送一个值。主goroutine在退出前先等待从done对应的channel接收一个值。因此,总是可以在程序退出前正确输出“done”消息。
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // NOTE: ignoring errors
log.Println("done")
done <- struct{}{} // signal the main goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // wait for background goroutine to finish
}
- 基于channel发送消息有两个重要方面。首先每个消息都有一个值,但是有时候通讯的事实和发生的时刻也同样重要。当我们更希望强调通讯发生的时刻时,我们将它称为消息事件。有些消息事件并不携带额外的信息,它仅仅是用作两个goroutine之间的同步,这时候我们可以用
struct{}
空结构体作为channels元素的类型,虽然也可以使用bool或int类型实现同样的功能,done <- 1
语句也比done <- struct{}{}
更短。 - 如果发送者知道,没有更多的值需要发送到channel的话,那么让接收者也能及时知道没有多余的值可接收将是有用的,因为接收者可以停止不必要的接收等待。这可以通过内置的close函数来关闭channel实现:
close(naturals)
- 当一个channel被关闭后,再向该channel发送数据将导致panic异常。当一个被关闭的channel中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立即返回一个零值。
- 接收 channel 语句中可以额外增加第二个值,标识 chnnel 是否已经关闭
x, ok := <-naturals
- Go语言的range循环可直接在channels上面迭代。使用range循环是上面处理模式的简洁语法,它依次从channel接收数据,当channel被关闭并且没有值可接收时跳出循环。
在下面的程序中,我们的计数器goroutine只生成100个含数字的序列,然后关闭naturals对应的channel,这将导致计算平方数的squarer对应的goroutine可以正常终止循环并关闭squares对应的channel。(在一个更复杂的程序中,可以通过defer语句关闭对应的channel。)最后,主goroutine也可以正常终止循环并退出程序。
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// Squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// Printer (in main goroutine)
for x := range squares {
fmt.Println(x)
}
}
- 试图重复关闭一个channel将导致panic异常,试图关闭一个nil值的channel也将导致panic异常。关闭一个channels还会触发一个广播机制,我们将在后面讨论。
单方向的 channel
- 当一个channel作为一个函数参数是,它一般总是被专门用于只发送或者只接收。
为了表明这种意图并防止被滥用,Go语言的类型系统提供了单方向的channel类型,分别用于只发送或只接收的channel。类型
chan<- int
表示一个只发送int的channel,只能发送不能接收。相反,类型<-chan int
表示一个只接收int的channel,只能接收不能发送。(箭头<-
和关键字chan的相对位置表明了channel的方向。)这种限制将在编译期检测。 - 因为关闭操作只用于断言不再向channel发送新的数据,所以只有在发送者所在的goroutine才会调用close函数,因此对一个只接收的channel调用close将是一个编译错误。
这是改进的版本,这一次参数使用了单方向channel类型:
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
调用counter(naturals)将导致将chan int
类型的naturals隐式地转换为chan<- int
类型只发送型的channel。调用printer(squares)也会导致相似的隐式转换,这一次是转换为<-chan int
类型只接收型的channel。任何双向channel向单向channel变量的赋值操作都将导致该隐式转换。
带缓冲的 channel
带缓存的Channel内部持有一个元素队列。队列的最大容量是在调用make函数创建channel时通过第二个参数指定的。下面的语句创建了一个可以持有三个字符串元素的带缓存Channel。图8.2是ch变量对应的channel的图形表示形式。
ch = make(chan string, 3)
向缓存Channel的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到因另一个goroutine执行接收操作而释放了新的队列空间。相反,如果channel是空的,接收操作将阻塞直到有另一个goroutine执行发送操作而向队列插入元素。
我们可以在无阻塞的情况下连续向新创建的channel发送三个值:
ch <- "A"
ch <- "B"
ch <- "C"
此刻,channel的内部缓存队列将是满的(图8.3),如果有第四个发送操作将发生阻塞。
如果我们接收一个值,
fmt.Println(<-ch) // "A"
那么channel的缓存队列将不是满的也不是空的(图8.4),因此对该channel执行的发送或接收操作都不会发送阻塞。通过这种方式,channel的缓存队列解耦了接收和发送的goroutine。
在某些特殊情况下,程序可能需要知道channel内部缓存的容量,可以用内置的cap函数获取:
fmt.Println(cap(ch)) // "3"
同样,对于内置的len函数,如果传入的是channel,那么将返回channel内部缓存队列中有效元素的个数。因为在并发程序中该信息会随着接收操作而失效,但是它对某些故障诊断和性能优化会有帮助。
fmt.Println(len(ch)) // "2"
在继续执行两次接收操作后channel内部的缓存队列将又成为空的,如果有第四个接收操作将发生阻塞:
fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"
下面的例子展示了一个使用了带缓存channel的应用。它并发地向三个镜像站点发出请求,三个镜像站点分散在不同的地理位置。它们分别将收到的响应发送到带缓存channel,最后接收者只接收第一个收到的响应,也就是最快的那个响应。因此mirroredQuery函数可能在另外两个响应慢的镜像站点响应之前就返回了结果。(顺便说一下,多个goroutines并发地向同一个channel发送数据,或从同一个channel接收数据都是常见的用法。)
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
如果我们使用了无缓存的channel,那么两个慢的goroutines将会因为没有人接收而被永远卡住。这种情况,称为goroutines泄漏,这将是一个BUG。和垃圾变量不同,泄漏的goroutines并不会被自动回收,因此确保每个不再需要的goroutine能正常退出是重要的。
关于无缓存或带缓存channel之间的选择,或者是带缓存channel的容量大小的选择,都可能影响程序的正确性。无缓存channel更强地保证了每个发送操作与相应的同步接收操作;但是对于带缓存channel,这些操作是解耦的。同样,即使我们知道将要发送到一个channel的信息的数量上限,创建一个对应容量大小带缓存channel也是不现实的,因为这要求在执行任何接收操作之前缓存所有已经发送的值。如果未能分配足够的缓冲将导致程序死锁。
用带缓冲的channel 控制并发数量
此外对于buffered channel,我们可以用一个有容量限制的buffered channel来控制并发,这类似于操作系统里的计数信号量概念。从概念上讲,channel里的n个空槽代表n个可以处理内容的token(通行证),从channel里接收一个值会释放其中的一个token,并且生成一个新的空槽位。这样保证了在没有接收介入时最多有n个发送操作。(这里可能我们拿channel里填充的槽来做token更直观一些,不过还是这样吧~)。由于channel里的元素类型并不重要,我们用一个零值的struct{}来作为其元素。
下面的crawl
函数,将对links.Extract
的调用操作用获取、释放token的操作包裹起来,来确保同一时间对其只有20个调用。信号量数量和其能操作的IO
资源数量应保持接近。
// goroutine获取token后,可以进行抓取操作,如果满20了
// 那么 goroutine 会等到有获取 token 后再去执行
var tokens = make(chan struct{}, 20)
func crawl(url string) []string {
fmt.Println(url)
tokens <- struct{}{} // 获取 token
list, err := links.Extract(url)
<-tokens // 释放 token
if err != nil {
log.Print(err)
}
return list
}
并发循环的一个典型示例
在并发循环中为了知道最后一个goroutine什么时候结束(最后一个结束并不一定是最后一个开始),我们需要一个递增的计数器,在每一个goroutine启动时加一,在goroutine退出时减一。这需要一种特殊的计数器,这个计数器需要在多个goroutine操作时做到安全并且提供在其减为零之前一直等待的一种方法。这种计数类型被称为sync.WaitGroup,下面的代码就用到了这种方法:
// makeThumbnails6为从通道中接收到的每个文件创建缩略图。
// 返回每个创建的缩略图所占的自己数。
func makeThumbnails6(filenames <-chan string) int64 {
sizes := make(chan int64)
var wg sync.WaitGroup // number of working goroutines
for f := range filenames {
wg.Add(1)
// worker
go func(f string) {
defer wg.Done()
thumb, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
return
}
info, _ := os.Stat(thumb) // OK to ignore error
sizes <- info.Size()
}(f)
}
// closer
go func() {
wg.Wait()
close(sizes)
}()
var total int64
for size := range sizes {
total += size
}
return total
}
注意Add和Done方法的不对策。Add是为计数器加一,必须在worker goroutine开始之前调用,而不是在goroutine中;否则的话我们没办法确定Add是在"closer" goroutine调用Wait之前被调用。并且Add还有一个参数,但Done却没有任何参数;其实它和Add(-1)是等价的。我们使用defer来确保计数器即使是在出错的情况下依然能够正确地被减掉。上面的程序代码结构是当我们使用并发循环,但又不知道迭代次数时很通常而且很地道的写法。
select多通道复用
select语句的一般形式,和switch语句稍微有点相似。也会有几个case和最后的default选择支。每一个case代表一个通信操作(在某个channel上进行发送或者接收)并且会包含一些语句组成的一个语句块。
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
一个接收表达式可能只包含接收表达式自身(译注:不把接收到的值赋值给变量什么的),就像上面的第一个case,或者包含在一个简短的变量声明中,像第二个case里一样;第二种形式让你能够在当前 case 块中引用接收到的值。
select会等待case中有能够执行的case时去执行。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去。
下面这个例子更微秒。ch这个channel的buffer大小是1,所以会交替的为空或为满,所以只有一个case可以进行下去,无论i是奇数或者偶数,它都会打印0 2 4 6 8。
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x) // "0" "2" "4" "6" "8"
case ch <- i:
}
}
如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。增加上面例子的buffer大小会使其输出变得不确定,因为当buffer既不为满也不为空时,select语句的执行情况就像是抛硬币的行为一样是随机的。
有疑问加站长微信联系(非本文作者)