并发map --- sync map分析

123archu · · 4566 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

[TOC]

本文基于1.10源码分析
如之前的文章可以看到,golang中的map是不支持并发操作的,golang推荐用户直接用读写锁对map进行保护,也有第三方类库使用分段锁。在1.19版本中,golang基于原本的map,新增了一个支持并发操作的map,叫sync map。

下面我们先介绍一下它的用法,然后在介绍原理,最后详细看看代码。

用法

基本api有这几个

  1. Store 写入
  2. Load 读取,返回值有两个,第一个是value,第二个是bool变量表示key是否存在
  3. Delete 删除
  4. LoadOrStore 存在就读,不存在就写
  5. Range 遍历,注意遍历的快照

sync map底层使用map[interface{}]* entry来做存储,所以无论key还是value都是支持多种数据类型。
一个简单的例子:

package main

import (
    "fmt"
     "sync"
)


type MySyncMap struct {
    sync.Map
}

func (m MySyncMap) Print(k interface{}) {
    value, ok := m.Load(k)
    fmt.Println(value, ok)
} 

func main() {
        var syncMap MySyncMap 

        syncMap.Print("Key1")

        syncMap.Store("Key1", "Value1")
        syncMap.Print("Key1")

        syncMap.Store("Key2", "Value2")

        syncMap.Store("Key3", 2)
        syncMap.Print("Key3")

        syncMap.Store(4, 4)
        syncMap.Print(4)

        syncMap.Delete("Key1")
        syncMap.Print("Key1")
}

输出:

<nil> false
Value1 true
2 true
4 true
<nil> false

设计原理

常用方案比较

并发hashmap的方案有很多,下面简单提一下几种,然后再讨论golang实现时的考虑。
第一种是最简单的,直接在不支持并发的hashmap上,使用一个读写锁的保护,这也是golang sync map还没出来前,大家常用的方法。这种方法的缺点是写会堵塞读。

第二种是数据库常用的方法,分段锁,每一个读写锁保护一段区间,golang的第三方库也有人是这么实现的。java的ConcurrentHashMap也是这么实现的。平均情况下这样的性能还挺好的,但是极端情况下,如果某个区间有热点写,那么那个区间的读请求也会受到影响。

第三种方法是我们C++自己造轮子时经常用的,使用使用链表法解决冲突,然后链表使用CAS去解决并发下冲突,这样读写都是无锁,我觉得这种挺好的,性能非常高,不知为啥其他语言不这么实现。

然后在《An overview of sync.Map》中有提到,在cpu核数很多的情况下,因为cache contention,reflect.New、sync.RWMutex、atomic.AddUint32都会很慢,golang团队为了适应cpu核很多的情况,没有采用上面的几种常见的方案。

golang sync map的目标是实现适合读多写少的场景、并且要求稳定性很好,不能出现像分段锁那样读经常被阻塞的情况。golang sync map基于map做了一层封装,在大部分情况下,不过写入性能比较差。下面来详细说说实现。

实现思路

要读受到的影响尽量小,那么最容易想到的想法,就是读写分离。golang sync map也是受到这个想法的启发(我自认为)设计出来的。使用了两个map,一个叫read,一个叫dirty,两个map存储的都是指针,指向value数据本身,所以两个map是共享value数据的,更新value对两个map同时可见。

dirty可以进行增删查,当时都要进行加互斥锁。

read中存在的key,可以无锁的读,借助CAS进行无锁的更新、删除操作,但是不能新增key,相当于dirty的一个cache,由于value共享,所以能通过read对已存在的value进行更新。

read不能新增key,那么数据怎么来的呢?sync map中会记录miss cache的次数,当miss次数大于等于dirty元素个数时,就会把dirty变成read,原来的dirty清空。

为了方便dirty直接变成read,那么得保证read中存在的数据dirty必须有,所以在dirty是空的时候,如果要新增一个key,那么会把read中的元素复制到dirty中,然后写入新key。

然后删除操作也很有意思,使用的是延迟删除,优先看read中没有,read中有,就把read中的对应entry指针中的p置为nil,作为一个标记。在read中标记为nil的,只有在dirty提升为read时才会被实际删除。

源码

结构

// The zero Map is empty and ready for use. A Map must not be copied after first use.
type Map struct {
    mu Mutex

    // read contains the portion of the map's contents that are safe for
    // concurrent access (with or without mu held).
    //
    // The read field itself is always safe to load, but must only be stored with
    // mu held.
    //
    // Entries stored in read may be updated concurrently without mu, but updating
    // a previously-expunged entry requires that the entry be copied to the dirty
    // map and unexpunged with mu held.
    read atomic.Value // readOnly

    // dirty contains the portion of the map's contents that require mu to be
    // held. To ensure that the dirty map can be promoted to the read map quickly,
    // it also includes all of the non-expunged entries in the read map.
    //
    // Expunged entries are not stored in the dirty map. An expunged entry in the
    // clean map must be unexpunged and added to the dirty map before a new value
    // can be stored to it.
    //
    // If the dirty map is nil, the next write to the map will initialize it by
    // making a shallow copy of the clean map, omitting stale entries.
    dirty map[interface{}]*entry

    // misses counts the number of loads since the read map was last updated that
    // needed to lock mu to determine whether the key was present.
    //
    // Once enough misses have occurred to cover the cost of copying the dirty
    // map, the dirty map will be promoted to the read map (in the unamended
    // state) and the next store to the map will make a new dirty copy.
    misses int
}

