Go 1.13版本中有几个比较大的修改,其中之一是sync.Pool
修改了部分实现,减小某些极端情况下的性能开销。文中内容来源于笔者读完sync.Pool源代码的思考和总结,内容以Go 1.13中的实现为准,少量内容涉及到Go 1.13之前,如有误区请读者多多指教。
概念
在本文内容开始之前需要理解几个在Go runtime中的概念,以便于更好的理解sync.Pool
中一些实现。
goroutine抢占
Go中调度器是GMP模型,简单理解G就是goroutine;M可以类比内核线程,是执行G的地方;P是调度G以及为G的执行准备所需资源。一般情况下,P的数量CPU的可用核心数,也可由runtime.GOMAXPROCS
指定。本文的重点并非goroutine调度器,在此不做详细解释,感兴趣可以翻阅延伸阅读的文章。
Go有这样的调度规则:某个G不能一直占用M,在某个时刻的时候,runtime(参见sysmon
)会判断当前M是否可以被抢占,即M上正在执行的G让出。P在合理的时刻将G调度到合理的M上执行,在runtime里面,每个P维护一个本地存放待执行G的队列localq,同时还存在一个全局的待执行G的队列globalq;调度就是P从localq或globalq中取出G到对应的M上执行,所谓抢占,runtime将G抢占移出运行状态,拷贝G的执行栈放入待执行队列中,可能是某个P的localq,也可能是globalq,等待下一次调度,因此当被抢占的G重回待执行队列时有可能此时的P与前一次运行的P并非同一个。
所谓禁止抢占,即当前执行G不允许被抢占调度,直到禁止抢占标记解除。Go runtime实现了G的禁止抢占与解除禁止抢占。
func runtime_procPin() int
禁止抢占,标记当前G在M上不会被抢占,并返回当前所在P的ID。
func runtime_procUnpin()
解除G的禁止抢占状态,之后G可被抢占。
数据结构
poolDequeue
type poolDequeue struct {
headTail uint64
vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}
poolDequeue
被实现为单生产者多消费者的固定大小的无锁Ring式队列。生产者可以从head插入head删除,而消费者仅可从tail删除。headTail
指向了队列的头和尾,通过位运算将head和tail位置存入headTail
变量中。
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// Ring式队列,头尾相等则队列已满
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 原子操作拿到slot.typ
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
return false
}
if val == nil {
val = dequeueNil(nil)
}
// slot占位,将val存入vals中
*(*interface{})(unsafe.Pointer(slot)) = val
// 更改队列指向头
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
- 在头入时,先判断队列是否已满,判断条件:head == tail
- 从队列中取到head位置的
slot
,根据slot.typ
判断当前slot是否已被存放数据,注意这里使用了atomic.LoadPointer
取代锁操作。 - 将
val
赋值给slot
,这里实现的比较巧妙,slot
是eface
类型,将slot
转为interface{}
类型,这样val
能以interface{}
赋值给slot
让slot.typ
和slot.val
指向其内存块,这样slot.typ
和slot.val
均不为空,这也就是第2点条件判断的来由。插入成功后head加1,指向队头的前一个空位,由于插入删除都涉及到对headTail
的修改,此处使用原子操作取代锁。
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 判断队列是否为空
if tail == head {
return nil, false
}
// head位置是队头的前一个位置,所以此处要先退一位
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
// 取出val
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 重置slot,typ和val均为nil
*slot = eface{}
return val, true
}
popHead
的代码比较简单,流程上无非就是判断队列是否空,拿出slot
转换为interface{}
然后重置slot
。popHead
与pushHead
有一点需要注意,在popHead
中,是先修改了headTail
,然后再取slot
,而在pushHead
中是先插入数据,然后再修改headTail
,至于为什么这里先留一个疑问,后面将会详细解释。
func (d *poolDequeue) popTail() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 判断队列是否空
if tail == head {
return nil, false
}
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// 取出val
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 重置slot.typ
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
return val, true
}
popTail
的代码类似popHead
,只是删除对象从队首变成队尾,注意结尾部分代码,使用了atomic.StorePointer
取代锁操作。
poolChain
poolDequeue
被实现为Ring式队列,而poolChain
则是基于poolDequeue
实现为双向链表。
type poolChain struct {
head *poolChainElt
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt
}
同理,poolChain
也实现了pushHead
,popHead
和popTail
。
func (c *poolChain) pushHead(val interface{}) {
d := c.head
if d == nil {
// poolDequeue初始长度为8
const initSize = 8
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
if d.pushHead(val) {
return
}
// 前一个poolDequeue长度的2倍
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
newSize = dequeueLimit
}
// 首尾相连,构成链表
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
到这里大概就明白了,poolDequeue
是在poolChain
的pushHead
中创建的,且每次创建的长度都是前一个poolDequeue
长度的2倍,初始长度为8。
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
popHead
以队首向队尾为方向遍历链表,对每个poolDequeue
执行popHead
尝试取出存放的对象。
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
return nil, false
}
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
popTail
以队尾向队队首为方向遍历链表,对每个poolDequeue
执行popTail
尝试从尾部取出存放的对象。
宏观上来看,poolChain
的结构如下图:
Pool的源代码实现
poolChain
为sync.Pool的底层数据结构,接下来一览Pool的实现。
type Pool struct {
// ...
// 每个P的本地队列,实际类型为[P]poolLocal
local unsafe.Pointer
// [P]poolLocal的大小,<= P
localSize uintptr
victim unsafe.Pointer
victimSize uintptr
// 自定义的对象创建回调,当pool中无可用对象时会调用此函数
New func() interface{}
}
type poolLocalInternal struct {
// 每个P的私有共享,使用时无需加锁
private interface{}
// 对象列表,本地P可以pushHead/popHead,其他P仅能popTail
shared poolChain
}
type poolLocal struct {
poolLocalInternal
// 伪共享,仅占位用,防止在cache line上分配多个poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
接下来看看Pool的具体实现。
Pool.Get
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
s := atomic.LoadUintptr(&p.localSize)
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
pin
首先标记了当前G禁止抢占,在runtime_procUnpin
之前,当前G和P不会被抢占。此处之所以标记禁止抢占是因为下文中有使用到P ID,如果被抢占了,有可能接下里使用的P ID与所绑定的P并非同一个。
在获得P ID之后,当P ID小于p.local
数组长度时在p.local
数组里找到P对应的poolLocal
对象,否则进入pinSlow
函数创建新的poolLocal
。
func (p *Pool) pinSlow() (*poolLocal, int) {
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
}
// 当前P的数量
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
atomic.StoreUintptr(&p.localSize, uintptr(size))
return &local[pid], pid
}
pinSlow
的实现比较简单,即当前P ID在Pool中没有对应的poolLocal
对象时,则创建一个新的poolLocal
对象,旧的poolLocal
将会进入GC。仔细观察pinSlow
函数发现先执行了runtime_procUnpin
,随后有执行了runtime_procPin
,后者的目的在于获取最新的P ID,这里的意图有点难以理解,在仔细阅读和理解之后,发现目的在于尽量减少[]poolLocal
的创建次数,因为pinSlow
之前和pinSlow
里面可能会因为解除禁止抢占而导致绑定的P不一致,万一新绑定的P里存在可用的poolLocal
呢。
func (p *Pool) Get() interface{} {
// ...
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
// 解除禁止抢占
runtime_procUnpin()
// ...
if x == nil && p.New != nil {
x = p.New()
}
return x
}
- 根据当前P取得对应的
poolLocal
和P ID。 - 若当前
poolLocal.private
不为空,则表示可复用此对象;若为空,则在poolLocal.shared
队列中获取对象。 - 若
poolLocal.shared
无可用对象,则进入getSlow
获取对象。 - 若未能从其他P成功窃取对象,则调用自定义的对象创建函数,如果该函数不为
nil
。
func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize)
locals := p.local
// 从其他P中窃取对象
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 尝试从victim cache中取对象
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 清空victim cache
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
进入getSlow
的前提是当前P本地无可用对象,于是转头去其他P的里窃取对象,getSlow
就是做这件事情。
- 遍历
Pool.local
中的其他P,确认其他P的shared
里是否有可用对象,如果有,则从链尾取出。 - 如果其他P中也没有可用对象,则尝试从victim cache中取可用对象,至于victim cache本文下半部分将会做详细解释。
纵观整个Get过程会发现,从当前P的poolLocal
中取对象时使用的时popHead
,而从其他P的poolLocal
中窃取对象时使用的时popTail
,再回到上文中对poolChain
的定义,可以知道,当前P对本地poolLocal是生产者,对其他P的poolLocal而言是消费者。
再次回到poolDequeue
和poolChain
上。我们知道某一时刻P只会调度一个G,那么对于生产者而言,调用pushHead
和popHead
并不需要加锁,因为当前P操作的是本地poolLocal
;当消费者是其他P,在进行popTail
操作时,则会对pushHead
以及popHead
形成竞争关系,对这种问题,poolDequeue
的实现直指要害。
首先注意eface
这个结构,若插入成功eface
下的两个字段会指向要缓存对象的内存地址,在pushHead
中使用了原子操作判断typ
字段是否为nil
,存在这样一种可能性:pushHead
所取到的slot
正在popTail
里准备重置,这种情况下pushHead
会直接返回失败。
回到竞争问题上,pushHead
的流程可以简化为先取slot
,再判断是否可插入最后修改headTail
,而popTail
的流程可以简化为先修改headTail
再取slot
然后重置slot
,pushHead
修改head位置,popTail
修改tail位置,所以对于headTail
字段使用原子操作避免即可读写冲突。
疑问是为何popTail
中需要先修改headTail
呢,因为存在其他P都会到当前P上窃取对象,当多个P都调用本地P的popTail
时,竞争现象就会更加明显,所以此时应尽早修改headTail
,一旦某个P窃取到了其他P就无法再窃取此对象。
Pool.Put
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// ...
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
// ...
}
Put
的实现比较简单,优先将对象存入private
,若private
已存在则放入shared链表中,pin
中会标记禁止抢占,因此需要在pin
结束以及Put逻辑结束后取消禁止抢占。
Victim Cache
Victim Cache本是计算机架构里面的一个概念,是CPU硬件处理缓存的一种技术,sync.Pool
引入的意图在于降低GC压力同时提高命中率,本文并不需要详解Victim Cache的原理,分析sync.Pool
即可明白其意图。对于Pool而言有一点需要明白,这个Pool并非是无限制扩展的,否则会引起内存溢出。几乎所有的池技术中,都会在某个时刻清空或清除部分缓存对象,那么在Go中何时清理未使用的对象呢?
在Pool.Get函数中,取不到对象时会尝试从p.victim
中取,用完后放回当前P的本地队列,而p.vcitim
是什么被创建的呢?是在poolCleanup
函数中,该函数会在GC时被调用到,在init
函数里注册。
func poolCleanup() {
// 1
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 2
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
oldPools, allPools = allPools, nil
}
poolCleanup
会在STW阶段被调用,函数实现虽然看起来简单,但其意图较为复杂,那么该如何解释呢?
尝试模拟一下实际情况:
- 初始状态下,
oldPools
和allPools
均为nil
- 第1次调用Get,由于
p.local
为nil
,将会在pinSlow
中创建p.local
,然后将p
放入allPools
,此时allPools
长度为1,oldPools
为nil
- 对象使用完毕,第1次调用Put放回对象
- 第1次GC STW阶段,
allPools
中所有p.local
将值赋值给victim
并置为nil
,最后allPools
为nil
,oldPools
长度为1 - 第2次调用Get,由于
p.local
为nil
,此时会从p.victim
里面尝试取对象 - 对象使用完毕,第2次调用Put放回对象,但由于
p.local
为nil
,重新创建p.local
,并将对象放回,此时allPools
长度为1,oldPools
长度为1 - 第2次GC STW阶段,
oldPools
中所有p.victim
置nil
,前一次的cache在本次GC时被回收,allPools
所有p.local
将值赋值给victim
并置为nil
,最后allPools
为nil,oldPools
长度为1
再来看看Go 1.13以前poolCleanup
的实现:
func poolCleanup() {
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
Go 1.13以前poolCleanup
的实现简单粗暴,每次GC STW阶段遍历allPools
,清空p.local
、poolLocal.shared
。
通过两者的对比发现,新版的实现相比Go 1.13之前,GC的粒度拉大了,由于实际回收的时间线拉长,单位时间内GC的开销减小。
由此基本明白p.victim
的作用,它的定位是次级缓存,GC时将对象放入其中,下一次GC来临之前如果有Get调用则会从p.victim
中取,直到再一次GC来临做回收,同时由于从p.victim
中取出对象使用完毕之后并未放回p.victim
中,在一定程度也减小了下一次GC的开销。原来1次GC的开销被拉长到2次且会有一定程度的开销减小,这就是p.victim
引入的意图。
关于Victim Cache更多的信息可以在延伸阅读中找到。
sync.Pool的设计理念
无锁
无锁编程是很多编程语言里逃离不了的话题。sync.Pool
的无锁是在poolDequeue
和poolChain
层面实现的。
操作对象隔离
纵观整个sync.Pool
的实现,明确了生产者(本地P)访问head,消费者(其他P)访问tail,从P的角度切入操作方向,实现了目标操作对象层面的“解耦”,大部分时候两者的操作互不影响。图文示意如下:
原子操作代替锁
poolDequeue
对一些关键变量采用了CAS操作,比如poolDequeue.headTail
,既可完整保证并发又能降低相比锁而言的开销。
行为隔离——链表
这点与“操作对象隔离”是相辅相成的,一旦设计目标为尽量减少对同一对象的操作锁,就需要对行为进行隔离,链表能很好的满足这个设计目标:特定的P访问特定的位置。从整个过程来看,链表是减少锁的高效数据结构。
Victim Cache降低GC开销
GC的开销已经足够足够小了,但仍不可避免。对于sync.Pool
而言,避免极端情况GC的开销也是重点之一,所以Go 1.13的sync.Pool
引入了Victim Cache机制,有效拉长真正回收的时间线,从而减小单次GC的开销。
延伸阅读
- [High Performance Cache Architecture Using Victim
Cache](https://www.ijedr.org/papers/...
有疑问加站长微信联系(非本文作者)