Reading the Go source code, sync series: Pool

aseto · 2021-05-16 13:30:50

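Summary

  1. A pool that caches variable instances; each pool stores instances of one type and supports concurrent use
  2. The internal storage uses a linked list, queues, and arrays
  3. Initializing local takes a global lock
  4. Besides protecting critical sections with a lock, data safety relies mostly on preventing the current goroutine from being preempted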

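Questions

  1. How does using Pool improve performance
  2. How are values stored, and how are they destroyed
  3. How is the pool cleared after use

Answers

  1. How does using Pool improve performance
    • Fine granularity: the pool allocates runtime.GOMAXPROCS(0) locals, one bound to each P, and each P operates on its own local. Get and Put also prevent the current goroutine from being preempted, which keeps the data operations safe
    • Each local is tiered as well: Get tries private first and falls back to shared
    • If the P's own local has nothing, it steals from the local of another P, taking the last element (the tail)
    • Expiring data is split between allPools and oldPools; when the current cache has nothing, Get falls back to the previous cycle's data (victim)
  2. How are values stored, and how are they destroyed
    • Each P has a local; the shared part of a local is a linked list whose elements are queues
    • Destruction is triggered by the GC; the cleanup function runs during STW
  3. How is the pool cleared after use
    • Cleanup is handled by the runtime and is invisible to the user. It relies on allPools, oldPools, and each pool's victim: poolCleanup clears the victim of every pool in oldPools, demotes each pool's local to victim, and moves the pools from allPools to oldPools, so the dropped data can be reclaimed by the GC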

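Usage

    package main

    import (
        "fmt"
        "sync"
    )

    // Demo is the pooled type. Its definition is not shown in the
    // original, so a single string field A is assumed here.
    type Demo struct{ A string }

    func main() {
        pool := &sync.Pool{
            New: func() interface{} {
                return new(Demo)
            },
        }
        demo := pool.Get().(*Demo)
        demo.A = "test"
        fmt.Println(demo)
        demo.A = "" // reset the value before returning it to the pool
        pool.Put(demo)
    }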
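
The manual reset before Put can be centralized in one helper so that callers never see stale data. The following is a minimal sketch, assuming a pool of `*bytes.Buffer`; the names `bufPool` and `greet` are hypothetical and not part of the original article.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool is a hypothetical pool of reusable buffers.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// greet builds a string with a pooled buffer (hypothetical helper).
func greet(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	// Reset before Put so the next user starts with an empty buffer.
	defer func() {
		buf.Reset()
		bufPool.Put(buf)
	}()
	buf.WriteString("hello, ")
	buf.WriteString(name)
	return buf.String() // String copies the bytes, so the later Reset is safe
}

func main() {
	fmt.Println(greet("gopher"))
	fmt.Println(greet("pool"))
}
```

Whether a value comes from New or from an earlier Put is not observable here, which is the point: the reset makes both cases equivalent.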

Source code analysis

Structs

type Pool struct {
    noCopy noCopy

    // The local array: its size is the number of Ps, one local per P, indexed by the P's id
    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    // Size of the local array
    localSize uintptr        // size of the local array

    // local from the previous cycle
    victim     unsafe.Pointer // local from previous cycle
    // localSize from the previous cycle
    victimSize uintptr        // size of victims array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    // The function called to create a new value when the pool has none
    New func() interface{}
}
// The per-P local; its shared part is implemented as a linked list
type poolLocal struct {
    poolLocalInternal

    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
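  1. Put stores the value in private first by default
  2. If private is already occupied, the value goes into shared
  3. The local P can both push to and pop from shared; other Ps can only pop
  4. Get tries private first, then shared, and finally the poolChain of other Ps
    type poolLocalInternal struct {
     // Holds a single value; when Get and Put are balanced, taking it straight from private is faster
     private interface{} // Can be used only by the respective P.
     // Where later values go once private is already occupied
     shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
    }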
    
// Implemented as a linked list
type poolChain struct {
    // head is the poolDequeue to push to. This is only accessed
    // by the producer, so doesn't need to be synchronized.
    head *poolChainElt

    // tail is the poolDequeue to popTail from. This is accessed
    // by consumers, so reads and writes must be atomic.
    tail *poolChainElt
}

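Put function

Returns the value x to the pool once it is no longer in use; otherwise Get keeps creating new values through New.

  1. x must not be nil; if it is, Put returns immediately
  2. Between runtime_procPin and runtime_procUnpin the current goroutine cannot be preempted, which keeps the data operations safe
  3. Because local is indexed by the P's id and the call runs between runtime_procPin and runtime_procUnpin, only one goroutine operates on a given local at a time, which keeps the data operations safe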
func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }
    // Get the poolLocal of the current P
    l, _ := p.pin()
    // If private is empty, store x in private
    if l.private == nil {
        l.private = x
        x = nil
    }
    // If private was already occupied, push x onto the head of shared
    if x != nil {
        l.shared.pushHead(x)
    }
    // Pairs with runtime_procPin, which pin calls internally
    runtime_procUnpin()

}

pin

  1. Pins the current goroutine to its P, disables preemption, and returns the poolLocal that belongs to the current P
  2. Must be paired with runtime_procUnpin
  3. Called by both Put and Get
    func (p *Pool) pin() (*poolLocal, int) {
     // Paired with runtime_procUnpin
     pid := runtime_procPin()
     // In pinSlow we store to local and then to localSize, here we load in opposite order.
     // Since we've disabled preemption, GC cannot happen in between.
     // Thus here we must observe local at least as large localSize.
     // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
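     // Load the pool's localSize; before initialization s is 0, so we take the pinSlow branch
     // The fast path below is taken only once an earlier call has initialized local and localSize is non-zero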
     s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
     l := p.local                              // load-consume
     if uintptr(pid) < s {
         return indexLocal(l, pid), pid
     }
     // Fall back to pinSlow; the naming follows Mutex's lockSlow
     return p.pinSlow()
    }
    

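pinSlow function

  1. Initializing the local array uses the global lock allPoolsMu, because every Pool is appended to allPools
  2. It takes the global lock first, then double-checks whether local has already been initialized; if so it returns directly, and if not it performs the initialization
  3. The P's id is the index, and the total number of Ps is the size of the local array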
    func (p *Pool) pinSlow() (*poolLocal, int) {
     // Retry under the mutex.
     // Can not lock the mutex while pinned.
     // Unpin the current P first
     runtime_procUnpin()
     // Take the global lock allPoolsMu
     allPoolsMu.Lock()
     defer allPoolsMu.Unlock()
     pid := runtime_procPin()
     // poolCleanup won't be called while we are pinned.
     // The cleanup function poolCleanup does not run while we are pinned
     s := p.localSize
     l := p.local
     // Double check: if local has already been initialized, return directly
     if uintptr(pid) < s {
         return indexLocal(l, pid), pid
     }
     // Add p to allPools
     if p.local == nil {
         allPools = append(allPools, p)
     }
     // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
     // If GOMAXPROCS changed between GCs, allocate a new array and discard the old one
     size := runtime.GOMAXPROCS(0)
     local := make([]poolLocal, size)
     // Publish the new local array by storing the address of its first element in p.local
     atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
     // Set localSize to the current number of Ps
     runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
     // Return the local at index pid, together with pid
     return &local[pid], pid
    }
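
pin and pinSlow together form a double-checked initialization: a lock-free fast path, and a slow path that takes a lock and checks again before initializing. The sketch below shows the same pattern with ordinary `sync` and `sync/atomic` calls. The type `lazyTable` and the helper `run` are hypothetical, and the sketch does not model pinning or per-P indexing.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lazyTable is a hypothetical structure that is sized on first use.
type lazyTable struct {
	mu    sync.Mutex
	size  int64 // 0 means "not initialized"; read atomically on the fast path
	slots []int
	inits int // how many times initialization actually ran
}

// get is the fast path: no lock once the table is initialized.
func (t *lazyTable) get() int {
	if n := atomic.LoadInt64(&t.size); n > 0 {
		return int(n)
	}
	return t.getSlow()
}

// getSlow retries under the mutex and initializes only if still needed.
func (t *lazyTable) getSlow() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	// Double check: another goroutine may have initialized while we waited.
	if n := atomic.LoadInt64(&t.size); n > 0 {
		return int(n)
	}
	t.slots = make([]int, 4)
	t.inits++
	// Publish the size last, after the slots are ready.
	atomic.StoreInt64(&t.size, int64(len(t.slots)))
	return len(t.slots)
}

// run calls get from n goroutines and reports how often init ran.
func run(n int) (inits, size int) {
	t := &lazyTable{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.get()
		}()
	}
	wg.Wait()
	return t.inits, t.get()
}

func main() {
	fmt.Println(run(8)) // 1 4
}
```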
    

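runtime_procPin and runtime_procUnpin functions

  1. The two functions are implemented in runtime/proc.go as procPin and procUnpin; the part before the underscore is the package name
  2. Between runtime_procPin and runtime_procUnpin the current goroutine cannot be preempted

procPin

  1. Increments locks on the M that the current goroutine runs on. As canPreemptM shows, locks == 0 is one of the necessary conditions for an M to be preemptible, so incrementing locks guarantees that the current goroutine is not preempted while pinned
  2. Returns the id of the current P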
func procPin() int {
    _g_ := getg()
    mp := _g_.m

    mp.locks++
    return int(mp.p.ptr().id)
}
func canPreemptM(mp *m) bool {
    return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}

procUnpin

  1. Gets the M of the current goroutine and decrements its locks, matching the increment in procPin
    func procUnpin() {
     _g_ := getg()
     _g_.m.locks--
    }
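
The pairing of the increment and the decrement can be modeled with a plain counter. This is only a toy model: `toyM` is a hypothetical stand-in for the runtime's m, and `canPreempt` covers just the `locks == 0` condition of canPreemptM, not the others.

```go
package main

import "fmt"

// toyM is a hypothetical stand-in for the runtime's m; only locks is modeled.
type toyM struct{ locks int }

func (mp *toyM) pin()   { mp.locks++ }
func (mp *toyM) unpin() { mp.locks-- }

// canPreempt models only the locks == 0 condition of canPreemptM.
func (mp *toyM) canPreempt() bool { return mp.locks == 0 }

func main() {
	mp := &toyM{}
	fmt.Println(mp.canPreempt()) // true: nothing is pinned

	mp.pin()
	fmt.Println(mp.canPreempt()) // false: pinned

	mp.unpin()
	fmt.Println(mp.canPreempt()) // true: every pin has been matched by an unpin
}
```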
    

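shared and the poolDequeue struct

Overview

  1. shared is stored in poolLocalInternal as a struct value, so creating a poolLocalInternal also creates it
  2. shared is a linked list whose elements are queues; the queue in the first element has size 8, and each new element doubles the size

Structs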

type poolChainElt struct {
    poolDequeue

    // next and prev link to the adjacent poolChainElts in this
    // poolChain.
    //
    // next is written atomically by the producer and read
    // atomically by the consumer. It only transitions from nil to
    // non-nil.
    //
    // prev is written atomically by the consumer and read
    // atomically by the producer. It only transitions from
    // non-nil to nil.
    next, prev *poolChainElt
}
type poolDequeue struct {
    // headTail packs together a 32-bit head index and a 32-bit
    // tail index. Both are indexes into vals modulo len(vals)-1.
    //
    // tail = index of oldest data in queue
    // head = index of next slot to fill
    //
    // Slots in the range [tail, head) are owned by consumers.
    // A consumer continues to own a slot outside this range until
    // it nils the slot, at which point ownership passes to the
    // producer.
    //
    // The head index is stored in the most-significant bits so
    // that we can atomically add to it and the overflow is
    // harmless.
    headTail uint64

    // vals is a ring buffer of interface{} values stored in this
    // dequeue. The size of this must be a power of 2.
    //
    // vals[i].typ is nil if the slot is empty and non-nil
    // otherwise. A slot is still in use until *both* the tail
    // index has moved beyond it and typ has been set to nil. This
    // is set to nil atomically by the consumer and read
    // atomically by the producer.
    // Stores the data
    vals []eface
}

pushHead function

func (c *poolChain) pushHead(val interface{}) {
    d := c.head
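    // If the chain is not initialized yet, initialize it: both c.head and c.tail point to the new d, whose vals array has size 8
    // A poolChainElt is a queue; its data is kept in the array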
    if d == nil {
        // Initialize the chain.
        const initSize = 8 // Must be a power of 2
        d = new(poolChainElt)
        d.vals = make([]eface, initSize)
        c.head = d
        storePoolChainElt(&c.tail, d)
    }
    // Push val into d; if d is full, a new poolChainElt is created below
    if d.pushHead(val) {
        return
    }

    // The current dequeue is full. Allocate a new one of twice
    // the size.
    // The new poolChainElt is twice the previous size, capped at 1<<30
    newSize := len(d.vals) * 2
    if newSize >= dequeueLimit {
        // Can't make it any bigger.
        newSize = dequeueLimit
    }

    // Link the new poolChainElt as d.next
    // d becomes the previous element and head becomes the new d2
    d2 := &poolChainElt{prev: d}
    d2.vals = make([]eface, newSize)
    c.head = d2
    storePoolChainElt(&d.next, d2)
    d2.pushHead(val)
}
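
The size progression in pushHead can be checked in isolation. This is a small sketch of just the doubling rule; `nextSize` is a hypothetical helper, and `dequeueLimit` is set to 1<<30 as stated in the comment above.

```go
package main

import "fmt"

const (
	initSize     = 8
	dequeueLimit = 1 << 30 // cap stated in the pushHead comment
)

// nextSize returns the size of the dequeue allocated after one of size cur.
func nextSize(cur int64) int64 {
	n := cur * 2
	if n >= dequeueLimit {
		n = dequeueLimit
	}
	return n
}

func main() {
	size := int64(initSize)
	for i := 0; i < 5; i++ {
		fmt.Println(size) // 8, 16, 32, 64, 128
		size = nextSize(size)
	}
	fmt.Println(nextSize(dequeueLimit) == dequeueLimit) // true: growth stops at the cap
}
```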

popHead function

func (c *poolChain) popHead() (interface{}, bool) {
    d := c.head
    // Start from head
    for d != nil {
        // If the queue in the current d holds a value, return it directly
        if val, ok := d.popHead(); ok {
            return val, ok
        }
        // There may still be unconsumed elements in the
        // previous dequeue, so try backing up.
        // Otherwise try the previous element (prev)
        d = loadPoolChainElt(&d.prev)
    }
    return nil, false
}

popTail function

  1. When the current P cannot find a cached value, popTail is called to steal one from the local of another P

    func (c *poolChain) popTail() (interface{}, bool) {
     d := loadPoolChainElt(&c.tail)
     if d == nil {
         return nil, false
     }
    
     for {
         // It's important that we load the next pointer
         // *before* popping the tail. In general, d may be
         // transiently empty, but if next is non-nil before
         // the pop and the pop fails, then d is permanently
         // empty, which is the only condition under which it's
         // safe to drop d from the chain.
         d2 := loadPoolChainElt(&d.next)
    
         if val, ok := d.popTail(); ok {
             return val, ok
         }
    
         if d2 == nil {
             // This is the only dequeue. It's empty right
             // now, but could be pushed to in the future.
             return nil, false
         }
    
         // The tail of the chain has been drained, so move on
         // to the next dequeue. Try to drop it from the chain
         // so the next pop doesn't have to look at the empty
         // dequeue again.
         if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
             // We won the race. Clear the prev pointer so
             // the garbage collector can collect the empty
             // dequeue and so popHead doesn't back up
             // further than necessary.
             storePoolChainElt(&d2.prev, nil)
         }
         d = d2
     }
    }
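
The direction of travel in pushHead, popHead, and popTail is easier to see without the atomics. The following is a single-goroutine toy model, not the lock-free implementation: `node` and `chain` are hypothetical types, slices replace the ring buffers, and the initial size is 2 instead of 8 so that growth shows up quickly.

```go
package main

import "fmt"

// node is a non-concurrent stand-in for poolChainElt.
type node struct {
	vals       []int
	limit      int
	next, prev *node
}

type chain struct{ head, tail *node }

func (c *chain) pushHead(v int) {
	if c.head == nil {
		c.head = &node{limit: 2}
		c.tail = c.head
	}
	d := c.head
	if len(d.vals) == d.limit {
		// Head is full: link a new node of twice the size.
		d2 := &node{limit: d.limit * 2, prev: d}
		d.next = d2
		c.head = d2
		d = d2
	}
	d.vals = append(d.vals, v)
}

// popHead takes the newest value, backing up through prev when a node is empty.
func (c *chain) popHead() (int, bool) {
	for d := c.head; d != nil; d = d.prev {
		if n := len(d.vals); n > 0 {
			v := d.vals[n-1]
			d.vals = d.vals[:n-1]
			return v, true
		}
	}
	return 0, false
}

// popTail takes the oldest value and drops drained tail nodes from the chain.
func (c *chain) popTail() (int, bool) {
	d := c.tail
	for d != nil {
		if len(d.vals) > 0 {
			v := d.vals[0]
			d.vals = d.vals[1:]
			return v, true
		}
		if d.next == nil {
			return 0, false // the only node left; it may be refilled later
		}
		c.tail = d.next
		d.next.prev = nil // let the drained node be collected
		d = d.next
	}
	return 0, false
}

func main() {
	c := &chain{}
	for i := 1; i <= 5; i++ {
		c.pushHead(i)
	}
	fmt.Println(c.popHead()) // 5 true: the owner sees the newest value
	fmt.Println(c.popTail()) // 1 true: a thief sees the oldest value
}
```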
    

Get function

  1. Try private first
  2. Then try the current P's shared
  3. Then try the shared of other Ps

    func (p *Pool) Get() interface{} {
     // Get the local that belongs to the current P
     l, pid := p.pin()
     // Try private first; if it is empty, fall back to shared below
     x := l.private
     // After taking the value from private, set private to nil
     l.private = nil
     if x == nil {
         // Try to pop the head of the local shard. We prefer
         // the head over the tail for temporal locality of
         // reuse.
         // private is empty, so pop from the head of the current P's shared
         x, _ = l.shared.popHead()
         if x == nil {
             // shared is empty too, so try the shared of other Ps
             x = p.getSlow(pid)
         }
     }
     runtime_procUnpin()
    
     // Nothing was found anywhere, so create a new value with New
     if x == nil && p.New != nil {
         x = p.New()
     }
     return x
    }
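
The lookup order of Put and Get on a single local can be sketched without pinning or stealing. `toyLocal` is a hypothetical single-goroutine model in which a slice stands in for poolChain; it is not how the real pool stores values.

```go
package main

import "fmt"

// toyLocal is a single-goroutine model of poolLocalInternal.
type toyLocal struct {
	private interface{}
	shared  []interface{} // stands in for poolChain
}

// put fills private first and spills into shared afterwards.
func (l *toyLocal) put(x interface{}) {
	if x == nil {
		return
	}
	if l.private == nil {
		l.private = x
		return
	}
	l.shared = append(l.shared, x) // corresponds to pushHead
}

// get checks private, then the head of shared, then falls back to newFn.
func (l *toyLocal) get(newFn func() interface{}) interface{} {
	x := l.private
	l.private = nil
	if x == nil && len(l.shared) > 0 {
		n := len(l.shared) - 1
		x, l.shared = l.shared[n], l.shared[:n] // corresponds to popHead
	}
	if x == nil && newFn != nil {
		x = newFn()
	}
	return x
}

func main() {
	l := &toyLocal{}
	l.put("a")
	l.put("b")
	l.put("c")
	newFn := func() interface{} { return "new" }
	fmt.Println(l.get(newFn)) // a: from private
	fmt.Println(l.get(newFn)) // c: newest value in shared
	fmt.Println(l.get(newFn)) // b
	fmt.Println(l.get(newFn)) // new: everything was empty
}
```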
    

getSlow function

  1. When neither private nor the current P's shared has a value, try to take one from the shared of other Ps

    func (p *Pool) getSlow(pid int) interface{} {
     // See the comment in pin regarding ordering of the loads.
     size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
     locals := p.local                            // load-consume
     // locals is the array holding the local of every P
     // Try to steal one element from other procs.
     // Try to steal one element from the shared of another P
     for i := 0; i < int(size); i++ {
         l := indexLocal(locals, (pid+i+1)%int(size))
         if x, _ := l.shared.popTail(); x != nil {
             return x
         }
     }
    
     // Try the victim cache. We do this after attempting to steal
     // from all primary caches because we want objects in the
     // victim cache to age out if at all possible.
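     // If no other shared has a value either, try victim. victim holds the data cached
     // in the previous cycle, saved when poolCleanup ran, and is dropped in the next cycle
     // The lookup works the same way as for local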
     size = atomic.LoadUintptr(&p.victimSize)
     if uintptr(pid) >= size {
         return nil
     }
     locals = p.victim
     l := indexLocal(locals, pid)
     if x := l.private; x != nil {
         l.private = nil
         return x
     }
     for i := 0; i < int(size); i++ {
         l := indexLocal(locals, (pid+i)%int(size))
         if x, _ := l.shared.popTail(); x != nil {
             return x
         }
     }
    
     // Mark the victim cache as empty for future gets don't bother
     // with it.
     atomic.StoreUintptr(&p.victimSize, 0)
    
     return nil
    }
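
The index expression `(pid+i+1)%size` in the first loop determines which locals are probed and in what order. A small sketch that only computes that order; `stealOrder` is a hypothetical helper.

```go
package main

import "fmt"

// stealOrder lists the indexes probed by the first loop of getSlow.
func stealOrder(pid, size int) []int {
	order := make([]int, 0, size)
	for i := 0; i < size; i++ {
		order = append(order, (pid+i+1)%size)
	}
	return order
}

func main() {
	// With 4 Ps and pid 1, the neighbours are probed first
	// and the caller's own index comes last.
	fmt.Println(stealOrder(1, 4)) // [2 3 0 1]
}
```

Starting at `pid+1` spreads the stealing over different victims for different Ps instead of having every P hit index 0 first.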
    

The cleanup function poolCleanup

  1. Triggered at the start of every GC cycle

Registering poolCleanup at the trigger point

func init() {
    runtime_registerPoolCleanup(poolCleanup)
}
func sync_runtime_registerPoolCleanup(f func()) {
    poolcleanup = f
}
func clearpools() {
    // clear sync.Pools
    if poolcleanup != nil {
        poolcleanup()
    }
    ...
}
func gcStart(trigger gcTrigger) {
    ...

    // clearpools before we start the GC. If we wait they memory will not be
    // reclaimed until the next GC cycle.
    clearpools()

    work.cycles++

    ...
}

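poolCleanup function

  1. Called during STW, so it never runs concurrently with Get or Put and there are no concurrency issues
  2. Moves the current pools into oldPools
  3. Clears the victim of the pools in the previous cycle's oldPools, so the older data can be reclaimed by the GC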

    func poolCleanup() {
     // This function is called with the world stopped, at the beginning of a garbage collection.
     // It must not allocate and probably should not call any runtime functions.
    
     // Because the world is stopped, no pool user can be in a
     // pinned section (in effect, this has all Ps pinned).
    
     // Drop victim caches from all pools.
     // Set the old victim to nil so the GC can reclaim it
     for _, p := range oldPools {
         p.victim = nil
         p.victimSize = 0
     }
    
     // Move primary cache to victim cache.
     // Assign each current pool's local to victim, then set local to nil
     for _, p := range allPools {
         p.victim = p.local
         p.victimSize = p.localSize
         p.local = nil
         p.localSize = 0
     }
    
     // The pools with non-empty primary caches now have non-empty
     // victim caches and no pools have primary caches.
     // Assign the current pools to oldPools and clear allPools
     oldPools, allPools = allPools, nil
    }
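
The effect of the two-generation scheme is that a value survives one cleanup in victim and is dropped by the second. The sketch below is a toy model of that ageing: `toyPool` is a hypothetical single-goroutine type, and `cleanup` is called by hand where the real pool relies on the GC.

```go
package main

import "fmt"

// toyPool has one primary list and one victim list.
type toyPool struct {
	local  []string
	victim []string
}

func (p *toyPool) put(s string) { p.local = append(p.local, s) }

// get prefers local and falls back to victim.
func (p *toyPool) get() (string, bool) {
	for _, src := range []*[]string{&p.local, &p.victim} {
		if n := len(*src); n > 0 {
			s := (*src)[n-1]
			*src = (*src)[:n-1]
			return s, true
		}
	}
	return "", false
}

// cleanup models one GC cycle: the old victim is dropped and
// local is demoted to victim.
func (p *toyPool) cleanup() {
	p.victim = p.local
	p.local = nil
}

func main() {
	p := &toyPool{}
	p.put("x")
	p.cleanup()
	_, ok := p.get()
	fmt.Println(ok) // true: the value survived one cycle in victim

	p.put("y")
	p.cleanup()
	p.cleanup()
	_, ok = p.get()
	fmt.Println(ok) // false: two cycles without use dropped it
}
```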
    
