## 引言
Go内置了数据库相关的库 - database/sql,实现数据库操作相关的接口,其中还包含一个很重要的功能 - 连接池,用来实现连接的复用,限制连接的数量,从而提高性能,避免连接数量失控,导致资源消耗不可控。
本文借Go内置的database/sql库,来一起学习如何一步步设计包含连接池的数据库组件,包括模型抽象、连接复用,以及如何管理连接数。
## 设计
### 模型抽象
首先,我们要对解决领域进行抽象。
我们目标是设计一个数据库连接组件,所以第一个对象模型很明确 - 数据库对象, 我们将数据库对象抽象为一个**DB**的结构体,一个对象对应的是一个**数据库实例**,所以DB必须是**单例**。
其次,数据库需要**连接**,所以可以对连接进行抽象,命名为**Conn**,这里我们**不关心**Conn的**属性**,而是关心**行为**,所以Conn类型定义成一个**interface**,包含所需两个方法:预处理方法**Prepare**和关闭连接方法**Close**
(注:Prepare方法不再继续展开,实际上就是接收一个sql,返回一个实现Stmt接口的预处对象,接着设置一下参数,最后执行数据库操作)。
由于不同的数据库连接的**实现方式**会有**不同**,这时就要考虑**隔离变化**,对连接的方式进行**抽象**,定义一个**连接接口** - **Connector**,用来创建连接(**依赖倒置**原则),当初始化DB的时候再将具体实现**注入**到DB对象中(也就是**依赖注入**)。
最终我们可以得到以下几个接口和结构体:
```go
// 数据库对象
type DB struct {
connector Connector
}
type Conn interface {
Prepare(query string) (Stmt, error)
Close() error
}
// 数据库连接接口
type Connector interface {
Connect(context.Context) (Conn, error)
}
```
最后,我们给DB对象增加一个获取连接的方法Conn,在不考虑连接池的情况下,调用connector.Connect(ctx)直接获取连接:
```go
// 获取一个连接
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
return db.connector.Connect(ctx)
}
```
### 连接复用
当不考虑到连接池时,上面的实现基本满足需求,但是为了提高性能,连接池还是必须的,接下来我们开始设计连接池的功能。
第一步,需要考虑存储空闲连接,这里采用的是切片来存储:freeConn []*Conn。
接着,需要考虑属性要定义在哪?!空闲的连接需要能被DB实例中的不同方法访问到,所以我们把freeConn定义为DB的一个属性,同时考虑到对freeConn的访问会存在并发安全的问题,需要增加一个锁**mu sync.Mutex**来保护:
```go
// 数据库对象
type DB struct {
connector Connector
mu sync.Mutex // protects following fields
freeConn []*Conn
}
```
数据库连接获取方法Conn就需要修改为:
````go
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
db.mu.Lock() // 加锁保护freeConn
numFree := len(db.freeConn)
if numFree > 0 {
conn := db.freeConn[0]
// 移除第一个连接
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
db.mu.Unlock()
return conn, nil
}
db.mu.Unlock()
return db.connector.Connect(ctx) // 没有空闲连接,重新创建
}
````
连接要复用,就不能关闭,用完需要放回连接池中,所以DB需要一个将连接放回连接池的方法 - putConn:
```go
func (db *DB) putConn(dc Conn) {
db.mu.Lock()
db.freeConn = append(db.freeConn, dc)
db.mu.Unlock()
}
```
**但是**,putConn方法要在怎么被调用?!不能因为增加了连接池功,让客户端的改变使用方式,所以我们应该考虑将putConn调用放到一个合理的地方 - Conn的Close()方法中。
在没有连接池功能的时候,一个Conn用完了就一定会调用Close()释放资源,有连接池功能后,就不再是直接关闭连接,而是释放到连接池中,提供给后续请求使用。
对此,我们对Conn进行改造,增加一层代理,命名为PConn,将原来的Conn作为PConn的一个属性,同时实现了Conn接口的两个方法:Prepare和Close,这样我们就可以对Close方法进行拦截,修改它的行为:
```go
type PConn interface {
db *DB
ci Conn
}
func (pc *PConn) Close() error {
dc.db.putConn(pc)
}
func (pc *PConn) Prepare(query string) (Stmt, error) {
return pc.ci.Prepare(query)
}
```
接着我们调整一下Conn的创建方法(也可以重新实现Connector接口,返回的是PConn实例,然后将新的实现注入到DB实例中)
```go
func (db *DB) Conn(ctx context.Context) (Conn, error) {
...
c , err := db.connector.Connect(ctx) // 没有空闲连接,重新创建
if err!=nil {
return nil, err
}
return &PConn{
ci: c,
db: db,
}
}
```
这样当PConn使用完后调用Close方法,就不再是关闭连接,而是将连接释放到DB对象的freeConn中,由于客户端定义的类型是Conn接口,并且PConn也实现了Conn接口,所以无需调整(符合**开闭原则**)
到此,连接复用初步完成了,接着就得考虑另外一个核心功能 :**连接数量管理**。
### 连接数量管理
目前的Conn发现没有连接的时候是调用connector.Connect方法直接创建新连接,没法控制连接的数量,这样很明显是不合理,无限创建连接可能会导致资源耗尽,资源消耗曲线过陡峭,所以我们需要:
1. 限制连接数量。将连接的数量约束在指定的范围内
2. 连接请求队列。当连接数量达到最大值时,连接请求需要需要放到等待队列中,等待有连接释放。
首先,需要有地方保存当前连接数量和最大连接数,并且要能被对象中的不同方法访问到,所以我们可以给DB增加两个属性:
```go
type DB struct {
numOpen int // number of opened and pending open connections
maxOpen int // <= 0 means unlimited
}
```
接着,需要设计当有请求连接时,发现没有空闲连接,并且连接数量等于maxOpen时要怎么办?
Database/sql里是采用一个map来存储(注:这个有点奇怪,为什么不用队列?),在DB结构体增加一个属性:connRequests,类型为:map[uint64]chan connRequest,其中key/value:
- key :请求唯一标识。调用nextRequestKeyLocked方法生成,实际上就是一个自增的序列,只是为了保持唯一
- value:等待连接的请求。类型为chan,每个请求封装为一个chan,可以保证并发安全,同时也可以利用其阻塞特性(当chan没有值时阻塞等待),chan接收的数据类型为connRequest格式,当其他协程有释放连接时,会将连接放到一个connRequest对象中发送给该chan,connRequest只包含两个属性:conn和err,用来接收返回**连接**或是**异常**。
代码如下:
```go
type DB struct {
...
numOpen int // number of opened and pending open connections
maxOpen int // <= 0 means unlimited
nextRequest uint64 // Next key to use in connRequests.
connRequests map[uint64]chan connRequest
}
func (db *DB) nextRequestKeyLocked() uint64 {
next := db.nextRequest
db.nextRequest++
return next
}
// 连接请求
type connRequest struct {
conn *PConn
err error
}
```
调整获取连接的方法Conn(ctx context.Context) 逻辑:
1. 判断freeConn是否有空闲连接,有就返回
2. 判断连接数量numOpen是否大于maxOpen,如果还小于maxOpen,说明还可以创建新连接,创建连接后numOpen++,返回连接
3. 当numOpen已经是大于等于maxOpen,就**不能**再创建新连接,这是就把请求放到集合connRequests中,**等待**连接释放。
````go
func (db *DB) Conn(ctx context.Context) (Conn, error) {
db.mu.Lock() // 加锁保护freeConn
numFree := len(db.freeConn)
if numFree > 0 { // 有空闲连接
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
db.mu.Unlock()
return conn, nil
}
// 连接数已经超出
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
req := make(chan connRequest, 1) // 创建一个chan,接收连接
reqKey := db.nextRequestKeyLocked() // 生成唯一序号
db.connRequests[reqKey] = req // 放到全局属性,让其他方法能访问到
db.mu.Unlock()
select {
case <-ctx.Done(): //超时等
// Remove the connection request and ensure no value has been sent
// on it after removing.
db.mu.Lock()
delete(db.connRequests, reqKey) // 移除
db.mu.Unlock()
}
case ret, ok := <-req: // 收到连接释放
if !ok {
return nil, errDBClosed
}
return ret.conn, ret.err
}
}
// 连接数没超出,可以创建新连接
db.numOpen++ // optimistically
db.mu.Unlock()
c, err := db.connector.Connect(ctx) // 重新创建
if err != nil {
return nil, err
}
return &PConn{
ci: c,
db: db,
}, nil
}
````
接着,我们还需要调整连接释放方法 - putConn:
1. 增加一个bool返回值,告诉调用方连接是否释放成功(如果失败,客户端可以决定关闭连接)
2. 如果连接数numOpen大于 maxOpen时,当前连接直接丢弃,返回false
3. 当len(db.connRequests)大于0时,需要考虑将连接优先给db.connRequests中的请求
4. 最后才将连接放入空闲列表中。
```go
func (db *DB) putConn(dc Conn) bool{
db.mu.Lock()
defer db.mu.Unlock()
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
// 当有等待连接的请求时
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
req <- connRequest{
conn: dc,
err: err,
}
return true
}
// 放入空闲连接池中
db.freeConn = append(db.freeConn, dc)
return true
}
```
PConn的close方法也要稍微调整一下,如果释放连接失败,需要把连接关闭
```go
func (pc *PConn) Close() error {
ok := dc.db.putConn(pc)
if !ok {
dc.ci.Close()
}
}
```
## 总结
到目前为止,一个较完整的包含连接池的数据库组件完成了,上述代码是参照database/sql设计,去除了一些非核心的代码,通过对它的研究,可以学到:
1. 如何对问题域进行抽象。数据库抽象成DB、连接为Conn,连接器Connector等
2. 如何命名一些变量。 connRequest(请求连接)、 freeConn(请求连接)等
3. 如何使用context。实现超时等
4. 并发编程。chan、mutex、全局属性访问,资源约束等
**我的博客**:https://itart.cn/blogs/2021/explore/database-sql-pool.html
有疑问加站长微信联系(非本文作者))