[原文地址](https://blog.csdn.net/u014763610/article/details/116889422)
# 总结
1. 可以缓存变量实例的池子,每个pool存储一种类型的变量实例,支持并发
2. 内部存储结构,使用了链表、队列、数组
3. 初始化local时,会有一把全局锁
4. 除了用锁进行临界值的保护外,更多的是通过防止goroutine不能被强占来保证数据的安全
# 问题
1. 使用Pool如何提升性能
2. 变量如何存储,以及如何销毁
3. 使用完后,如何清零
# 问题解答
1. 使用Pool如何提升性能
- 小粒度:通过设置runtime.GOMAXPROCS(0)个locals,每个P都绑定一个local,每个P操作自己的local,
并且通过设置当前的goroutine不能被强占,在Get和Put,保证数据操作安全
- local也分粒度,优先从private里面获取,获取不到shared里面获取
- 自身的local获取不到,则从其它的P的local里面去偷,偷的是最后一个
- 数据过期,也分了oldPools和allPool,当当前pool里面没有,会从old里面获取
2. 变量如何存储,以及如何销毁
- 每个P有一个local,local由链表实现,链表里的元素由队列实现
- 销毁,通过GC粗发,清理函数在stw期间执行
3. 使用完后,如何清零
- 有系统管理清楚,使用者不感知,通过设置oldPools和allPool,以及victim,通过调用poolCleanup,
将allPools里的数据转移到oldPools,并将oldPools里面local的victim清空,从GC期间能够回收
# 使用
```golang
pool := &sync.Pool{
New: func() interface{} {
return new(Demo)
},
}
data := pool.Get()
demo := data.(*Demo)
demo.A = "test"
fmt.Println(demo)
demo.A = ""
pool.Put(demo)
```
# 源码分析
## 结构体
```golang
type Pool struct {
noCopy noCopy
// 为local数组,大小为P的个数,每个P一个local,使用p的id作为index
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
// local数组的大小
localSize uintptr // size of the local array
// 上一个周期的local
victim unsafe.Pointer // local from previous cycle
// 上一个周期localSize
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
// pool不存在值时,初始化新值调用的函数
New func() interface{}
}
```
```golang
// local的实现,使用链表
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
```
1. 默认先放在private里面
2. 如果已经放了,则放到shared里面
3. shared是Local P能够push也能pop,其它P只能pop
4. Get的时候,先从private里面获取,没有,从shared里面获取,没有的话,从其它的poolChain获取
```golang
type poolLocalInternal struct {
// 如果存储变量,用作Get和Put比较均衡的时候,直接从private里面获取,会更加快速
private interface{} // Can be used only by the respective P.
// private里已经存储了值,后续数据放入的地方
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
```
```golang
// 原理链表实现
type poolChain struct {
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
}
```
## Put函数
**将使用完的变量x归还回去,否则使用Get,会一直New新的变量出来**
1. x不能为nil,否则直接返回
2. runtime_procPin到runtime_procUnpin期间,能保证当前goroutine不会被强占,保证数据操作安全
3. 因为local使用P的id作为index,调用期间使用了runtime_procPin到runtime_procUnpin,
保证了对于local只有一个goroutine,从而保证了数据操作的安全
```golang
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// 获取localPool
l, _ := p.pin()
// 如果private为空,则放入x值到private
if l.private == nil {
l.private = x
x = nil
}
// 如果private不为nil,则将x放入shared里面,放在head处
if x != nil {
l.shared.pushHead(x)
}
// 和runtime_procPin配套使用,pin里面会调用runtime_procPin
runtime_procUnpin()
}
```
## pin
1. 将当前goroutine绑定到P,禁止强占,并且返回属于当前P的localPool
2. 必须与runtime_procUnpin搭配使用
3. 该函数Put和Get均会调用
```golang
func (p *Pool) pin() (*poolLocal, int) {
// 与runtime_procUnpin搭配使用
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
// 获取当前p的localSize,默认初始化,s为0,会进入pinSlow分支
// 当前已经有调用Put操作,localSize已经不为0,才会进入
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 通过pinSlow获取,命名规则跟Mutex的lockSlow类似
return p.pinSlow()
}
```
### pinSlow函数
1. 初始化localPool会使用到全局锁allPoolsMu,因为所有的Pool都会装入allPools里面
2. 先上一个全局锁,然后进行double check,检查local是否已经初始化,如果已经初始化,直接返回
如果为初始化,则进行初始化
3. 使用P的id作为index,P的总个数作为local数组的大小
```golang
func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
// 先Unpin当前P
runtime_procUnpin()
// 会使用全局锁allPoolsMu
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
// poolCleanup清理函数,不会在pinned期间执行
s := p.localSize
l := p.local
// double check,检查local是否已经初始化,如果已经初始化,则直接返回
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 将p加入allPools里面
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
// 如果GOMAXPROCS在GC阶段变化了,则重新赋值数组,并丢弃老的那个
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
// 初始化一个local,并将local的地址赋值给p.local
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
//何止localSize的值,为当前P的个数
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
返回第P的id个local的值和pid
return &local[pid], pid
}
```
## runtime_procPin函数和runtime_procUnpin函数
1. 两个函数的实现分别在runtime/proc.go里的procPin和procUnpin,下滑线前为包名
2. runtime_procPin和runtime_procUnpin期间,当前goroutine不会被强占
### procPin
1. 将当前goroutine绑定的m的locks加一,由canPreemptM可以看出,M是否可以被强占,locks是否等于0,是必备条件之一
因此locks加一,能保证在procPin期间,当前goroutine不会被强占
2. 返回当前P的id
```golang
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id)
}
```
```golang
func canPreemptM(mp *m) bool {
return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}
```
### procUnpin
1. 获取当前goroutine的M,将其locks减一,与procPin里面的加一向匹配
```golang
func procUnpin() {
_g_ := getg()
_g_.m.locks--
}
```
## shared,结构体poolDequeue
### 概览
1. shared在poolLocalInternal里面是以结构体的方式存在的,所以创建poolLocalInternal就会初始它
2. shared为一链表,链表里的元素为队列,每个元素里面的队列大小由开始的大小8开始,然后2的倍数递增
### 结构体
```golang
type poolChainElt struct {
poolDequeue
// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *poolChainElt
}
```
```golang
type poolDequeue struct {
// headTail packs together a 32-bit head index and a 32-bit
// tail index. Both are indexes into vals modulo len(vals)-1.
//
// tail = index of oldest data in queue
// head = index of next slot to fill
//
// Slots in the range [tail, head) are owned by consumers.
// A consumer continues to own a slot outside this range until
// it nils the slot, at which point ownership passes to the
// producer.
//
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
headTail uint64
// vals is a ring buffer of interface{} values stored in this
// dequeue. The size of this must be a power of 2.
//
// vals[i].typ is nil if the slot is empty and non-nil
// otherwise. A slot is still in use until *both* the tail
// index has moved beyond it and typ has been set to nil. This
// is set to nil atomically by the consumer and read
// atomically by the producer.
// 存储数据
vals []eface
}
```
### pushHead函数
```golang
func (c *poolChain) pushHead(val interface{}) {
d := c.head
// 未初始化,则进行初始化,c的head和tail都设置为新创建的d,d里的数组vals大小为8
// poolChainElt为一队列,数据放在数组里面。
if d == nil {
// Initialize the chain.
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
// 将val放入d里,如果d满了,新创建一个poolChainElt
if d.pushHead(val) {
return
}
// The current dequeue is full. Allocate a new one of twice
// the size.
// 新创建一个poolChainElt,大小为之前的2倍,最大值为1<<30
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
// 然后将新创建的poolChainElt挂在到d的next下去
// d为前一个值,head为新创建的d2
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
```
### popHead函数
```golang
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
// 从head开始获取
for d != nil {
// 如果当前的d的链表里有数据,如果存在,直接返回
if val, ok := d.popHead(); ok {
return val, ok
}
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
// 如果不存在,则从prev前一个里获取
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
```
### popTail函数
1. 当前P里面获取不到缓存变量时,会调用popTail,从其它的P的local里面偷
```golang
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
// It's important that we load the next pointer
// *before* popping the tail. In general, d may be
// transiently empty, but if next is non-nil before
// the pop and the pop fails, then d is permanently
// empty, which is the only condition under which it's
// safe to drop d from the chain.
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// This is the only dequeue. It's empty right
// now, but could be pushed to in the future.
return nil, false
}
// The tail of the chain has been drained, so move on
// to the next dequeue. Try to drop it from the chain
// so the next pop doesn't have to look at the empty
// dequeue again.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// We won the race. Clear the prev pointer so
// the garbage collector can collect the empty
// dequeue and so popHead doesn't back up
// further than necessary.
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
```
## Get函数
1. 先从private获取
2. 从shared里面获取
3. 从其它的shared获取
```golang
func (p *Pool) Get() interface{} {
// 获取当前的P对应的local
l, pid := p.pin()
// 先从private获取,private不存在,则从
x := l.private
// 从private获取后,将private设置为nil
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
// private不存在,则从当前P的shared里面获取,获取第一个
x, _ = l.shared.popHead()
if x == nil {
// shared不存在,则从其它的P的shared里面获取
x = p.getSlow(pid)
}
}
runtime_procUnpin()
// 最后也未获取到,则直接New一个
if x == nil && p.New != nil {
x = p.New()
}
return x
}
```
### getSlow函数
1. private、当前P的shared里面都没有的话,尝试从其它的P的shared里面获取
```golang
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 获取locals数组,里面存贮了所有P的local
// Try to steal one element from other procs.
// 尝试从其它P的share里面去偷一个元素出来
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Try the victim cache. We do this after attempting to steal
// from all primary caches because we want objects in the
// victim cache to age out if at all possible.
// 如果其它的share里面也没有,则尝试从victim里面获取,victim实则为执行完poolCleanup
// 后,保存的上一个周期缓存的数据,会在下一个周期清除掉
// 获取原理跟从local里获取一样
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Mark the victim cache as empty for future gets don't bother
// with it.
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
```
## 清理函数poolCleanup
1. 通过每次调用gc触发
### 注册poolCleanup到触发点
```golang
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
```
```golang
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}
```
```golang
func clearpools() {
// clear sync.Pools
if poolcleanup != nil {
poolcleanup()
}
...
}
```
```golang
func gcStart(trigger gcTrigger) {
...
// clearpools before we start the GC. If we wait they memory will not be
// reclaimed until the next GC cycle.
clearpools()
work.cycles++
...
}
```
### poolCleanup函数
1. 在stw期间调用,保证了Get和Put阶段都不会调用这个函数,无并发问题
2. 将当前的pools放到old里面去
3. 将上一个周期的oldPools清空,这样之前的数据就能被gc回收
```golang
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Because the world is stopped, no pool user can be in a
// pinned section (in effect, this has all Ps pinned).
// Drop victim caches from all pools.
// 将老的victim设置为nil,这样gc便能清理
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// Move primary cache to victim cache.
// 将当前的pool里的local赋值给victim,并将local
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
// 将当前的pool赋值给oldPools,当前的pools清空
oldPools, allPools = allPools, nil
}
```
有疑问加站长微信联系(非本文作者))