Golang 学习笔记十二 实例pool

懒皮 · · 732 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

摘自Go语言实战第7章

本章会介绍pool包。这个包用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的goroutine之间共享及独立使用的资源。这种模式在需要共享一组静态资源的情况(如共享数据库连接或者内存缓冲区)下非 常有用。如果goroutine需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。

本书是以 Go 1.5 版本为基础写作而成的。在 Go 1.6 及之后的版本中,标准库里自带了资源池的实现(sync.Pool)。推荐使用。——译者注

//main.go
// 这个示例程序展示如何使用 pool 包
// 来共享一组模拟的数据库连接
package main

import (
    "io"
    "log"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"

    "github.com/goinaction/code/chapter7/patterns/pool"
)

const (
    maxGoroutines   = 25 // 要使用的 goroutine 的数量
    pooledResources = 2  // 池中的资源的数量
)

// dbConnection 模拟要共享的资源
type dbConnection struct {
    ID int32
}

// Close 实现了 io.Closer 接口,以便 dbConnection
// 可以被池管理。Close 用来完成任意资源的
// 释放管理
func (dbConn *dbConnection) Close() error {
    log.Println("Close: Connection", dbConn.ID)
    return nil
}

// idCounter 用来给每个连接分配一个独一无二的 id
var idCounter int32

// createConnection 是一个工厂函数,
// 当需要一个新连接时,资源池会调用这个函数
func createConnection() (io.Closer, error) {
    id := atomic.AddInt32(&idCounter, 1)
    log.Println("Create: New Connection", id)

    return &dbConnection{id}, nil
}

// main 是所有 Go 程序的入口
func main() {
    var wg sync.WaitGroup
    wg.Add(maxGoroutines)

    // 创建用来管理连接的池
    p, err := pool.New(createConnection, pooledResources)
    if err != nil {
        log.Println(err)
    }

    // 使用池里的连接来完成查询
    for query := 0; query < maxGoroutines; query++ {
        // 每个 goroutine 需要自己复制一份要
        // 查询值的副本,不然所有的查询会共享
        // 同一个查询变量
        go func(q int) {
            performQueries(q, p)
            wg.Done()
        }(query)
    }

    // 等待 goroutine 结束
    wg.Wait()

    // 关闭池
    log.Println("Shutdown Program.")
    p.Close()
}

// performQueries 用来测试连接的资源池
func performQueries(query int, p *pool.Pool) {
    // 从池里请求一个连接
    conn, err := p.Acquire()
    if err != nil {
        log.Println(err)
        return
    }

    // 将该连接释放回池里
    defer p.Release(conn)

    // 用等待来模拟查询响应
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("Query: QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}

//pool.go
// Fatih Arslan 和 Gabriel Aszalos 协助完成了这个示例
// 包 pool 管理用户定义的一组资源
package pool

import (
    "errors"
    "io"
    "log"
    "sync"
)

// Pool 管理一组可以安全地在多个 goroutine 间
// 共享的资源。被管理的资源必须
// 实现 io.Closer 接口
type Pool struct {
    m         sync.Mutex
    resources chan io.Closer
    factory   func() (io.Closer, error)
    closed    bool
}

// ErrPoolClosed 表示请求(Acquire)了一个
// 已经关闭的池
var ErrPoolClosed = errors.New("Pool has been closed.")

// New 创建一个用来管理资源的池。
// 这个池需要一个可以分配新资源的函数,
// 并规定池的大小
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("Size value too small.")
    }

    return &Pool{
        factory:   fn,
        resources: make(chan io.Closer, size),
    }, nil
}

// Acquire 从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
    select {
    // 检查是否有空闲的资源
    case r, ok := <-p.resources:
        log.Println("Acquire:", "Shared Resource")
        if !ok {
            return nil, ErrPoolClosed
        }
        return r, nil

    // 因为没有空闲资源可用,所以提供一个新资源
    default:
        log.Println("Acquire:", "New Resource")
        return p.factory()
    }
}

// Release 将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
    // 保证本操作和 Close 操作的安全
    p.m.Lock()
    defer p.m.Unlock()

    // 如果池已经被关闭,销毁这个资源
    if p.closed {
        r.Close()
        return
    }

    select {
    // 试图将这个资源放入队列
    case p.resources <- r:
        log.Println("Release:", "In Queue")

    // 如果队列已满,则关闭这个资源
    default:
        log.Println("Release:", "Closing")
        r.Close()
    }
}

// Close 会让资源池停止工作,并关闭所有现有的资源
func (p *Pool) Close() {
    // 保证本操作与 Release 操作的安全
    p.m.Lock()
    defer p.m.Unlock()

    // 如果 pool 已经被关闭,什么也不做
    if p.closed {
        return
    }

    // 将池关闭
    p.closed = true

    // 在清空通道里的资源之前,将通道关闭
    // 如果不这样做,会发生死锁
    close(p.resources)

    // 关闭资源
    for r := range p.resources {
        r.Close()
    }
}

1.Release 方法的实现。该方法一开始在第 61 行和第 62 行对互斥量进行加锁和解锁。这和 Close 方法中的互斥量是同一个互斥量。这样可以阻止这两个方法在不同 goroutine 里同时运行。使用互斥量有两个目的。第一,可以保护第 65 行中读取 closed标志的行为,保证同一时刻不会有其他 goroutine 调用 Close 方法写同一个标志。第二,我们不想往一个已经关闭的通道里发送数据,因为那样会引起崩溃。如果 closed 标志是 true,我们就知道 resources 通道已经被关闭。

在第 66 行,如果池已经被关闭,会直接调用资源值 r 的 Close 方法。因为这时已经清空并关闭了池,所以无法将资源重新放回到该资源池里。对 closed 标志的读写必须进行同步,否则可能误导其他 goroutine,让其认为该资源池依旧是打开的,并试图对通道进行无效的操作。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:懒皮

查看原文:Golang 学习笔记十二 实例pool

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

732 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传