今天在写码之时,发现了同事用到了sync.pool。因不知其因,遂Google之。虽然大概知道其原因和用法。还不能融汇贯通。故写此记,方便日后查阅。直至明了。
正文:
在高并发或者大量的数据请求的场景中,我们会遇到很多问题,垃圾回收就是其中之一(garbage collection),为了减少优化GC,我们一般想到的方法就是能够让对象得以重用。这就需要一个对象池来存储待回收对象,等待下次重用,从而减少对象产生数量。我们可以把sync.Pool类型值看作是存放可被重复使用的值的容器。此类容器是自动伸缩的、高效的,同时也是并发安全的。为了描述方便,我们也会把sync.Pool类型的值称为临时对象池,而把存于其中的值称为对象值。这个类设计的目的是用来保存和复用临时对象,以减少内存分配,降低CG压力。
我们看下Go的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
// Copyright 2013 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package sync import ( "internal/race" "runtime" "sync/atomic" "unsafe" ) // A Pool is a set of temporary objects that may be individually saved and // retrieved. // // Any item stored in the Pool may be removed automatically at any time without // notification. If the Pool holds the only reference when this happens, the // item might be deallocated. // // A Pool is safe for use by multiple goroutines simultaneously. // // Pool's purpose is to cache allocated but unused items for later reuse, // relieving pressure on the garbage collector. That is, it makes it easy to // build efficient, thread-safe free lists. However, it is not suitable for all // free lists. // // An appropriate use of a Pool is to manage a group of temporary items // silently shared among and potentially reused by concurrent independent // clients of a package. Pool provides a way to amortize allocation overhead // across many clients. // // An example of good use of a Pool is in the fmt package, which maintains a // dynamically-sized store of temporary output buffers. The store scales under // load (when many goroutines are actively printing) and shrinks when // quiescent. // // On the other hand, a free list maintained as part of a short-lived object is // not a suitable use for a Pool, since the overhead does not amortize well in // that scenario. It is more efficient to have such objects implement their own // free list. // type Pool struct { local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal localSize uintptr // size of the local array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. New func() interface{} } // Local per-P Pool appendix. type poolLocal struct { private interface{} // Can be used only by the respective P. shared []interface{} // Can be used by any P. Mutex // Protects shared. pad [128]byte // Prevents false sharing. } // Put adds x to the pool. func (p *Pool) Put(x interface{}) { if race.Enabled { // Under race detector the Pool degenerates into no-op. // It's conforming, simple and does not introduce excessive // happens-before edges between unrelated goroutines. return } if x == nil { return } l := p.pin() if l.private == nil { l.private = x x = nil } runtime_procUnpin() if x == nil { return } l.Lock() l.shared = append(l.shared, x) l.Unlock() } // Get selects an arbitrary item from the Pool, removes it from the // Pool, and returns it to the caller. // Get may choose to ignore the pool and treat it as empty. // Callers should not assume any relation between values passed to Put and // the values returned by Get. // // If Get would otherwise return nil and p.New is non-nil, Get returns // the result of calling p.New. func (p *Pool) Get() interface{} { if race.Enabled { if p.New != nil { return p.New() } return nil } l := p.pin() x := l.private l.private = nil runtime_procUnpin() if x != nil { return x } l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] } l.Unlock() if x != nil { return x } return p.getSlow() } func (p *Pool) getSlow() (x interface{}) { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire local := p.local // load-consume // Try to steal one element from other procs. pid := runtime_procPin() runtime_procUnpin() for i := 0; i < int(size); i++ { l := indexLocal(local, (pid+i+1)%int(size)) l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] l.Unlock() break } l.Unlock() } if x == nil && p.New != nil { x = p.New() } return x } // pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P. // Caller must call runtime_procUnpin() when done with the pool. func (p *Pool) pin() *poolLocal { pid := runtime_procPin() // In pinSlow we store to localSize and then to local, here we load in opposite order. // Since we've disabled preemption, GC can not happen in between. // Thus here we must observe local at least as large localSize. // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { return indexLocal(l, pid) } return p.pinSlow() } func (p *Pool) pinSlow() *poolLocal { // Retry under the mutex. // Can not lock the mutex while pinned. runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() // poolCleanup won't be called while we are pinned. s := p.localSize l := p.local if uintptr(pid) < s { return indexLocal(l, pid) } if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer((*unsafe.Pointer)(&p.local), unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release return &local[pid] } func poolCleanup() { // This function is called with the world stopped, at the beginning of a garbage collection. // It must not allocate and probably should not call any runtime functions. // Defensively zero out everything, 2 reasons: // 1. To prevent false retention of whole Pools. // 2. If GC happens while a goroutine works with l.shared in Put/Get, // it will retain whole Pool. So next cycle memory consumption would be doubled. for i, p := range allPools { allPools[i] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.shared = nil } p.local = nil p.localSize = 0 } allPools = []*Pool{} } var ( allPoolsMu Mutex allPools []*Pool ) func init() { runtime_registerPoolCleanup(poolCleanup) } func indexLocal(l unsafe.Pointer, i int) *poolLocal { return &(*[1000000]poolLocal)(l)[i] } // Implemented in runtime. func runtime_registerPoolCleanup(cleanup func()) func runtime_procPin() int func runtime_procUnpin() |
sync.Pool 最常用的两个函数Get/Put
1 2 3 |
var pool = &sync.Pool{New:func()interface{}{return NewObject()}} pool.Put() Pool.Get() |
对象池在Get的时候没有里面没有对象会返回nil,所以我们需要New function来确保当获取对象对象池为空时,重新生成一个对象返回,前者的功能是从池中获取一个interface{}类型的值,而后者的作用则是把一个interface{}类型的值放置于池中。
1 2 3 4 5 6 7 8 9 10 |
// 建立对象 var pool = &sync.Pool{New:func()interface{}{return "Hello,xiequan"}} // 准备放入的字符串 val := "Hello,World!" // 放入 pool.Put(val) // 取出 log.Println(pool.Get()) // 再取就没有了,会自动调用NEW log.Println(pool.Get()) |
再来看一个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
package main import ( "fmt" "runtime" "runtime/debug" "sync" "sync/atomic" ) func main() { // 禁用GC,并保证在main函数执行结束前恢复GC defer debug.SetGCPercent(debug.SetGCPercent(-1)) var count int32 newFunc := func() interface{} { return atomic.AddInt32(&count, 1) } pool := sync.Pool{New: newFunc} // New 字段值的作用 v1 := pool.Get() fmt.Printf("v1: %v\n", v1) // 临时对象池的存取 pool.Put(newFunc()) pool.Put(newFunc()) pool.Put(newFunc()) v2 := pool.Get() fmt.Printf("v2: %v\n", v2) // 垃圾回收对临时对象池的影响 debug.SetGCPercent(100) runtime.GC() v3 := pool.Get() fmt.Printf("v3: %v\n", v3) pool.New = nil v4 := pool.Get() fmt.Printf("v4: %v\n", v4) } |
通过Get方法获取到的值是任意的。如果一个临时对象池的Put方法未被调用过,且它的New字段也未曾被赋予一个非nil的函数值,那么它的Get方法返回的结果值就一定会是nil。Get方法返回的不一定就是存在于池中的值。不过,如果这个结果值是池中的,那么在该方法返回它之前就一定会把它从池中删除掉。
这样一个临时对象池在功能上看似与一个通用的缓存池相差无几。但是实际上,临时对象池本身的特性决定了它是一个“个性”非常鲜明的同步工具。我们在这里说明它的两个非常突出的特性。
来看一个syscn.pool和bytes.Buffer使用的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
type Dao struct { bp sync.Pool } func New(c *conf.Config) (d *Dao) { d = &Dao{ bp: sync.Pool{ New: func() interface{} { return &bytes.Buffer{} }, }, } return } func (d *Dao) Infoc(args ...string) (value string, err error) { if len(args) == 0 { return } // fetch a buf from bufpool buf, ok := d.bp.Get().(*bytes.Buffer) if !ok { return "", ErrType } // append first arg if _, err := buf.WriteString(args[0]); err != nil { return "", err } for _, arg := range args[1:] { // append ,arg if _, err := buf.WriteString(defaultSpliter); err != nil { return "", err } if _, err := buf.WriteString(strings.Replace(arg, defaultSpliter, defaultReplacer, -1)); err != nil { return "", err } } value = buf.String() buf.Reset() d.bp.Put(buf) return } |
在实现过程中还要特别注意的是Pool本身也是一个对象,要把Pool对象在程序开始的时候初始化为全局唯一。
对象池使用是较简单的,但原生的sync.Pool有个较大的问题:我们不能自由控制Pool中元素的数量,放进Pool中的对象每次GC发生时都会被清理掉。这使得sync.Pool做简单的对象池还可以,但做连接池就有点心有余而力不足了,比如:在高并发的情景下一旦Pool中的连接被GC清理掉,那每次连接DB都需要重新三次握手建立连接,这个代价就较大了。
总结:
对象池的一些适用场景(比如作为临时且状态无关的数据的暂存处),以及一些不适用的场景(比如用来存放数据库连接的实例)。如果我们在做实现技术的选型的时候把临时对象池作为了候选之一,那么就应该好好想想它的“个性”是不是符合你的需要。如果真的适合,那么它的特性一定会为你的程序增光添彩,无论在功能上还是在性能上。而如果它被用在了不恰当的地方,那么就只能适得其反了
参考文献:
https://golang.org/src/sync/pool.go
有疑问加站长微信联系(非本文作者)