Go语言学习——基于GRPC和ETCD实现服务注册、发现与负载均衡

tianqy · 2020-07-19 16:42:29 · 2281 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2020-07-19 16:42:29 的主题,其中的信息可能已经有所发展或是发生改变。

1.简介
  最近在做服务拆分,目标就是先抽出一个流量解析模块,主要是负责流量的接入、协议的转化,将所有媒体请求都以统一接口访问后续服务,因为时间紧迫,服务间访问、负载均衡采用的是比较传统的处理方案——新服务通过内网LB访问后续服务,该种方案的最大问题就是在原有上、下层服务之间又加了一层,即数据链路又增加了一环,增加性能开销;后来利用周末时间,研究了软负载策略(又称客户端负载策略 ),该方案的特点就是将LB放在服务消费方,服务消费方通过LB组件获知可以提供服务的目标机器,然后与之建立连接、直接进行访问。
  服务发现与负载均衡策略参考 : https://www.jianshu.com/p/f49c3a29b5dc?from=timeline

2.实现
  由于平时工作也比较忙,本文比较偏实践、没有详细介绍一些理论知识,期间提到的内容,如有不了解的,还需要同学自己搜资料了解下,Go语言中,GRPC和ETCD就能很好的实现服务发现和负载均衡的处理,下面就开始讲下实现过程。

2.1准备
  正所谓“工欲善其事,必先利其器”,开始之前,先把工具准备,为了处理的方便,建立先把Go升级到高版本,比如:1.14版本,主要是利用它的module功能,然后就是下载etcd并启动etcd服务,剩下的就是要安装下protobuf,后面要处理proto文件;这几块处理都比较简单,基本不会遇到啥大问题,基本网上搜搜资料都可以解决,下面说下比较麻烦的问题,就是GRPC和ETCD版本兼容的问题,之前这块被搞的吐血,基本有两个方案:
  1)向上对齐,修改etcd源码,参考地址是 : https://github.com/etcd-io/etcd/pull/11564/files
  2)向下对齐,GPRC和protoc都使用降低版本,参考地址是 : https://learnku.com/articles/43758
  我这边采用的是方案二,建议准备好环境后再开工;关注ETCD这块,最近还在研究,后面会详细介绍它的灵魂——Raft协议。

2.2方案
  本方案的处理方式是,客户端经自身LB组件知道服务端地址列表,然后建立连接、进行访问;客户端怎么获取服务端地址列表呢?这就需要借助ETCD了,ETCD是客户端与服务端的桥梁——服务端向ETCD注册服务地址、维持心跳,客户端负责监听变化、更新地址列表,下面围绕这几个点细讨论下

2.2.1服务注册
  按照理想情况,服务启动后将自身服务地址注册到ETCD上即可,看似处理过程很简单,但如果考虑到实际情况,就不可能这样简单处理了,线上实际情况是——服务器存在各种各样的断电、断网风险,比如:网线被工程挖断、被工作人员踢掉,机柜断电,服务宕机等,所以服务在ETCD上注册地址是个技术活,要考虑各种可能发现的异常情况,总的来说,要考虑以下几种情况:
  1)服务是正常的,但是,没注册上、导致无法为客户端提供服务;
  2)服务是不正常的,但是,没取消注册、导致客户端还在继续请求;
  综上所述,服务端在注册时,不能永久注册,注册地址是有过期时间的,对此,服务端要有定时检测机制,类似心跳检测,查看是否注册成功,如果服务注册失败,还要进行补注册,而补注册后,还要支持对数据的“续命”操作,即数据过期后自动续约。

2.2.2服务发现
  客户端做的事情主要是获取地址列表、与服务端建立连接,考虑到服务端地址存在变化的情况,故客户端还要支持地址监测、及时变更服务地址的处理,上述处理可以借助GRPC的命名解析功能;负载均衡的策略也是在客户端处理的,拨号时会指定策略,一般采取轮询策略。

2.3代码
  围绕方案实现处理代码,本部分主要是看下注册与发现的代码,完整的代码处理可以去Git上下载,地址是 :
  https://github.com/JackBelief/go_module_test.git

  本部分代码结构是:
image.png   其中,server.go是服务端代码,client.go是客户端代码,config是用于服务启动参数解析(引入了viper),etcd_proc完成对ETCD的各种操作处理,protoes定义GRPC契约,service完成GRPC服务结构定义。

