Go内置database/sql连接池 - 源码学习

uuid · 2021-12-27 18:35:43 · 1402 次点击 · 预计阅读时间 8 分钟 · 大约8小时之前 开始浏览    
这是一个创建于 2021-12-27 18:35:43 的文章,其中的信息可能已经有所发展或是发生改变。

引言

Go内置了数据库相关的库 - database/sql,实现数据库操作相关的接口,其中还包含一个很重要的功能 - 连接池,用来实现连接的复用,限制连接的数量,从而提高性能,避免连接数量失控,导致资源消耗不可控。

本文借Go内置的database/sql库,来一起学习如何一步步设计包含连接池的数据库组件,包括模型抽象、连接复用,以及如何管理连接数。

设计

模型抽象

首先,我们要对解决领域进行抽象。

我们目标是设计一个数据库连接组件,所以第一个对象模型很明确 - 数据库对象, 我们将数据库对象抽象为一个DB的结构体,一个对象对应的是一个数据库实例,所以DB必须是单例

其次,数据库需要连接,所以可以对连接进行抽象,命名为Conn,这里我们不关心Conn的属性,而是关心行为,所以Conn类型定义成一个interface,包含所需两个方法:预处理方法Prepare和关闭连接方法Close (注:Prepare方法不再继续展开,实际上就是接收一个sql,返回一个实现Stmt接口的预处对象,接着设置一下参数,最后执行数据库操作)。

由于不同的数据库连接的实现方式会有不同,这时就要考虑隔离变化,对连接的方式进行抽象,定义一个连接接口 - Connector,用来创建连接(依赖倒置原则),当初始化DB的时候再将具体实现注入到DB对象中(也就是依赖注入)。

最终我们可以得到以下几个接口和结构体:

// 数据库对象
type DB struct {
    connector Connector
}

type Conn interface {
    Prepare(query string) (Stmt, error)
    Close() error
}

// 数据库连接接口
type Connector interface {
    Connect(context.Context) (Conn, error)
}

最后,我们给DB对象增加一个获取连接的方法Conn,在不考虑连接池的情况下,调用connector.Connect(ctx)直接获取连接:

// 获取一个连接
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
    return db.connector.Connect(ctx)
}

连接复用

当不考虑到连接池时,上面的实现基本满足需求,但是为了提高性能,连接池还是必须的,接下来我们开始设计连接池的功能。

第一步,需要考虑存储空闲连接,这里采用的是切片来存储:freeConn []*Conn。

接着,需要考虑属性要定义在哪?!空闲的连接需要能被DB实例中的不同方法访问到,所以我们把freeConn定义为DB的一个属性,同时考虑到对freeConn的访问会存在并发安全的问题,需要增加一个锁mu sync.Mutex来保护:

// 数据库对象
type DB struct {
    connector Connector
    mu        sync.Mutex // protects following fields
    freeConn  []*Conn
}

数据库连接获取方法Conn就需要修改为:

func (db *DB) Conn(ctx context.Context) (*Conn, error) {
    db.mu.Lock() // 加锁保护freeConn
    numFree := len(db.freeConn)
    if numFree > 0 {
        conn := db.freeConn[0]
        // 移除第一个连接
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        db.mu.Unlock()
        return conn, nil
    }
    db.mu.Unlock()
    return db.connector.Connect(ctx) // 没有空闲连接,重新创建
}

连接要复用,就不能关闭,用完需要放回连接池中,所以DB需要一个将连接放回连接池的方法 - putConn:

func (db *DB) putConn(dc Conn) {
    db.mu.Lock()
    db.freeConn = append(db.freeConn, dc)
    db.mu.Unlock()
}

但是,putConn方法要在怎么被调用?!不能因为增加了连接池功,让客户端的改变使用方式,所以我们应该考虑将putConn调用放到一个合理的地方 - Conn的Close()方法中。

在没有连接池功能的时候,一个Conn用完了就一定会调用Close()释放资源,有连接池功能后,就不再是直接关闭连接,而是释放到连接池中,提供给后续请求使用。

对此,我们对Conn进行改造,增加一层代理,命名为PConn,将原来的Conn作为PConn的一个属性,同时实现了Conn接口的两个方法:Prepare和Close,这样我们就可以对Close方法进行拦截,修改它的行为:

type PConn interface {
  db        *DB
  ci        Conn
}
func (pc *PConn) Close() error {
    dc.db.putConn(pc)
}
func (pc *PConn) Prepare(query string) (Stmt, error) {
    return pc.ci.Prepare(query)
}

接着我们调整一下Conn的创建方法(也可以重新实现Connector接口,返回的是PConn实例,然后将新的实现注入到DB实例中)

func (db *DB) Conn(ctx context.Context) (Conn, error) {
  ...
  c , err := db.connector.Connect(ctx) // 没有空闲连接,重新创建
  if err!=nil {
    return nil, err
  }
  return &PConn{
      ci: c,
      db: db,
  }
}

