1.简介<br />
  最近在做服务拆分,目标就是先抽出一个流量解析模块,主要是负责流量的接入、协议的转化,将所有媒体请求都以统一接口访问后续服务,因为时间紧迫,服务间访问、负载均衡采用的是比较传统的处理方案——新服务通过内网LB访问后续服务,该种方案的最大问题就是在原有上、下层服务之间又加了一层,即数据链路又增加了一环,增加性能开销;后来利用周末时间,研究了软负载策略(又称客户端负载策略 ),该方案的特点就是将LB放在服务消费方,服务消费方通过LB组件获知可以提供服务的目标机器,然后与之建立连接、直接进行访问。<br />
  服务发现与负载均衡策略参考 : https://www.jianshu.com/p/f49c3a29b5dc?from=timeline<br />
2.实现<br />
  由于平时工作也比较忙,本文比较偏实践、没有详细介绍一些理论知识,期间提到的内容,如有不了解的,还需要同学自己搜资料了解下,Go语言中,GRPC和ETCD就能很好的实现服务发现和负载均衡的处理,下面就开始讲下实现过程。<br /><br />
2.1准备<br />
  正所谓“工欲善其事,必先利其器”,开始之前,先把工具准备,为了处理的方便,建立先把Go升级到高版本,比如:1.14版本,主要是利用它的module功能,然后就是下载etcd并启动etcd服务,剩下的就是要安装下protobuf,后面要处理proto文件;这几块处理都比较简单,基本不会遇到啥大问题,基本网上搜搜资料都可以解决,下面说下比较麻烦的问题,就是GRPC和ETCD版本兼容的问题,之前这块被搞的吐血,基本有两个方案:<br />
  1)向上对齐,修改etcd源码,参考地址是 : https://github.com/etcd-io/etcd/pull/11564/files<br />
  2)向下对齐,GPRC和protoc都使用降低版本,参考地址是 : https://learnku.com/articles/43758<br />
  我这边采用的是方案二,建议准备好环境后再开工;关注ETCD这块,最近还在研究,后面会详细介绍它的灵魂——Raft协议。<br />
2.2方案<br />
  本方案的处理方式是,客户端经自身LB组件知道服务端地址列表,然后建立连接、进行访问;客户端怎么获取服务端地址列表呢?这就需要借助ETCD了,ETCD是客户端与服务端的桥梁——服务端向ETCD注册服务地址、维持心跳,客户端负责监听变化、更新地址列表,下面围绕这几个点细讨论下<br />
2.2.1服务注册<br />
  按照理想情况,服务启动后将自身服务地址注册到ETCD上即可,看似处理过程很简单,但如果考虑到实际情况,就不可能这样简单处理了,线上实际情况是——服务器存在各种各样的断电、断网风险,比如:网线被工程挖断、被工作人员踢掉,机柜断电,服务宕机等,所以服务在ETCD上注册地址是个技术活,要考虑各种可能发现的异常情况,总的来说,要考虑以下几种情况:<br />
  1)服务是正常的,但是,没注册上、导致无法为客户端提供服务;<br />
  2)服务是不正常的,但是,没取消注册、导致客户端还在继续请求;<br />
  综上所述,服务端在注册时,不能永久注册,注册地址是有过期时间的,对此,服务端要有定时检测机制,类似心跳检测,查看是否注册成功,如果服务注册失败,还要进行补注册,而补注册后,还要支持对数据的“续命”操作,即数据过期后自动续约。<br />
2.2.2服务发现<br />
  客户端做的事情主要是获取地址列表、与服务端建立连接,考虑到服务端地址存在变化的情况,故客户端还要支持地址监测、及时变更服务地址的处理,上述处理可以借助GRPC的命名解析功能;负载均衡的策略也是在客户端处理的,拨号时会指定策略,一般采取轮询策略。<br />