2.3.1服务端代码
  服务端代码会启动两个协程,一个用于注册的心跳检测;另一个用于注册的续约处理,可以细看注释:

func RegisterETCDServer(addr string) {
    // 服务注册
    registerServer(addr)
}

func registerServer(addr string) {
    var err error

    // 创建ETCD的客户端
    if GClient == nil {
        GClient, err = newETCDClient()
        if err != nil {
            fmt.Println("ectd 客户端创建失败 error=", err.Error())
            return
        }
    }
    fmt.Println("ectd 客户端创建成功")

    // 定时循环检测,查看向ETCD注册服务是否正常
    // 每台服务向ETCD注册自己的IP地址,定时检测注册内容是否还在
    ticker := time.NewTicker(time.Second * time.Duration(5))
    go func() {
        for {
            getResp, err := GClient.Get(context.Background(), ETCDServerPrefix+addr)
            if err != nil {
                fmt.Println("etcd出现异常,key获取异常,key=", ETCDServerPrefix+addr, " error=", err.Error())
            } else if getResp.Count == 0 {
                fmt.Println("etcd没有目标数据,需要补数据,key=", ETCDServerPrefix+addr)
                putData(ETCDServerPrefix+addr, addr)
            } else {
                fmt.Println("etcd目标数据正常,key=", ETCDServerPrefix+addr)
            }

            <-ticker.C
        }
    }()

    return
}

func newETCDClient() (*clientv3.Client, error) {
    config := clientv3.Config{
        Endpoints:   []string{"121.42.161.154:2379"},
        DialTimeout: 5 * time.Second,
    }

    return clientv3.New(config)
}

func putData(key, value string) {
    leaseResp, err := GClient.Grant(context.Background(), 5)
    if err != nil {
        fmt.Println("etcd申请租约失败 key=", key, " error=", err.Error())
        return
    }

    _, err = GClient.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
    if err != nil {
        fmt.Println("etcd写入数据失败 key=", key, " error=", err.Error())
        return
    }

    kaRespChan, err := GClient.KeepAlive(context.Background(), leaseResp.ID)
    if err != nil {
        fmt.Println("etcd租约续约失败 key=", key, "id=", leaseResp.ID, " error=", err.Error())
        return
    }

    // 定期查看续约结果
    go func() {
        for {
            select {
            case respData := <-kaRespChan:
                if kaRespChan == nil {
                    fmt.Println("管道关闭,出现异常,退出 key=", key)
                    return
                } else {
                    if respData == nil {
                        fmt.Println("没有数据,可能是etcd关闭、也可能是网络异常,退出 key=", key)
                        return
                    } else {
                        fmt.Println("续约成功 key=", key)
                    }
                }
            }
            time.Sleep(1 * time.Second)
        }
    }()

    return
}

func UnRegisterETCDServer(addr string) {
    // 服务取消注册
    unRegisterServer(addr)
}

func unRegisterServer(addr string) {
    var err error

    // 创建ETCD的客户端
    if GClient == nil {
        GClient, err = newETCDClient()
        if err != nil {
            fmt.Println("ectd 客户端创建失败 error=", err.Error())
            return
        }
    }

    // 删除服务注册数据
    _, err = GClient.Delete(context.Background(), ETCDServerPrefix+addr)
    if err != nil {
        fmt.Println("服务关闭,etcd删除数据失败 key=", ETCDServerPrefix+addr, " error=", err.Error())
        return
    } else {
        fmt.Println("服务关闭,etcd成功删除数据 key=", ETCDServerPrefix+addr)
        return
    }
}

2.3.2客户端代码
  客户端除了发起GRPC调用处理业务外,还要实现服务发现、监控变化的处理,具体可以细看注释:

/*****************************************************************************************************
    Builder 是接口类型,用于创建命名解析器,可监视命名空间是否发生变化,其方法有:
    1) Scheme() string        // 返回解析器支持的方案
    2) Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)    // 创建解析器

    Resolver 是接口类型,用于监控目标变化,当目标发生变化时,会相应地更新地址、服务配置,其方法有:
    1) Close()        // 关闭解析器
    2) ResolveNow(ResolveNowOptions)        // 备用接口,GRPC可以再次调用用于目标的解析

    客户端要实现以上接口,从而实现服务发现、变更
*****************************************************************************************************/
func NewResolver() resolver.Builder {
    return &ETCDResolver{rawAddr: "121.42.161.154:2379"}
}

