【Go夜读】grpc 开发及 grpcp 的源码分析

ChaunhewieTian · · 1205 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

资源链接


B站视频

grpcp 介绍


  • grpcp 是基于 grpc 开发的,通过对 grpc 的封装和复用,来使得业务方能够最快速地通过 grpcp 找到相通的链路
  • grpcp 主要实现了对 grpc 的连接的维护,使得业务方能够省去对链路的维护,直接根据路由表进行数据分发

源码分析


结构体和变量

  • ConnectionTracker
    ConnectionTracker 就是控制整个连接表,维护所有的连接
// ConnectionTracker keep connections and maintain their status
type ConnectionTracker struct {
    sync.RWMutex  // 用来防止多个协程同时操作 connections 和 alives
    // 创建连接的函数,可以自定义,也可以直接使用本代码中的 dialF 变量定义的默认函数
    dial              DialFunc
    // 进行连接是否 ready 的检查函数,可以自定义,不设定则使用默认函数defaultReadyCheck()
    readyCheck        ReadyCheckFunc
    // 维护一个 addr 到连接的映射表,断连也会维护,之后会重连
    connections       map[string]*trackedConn
    // 维护 connections 中处于 ready状态 的连接
    alives            map[string]*trackedConn
    // 超时时间
    timeout           time.Duration  // 连接超时时间
    checkReadyTimeout time.Duration  // 在心跳检测时,如果连接不是ready,则等待 conn 的状态改变超时时间
    heartbeatInterval time.Duration  // 心跳间隔
    // 上下文,这个上下文传给每一个连接,作为每个连接的 parent ctx,当 parent ctx cancel(cannel被调用)时,每个子ctx 均被 cancel
    ctx    context.Context
    // 上下文对应的 cancel 函数,未用到,提供给业务方使用,当关闭所有连接时,调用 cannel
    cannel context.CancelFunc
}
  • trackedCon
    trackedConn 用来控制单个连接
type trackedConn struct {
    sync.RWMutex  // 用来防止多个协程同时操作本连接
    // 本连接对应的 addr 地址,即 key,用来标识一个连接
    addr    string
    // 本连接对应的 grpc 连接
    conn    *grpc.ClientConn
    // 本连接对应的父ConnectionTracker,该连接存在于 tracker.connections 映射表中,可能存在于 tracker.alives 映射表中
    tracker *ConnectionTracker
    // 本连接对应的 state,将 grpc 中的五种状态(Idle Connecting Ready TransientFailure Shutdown)减到三种(Idle Ready Shutdown)
    state   connectivity.State
    // 本连接对应的超时时刻,每次心跳检测成功,如果是 ready 状态就更新超时时间,心跳检测一直失败,超过超时时间 tracker.timeout,则 shutdown() 本连接
    expires time.Time
    // 本连接对应的 retry 次数,每次心跳检测成功,如果是 ready 状态就重置为 0,否则 ++,代码中未使用该变量
    retry   int
    // 本连接对应的 CancelFunc,在本连接 shutdown 时,通知本连接对应的心跳协程退出
    cannel  context.CancelFunc
}
  • Pool
    以下通过池子来实现复用同一个连接,但是复用的时候不能将拿出来的 connection close 掉,否则该 conn 会被置为 shutdown,并被踢出 ConnectionTracker.alives 记录, 同时其他协程已经拿出来的 conn 就无法使用了
var (
    // 默认连接池,如果没有自定义连接池,则使用默认连接池
    defaultPool *ConnectionTracker
    // 用于限定创建连接池的操作最多被仅仅执行一次
    once        sync.Once
    // 连接函数,可以自定义参数(类似grpc.WithInsecure())
    dialF       = func(addr string) (*grpc.ClientConn, error) {
        return grpc.Dial(
            addr,
            grpc.WithInsecure(),
        )
    }
)
  • 其他默认参数
// 超时参数,可以自定义覆盖掉
const (
    defaultTimeout    = 100 * time.Second
    checkReadyTimeout = 5 * time.Second
    heartbeatInterval = 20 * time.Second
)
// 如果尝试获取某个连接,当前连接不联通,则返回的错误定义
var (
    errNoReady = fmt.Errorf("no ready")
)

