本篇是调度剖析的第三部分,将重点关注并发特性。
回顾:
第一部分
第二部分
简介
首先,在我平时遇到问题的时候,特别是如果它是一个新问题,我一开始并不会考虑使用并发的设计去解决它。我会先实现顺序执行的逻辑,并确保它能正常工作。然后在可读性和技术关键点都 Review 之后,我才会开始思考并发执行的实用性和可行性。有的时候,并发执行是一个很好的选择,有时则不一定。
在本系列的第一部分中,我解释了系统调度的机制和语义,如果你打算编写多线程代码,我认为这些机制和语义对于实现正确的逻辑是很重要的。在第二部分中,我解释了Go 调度的语义,我认为它能帮助你理解如何在 Go 中编写高质量的并发程序。在这篇文章中,我会把系统调度和Go 调度的机制和语义结合在一起,以便更深入地理解什么才是并发以及它的本质。
什么是并发
并发意味着乱序
执行。拿一组原来是顺序执行的指令,而后找到一种方法,使这些指令乱序执行,但仍然产生相同的结果。那么,顺序执行还是乱序执行?根本在于,针对我们目前考虑的问题,使用并发必须是有收益的!确切来说,是并发带来的性能提升要大于它带来的复杂性成本。当然有些场景,代码逻辑就已经约束了我们不能执行乱序,这样使用并发也就没有了意义。
并发与并行
理解并发
与并行
的不同也非常重要。并行
意味着同时执行两个或更多指令,简单来说,只有多个CPU核心之间才叫并行
。在 Go 中,至少要有两个操作系统硬件线程并至少有两个 Goroutine 时才能实现并行,每个 Goroutine 在一个单独的系统线程上执行指令。
如图:
我们看到有两个逻辑处理器P
,每个逻辑处理器都挂载在一个系统线程M
上,而每个M
适配到计算机上的一个CPU处理器Core
。
其中,有两个 Goroutine G1
和 G2
在并行
执行,因为它们同时在各自的系统硬件线程上执行指令。
再看,在每一个逻辑处理器中,都有三个 Goroutine G2 G3 G5
或 G1 G4 G6
轮流共享各自的系统线程。看起来就像这三个 Goroutine 在同时运行着,没有特定顺序地执行它们的指令,并在系统线程上共享时间。
那么这就会发生竞争,有时候如果只在一个物理核心上实现并发则实际上会降低吞吐量。还有有意思的是,有时候即便利用上了并行的并发,也不会给你带来想象中更大的性能提升。
工作负载
我们怎么判断在什么时候并发会更有意义呢?我们就从了解当前执行逻辑的工作负载类型开始。在考虑并发时,有两种类型的工作负载是很重要的。
两种类型
CPU-Bound:这是一种不会导致 Goroutine 主动切换上下文到等待状态的类型。它会一直不停地进行计算。比如说,计算 π 到第 N 位的 Goroutine 就是 CPU-Bound 的。
IO-Bound:与上面相反,这种类型会导致 Goroutine 自然地进入到等待状态。它包括请求通过网络访问资源,或使用系统调用进入操作系统,或等待事件的发生。比如说,需要读取文件的 Goroutine 就是 IO-Bound。我把同步事件(互斥,原子),会导致 Goroutine 等待的情况也包含在此类。
在 CPU-Bound 中,我们需要利用并行。因为单个系统线程处理多个 Goroutine 的效率不高。而使用比系统线程更多的 Goroutine 也会拖慢执行速度,因为在系统线程上切换 Goroutine 是有时间成本的。上下文切换会导致发生STW(Stop The World)
,意思是在切换期间当前工作指令都不会被执行。
在 IO-Bound 中,并行则不是必须的了。单个系统线程可以高效地处理多个 Goroutine,是因为Goroutine 在执行这类指令时会自然地进入和退出等待状态。使用比系统线程更多的 Goroutine 可以加快执行速度,因为此时在系统线程上切换 Goroutine 的延迟成本并不会产生STW
事件。进入到IO阻塞时,CPU就闲下来了,那么我们可以使不同的 Goroutine 有效地复用相同的线程,不让系统线程闲置。
我们如何评估一个系统线程匹配多少 Gorountine 是最合适的呢?如果 Goroutine 少了,则会无法充分利用硬件;如果 Goroutine 多了,则会导致上下文切换延迟。这是一个值得考虑的问题,但此时暂不深究。
现在,更重要的是要通过仔细推敲代码来帮助我们准确识别什么情况需要并发,什么情况不能用并发,以及是否需要并行。
加法
我们不需要复杂的代码来展示和理解这些语义。先来看看下面这个名为add
的函数:
1 func add(numbers []int) int {
2 var v int
3 for _, n := range numbers {
4 v += n
5 }
6 return v
7 }
在第 1 行,声明了一个名为add
的函数,它接收一个整型切片并返回切片中所有元素的和。它从第 2 行开始,声明了一个v
变量来保存总和。然后第 3 行,线性地遍历切片,并且每个数字被加到v
中。最后在第 6 行,函数将最终的总和返回给调用者。
问题:add
函数是否适合并发执行?从大体上来说答案是适合的。可以将输入切片分解,然后同时处理它们。最后将每个小切片的执行结果相加,就可以得到和顺序执行相同的最终结果。
与此同时,引申出另外一个问题:应该分成多少个小切片来处理是性能最佳的呢?要回答此问题,我们必须知道它的工作负载类型。add
函数正在执行 CPU-Bound 工作负载,因为实现算法正在执行纯数学运算,并且它不会导致 Goroutine 进入等待状态。这意味着每个系统线程使用一个 Goroutine 就可以获得不错的吞吐量。
并发版本
下面来看一下并发版本如何实现,声明一个 addConcurrent
函数。代码量相比顺序版本增加了很多。
1 func addConcurrent(goroutines int, numbers []int) int {
2 var v int64
3 totalNumbers := len(numbers)
4 lastGoroutine := goroutines - 1
5 stride := totalNumbers / goroutines
6
7 var wg sync.WaitGroup
8 wg.Add(goroutines)
9
10 for g := 0; g < goroutines; g++ {
11 go func(g int) {
12 start := g * stride
13 end := start + stride
14 if g == lastGoroutine {
15 end = totalNumbers
16 }
17
18 var lv int
19 for _, n := range numbers[start:end] {
20 lv += n
21 }
22
23 atomic.AddInt64(&v, int64(lv))
24 wg.Done()
25 }(g)
26 }
27
28 wg.Wait()
29
30 return int(v)
31 }
第 5 行:计算每个 Goroutine 的子切片大小。使用输入切片总数除以 Goroutine 的数量得到。
第 10 行:创建一定数量的 Goroutine 执行子任务
第 14-16 行:子切片剩下的所有元素都放到最后一个 Goroutine 执行,可能比前几个 Goroutine 处理的数据要多。
第 23 行:将子结果追加到最终结果中。
然而,并发版本肯定比顺序版本更复杂,但和增加的复杂性相比,性能有提升吗?值得这么做吗?让我们用事实来说话,下面运行基准测试。
基准测试
下面的基准测试,我使用了1000万个数字的切片,并关闭了GC。分别有顺序版本add
函数和并发版本addConcurrent
函数。
func BenchmarkSequential(b *testing.B) {
for i := 0; i < b.N; i++ {
add(numbers)
}
}
func BenchmarkConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
addConcurrent(runtime.NumCPU(), numbers)
}
}
无并行
以下是所有 Goroutine 只有一个硬件线程可用的结果。顺序版本使用 1 Goroutine,并发版本在我的机器上使用runtime.NumCPU
或 8 Goroutines。在这种情况下,并发版本实际正跑在没有并行的机制上。
10 Million Numbers using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential 1000 5720764 ns/op : ~10% Faster
BenchmarkConcurrent 1000 6387344 ns/op
BenchmarkSequentialAgain 1000 5614666 ns/op : ~13% Faster
BenchmarkConcurrentAgain 1000 6482612 ns/op
结果表明:当只有一个系统线程可用于所有 Goroutine 时,顺序版本比并发快约10%到13%。这和我们之前的理论预期相符,主要就是因为并发版本在单核上的上下文切换和 Goroutine 管理调度的开销。
有并行
以下是每个 Goroutine 都有单独可用的系统线程的结果。顺序版本使用 1 Goroutine,并发版本在我的机器上使用runtime.NumCPU
或 8 Goroutines。在这种情况下,并发版本利用上了并行机制。
10 Million Numbers using 8 goroutines with 8 cores
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 8 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential-8 1000 5910799 ns/op
BenchmarkConcurrent-8 2000 3362643 ns/op : ~43% Faster
BenchmarkSequentialAgain-8 1000 5933444 ns/op
BenchmarkConcurrentAgain-8 2000 3477253 ns/op : ~41% Faster
结果表明:当为每个 Goroutine 提供单独的系统线程时,并发版本比顺序版本快大约41%到43%。这才也和预期一致,所有 Goroutine 现都在并行运行着,意味着他们真的在同时执行。
排序
另外,我们也要知道并非所有的 CPU-Bound 都适合并发。当切分输入或合并结果的代价非常高时,就不太合适。下面展示一个冒泡排序算法来说明此场景。
顺序版本
01 package main
02
03 import "fmt"
04
05 func bubbleSort(numbers []int) {
06 n := len(numbers)
07 for i := 0; i < n; i++ {
08 if !sweep(numbers, i) {
09 return
10 }
11 }
12 }
13
14 func sweep(numbers []int, currentPass int) bool {
15 var idx int
16 idxNext := idx + 1
17 n := len(numbers)
18 var swap bool
19
20 for idxNext < (n - currentPass) {
21 a := numbers[idx]
22 b := numbers[idxNext]
23 if a > b {
24 numbers[idx] = b
25 numbers[idxNext] = a
26 swap = true
27 }
28 idx++
29 idxNext = idx + 1
30 }
31 return swap
32 }
33
34 func main() {
35 org := []int{1, 3, 2, 4, 8, 6, 7, 2, 3, 0}
36 fmt.Println(org)
37
38 bubbleSort(org)
39 fmt.Println(org)
40 }
这种排序算法会扫描每次在交换值时传递的切片。在对所有内容进行排序之前,可能需要多次遍历切片。
那么问题:bubbleSort
函数是否适用并发?我相信答案是否定的。原始切片可以分解为较小的,并且可以同时对它们排序。但是!在并发执行完之后,没有一个有效的手段将子结果的切片排序合并。下面我们来看并发版本是如何实现的。
并发版本
01 func bubbleSortConcurrent(goroutines int, numbers []int) {
02 totalNumbers := len(numbers)
03 lastGoroutine := goroutines - 1
04 stride := totalNumbers / goroutines
05
06 var wg sync.WaitGroup
07 wg.Add(goroutines)
08
09 for g := 0; g < goroutines; g++ {
10 go func(g int) {
11 start := g * stride
12 end := start + stride
13 if g == lastGoroutine {
14 end = totalNumbers
15 }
16
17 bubbleSort(numbers[start:end])
18 wg.Done()
19 }(g)
20 }
21
22 wg.Wait()
23
24 // Ugh, we have to sort the entire list again.
25 bubbleSort(numbers)
26 }
bubbleSortConcurrent
它使用多个 Goroutine 同时对输入的一部分进行排序。我们直接来看结果:
Before:
25 51 15 57 87 10 10 85 90 32 98 53
91 82 84 97 67 37 71 94 26 2 81 79
66 70 93 86 19 81 52 75 85 10 87 49
After:
10 10 15 25 32 51 53 57 85 87 90 98
2 26 37 67 71 79 81 82 84 91 94 97
10 19 49 52 66 70 75 81 85 86 87 93
由于冒泡排序的本质是依次扫描,第 25 行对 bubbleSort
的调用将掩盖使用并发解决问题带来的潜在收益。结论是:在冒泡排序中,使用并发不会带来性能提升。
读取文件
前面已经举了两个 CPU-Bound 的例子,下面我们来看 IO-Bound。
顺序版本
01 func find(topic string, docs []string) int {
02 var found int
03 for _, doc := range docs {
04 items, err := read(doc)
05 if err != nil {
06 continue
07 }
08 for _, item := range items {
09 if strings.Contains(item.Description, topic) {
10 found++
11 }
12 }
13 }
14 return found
15 }
第 2 行:声明了一个名为 found
的变量,用于保存在给定文档中找到指定主题的次数。
第 3-4 行:迭代文档,并使用read
函数读取每个文档。
第 8-11 行:使用 strings.Contains
函数检查文档中是否包含指定主题。如果包含,则found
加1。
然后来看一下read
是如何实现的。
01 func read(doc string) ([]item, error) {
02 time.Sleep(time.Millisecond) // 模拟阻塞的读
03 var d document
04 if err := xml.Unmarshal([]byte(file), &d); err != nil {
05 return nil, err
06 }
07 return d.Channel.Items, nil
08 }
此功能以 time.Sleep
开始,持续1毫秒。此调用用于模拟在我们执行实际系统调用以从磁盘读取文档时可能产生的延迟。这种延迟的一致性对于准确测量find
顺序版本和并发版本的性能差距非常重要。
然后在第 03-07 行,将存储在全局变量文件中的模拟 xml
文档反序列化为struct
值。最后,将Items
返回给调用者。
并发版本
01 func findConcurrent(goroutines int, topic string, docs []string) int {
02 var found int64
03
04 ch := make(chan string, len(docs))
05 for _, doc := range docs {
06 ch <- doc
07 }
08 close(ch)
09
10 var wg sync.WaitGroup
11 wg.Add(goroutines)
12
13 for g := 0; g < goroutines; g++ {
14 go func() {
15 var lFound int64
16 for doc := range ch {
17 items, err := read(doc)
18 if err != nil {
19 continue
20 }
21 for _, item := range items {
22 if strings.Contains(item.Description, topic) {
23 lFound++
24 }
25 }
26 }
27 atomic.AddInt64(&found, lFound)
28 wg.Done()
29 }()
30 }
31
32 wg.Wait()
33
34 return int(found)
35 }
第 4-7 行:创建一个channel
并写入所有要处理的文档。
第 8 行:关闭这个channel
,这样当读取完所有文档后就会直接退出循环。
第 16-26 行:每个 Goroutine 都从同一个channel
接收文档,read
并 strings.Contains
逻辑和顺序的版本一致。
第 27 行:将各个 Goroutine 计数加在一起作为最终计数。
基准测试
同样的,我们再次运行基准测试来验证我们的结论。
func BenchmarkSequential(b *testing.B) {
for i := 0; i < b.N; i++ {
find("test", docs)
}
}
func BenchmarkConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
findConcurrent(runtime.NumCPU(), "test", docs)
}
}
无并行
10 Thousand Documents using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential 3 1483458120 ns/op
BenchmarkConcurrent 20 188941855 ns/op : ~87% Faster
BenchmarkSequentialAgain 2 1502682536 ns/op
BenchmarkConcurrentAgain 20 184037843 ns/op : ~88% Faster
当只有一个系统线程时,并发版本比顺序版本快大约87%到88%。与预期一致,因为所有 Goroutine 都有效地共享单个系统线程。
有并行
10 Thousand Documents using 8 goroutines with 8 core
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential-8 3 1490947198 ns/op
BenchmarkConcurrent-8 20 187382200 ns/op : ~88% Faster
BenchmarkSequentialAgain-8 3 1416126029 ns/op
BenchmarkConcurrentAgain-8 20 185965460 ns/op : ~87% Faster
有意思的来了,使用额外的系统线程提供并行能力,实际代码性能却没有提升。也印证了开头的说法。
结语
我们可以清楚地看到,使用 IO-Bound 并不需要并行来获得性能上的巨大提升。这与我们在 CPU-Bound 中看到的结果相反。当涉及像冒泡排序这样的算法时,并发的使用会增加复杂性而没有任何实际的性能优势。
所以,我们在考虑解决方案时,首先要确定它是否适合并发,而不是盲目认为使用更多的 Goroutine 就一定会提升性能。
有疑问加站长微信联系(非本文作者)