type ETCDResolver struct {
    rawAddr      string              // etcd服务地址,多个地址要使用分隔符
    resolverConn resolver.ClientConn // 解析器链接对象
}

// 实现Builder接口类型
func (er *ETCDResolver) Scheme() string {
    return ETCDSchema
}

func (er *ETCDResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    // 构建解析器,解析器只负责对目标的更新,而对目标的监控由用户部分完成,
    var err error
    if GClient == nil {
        GClient, err = clientv3.New(clientv3.Config{
            Endpoints:   strings.Split(er.rawAddr, ";"),
            DialTimeout: 5 * time.Second,
        })

        if err != nil {
            return nil, err
        }
    }

    // 解析器监控变化
    er.resolverConn = cc
    fmt.Println("resolver create success")
    go er.watch("/" + target.Scheme + "/" + target.Endpoint + "/")

    return er, nil
}

func (er *ETCDResolver) watch(keyPrefix string) {
    for {
        er.watchETCD(keyPrefix)
        time.Sleep(1 * time.Second)
    }
}

func (er *ETCDResolver) watchETCD(keyPrefix string) {
    defer func() {
        if err := recover(); err != nil {
            fmt.Println("watch error =", err)
        }
    }()

    er.watchETCDKey(keyPrefix)
}

func (er *ETCDResolver) watchETCDKey(keyPrefix string) {
    var addrList []resolver.Address

    // 读取ETCD,获取IP列表
    getResp, err := GClient.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
    if err != nil {
        fmt.Println("解析器读取ETCD,获取IP列表失败 err=", err.Error())
    } else {
        for index := range getResp.Kvs {
            fmt.Println("初始IP地址是:", strings.TrimPrefix(string(getResp.Kvs[index].Key), keyPrefix))
            addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[index].Key), keyPrefix)})
        }
    }

    er.resolverConn.NewAddress(addrList)
    // er.resolverConn.UpdateState(resolver.State{Addresses:addrList})

    // 监控ETCD中目标数据的变化
    watchChan := GClient.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
    for chanEle := range watchChan {
        for _, ev := range chanEle.Events {
            // 根据IP变化情况,解析器更新IP地址列表
            addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
            switch ev.Type {
            case mvccpb.PUT:
                if !exist(addrList, addr) {
                    addrList = append(addrList, resolver.Address{Addr: addr})
                    er.resolverConn.NewAddress(addrList)
                    fmt.Println("插入新地址 address=", addr)
                }
            case mvccpb.DELETE:
                if s, ok := remove(addrList, addr); ok {
                    addrList = s
                    er.resolverConn.NewAddress(addrList)
                    fmt.Println("删除老地址 address=", addr)
                }
            }
        }
    }
}

func exist(l []resolver.Address, addr string) bool {
    for i := range l {
        if l[i].Addr == addr {
            return true
        }
    }

    return false
}

func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
    for i := range s {
        if s[i].Addr == addr {
            s[i] = s[len(s)-1]
            return s[:len(s)-1], true
        }
    }

    return nil, false
}

// 实现Resolver接口类型
func (er *ETCDResolver) ResolveNow(rn resolver.ResolveNowOptions) {
    fmt.Println("ETCDResolver ResolveNow")
}

func (er *ETCDResolver) Close() {
    fmt.Println("ETCDResolver Close")
}

3.调测
  下载代码后,分别编译服务端和客户端代码,然后,启动多个服务端和一个客户端,通过关闭、重启服务端的方式查看客户端的服务发现、监控,另外,感兴趣的同学还可以看下断网、ETCD关闭等情况下客户端运行状态,本文不再细述。


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2281 次点击  ∙  2 赞  
加入收藏 微博
2 回复  |  直到 2020-10-10 09:34:37
caryx
caryx · #1 · 5年之前

专业,学习了!

tianqy
tianqy · #2 · 5年之前

补充: 在使用ETCD进行服务注册、发现时,分服务端和客户端实现,服务端主要是进行注册处理,分为两个for循环,第一个for循环用于注册检测、检测IP是否注册了;第二for循环用于注册续约、用于注册IP的租约续约;客户端主要是做命名解析,借助GRPC的resolver,通过将etcd中存储的服务端IP列表传给resolver实现客户端对服务端访问,负载平衡策略也是在GRPC中指定、一般设置为轮询。

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传