Summary
- A pool caches object instances; each Pool is used to store one type of instance, and it is safe for concurrent use
- Internally it combines a linked list, ring-buffer queues, and an array
- Initializing the local array takes a global lock
- Besides protecting critical sections with that lock, safety mostly comes from pinning the current goroutine so it cannot be preempted
Questions
- How does using a Pool improve performance?
- How are cached variables stored, and how are they destroyed?
- How are they reset after use?
Answers
- How does using a Pool improve performance?
    - Fine granularity: there are runtime.GOMAXPROCS(0) locals, one bound to each P; each P only operates on its own local, and Get/Put pin the current goroutine so it cannot be preempted, which keeps the operations safe
    - Each local is itself layered: Get tries private first, then falls back to shared
    - If its own local yields nothing, a P steals from other Ps' locals, popping from the tail
    - Expiry is staged via allPools and oldPools: when the current pool has nothing, Get falls back to the previous cycle's (victim) data
- How are cached variables stored, and how are they destroyed?
    - Each P has one local; its shared part is a linked list whose elements are ring-buffer queues
    - Destruction is triggered by GC: the cleanup function runs during stop-the-world
- How are they reset after use?
    - The runtime handles cleanup transparently; users don't see it. poolCleanup moves allPools into oldPools, shifting each pool's local into victim and clearing the previous victims, so the old data can be reclaimed during GC
Usage
pool := &sync.Pool{
New: func() interface{} {
return new(Demo)
},
}
data := pool.Get()
demo := data.(*Demo)
demo.A = "test"
fmt.Println(demo)
demo.A = ""
pool.Put(demo)
Source code analysis
Structs
type Pool struct {
	noCopy noCopy
	// the local array: one poolLocal per P (size = number of Ps), indexed by the P's id
	local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	localSize uintptr // size of the local array
	victim unsafe.Pointer // local from previous cycle
	victimSize uintptr // size of victims array
	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() interface{}
}
// the per-P local; its shared field is backed by a linked list of queues
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
- Put stores into private first
- If private is already occupied, the value goes into shared
- The local P can both push and pop its own shared queue; other Ps can only popTail
- Get tries private first, then its own shared queue, then other Ps' poolChains
type poolLocalInternal struct {
	// when Get and Put are roughly balanced, values are served straight
	// from private, which is the fastest path
	private interface{} // Can be used only by the respective P.
	// overflow storage once private is occupied
	shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
// poolChain is a linked list of dequeues
type poolChain struct {
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
}
Put
Returns the used variable x to the pool; otherwise Get will keep allocating new values via New
- x must not be nil, otherwise Put returns immediately
- Between runtime_procPin and runtime_procUnpin the current goroutine cannot be preempted, which keeps the operations safe
- Because local is indexed by the P's id and the goroutine stays pinned for the duration, only one goroutine touches this local at a time, which makes the accesses safe
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	// fetch this P's poolLocal (pins the goroutine)
	l, _ := p.pin()
	// if private is empty, store x there
	if l.private == nil {
		l.private = x
		x = nil
	}
	// private was already occupied: push x onto the head of shared
	if x != nil {
		l.shared.pushHead(x)
	}
	// paired with the runtime_procPin called inside pin
	runtime_procUnpin()
}
pin
- Binds the current goroutine to its P, disables preemption, and returns the poolLocal belonging to that P
- Must be paired with runtime_procUnpin
- Called by both Put and Get
func (p *Pool) pin() (*poolLocal, int) {
	// paired with runtime_procUnpin
	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	// On first use localSize is still 0, so s is 0 and we fall into pinSlow;
	// once a Put has initialized the array, localSize is non-zero and the fast path applies
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	// fall back to pinSlow (named after the same convention as Mutex's lockSlow)
	return p.pinSlow()
}
pinSlow
- Initializing the poolLocal array takes the global lock allPoolsMu, because every Pool is registered in allPools
- It takes the global lock, then double-checks whether local is already initialized; if so it returns directly, otherwise it performs the initialization
- The P's id is the index, and the total number of Ps is the size of the local array
func (p *Pool) pinSlow() (*poolLocal, int) {
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	// unpin first, since we must not hold the pin while taking the lock
	runtime_procUnpin()
	// takes the global lock allPoolsMu
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	// double check: if local was initialized while we were unpinned, return directly
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	// register p in allPools on first initialization
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	// publish the new local array by storing the address of its first element
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	// set localSize to the current number of Ps
	runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
	// return the local slot for this P's id, plus the pid
	return &local[pid], pid
}
runtime_procPin and runtime_procUnpin
- They are implemented as procPin and procUnpin in runtime/proc.go; the prefix before the underscore is the package name
- Between runtime_procPin and runtime_procUnpin, the current goroutine cannot be preempted
procPin
- Increments locks on the M bound to the current goroutine; canPreemptM shows that locks == 0 is a necessary condition for preempting an M, so incrementing it guarantees the goroutine is not preempted while pinned
- Returns the id of the current P
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id)
}
func canPreemptM(mp *m) bool {
return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}
procUnpin
- Fetches the current goroutine's M and decrements its locks, matching the increment in procPin
func procUnpin() {
	_g_ := getg()
	_g_.m.locks--
}
shared and its poolDequeue structures
Overview
- shared is embedded by value inside poolLocalInternal, so it is initialized together with it
- shared is a linked list whose elements are ring-buffer queues; the first queue holds 8 slots, and each subsequent one doubles in size
Structs
type poolChainElt struct {
poolDequeue
// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *poolChainElt
}
type poolDequeue struct {
// headTail packs together a 32-bit head index and a 32-bit
// tail index. Both are indexes into vals modulo len(vals)-1.
//
// tail = index of oldest data in queue
// head = index of next slot to fill
//
// Slots in the range [tail, head) are owned by consumers.
// A consumer continues to own a slot outside this range until
// it nils the slot, at which point ownership passes to the
// producer.
//
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
headTail uint64
// vals is a ring buffer of interface{} values stored in this
// dequeue. The size of this must be a power of 2.
//
// vals[i].typ is nil if the slot is empty and non-nil
// otherwise. A slot is still in use until *both* the tail
// index has moved beyond it and typ has been set to nil. This
// is set to nil atomically by the consumer and read
// atomically by the producer.
	// backing ring buffer
vals []eface
}
pushHead
func (c *poolChain) pushHead(val interface{}) {
	d := c.head
	// lazily initialize the chain: head and tail both point at a new
	// element whose vals ring buffer starts with 8 slots
	// (each poolChainElt is a queue backed by an array)
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}
	// push val into d; on success we are done
	if d.pushHead(val) {
		return
	}
	// The current dequeue is full. Allocate a new one of twice
	// the size, capped at dequeueLimit (1<<30).
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}
	// link the new element in front of d: d stays as prev,
	// and the new d2 becomes the head
	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}
