本文译自Rob Pike的Go语言PPT教程 – "The Go Programming Language Part3(updated June 2011)"。由于该教程的最新更新时间早于Go 1版本发布,因此该PPT中的一些内容与Go 1语言规范略有差异,到时我会在相应的地方做上注解。
第三部分大纲
- 并发与通信
- Goroutines
- 通道(Channel)
- 并发相关话题
并发与通信:Goroutines
Goroutines
术语:
对于"并发运行的事物"已经有了好多术语 – 进程、线程、协程(coroutine)、POSIX线程、NPTL线程、轻量级进程…,但这些事物都或多或少有不同。并且Go中的并发与哪种都不甚相同。
因此我们介绍一个新术语:goroutine。
定义
一个Goroutine是一个与其他goroutines运行在同一地址空间的Go函数或方法。一个运行的程序由一个或更多个goroutine组成。
它与线程、协程、进程等不同。它是一个goroutine。
注意:Concurrency与Parallelism是不同的概念。如果你不了解它们的不同,查查相关资料吧。
关于并发的问题有许多。我们后续会提及。现在就假设它能按其对外所宣称的那样正常工作吧。
启动一个Goroutine
调用一个函数或方法,然后说go:
func IsReady(what string, minutes int64) {
time.Sleep(minutes * 60*1e9) // Unit is nanosecs.
fmt.Println(what, "is ready")
}
go IsReady("tea", 6)
go IsReady("coffee", 2)
fmt.Println("I'm waiting…")
打印:
I'm waiting… (立即)
coffee is ready (2分钟后)
tea is ready (6分钟后)
一些简单的事实
goroutine的使用代价很低。
当从最外层函数返回,或执行到结尾处时,goroutine退出。
goroutines可以并行地在不同CPU上执行,共享内存。
你无需担心栈大小。
栈
在gccgo中,至少目前goroutines就是pthreads。在6g中,goroutines采用基于线程的多路复用技术,因此它们的代价更低廉。
无论是上面哪个实现,栈都很小(几KB),可以根据需要增长。因此goroutines使用很少的内存。你可以创建很多goroutines,它们还可以动态拥有很大的栈。
程序员无需考虑栈大小相关话题。在Go中,这种考虑甚至不应该出现。
调度
Goroutine多路复用系统线程。当一个goroutine执行了一个阻塞的系统调用时,其他goroutine不会不阻塞。
计划后续实现CPU绑定的goroutines,不过目前用6g如果你想要用户层级别的并行,你必须设置环境变量GOMAXPROCS或调用runtime.GOMAXPROCS(n)。
GOMAXPROCS告诉运行时调度器有多少个用户空间goroutine即将同时执行,理想情况下在不同的CPU核上。
*gccgo总是为每个goroutine单独分配一个线程执行。
并发与通信:Channels
Go中的Channel
除非两个goroutine可以通信,否则它们无法协作。
Go中有一个名为channel的类型,提供通信和同步能力。
Go中还提供一些特殊的基于channel的控制结构,使得编写并发程序更加容易。
Channel类型
该类型最简单形式:
chan elementType
通过这个类型的值,你可以发送和接收elementType类型的元素。
Channel是引用类型,这意味着如果你将一个chan变量赋值给另外一个,则这两个变量访问的是相同的channel。同样,这也意味着可以用make分配一个channel:
var c = make(chan int)
通信操作符:<-
箭头指示数据流向。
作为一个二元操作符,<-将值从右侧发送到左侧的channel中:
c := make(chan int)
c <- 1 // 向c发送1
作为前缀一元操作符,<- 从一个channel中接收数据:
v = <-c // 从c中接收数据,赋值给v
<-c // 接收数据,丢弃
i := <-c // 接收值,用于初始化i
语义
默认情况下,通信是同步的。(我们后续将讨论异步通信)。这意味着:
1) A在一个channel上的发送操作会阻塞,直到该channel上有一个接收者就绪。
2) 在一个channel上到的接收操作会阻塞,直到该channel上有一个发送者就绪。
因此通信是同步的一种形式:两个通过channel交换数据的goroutine在通信的时刻同步。
让我们泵一些数据吧
func pump(ch chan int) {
for i := 0; ; i++ { ch <- i }
}
ch1 := make(chan int)
go pump(ch1) // pump挂起; 我们运行
fmt.Println(<-ch1) // 打印 0
现在我们启动一个循环接收者:
func suck(ch chan int) {
for { fmt.Println(<-ch) }
}
go suck(ch1) // 大量数字出现
你仍可以溜进去,抓取一个值:
fmt.Println(<-ch1) // 输出:3141159
返回channel的函数
在前面的例子中,pump像一个生成器,喷涌出值。但在分配channel等方面做了很多工作。让我们将其打包到一个返回channel的函数中:
func pump() chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ { ch <- i }
}()
return ch
}
stream := pump()
fmt.Println(<-stream)// 打印 0
"返回channel的函数"是Go中的一个重要的惯用法。
到处都是返回channel的函数
我这里不再重复那些你可以从其他地方找到的知名例子。这里有些可以了解一下:
1) prime sieve: 在语言规范以及教程中。
2) Doug McIlroy的Power系列论文:http://plan9.bell-labs.com/who/rsc/thread/squint.pdf
这个程序的一个Go版本在测试套件中:http://golang.org/test/chan/powser1.go
Range和Channel
for循环的range子句接收channel作为一个操作数,在这种情况下,for循环迭代处理从channel接收到的值。我们来重写pump函数;这里是suck的重写,让它也启动一个goroutine:
func suck(ch chan int) {
go func() {
for v := range ch { fmt.Println(v) }
}()
}
suck(pump()) // 现在不再阻塞
关闭一个Channel
range是如何知道何时channel上的数据传输结束了呢?发送者调用一个内置函数close:
close(ch)
接收者使用"comma ok"测试发送者是否关闭了channel:
val, ok:= <- ch
当结果为(value, true),说明依然有数据;一旦channel关闭,数据流干,结果将会是(zero, false)。
在一个Channel上使用Range
在一个channel上使用range,诸如:
for value := range <-ch {
use(value)
}
等价于:
for {
value, ok := <-ch
if !ok {
break
}
use(value)
}
Close
关键点:
只有发送者可以调用close。
只有接收者可以询问是否channel被关闭了。
只有在获取值的同时询问(避免竞争)
只有在有必要通知接收者不会再有数据的时候才调用close。
大多数情况下,不需要用close;它与关闭一个文件没有可比性。
不管怎样,channel是可以垃圾回收的。
Channel的方向性
一个channel变量的最简单形式是一个非缓冲(同步的)值,该值可以用于进行发送和接收。
一个channel类型可以被指定为只发或只收:
var recvOnly <-chan int
var sendOnly chan<- int
Channel的方向性(2)
所有Channel创建时都是双向的,但我们可以将它们赋值给带方向性的channel变量。从类型安全性角度考虑,对于函数内的实例非常有用:
func sink(ch <-chan int) {
for { <-ch }
}
func source(ch chan<- int) {
for { ch <- 1 }
}
c := make(chan int)//双向的
go source(c)
go sink(c)
同步的Channel
同步的Channel是非缓冲的。发送动作不会完成,直到一个接收者接收这个值。
c := make(chan int)
go func() {
time.Sleep(60*1e9)
x := <-c
fmt.Println("received", x)
}()
fmt.Println("sending", 10)
c <- 10
fmt.Println("sent", 10)
输出:
sending 10 (立即发生)
sent 10 (60秒后,这两行出现)
received 10
异步的Channel
通过告知make缓冲中元素的数量,我们可以创建一个带缓冲的、异步的channel。
c := make(chan int, 50)
go func() {
time.Sleep(60*1e9)
x := <-c
fmt.Println("received", x)
}()
fmt.Println("sending", 10)
c <- 10
fmt.Println("sent", 10)
输出:
sending 10 (立刻发生)
sent 10(现在)
received 10 (60秒后)
缓冲不是类型的一部分
注意缓冲的大小甚至其自身都不是channel类型的一部分,只是值的一部分。因此下面的代码虽危险,但合法:
buf = make(chan int, 1)
unbuf = make(chan int)
buf = unbuf
unbuf = buf
缓冲是一个值的属性,而不是类型的。
Select
select是Go中的一个控制结构,类似于用于通信的switch语句。每个case必须是一个通信操作,要么是send要么是receive。
ci, cs := make(chan int), make(chan string)
select {
case v := <-ci:
fmt.Printf("received %d from ci\n", v)
case v := <-cs:
fmt.Printf("received %s from cs\n", v)
}
Select随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行。一个默认的子句应该总是可运行的。
Select语义
快速一览:
- 每个case都必须是一个通信(可能是:=)
- 所有channel表达式都会被求值
- 所有被发送的表达式都会被求值
- 如果任意某个通信可以进行,它就执行;其他被忽略。
- 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。
- 否则:
– 如果有default子句,则执行该语句。
– 如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。
随机bit生成器
幼稚但很有说明性的例子:
c := make(chan int)
go func() {
for {
fmt.Println(<-c)
}
}()
for {
select {
case c <- 0: //没有语句,没有fallthrough
case c <- 1:
}
}
测试可通信性
一个通信是否可以进行,而不阻塞?一个带default字句的select可以告诉我们:
select {
case v := <-ch:
fmt.Println("received", v)
default:
fmt.Println("ch not ready for receive")
}
如果没有其他case可以运行,那default子句将被执行,因此这对于非阻塞接收是一个惯用法;非阻塞发送显然也可以这么做。
超时
一个通信可以在一个给定的时间内成功完成么?time包包含了after函数:
func After(ns int64) <-chan int64
在指定时间段之后,它向返回的channel中传递一个值(当前时间)。
在select中使用它以实现超时:
select {
case v := <-ch:
fmt.Println("received", v)
case <-time.After(30*1e9):
fmt.Println("timed out after 30 seconds")
}
多路复用(multiplexing)
channel是原生值,这意味着他们也能通过channel发送。这个属性使得编写一个服务类多路复用器变得十分容易,因为客户端在提交请求时可一并提供用于回复应答的channel。
chanOfChans := make(chan chan int)
或者更典型的如:
type Reply struct { … }
type Request struct {
arg1, arg2 someType
replyc chan *Reply
}
多路复用服务器
type request struct {
a, b int
replyc chan int
}
type binOp func(a, b int) int
func run(op binOp, req *request) {
req.replyc <- op(req.a, req.b)
}
func server(op binOp, service <-chan *request) {
for {
req := <-service // 请求到达这里
go run(op, req) // 不等op
}
}
启动服务器
使用"返回channel的函数"惯用法来为一个新服务器创建一个channel:
func startServer(op binOp) chan<- *request {
service := make(chan *request)
go server(op, req)
return service
}
adderChan := startServer(
func(a, b int) int { return a + b }
)
客户端
在教程中有个例子更为详尽,但这里是一个变体:
func (r *request) String() string {
return fmt.Sprintf("%d+%d=%d",
r.a, r.b, <-r.replyc)
}
req1 := &request{7, 8, make(chan int)}
req2 := &request{17, 18, make(chan int)}
请求已经就绪,发送它们:
adderChan <- req1
adderChan <- req2
可以以任何顺序获得结果;r.replyc多路分解:
fmt.Println(req2, req1)
停掉
在多路复用的例子中,服务将永远运行下去。要将其干净地停掉,可通过一个channel发送信号。下面这个server具有相同的功能,但多了一个quit channel:
func server(op binOp, service <-chan *request,
quit <-chan bool) {
for {
select {
case req := <-service:
go run(op, req) // don't wait for it
case <-quit:
return
}
}
}
启动服务器
其余代码都相似,只是多了个channel:
func startServer(op binOp) (service chan<- *request,
quit chan<- bool) {
service = make(chan *request)
quit = make(chan bool)
go server(op, service, quit)
return service, quit
}
adderChan, quitChan := startServer(
func(a, b int) int { return a + b }
)
停掉:客户端
只有当准备停掉服务端的时候,客户端才会受到影响:
req1 := &request{7, 8, make(chan int)}
req2 := &request{17, 18, make(chan int)}
adderChan <- req1
adderChan <- req2
fmt.Println(req2, req1)
所有都完成后,向服务器发送信号,让其退出:
quitChan <- true
链
package main
import ("flag"; "fmt")
var nGoroutine = flag.Int("n", 100000, "how many")
func f(left, right chan int) { left <- 1 + <-right }
func main() {
flag.Parse()
leftmost := make(chan int)
var left, right chan int = nil, leftmost
for i := 0; i < *nGoroutine; i++ {
left, right = right, make(chan int)
go f(left, right)
}
right <- 0 // bang!
x := <-leftmost // 等待完成
fmt.Println(x) // 100000
}
例子:Channel作为缓存
var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)
func server() {
for {
b := <-serverChan // 等待做work
process(b) // 在缓存中处理请求
select {
case freeList <- b: // 如果有空间,重用缓存
default: // 否则,丢弃它
}
}
}
func client() {
for {
var b *Buffer
select {
case b = <-freeList: // 如果就绪,抓取一个
default: b = new(Buffer) // 否则,分配一个
}
load(b)// 读取下一个请求放入b中
serverChan <- b // 将请求发给server.
}
}
并发
并发相关话题
许多并发方面,当然,Go一直在尽力做好它们。诸如Channel发送和接收是原子的。select语句也是缜密定义和实现的等。
但goroutine在共享内存中运行,通信网络可能死锁,多线程调试器糟糕透顶等等。
接下来做什么?
Go给予你原生的
不要用你在使用C或C++或甚至是Java时的方式去编程。
channel给予你同步和通信的能力,并且使得它们很强大,但也可以很容易知道你是否可以很好的使用它们。
规则是:
不要通过共享内存通信,相反,通过通信共享内存。
特有的通信行为保证了同步!
模型
例如,使用一个channel发送数据到一个专职服务goroutine。如果同一时刻只有一个goroutine拥有指向数据的指针,那谈不上什么并发。
这是我们极力推荐的服务端编程模型,至少是对旧的"每个客户端一个线程"的泛化。它自从20世纪80年代就开始使用了,它工作的很好。
内存模型
那关于同步和共享内存的令人生厌的细节在:
http://golang.org/doc/go_mem.html
但如果你遵循我们的方法,你很少需要理解那些内容。
© 2012, bigwhite. 版权所有.
有疑问加站长微信联系(非本文作者)