golang sync .pool

Stevennnmmm · · 403 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

Go 是一个自动垃圾回收的编程语言,它的算法我们后续会讲到,主要就是采用三色并发标记算法标记对象并回收。我们可以不用考虑为golang来节省什么,但是我们如果想将程序做到优秀我们就不得不考虑减少它gc的次数,毕竟,Go 的自动垃圾回收机制还是有一个 STW(stop-the-world,程序暂停)的时间,而且,大量地创建在堆上的对象,也会影响垃圾回收标记的时间

所以,一般我们做性能优化的时候,会采用对象池的方式,把不用的对象回收起来,避免被垃圾回收掉,这样使用的时候就不必在堆上重新创建了
按照惯例我们先看官方文档

官方文档

// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
// Any item stored in the Pool may be removed automatically at any time without
// notification. If the Pool holds the only reference when this happens, the
// item might be deallocated.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to
// build efficient, thread-safe free lists. However, it is not suitable for all
// free lists.
//
// An appropriate use of a Pool is to manage a group of temporary items
// silently shared among and potentially reused by concurrent independent
// clients of a package. Pool provides a way to amortize allocation overhead
// across many clients.
//
// An example of good use of a Pool is in the fmt package, which maintains a
// dynamically-sized store of temporary output buffers. The store scales under
// load (when many goroutines are actively printing) and shrinks when
// quiescent.
//
// On the other hand, a free list maintained as part of a short-lived object is
// not a suitable use for a Pool, since the overhead does not amortize well in
// that scenario. It is more efficient to have such objects implement their own
// free list.
//
// A Pool must not be copied after first use.

引用:/src/sync/pool.go:44

//池是一组临时对象,可以分别保存和
//检索到。
//
//池中存储的任何项目都可以随时自动删除,而无需
//通知。如果发生这种情况时,池中只有唯一的引用,则
//可能已释放项目。
//
//一个Pool可以安全地同时被多个goroutine使用。
//
//池的目的是缓存已分配但未使用的项目,以供以后重用,
//减轻垃圾收集器的压力。也就是说,它很容易
//建立有效的,线程安全的空闲列表。但是,它并不适合所有人
//免费列表。
//
//池的适当用法是管理一组临时项
//在并发的独立服务器之间静默共享并有可能被重用
//包的客户。池提供了一种摊销分配开销的方法
//在许多客户中。
//
// fmt包中有一个很好使用Pool的示例,它维护了一个
//动态大小的临时输出缓冲区存储。店铺规模在
//加载(当许多goroutine正在活动打印时),并在收缩时收缩
//静止。
//
//另一方面,作为短期对象的一部分维护的空闲列表是
//不适合作为Pool的用途,因为间接费用无法在
//这种情况。使此类对象实现自己的效率更高
//空闲列表。
//
//池在第一次使用后不得复制。

具体使用

sync.Pool 数据类型用来保存一组可独立访问的临时对象,为什么说是临时,因为可能随时被移除掉,在stw的时候也有可能被移除掉。所以我们不要使用这个做长链接保存

pool是一个线程安全的对象,池中的东西随时可能被销毁

New

Pool struct 包含一个 New 字段,这个字段的类型是函数 func() interface{}。当调用 Pool 的 Get 方法从池中获取元素,没有更多的空闲元素可返回时,就会调用这个 New 方法来创建新的元素。如果你没有设置 New 字段,没有更多的空闲元素可返回时,Get 方法将返回 nil,表明当前没有可用的元素

Put

这个方法用于将一个元素返还给 Pool,Pool 会把这个元素保存到池中,并且可以复用。但如果 Put 一个 nil 值,Pool 就会忽略这个值。

Get

如果调用这个方法,就会从 Pool取走一个元素,这也就意味着,这个元素会从 Pool 中移除,返回给调用者。不过,除了返回值是正常实例化的元素,Get 方法的返回值还可能会是一个 nil(Pool.New 字段没有设置,又没有空闲元素可以返回),所以你在使用的时候,可能需要判断。

sync.pool源码解析

这个是池的结构

type Pool struct {
    noCopy noCopy          //这个好像是为了使用govet 可以检测冲突
    local     unsafe.Pointer // 这是一个本地的环的指针
    localSize uintptr        // 本地数组大小
    victim     unsafe.Pointer // local from previous cycle
    victimSize uintptr        // size of victims array
    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() interface{}
}

在这段代码中,你需要关注一下 local 字段,因为所有当前主要的空闲可用的元素都存放在 local 字段中,请求元素时也是优先从 local 字段中查找可用的元素。local 字段包含一个 poolLocalInternal 字段,并提供 CPU 缓存对齐,从而避免 false sharing。

