概述
Client 主要是用来执行请求服务和订阅发布事件。是对于broker,Transort的一种封装方便使用。
Init
初始化客户端函数
- 初始化连接池数量和连接池TTL
- 调用注入的opts函数列表
- 最后初始化连接池
func (r *rpcClient) Init(opts ...Option) error {
size := r.opts.PoolSize
ttl := r.opts.PoolTTL
for _, o := range opts {
o(&r.opts)
}
// update pool configuration if the options changed
if size != r.opts.PoolSize || ttl != r.opts.PoolTTL {
r.pool.Lock()
r.pool.size = r.opts.PoolSize
r.pool.ttl = int64(r.opts.PoolTTL.Seconds())
r.pool.Unlock()
}
return nil
}
==Call==
Call是Client接口中最主要的方法,在之前Go Micro Selector 源码分析
- Client调用Call方法
- Call方法调用selector组件的Select方法,获取next函数
- call匿名函数中调用next函数(默认为CacheSelector 随机获取服务列表中的节点, Go Micro Selector 源码分析) 返回node
- 以grpcClient为例,调用grpcClient.call
- call函数中获取conn,然后Invoke调用服务端函数
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// 复制出options
callOpts := g.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
// 调用next函数 获取selector
next, err := g.next(req, callOpts)
if err != nil {
return err
}
// 检查context Deadline
d, ok := ctx.Deadline()
if !ok {
// 没有deadline 创建一个新的
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// 获取到deadline设置context
opt := client.WithRequestTimeout(time.Until(d))
opt(&callOpts)
}
// should we noop right here?
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
default:
}
// 复制call函数 在下面的goroutine中使用
gcall := g.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
gcall = callOpts.CallWrappers[i-1](gcall)
}
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// 调用call 正式调用服务端接口
err = gcall(ctx, node, req, rsp, callOpts)
g.opts.Selector.Mark(req.Service(), node, err)
return err
}
ch := make(chan error, callOpts.Retries+1)
var gerr error
// 重试
for i := 0; i <= callOpts.Retries; i++ {
go func(i int) {
// 调动call 返回channel
ch <- call(i)
}(i)
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {
return nil
}
retry, rerr := callOpts.Retry(ctx, req, i, err)
if rerr != nil {
return rerr
}
if !retry {
return err
}
gerr = err
}
}
return gerr
}
Stream
Stream跟call的逻辑几乎是一样的,不过stream调用的是rpc_client.stream函数。这边就不过多的分析了
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
next, err := r.next(request, callOpts)
if err != nil {
return nil, err
}
// should we noop right here?
select {
case <-ctx.Done():
return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
default:
}
call := func(i int) (Stream, error) {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
stream, err := r.stream(ctx, node, request, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return stream, err
}
type response struct {
stream Stream
err error
}
ch := make(chan response, callOpts.Retries+1)
var grr error
for i := 0; i <= callOpts.Retries; i++ {
go func(i int) {
s, err := call(i)
ch <- response{s, err}
}(i)
select {
case <-ctx.Done():
return nil, errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
case rsp := <-ch:
// if the call succeeded lets bail early
if rsp.err == nil {
return rsp.stream, nil
}
retry, rerr := callOpts.Retry(ctx, request, i, rsp.err)
if rerr != nil {
return nil, rerr
}
if !retry {
return nil, rsp.err
}
grr = rsp.err
}
}
return nil, grr
}
Publish
Client中的Publish主要是调用broker中的publish:r.opts.Broker.Publish
然而在client的publish函数中,获取了topic准备了body 最后调用broker的publish
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
options := PublishOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
md, ok := metadata.FromContext(ctx)
if !ok {
md = make(map[string]string)
}
id := uuid.New().String()
md["Content-Type"] = msg.ContentType()
md["Micro-Topic"] = msg.Topic()
md["Micro-Id"] = id
// set the topic
topic := msg.Topic()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
options.Exchange = prx
}
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
// encode message body
cf, err := r.newCodec(msg.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{
Target: topic,
Type: codec.Event,
Header: map[string]string{
"Micro-Id": id,
"Micro-Topic": msg.Topic(),
},
}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
r.once.Do(func() {
r.opts.Broker.Connect()
})
return r.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: b.Bytes(),
})
}
有疑问加站长微信联系(非本文作者)