//read的实际结构体
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
    m       map[interface{}]*entry
    amended bool // true if the dirty map contains some key not in m.
}

// expunged is an arbitrary pointer that marks entries which have been deleted
// from the dirty map.
var expunged = unsafe.Pointer(new(interface{}))

// An entry is a slot in the map corresponding to a particular key.
type entry struct {
    // p points to the interface{} value stored for the entry.
    //
    // If p == nil, the entry has been deleted and m.dirty == nil.
    //
    // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
    // is missing from m.dirty.
    //
    // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
    // != nil, in m.dirty[key].
    //
    // An entry can be deleted by atomic replacement with nil: when m.dirty is
    // next created, it will atomically replace nil with expunged and leave
    // m.dirty[key] unset.
    //
    // An entry's associated value can be updated by atomic replacement, provided
    // p != expunged. If p == expunged, an entry's associated value can be updated
    // only after first setting m.dirty[key] = e so that lookups using the dirty
    // map find the entry.
    p unsafe.Pointer // *interface{}
}
sync map结构

mu是用来保护dirty的互斥锁
missed是记录没命中read的次数

注意对于entry.p,有两个特殊值,一个是nil,另一个是expungednil代表的意思是,在read中被删除了,但是dirty中还在,所以能直接更新值(如果dirty==nill的特殊情况,下次写入新值时会复制);expunged代表数据在ditry中已经被删除了,更新值的时候要先把这个entry复制到dirty。

Load 读取


// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        // Avoid reporting a spurious miss if m.dirty got promoted while we were
        // blocked on m.mu. (If further loads of the same key will not miss, it's
        // not worth copying the dirty map for this key.)
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // Regardless of whether the entry was present, record a miss: this key
            // will take the slow path until the dirty map is promoted to the read
            // map.
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

func (e *entry) load() (value interface{}, ok bool) {
    p := atomic.LoadPointer(&e.p)
    if p == nil || p == expunged {
        return nil, false
    }
    return *(*interface{})(p), true
}

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

读取时,先去read读取;如果没有,就加锁,然后去dirty读取,同时调用missLocked(),再解锁。在missLocked中,会递增misses变量,如果misses>len(dirty),那么把dirty提升为read,清空原来的dirty

在代码中,我们可以看到一个double check,检查read没有,上锁,再检查read中有没有,是因为有可能在第一次检查之后,上锁之前的间隙,dirty提升为read了,这时如果不double check,可能会导致一个存在的key却返回给调用方说不存在。 在下面的其他操作中,我们经常会看到这个double check。

Store 写入

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            // The entry was previously expunged, which implies that there is a
            // non-nil dirty map and this entry is not in it.
            m.dirty[key] = e
        }
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(&value)
    } else {
        if !read.amended {
            // We're adding the first new key to the dirty map.
            // Make sure it is allocated and mark the read-only map as incomplete.
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

// tryStore stores a value if the entry has not been expunged.
//
// If the entry is expunged, tryStore returns false and leaves the entry
// unchanged.
func (e *entry) tryStore(i *interface{}) bool {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}

func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    for p == nil {
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}

// unexpungeLocked ensures that the entry is not marked as expunged.
//
// If the entry was previously expunged, it must be added to the dirty map
// before m.mu is unlocked.
func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

写入的时候,先看read中能否查到key,在read中存在的话,直接通过read中的entry来更新值;在read中不存在,那么就上锁,然后double check。这里需要留意,分几种情况:

  1. double check发现read中存在,如果是expunged,那么就先尝试把expunged替换成nil,最后如果entry.p==expunged就复制到dirty中,再写入值;否则不用替换直接写入值。
  2. dirty中存在,直接更新
  3. dirty中不存在,如果此时dirty为空,那么需要将read复制到dirty中,最后再把新值写入到dirty中。复制的时候调用的是dirtyLocked(),在复制到dirty的时候,read中为nil的元素,会更新为expunged,并且不复制到dirty中。

我们可以看到,在更新read中的数据时,使用的是tryStore,通过CAS来解决冲突,在CAS出现冲突后,如果发现数据被置为expung,tryStore那么就不会写入数据,而是会返回false,在Store流程中,就是接着往下走,在dirty中写入。

再看下情况1的时候,为啥要那么做。double check的时候,在read中存在,那么就是说在加锁之前,有并发线程先写入了key,然后由Load触发了dirty提升为read,这时dirty可能为空,也可能不为空,但无论dirty状态如何,都是可以直接更新entry.p。如果是expunged的话,那么要先替换成nil,再复制entry到dirty中。

疑问:这里不太懂,为啥在read中直接更新就用cas去更新,跑到下面的流程,就用原子更新,可是尽管上了锁,key在read中存在,那么就会并发写,为啥可以不用cas更新??

Delete 删除

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()
    }
}

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

删除很简单,read中存在,就把read中的entry.p置为nil,如果只在ditry中存在,那么就直接从dirty中删掉对应的entry。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:123archu

查看原文:并发map --- sync map分析

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

4566 次点击  
加入收藏 微博
1 回复  |  直到 2019-02-01 16:29:17
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传