// Local per-P Pool appendix.
type poolLocalInternal struct {
    private interface{} // Can be used only by the respective P.
    shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
}
type poolLocal struct {
    poolLocalInternal
    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

而 poolLocalInternal 也包含两个字段:private 和 shared。private,代表一个缓存的元素,而且只能由相应的一个 P 存取。因为一个 P 同时只能执行一个 goroutine,所以不会有并发的问题。shared,可以由任意的 P 访问,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail,相当于只有一个本地的 P 作为生产者(Producer),多个 P 作为消费者(Consumer),它是使用一个 local-free 的 queue 列表实现的。

  • 其实首先我们得明确几个点存储数据指针是local

  • 当发生GC的时候,pool是怎么运行的:实际上会调用一个函数叫做poolcleanup

1.poolCleanup方法

func poolCleanup() {
    // This function is called with the world stopped, at the beginning of a garbage collection.
    // It must not allocate and probably should not call any runtime functions.
    // Because the world is stopped, no pool user can be in a
    // pinned section (in effect, this has all Ps pinned).
    // Drop victim caches from all pools.
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }
    // Move primary cache to victim cache.
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }
    // The pools with non-empty primary caches now have non-empty
    // victim caches and no pools have primary caches.
    oldPools, allPools = allPools, nil
}

整体大概意思是:将victim的数据给清空,将local池中的数据丢给victim,

2.get方法

// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface{} {
    if race.Enabled {
        race.Disable()
    }
    l, pid := p.pin() //将goroutine 绑定到P上
    x := l.private  
    l.private = nil
    if x == nil {
        // Try to pop the head of the local shard. We prefer
        // the head over the tail for temporal locality of
        // reuse.
        x, _ = l.shared.popHead()
        if x == nil {
            x = p.getSlow(pid)
        }
    }
    runtime_procUnpin()
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}

其实就四种可能
1.从本地的private取出来x
2.当第一步取不出的时候,从本地的分片头部取一个出来
3.当本地分片没有了,走慢方法getslow
4.当大家都没有了,生成一个新的

3.put方法

// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }
    if race.Enabled {
        if fastrand()%4 == 0 {
            // Randomly drop x on floor.
            return
        }
        race.ReleaseMerge(poolRaceAddr(x))
        race.Disable()
    }
    l, _ := p.pin()
    if l.private == nil {
        l.private = x
        x = nil
    }
    if x != nil {
        l.shared.pushHead(x)
    }
    runtime_procUnpin()
    if race.Enabled {
        race.Enable()
    }
}

这个要比get方法简单一些
1.如果放入的东西是nil ,return
2.如果private !=nil ,直接放到private中
3.从头部放入本地分片中

3.getslow方法

func (p *Pool) getSlow(pid int) interface{} {
    // See the comment in pin regarding ordering of the loads.
    size := atomic.LoadUintptr(&p.localSize) // load-acquire
    locals := p.local                        // load-consume
    // Try to steal one element from other procs.
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
    // Try the victim cache. We do this after attempting to steal
    // from all primary caches because we want objects in the
    // victim cache to age out if at all possible.
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
    // Mark the victim cache as empty for future gets don't bother
    // with it.
    atomic.StoreUintptr(&p.victimSize, 0)
    return nil
}

这里其实是一个内置函数,我就不做过多讲解,其实就是:当本地分片没有了资源了后,尝试去窃取一个资源,窃取不到的时候我们,就会从victim的内容中去获取一个对象,也就是垃圾分拣站获取一个旧的对象,但是他们的注释其实还是很有意思的:

    // Try the victim cache. We do this after attempting to steal
    // from all primary caches because we want objects in the
    // victim cache to age out if at all possible.

我们想尽可能的不去用这个·victim对象,至于原因我目前还不太了解。

踩坑点

1.内存泄露


var buffers = sync.Pool{
  New: func() interface{} { 
    return new(bytes.Buffer)
  },
}

func GetBuffer() *bytes.Buffer {
  return buffers.Get().(*bytes.Buffer)
}

func PutBuffer(buf *bytes.Buffer) {
  buf.Reset()
  buffers.Put(buf)
}

这段代码其实看上去人畜无害,但是实际上在我们应用中,buf底层是一个包含切片的结构体,当我们将这个切片扩展到一定长度后归还,仅仅是将len重置了,实际上的cap会保留,那么我们知道切片的底层就是array,那么这个内存我们一直回收不了就造成了泄露。

2.内存浪费

除了内存泄漏以外,还有一种浪费的情况,就是池子中的 buffer 都比较大,但在实际使用的时候,很多时候只需要一个小的 buffer,这也是一种浪费现象。其实,我们可以将 buffer 池分成几层。首先,小于 512 byte 的元素的 buffer 占一个池子;其次,小于 1K byte 大小的元素占一个池子;再次,小于 4K byte 大小的元素占一个池子。这样分成几个池子以后,就可以根据需要,到所需大小的池子中获取 buffer 了。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:Stevennnmmm

查看原文:golang sync .pool

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

403 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传