摘自Go语言实战第7章
本章会介绍pool包。这个包用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的goroutine之间共享及独立使用的资源。这种模式在需要共享一组静态资源的情况(如共享数据库连接或者内存缓冲区)下非 常有用。如果goroutine需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。
本书是以 Go 1.5 版本为基础写作而成的。在 Go 1.6 及之后的版本中,标准库里自带了资源池的实现(sync.Pool)。推荐使用。——译者注
//main.go
// 这个示例程序展示如何使用 pool 包
// 来共享一组模拟的数据库连接
package main
import (
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/goinaction/code/chapter7/patterns/pool"
)
const (
maxGoroutines = 25 // 要使用的 goroutine 的数量
pooledResources = 2 // 池中的资源的数量
)
// dbConnection 模拟要共享的资源
type dbConnection struct {
ID int32
}
// Close 实现了 io.Closer 接口,以便 dbConnection
// 可以被池管理。Close 用来完成任意资源的
// 释放管理
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
// idCounter 用来给每个连接分配一个独一无二的 id
var idCounter int32
// createConnection 是一个工厂函数,
// 当需要一个新连接时,资源池会调用这个函数
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
// main 是所有 Go 程序的入口
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
// 创建用来管理连接的池
p, err := pool.New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
// 使用池里的连接来完成查询
for query := 0; query < maxGoroutines; query++ {
// 每个 goroutine 需要自己复制一份要
// 查询值的副本,不然所有的查询会共享
// 同一个查询变量
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
// 等待 goroutine 结束
wg.Wait()
// 关闭池
log.Println("Shutdown Program.")
p.Close()
}
// performQueries 用来测试连接的资源池
func performQueries(query int, p *pool.Pool) {
// 从池里请求一个连接
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
// 将该连接释放回池里
defer p.Release(conn)
// 用等待来模拟查询响应
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("Query: QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}
//pool.go
// Fatih Arslan 和 Gabriel Aszalos 协助完成了这个示例
// 包 pool 管理用户定义的一组资源
package pool
import (
"errors"
"io"
"log"
"sync"
)
// Pool 管理一组可以安全地在多个 goroutine 间
// 共享的资源。被管理的资源必须
// 实现 io.Closer 接口
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
// ErrPoolClosed 表示请求(Acquire)了一个
// 已经关闭的池
var ErrPoolClosed = errors.New("Pool has been closed.")
// New 创建一个用来管理资源的池。
// 这个池需要一个可以分配新资源的函数,
// 并规定池的大小
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("Size value too small.")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
// Acquire 从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
// 检查是否有空闲的资源
case r, ok := <-p.resources:
log.Println("Acquire:", "Shared Resource")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
// 因为没有空闲资源可用,所以提供一个新资源
default:
log.Println("Acquire:", "New Resource")
return p.factory()
}
}
// Release 将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
// 保证本操作和 Close 操作的安全
p.m.Lock()
defer p.m.Unlock()
// 如果池已经被关闭,销毁这个资源
if p.closed {
r.Close()
return
}
select {
// 试图将这个资源放入队列
case p.resources <- r:
log.Println("Release:", "In Queue")
// 如果队列已满,则关闭这个资源
default:
log.Println("Release:", "Closing")
r.Close()
}
}
// Close 会让资源池停止工作,并关闭所有现有的资源
func (p *Pool) Close() {
// 保证本操作与 Release 操作的安全
p.m.Lock()
defer p.m.Unlock()
// 如果 pool 已经被关闭,什么也不做
if p.closed {
return
}
// 将池关闭
p.closed = true
// 在清空通道里的资源之前,将通道关闭
// 如果不这样做,会发生死锁
close(p.resources)
// 关闭资源
for r := range p.resources {
r.Close()
}
}
1.Release 方法的实现。该方法一开始在第 61 行和第 62 行对互斥量进行加锁和解锁。这和 Close 方法中的互斥量是同一个互斥量。这样可以阻止这两个方法在不同 goroutine 里同时运行。使用互斥量有两个目的。第一,可以保护第 65 行中读取 closed标志的行为,保证同一时刻不会有其他 goroutine 调用 Close 方法写同一个标志。第二,我们不想往一个已经关闭的通道里发送数据,因为那样会引起崩溃。如果 closed 标志是 true,我们就知道 resources 通道已经被关闭。
在第 66 行,如果池已经被关闭,会直接调用资源值 r 的 Close 方法。因为这时已经清空并关闭了池,所以无法将资源重新放回到该资源池里。对 closed 标志的读写必须进行同步,否则可能误导其他 goroutine,让其认为该资源池依旧是打开的,并试图对通道进行无效的操作。
有疑问加站长微信联系(非本文作者)