Go 语言的分布式读写互斥

u012275397 · · 2262 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

Go语言默认的sync.RWMutex实现在多核环境中表现并不佳,因为所有的读者在进行原子增量操作时,会抢占相同的内存地址。该文探讨了一种n-way RWMutex,也可以称为“大读者(big reader)”锁,它可以为每个CPU内核分配独立的RWMutex。读者仅需在其核心中处理读锁,而写者则须依次处理所有锁。  
 

查找当前CPU

读者使用CPUID指令来决定使用何种锁,该指令仅需返回当前活动CPU的APICID,而不需要发出系统调用指令抑或改变运行时。这在Intel或AMD处理器上均是可以的;ARM处理器则需要使用CPU ID寄存器对于超过256个处理器的系统,必须使用x2APIC,另外除了CPUID还要用到带有EAX=0xb的EDX寄存器。程序启动时,会构建(通过CPU亲和力系统调用) APICID到CPU索引的映射,该映射在处理器的整个生命周期中静态存在。由于CPUID指令的开销可能相当昂贵,goroutine将只在其运行的内核中定期地更新状态结果。频繁更新可以减少内核锁阻塞,但同时也会导致花在加锁过程中的CPUID指令时间增加。  

陈旧的CPU信息。如果加上锁运行goroutine的CPU信息可能会是过时的(goroutine会转移到另一个核心)。在  reader记住哪个是上锁的前提下,这只会影响性能,而不会影响准确性,当然,这样的转移也是不太可能的,就像操作系统内核尝试在同一个核心保持线程来改进缓存命中率一样。

性能

这个模式的性能特征会被大量的参数所影响。特别是CPUID 检测频率,readers 的数量,readers 和writers 的比率,还有readers 持有锁的时间,这些因素都非常重要。当在这个时间有且仅有一个writer 活跃的时候,这个writer 持有锁的时期不会影响sync.RWMutex 和DRWMutex 之间的性能差异。

实验证明DRWMutex表现胜过多核系统,特别writer小于1%的时候,CPUID会在最多每10个锁之间被调用(这种变化取决于锁被持有的持续时间)。甚至在少核的情况下,DRWMutex也在普遍选择通过sync.Mutex使用sync.RWMutex的应用程序的情况下表现好过sync.RWMutex.

下图显示核数量使用增加每10个的平均性能:

drwmutex -i 5000 -p 0.0001 -w 1 -r 100 -c 100

DRWMutex and sync.RWMutex performance comparison

错误条表示第25和第75个百分位。注意每第10核的下降;这是因为10个核组成一个运行标准检查系统的机器上的NUMA节点, 所以一旦增加一个NUMA节点,跨线程通信量变得更加宝贵。对于DRWMutex来说,由于对比sync.RWMutex更多的reader能够并行工作,所以性能也随之提升。

查看go-nuts tread进一步讨论

cpu_amd64.s        

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include "textflag.h"
 
// func cpu() uint64
TEXT 路cpu(SB),NOSPLIT,$0-8
    MOVL $0x01, AX  // version information
    MOVL $0x00, BX  // any leaf will do
    MOVL $0x00, CX  // any subleaf will do
 
    // call CPUID
    BYTE  $0x0f
    BYTE  $0xa2
 
    SHRQ $24, BX  // logical cpu id is put in EBX[31-24]
    MOVQ BX, ret+0(FP)
    RET

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package main
 
import (
    "flag"
    "fmt"
    "math/rand"
    "os"
    "runtime"
    "runtime/pprof"
    "sync"
    "syscall"
    "time"
    "unsafe"
)
 
func cpu() uint64  // implemented in cpu_amd64.s
 
var cpus map[uint64] int
 
// determine mapping from APIC ID to CPU index by pinning the entire process to
// one core at the time, and seeing that its APIC ID is.
func init() {
    cpus = make(map[uint64] int )
 
    var aff uint64
    syscall.Syscall(syscall.SYS_SCHED_GETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
 
    n := 0
    start :=  time .Now()
    var mask uint64 = 1
Outer:
    for  {
        for  (aff & mask) == 0 {
            mask <<= 1
            if  mask == 0 || mask > aff {
                break  Outer
            }
        }
 
        ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(mask), uintptr(unsafe.Pointer(&mask)))
        if  ret != 0 {
            panic(err.Error())
        }
 
        // what CPU do we have?
        <- time .After(1 *  time .Millisecond)
        c := cpu()
 
        if  oldn, ok := cpus[c]; ok {
            fmt.Println( "cpu" , n,  "==" , oldn,  "-- both have CPUID" , c)
        }
 
        cpus[c] = n
        mask <<= 1
        n++
    }
 
    fmt.Printf( "%d/%d cpus found in %v: %v\n" , len(cpus), runtime.NumCPU(),  time .Now().Sub(start), cpus)
 
    ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
    if  ret != 0 {
        panic(err.Error())
    }
}
 
