日常开发过程中,map
结构应该登场率是较为频繁的。但是Go
的内建map
类型并不是协程安全的。如下面这个栗子,如果业务开发过程中不注意很容易中招。
func main() {
m := make(map[int]int)
go func() {
for {
m[0] = 1
}
}()
go func() {
for {
if v, ok := m[0]; ok {
fmt.Println(v)
}
}
}()
ch := make(chan os.Signal)
signal.Notify(ch, os.Interrupt)
<-ch
}
声明一个map[int]int
用两个协程去同时操作一个key,得到panic
fatal error: concurrent map read and map write
为了解决这种问题,我们有几个选择
- 使用
Go1.9
在sync
包下引入了并发安全的map
。sync.Map
功能上跟map[interface{}]interface{}
很像,但是sync.Map
是协程安全的,并通过读写分离的机制,降低锁的粒度,提高并发性能。 - 使用第三方的类库
concurrent-map
(https://github.com/orcaman/concurrent-map)
今天我们先来聊聊sync.Map
,本文基于go1.15.7
基本使用
让我们用sync.Map
来改写上面的栗子
func main() {
m := sync.Map{}
go func() {
for {
m.Store(0, 1)
}
}()
go func() {
for {
if v, ok := m.Load(0); ok {
fmt.Println(v)
}
}
}()
ch := make(chan os.Signal)
signal.Notify(ch, os.Interrupt)
<-ch
}
另外,sync.Map
还支持Range
、Delete
方法
m.Range(func(key, value interface{}) bool {
fmt.Println(key, value)
return true
})
m.Delete(0)
sync.Map源码分析
数据结构
先看下sync.Map
的数据结构
type Map struct {
mu Mutex
// 持有部分map的数据并且并发读不需要持有锁,但是改变指针需要持有锁
read atomic.Value // readOnly
// 包含部分map数据需要持有锁保护 为了保证dirty map能够快速晋升为read map,
// 它还包含所有在read map未清除的数据
// expunged数据不会存储在dirty map
// 如果dirtymap为nil则下一次写会从read map拷贝一份数据
dirty map[interface{}]*entry
// 记录从read map中读不到数据,加锁去判断key是否存在的次数
// 当misses等于dirty map长度时,dirty map会直接晋升为read map
// 下次写操作再从read map拷贝一份数据
misses int
}
sync.Map
中的 read
实际指向的是readOnly
结构体对象
// readOnly 是一个不可变结构体 自动存储在Map.read字段中
type readOnly struct {
m map[interface{}]*entry
amended bool // 如果key在dirty不在m中 则为true如果为false则表示dirty为空
}
// map中的key 指向的value
type entry struct {
// p指向interface{}类型的值
// 如果p==nil,表明entry已经被删除并且m.dirty==nil
// 如果p==expunged,表明entry已经被删除且m.dirty!=nil 并且entry不在m.dirty中
// 否则entry是有效的并且记录m.read.m[key] 并且如果m.dirty!=nil 则也存在m.dirty[key]
// 当m.dirty重建的时候被删除的entry会被检测到并自动由nil替换为expunged 并且不会设置m.dirty[key]
// 若p!=expunged,则entry关联的值可以通过原子替换来更新
// 若p==expunged,则需要先设置m.dirty[key]=e之后才能更新entry关联的值,这样以便使用dirty map查找值
p unsafe.Pointer // *interface{}
}
操作方法
Load操作
Load
操作返回存储在map中指定key的value,有两个返回值,ok表示key对应的value是否存在。
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// double-check 避免我们获得锁期间 ditry map已经晋升为了read map
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {//不存在 且map.dirty包含了部分不在read map中的数据
e, ok = m.dirty[key]
//记录miss 当前这个key会一直执行slow path直到dirty map晋升为read map
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
因为sync.Map
设计了一个read map
和一个dirty map
。他俩的区别在于
-
read map
指向了readOnly
结构体对象,readOnly
结构体本身是只读的 但是read map
指向的引用是可变的 -
dirty map
是一个结构为map[interface{}]*entry
的内建map
类型
让他俩之间产生关联的是sync.Map
中的misses
字段。为什么呢?让我们看看Load
的具体过程:
- 首先从
read map
中尝试读取数据,若read map
中不存在且read.amended
为true(注释:当存在数据存在dirty map但不在read map中时read.amended
为true)则尝试获得锁 - 获得锁后,并没有直接从
dirty map
中拿数据,而是进行了double-check
,再次从read map
中尝试获取数据,为何要这么做呢?我们接着看。 -
double-check
之后发现还是没有拿到数据,那么此时就从dirty map
中获取,然后执行missLocked()
这个方法很关键,我们来看看它的实现
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
missLocked()
方法首先针对m.misses++
递增操作(这里已获得锁,协程安全),这里也印证了misses
字段的含义:记录从read map中读不到数据,加锁去判断key是否存在的次数
。接着是重头戏
如果m.misses
小于m.dirty
的长度直接返回,但是如果大于或者等于,则会将m.dirty
的引用包装成readOnly
结构并更新给read map
并将m.dirty
置为nil,m.misses
置为0。(注意:这里read.amended
为false时,m.dirty为nil 事实也是这样)
到这里你应该能理清楚,read map
、dirty map
以及misses
之间的关系:
所以刚刚的double-check就很好理解了,因为第一次从read map
查找数据到加锁成功之间,有可能dirty map
已经完成了read map
的晋升过程。
-
Load()
的最后一步,查找数据,则调用entry.load()
获取数据。
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
Store操作
Store()
操作在key
存在并且value
不处于expunged
状态下会覆盖原有的值
// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
// 情况1
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {//情况2
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
e.storeLocked(&value) // 情况3 e.unexpungeLocked()为false的情况
} else if e, ok := m.dirty[key]; ok {//情况4
e.storeLocked(&value)
} else {//情况5
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
源码面前无秘密,Store()
操作大概会存在5种情况:
- 如果key已经在
read map
存在了 且dirty map
中未被删除
通过调用e.tryStore(&value)
直接无锁情况下,更新对应值。前提是对应值 非expunged
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged { //表示dirty map已删除对应值 返回false 让其更新完dirty map 再更新read map
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {// 直接更新值
return true
}
}
}
- 如果key存在于
read map
,不在dirty map
中
先将key-value放到dirty map
中,然后直接更新value,因为read map
和dirty map
持有的是相同的引用 故而一改全改
- 如果key同时存在于
read map
和dirty map
这个情况跟情况一类似,这个逻辑的原因是我们加锁过程中可能数据已经被加回dirty map,则直接更新read map
的值即可 原因见情况2
- 如果key不在
read map
但存在于dirty map
这种情况直接更新即可,因为已经拿到锁了 所以是协程安全的
- 如果key不在
read map
也不存在于dirty map
首先判断read.amended
若为false 则表明dirty map
刚刚晋升为read map
,此时dirty map
为nil。然后调用m.dirtyLocked()
func (m *Map) dirtyLocked() {
if m.dirty != nil { //不为nil 直接返回
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() { //删除则跳过复制
m.dirty[k] = e //从read map拷贝一份数据引用给到dirty map
}
}
}
// 判断read map中的数据是否为nil 若是 则将其更新为expunged
// 同时数据也不会copy到dirty map,所以expunged表明数据已经从dirty map移除了
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
将read map
中的数据引用拷贝一份后,针对新来的数据 m.dirty[key] = newEntry(value)
新建并插入
LoadOrStore操作
LoadOrStore
顾名思义,如果值存在则直接返回,若不存在则存储,返回值loaded,true表示数据被加载,false表示数据被存储
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
// Avoid locking if it's a clean hit.
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
大体逻辑跟Store
差不了太多,我们主要关注下tryLoadOrStore
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
// p==nil
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {//删除直接返回
return nil, false, false
}
if p != nil { // 有值了 返回值
return *(*interface{})(p), true, true
}
}
}
如果entry不是expunged 则自动加载或存储值, 如果entry为expunged 则直接返回 并且loaded和ok为false
Range操作
Range()
要求一个函数f func(key, value interface{}) bool
作为入参,将map中的key-value依次调用这个函数。如果函数返回false,则Range
停止迭代。需要注意的是:Range
使用的快照,并发插入的情况下不一定准确。
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
if read.amended {//如果dirty map中存在read map中没有的值 则先晋升下
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
//依次迭代
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
逻辑比较简单:如果当前dirty map中存在read map中没有的值 则先将dirty map
晋升为read map
,然后再依次迭代调用传入的函数 ,返回false时中断。
Delete操作
删除指定key的value。
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
// 删除key对应的value 并返回之前的值 loaded表示key是否存在
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
// 递增misses 找准时机晋升dirty map
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false//key不存在
}
删除分3中情况:
- key存在于
read map
则直接调用e.delete()
将其置为nil 为了减少锁的开销提供并发性能,使用了个小技巧延迟删除
,
即这种情况下并没有直接物理删除,而是通过CAS将entry.p
指针置为nil。
- key不在
read map
但存在于dirty map
,则直接调用内建delete
方法从map中删除元素 - key都不存在 直接返回
至此,全部操作都解析完毕,你可能对entry延迟删除的状态有点儿懵 没关系 上图
expunged算是一个标记,表示dirty map
中对应的值已经被干掉了。
需要注意的是 最好不要用占用内存比较大的类型作为key,因为sync.Map
软删除并不会立刻将key-value删除掉,只是value置为了nil,所以大key有内存泄露的危险。
但是话说回来 应该没人这么干吧?
我看还有人专门讨论了一下(https://github.com/golang/go/issues/40999) 可以围观下
总结
整个源码看下来,不难发现,对于高频读写少的情况下,sync.Map
基本时无锁情况下完成。但是对于写操作比较频繁的情况下,如果read map
中拿不到数据,就会降级为加锁获取,然后misses
增长变化速度势必比较快,连锁反应dirty map
晋升为read map
的操作也会比较频繁,其性能也势必会下降。所以写频繁的场景下sync.Map
还是不太够看。
有疑问加站长微信联系(非本文作者)