目录 [−]
在Go 1.6之前, 内置的map类型是部分goroutine安全的,并发的读没有问题,并发的写可能有问题。自go 1.6之后, 并发地读写map会报错,这在一些知名的开源库中都存在这个问题,所以go 1.9之前的解决方案是额外绑定一个锁,封装成一个新的struct或者单独使用锁都可以。
本文带你深入到sync.Map
的具体实现中,看看为了增加一个功能,代码是如何变的复杂的,以及作者在实现sync.Map
的一些思想。
有并发问题的map
官方的faq 已经提到内建的map
不是线程(goroutine)安全的。
首先,让我们看一段并发读写的代码,下列程序中一个goroutine一直读,一个goroutine一只写同一个键值,即即使读写的键不相同,而且map也没有"扩容"等操作,代码还是会报错。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main
func main() {
m := make (map [int ]int )
go func () {
for {
_ = m[1 ]
}
}()
go func () {
for {
m[2 ] = 2
}
}()
select {}
}
错误信息是: fatal error: concurrent map read and map write
。
如果你查看Go的源代码: hashmap_fast.go#L118 ,会看到读的时候会检查hashWriting
标志, 如果有这个标志,就会报并发错误。
写的时候会设置这个标志: hashmap.go#L542
hashmap.go#L628 设置完之后会取消这个标记。
当然,代码中还有好几处并发读写的检查, 比如写的时候也会检查是不是有并发的写,删除键的时候类似写,遍历的时候并发读写问题等。
有时候,map的并发问题不是那么容易被发现, 你可以利用-race
参数来检查。
Go 1.9之前的解决方案
但是,很多时候,我们会并发地使用map对象,尤其是在一定规模的项目中,map总会保存goroutine共享的数据。在Go官方blog的Go maps in action 一文中,提供了一种简便的解决方案。
1
2
3
4
var counter = struct {
sync.RWMutex
m map [string ]int
}{m: make (map [string ]int )}
它使用嵌入struct为map增加一个读写锁。
读数据的时候很方便的加锁:
1
2
3
4
counter.RLock()
n := counter.m["some_key" ]
counter.RUnlock()
fmt.Println("some_key:" , n)
写数据的时候:
1
2
3
counter.Lock()
counter.m["some_key" ]++
counter.Unlock()
sync.Map
可以说,上面的解决方案相当简洁,并且利用读写锁而不是Mutex可以进一步减少读写的时候因为锁带来的性能。
但是,它在一些场景下也有问题,如果熟悉Java的同学,可以对比一下java的ConcurrentHashMap
的实现,在map的数据非常大的情况下,一把锁会导致大并发的客户端共争一把锁,Java的解决方案是shard
, 内部使用多个锁,每个区间共享一把锁,这样减少了数据共享一把锁带来的性能影响,orcaman 提供了这个思路的一个实现: concurrent-map ,他也询问了Go相关的开发人员是否在Go中也实现这种方案 ,由于实现的复杂性,答案是Yes, we considered it.
,但是除非有特别的性能提升和应用场景,否则没有进一步的开发消息。
那么,在Go 1.9中sync.Map
是怎么实现的呢?它是如何解决并发提升性能的呢?
sync.Map
的实现有几个优化点,这里先列出来,我们后面慢慢分析。
空间换时间。 通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。
使用只读数据(read),避免读写冲突。
动态调整,miss次数多了之后,将dirty数据提升为read。
double-checking。
延迟删除。 删除一个键值只是打标记,只有在提升dirty的时候才清理删除的数据。
优先从read读取、更新、删除,因为对read的读取不需要锁。
下面我们介绍sync.Map
的重点代码,以便理解它的实现思想。
首先,我们看一下sync.Map
的数据结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Map struct {
mu Mutex
read atomic.Value
dirty map [interface {}]*entry
misses int
}
它的数据结构很简单,值包含四个字段:read
、mu
、dirty
、misses
。
它使用了冗余的数据结构read
、dirty
。dirty
中会包含read
中为删除的entries,新增加的entries会加入到dirty
中。
read
的数据结构是:
1
2
3
4
type readOnly struct {
m map [interface {}]*entry
amended bool
}
amended
指明Map.dirty
中有readOnly.m
未包含的数据,所以如果从Map.read
找不到数据的话,还要进一步到Map.dirty
中查找。
对Map.read的修改是通过原子操作进行的。
虽然read
和dirty
有冗余数据,但这些数据是通过指针指向同一个数据,所以尽管Map的value会很大,但是冗余的空间占用还是有限的。
readOnly.m
和Map.dirty
存储的值类型是*entry
,它包含一个指针p, 指向用户存储的value值。
1
2
3
type entry struct {
p unsafe.Pointer
}
p有三种值:
nil: entry已被删除了,并且m.dirty为nil
expunged: entry已被删除了,并且m.dirty不为nil,而且这个entry不存在于m.dirty中
其它: entry是一个正常的值
以上是sync.Map
的数据结构,下面我们重点看看Load
、Store
、Delete
、Range
这四个方法,其它辅助方法可以参考这四个方法来理解。
Load
加载方法,也就是提供一个键key
,查找对应的值value
,如果不存在,通过ok
反映:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Map) Load(key interface {}) (value interface {}, ok bool ) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil , false
}
return e.load()
}
这里有两个值的关注的地方。一个是首先从m.read
中加载,不存在的情况下,并且m.dirty
中有新数据,加锁,然后从m.dirty
中加载。
二是这里使用了双检查的处理,因为在下面的两个语句中,这两行语句并不是一个原子操作。
1
2
if !ok && read.amended {
m.mu.Lock()
虽然第一句执行的时候条件满足,但是在加锁之前,m.dirty
可能被提升为m.read
,所以加锁后还得再检查m.read
,后续的方法中都使用了这个方法。
双检查的技术Java程序员非常熟悉了,单例模式的实现之一就是利用双检查的技术。
可以看到,如果我们查询的键值正好存在于m.read
中,无须加锁,直接返回,理论上性能优异。即使不存在于m.read
中,经过miss
几次之后,m.dirty
会被提升为m.read
,又会从m.read
中查找。所以对于更新/增加较少,加载存在的key很多的case,性能基本和无锁的map类似。
下面看看m.dirty
是如何被提升的。 missLocked
方法中可能会将m.dirty
提升。
1
2
3
4
5
6
7
8
9
func (m *Map) missLocked() {
m.misses++
if m.misses < len (m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
上面的最后三行代码就是提升m.dirty
的,很简单的将m.dirty
作为readOnly
的m
字段,原子更新m.read
。提升后m.dirty
、m.misses
重置, 并且m.read.amended
为false。
Store
这个方法是更新或者新增一个entry。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (m *Map) Store(key, value interface {}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true })
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make (map [interface {}]*entry, len (read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool ) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil , expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
你可以看到,以上操作都是先从操作m.read
开始的,不满足条件再加锁,然后操作m.dirty
。
Store
可能会在某种情况下(初始化或者m.dirty刚被提升后)从m.read
中复制数据,如果这个时候m.read
中数据量非常大,可能会影响性能。
Delete
删除一个键值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (m *Map) Delete(key interface {}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete (m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete ()
}
}
同样,删除操作还是从m.read
中开始, 如果这个entry不存在于m.read
中,并且m.dirty
中有新数据,则加锁尝试从m.dirty
中删除。
注意,还是要双检查的。 从m.dirty
中直接删除即可,就当它没存在过,但是如果是从m.read
中删除,并不会直接删除,而是打标记:
1
2
3
4
5
6
7
8
9
10
11
12
13
func (e *entry) delete () (hadValue bool ) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil ) {
return true
}
}
}
Range
因为for ... range map
是内建的语言特性,所以没有办法使用for range
遍历sync.Map
, 但是可以使用它的Range
方法,通过回调的方式遍历。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (m *Map) Range(f func (key, value interface {}) bool ) {
read, _ := m.read.Load().(readOnly)
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
Range方法调用前可能会做一个m.dirty
的提升,不过提升m.dirty
不是一个耗时的操作。
sync.Map的性能
Go 1.9源代码中提供了性能的测试: map_bench_test.go 、map_reference_test.go
我也基于这些代码修改了一下,得到下面的测试数据,相比较以前的解决方案,性能多少回有些提升,如果你特别关注性能,可以考虑sync.Map
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
BenchmarkHitAll/*sync .RWMutexMap-4 20000000 83.8 ns/op
BenchmarkHitAll/*sync .Map-4 30000000 59.9 ns/op
BenchmarkHitAll_WithoutPrompting/*sync .RWMutexMap-4 20000000 96.9 ns/op
BenchmarkHitAll_WithoutPrompting/*sync .Map-4 20000000 64.1 ns/op
BenchmarkHitNone/*sync .RWMutexMap-4 20000000 79.1 ns/op
BenchmarkHitNone/*sync .Map-4 30000000 43.3 ns/op
BenchmarkHit_WithoutPrompting/*sync .RWMutexMap-4 20000000 81.5 ns/op
BenchmarkHit_WithoutPrompting/*sync .Map-4 30000000 44.0 ns/op
BenchmarkUpdate/*sync .RWMutexMap-4 5000000 328 ns/op
BenchmarkUpdate/*sync .Map-4 10000000 146 ns/op
BenchmarkUpdate_WithoutPrompting/*sync .RWMutexMap-4 5000000 336 ns/op
BenchmarkUpdate_WithoutPrompting/*sync .Map-4 5000000 324 ns/op
BenchmarkDelete/*sync .RWMutexMap-4 10000000 155 ns/op
BenchmarkDelete/*sync .Map-4 30000000 55.0 ns/op
BenchmarkDelete_WithoutPrompting/*sync .RWMutexMap-4 10000000 173 ns/op
BenchmarkDelete_WithoutPrompting/*sync .Map-4 10000000 147 ns/op
其它
sync.Map
没有Len
方法,并且目前没有迹象要加上 (issue#20680 ),所以如果想得到当前Map中有效的entries的数量,需要使用Range
方法遍历一次, 比较X疼。
LoadOrStore
方法如果提供的key存在,则返回已存在的值(Load),否则保存提供的键值(Store)。
有疑问加站长微信联系(非本文作者)