go micro client

舞林 · · 908 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

micro.newService()中newOptions

func newOptions(opts ...Option) Options {
    opt := Options{
        Auth:      auth.DefaultAuth,
        Broker:    broker.DefaultBroker,
        Cmd:       cmd.DefaultCmd,
        Config:    config.DefaultConfig,
        Client:    client.DefaultClient,
        Server:    server.DefaultServer,
        Store:     store.DefaultStore,
        Registry:  registry.DefaultRegistry,
        Router:    router.DefaultRouter,
        Runtime:   runtime.DefaultRuntime,
        Transport: transport.DefaultTransport,
        Context:   context.Background(),
        Signal:    true,
    }

    for _, o := range opts {
        o(&opt)
    }

    return opt
}

初始化了一堆基础设置,来看看Client, client.DefaultClient,
这里不要直接去看client/client.go中的newRpcClient(),因为在micro/defaults.go中已经初始化了client,默认是grpc
client.DefaultClient = gcli.NewClient()

func newClient(opts ...client.Option) client.Client {
    options := client.NewOptions()
    // default content type for grpc
    options.ContentType = "application/grpc+proto"

    for _, o := range opts {
        o(&options)
    }

    rc := &grpcClient{
        opts: options,
    }
    rc.once.Store(false)

    rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())

    c := client.Client(rc)

    // wrap in reverse
    for i := len(options.Wrappers); i > 0; i-- {
        c = options.Wrappers[i-1](c)
    }

    return c
}

func NewClient(opts ...client.Option) client.Client {
    return newClient(opts...)
}

这里做了以下事情

  1. 初始化并设置options,设置ContentType为"application/grpc+proto"
  2. 实例化grpcClient{}
  3. 依次调用client Wrapper中间件,注意是倒着调用哦
type grpcClient struct {
    opts client.Options
    pool *pool
    once atomic.Value
}

// Client is the interface used to make requests to services.
// It supports Request/Response via Transport and Publishing via the Broker.
// It also supports bidirectional streaming of requests.
type Client interface {
    Init(...Option) error
    Options() Options
    NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
    NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
    Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
    Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
    Publish(ctx context.Context, msg Message, opts ...PublishOption) error
    String() string
}

newClient()中的c := client.Client(rc)说明一下,

grpcClient只有3个属性,client.Client(rc)调用一下,让rc变转Client的实例,在grpc.go中也可以看到grpcClient实现了Client定义的全部方法

结合实例看example/client/main.go中call(),发起请求

func call(i int, c client.Client) {
    // Create new request to service go.micro.srv.example, method Example.Call
    req := c.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{
        Name: "John",
    })

    // create context with metadata
    ctx := metadata.NewContext(context.Background(), map[string]string{
        "X-User-Id": "john",
        "X-From-Id": "script",
    })

    rsp := &example.Response{}

    // Call service
    if err := c.Call(ctx, req, rsp); err != nil {
        fmt.Println("call err: ", err, rsp)
        return
    }

    fmt.Println("Call:", i, "rsp:", rsp.Msg)
}

func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
    return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
}

func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request {
    var opts client.RequestOptions
    for _, o := range reqOpts {
        o(&opts)
    }

    // set the content-type specified
    if len(opts.ContentType) > 0 {
        contentType = opts.ContentType
    }

    return &grpcRequest{
        service:     service,
        method:      method,
        request:     request,
        contentType: contentType,
        opts:        opts,
    }
}

NewRequest()最终得到一个grpcRequest{}实例,然后调用grpcClient.Call()
ctx可以附加信息(放在请求的header中),在请求周期中都可获取,trace,auth等组件都可以使用

