golang源代码阅读,sync系列-Pool

aseto · · 882 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

[原文地址](https://blog.csdn.net/u014763610/article/details/116889422) # 总结 1. 可以缓存变量实例的池子,每个pool存储一种类型的变量实例,支持并发 2. 内部存储结构,使用了链表、队列、数组 3. 初始化local时,会有一把全局锁 4. 除了用锁进行临界值的保护外,更多的是通过防止goroutine不能被强占来保证数据的安全 # 问题 1. 使用Pool如何提升性能 2. 变量如何存储,以及如何销毁 3. 使用完后,如何清零 # 问题解答 1. 使用Pool如何提升性能 - 小粒度:通过设置runtime.GOMAXPROCS(0)个locals,每个P都绑定一个local,每个P操作自己的local, 并且通过设置当前的goroutine不能被强占,在Get和Put,保证数据操作安全 - local也分粒度,优先从private里面获取,获取不到shared里面获取 - 自身的local获取不到,则从其它的P的local里面去偷,偷的是最后一个 - 数据过期,也分了oldPools和allPool,当当前pool里面没有,会从old里面获取 2. 变量如何存储,以及如何销毁 - 每个P有一个local,local由链表实现,链表里的元素由队列实现 - 销毁,通过GC粗发,清理函数在stw期间执行 3. 使用完后,如何清零 - 有系统管理清楚,使用者不感知,通过设置oldPools和allPool,以及victim,通过调用poolCleanup, 将allPools里的数据转移到oldPools,并将oldPools里面local的victim清空,从GC期间能够回收 # 使用 ```golang pool := &sync.Pool{ New: func() interface{} { return new(Demo) }, } data := pool.Get() demo := data.(*Demo) demo.A = "test" fmt.Println(demo) demo.A = "" pool.Put(demo) ``` # 源码分析 ## 结构体 ```golang type Pool struct { noCopy noCopy // 为local数组,大小为P的个数,每个P一个local,使用p的id作为index local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal // local数组的大小 localSize uintptr // size of the local array // 上一个周期的local victim unsafe.Pointer // local from previous cycle // 上一个周期localSize victimSize uintptr // size of victims array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. // pool不存在值时,初始化新值调用的函数 New func() interface{} } ``` ```golang // local的实现,使用链表 type poolLocal struct { poolLocalInternal // Prevents false sharing on widespread platforms with // 128 mod (cache line size) = 0 . pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte } ``` 1. 默认先放在private里面 2. 如果已经放了,则放到shared里面 3. shared是Local P能够push也能pop,其它P只能pop 4. Get的时候,先从private里面获取,没有,从shared里面获取,没有的话,从其它的poolChain获取 ```golang type poolLocalInternal struct { // 如果存储变量,用作Get和Put比较均衡的时候,直接从private里面获取,会更加快速 private interface{} // Can be used only by the respective P. // private里已经存储了值,后续数据放入的地方 shared poolChain // Local P can pushHead/popHead; any P can popTail. } ``` ```golang // 原理链表实现 type poolChain struct { // head is the poolDequeue to push to. This is only accessed // by the producer, so doesn't need to be synchronized. head *poolChainElt // tail is the poolDequeue to popTail from. This is accessed // by consumers, so reads and writes must be atomic. tail *poolChainElt } ``` ## Put函数 **将使用完的变量x归还回去,否则使用Get,会一直New新的变量出来** 1. x不能为nil,否则直接返回 2. runtime_procPin到runtime_procUnpin期间,能保证当前goroutine不会被强占,保证数据操作安全 3. 因为local使用P的id作为index,调用期间使用了runtime_procPin到runtime_procUnpin, 保证了对于local只有一个goroutine,从而保证了数据操作的安全 ```golang func (p *Pool) Put(x interface{}) { if x == nil { return } // 获取localPool l, _ := p.pin() // 如果private为空,则放入x值到private if l.private == nil { l.private = x x = nil } // 如果private不为nil,则将x放入shared里面,放在head处 if x != nil { l.shared.pushHead(x) } // 和runtime_procPin配套使用,pin里面会调用runtime_procPin runtime_procUnpin() } ``` ## pin 1. 将当前goroutine绑定到P,禁止强占,并且返回属于当前P的localPool 2. 必须与runtime_procUnpin搭配使用 3. 该函数Put和Get均会调用 ```golang func (p *Pool) pin() (*poolLocal, int) { // 与runtime_procUnpin搭配使用 pid := runtime_procPin() // In pinSlow we store to local and then to localSize, here we load in opposite order. // Since we've disabled preemption, GC cannot happen in between. // Thus here we must observe local at least as large localSize. // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). // 获取当前p的localSize,默认初始化,s为0,会进入pinSlow分支 // 当前已经有调用Put操作,localSize已经不为0,才会进入 s := runtime_LoadAcquintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { return indexLocal(l, pid), pid } // 通过pinSlow获取,命名规则跟Mutex的lockSlow类似 return p.pinSlow() } ``` ### pinSlow函数 1. 初始化localPool会使用到全局锁allPoolsMu,因为所有的Pool都会装入allPools里面 2. 先上一个全局锁,然后进行double check,检查local是否已经初始化,如果已经初始化,直接返回 如果为初始化,则进行初始化 3. 使用P的id作为index,P的总个数作为local数组的大小 ```golang func (p *Pool) pinSlow() (*poolLocal, int) { // Retry under the mutex. // Can not lock the mutex while pinned. // 先Unpin当前P runtime_procUnpin() // 会使用全局锁allPoolsMu allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() // poolCleanup won't be called while we are pinned. // poolCleanup清理函数,不会在pinned期间执行 s := p.localSize l := p.local // double check,检查local是否已经初始化,如果已经初始化,则直接返回 if uintptr(pid) < s { return indexLocal(l, pid), pid } // 将p加入allPools里面 if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. // 如果GOMAXPROCS在GC阶段变化了,则重新赋值数组,并丢弃老的那个 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) // 初始化一个local,并将local的地址赋值给p.local atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release //何止localSize的值,为当前P的个数 runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release 返回第P的id个local的值和pid return &local[pid], pid } ``` ## runtime_procPin函数和runtime_procUnpin函数 1. 两个函数的实现分别在runtime/proc.go里的procPin和procUnpin,下滑线前为包名 2. runtime_procPin和runtime_procUnpin期间,当前goroutine不会被强占 ### procPin 1. 将当前goroutine绑定的m的locks加一,由canPreemptM可以看出,M是否可以被强占,locks是否等于0,是必备条件之一 因此locks加一,能保证在procPin期间,当前goroutine不会被强占 2. 返回当前P的id ```golang func procPin() int { _g_ := getg() mp := _g_.m mp.locks++ return int(mp.p.ptr().id) } ``` ```golang func canPreemptM(mp *m) bool { return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning } ``` ### procUnpin 1. 获取当前goroutine的M,将其locks减一,与procPin里面的加一向匹配 ```golang func procUnpin() { _g_ := getg() _g_.m.locks-- } ``` ## shared,结构体poolDequeue ### 概览 1. shared在poolLocalInternal里面是以结构体的方式存在的,所以创建poolLocalInternal就会初始它 2. shared为一链表,链表里的元素为队列,每个元素里面的队列大小由开始的大小8开始,然后2的倍数递增 ### 结构体 ```golang type poolChainElt struct { poolDequeue // next and prev link to the adjacent poolChainElts in this // poolChain. // // next is written atomically by the producer and read // atomically by the consumer. It only transitions from nil to // non-nil. // // prev is written atomically by the consumer and read // atomically by the producer. It only transitions from // non-nil to nil. next, prev *poolChainElt } ``` ```golang type poolDequeue struct { // headTail packs together a 32-bit head index and a 32-bit // tail index. Both are indexes into vals modulo len(vals)-1. // // tail = index of oldest data in queue // head = index of next slot to fill // // Slots in the range [tail, head) are owned by consumers. // A consumer continues to own a slot outside this range until // it nils the slot, at which point ownership passes to the // producer. // // The head index is stored in the most-significant bits so // that we can atomically add to it and the overflow is // harmless. headTail uint64 // vals is a ring buffer of interface{} values stored in this // dequeue. The size of this must be a power of 2. // // vals[i].typ is nil if the slot is empty and non-nil // otherwise. A slot is still in use until *both* the tail // index has moved beyond it and typ has been set to nil. This // is set to nil atomically by the consumer and read // atomically by the producer. // 存储数据 vals []eface } ``` ### pushHead函数 ```golang func (c *poolChain) pushHead(val interface{}) { d := c.head // 未初始化,则进行初始化,c的head和tail都设置为新创建的d,d里的数组vals大小为8 // poolChainElt为一队列,数据放在数组里面。 if d == nil { // Initialize the chain. const initSize = 8 // Must be a power of 2 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } // 将val放入d里,如果d满了,新创建一个poolChainElt if d.pushHead(val) { return } // The current dequeue is full. Allocate a new one of twice // the size. // 新创建一个poolChainElt,大小为之前的2倍,最大值为1<<30 newSize := len(d.vals) * 2 if newSize >= dequeueLimit { // Can't make it any bigger. newSize = dequeueLimit } // 然后将新创建的poolChainElt挂在到d的next下去 // d为前一个值,head为新创建的d2 d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) d2.pushHead(val) } ``` ### popHead函数 ```golang func (c *poolChain) popHead() (interface{}, bool) { d := c.head // 从head开始获取 for d != nil { // 如果当前的d的链表里有数据,如果存在,直接返回 if val, ok := d.popHead(); ok { return val, ok } // There may still be unconsumed elements in the // previous dequeue, so try backing up. // 如果不存在,则从prev前一个里获取 d = loadPoolChainElt(&d.prev) } return nil, false } ``` ### popTail函数 1. 当前P里面获取不到缓存变量时,会调用popTail,从其它的P的local里面偷 ```golang func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) if d == nil { return nil, false } for { // It's important that we load the next pointer // *before* popping the tail. In general, d may be // transiently empty, but if next is non-nil before // the pop and the pop fails, then d is permanently // empty, which is the only condition under which it's // safe to drop d from the chain. d2 := loadPoolChainElt(&d.next) if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { // This is the only dequeue. It's empty right // now, but could be pushed to in the future. return nil, false } // The tail of the chain has been drained, so move on // to the next dequeue. Try to drop it from the chain // so the next pop doesn't have to look at the empty // dequeue again. if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { // We won the race. Clear the prev pointer so // the garbage collector can collect the empty // dequeue and so popHead doesn't back up // further than necessary. storePoolChainElt(&d2.prev, nil) } d = d2 } } ``` ## Get函数 1. 先从private获取 2. 从shared里面获取 3. 从其它的shared获取 ```golang func (p *Pool) Get() interface{} { // 获取当前的P对应的local l, pid := p.pin() // 先从private获取,private不存在,则从 x := l.private // 从private获取后,将private设置为nil l.private = nil if x == nil { // Try to pop the head of the local shard. We prefer // the head over the tail for temporal locality of // reuse. // private不存在,则从当前P的shared里面获取,获取第一个 x, _ = l.shared.popHead() if x == nil { // shared不存在,则从其它的P的shared里面获取 x = p.getSlow(pid) } } runtime_procUnpin() // 最后也未获取到,则直接New一个 if x == nil && p.New != nil { x = p.New() } return x } ``` ### getSlow函数 1. private、当前P的shared里面都没有的话,尝试从其它的P的shared里面获取 ```golang func (p *Pool) getSlow(pid int) interface{} { // See the comment in pin regarding ordering of the loads. size := runtime_LoadAcquintptr(&p.localSize) // load-acquire locals := p.local // load-consume // 获取locals数组,里面存贮了所有P的local // Try to steal one element from other procs. // 尝试从其它P的share里面去偷一个元素出来 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // Try the victim cache. We do this after attempting to steal // from all primary caches because we want objects in the // victim cache to age out if at all possible. // 如果其它的share里面也没有,则尝试从victim里面获取,victim实则为执行完poolCleanup // 后,保存的上一个周期缓存的数据,会在下一个周期清除掉 // 获取原理跟从local里获取一样 size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) if x := l.private; x != nil { l.private = nil return x } for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // Mark the victim cache as empty for future gets don't bother // with it. atomic.StoreUintptr(&p.victimSize, 0) return nil } ``` ## 清理函数poolCleanup 1. 通过每次调用gc触发 ### 注册poolCleanup到触发点 ```golang func init() { runtime_registerPoolCleanup(poolCleanup) } ``` ```golang func sync_runtime_registerPoolCleanup(f func()) { poolcleanup = f } ``` ```golang func clearpools() { // clear sync.Pools if poolcleanup != nil { poolcleanup() } ... } ``` ```golang func gcStart(trigger gcTrigger) { ... // clearpools before we start the GC. If we wait they memory will not be // reclaimed until the next GC cycle. clearpools() work.cycles++ ... } ``` ### poolCleanup函数 1. 在stw期间调用,保证了Get和Put阶段都不会调用这个函数,无并发问题 2. 将当前的pools放到old里面去 3. 将上一个周期的oldPools清空,这样之前的数据就能被gc回收 ```golang func poolCleanup() { // This function is called with the world stopped, at the beginning of a garbage collection. // It must not allocate and probably should not call any runtime functions. // Because the world is stopped, no pool user can be in a // pinned section (in effect, this has all Ps pinned). // Drop victim caches from all pools. // 将老的victim设置为nil,这样gc便能清理 for _, p := range oldPools { p.victim = nil p.victimSize = 0 } // Move primary cache to victim cache. // 将当前的pool里的local赋值给victim,并将local for _, p := range allPools { p.victim = p.local p.victimSize = p.localSize p.local = nil p.localSize = 0 } // The pools with non-empty primary caches now have non-empty // victim caches and no pools have primary caches. // 将当前的pool赋值给oldPools,当前的pools清空 oldPools, allPools = allPools, nil } ```

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077

882 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传