目的
Many Go programs and packages try to reuse memory either for locality reasons or to reduce GC pressure。
缓解GC压力
GC(garbage collector):
- 自动垃圾回收,减轻了程序员的压力
- 减轻压力的同时,也增加了运行时开销。
sync.pool应运而生,设计的目的是用来保存和复用临时对象,减小GC分配,降低GC压力。
Pool设计用意是在全局变量里维护的释放链表,尤其是被多个 goroutine 同时访问的全局变量。使用Pool代替自己写的释放链表,可以让程序运行的时候,在恰当的场景下从池里-重用-某项值。
sync.Pool的一种使用场景是,为临时缓冲区创建一个池,多个客户端使用这个缓冲区来共享全局资源。
另一方面,不恰当的使用例子,如果释放链表是某个对象的一部分,并由这个对象维护,而这个对象只由一个客户端使用,在这个客户端工作完成后释放链表,那么用Pool实现这个释放链表是不合适的。
由来讨论
Brad Fizpatrick曾建议在创建一个工友的Cache
类型。这个建议引发了一长串的讨论。Go 语言应该在标准库里提供一个这个样子的类型,还是应当将这个类型作为私下的实现?这个实现应该真的释放内存么?如果释放,什么时候释放?这个类型应当叫做Cache
,或者更应该叫做Pool
https://github.com/golang/go/issues/4720
https://my.oschina.net/u/115763/blog/282376
简单介绍
A Pool is a set of temporary objects that may be individually saved and retrieved.
池是一组可以单独保存和检索的临时对象
Any item stored in the Pool may be removed automatically at any time without notification. If the Pool holds the only reference when this happens, the item might be deallocated.
存储在池中的任何项目都可以随时自动删除,而无需通知。如果发生这种情况时池保存唯一的引用,则可能会释放该项
A Pool is safe for use by multiple goroutines simultaneously
并发安全
上面三句是pool源码上的摘抄解释
pool特性
-没有大小限制,大小只受限与GC的临界值
-对象的最大缓存周期是GC周期,当GC调用时,没有被引用的对象的会被清理掉
-Get方法返回的都是池子中的任意一个对象,没有顺序,注意是没有顺序的;如果当期池子为空,会调用New方法创建一个对象,没有New方法则会返回nil
使用场景
高并发场景下,当多个goroutine都需要创建同⼀个临时对象的时候,因为对象是占内存的,进⽽导致的就是内存回收的GC压⼒增加。
造成 “并发⼤大-占⽤内存⼤大-GC缓慢-处理理并发能⼒力力降低-并发更更 ⼤大”这样的恶性循环。
业界使用
Echo:
使用了sync.pool来从用内存,实现了0动态内存分配
https://echo.labstack.com/guide/routing
Gin:
上面是gin使用pool作为context的缓存
https://github.com/gin-gonic/gin/blob/73ccfea3ba5a115e74177dbfbc1ea0fff88c13f4/gin.go
fmt:
原生的fmt包里,也包含了sync.pool的调用。
源码分析
如上图所示:在go的M、P、G模型中,每个P绑定了一个poolLocalInternal,这结合了go的优势,使得当前P绑定等待队列中的任何G对poolLocalInternal的访问都不需要加锁。每个poolLocalInternal中包含private和shared。private为单个对象,为每个P单独维护,不具有共享特质,每次获取和添加都会首先设置private;shared为一系列的临时对象,为共享队列,各个P之间通过shard共享对象集,在go1.13之前,shard为数组,在1.13之后修改为使用环形数组,通过CAS实现了lock-free。
从全局的角度来看,全局维护了一个统一的结构,如上图所示的红色的pool,pool维护每个产生的local,每个local指向每个P绑定的poolLocalInternal。
before Go1.13:
// A Pool must not be copied after first use.
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
// Local per-P Pool appendix.
// 1.13之前
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared []interface{} // Can be used by any P.
Mutex // Protects shared.
}
上面定义了一个Pool结构体,其中声明了noCopy;poolLocalInternal是每个P的一个附件,其中包含一个private的私有对象,只能当前P访问,在获取和设置的时候会优先从改私有对象中获取和一个shared的数组,可以被任意的P访问。
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l := p.pin()
if l.private == nil {
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
if race.Enabled {
race.Enable()
}
}
Put函数为sync.pool的主要函数,用于添加对象。调用了p.pin()获取当前P的绑定附件,runtime_procUnpin解除绑定关系,并且设计设置禁止关系(不禁止强占可能造成并发问题),通过P先判断是否可以放进private对象中,否则放进shard数组中。
// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l := p.pin()
x := l.private
l.private = nil
runtime_procUnpin()
if x == nil {
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {
x = p.getSlow()
}
}
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (p *Pool) getSlow() (x interface{}) {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs.
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ {
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
return x
}
Get函数和Put函数一致,通过pin()获取当前P绑定的附件。先从private中获取,再冲shard中获取,获取失败再调用getslow函数,在getslow函数中,通过遍历获取其余P的shared资源,会偷取最后一个,最后再偷取失败才会使用出事化函数New()
Get执行流程:private->shard->getslow()->New()
// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() *poolLocal {
pid := runtime_procPin()
// In pinSlow we store to localSize and then to local, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid)
}
return p.pinSlow()
}
func (p *Pool) pinSlow() *poolLocal {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid)
}
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid]
}
pin函数索引当前G对应的绑定的P,通过runtime_procPin设置禁止强占,返回当前P拥有的poolLocal,获取不到时调用pinslow进行第二次获取。第二次调用会先使用runtime_procUnpin()进行强占解除,对全局锁加锁,这是如果local为空(第一次创建),则加入全局队列中。
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Defensively zero out everything, 2 reasons:
// 1. To prevent false retention of whole Pools.
// 2. If GC happens while a goroutine works with l.shared in Put/Get,
// it will retain whole Pool. So next cycle memory consumption would be doubled.
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
var (
allPoolsMu Mutex
allPools []*Pool
)
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
poolCleanup为运行时的注册函数,在GC开始时调用,逻辑很暴力,三层for循环赋空!
这个版本有啥缺点
- 对全局shared加锁读写,性能较低
- 三层for循环赋空很暴力,容易造成GC的尖峰
- 每次GC对全量清空,造成的缓存命中率下降
After Go1.13
在GO1.13之后,优化了以上的问题:
- 对全局的shard加锁,使用了CAS实现了lock-free
- 对GC造成的尖峰问题,引入了受害者缓存。延长了缓存的声明周期,增加了缓存的命中效率
可以很清楚的发现,和之前的数据结构相比,1.13之后的版本增加了黄色的poolDequene,那这这和黄色部分又是何方神圣呢?
// 1.13之后
// Local per-P Pool appendix.
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
type poolChain struct {
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *poolChainElt
}
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
// headTail packs together a 32-bit head index and a 32-bit
// tail index. Both are indexes into vals modulo len(vals)-1.
//
// tail = index of oldest data in queue
// head = index of next slot to fill
//
// Slots in the range [tail, head) are owned by consumers.
// A consumer continues to own a slot outside this range until
// it nils the slot, at which point ownership passes to the
// producer.
//
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
headTail uint64
// vals is a ring buffer of interface{} values stored in this
// dequeue. The size of this must be a power of 2.
//
// vals[i].typ is nil if the slot is empty and non-nil
// otherwise. A slot is still in use until *both* the tail
// index has moved beyond it and typ has been set to nil. This
// is set to nil atomically by the consumer and read
// atomically by the producer.
vals []eface
}
对锁的优化:
Go在1.13之后增加了poolDequene:
- lock-free
- 生产者可以进行pushHead和popTail
- 消费者只能进行popTail
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
func (c *poolChain) pushHead(val interface{}) {
d := c.head
if d == nil {
// Initialize the chain.
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
if d.pushHead(val) {
return
}
// The current dequeue is full. Allocate a new one of twice
// the size.
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// Check if the head slot has been released by popTail.
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return false
}
// The head slot is free, so we own it.
if val == nil {
val = dequeueNil(nil)
}
*(*interface{})(unsafe.Pointer(slot)) = val
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
新版本使用l.shared.pushHead(x),进行头添加,删除了锁的使用。
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
// Confirm tail and decrement head. We do this before
// reading the value to take back ownership of this
// slot.
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// We successfully took back slot.
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// Zero the slot. Unlike popTail, this isn't racing with
// pushHead, so we don't need to be careful here.
*slot = eface{}
return val, true
}
在获取临时对象的时候,会首先从private中获取,private为空会接着从shard变量中拉取,shared变量中也没有空闲,接着调用getSlow从其他P中偷取,偷取失败的时候,这时候会使用受害者缓存,这一步是新添加,接着才会调用New()。
Get执行流程:private->shard->getslow()->victim→New()
针对GC尖峰的优化:
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Because the world is stopped, no pool user can be in a
// pinned section (in effect, this has all Ps pinned).
// Drop victim caches from all pools.
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
oldPools, allPools = allPools, nil
}
受害者缓存(Victim Cache):是一个与直接匹配或低相联缓存并用的、容量很小的全相联缓存。当一个数据块被逐出缓存时,并不直接丢弃,而是暂先进入受害者缓存。如果受害者缓存已满,就替换掉其中一项。当进行缓存标签匹配时,在与索引指向标签匹配的同时,并行查看受害者缓存,如果在受害者缓存发现匹配,就将其此数据块与缓存中的不匹配数据块做交换,同时返回给处理器。
新版本的poolCleanup增加了victim,对于原来应该被GC的缓存,添加到了victim,销毁滞后到了下一轮,以此来解决缓存命中率低的问题。
基准测试
package main
import (
"sync"
"testing"
)
type info struct {
Val int
}
func BenchmarkNoPool(b *testing.B) {
b.ResetTimer()
var k *info
for i := 0; i < b.N; i++ {
k = &info{Val: 1}
k.Val += 1
}
}
var pInfo = sync.Pool{New: func() interface{} {
return new(info)
}}
func BenchmarkWithPool(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
k := pInfo.Get().(*info)
// 重置
k.Val = 0
k.Val += 1
pInfo.Put(k)
}
}
测试结果
go test -bench=. -benchmem
goos: darwin
goarch: amd64
pkg: pool_test
BenchmarkNoPool-4 78748666 13.7 ns/op 8 B/op 1 allocs/op
BenchmarkWithPool-4 75934996 16.2 ns/op 0 B/op 0 allocs/op
PASS
ok pool_test 3.962s
函数 | MAXPEOCESS | 总执行次数 | 单次平均耗时(ns) | 单词平均内存(B) | 单次分配次数 |
---|---|---|---|---|---|
BenchmarkNoPool | 4 | 78748666 | 13.7 | 8 | 1 |
BenchmarkWithPool | 4 | 75934996 | 16.2 | 0 | 0 |
有疑问加站长微信联系(非本文作者)