函数

ConnectionTracker

  • 可以被自定义的函数
// DialFunc dial function
// 用来自定义连接函数,在 New 时需要被传入
// 如果业务方直接使用 ConnectionTracker,则需要自己写 dailF,参考 Pool 的 dialF 写法来自定义
// 如果业务方使用 pool() 函数,则利用 Pool 的默认 dailF 函数来创建
type DialFunc func(addr string) (*grpc.ClientConn, error)

// ReadyCheckFunc check conn is ready function
// 用来自定义检测是否 ready 的函数,默认使用 defaultReadyCheck(),可以根据业务场景自定义,用来得到连接的状态,直接操作 conn.GetState() 实现自己的封装
type ReadyCheckFunc func(ctx context.Context, conn *grpc.ClientConn) connectivity.State
  • defaultReadyCheck()
    检测连接的状态,并返回
    如果是 ready 或 shutdown,则直接返回
    如果是别的状态,则检测是否有状态改变,如果没有状态改变而 ConnectionTracker.checkReadyTimeout 超时,则返回 Idle,如果状态改变,则继续循环
// 这里简化了 grpc 的状态,从五种简化为三种,这个默认的检查状态的方法可以被重写覆盖,来自定义状态
func defaultReadyCheck(ctx context.Context, conn *grpc.ClientConn) connectivity.State {
    for {
        s := conn.GetState()
        if s == connectivity.Ready || s == connectivity.Shutdown {
            return s
        }
        if !conn.WaitForStateChange(ctx, s) {
            return connectivity.Idle
        }
    }
}
  • New 函数和 New 函数实现动态参数传入
// New initialization ConnectionTracker
func New(dial DialFunc, opts ...TrackerOption) *ConnectionTracker {
    // 创建默认的 ConnectionTracker 
    ctx, cannel := context.WithCancel(context.Background())
    ct := &ConnectionTracker{
        dial:              dial,
        readyCheck:        defaultReadyCheck,
        connections:       make(map[string]*trackedConn),
        alives:            make(map[string]*trackedConn),
        timeout:           defaultTimeout,
        checkReadyTimeout: checkReadyTimeout,
        heartbeatInterval: heartbeatInterval,

        ctx:    ctx,
        cannel: cannel,
    }

    // 通过传入的参数来覆盖 ct 的默认参数
    for _, opt := range opts {
        opt(ct)
    }

    return ct
}

// TrackerOption initialization options
// 用来操作选项,实现 New() 的动态参数
type TrackerOption func(*ConnectionTracker)

/**************************以下函数的作用为更新设定的参数,业务方可以自定义函数并传入 New() *****************************/ 

// SetTimeout custom timeout
func SetTimeout(timeout time.Duration) TrackerOption {
    return func(o *ConnectionTracker) {
        o.timeout = timeout
    }
}

// SetCheckReadyTimeout custom checkReadyTimeout
func SetCheckReadyTimeout(timeout time.Duration) TrackerOption {
    return func(o *ConnectionTracker) {
        o.checkReadyTimeout = timeout
    }
}

// SetHeartbeatInterval custom heartbeatInterval
func SetHeartbeatInterval(interval time.Duration) TrackerOption {
    return func(o *ConnectionTracker) {
        o.heartbeatInterval = interval
    }
}

// CustomReadyCheck custom ready check function
func CustomReadyCheck(f ReadyCheckFunc) TrackerOption {
    return func(o *ConnectionTracker) {
        o.readyCheck = f
    }
}
  • 对外接口
// GetConn create or get an existing connection
// 获取一个存在的连接,如果连接之前已经被创建,则直接返回;否则创建连接
func (ct *ConnectionTracker) GetConn(addr string) (*grpc.ClientConn, error) {
    return ct.getConn(addr, false)
}

// Dial force to create new connection, this operation will close old connection!
// 强制重新创建一个新连接,如果连接之前已经被创建,则关闭后再次重新创建
func (ct *ConnectionTracker) Dial(addr string) (*grpc.ClientConn, error) {
    return ct.getConn(addr, true)
}

