更新至2012.4.8的vitess代码
新的代码增加了同步用的条件变量,没有空闲资源时的排队不再使用channel来同步(使用其它编程语言的同学可以方便的移植这个代码了),
转而使用condition variable。不再使用mu.RLock,统一使用Lock,不再纠结。 整体代码清晰了许多。
为了进一步提高性能和代码复用,vitess还提供了通用的池管理,RoundRobin.go中实现了通用的资源池,方便管理池内资源总数,超时。
type Factory func() (Resource, error) //工厂方法,资源创建函数,如connection
// Every resource needs to suport the Resource interface.
type Resource interface {
Close()
IsClosed() bool
}
为什么要实现这两个接口呢?,因为资源池需要知道如何Close超时的资源,
以及哪些资源是空闲的,哪些已经关闭了。
type RoundRobin struct {
// mu controls resources & factory
// Use Lock to modify, RLock otherwise
mu sync.RWMutex
resources chan fifoWrapper //用chan来模拟一个FIFO的队列,先来先服务
factory Factory
// Use sync/atomic to access the following vars
size int64 //LQ: 池的总大小
waitCount int64 //LQ: 还有多少人在等池内出现空闲资源
waitTime int64 //LQ: 等待空闲资源总共花了多少时间
idleTimeout int64 //LQ: 最多允许资源空闲多长时间,超过则close空闲资源
}
type fifoWrapper struct {
resource Resource
timeUsed time.timeUsed //LQ: 用于控制超时,初始值为资源上次进入池内的时间(见Put函数)
}
抱歉都快到电视剧的第N集了才看到golang里面的重量级组件channel,简单的理解可以认为channel是个
支持生产者消费者模型的同步队列,而且是个不需要销毁的队列,golang会自动将不再使用的channel当垃圾回收掉。
不多说,上代码,详细内容见标注的代码注释
// has not been reached, it will create a new one using the factory. Otherwise,
// it will indefinitely wait till the next resource becomes available.
func (self *RoundRobin) Get() (resource Resource, err error) {
return self.get(true)
}
// TryGet will return the next available resource. If none is available, and capacity
// has not been reached, it will create a new one using the factory. Otherwise,
// it will return nil with no error.
func (self *RoundRobin) TryGet() (resource Resource, err error) {
return self.get(false)
}
func (self *RoundRobin) get(wait bool) (resource Resource, err error) {
self.mu.Lock()
defer self.mu.Unlock()
// Any waits in this loop will release the lock, and it will be
// reacquired before the waits return.
for {
select {
case fw := <-self.resources:
// Found a free resource in the channel
if self.idleTimeout > 0 && fw.timeUsed.Add(self.idleTimeout).Sub(time.Now()) < 0 {
// resource has been idle for too long. Discard & go for next.
go fw.resource.Close()
self.size--
continue
}
return fw.resource, nil
default:
// resource channel is empty
if self.size >= int64(cap(self.resources)) {
// The pool is full
if wait {
start := time.Now()
self.available.Wait() //没有空闲资源了,等着吧,不如上一版本的代码自然啊
self.recordWait(start)
continue
}
return nil, nil
}
// Pool is not full. Create a resource.
if resource, err = self.waitForCreate(); err == nil {
// Creation successful. Account for this by incrementing size.
self.size++
}
return resource, err
}
}
panic("unreachable")
}
func (self *RoundRobin) recordWait(start time.Time) {
self.waitCount++
self.waitTime += time.Now().Sub(start)
}
//LQ: 这里的increment和decrement应该是多余的,没看明白作者是什么目的,和惊群有啥关系
//为了避免self.factory()比较耗时,执行self.factory时unlock还是有必要的
func (self *RoundRobin) waitForCreate() (resource Resource, err error) {
// Prevent thundering herd: increment size before creating resource, and decrement after.
self.size++
self.mu.Unlock()
defer func() {
self.mu.Lock()
self.size--
}()
return self.factory()
}
在代码注释中可以看到,为了避免惊群效应,这里采用的方式是先increment,本人也不太明白,为什么这样能避免惊群效应。
看完了Get发现排队等待是那么的自然,一行代码的事情。再来看Put函数,我们会发现唤醒也是那么的简洁。
// Put will return a resource to the pool. You MUST return every resource to the pool,
// even if it's closed. If a resource is closed, Put will discard it. Thread synchronization
// between Close() and IsClosed() is the caller's responsibility.
func (self *RoundRobin) Put(resource Resource) {
self.mu.Lock()
defer self.mu.Unlock()
defer self.available.Signal() //LQ: 排队的兄弟该醒醒了
if self.size > int64(cap(self.resources)) {
go resource.Close()
self.size--
} else if resource.IsClosed() {
self.size--
} else {
self.resources <- fifoWrapper{resource, time.Now()}
}
}
有疑问加站长微信联系(非本文作者)