本文整理一下思路,编写示例(golang),以便加深etcd的理解
大致如下,监听程序为master,服务为service
1 service 启动时向etcd注册自己的信息,即注册到services/ 这个目录
2 service 可能异常推出,需要维护一个TTL(V3 使用 lease实现),类似于心跳,挂掉了,master可以监听到
3 master监听 services/ 目录下的所有服务,根据不同action(V3有put/delete),进行处理
service注册
提供 key(service name), value(serviceInfo)进行registered
start 启动后,执行keeplive(), 维护心跳,挂掉时revoke()
同时监听 stop chan, 相当于unregistered
package discovery import ( "github.com/coreos/etcd/clientv3" "context" "time" "log" "encoding/json" "errors" ) //the detail of service type ServiceInfo struct{ IP string } type Service struct { Name string Info ServiceInfo stop chan error leaseid clientv3.LeaseID client *clientv3.Client } func NewService(name string, info ServiceInfo,endpoints []string) (*Service, error) { cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 2*time.Second, }) if err != nil { log.Fatal(err) return nil, err } return &Service { Name: name, Info: info, stop: make (chan error), client: cli, },err } func (s *Service) Start() error { ch, err := s.keepAlive() if err != nil { log.Fatal(err) return err } for { select { case err := <- s.stop: s.revoke() return err case <- s.client.Ctx().Done(): return errors.New("server closed") case ka, ok := <-ch: if !ok { log.Println("keep alive channel closed") s.revoke() return nil } else { log.Printf("Recv reply from service: %s, ttl:%d", s.Name, ka.TTL) } } } } func (s *Service) Stop() { s.stop <- nil } func (s *Service) keepAlive() (<-chan *clientv3.LeaseKeepAliveResponse, error){ info := &s.Info key := "services/" + s.Name value, _ := json.Marshal(info) // minimum lease TTL is 5-second resp, err := s.client.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) return nil, err } _, err = s.client.Put(context.TODO(), key, string(value), clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) return nil, err } s.leaseid = resp.ID return s.client.KeepAlive(context.TODO(), resp.ID) } func (s *Service) revoke() error { _, err := s.client.Revoke(context.TODO(), s.leaseid) if err != nil { log.Fatal(err) } log.Printf("servide:%s stop\n", s.Name) return err }
监听程序Master
提供监听路径path,启动master,当put时加入 map, delete时 从map去掉
package discovery import ( "github.com/coreos/etcd/clientv3" "context" "log" "time" "encoding/json" "fmt" ) type Master struct { Path string Nodes map[string] *Node Client *clientv3.Client } //node is a client type Node struct { State bool Key string Info ServiceInfo } func NewMaster(endpoints []string, watchPath string) (*Master,error) { cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: time.Second, }) if err != nil { log.Fatal(err) return nil,err } master := &Master { Path: watchPath, Nodes: make(map[string]*Node), Client: cli, } go master.WatchNodes() return master,err } func (m *Master) AddNode(key string,info *ServiceInfo) { node := &Node{ State: true, Key: key, Info: *info, } m.Nodes[node.Key] = node } func GetServiceInfo(ev *clientv3.Event) *ServiceInfo { info := &ServiceInfo{} err := json.Unmarshal([]byte(ev.Kv.Value), info) if err != nil { log.Println(err) } return info } func (m *Master) WatchNodes() { rch := m.Client.Watch(context.Background(), m.Path, clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { case clientv3.EventTypePut: fmt.Printf("[%s] %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) info := GetServiceInfo(ev) m.AddNode(string(ev.Kv.Key),info) case clientv3.EventTypeDelete: fmt.Printf("[%s] %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) delete(m.Nodes, string(ev.Kv.Key)) } } } }
service使用示例,20s后断开
package main import ( "fmt" dis "discovery" "log" "time" ) func main() { serviceName := "s-test" serviceInfo := dis.ServiceInfo{IP:"192.168.1.26"} s, err := dis.NewService(serviceName, serviceInfo,[]string { "http://192.168.1.17:2379", "http://192.168.1.17:2479", "http://192.168.1.17:2579", }) if err != nil { log.Fatal(err) } fmt.Printf("name:%s, ip:%s\n", s.Name, s.Info.IP) go func() { time.Sleep(time.Second*20) s.Stop() }() s.Start() }
master使用示例
package main import ( "log" "time" "fmt" dis "discovery" ) func main() { m, err := dis.NewMaster([]string{ "http://192.168.1.17:2379", "http://192.168.1.17:2479", "http://192.168.1.17:2579", }, "services/") if err != nil { log.Fatal(err) } for { for k, v := range m.Nodes { fmt.Printf("node:%s, ip=%s\n", k, v.Info.IP) } fmt.Printf("nodes num = %d\n",len(m.Nodes)) time.Sleep(time.Second * 5) } }
执行结果(需要提前搭建 etcd服务器,可以到github下载,文末提供简单启动脚本)
go run master
go run service
etcd服务器启动脚本(执行环境 CentOS6.5)
#!/bin/sh work_path=$(dirname $0) cd ./${work_path} #echo $(pwd) #echo `date` case $1 in 1) echo -e "[1]start first server\n" ./etcd --name cd0 --initial-advertise-peer-urls http://127.0.0.1:2380 \ --listen-peer-urls http://127.0.0.1:2380 \ --listen-client-urls http://192.168.1.17:2379,http://127.0.0.1:2379 \ --advertise-client-urls http://192.168.1.17:2379,http://127.0.0.1:2379 \ --initial-cluster-token etcd-cluster-1 \ --initial-cluster cd0=http://127.0.0.1:2380,cd1=http://127.0.0.1:2480,cd2=http://127.0.0.1:2580 \ --initial-cluster-state new ;; 2) echo -e "[2]start second server\n" ./etcd --name cd1 --initial-advertise-peer-urls http://127.0.0.1:2480 \ --listen-peer-urls http://127.0.0.1:2480 \ --listen-client-urls http://192.168.1.17:2479,http://127.0.0.1:2479 \ --advertise-client-urls http://192.168.1.17:2479,http://127.0.0.1:2479 \ --initial-cluster-token etcd-cluster-1 \ --initial-cluster cd0=http://127.0.0.1:2380,cd1=http://127.0.0.1:2480,cd2=http://127.0.0.1:2580 \ --initial-cluster-state new ;; 3) echo -e "[3]start third server\n" ./etcd --name cd2 --initial-advertise-peer-urls http://127.0.0.1:2580 \ --listen-peer-urls http://127.0.0.1:2580 \ --listen-client-urls http://192.168.1.17:2579,http://127.0.0.1:2579 \ --advertise-client-urls http://192.168.1.17:2579,http://127.0.0.1:2579 \ --initial-cluster-token etcd-cluster-1 \ --initial-cluster cd0=http://127.0.0.1:2380,cd1=http://127.0.0.1:2480,cd2=http://127.0.0.1:2580 \ --initial-cluster-state new ;; *) echo "error paramater" ;; esac
完整代码,请移步 git@github.com:moonlong/etcd-discovery.git
欢迎斧正
完!
有疑问加站长微信联系(非本文作者)