Go内置database/sql连接池 - 源码学习

uuid · · 1060 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

## 引言 Go内置了数据库相关的库 - database/sql,实现数据库操作相关的接口,其中还包含一个很重要的功能 - 连接池,用来实现连接的复用,限制连接的数量,从而提高性能,避免连接数量失控,导致资源消耗不可控。 本文借Go内置的database/sql库,来一起学习如何一步步设计包含连接池的数据库组件,包括模型抽象、连接复用,以及如何管理连接数。 ## 设计 ### 模型抽象 首先,我们要对解决领域进行抽象。 我们目标是设计一个数据库连接组件,所以第一个对象模型很明确 - 数据库对象, 我们将数据库对象抽象为一个**DB**的结构体,一个对象对应的是一个**数据库实例**,所以DB必须是**单例**。 其次,数据库需要**连接**,所以可以对连接进行抽象,命名为**Conn**,这里我们**不关心**Conn的**属性**,而是关心**行为**,所以Conn类型定义成一个**interface**,包含所需两个方法:预处理方法**Prepare**和关闭连接方法**Close** (注:Prepare方法不再继续展开,实际上就是接收一个sql,返回一个实现Stmt接口的预处对象,接着设置一下参数,最后执行数据库操作)。 由于不同的数据库连接的**实现方式**会有**不同**,这时就要考虑**隔离变化**,对连接的方式进行**抽象**,定义一个**连接接口** - **Connector**,用来创建连接(**依赖倒置**原则),当初始化DB的时候再将具体实现**注入**到DB对象中(也就是**依赖注入**)。 最终我们可以得到以下几个接口和结构体: ```go // 数据库对象 type DB struct { connector Connector } type Conn interface { Prepare(query string) (Stmt, error) Close() error } // 数据库连接接口 type Connector interface { Connect(context.Context) (Conn, error) } ``` 最后,我们给DB对象增加一个获取连接的方法Conn,在不考虑连接池的情况下,调用connector.Connect(ctx)直接获取连接: ```go // 获取一个连接 func (db *DB) Conn(ctx context.Context) (*Conn, error) { return db.connector.Connect(ctx) } ``` ### 连接复用 当不考虑到连接池时,上面的实现基本满足需求,但是为了提高性能,连接池还是必须的,接下来我们开始设计连接池的功能。 第一步,需要考虑存储空闲连接,这里采用的是切片来存储:freeConn []*Conn。 接着,需要考虑属性要定义在哪?!空闲的连接需要能被DB实例中的不同方法访问到,所以我们把freeConn定义为DB的一个属性,同时考虑到对freeConn的访问会存在并发安全的问题,需要增加一个锁**mu sync.Mutex**来保护: ```go // 数据库对象 type DB struct { connector Connector mu sync.Mutex // protects following fields freeConn []*Conn } ``` 数据库连接获取方法Conn就需要修改为: ````go func (db *DB) Conn(ctx context.Context) (*Conn, error) { db.mu.Lock() // 加锁保护freeConn numFree := len(db.freeConn) if numFree > 0 { conn := db.freeConn[0] // 移除第一个连接 copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] db.mu.Unlock() return conn, nil } db.mu.Unlock() return db.connector.Connect(ctx) // 没有空闲连接,重新创建 } ```` 连接要复用,就不能关闭,用完需要放回连接池中,所以DB需要一个将连接放回连接池的方法 - putConn: ```go func (db *DB) putConn(dc Conn) { db.mu.Lock() db.freeConn = append(db.freeConn, dc) db.mu.Unlock() } ``` **但是**,putConn方法要在怎么被调用?!不能因为增加了连接池功,让客户端的改变使用方式,所以我们应该考虑将putConn调用放到一个合理的地方 - Conn的Close()方法中。 在没有连接池功能的时候,一个Conn用完了就一定会调用Close()释放资源,有连接池功能后,就不再是直接关闭连接,而是释放到连接池中,提供给后续请求使用。 对此,我们对Conn进行改造,增加一层代理,命名为PConn,将原来的Conn作为PConn的一个属性,同时实现了Conn接口的两个方法:Prepare和Close,这样我们就可以对Close方法进行拦截,修改它的行为: ```go type PConn interface { db *DB ci Conn } func (pc *PConn) Close() error { dc.db.putConn(pc) } func (pc *PConn) Prepare(query string) (Stmt, error) { return pc.ci.Prepare(query) } ``` 接着我们调整一下Conn的创建方法(也可以重新实现Connector接口,返回的是PConn实例,然后将新的实现注入到DB实例中) ```go func (db *DB) Conn(ctx context.Context) (Conn, error) { ... c , err := db.connector.Connect(ctx) // 没有空闲连接,重新创建 if err!=nil { return nil, err } return &PConn{ ci: c, db: db, } } ``` 这样当PConn使用完后调用Close方法,就不再是关闭连接,而是将连接释放到DB对象的freeConn中,由于客户端定义的类型是Conn接口,并且PConn也实现了Conn接口,所以无需调整(符合**开闭原则**) 到此,连接复用初步完成了,接着就得考虑另外一个核心功能 :**连接数量管理**。 ### 连接数量管理 目前的Conn发现没有连接的时候是调用connector.Connect方法直接创建新连接,没法控制连接的数量,这样很明显是不合理,无限创建连接可能会导致资源耗尽,资源消耗曲线过陡峭,所以我们需要: 1. 限制连接数量。将连接的数量约束在指定的范围内 2. 连接请求队列。当连接数量达到最大值时,连接请求需要需要放到等待队列中,等待有连接释放。 首先,需要有地方保存当前连接数量和最大连接数,并且要能被对象中的不同方法访问到,所以我们可以给DB增加两个属性: ```go type DB struct { numOpen int // number of opened and pending open connections maxOpen int // <= 0 means unlimited } ``` 接着,需要设计当有请求连接时,发现没有空闲连接,并且连接数量等于maxOpen时要怎么办? Database/sql里是采用一个map来存储(注:这个有点奇怪,为什么不用队列?),在DB结构体增加一个属性:connRequests,类型为:map[uint64]chan connRequest,其中key/value: - key :请求唯一标识。调用nextRequestKeyLocked方法生成,实际上就是一个自增的序列,只是为了保持唯一 - value:等待连接的请求。类型为chan,每个请求封装为一个chan,可以保证并发安全,同时也可以利用其阻塞特性(当chan没有值时阻塞等待),chan接收的数据类型为connRequest格式,当其他协程有释放连接时,会将连接放到一个connRequest对象中发送给该chan,connRequest只包含两个属性:conn和err,用来接收返回**连接**或是**异常**。 代码如下: ```go type DB struct { ... numOpen int // number of opened and pending open connections maxOpen int // <= 0 means unlimited nextRequest uint64 // Next key to use in connRequests. connRequests map[uint64]chan connRequest } func (db *DB) nextRequestKeyLocked() uint64 { next := db.nextRequest db.nextRequest++ return next } // 连接请求 type connRequest struct { conn *PConn err error } ``` 调整获取连接的方法Conn(ctx context.Context) 逻辑: 1. 判断freeConn是否有空闲连接,有就返回 2. 判断连接数量numOpen是否大于maxOpen,如果还小于maxOpen,说明还可以创建新连接,创建连接后numOpen++,返回连接 3. 当numOpen已经是大于等于maxOpen,就**不能**再创建新连接,这是就把请求放到集合connRequests中,**等待**连接释放。 ````go func (db *DB) Conn(ctx context.Context) (Conn, error) { db.mu.Lock() // 加锁保护freeConn numFree := len(db.freeConn) if numFree > 0 { // 有空闲连接 conn := db.freeConn[0] copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] db.mu.Unlock() return conn, nil } // 连接数已经超出 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { req := make(chan connRequest, 1) // 创建一个chan,接收连接 reqKey := db.nextRequestKeyLocked() // 生成唯一序号 db.connRequests[reqKey] = req // 放到全局属性,让其他方法能访问到 db.mu.Unlock() select { case <-ctx.Done(): //超时等 // Remove the connection request and ensure no value has been sent // on it after removing. db.mu.Lock() delete(db.connRequests, reqKey) // 移除 db.mu.Unlock() } case ret, ok := <-req: // 收到连接释放 if !ok { return nil, errDBClosed } return ret.conn, ret.err } } // 连接数没超出,可以创建新连接 db.numOpen++ // optimistically db.mu.Unlock() c, err := db.connector.Connect(ctx) // 重新创建 if err != nil { return nil, err } return &PConn{ ci: c, db: db, }, nil } ```` 接着,我们还需要调整连接释放方法 - putConn: 1. 增加一个bool返回值,告诉调用方连接是否释放成功(如果失败,客户端可以决定关闭连接) 2. 如果连接数numOpen大于 maxOpen时,当前连接直接丢弃,返回false 3. 当len(db.connRequests)大于0时,需要考虑将连接优先给db.connRequests中的请求 4. 最后才将连接放入空闲列表中。 ```go func (db *DB) putConn(dc Conn) bool{ db.mu.Lock() defer db.mu.Unlock() if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } // 当有等待连接的请求时 if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 for reqKey, req = range db.connRequests { break } delete(db.connRequests, reqKey) // Remove from pending requests. req <- connRequest{ conn: dc, err: err, } return true } // 放入空闲连接池中 db.freeConn = append(db.freeConn, dc) return true } ``` PConn的close方法也要稍微调整一下,如果释放连接失败,需要把连接关闭 ```go func (pc *PConn) Close() error { ok := dc.db.putConn(pc) if !ok { dc.ci.Close() } } ``` ## 总结 到目前为止,一个较完整的包含连接池的数据库组件完成了,上述代码是参照database/sql设计,去除了一些非核心的代码,通过对它的研究,可以学到: 1. 如何对问题域进行抽象。数据库抽象成DB、连接为Conn,连接器Connector等 2. 如何命名一些变量。 connRequest(请求连接)、 freeConn(请求连接)等 3. 如何使用context。实现超时等 4. 并发编程。chan、mutex、全局属性访问,资源约束等 **我的博客**:https://itart.cn/blogs/2021/explore/database-sql-pool.html

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1060 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传