本文字数:5688 字
精读时间:13 分钟
也可在 6 分钟内完成速读
Go 1.13 版本中有几个比较大的修改,其中之一是sync.Pool修改了部分实现,减小某些极端情况下的性能开销。文中内容来源于笔者读完 sync.Pool 源代码的思考和总结,内容以 Go 1.13 中的实现为准,少量内容涉及到 Go 1.13 之前,如有误区请读者多多指教。
概念
在本文内容开始之前需要理解几个在 Go runtime 中的概念,以便于更好的理解sync.Pool中一些实现。
goroutine 抢占
Go 中调度器是 GMP 模型,简单理解 G 就是 goroutine;M 可以类比内核线程,是执行 G 的地方;P 是调度 G 以及为 G 的执行准备所需资源。一般情况下,P 的数量 CPU 的可用核心数,也可由runtime.GOMAXPROCS指定。本文的重点并非 goroutine 调度器,在此不做详细解释,感兴趣可以翻阅延伸阅读的文章。
Go 有这样的调度规则:某个 G 不能一直占用 M,在某个时刻的时候,runtime(参见sysmon)会判断当前 M 是否可以被抢占,即 M 上正在执行的 G 让出。P 在合理的时刻将 G 调度到合理的 M 上执行,在 runtime 里面,每个 P 维护一个本地存放待执行 G 的队列 localq,同时还存在一个全局的待执行 G 的队列 globalq;调度就是 P 从 localq 或 globalq 中取出 G 到对应的 M 上执行,所谓抢占,runtime 将 G 抢占移出运行状态,拷贝 G 的执行栈放入待执行队列中,可能是某个 P 的 localq,也可能是 globalq,等待下一次调度,因此当被抢占的 G 重回待执行队列时有可能此时的 P 与前一次运行的 P 并非同一个。
所谓禁止抢占,即当前执行 G 不允许被抢占调度,直到禁止抢占标记解除。Go runtime 实现了 G 的禁止抢占与解除禁止抢占。
func runtime_procPin() int
禁止抢占,标记当前 G 在 M 上不会被抢占,并返回当前所在 P 的 ID。
func runtime_procUnpin()
解除 G 的禁止抢占状态,之后 G 可被抢占。
数据结构
poolDequeue
type poolDequeue struct {
headTail uint64
vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}
poolDequeue被实现为单生产者多消费者的固定大小的无锁 Ring 式队列。生产者可以从 head 插入 head 删除,而消费者仅可从 tail 删除。headTail指向了队列的头和尾,通过位运算将 head 和 tail 位置存入headTail变量中。
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// Ring式队列,头尾相等则队列已满
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 原子操作拿到slot.typ
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
return false
}
if val == nil {
val = dequeueNil(nil)
}
// slot占位,将val存入vals中
*(*interface{})(unsafe.Pointer(slot)) = val
// 更改队列指向头
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
1.在头入时,先判断队列是否已满,判断条件:head == tail
2.从队列中取到 head 位置的slot,根据slot.typ判断当前 slot 是否已被存放数据,注意这里使用了atomic.LoadPointer取代锁操作。
3.将val赋值给slot,这里实现的比较巧妙,slot是eface类型,将slot转为interface{}类型,这样val能以interface{}赋值给slot让slot.typ和slot.val指向其内存块,这样slot.typ和slot.val均不为空,这也就是第 2 点条件判断的来由。插入成功后 head 加 1,指向队头的前一个空位,由于插入删除都涉及到对headTail的修改,此处使用原子操作取代锁。
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 判断队列是否为空
if tail == head {
return nil, false
}
// head位置是队头的前一个位置,所以此处要先退一位
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
// 取出val
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 重置slot,typ和val均为nil
*slot = eface{}
return val, true
}
popHead的代码比较简单,流程上无非就是判断队列是否空,拿出slot转换为interface{}然后重置slot。popHead与pushHead有一点需要注意,在popHead中,是先修改了headTail,然后再取slot,而在pushHead中是先插入数据,然后再修改headTail,至于为什么这里先留一个疑问,后面将会详细解释。
func (d *poolDequeue) popTail() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 判断队列是否空
if tail == head {
return nil, false
}
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// 取出val
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 重置slot.typ
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
return val, true
}
popTail的代码类似popHead,只是删除对象从队首变成队尾,注意结尾部分代码,使用了atomic.StorePointer取代锁操作。
poolChain
poolDequeue被实现为 Ring 式队列,而poolChain则是基于poolDequeue实现为双向链表。
type poolChain struct {
head *poolChainElt
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt
}
同理,poolChain也实现了pushHead,popHead和popTail。
func (c *poolChain) pushHead(val interface{}) {
d := c.head
if d == nil {
// poolDequeue初始长度为8
const initSize = 8
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
if d.pushHead(val) {
return
}
// 前一个poolDequeue长度的2倍
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
newSize = dequeueLimit
}
// 首尾相连,构成链表
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
到这里大概就明白了,poolDequeue是在poolChain的pushHead中创建的,且每次创建的长度都是前一个poolDequeue长度的 2 倍,初始长度为 8。
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
popHead以队首向队尾为方向遍历链表,对每个poolDequeue执行popHead尝试取出存放的对象。
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
return nil, false
}
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
popTail以队尾向队队首为方向遍历链表,对每个poolDequeue执行popTail尝试从尾部取出存放的对象。
宏观上来看,poolChain的结构如下图:
Pool 的源代码实现
poolChain为 sync.Pool 的底层数据结构,接下来一览 Pool 的实现。
type Pool struct {
// ...
// 每个P的本地队列,实际类型为[P]poolLocal
local unsafe.Pointer
// [P]poolLocal的大小,<= P
localSize uintptr
victim unsafe.Pointer
victimSize uintptr
// 自定义的对象创建回调,当pool中无可用对象时会调用此函数
New func() interface{}
}
type poolLocalInternal struct {
// 每个P的私有共享,使用时无需加锁
private interface{}
// 对象列表,本地P可以pushHead/popHead,其他P仅能popTail
shared poolChain
}
type poolLocal struct {
poolLocalInternal
// 伪共享,仅占位用,防止在cache line上分配多个poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
接下来看看 Pool 的具体实现。
Pool.Get
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
s := atomic.LoadUintptr(&p.localSize)
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
pin首先标记了当前 G 禁止抢占,在runtime_procUnpin之前,当前 G 和 P 不会被抢占。此处之所以标记禁止抢占是因为下文中有使用到 P ID,如果被抢占了,有可能接下里使用的 P ID 与所绑定的 P 并非同一个。
在获得 P ID 之后,当 P ID 小于p.local数组长度时在p.local数组里找到 P 对应的poolLocal对象,否则进入pinSlow函数创建新的poolLocal。
func (p *Pool) pinSlow() (*poolLocal, int) {
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
}
// 当前P的数量
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
atomic.StoreUintptr(&p.localSize, uintptr(size))
return &local[pid], pid
}
pinSlow的实现比较简单,即当前 P ID 在 Pool 中没有对应的poolLocal对象时,则创建一个新的poolLocal对象,旧的poolLocal将会进入 GC。仔细观察pinSlow函数发现先执行了runtime_procUnpin,随后有执行了runtime_procPin,后者的目的在于获取最新的 P ID,这里的意图有点难以理解,在仔细阅读和理解之后,发现目的在于尽量减少[]poolLocal的创建次数,因为pinSlow之前和pinSlow里面可能会因为解除禁止抢占而导致绑定的 P 不一致,万一新绑定的 P 里存在可用的poolLocal呢。
func (p *Pool) Get() interface{} {
// ...
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
// 解除禁止抢占
runtime_procUnpin()
// ...
if x == nil && p.New != nil {
x = p.New()
}
return x
}
1.根据当前 P 取得对应的poolLocal和 P ID。
2.若当前poolLocal.private不为空,则表示可复用此对象;若为空,则在poolLocal.shared队列中获取对象。
3.若poolLocal.shared无可用对象,则进入getSlow获取对象。
4.若未能从其他 P 成功窃取对象,则调用自定义的对象创建函数,如果该函数不为nil。
func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize)
locals := p.local
// 从其他P中窃取对象
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 尝试从victim cache中取对象
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 清空victim cache
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
进入getSlow的前提是当前 P 本地无可用对象,于是转头去其他 P 的里窃取对象,getSlow就是做这件事情。
5.遍历Pool.local中的其他 P,确认其他 P 的shared里是否有可用对象,如果有,则从链尾取出。
6.如果其他 P 中也没有可用对象,则尝试从 victim cache 中取可用对象,至于 victim cache 本文下半部分将会做详细解释。
纵观整个 Get 过程会发现,从当前 P 的poolLocal中取对象时使用的时popHead,而从其他 P 的poolLocal中窃取对象时使用的时popTail,再回到上文中对poolChain的定义,可以知道,当前 P 对本地 poolLocal 是生产者,对其他 P 的 poolLocal 而言是消费者。
再次回到poolDequeue和poolChain上。我们知道某一时刻 P 只会调度一个 G,那么对于生产者而言,调用pushHead和popHead并不需要加锁,因为当前 P 操作的是本地poolLocal;当消费者是其他 P,在进行popTail操作时,则会对pushHead以及popHead形成竞争关系,对这种问题,poolDequeue的实现直指要害。
首先注意eface这个结构,若插入成功eface下的两个字段会指向要缓存对象的内存地址,在pushHead中使用了原子操作判断typ字段是否为nil,存在这样一种可能性:pushHead所取到的slot正在popTail里准备重置,这种情况下pushHead会直接返回失败。
回到竞争问题上,pushHead的流程可以简化为先取slot,再判断是否可插入最后修改headTail,而popTail的流程可以简化为先修改headTail再取slot然后重置slot,pushHead修改 head 位置,popTail修改 tail 位置,所以对于headTail字段使用原子操作避免即可读写冲突。
疑问是为何popTail中需要先修改headTail呢,因为存在其他 P 都会到当前 P 上窃取对象,当多个 P 都调用本地 P 的popTail时,竞争现象就会更加明显,所以此时应尽早修改headTail,一旦某个 P 窃取到了其他 P 就无法再窃取此对象。
Pool.Put
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// ...
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
// ...
}
Put的实现比较简单,优先将对象存入private,若private已存在则放入 shared 链表中,pin中会标记禁止抢占,因此需要在pin结束以及 Put 逻辑结束后取消禁止抢占。
Victim Cache
Victim Cache 本是计算机架构里面的一个概念,是 CPU 硬件处理缓存的一种技术,sync.Pool引入的意图在于降低 GC 压力同时提高命中率,本文并不需要详解 Victim Cache 的原理,分析sync.Pool即可明白其意图。对于 Pool 而言有一点需要明白,这个 Pool 并非是无限制扩展的,否则会引起内存溢出。几乎所有的池技术中,都会在某个时刻清空或清除部分缓存对象,那么在 Go 中何时清理未使用的对象呢?
在 Pool.Get 函数中,取不到对象时会尝试从p.victim中取,用完后放回当前 P 的本地队列,而p.vcitim是什么被创建的呢?是在poolCleanup函数中,该函数会在 GC 时被调用到,在init函数里注册。
func poolCleanup() {
// 1
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 2
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
oldPools, allPools = allPools, nil
}
poolCleanup会在 STW 阶段被调用,函数实现虽然看起来简单,但其意图较为复杂,那么该如何解释呢?
尝试模拟一下实际情况:
1.初始状态下,oldPools和allPools均为nil
2.第 1 次调用 Get,由于p.local为nil,将会在pinSlow中创建p.local,然后将p放入allPools,此时allPools长度为 1,oldPools为nil
3.对象使用完毕,第 1 次调用 Put 放回对象
4.第 1 次 GC STW 阶段,allPools中所有p.local将值赋值给victim并置为nil,最后allPools为nil,oldPools长度为 1
5.第 2 次调用 Get,由于p.local为nil,此时会从p.victim里面尝试取对象
6.对象使用完毕,第 2 次调用 Put 放回对象,但由于p.local为nil,重新创建p.local,并将对象放回,此时allPools长度为 1,oldPools长度为 1
7.第 2 次 GC STW 阶段,oldPools中所有p.victim置nil,前一次的 cache 在本次 GC 时被回收,allPools所有p.local将值赋值给victim并置为nil,最后allPools为 nil,oldPools长度为 1
再来看看 Go 1.13 以前poolCleanup的实现:
func poolCleanup() {
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
Go 1.13 以前poolCleanup的实现简单粗暴,每次 GC STW 阶段遍历allPools,清空p.local、poolLocal.shared。通过两者的对比发现,新版的实现相比 Go 1.13 之前,GC 的粒度拉大了,由于实际回收的时间线拉长,单位时间内 GC 的开销减小。
由此基本明白p.victim的作用,它的定位是次级缓存,GC 时将对象放入其中,下一次 GC 来临之前如果有 Get 调用则会从p.victim中取,直到再一次 GC 来临做回收,同时由于从p.victim中取出对象使用完毕之后并未放回p.victim中,在一定程度也减小了下一次 GC 的开销。原来 1 次 GC 的开销被拉长到 2 次且会有一定程度的开销减小,这就是p.victim引入的意图。
关于 Victim Cache 更多的信息可以在延伸阅读中找到。
sync.Pool 的设计理念
无锁
无锁编程是很多编程语言里逃离不了的话题。sync.Pool的无锁是在poolDequeue和poolChain层面实现的。
操作对象隔离
纵观整个 sync.Pool的实现,明确了生产者(本地 P)访问 head,消费者(其他 P)访问 tail,从 P 的角度切入操作方向,实现了目标操作对象层面的 “解耦”,大部分时候两者的操作互不影响。图文示意如下:
原子操作代替锁
poolDequeue对一些关键变量采用了 CAS 操作,比如poolDequeue.headTail,既可完整保证并发又能降低相比锁而言的开销。
行为隔离——链表
这点与 “操作对象隔离” 是相辅相成的,一旦设计目标为尽量减少对同一对象的操作锁,就需要对行为进行隔离,链表能很好的满足这个设计目标:特定的 P 访问特定的位置。从整个过程来看,链表是减少锁的高效数据结构。
Victim Cache 降低 GC 开销
GC 的开销已经足够足够小了,但仍不可避免。对于sync.Pool而言,避免极端情况 GC 的开销也是重点之一,所以 Go 1.13 的sync.Pool引入了 Victim Cache 机制,有效拉长真正回收的时间线,从而减小单次 GC 的开销。
延伸阅读
High Performance Cache Architecture Using Victim Cache
Go: Understand the Design of Sync.Pool
也谈 Goroutine 调度器
Goroutine 调度实例简要分析
有疑问加站长微信联系(非本文作者)