资源链接
grpcp 介绍
- grpcp 是基于 grpc 开发的,通过对 grpc 的封装和复用,来使得业务方能够最快速地通过 grpcp 找到相通的链路
- grpcp 主要实现了对 grpc 的连接的维护,使得业务方能够省去对链路的维护,直接根据路由表进行数据分发
源码分析
结构体和变量
-
ConnectionTracker
ConnectionTracker 就是控制整个连接表,维护所有的连接
// ConnectionTracker keep connections and maintain their status
type ConnectionTracker struct {
sync.RWMutex // 用来防止多个协程同时操作 connections 和 alives
// 创建连接的函数,可以自定义,也可以直接使用本代码中的 dialF 变量定义的默认函数
dial DialFunc
// 进行连接是否 ready 的检查函数,可以自定义,不设定则使用默认函数defaultReadyCheck()
readyCheck ReadyCheckFunc
// 维护一个 addr 到连接的映射表,断连也会维护,之后会重连
connections map[string]*trackedConn
// 维护 connections 中处于 ready状态 的连接
alives map[string]*trackedConn
// 超时时间
timeout time.Duration // 连接超时时间
checkReadyTimeout time.Duration // 在心跳检测时,如果连接不是ready,则等待 conn 的状态改变超时时间
heartbeatInterval time.Duration // 心跳间隔
// 上下文,这个上下文传给每一个连接,作为每个连接的 parent ctx,当 parent ctx cancel(cannel被调用)时,每个子ctx 均被 cancel
ctx context.Context
// 上下文对应的 cancel 函数,未用到,提供给业务方使用,当关闭所有连接时,调用 cannel
cannel context.CancelFunc
}
-
trackedCon
trackedConn 用来控制单个连接
type trackedConn struct {
sync.RWMutex // 用来防止多个协程同时操作本连接
// 本连接对应的 addr 地址,即 key,用来标识一个连接
addr string
// 本连接对应的 grpc 连接
conn *grpc.ClientConn
// 本连接对应的父ConnectionTracker,该连接存在于 tracker.connections 映射表中,可能存在于 tracker.alives 映射表中
tracker *ConnectionTracker
// 本连接对应的 state,将 grpc 中的五种状态(Idle Connecting Ready TransientFailure Shutdown)减到三种(Idle Ready Shutdown)
state connectivity.State
// 本连接对应的超时时刻,每次心跳检测成功,如果是 ready 状态就更新超时时间,心跳检测一直失败,超过超时时间 tracker.timeout,则 shutdown() 本连接
expires time.Time
// 本连接对应的 retry 次数,每次心跳检测成功,如果是 ready 状态就重置为 0,否则 ++,代码中未使用该变量
retry int
// 本连接对应的 CancelFunc,在本连接 shutdown 时,通知本连接对应的心跳协程退出
cannel context.CancelFunc
}
-
Pool
以下通过池子来实现复用同一个连接,但是复用的时候不能将拿出来的 connection close 掉,否则该 conn 会被置为 shutdown,并被踢出 ConnectionTracker.alives 记录, 同时其他协程已经拿出来的 conn 就无法使用了
var (
// 默认连接池,如果没有自定义连接池,则使用默认连接池
defaultPool *ConnectionTracker
// 用于限定创建连接池的操作最多被仅仅执行一次
once sync.Once
// 连接函数,可以自定义参数(类似grpc.WithInsecure())
dialF = func(addr string) (*grpc.ClientConn, error) {
return grpc.Dial(
addr,
grpc.WithInsecure(),
)
}
)
- 其他默认参数
// 超时参数,可以自定义覆盖掉
const (
defaultTimeout = 100 * time.Second
checkReadyTimeout = 5 * time.Second
heartbeatInterval = 20 * time.Second
)
// 如果尝试获取某个连接,当前连接不联通,则返回的错误定义
var (
errNoReady = fmt.Errorf("no ready")
)
函数
ConnectionTracker
- 可以被自定义的函数
// DialFunc dial function
// 用来自定义连接函数,在 New 时需要被传入
// 如果业务方直接使用 ConnectionTracker,则需要自己写 dailF,参考 Pool 的 dialF 写法来自定义
// 如果业务方使用 pool() 函数,则利用 Pool 的默认 dailF 函数来创建
type DialFunc func(addr string) (*grpc.ClientConn, error)
// ReadyCheckFunc check conn is ready function
// 用来自定义检测是否 ready 的函数,默认使用 defaultReadyCheck(),可以根据业务场景自定义,用来得到连接的状态,直接操作 conn.GetState() 实现自己的封装
type ReadyCheckFunc func(ctx context.Context, conn *grpc.ClientConn) connectivity.State
-
defaultReadyCheck()
检测连接的状态,并返回
如果是 ready 或 shutdown,则直接返回
如果是别的状态,则检测是否有状态改变,如果没有状态改变而 ConnectionTracker.checkReadyTimeout 超时,则返回 Idle,如果状态改变,则继续循环
// 这里简化了 grpc 的状态,从五种简化为三种,这个默认的检查状态的方法可以被重写覆盖,来自定义状态
func defaultReadyCheck(ctx context.Context, conn *grpc.ClientConn) connectivity.State {
for {
s := conn.GetState()
if s == connectivity.Ready || s == connectivity.Shutdown {
return s
}
if !conn.WaitForStateChange(ctx, s) {
return connectivity.Idle
}
}
}
- New 函数和 New 函数实现动态参数传入
// New initialization ConnectionTracker
func New(dial DialFunc, opts ...TrackerOption) *ConnectionTracker {
// 创建默认的 ConnectionTracker
ctx, cannel := context.WithCancel(context.Background())
ct := &ConnectionTracker{
dial: dial,
readyCheck: defaultReadyCheck,
connections: make(map[string]*trackedConn),
alives: make(map[string]*trackedConn),
timeout: defaultTimeout,
checkReadyTimeout: checkReadyTimeout,
heartbeatInterval: heartbeatInterval,
ctx: ctx,
cannel: cannel,
}
// 通过传入的参数来覆盖 ct 的默认参数
for _, opt := range opts {
opt(ct)
}
return ct
}
// TrackerOption initialization options
// 用来操作选项,实现 New() 的动态参数
type TrackerOption func(*ConnectionTracker)
/**************************以下函数的作用为更新设定的参数,业务方可以自定义函数并传入 New() *****************************/
// SetTimeout custom timeout
func SetTimeout(timeout time.Duration) TrackerOption {
return func(o *ConnectionTracker) {
o.timeout = timeout
}
}
// SetCheckReadyTimeout custom checkReadyTimeout
func SetCheckReadyTimeout(timeout time.Duration) TrackerOption {
return func(o *ConnectionTracker) {
o.checkReadyTimeout = timeout
}
}
// SetHeartbeatInterval custom heartbeatInterval
func SetHeartbeatInterval(interval time.Duration) TrackerOption {
return func(o *ConnectionTracker) {
o.heartbeatInterval = interval
}
}
// CustomReadyCheck custom ready check function
func CustomReadyCheck(f ReadyCheckFunc) TrackerOption {
return func(o *ConnectionTracker) {
o.readyCheck = f
}
}
- 对外接口
// GetConn create or get an existing connection
// 获取一个存在的连接,如果连接之前已经被创建,则直接返回;否则创建连接
func (ct *ConnectionTracker) GetConn(addr string) (*grpc.ClientConn, error) {
return ct.getConn(addr, false)
}
// Dial force to create new connection, this operation will close old connection!
// 强制重新创建一个新连接,如果连接之前已经被创建,则关闭后再次重新创建
func (ct *ConnectionTracker) Dial(addr string) (*grpc.ClientConn, error) {
return ct.getConn(addr, true)
}
// Alives current live connections
// 返回当前处于 ready 状态的连接的 addr 数组
func (ct *ConnectionTracker) Alives() []string {
// ConnectionTracker 的锁,用来防止多个 goroutine 同时操作 alives(比如在处理过程中加入或者删除 alives 中的元素)
ct.RLock()
defer ct.RUnlock()
alives := []string{}
for addr := range ct.alives {
alives = append(alives, addr)
}
return alives
}
- 其他内部函数
// 获取和 addr 对应的连接,不存在则创建;force 用来指定是否强制重新创建
func (ct *ConnectionTracker) getConn(addr string, force bool) (*grpc.ClientConn, error) {
// ConnectionTracker 的锁,用来防止多个 goroutine 同时操作 connections(唯一的场景就是多个 goroutine 获取 addr,发现没有,然后并行创建 trackedConn 并设定 connections[addr])
ct.Lock()
tc, ok := ct.connections[addr]
if !ok {
tc = &trackedConn{
addr: addr,
tracker: ct,
}
ct.connections[addr] = tc
}
// 在此处 Unlock,而不是在上面 defer ct.Unlock() 是因为要提前释放锁,让其他的协程能够读取 connections,因为下面的 tryconn 需要尝试连接,可能会耗费较长时间,因此需要提前把锁释放掉
ct.Unlock()
// 尝试连接,如果连接已经创建且 force 为 false(不强制重新创建),则会立即返回(Ready 返回 nil,Idle 返回 errNoReady,Shutdown 进行重新创建连接)
err := tc.tryconn(ct.ctx, force)
if err != nil {
return nil, err
}
return tc.conn, nil
}
// 当连接 ready 后,加入 alives
func (ct *ConnectionTracker) connReady(tc *trackedConn) {
ct.Lock()
defer ct.Unlock()
ct.alives[tc.addr] = tc
}
// 当连接 unready 后,从 alives 移除
func (ct *ConnectionTracker) connUnReady(addr string) {
ct.Lock()
defer ct.Unlock()
delete(ct.alives, addr)
}
trackedConn
-
连接 tryconn
如果已经创建连接且不强制重新创建,连接处于 Ready 状态则返回 nil,处于 Idle 状态返回 errNoReady,处于 Shutdown 状态进行重新创建连接
如果强制重新创建连接,则关闭原来的连接(如果存在),并重新连接
创建连接过程:1. 关闭可能存在的连接,重新创建连接;2. 开启 ready 状态判断函数,超时则返回 Idle 状态;3. 开启心跳函数;4. 更新连接的参数并更新 tracker 中该连接的记录
func (tc *trackedConn) tryconn(ctx context.Context, force bool) error {
// 对本连接加锁,防止其他 goroutine 同时连接
tc.Lock()
defer tc.Unlock()
if !force && tc.conn != nil { // 其他的协程可能建立了连接,所以如果建立好了,则直接返回并复用
if tc.state == connectivity.Ready {
return nil
}
if tc.state == connectivity.Idle {
return errNoReady
}
}
/******************************下面是创建连接的部分**********************************/
// 如果有连接,则先关闭原来的连接
if tc.conn != nil { // close shutdown conn
tc.conn.Close()
}
// 根据设定的 dialF 函数创建连接
conn, err := tc.tracker.dial(tc.addr)
if err != nil {
return err
}
tc.conn = conn
// ready 超时上下文,设定超时时间,如果超时时间内没有变为 Ready 或者 Shutdown 状态,则返回 Idle
readyCtx, cancel := context.WithTimeout(ctx, tc.tracker.checkReadyTimeout)
// 本函数结束并完成连接的 ready 后,清除资源
defer cancel()
// 调动 ConnectionTracker.readyCheck 函数(可能是自定义 可能是默认)。默认逻辑:如果状态
// 是 Ready 或者 Shutdown,则直接返回,否则等待状态的变化,超时则返回 Idle 状态
checkStatus := tc.tracker.readyCheck(readyCtx, tc.conn)
// heartbeat 心跳上下文
hbCtx, hbCancel := context.WithCancel(ctx)
tc.cannel = hbCancel
// 开启心跳协程函数,每个心跳间隔,探测一次,当连接被 Shutdown 时或者ConnectionTracker的cannel被调用时,tc.cannel 被调用,从而退出心跳协程
go tc.heartbeat(hbCtx)
if checkStatus != connectivity.Ready {
return errNoReady
}
// 调用连接准备好的函数,更改连接状态为 Ready,更新超时时刻,重置 retry 参数,并将该连接加入 ConnectionTracker.Alives 数组
tc.ready()
return nil
}
这里解决一个疑问:
当第一次请求 addr 连接,如果调用 GetConn() ,tc, ok := ct.connections[addr] 返回空,则创建连接,此时连接状态默认为 0,即 Idle 状态。同时 tc.conn == nil
然后进入 tryconn(),由于 tc.conn == nil 因此不会进入“直接判断连接状态为 Idle 而返回 errNoReady ”的逻辑,而是进入后面创建连接的逻辑
-
心跳检测
每次新创建一个连接,会创建一个协程,开启这个连接的心跳检测,根据检测结果进行状态调整
被 Shutdown 的连接不会从映射表中删除,在下次请求该 addr 连接的时候,会进入 tryconn(),重新创建一个连接,开启新连接的心跳函数
// 如果连接不是 Shutdown 则每隔 tracker.heartbeatInterval 时间进行一次心跳检测;如果父 ctx 被取消或者其他原因导致连接的 ctx 被取消,则关闭连接,结束心跳
func (tc *trackedConn) heartbeat(ctx context.Context) {
ticker := time.NewTicker(tc.tracker.heartbeatInterval)
for tc.getState() != connectivity.Shutdown {
select {
case <-ctx.Done():
tc.shutdown()
break
case <-ticker.C:
tc.healthCheck(ctx)
}
}
}
// 检测当前连接的状态,并返回,期间连接被锁定,tryconn 和 healthCheck 不能同时进行
func (tc *trackedConn) healthCheck(ctx context.Context) {
tc.Lock()
defer tc.Unlock()
ctx, cancel := context.WithTimeout(ctx, tc.tracker.checkReadyTimeout)
defer cancel()
// 如果是 Ready 或者 Shutdown 则直接返回,否则等待状态转变,如果 checkReadyTimeout 超时,则返回 Idle
switch tc.tracker.readyCheck(ctx, tc.conn) {
case connectivity.Ready:
tc.ready()
case connectivity.Shutdown:
tc.shutdown()
case connectivity.Idle:
// 判断是否连接 timeout 超时,如果超时则关闭,否则置为 Idle状态
if tc.expired() {
tc.shutdown()
} else {
tc.idle()
}
}
}
- 其他内部函数
/**************************获取状态*****************************/
func (tc *trackedConn) getState() connectivity.State {
tc.RLock()
defer tc.RUnlock()
return tc.state
}
/**************************根据状态进行参数设定*****************************/
func (tc *trackedConn) ready() {
tc.state = connectivity.Ready
tc.expires = time.Now().Add(tc.tracker.timeout)
tc.retry = 0
tc.tracker.connReady(tc)
}
func (tc *trackedConn) idle() {
tc.state = connectivity.Idle
tc.retry++
tc.tracker.connUnReady(tc.addr)
}
func (tc *trackedConn) shutdown() {
tc.state = connectivity.Shutdown
tc.conn.Close()
tc.cannel()
tc.tracker.connUnReady(tc.addr)
}
func (tc *trackedConn) expired() bool {
return tc.expires.Before(time.Now())
}
Pool
pool 其实就是一个 ConnectionTracker,因此在业务方 New ConnectionTracker 时,业务方可以直接定义为 pool 变量名,然后调用 pool 的 GetConn / Dial / Alives,如下示例:
/***********************以下使用方法其实是在用 ConnectionTracker **************************/
pool := New(dialF, opts)
conn := pool.GetConn(addr)
conn := pool.Dail(addr)
alives := pool.Alives()
/***********************以下使用方法才是在用 pool **************************/
conn := GetConn(addr)
conn := Dail(addr)
alives := Alives()
-
函数不多,全部列在一起
这里的 pool 是可以方便业务方直接使用 pool() 得到一个默认的 ConnectionTracker,直接开始业务工作
直接调用 GetConn() / Dail() / Alives() 即可,在第一次 pool() 函数被调用时,创建一个 ConnectionTracker,之后可以直接复用,不会重新创建 ConnectionTracker
func pool() *ConnectionTracker {
once.Do(func() {
defaultPool = New(dialF)
})
return defaultPool
}
// GetConn create or get an existing connection from default pool
func GetConn(addr string) (*grpc.ClientConn, error) {
return pool().GetConn(addr)
}
// Dial force to create new connection from default pool, this operation will close old connection!
func Dial(addr string) (*grpc.ClientConn, error) {
return pool().Dial(addr)
}
// Alives current live connections from default pool
func Alives() []string {
return pool().Alives()
}
以上就是整个 grpcp 的源码
开发此工具的背景
- 当 server 面临业务的多种请求时,需要将请求转移给不同的物理机或者 BGP,那么如果所有的连接都加在 server 上,会导致连接数过多,以及维护连接的开销很大,比如网络不稳定的时候,会使得传输效率极大下降
- 为了解决这个问题,利用 grpcp,来维护连接状态,而业务方可以快速得到一个可用的 Ready 状态的连接,从而递交数据包。
- 同时,存在大量的连接时,可以分级,从而减少 server 的维护连接的压力,分担压力给 agent
- server 和 每一个 agent 都维护着自己到别人的连接。
- 初始启动时,由于连接均未建立,则短时间会大量建立连接,之后则可以稳定下来,满足连接复用的功能需求。
场景示例:
- 初始所有的连接都没有。
- 业务方维护一个路由表,比如从 server -> agent-d,有两条匹配规则:1. server -> agent-a -> agent-d 2. server -> agent-b -> agent-d
- 业务方根据优先级,优先选取 1 规则,从 server 取 agent-a 的连接,server 没有,则创建连接;然后得到连接,再向 agent-a 请求 agent-d 的连接,agent-a 创建连接,得到 conn 之后,发送数据包。
然后其他业务方也要发送数据到 agent-d,此时,1 链路已经被创建好,因此,直接获取得到连接,并发送包。 - 这时, agent-a 到 agent-d 连接开始不稳定,那么因为存在心跳,连接状态会被置为 Idle
- 另一个业务请求想要发送至 agent-d,请求连接,由于 Idel 状态,直接能够收返回 errNoReady,因此开始选用规则 2,server 创建 agent-b连接,agent-b 创建 agent-d 连接,然后返回
- 其他业务请求时,也会复用 2 规则的链路
- 这时,agent-a 到 agent-d 因为心跳维护,重新连接,到达 Ready 状态
- 这时再来一个请求发送往 agent-d,就会直接匹配到规则 1,并发送。
注:
- 路由表是维护在业务方,由业务来定义路由的使用情况
- agent 做两件事,如果是直连,数据是发给自己的,则直接处理;如果是转发,则查找相应的连接并发送,或者创建连接并发送,或者失败返回 err,让业务方迅速进行下一条规则匹配并发送
其他资源补充
- grpc 的使用一般先定义 proto 文件(定义 rpc 接口 以及 rpc 的输入结构和传输数据的格式等),然后根据 protoc-gen-go 工具来生成服务端和客户端代码(一个第三方的更快捷的包 golang/protobuf)
- grpc-go 连接语义和 API
- ↑ 大佬的 blog 主页
有疑问加站长微信联系(非本文作者)