2.3代码<br />
  围绕方案实现处理代码,本部分主要是看下注册与发现的代码,完整的代码处理可以去Git上下载,地址是 : <br />   https://github.com/JackBelief/go_module_test.git <br />
  本部分代码结构是:<br />
![image.png](https://static.studygolang.com/200719/e81198286712e0597a0b16fbf13d5b14.png)
  其中,server.go是服务端代码,client.go是客户端代码,config是用于服务启动参数解析(引入了viper),etcd_proc完成对ETCD的各种操作处理,protoes定义GRPC契约,service完成GRPC服务结构定义。<br />
2.3.1服务端代码<br />
  服务端代码会启动两个协程,一个用于注册的心跳检测;另一个用于注册的续约处理,可以细看注释:<br />
```
func RegisterETCDServer(addr string) {
// 服务注册
registerServer(addr)
}
func registerServer(addr string) {
var err error
// 创建ETCD的客户端
if GClient == nil {
GClient, err = newETCDClient()
if err != nil {
fmt.Println("ectd 客户端创建失败 error=", err.Error())
return
}
}
fmt.Println("ectd 客户端创建成功")
// 定时循环检测,查看向ETCD注册服务是否正常
// 每台服务向ETCD注册自己的IP地址,定时检测注册内容是否还在
ticker := time.NewTicker(time.Second * time.Duration(5))
go func() {
for {
getResp, err := GClient.Get(context.Background(), ETCDServerPrefix+addr)
if err != nil {
fmt.Println("etcd出现异常,key获取异常,key=", ETCDServerPrefix+addr, " error=", err.Error())
} else if getResp.Count == 0 {
fmt.Println("etcd没有目标数据,需要补数据,key=", ETCDServerPrefix+addr)
putData(ETCDServerPrefix+addr, addr)
} else {
fmt.Println("etcd目标数据正常,key=", ETCDServerPrefix+addr)
}
<-ticker.C
}
}()
return
}
func newETCDClient() (*clientv3.Client, error) {
config := clientv3.Config{
Endpoints: []string{"121.42.161.154:2379"},
DialTimeout: 5 * time.Second,
}
return clientv3.New(config)
}
func putData(key, value string) {
leaseResp, err := GClient.Grant(context.Background(), 5)
if err != nil {
fmt.Println("etcd申请租约失败 key=", key, " error=", err.Error())
return
}
_, err = GClient.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
if err != nil {
fmt.Println("etcd写入数据失败 key=", key, " error=", err.Error())
return
}
kaRespChan, err := GClient.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
fmt.Println("etcd租约续约失败 key=", key, "id=", leaseResp.ID, " error=", err.Error())
return
}
// 定期查看续约结果
go func() {
for {
select {
case respData := <-kaRespChan:
if kaRespChan == nil {
fmt.Println("管道关闭,出现异常,退出 key=", key)
return
} else {
if respData == nil {
fmt.Println("没有数据,可能是etcd关闭、也可能是网络异常,退出 key=", key)
return
} else {
fmt.Println("续约成功 key=", key)
}
}
}
time.Sleep(1 * time.Second)
}
}()
return
}
func UnRegisterETCDServer(addr string) {
// 服务取消注册
unRegisterServer(addr)
}
func unRegisterServer(addr string) {
var err error
// 创建ETCD的客户端
if GClient == nil {
GClient, err = newETCDClient()
if err != nil {
fmt.Println("ectd 客户端创建失败 error=", err.Error())
return
}
}
// 删除服务注册数据
_, err = GClient.Delete(context.Background(), ETCDServerPrefix+addr)
if err != nil {
fmt.Println("服务关闭,etcd删除数据失败 key=", ETCDServerPrefix+addr, " error=", err.Error())
return
} else {
fmt.Println("服务关闭,etcd成功删除数据 key=", ETCDServerPrefix+addr)
return
}
}
```
2.3.2客户端代码<br />
  客户端除了发起GRPC调用处理业务外,还要实现服务发现、监控变化的处理,具体可以细看注释:<br />
```
/*****************************************************************************************************
Builder 是接口类型,用于创建命名解析器,可监视命名空间是否发生变化,其方法有:
1) Scheme() string // 返回解析器支持的方案
2) Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) // 创建解析器
Resolver 是接口类型,用于监控目标变化,当目标发生变化时,会相应地更新地址、服务配置,其方法有:
1) Close() // 关闭解析器
2) ResolveNow(ResolveNowOptions) // 备用接口,GRPC可以再次调用用于目标的解析
客户端要实现以上接口,从而实现服务发现、变更
*****************************************************************************************************/
func NewResolver() resolver.Builder {
return &ETCDResolver{rawAddr: "121.42.161.154:2379"}
}
type ETCDResolver struct {
rawAddr string // etcd服务地址,多个地址要使用分隔符
resolverConn resolver.ClientConn // 解析器链接对象
}
// 实现Builder接口类型
func (er *ETCDResolver) Scheme() string {
return ETCDSchema
}
func (er *ETCDResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// 构建解析器,解析器只负责对目标的更新,而对目标的监控由用户部分完成,
var err error
if GClient == nil {
GClient, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(er.rawAddr, ";"),
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
}
// 解析器监控变化
er.resolverConn = cc
fmt.Println("resolver create success")
go er.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
return er, nil
}
func (er *ETCDResolver) watch(keyPrefix string) {
for {
er.watchETCD(keyPrefix)
time.Sleep(1 * time.Second)
}
}
func (er *ETCDResolver) watchETCD(keyPrefix string) {
defer func() {
if err := recover(); err != nil {
fmt.Println("watch error =", err)
}
}()
er.watchETCDKey(keyPrefix)
}
func (er *ETCDResolver) watchETCDKey(keyPrefix string) {
var addrList []resolver.Address
// 读取ETCD,获取IP列表
getResp, err := GClient.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
if err != nil {
fmt.Println("解析器读取ETCD,获取IP列表失败 err=", err.Error())
} else {
for index := range getResp.Kvs {
fmt.Println("初始IP地址是:", strings.TrimPrefix(string(getResp.Kvs[index].Key), keyPrefix))
addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[index].Key), keyPrefix)})
}
}
er.resolverConn.NewAddress(addrList)
// er.resolverConn.UpdateState(resolver.State{Addresses:addrList})
// 监控ETCD中目标数据的变化
watchChan := GClient.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
for chanEle := range watchChan {
for _, ev := range chanEle.Events {
// 根据IP变化情况,解析器更新IP地址列表
addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
switch ev.Type {
case mvccpb.PUT:
if !exist(addrList, addr) {
addrList = append(addrList, resolver.Address{Addr: addr})
er.resolverConn.NewAddress(addrList)
fmt.Println("插入新地址 address=", addr)
}
case mvccpb.DELETE:
if s, ok := remove(addrList, addr); ok {
addrList = s
er.resolverConn.NewAddress(addrList)
fmt.Println("删除老地址 address=", addr)
}
}
}
}
}
func exist(l []resolver.Address, addr string) bool {
for i := range l {
if l[i].Addr == addr {
return true
}
}
return false
}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
for i := range s {
if s[i].Addr == addr {
s[i] = s[len(s)-1]
return s[:len(s)-1], true
}
}
return nil, false
}
// 实现Resolver接口类型
func (er *ETCDResolver) ResolveNow(rn resolver.ResolveNowOptions) {
fmt.Println("ETCDResolver ResolveNow")
}
func (er *ETCDResolver) Close() {
fmt.Println("ETCDResolver Close")
}
```
3.调测<br />
  下载代码后,分别编译服务端和客户端代码,然后,启动多个服务端和一个客户端,通过关闭、重启服务端的方式查看客户端的服务发现、监控,另外,感兴趣的同学还可以看下断网、ETCD关闭等情况下客户端运行状态,本文不再细述。
有疑问加站长微信联系(非本文作者)