组件
server
grpc的服务端首先需要三部分的组件:
- service为提供客户端响应;
- register是服务端每次启动时将自己的地址注册给注册中心;
- unregister是服务端每次终止时从注册中心清除掉自己的注册信息;
register
func Register(client *etcd3.Client, target, key, val string, stopSignal chan os.Signal) error {
go func() {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
ttl := 10
resp, err := client.Grant(context.Background(), int64(ttl))
if err != nil {
log.Println("grant lease error ", err)
}
_, err = client.Get(context.Background(), key)
if err != nil {
if err == rpctypes.ErrKeyNotFound {
if _, err = client.Put(context.Background(), key, val, etcd3.WithLease(resp.ID)); err != nil {
log.Printf("put %+v in etcd error:%+v", val, err)
}
} else {
log.Printf("get from etcd error:%+v", err)
}
} else {
if _, err = client.Put(context.Background(), key, val, etcd3.WithLease(resp.ID)); err != nil {
log.Printf("put %+v in etcd error:%+v", val, err)
}
}
select {
case <-stopSignal:
return
default:
}
}
}
}()
return nil
}
主要逻辑说明:
首先etcd client去etcd获取该service实例的key,若获取不到,则往etcd中存入该实例的key,val,key为“服务名/本次启动的地址”,val为本次启动的地址;若获取得到,说明该key已经被存过了,只需要keepalive即可;注意,register是一个无限循环的协程,会不断检查当前实例的注册情况,并且是有时效性的,防止该实例意外退出没有清理掉自己的注册信息而被客户端将请求发送到该实例上。
unregister
func Unregister(client *etcd3.Client, key string) error {
var err error
if _, err := client.Delete(context.Background(), key); err != nil {
log.Printf("grpclb: unregister '%s' failed: %s", key, err.Error())
} else {
log.Printf("grpclb: unregister '%s' ok.", key)
}
return err
}
主要逻辑:注销注册信息是在服务正常退出时被调用,将自身的注册信息从etcd中清理掉,防止之后继续接受客户端的请求。
service
func RungGRPCServer(grpcPort int16) {
svcKey := fmt.Sprintf("%+v/%+v/127.0.0.1:%+v", lb.PREFIX, SERVICE_NAME, *port)
svcVal := fmt.Sprintf("127.0.0.1:%+v", *port)
// 启动一个grpc server
grpcServer := grpc.NewServer()
// 绑定服务实现 RegisterHelloWorldServiceServer
client, err := etcd3.New(etcd3.Config{
Endpoints: strings.Split(ETCD_ADDR, ","),
})
if err != nil {
log.Fatalf("creat etcd3 client failed:%+v", err)
}
gs := &GreeterService{client}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
s := <-ch
log.Printf("receive stop signal:%+v", s)
lb.Unregister(gs.etcdCli, svcKey)
os.Exit(1)
}()
lb.Register(gs.etcdCli, ETCD_ADDR, svcKey, svcVal, ch)
//实现了GreeterService的所有方法才能注册给grpcServer
api.RegisterGreeterServer(grpcServer, gs)
// 监听端口
listen, e := net.Listen("tcp", fmt.Sprintf(":%+v", *port))
if e != nil {
log.Fatal(e)
}
// 绑定监听端口
log.Printf("serve gRPC server: 127.0.0.1:%+v", grpcPort)
if err := grpcServer.Serve(listen); err != nil {
log.Printf("failed to serve: %v", err)
return
}
}
主要逻辑:启动实例,注册到注册中心,并且起一个协程用来监听退出信息,若退出则调用注销函数注销本次启动实例的注册信息;
client
client
func main() {
r := lb.NewResolver(GREETER_SERVICE)
//r需要实现naming.Resolver的Resolve接口
b := grpc.RoundRobin(r)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, ETCD_ADDR, grpc.WithInsecure(), grpc.WithBalancer(b))
time.Sleep(5 * time.Second)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := api.NewGreeterClient(conn)
name := "world"
if len(os.Args) > 1 {
name = os.Args[1]
}
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
log.Println("start to request...")
r, err := c.SayHello(context.Background(), &api.HelloRequest{Name: name})
if err != nil {
log.Fatalf("call say hello fail: %v", err)
}
log.Println(r.Reply)
}
}
}
客户端想要实现负载均衡的功能,需要在dial选项中加上grpc.WithBalancer(b)
,可以看到有两个组件:r := lb.NewResolver(GREETER_SERVICE)
,b := grpc.RoundRobin(r)
。resolver需要想要访问服务的服务名以及从哪里去寻找服务名对应的地址;最后会将其作为参数用来初始化balancer。
resolver
type resolver struct {
serviceName string
}
func NewResolver(serviceName string) *resolver {
return &resolver{
serviceName: serviceName,
}
}
func (r *resolver) Resolve(target string) (naming.Watcher, error) {
if r.serviceName == "" {
log.Println("no service name provided")
return nil, fmt.Errorf("no service name provided")
}
client, err := etcd3.New(etcd3.Config{
Endpoints: strings.Split(target, ","),
})
if err != nil {
return nil, fmt.Errorf("grpclb: create etcd3 client failed: %s", err.Error())
}
return &watcher{client, false, r.serviceName}, nil
}
使用resolver实现了naming.Resolver接口,这个接口只有一个方法:Resolve。
type Resolver interface {
// Resolve creates a Watcher for target.
Resolve(target string) (Watcher, error)
}
根据client给定的target返回一个watcher。watcher也是一个naming.Watcher接口,这个接口有两个方法:Next和Close。
type Watcher interface {
// Next blocks until an update or error happens. It may return one or more
// updates. The first call should get the full set of the results. It should
// return an error if and only if Watcher cannot recover.
Next() ([]*Update, error)
// Close closes the Watcher.
Close()
}
watcher的Next方法将返回一个naming.Update数组,这个数组标识每次watcher监听到的变化:
type Update struct {
// Op indicates the operation of the update.
Op Operation
// Addr is the updated address. It is empty string if there is no address update.
Addr string
// Metadata is the updated metadata. It is nil if there is no metadata update.
// Metadata is not required for a custom naming implementation.
Metadata interface{}
}
根据变化选取正确的地址供客户端使用。
可以从grpc.balancer的源码中看到调用这个接口的方式:
func (rr *roundRobin) Start(target string, config BalancerConfig) error {
rr.mu.Lock()
defer rr.mu.Unlock()
if rr.done {
return ErrClientConnClosing
}
if rr.r == nil {
// If there is no name resolver installed, it is not needed to
// do name resolution. In this case, target is added into rr.addrs
// as the only address available and rr.addrCh stays nil.
rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
return nil
}
w, err := rr.r.Resolve(target)
if err != nil {
return err
}
rr.w = w
rr.addrCh = make(chan []Address, 1)
go func() {
for {
if err := rr.watchAddrUpdates(); err != nil {
return
}
}
}()
return nil
}
从源码可以看出,首先调用了resolver的Resolve接口,返回了一个watcher,之后在一个无限循环的协程中不断调用watcher的watchAddrUpdates方法监听变化。
所以可以看出grpc接口设计的意图:resolver的target就是用来提供给watcher,watcher使用这个target去寻址到具体的服务地址。
再看下源码中watchAddrUpdates的具体实现:
func (rr *roundRobin) watchAddrUpdates() error {
updates, err := rr.w.Next()
if err != nil {
grpclog.Warningf("grpc: the naming watcher stops working due to %v.", err)
return err
}
rr.mu.Lock()
defer rr.mu.Unlock()
for _, update := range updates {
addr := Address{
Addr: update.Addr,
Metadata: update.Metadata,
}
switch update.Op {
case naming.Add:
var exist bool
for _, v := range rr.addrs {
if addr == v.addr {
exist = true
grpclog.Infoln("grpc: The name resolver wanted to add an existing address: ", addr)
break
}
}
if exist {
continue
}
rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
case naming.Delete:
for i, v := range rr.addrs {
if addr == v.addr {
copy(rr.addrs[i:], rr.addrs[i+1:])
rr.addrs = rr.addrs[:len(rr.addrs)-1]
break
}
}
default:
grpclog.Errorln("Unknown update.Op ", update.Op)
}
}
// Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
open := make([]Address, len(rr.addrs))
for i, v := range rr.addrs {
open[i] = v.addr
}
if rr.done {
return ErrClientConnClosing
}
select {
case <-rr.addrCh:
default:
}
rr.addrCh <- open
return nil
}
从源码可以看出每次调用watcher的Next方法,返回了一个update数组,然后遍历这个数组每一项,若是Add类型的事件,得到地址,将其与自身缓存的地址列表逐一比对,若已存在在缓存中则不做任何操作;否则加入到缓存的地址列表中;若是Delete类型的事件,则从缓存地址列表中删除掉该地址。最后将地址列表放入addrCh管道中,这个有点没明白用意。
因此我们需要实现watcher的Next方法,返回update事件数组即可。
watcher
func (w *watcher) Next() ([]*naming.Update, error) {
key := fmt.Sprintf("%s/%s", PREFIX, w.serviceName)
baseCtx := context.TODO()
log.Printf("%+v, call next method", w)
if !w.init {
ctx, cancel := context.WithTimeout(baseCtx, 3*time.Second)
defer cancel()
resp, err := w.cli.Get(ctx, key, etcd3.WithPrefix())
//log.Printf("get etcd resp:%+v", resp)
if err != nil {
log.Println("get from etcd error ", err)
} else {
w.init = true
addrs := getAddrFromResp(resp)
if len(addrs) != 0 {
updates := make([]*naming.Update, 0, len(addrs))
for _, addr := range addrs {
updates = append(updates, &naming.Update{
Op: naming.Add,
Addr: addr,
})
}
return updates, nil
}
}
}
rch := w.cli.Watch(context.Background(), key, etcd3.WithPrefix())
for wresp := range rch {
events := wresp.Events
log.Printf("get etcd events:%+v", events)
updates := make([]*naming.Update, 0, len(events))
for _, ev := range events {
switch ev.Type {
case mvccpb.PUT:
updates = append(updates, &naming.Update{
Op: naming.Add,
Addr: string(ev.Kv.Value),
})
case mvccpb.DELETE:
updates = append(updates, &naming.Update{
Op: naming.Delete,
Addr: string(ev.Kv.Value),
})
}
}
return updates, nil
}
return nil, nil
}
第一次watcher启动时,我们会直接从etcd中取服务名前缀的所有地址,并将每一项都放入update数组中,返回。
之后则直接监听服务名前缀key,从其管道中获取变化事件,若为PUT事件,说明有新服务实例加入了,则将其加入到update数组中,事件类型为Add,若监听到DELETE事件,也将其加入到update数组中,事件类型为Delete。
从etcd响应中抽取地址的实现如下:
func getAddrFromResp(resp *etcd3.GetResponse) []string {
results := resp.Kvs
addrs := make([]string, 0, len(results))
for _, r := range results {
if string(r.Value) != "" {
addrs = append(addrs, string(r.Value))
}
}
return addrs
}
演示
至此,我们已经实现了grpc负载均衡所需要的所有接口,启动之可以发现请求被均匀的分配到了各个实例上,并且新加入的server也能很快接受到请求。
总结
不足之处:
- 本次试验所用的grpc版本是v1.26.0,grpc.RoundRobin接口目前在新版本的grpc中已经被弃用了:
// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
// the name resolution updates and updates the addresses available correspondingly.
//
// Deprecated: please use package balancer/roundrobin. May be removed in a future 1.x release.
func RoundRobin(r naming.Resolver) Balancer {
return &roundRobin{r: r}
}
之后有空再用新版balancer/roundrobin实现一版
- register接口若get到服务名前缀,可以不使用put,直接调用etcdV3的keepalive接口为key续期即可。
- watcher监听etcd管道,每次得到数据后直接返回,关闭了管道,有丢失数据的风险,此处可以起一个后台协程,每次将数据送入管道,主协程从管道取数据,避免etcd resp管道的频繁关闭操作。