战旗直播基于consul服务注册与发现的GRPC服务实战与感想

ample · · 1513 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前言

鄙人关注consul也有一段时间了,从2017年开始了解它的一些特性,它能帮助解决哪些问题,然后怎么应用到微服务中去。随着时间的推移,微服务的发展也是非常的迅速,可以说日新月异,每天都在变化。consul工具所提供的功能也在不断地新增和完善。OK,有些扯远了,咱们还是回到主题上来吧。
在微服务领域有个重要的概念——服务注册与发现。google或baidu一下,会发现有大量的关于服务注册发现的文章、博客等,有基于consul的、也有基于etcd和zoomkeeper的,每个工具都有自己的特点和优势,也有一定的相似性,比如它们都可以实现服务注册与发现,也都可以实现kv存储等。但是它们也有一定的区别,比如consul重点是服务注册与发现,其次是kv存储;像etcd重点是处理kv存储的功能,在kv存储上来实现服务的注册与发现。侧重点不一样,感兴趣的朋友可以去google或baidu一下,这里就不在叙述了。由于侧重点不同,依据战旗直播业务的实际情况,选择了consul来实现战旗后端的服务注册与发现。

consul能解决的问题

  1. 服务注册(注销)与发现
  2. 节点/服务监控
  3. KV存储——业务服务配置统一管理
  4. consul-template
  5. ACL
  6. DNS
  7. 其他

consul集群部署

依据consul的文档,consul集群中需要部署两种类型的节点:server节点和client节点。server节点推荐部署奇数个节点,有利于leader的选举过程快速的结束(偶数个节点可能需要多个选举过程才能选举出leader)。这里有几个概念:集群,leader选举,对这些原理感兴趣同学可以去google下.
战旗部署了5个server节点,其他节点都是client,也就是说每台服务上都有一个node。然后,战旗的服务通过localhost:8500地址向consul集群注册自己。部署结构1如下图所示:
Consul dep.png

思考:这样的部署结构存在一定的缺陷,如果某个节点的consul挂了,会直接影响该节点上的所有应用服务,应为它们都是通过localhost来跟consul进行通讯的。那怎么预防这样的情况呢?
这时备选方案plan B出台了.

  1. 通过coreDNS获取consul集群状况,来自动配置一个内部DNS路由条目。当访问consul服务时,自动路由到consul集群中的某个节点。
  2. 也可以部署两个nginx(主备),由nginx路由到consul集群中的某个节点。
  3. 如果您采用的是k8s环境,那恭喜您,可以省略很多工作来,直接在k8s中部署一个service来指向consul集群。

部署结构2如下图所示:
consul dep2.jpg

小结:为了提高consul集群的高可靠性,保证服务的正常运行。在结构1的基础上,同时又部署了DNS方案。当本节点的consul不可达时,通过DNS来与consul集群通讯。这里可能有些同学会问,为什么不直接采用结构2呢?这两个结构各有千秋,结构1通迅效率高,应用服务的健康检测都在当前节点完成,检测压力小。结构2虽无单节点风险,但存在consul负载不均衡等缺点。

具体consul怎么下载,怎么安装,怎么启动server,怎么加入集群就不讲了,大家可以参考官方文档:https://www.consul.io/docs/index.html

consul与grpc实战

服务注册与注销

当应用服务启动时,将自己注册到consul中,以便其他服务能及时发现该服务。

1.导入sdk,

import (
    consul "github.com/hashicorp/consul/api"
)

2.获取consul client对象

// @param uriStr string: consul通迅地址,默认http://localhost:8500; 当localhost不可达时,需要通过服务名来访问其他consul节点,如:http://consul:8500
// @param token string: 访问控制用
func MakeClient(uriStr string, token string) (*consul.Client, error) {

    uri, err := url.Parse(uriStr)
    if err != nil {
        logs.Error("url parse error: ", err)
        return nil, err
    }

    config := consulapi.DefaultConfig()
    config.Address = uriStr

    if len(token) > 0 {
        config.Token = token
    } else {
        config.Token = defaultToken
    }

    client, err := consulapi.NewClient(config)
    if err != nil {
        logs.Error("consul: ", uri.Scheme)
        return nil, err
    }

    return client, nil
}

3.准备工作

// @param svrName string: 要注册当服务名
// @param useType string: 对应consul中的tag, 可用于过滤
// @param svrPort int: 服务对应的端口号
// @param healthPort int: http检测时需要对应端口号,tcp检测默认当前端口
// @param healthType string: http或tcp,
// @param localIp string: 当前节点的内网IP,即其他服务能访问到的IP
func Prepare(svrName string, useType string, svrPort int, healthPort int, healthType string, localIp string) *consulapi.AgentServiceRegistration {
    ip := localIp 
    // 注册配置信息
    reg := &consul.AgentServiceRegistration{
        ID:      strings.ToLower(fmt.Sprintf("%s_%d", svrName, libs.Ip2Long(ip))), // 生成一个唯一当服务ID
        Name:    strings.ToLower(fmt.Sprintf("%s", svrName)), // 注册服务名
        Tags:    []string{strings.ToLower(useType)},// 标签
        Port:    svrPort, // 端口号
        Address: ip, // 所在节点ip地址
    }
    // 健康检测配置信息
    reg.Check = &consulapi.AgentServiceCheck{
        TCP:                            fmt.Sprintf("%s:%d", ip, svrPort),
        Timeout:                        "1s",
        Interval:                       "15s",
        DeregisterCriticalServiceAfter: "30s",// 30秒服务不可达时,注销服务
        Status:                         "passing",// 服务启动时,默认正常
    }

    if healthType == "http" {
        reg.Check.HTTP = fmt.Sprintf("http://%s:%d%s", reg.Address, healthPort, "/health") // http检测默认/health路径
        reg.Check.TCP = ""
    }
    // 启动http健康检测响应
    if len(reg.Check.HTTP) > 0 {
        RunHealthCheck(reg.Check.HTTP)
    }
    p.curRegistration = reg
    return reg
}

