etcd实现服务发现和注册

bytemode · · 1753 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

## 原理 etcd实现服务发现和注册,使用的是kv存储、租约、watch. 向etcd 注册 该服务(其实就是 存一个值)然后向etcd 发送心跳,当etcd 没有检测到心跳就会 把这个键值对 删了(这整个动作是etcd里的租约模式),网关那边 就只需要 watch 这个 key ,就能够知道 所有服务的所有动态了. 注册的时候可以使用前缀这样在watch的时候可以watch所有的服务器. ## 服务注册 1. 租约模式,客户端申请一个租约设置过期时间,keepalive没个固定时间进行租约续期,通过租约存储key.过期不续租则etcd会删除租约上的所有key 2. 相同服务存储的key的前缀可以设置成一样 3. 注册服务就是向服务端使用租约模式写入一个key ``` package main import ( "context" "fmt" "time" "go.etcd.io/etcd/clientv3" ) //创建租约注册服务 type ServiceRegister struct { etcdClient *clientv3.Client //etcd client lease clientv3.Lease //租约 leaseResp *clientv3.LeaseGrantResponse //设置租约时间返回 canclefunc func() //租约撤销 //租约keepalieve相应chan keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse key string //注册的key } func NewServiceRegister(addr []string, timeNum int64) (*ServiceReg, error) { conf := clientv3.Config{ Endpoints: addr, DialTimeout: 5 * time.Second, } var ( client *clientv3.Client ) //连接etcd if clientTem, err := clientv3.New(conf); err == nil { etcdClient = clientTem } else { return nil, err } ser := &ServiceRegister{ etcdClient: client, } //申请租约设置时间keepalive if err := ser.setLease(timeNum); err != nil { return nil, err } //监听续租相应chan go ser.ListenLeaseRespChan() return ser, nil } //设置租约 func (this *ServiceRegister) setLease(timeNum int64) error { //申请租约 lease := clientv3.NewLease(this.etcdClient) //设置租约时间 leaseResp, err := lease.Grant(context.TODO(), timeNum) if err != nil { return err } //设置续租 定期发送需求请求 ctx, cancelFunc := context.WithCancel(context.TODO()) leaseRespChan, err := lease.KeepAlive(ctx, leaseResp.ID) if err != nil { return err } this.lease = lease this.leaseResp = leaseResp this.canclefunc = cancelFunc this.keepAliveChan = leaseRespChan return nil } //监听 续租情况 func (this *ServiceRegister) ListenLeaseRespChan() { for { select { case leaseKeepResp := <-this.keepAliveChan: if leaseKeepResp == nil { fmt.Printf("已经关闭续租功能\n") return } else { fmt.Printf("续租成功\n") } } } } //通过租约 注册服务 func (this *ServiceRegister) PutService(key, val string) error { //带租约的模式写入数据即注册服务 kv := clientv3.NewKV(this.etcdClient) _, err := kv.Put(context.TODO(), key, val, clientv3.WithLease(this.leaseResp.ID)) return err } //撤销租约 func (this *ServiceRegister) RevokeLease() error { this.canclefunc() time.Sleep(2 * time.Second) _, err := this.lease.Revoke(context.TODO(), this.leaseResp.ID) return err } func main() { ser,_ := NewServiceRegister([]string{"127.0.0.1:2379"}, 5) ser.PutService("/server/node1", "node1") select{} } ``` ## 服务发现 1. 创建一个client 连到etcd. 2. 匹配到所有相同前缀的 key. 存储信息到本地 3. watch这个key前缀,当有增加或者删除的时候就修改本地 4. 本地维护server的列表 ``` import ( "go.etcd.io/etcd/clientv3" "time" "context" "go.etcd.io/etcd/mvcc/mvccpb" "sync" "log" ) type ServiceDiscovery struct { client *clientv3.Client serverList map[string]string lock sync.Mutex } func NewServiceDiscovery (addr []string)( *ServiceDiscovery, error){ conf := clientv3.Config{ Endpoints: addr, DialTimeout: 5 * time.Second, } if client, err := clientv3.New(conf); err == nil { return &ClientDis{ client:client, serverList:make(map[string]string), }, nil } else { return nil ,err } } func (this * ServiceDiscovery) GetService(prefix string) ([]string ,error){ //使用key前桌获取所有的etcd上所有的server resp, err := this.client.Get(context.Background(), prefix, clientv3.WithPrefix()) if err != nil { return nil, err } //解析出所有的server放入本地 addrs := this.extractAddrs(resp) //warch server前缀 将变更写入本地 go this.watcher(prefix) return addrs ,nil } // 监听key前缀 func (this *ServiceDiscovery) watcher(prefix string) { //监听 返回监听事件chan rch := this.client.Watch(context.Background(), prefix, clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { case mvccpb.PUT: //修改或者新增 this.SetServiceList(string(ev.Kv.Key),string(ev.Kv.Value)) case mvccpb.DELETE: //删除 this.DelServiceList(string(ev.Kv.Key)) } } } } func (this *ServiceDiscovery) extractAddrs(resp *clientv3.GetResponse) []string { addrs := make([]string,0) if resp == nil || resp.Kvs == nil { return addrs } for i := range resp.Kvs { if v := resp.Kvs[i].Value; v != nil { this.SetServiceList(string(resp.Kvs[i].Key),string(resp.Kvs[i].Value)) addrs = append(addrs, string(v)) } } return addrs } func (this *ServiceDiscovery) SetServiceList(key,val string) { this.lock.Lock() defer this.lock.Unlock() this.serverList[key] = string(val) log.Println("set data key :",key,"val:",val) } func (this *ServiceDiscovery) DelServiceList(key string) { this.lock.Lock() defer this.lock.Unlock() delete(this.serverList,key) log.Println("del data key:", key) } func (this *ServiceDiscovery) SerList2Array()[]string { this.lock.Lock() defer this.lock.Unlock() addrs := make([]string,0) for _, v := range this.serverList { addrs = append(addrs,v) } return addrs } func main () { cli,_ := NewServiceDiscovery([]string{"127.0.0.1:2379"}) cli.GetService("/server") select {} } ```

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1753 次点击  
加入收藏 微博
被以下专栏收入,发现更多相似内容
上一篇:grpc快速使用
下一篇:etcd快速入门
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传