package clientselector
import (
"errors"
"math/rand"
"net"
"net/rpc"
"net/url"
"strconv"
"strings"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/client"
"github.com/smallnest/rpcx"
)
// EtcdClientSelector selects an rpc server among the servers registered in
// etcd under BasePath. It keeps a list of server keys, per-server weights
// and metadata, and a cache of rpc clients keyed by server key.
//
// The fields are read by Select and rewritten by the watch goroutine without
// any locking in this file.
type EtcdClientSelector struct {
EtcdServers []string //etcd endpoints handed to the etcd client configuration
KeysAPI client.KeysAPI //etcd keys API; set by start, left nil if the etcd client cannot be created
ticker *time.Ticker //only referenced by the commented-out polling loop in start
sessionTimeout time.Duration //used as HeaderTimeoutPerRequest of the etcd client
BasePath string //etcd directory that is listed and watched; should end with the service name
Servers []string //server keys (etcd node key without the BasePath prefix), split on "@" before dialing
Group string //servers whose "group" metadata value differs from this are dropped
clientAndServer map[string]*rpc.Client //cache: server key -> rpc client connected to that server
metadata map[string]string //server key -> raw etcd node value (a URL-encoded query string)
Latitude float64 //latitude passed to getClosestServer in Closest mode
Longitude float64 //longitude passed to getClosestServer in Closest mode
WeightedServers []*Weighted //per-server weights used by the weighted select modes
SelectMode rpcx.SelectMode //selection strategy applied by Select
dailTimeout time.Duration //timeout argument passed to rpcx.NewDirectRPCClient
rnd *rand.Rand //random source used by the RandomSelect and Closest modes
currentServer int //index into Servers of the most recently selected server
len int //number of entries in Servers as of the last pull
HashServiceAndArgs HashServiceAndArgs //hash function for ConsistentHash mode; Select falls back to JumpConsistentHash when nil
Client *rpcx.Client //rpcx client passed to rpcx.NewDirectRPCClient
}
// NewEtcdClientSelector builds an EtcdClientSelector and starts it.
//
// etcdServers are the etcd endpoints, basePath is the etcd directory holding
// the service registrations, sessionTimeout is used as the per-request header
// timeout of the etcd client, sm is the selection strategy and dailTimeout is
// the timeout used when dialing rpc servers.
//
// Starting the selector pulls the current server list and launches the watch
// goroutine before the selector is returned. A failure to create the etcd
// client is not reported; the returned selector then simply has no servers.
func NewEtcdClientSelector(etcdServers []string, basePath string, sessionTimeout time.Duration, sm rpcx.SelectMode, dailTimeout time.Duration) *EtcdClientSelector {
	s := new(EtcdClientSelector)

	// Caller-supplied configuration.
	s.EtcdServers = etcdServers
	s.BasePath = basePath
	s.sessionTimeout = sessionTimeout
	s.SelectMode = sm
	s.dailTimeout = dailTimeout

	// Internal state.
	s.clientAndServer = make(map[string]*rpc.Client)
	s.metadata = make(map[string]string)
	s.rnd = rand.New(rand.NewSource(time.Now().UnixNano()))

	s.start()
	return s
}
// SetClient stores the rpcx client on the selector. The stored client is
// passed to rpcx.NewDirectRPCClient whenever the selector dials a server.
func (s *EtcdClientSelector) SetClient(c *rpcx.Client) {
	s.Client = c
}
// SetSelectMode replaces the selection strategy used by Select.
//
// ICMP-based weights are only computed while pulling the server list, so
// switching to rpcx.WeightedICMP takes full effect at the next pull.
func (s *EtcdClientSelector) SetSelectMode(sm rpcx.SelectMode) {
	s.SelectMode = sm
}
// AllClients dials every server currently in s.Servers and returns the
// clients that could be created.
//
// Each call creates new clients; the selector's client cache is neither read
// nor updated. Servers that cannot be dialed are skipped, so the result may
// be shorter than s.Servers (and nil when nothing could be dialed).
func (s *EtcdClientSelector) AllClients(clientCodecFunc rpcx.ClientCodecFunc) []*rpc.Client {
	var clients []*rpc.Client
	for _, sv := range s.Servers {
		// A server key is split on "@" into the two address arguments of
		// rpcx.NewDirectRPCClient (presumably "network@address").
		ss := strings.Split(sv, "@")
		if len(ss) < 2 {
			// Malformed key: skip it like any other unusable server
			// instead of panicking on ss[1].
			continue
		}
		c, err := rpcx.NewDirectRPCClient(s.Client, clientCodecFunc, ss[0], ss[1], s.dailTimeout)
		if err != nil {
			continue
		}
		clients = append(clients, c)
	}
	return clients
}
// start creates the etcd client, performs the initial pull of the server
// list and launches the watch goroutine.
//
// If the etcd client cannot be created the method returns silently: KeysAPI
// stays nil, no servers are pulled and no watcher is started.
func (s *EtcdClientSelector) start() {
	cfg := client.Config{
		Endpoints:               s.EtcdServers,
		Transport:               client.DefaultTransport,
		HeaderTimeoutPerRequest: s.sessionTimeout,
	}

	etcdClient, err := client.New(cfg)
	if err != nil {
		return
	}
	s.KeysAPI = client.NewKeysAPI(etcdClient)

	// Load the current registrations once, then rely on the watcher for
	// changes. Periodic polling via s.ticker is not used, so s.ticker is
	// left unset.
	s.pullServers()
	go s.watch()
}
// watch blocks on a recursive etcd watcher for BasePath and refreshes the
// server list whenever a registration changes. It is run in its own
// goroutine by start.
//
// Every "expire", "set", "update" and "delete" event triggers a full
// pullServers instead of patching the single changed node. For an expired
// leaf node the cached rpc client of that server is additionally evicted and
// closed, so the connection to the vanished server is not leaked.
//
// The loop ends, and watching stops for good, on the first error returned by
// the watcher.
func (s *EtcdClientSelector) watch() {
	watcher := s.KeysAPI.Watcher(s.BasePath, &client.WatcherOptions{
		Recursive: true,
	})

	for {
		res, err := watcher.Next(context.Background())
		if err != nil {
			return
		}

		switch res.Action {
		case "expire":
			s.pullServers()
			if res.Node == nil || res.Node.Dir {
				continue
			}
			// Drop the client that is connected to the expired server.
			removedServer := strings.TrimPrefix(res.Node.Key, s.BasePath+"/")
			c, cached := s.clientAndServer[removedServer]
			if !cached {
				continue
			}
			delete(s.clientAndServer, removedServer)
			if c != nil {
				// Best effort: the server is gone, so a close error
				// carries no useful information.
				_ = c.Close()
			}
		case "set", "update", "delete":
			s.pullServers()
		}
	}
}
// pullServers reloads the server list from etcd and rebuilds the derived
// state (Servers, WeightedServers, len, currentServer).
//
// Errors from etcd and responses without a node are ignored, leaving the
// previous state untouched.
//
// When the directory has no child nodes, i.e. the last instance is gone, the
// server list and weights are cleared and every cached client is closed, so
// Select reports that no service is available instead of handing out servers
// that no longer exist.
func (s *EtcdClientSelector) pullServers() {
	resp, err := s.KeysAPI.Get(context.TODO(), s.BasePath, &client.GetOptions{
		Recursive: true,
		Sort:      true,
	})
	if err != nil || resp.Node == nil {
		return
	}

	if len(resp.Node.Nodes) == 0 {
		// Reset len first: Select uses it as its guard before indexing
		// into Servers.
		s.len = 0
		s.Servers = nil
		s.WeightedServers = nil

		stale := s.clientAndServer
		s.clientAndServer = map[string]*rpc.Client{}
		for _, c := range stale {
			if c != nil {
				_ = c.Close() // best effort, the server is already gone
			}
		}
		return
	}

	prefix := s.BasePath + "/"
	servers := make([]string, 0, len(resp.Node.Nodes))
	for _, n := range resp.Node.Nodes {
		servers = append(servers, strings.TrimPrefix(n.Key, prefix))
	}
	s.Servers = servers

	// createWeighted also removes inactive servers from s.Servers, so the
	// length must be taken afterwards.
	s.createWeighted(resp.Node.Nodes)

	// Publish the new length before the ICMP pass below, which may take a
	// while; until then len could exceed len(s.Servers).
	s.len = len(s.Servers)
	if s.len > 0 {
		s.currentServer = s.currentServer % s.len
	}

	// In WeightedICMP mode the weight from etcd is replaced by one derived
	// from the ping round-trip time of each server's host.
	if s.SelectMode == rpcx.WeightedICMP {
		for _, w := range s.WeightedServers {
			server := w.Server.(string)
			ss := strings.Split(server, "@")
			if len(ss) < 2 {
				// Malformed key: keep the weight taken from etcd.
				continue
			}
			// Errors are ignored on purpose (best effort).
			// NOTE(review): confirm that Ping/CalculateWeight yield a
			// sensible weight for an unparsable or unreachable host.
			host, _, _ := net.SplitHostPort(ss[1])
			rtt, _ := Ping(host)
			rtt = CalculateWeight(rtt)
			w.Weight = rtt
			w.EffectiveWeight = rtt
		}
	}
}
// createWeighted rebuilds WeightedServers and metadata from the given etcd
// nodes and then removes the inactive servers.
//
// nodes must be the nodes s.Servers was built from, in the same order: the
// node index is used as the index into both WeightedServers and Servers.
//
// Each node value is parsed as a URL query string:
//   - "weight": integer weight of the server (default 1);
//   - "state":  any non-empty value other than "active" marks the server
//     as inactive;
//   - "group":  a value different from s.Group marks the server as inactive.
//
// A value that cannot be parsed leaves the server active with weight 1.
//
// metadata is rebuilt from scratch on every call so that servers which have
// disappeared from etcd do not linger in it (Closest mode selects from
// metadata). NOTE(review): inactive servers are still recorded in metadata,
// as before.
func (s *EtcdClientSelector) createWeighted(nodes client.Nodes) {
	s.WeightedServers = make([]*Weighted, len(s.Servers))
	metadata := make(map[string]string, len(nodes))
	prefix := s.BasePath + "/"

	var inactiveServers []int
	for i, n := range nodes {
		key := strings.TrimPrefix(n.Key, prefix)
		weighted := &Weighted{Server: key, Weight: 1, EffectiveWeight: 1}
		s.WeightedServers[i] = weighted
		metadata[key] = n.Value

		v, err := url.ParseQuery(n.Value)
		if err != nil {
			continue
		}

		state := v.Get("state")
		if (state != "" && state != "active") || s.Group != v.Get("group") {
			inactiveServers = append(inactiveServers, i)
		}

		if w := v.Get("weight"); w != "" {
			if weight, err := strconv.Atoi(w); err == nil {
				weighted.Weight = weight
				weighted.EffectiveWeight = weight
			}
		}
	}

	// Swap in the complete map at once rather than mutating the old one.
	s.metadata = metadata
	s.removeInactiveServers(inactiveServers)
}
// removeInactiveServers drops the servers at the given indexes from both
// Servers and WeightedServers and closes any cached client connected to them.
//
// inactiveServers holds indexes into Servers in ascending order. They are
// processed from last to first so that removing one element does not shift
// the positions of the indexes still to be handled.
func (s *EtcdClientSelector) removeInactiveServers(inactiveServers []int) {
	for pos := len(inactiveServers) - 1; pos >= 0; pos-- {
		idx := inactiveServers[pos]
		name := s.Servers[idx]

		s.Servers = append(s.Servers[:idx], s.Servers[idx+1:]...)
		s.WeightedServers = append(s.WeightedServers[:idx], s.WeightedServers[idx+1:]...)

		if cached := s.clientAndServer[name]; cached != nil {
			delete(s.clientAndServer, name)
			cached.Close() // close connection to inactive server
		}
	}
}
// getCachedClient returns the rpc client for the given server key, dialing
// the server and caching the client on first use.
//
// server is split on "@" into the two address arguments of
// rpcx.NewDirectRPCClient; a key without "@" yields an error instead of a
// panic. Only successfully created clients are cached, so a failed dial is
// retried on the next call.
func (s *EtcdClientSelector) getCachedClient(server string, clientCodecFunc rpcx.ClientCodecFunc) (*rpc.Client, error) {
	if c := s.clientAndServer[server]; c != nil {
		return c, nil
	}

	ss := strings.Split(server, "@")
	if len(ss) < 2 {
		return nil, errors.New("invalid server address: " + server)
	}

	c, err := rpcx.NewDirectRPCClient(s.Client, clientCodecFunc, ss[0], ss[1], s.dailTimeout)
	if err != nil || c == nil {
		// Hand the result through unchanged, but do not remember it.
		return c, err
	}

	// The cache map is nil when the selector was not built by
	// NewEtcdClientSelector; writing to a nil map would panic.
	if s.clientAndServer == nil {
		s.clientAndServer = make(map[string]*rpc.Client)
	}
	s.clientAndServer[server] = c
	return c, nil
}
// Select picks a server according to s.SelectMode and returns a (cached) rpc
// client connected to it.
//
// options is only used in ConsistentHash mode, where it is handed to
// HashServiceAndArgs (JumpConsistentHash when none is set).
//
// An error is returned when no server is known, when Closest mode finds no
// candidate, when the select mode is not supported, or when the chosen
// server cannot be dialed.
//
// currentServer is updated without locking, so round-robin order is only
// approximate under concurrent use.
func (s *EtcdClientSelector) Select(clientCodecFunc rpcx.ClientCodecFunc, options ...interface{}) (*rpc.Client, error) {
	if s.len == 0 {
		return nil, errors.New("No available service")
	}

	switch s.SelectMode {
	case rpcx.RandomSelect:
		s.currentServer = s.rnd.Intn(s.len)
		return s.getCachedClient(s.Servers[s.currentServer], clientCodecFunc)

	case rpcx.RoundRobin:
		s.currentServer = (s.currentServer + 1) % s.len
		return s.getCachedClient(s.Servers[s.currentServer], clientCodecFunc)

	case rpcx.ConsistentHash:
		if s.HashServiceAndArgs == nil {
			s.HashServiceAndArgs = JumpConsistentHash
		}
		// NOTE(review): options is passed as one value, not expanded with
		// "..."; kept as is because changing it would alter the hash.
		s.currentServer = s.HashServiceAndArgs(s.len, options)
		return s.getCachedClient(s.Servers[s.currentServer], clientCodecFunc)

	case rpcx.WeightedRoundRobin, rpcx.WeightedICMP:
		server := nextWeighted(s.WeightedServers).Server.(string)
		return s.getCachedClient(server, clientCodecFunc)

	case rpcx.Closest:
		closestServers := getClosestServer(s.Latitude, s.Longitude, s.metadata)
		if len(closestServers) == 0 {
			// rand.Intn panics for n <= 0, so report the empty result
			// as an ordinary "no service" error.
			return nil, errors.New("No available service")
		}
		selected := s.rnd.Intn(len(closestServers))
		return s.getCachedClient(closestServers[selected], clientCodecFunc)

	default:
		return nil, errors.New("not supported SelectMode: " + s.SelectMode.String())
	}
}
// If you have questions, contact the site administrator on WeChat (not the author of this article).