func RunHealthCheck(addr string) error {
    uri, err := url.Parse(addr)
    if err != nil {
        return err
    }

    http.HandleFunc(uri.Path, func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("success"))
    })
    go http.ListenAndServe(uri.Host, nil)
    return nil
}

4.服务注册

var (
    // 获取consul.Client, 如果localhost不可达需要自动切换地址
    client := MakeClient("http:localhost:8500", "")
    // 配置要注册的服务信息
    registration = Prepare("ImSvr", "grpc", "10000", 30000, "http", "192.168.1.100")
)

// 
func Register() error {
    err := client.Agent().ServiceRegister(registration)
    if err != nil {
        return err
    }

    return nil
}

5.服务注销

//
func Deregister() error {
    svrId := registration.ID
    return client.Agent().ServiceDeregister(svrId)
}

小结:当这些服务注册/注销方法封装好后,在应用服务启动的时候,调用Register()方法进行注册;当应用服务退出的时候,调用Deregister()方法进行注销。当服务异常退出当时候,并不会调用Deregister()方法,那怎么办呢?放心,前面已经有说到健康检测的DeregisterCriticalServiceAfter字段,当服务不可达时,会自动注销服务。OK,到这里服务的自动化注册已经完成了。

服务发现

对于服务与服务之间的通迅有很多方式,有人直接用tcp/http;有人会考虑restfullapi,让接口处理起来更容易;有人用dubbo,抱紧ali大腿;咱们用grpc, 抱紧google大腿。通迅框架之间各有千秋,总之适合自己的才是最好的。感兴趣的盆友自己去google/百度。

google grpc框架里并没有实现如何基于consul进行服务发现(或许后期会加上),不过有DNS的服务发现。看了它的实现方式,大致懂了实现原理。OK,咱们就写个基于consul的服务发现。鄙人github中可直接用consulresolver(欢迎大家关注,如果有问题请提交issue会第一时间修改),目前只有roundrobin策略,后续会逐渐加入其他策略(如随机,权重随机,负载策略等).同时该项目
github.com/generalzgd/grpc-svr-frame
还对grpc作了一些简单的封装,有利于快速搭建grpc服务。
这里就不贴代码了。

OK,基于consul的服务发现有了,那怎么使用呢?
1.导入模块,会自动执行包中的Init方法

import (
    grpclb_consul `github.com/generalzgd/grpc-svr-frame/grpc-consul`
    ctrl `github.com/generalzgd/grpc-svr-frame/grpc-ctrl`
)

2.组合grpc的方法封装

type MyServer {
    ctrl.GrpcController
}

var (
    mySvr = MyServer{
        GrpcController:ctrl.MakeGrpcController(),
    }
)

func (p *MyServer) callSample(){
    // 设置5秒超时
    ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
    // 注意address字段,由“consul:///”字符串开头,表示要用consulresolver来解析,后面写上服务名,会自动解析到对应的服务地址。可以细看下对应的文件,然后理解是怎么运行的。
    cfg = yaml.EndpointConfig{
        Name: "ImSvr",
        Address: "consul:///imsvr", 
    }
    // 里面还封装了connection pool,以提高通迅效率
    clientConn,err := p.GetGrpcConnWithLB(cfg, ctx)
    if err != nil {
        return
    }
    // 以下是grpc框架的通用伪代码
    client := NewImSvrClient(clientConn.ClientConn)
    resp, err := client.Login(ctx, &LoginReq{})
    if err != nil {
        return
    }
    logs.Info("Got:", resp)
}

总结

至此基于consul的服务注册与发现已经整合到grpc通迅框架中,并且封装了grpc服务的一些常用方法,可用于快速的开发微服务。这个整合过程历经了很多辛酸,不仅分析了官方文档源代码,也吸收了其他在consul/grpc方面的贡献者经验,前前后后琢磨尝试了许许多多的失败,也走了很多的弯路。当然了,整个结构还是个雏形,还需要完善。例如,均衡负载策略,目前只实现了roundrobin, 后续还有随机、权重、负载策略等。

遇到的坑

咱们在部署consul的时候选择了一个老版本,以为老版本(1.4.4)会相对稳定些。在新加节点的时候,突然已有的services每隔几分钟会消失,然后又重现,然后又消失不停的重复。妈呀,出大问题了,跟运维老哥一起折腾了个吧小时,推测可能join server节点的时候,数据同步可能存在问题。期间也没做其他操作,就一个节点一个节点leave回退,发现leave最后加入的server节点后consul稳定了。稳定后再仔细分析原因,发现这个版本有bug,https://github.com/hashicorp/consul/issues/5518, 以后生产环境尽量用次新版本。这是血的教训!!!!!


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1513 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传