func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    if req == nil {
        return errors.InternalServerError("go.micro.client", "req is nil")
    } else if rsp == nil {
        return errors.InternalServerError("go.micro.client", "rsp is nil")
    }
    // make a copy of call opts
    callOpts := g.opts.CallOptions
    for _, opt := range opts {
        opt(&callOpts)
    }

    // check if we already have a deadline
    d, ok := ctx.Deadline()
    if !ok {
        // no deadline so we create a new one
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout)
        defer cancel()
    } else {
        // got a deadline so no need to setup context
        // but we need to set the timeout we pass along
        opt := client.WithRequestTimeout(time.Until(d))
        opt(&callOpts)
    }

    // should we noop right here?
    select {
    case <-ctx.Done():
        return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
    default:
    }

    // make copy of call method
    gcall := g.call

    // wrap the call in reverse
    for i := len(callOpts.CallWrappers); i > 0; i-- {
        gcall = callOpts.CallWrappers[i-1](gcall)
    }

    // return errors.New("go.micro.client", "request timeout", 408)
    call := func(i int) error {
        // call backoff first. Someone may want an initial start delay
        t, err := callOpts.Backoff(ctx, req, i)
        if err != nil {
            return errors.InternalServerError("go.micro.client", err.Error())
        }

        // only sleep if greater than 0
        if t.Seconds() > 0 {
            time.Sleep(t)
        }

        // lookup the route to send the reques to
        route, err := g.lookupRoute(req, callOpts)
        if err != nil {
            return err
        }

        // pass a node to enable backwards compatability as changing the
        // call func would be a breaking change.
        // todo v3: change the call func to accept a route
        node := &registry.Node{Address: route.Address}

        // make the call
        err = gcall(ctx, node, req, rsp, callOpts)

        // record the result of the call to inform future routing decisions
        g.opts.Selector.Record(*route, err)

        // try and transform the error to a go-micro error
        if verr, ok := err.(*errors.Error); ok {
            return verr
        }

        return err
    }

    ch := make(chan error, callOpts.Retries+1)
    var gerr error

    for i := 0; i <= callOpts.Retries; i++ {
        go func(i int) {
            ch <- call(i)
        }(i)

        select {
        case <-ctx.Done():
            return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
        case err := <-ch:
            // if the call succeeded lets bail early
            if err == nil {
                return nil
            }

            retry, rerr := callOpts.Retry(ctx, req, i, err)
            if rerr != nil {
                return rerr
            }

            if !retry {
                return err
            }

            gerr = err
        }
    }

    return gerr
}

这里做了以下事情

  1. 设置g.opts.CallOptions
  2. 设置ctx超时,超时返回错误408
  3. 复制了grpcClient.call()给变量gcall
  4. 依次调用callOpts.CallWrappers 中间件,注意这里,前面是client中间件,这里是call中间件,具体在中间件文章讲
  5. 声明call()函数,

    1. 设置delay 机制
    2. 调用g.lookupRoute(),返回router.Route{}

      1. 初始化router.QueryOption
      2. opts.Router.Lookup(query...) ->query()(router/default.go)

        1. 从缓存查t.routes[opts.Service],查不到就调用t.fetchRoutes(opts.Service)获取路由表后再取,这里的fetchRoutes()并不是router/table.go中的函数,而是router/default.go中router.fetchRoutes(),在router/default.go中的newRouter()里r.table = newTable(r.fetchRoutes)设置了table.fetchRoutes。
      3. 设置opts.Selector,默认random
      4. opts.Selector.Select(routes, opts.SelectOptions...)选择合适的route发起请求,只有一个就直接返回,多个就随机返回一个,selector可以有其他实现方式实现,具体看实际业务了
    3. 定义registry.Node,调用gcall(),即grpcClient.call()

      1. 定义header,加入timeout,x-content-type,放入ctx
      2. 设置grpc.DialOption{}等参数,然后调用g.pool.getConn(),使用连接池发起连接
      3. 开协程,设置grpc.CallOption{},发起远程调用cc.Invoke()(google.golang.org/grpc/call.go),完成后发出通知给ch chan
      4. 监听调用完成ch chan,ctx超时chan
    4. 记录调用结果,用于优化选择,然而random的Selector里面啥也没干
  6. 定义重试次数(默认1),调用第5步的call()

    1. 接受ctx超时信号(返回408错误),
    2. call()中完成的信号,成功完成就返回nill,不然就调用Retry()

这就是一次call请求的流程,这里没有深入底层grpc是如何发起请求的,感兴趣的同学可以查阅下代码

如有错漏,请留言告知,Thanks♪(・ω・)ノ


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:舞林

查看原文:go micro client

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

908 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传