这样当PConn使用完后调用Close方法,就不再是关闭连接,而是将连接释放到DB对象的freeConn中,由于客户端定义的类型是Conn接口,并且PConn也实现了Conn接口,所以无需调整(符合开闭原则

到此,连接复用初步完成了,接着就得考虑另外一个核心功能 :连接数量管理

连接数量管理

目前的Conn发现没有连接的时候是调用connector.Connect方法直接创建新连接,没法控制连接的数量,这样很明显是不合理,无限创建连接可能会导致资源耗尽,资源消耗曲线过陡峭,所以我们需要:

  1. 限制连接数量。将连接的数量约束在指定的范围内
  2. 连接请求队列。当连接数量达到最大值时,连接请求需要需要放到等待队列中,等待有连接释放。

首先,需要有地方保存当前连接数量和最大连接数,并且要能被对象中的不同方法访问到,所以我们可以给DB增加两个属性:

type DB struct {
  numOpen      int    // number of opened and pending open connections
  maxOpen      int    // <= 0 means unlimited
}

接着,需要设计当有请求连接时,发现没有空闲连接,并且连接数量等于maxOpen时要怎么办?

Database/sql里是采用一个map来存储(注:这个有点奇怪,为什么不用队列?),在DB结构体增加一个属性:connRequests,类型为:map[uint64]chan connRequest,其中key/value:

  • key :请求唯一标识。调用nextRequestKeyLocked方法生成,实际上就是一个自增的序列,只是为了保持唯一
  • value:等待连接的请求。类型为chan,每个请求封装为一个chan,可以保证并发安全,同时也可以利用其阻塞特性(当chan没有值时阻塞等待),chan接收的数据类型为connRequest格式,当其他协程有释放连接时,会将连接放到一个connRequest对象中发送给该chan,connRequest只包含两个属性:conn和err,用来接收返回连接或是异常

代码如下:

type DB struct {
    ...
  numOpen      int    // number of opened and pending open connections
  maxOpen      int                    // <= 0 means unlimited
  nextRequest  uint64 // Next key to use in connRequests.
  connRequests map[uint64]chan connRequest
}
func (db *DB) nextRequestKeyLocked() uint64 {
    next := db.nextRequest
    db.nextRequest++
    return next
}
// 连接请求
type connRequest struct {
    conn *PConn
    err  error
}

调整获取连接的方法Conn(ctx context.Context) 逻辑:

  1. 判断freeConn是否有空闲连接,有就返回
  2. 判断连接数量numOpen是否大于maxOpen,如果还小于maxOpen,说明还可以创建新连接,创建连接后numOpen++,返回连接
  3. 当numOpen已经是大于等于maxOpen,就不能再创建新连接,这是就把请求放到集合connRequests中,等待连接释放。
func (db *DB) Conn(ctx context.Context) (Conn, error) {
    db.mu.Lock() // 加锁保护freeConn
    numFree := len(db.freeConn)
    if numFree > 0 { // 有空闲连接
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        db.mu.Unlock()
        return conn, nil
    }
    // 连接数已经超出
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        req := make(chan connRequest, 1) // 创建一个chan,接收连接
        reqKey := db.nextRequestKeyLocked() // 生成唯一序号
        db.connRequests[reqKey] = req // 放到全局属性,让其他方法能访问到
        db.mu.Unlock()
        select {
        case <-ctx.Done(): //超时等
            // Remove the connection request and ensure no value has been sent
            // on it after removing.
            db.mu.Lock()
            delete(db.connRequests, reqKey) // 移除
            db.mu.Unlock()
        }
        case ret, ok := <-req: // 收到连接释放
            if !ok {
                return nil, errDBClosed
            }
            return ret.conn, ret.err
        }
    }
  // 连接数没超出,可以创建新连接
  db.numOpen++ // optimistically
  db.mu.Unlock()
    c, err := db.connector.Connect(ctx) // 重新创建
    if err != nil {
        return nil, err
    }
    return &PConn{
        ci: c,
        db: db,
    }, nil
}

接着,我们还需要调整连接释放方法 - putConn:

  1. 增加一个bool返回值,告诉调用方连接是否释放成功(如果失败,客户端可以决定关闭连接)
  2. 如果连接数numOpen大于 maxOpen时,当前连接直接丢弃,返回false
  3. 当len(db.connRequests)大于0时,需要考虑将连接优先给db.connRequests中的请求
  4. 最后才将连接放入空闲列表中。
func (db *DB) putConn(dc Conn) bool{
  db.mu.Lock()
  defer db.mu.Unlock()
  if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 当有等待连接的请求时
    if c := len(db.connRequests); c > 0 {
      var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // Remove from pending requests.
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    }
    // 放入空闲连接池中
    db.freeConn = append(db.freeConn, dc)
  return true
}

PConn的close方法也要稍微调整一下,如果释放连接失败,需要把连接关闭

func (pc *PConn) Close() error {
    ok := dc.db.putConn(pc)
    if !ok {
      dc.ci.Close()
    }
}

总结

到目前为止,一个较完整的包含连接池的数据库组件完成了,上述代码是参照database/sql设计,去除了一些非核心的代码,通过对它的研究,可以学到:

  1. 如何对问题域进行抽象。数据库抽象成DB、连接为Conn,连接器Connector等
  2. 如何命名一些变量。 connRequest(请求连接)、 freeConn(请求连接)等
  3. 如何使用context。实现超时等
  4. 并发编程。chan、mutex、全局属性访问,资源约束等

我的博客https://itart.cn/blogs/2021/explore/database-sql-pool.html


有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1402 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传