// Alives current live connections
// 返回当前处于 ready 状态的连接的 addr 数组
func (ct *ConnectionTracker) Alives() []string {
    // ConnectionTracker 的锁,用来防止多个 goroutine 同时操作 alives(比如在处理过程中加入或者删除 alives 中的元素)
    ct.RLock()
    defer ct.RUnlock()
    alives := []string{}
    for addr := range ct.alives {
        alives = append(alives, addr)
    }
    return alives
}
  • 其他内部函数
// 获取和 addr 对应的连接,不存在则创建;force 用来指定是否强制重新创建
func (ct *ConnectionTracker) getConn(addr string, force bool) (*grpc.ClientConn, error) {
    // ConnectionTracker 的锁,用来防止多个 goroutine 同时操作 connections(唯一的场景就是多个 goroutine 获取 addr,发现没有,然后并行创建 trackedConn 并设定 connections[addr])
    ct.Lock()
    tc, ok := ct.connections[addr]
    if !ok {
        tc = &trackedConn{
            addr:    addr,
            tracker: ct,
        }
        ct.connections[addr] = tc
    }
    // 在此处 Unlock,而不是在上面 defer ct.Unlock() 是因为要提前释放锁,让其他的协程能够读取 connections,因为下面的 tryconn 需要尝试连接,可能会耗费较长时间,因此需要提前把锁释放掉
    ct.Unlock()

    // 尝试连接,如果连接已经创建且 force 为 false(不强制重新创建),则会立即返回(Ready 返回 nil,Idle 返回 errNoReady,Shutdown 进行重新创建连接) 
    err := tc.tryconn(ct.ctx, force)
    if err != nil {
        return nil, err
    }
    return tc.conn, nil
}

// 当连接 ready 后,加入 alives
func (ct *ConnectionTracker) connReady(tc *trackedConn) {
    ct.Lock()
    defer ct.Unlock()
    ct.alives[tc.addr] = tc
}

// 当连接 unready 后,从 alives 移除
func (ct *ConnectionTracker) connUnReady(addr string) {
    ct.Lock()
    defer ct.Unlock()
    delete(ct.alives, addr)
}

trackedConn

  • 连接 tryconn
    如果已经创建连接且不强制重新创建,连接处于 Ready 状态则返回 nil,处于 Idle 状态返回 errNoReady,处于 Shutdown 状态进行重新创建连接
    如果强制重新创建连接,则关闭原来的连接(如果存在),并重新连接
    创建连接过程:1. 关闭可能存在的连接,重新创建连接;2. 开启 ready 状态判断函数,超时则返回 Idle 状态;3. 开启心跳函数;4. 更新连接的参数并更新 tracker 中该连接的记录
func (tc *trackedConn) tryconn(ctx context.Context, force bool) error {
    // 对本连接加锁,防止其他 goroutine 同时连接
    tc.Lock()
    defer tc.Unlock()
    if !force && tc.conn != nil { // 其他的协程可能建立了连接,所以如果建立好了,则直接返回并复用
        if tc.state == connectivity.Ready {
            return nil
        }
        if tc.state == connectivity.Idle {
            return errNoReady
        }
    }
    /******************************下面是创建连接的部分**********************************/
    // 如果有连接,则先关闭原来的连接
    if tc.conn != nil { // close shutdown conn
        tc.conn.Close()
    }
    // 根据设定的 dialF 函数创建连接
    conn, err := tc.tracker.dial(tc.addr)
    if err != nil {
        return err
    }
    tc.conn = conn
    
    // ready 超时上下文,设定超时时间,如果超时时间内没有变为 Ready 或者 Shutdown 状态,则返回 Idle
    readyCtx, cancel := context.WithTimeout(ctx, tc.tracker.checkReadyTimeout)
    // 本函数结束并完成连接的 ready 后,清除资源
    defer cancel()
    // 调动 ConnectionTracker.readyCheck 函数(可能是自定义 可能是默认)。默认逻辑:如果状态
    // 是 Ready 或者 Shutdown,则直接返回,否则等待状态的变化,超时则返回 Idle 状态
    checkStatus := tc.tracker.readyCheck(readyCtx, tc.conn)

    // heartbeat 心跳上下文
    hbCtx, hbCancel := context.WithCancel(ctx)
    tc.cannel = hbCancel
    // 开启心跳协程函数,每个心跳间隔,探测一次,当连接被 Shutdown 时或者ConnectionTracker的cannel被调用时,tc.cannel 被调用,从而退出心跳协程
    go tc.heartbeat(hbCtx)

    if checkStatus != connectivity.Ready {
        return errNoReady
    }
    // 调用连接准备好的函数,更改连接状态为 Ready,更新超时时刻,重置 retry 参数,并将该连接加入 ConnectionTracker.Alives 数组
    tc.ready()
    return nil
}