type RWMutex2 []sync.RWMutex
 
func (mx RWMutex2) Lock() {
    for  core := range mx {
        mx[core].Lock()
    }
}
 
func (mx RWMutex2) Unlock() {
    for  core := range mx {
        mx[core].Unlock()
    }
}
 
func main() {
    cpuprofile := flag.Bool( "cpuprofile" ,  false ,  "enable CPU profiling" )
    locks := flag.Uint64( "i" , 10000,  "Number of iterations to perform" )
    write := flag.Float64( "p" , 0.0001,  "Probability of write locks" )
    wwork := flag.Int( "w" , 1,  "Amount of work for each writer" )
    rwork := flag.Int( "r" , 100,  "Amount of work for each reader" )
    readers := flag.Int( "n" , runtime.GOMAXPROCS(0),  "Total number of readers" )
    checkcpu := flag.Uint64( "c" , 100,  "Update CPU estimate every n iterations" )
    flag.Parse()
 
    var o *os.File
    if  *cpuprofile {
        o, _ := os.Create( "rw.out" )
        pprof.StartCPUProfile(o)
    }
 
    readers_per_core := *readers / runtime.GOMAXPROCS(0)
 
    var wg sync.WaitGroup
 
    var mx1 sync.RWMutex
 
    start1 :=  time .Now()
    for  n := 0; n < runtime.GOMAXPROCS(0); n++ {
        for  r := 0; r < readers_per_core; r++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                r :=  rand .New( rand .NewSource( rand .Int63()))
                for  n := uint64(0); n < *locks; n++ {
                    if  r.Float64() < *write {
                        mx1.Lock()
                        x := 0
                        for  i := 0; i < *wwork; i++ {
                            x++
                        }
                        _ = x
                        mx1.Unlock()
                    }  else  {
                        mx1.RLock()
                        x := 0
                        for  i := 0; i < *rwork; i++ {
                            x++
                        }
                        _ = x
                        mx1.RUnlock()
                    }
                }
            }()
        }
    }
    wg.Wait()
    end1 :=  time .Now()
 
    t1 := end1.Sub(start1)
    fmt.Println( "mx1" , runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t1.Seconds(), t1)
 
    if  *cpuprofile {
        pprof.StopCPUProfile()
        o.Close()
 
        o, _ = os.Create( "rw2.out" )
        pprof.StartCPUProfile(o)
    }
 
    mx2 := make(RWMutex2, len(cpus))
 
    start2 :=  time .Now()
    for  n := 0; n < runtime.GOMAXPROCS(0); n++ {
        for  r := 0; r < readers_per_core; r++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                c := cpus[cpu()]
                r :=  rand .New( rand .NewSource( rand .Int63()))
                for  n := uint64(0); n < *locks; n++ {
                    if  *checkcpu != 0 && n%*checkcpu == 0 {
                        c = cpus[cpu()]
                    }
 
                    if  r.Float64() < *write {
                        mx2.Lock()
                        x := 0
                        for  i := 0; i < *wwork; i++ {
                            x++
                        }
                        _ = x
                        mx2.Unlock()
                    }  else  {
                        mx2[c].RLock()
                        x := 0
                        for  i := 0; i < *rwork; i++ {
                            x++
                        }
                        _ = x
                        mx2[c].RUnlock()
                    }
                }
            }()
        }
    }
    wg.Wait()
    end2 :=  time .Now()
 
    pprof.StopCPUProfile()
    o.Close()
 
    t2 := end2.Sub(start2)
    fmt.Println( "mx2" , runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t2.Seconds(), t2)
}

本文转自:开源中国社区[http://www.oschina.net] 
本文标题:Go语言的分布式读写互斥
本文地址:http://www.oschina.net/translate/distributed-read-write- mutex-in-go
参与翻译:BuN_Ny , OSC技术周刊 , eason02

英文原文:Distributed Read-Write Mutex in Go


时间: 2015-05-06 08:40 来源 :开源中国社区 作者 :oschina 原文链接 

有疑问加站长微信联系(非本文作者)

本文来自:CSDN博客

感谢作者:u012275397

查看原文:Go 语言的分布式读写互斥

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2262 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传