popHead
func (c *poolChain) popHead() (interface{}, bool) {
	d := c.head
	// start from the head of the chain
	for d != nil {
		// if the current dequeue has data, return it
		if val, ok := d.popHead(); ok {
			return val, ok
		}
		// There may still be unconsumed elements in the
		// previous dequeue, so try backing up.
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}
popTail
When the current P's local has no cached value, popTail is called to steal from other Ps' locals
func (c *poolChain) popTail() (interface{}, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}
	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)
		if val, ok := d.popTail(); ok {
			return val, ok
		}
		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}
		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}
Get
- Try private first
- Then the local shared queue
- Then other Ps' shared queues
func (p *Pool) Get() interface{} {
	// fetch the poolLocal for the current P
	l, pid := p.pin()
	// try private first, clearing it once read
	x := l.private
	l.private = nil
	if x == nil {
		// Try to pop the head of the local shard. We prefer
		// the head over the tail for temporal locality of
		// reuse.
		x, _ = l.shared.popHead()
		if x == nil {
			// nothing in the local shared queue either: steal from other Ps
			x = p.getSlow(pid)
		}
	}
	runtime_procUnpin()
	// still nothing: allocate a fresh value via New
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}
getSlow
If neither private nor the current P's shared queue has a value, try to steal from the other Ps' shared queues
func (p *Pool) getSlow(pid int) interface{} {
	// See the comment in pin regarding ordering of the loads.
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	// Try to steal one element from other procs.
	// walk the other Ps' locals, popping from the tail of each shared queue
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}
	// Try the victim cache. We do this after attempting to steal
	// from all primary caches because we want objects in the
	// victim cache to age out if at all possible.
	// victim holds the previous cycle's cache, saved by poolCleanup and
	// dropped at the next cycle; the lookup works the same way as for local
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}
	// Mark the victim cache as empty for future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)
	return nil
}
The cleanup function poolCleanup
- Triggered on every GC cycle
Registering poolCleanup at its trigger point
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}
func clearpools() {
// clear sync.Pools
if poolcleanup != nil {
poolcleanup()
}
...
}
func gcStart(trigger gcTrigger) {
...
// clearpools before we start the GC. If we wait they memory will not be
// reclaimed until the next GC cycle.
clearpools()
work.cycles++
...
}
poolCleanup
- Runs during stop-the-world, so it can never race with Get or Put; there are no concurrency issues
- Moves the current pools into the old generation
- Clears the previous cycle's oldPools, so their data can be reclaimed by GC
func poolCleanup() {
	// This function is called with the world stopped, at the beginning of a garbage collection.
	// It must not allocate and probably should not call any runtime functions.

	// Because the world is stopped, no pool user can be in a
	// pinned section (in effect, this has all Ps pinned).

	// Drop victim caches from all pools.
	// nil out the old victims so GC can reclaim them
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// Move primary cache to victim cache.
	// shift each pool's local into victim and clear local
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	// The pools with non-empty primary caches now have non-empty
	// victim caches and no pools have primary caches.
	// the current pools become oldPools; allPools is reset
	oldPools, allPools = allPools, nil
}