这里解决一个疑问:
当第一次请求 addr 连接,如果调用 GetConn() ,tc, ok := ct.connections[addr] 返回空,则创建连接,此时连接状态默认为 0,即 Idle 状态。同时 tc.conn == nil
然后进入 tryconn(),由于 tc.conn == nil 因此不会进入“直接判断连接状态为 Idle 而返回 errNoReady ”的逻辑,而是进入后面创建连接的逻辑

  • 心跳检测
    每次新创建一个连接,会创建一个协程,开启这个连接的心跳检测,根据检测结果进行状态调整
    被 Shutdown 的连接不会从映射表中删除,在下次请求该 addr 连接的时候,会进入 tryconn(),重新创建一个连接,开启新连接的心跳函数
// 如果连接不是 Shutdown 则每隔 tracker.heartbeatInterval 时间进行一次心跳检测;如果父 ctx 被取消或者其他原因导致连接的 ctx 被取消,则关闭连接,结束心跳
func (tc *trackedConn) heartbeat(ctx context.Context) {
    ticker := time.NewTicker(tc.tracker.heartbeatInterval)
    for tc.getState() != connectivity.Shutdown {
        select {
        case <-ctx.Done():
            tc.shutdown()
            break
        case <-ticker.C:
            tc.healthCheck(ctx)
        }
    }
}

// 检测当前连接的状态,并返回,期间连接被锁定,tryconn 和 healthCheck 不能同时进行
func (tc *trackedConn) healthCheck(ctx context.Context) {
    tc.Lock()
    defer tc.Unlock()
    ctx, cancel := context.WithTimeout(ctx, tc.tracker.checkReadyTimeout)
    defer cancel()

    // 如果是 Ready 或者 Shutdown 则直接返回,否则等待状态转变,如果 checkReadyTimeout 超时,则返回 Idle
    switch tc.tracker.readyCheck(ctx, tc.conn) {
    case connectivity.Ready:
        tc.ready()
    case connectivity.Shutdown:
        tc.shutdown()
    case connectivity.Idle:
        // 判断是否连接 timeout 超时,如果超时则关闭,否则置为 Idle状态
        if tc.expired() {
            tc.shutdown()
        } else {
            tc.idle()
        }
    }
}
  • 其他内部函数
/**************************获取状态*****************************/ 

func (tc *trackedConn) getState() connectivity.State {
    tc.RLock()
    defer tc.RUnlock()
    return tc.state
}

/**************************根据状态进行参数设定*****************************/ 

func (tc *trackedConn) ready() {
    tc.state = connectivity.Ready
    tc.expires = time.Now().Add(tc.tracker.timeout)
    tc.retry = 0
    tc.tracker.connReady(tc)
}

func (tc *trackedConn) idle() {
    tc.state = connectivity.Idle
    tc.retry++
    tc.tracker.connUnReady(tc.addr)
}

func (tc *trackedConn) shutdown() {
    tc.state = connectivity.Shutdown
    tc.conn.Close()
    tc.cannel()
    tc.tracker.connUnReady(tc.addr)
}

func (tc *trackedConn) expired() bool {
    return tc.expires.Before(time.Now())
}

Pool

pool 其实就是一个 ConnectionTracker,因此在业务方 New ConnectionTracker 时,业务方可以直接定义为 pool 变量名,然后调用 pool 的 GetConn / Dial / Alives,如下示例:

/***********************以下使用方法其实是在用 ConnectionTracker **************************/
pool := New(dialF, opts)
conn := pool.GetConn(addr)
conn := pool.Dail(addr)
alives := pool.Alives()
/***********************以下使用方法才是在用 pool **************************/
conn := GetConn(addr)
conn := Dail(addr)
alives := Alives()
  • 函数不多,全部列在一起
    这里的 pool 是可以方便业务方直接使用 pool() 得到一个默认的 ConnectionTracker,直接开始业务工作
    直接调用 GetConn() / Dail() / Alives() 即可,在第一次 pool() 函数被调用时,创建一个 ConnectionTracker,之后可以直接复用,不会重新创建 ConnectionTracker
func pool() *ConnectionTracker {
    once.Do(func() {
        defaultPool = New(dialF)
    })
    return defaultPool
}

// GetConn create or get an existing connection from default pool
func GetConn(addr string) (*grpc.ClientConn, error) {
    return pool().GetConn(addr)
}

// Dial force to create new connection from default pool, this operation will close old connection!
func Dial(addr string) (*grpc.ClientConn, error) {
    return pool().Dial(addr)
}

// Alives current live connections from default pool
func Alives() []string {
    return pool().Alives()
}

以上就是整个 grpcp 的源码

开发此工具的背景


网络物理视图
  1. 当 server 面临业务的多种请求时,需要将请求转移给不同的物理机或者 BGP,那么如果所有的连接都加在 server 上,会导致连接数过多,以及维护连接的开销很大,比如网络不稳定的时候,会使得传输效率极大下降
  2. 为了解决这个问题,利用 grpcp,来维护连接状态,而业务方可以快速得到一个可用的 Ready 状态的连接,从而递交数据包。
  3. 同时,存在大量的连接时,可以分级,从而减少 server 的维护连接的压力,分担压力给 agent
  4. server 和 每一个 agent 都维护着自己到别人的连接。
  5. 初始启动时,由于连接均未建立,则短时间会大量建立连接,之后则可以稳定下来,满足连接复用的功能需求。

场景示例:

  • 初始所有的连接都没有。
  • 业务方维护一个路由表,比如从 server -> agent-d,有两条匹配规则:1. server -> agent-a -> agent-d 2. server -> agent-b -> agent-d
  • 业务方根据优先级,优先选取 1 规则,从 server 取 agent-a 的连接,server 没有,则创建连接;然后得到连接,再向 agent-a 请求 agent-d 的连接,agent-a 创建连接,得到 conn 之后,发送数据包。
    然后其他业务方也要发送数据到 agent-d,此时,1 链路已经被创建好,因此,直接获取得到连接,并发送包。
  • 这时, agent-a 到 agent-d 连接开始不稳定,那么因为存在心跳,连接状态会被置为 Idle
  • 另一个业务请求想要发送至 agent-d,请求连接,由于 Idel 状态,直接能够收返回 errNoReady,因此开始选用规则 2,server 创建 agent-b连接,agent-b 创建 agent-d 连接,然后返回
  • 其他业务请求时,也会复用 2 规则的链路
  • 这时,agent-a 到 agent-d 因为心跳维护,重新连接,到达 Ready 状态
  • 这时再来一个请求发送往 agent-d,就会直接匹配到规则 1,并发送。

注:

  1. 路由表是维护在业务方,由业务来定义路由的使用情况
  2. agent 做两件事,如果是直连,数据是发给自己的,则直接处理;如果是转发,则查找相应的连接并发送,或者创建连接并发送,或者失败返回 err,让业务方迅速进行下一条规则匹配并发送

其他资源补充


  1. grpc 的使用一般先定义 proto 文件(定义 rpc 接口 以及 rpc 的输入结构和传输数据的格式等),然后根据 protoc-gen-go 工具来生成服务端和客户端代码(一个第三方的更快捷的包 golang/protobuf
  2. grpc-go 连接语义和 API
  3. ↑ 大佬的 blog 主页

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:ChaunhewieTian

查看原文:【Go夜读】grpc 开发及 grpcp 的源码分析

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1205 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传