跟着gRPC源码简单学习RPC原理

onequid · · 208 次点击 · · 开始浏览    

RPC(Remote Procedure Call/远程过程调用)是一种服务间交互的方式,达到像本地方法一样调用远程方法的目的。

看看源码,简单了解gRPC是怎么实现RPC的,以gRPC官方代码示例helloworld为例。

服务注册

先看一下服务端怎么注册服务。

helloworld.pb.go文件中,会有RegisterGreeterServer方法以及_Greeter_serviceDesc变量,
_Greeter_serviceDesc描述了服务的属性。RegisterGreeterServer方法会向gRPC服务端s注册服务srv

//...
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
    s.RegisterService(&_Greeter_serviceDesc, srv)
}
//...
var _Greeter_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter", // ServiceName包括`.proto`文件中的package名称
    HandlerType: (*GreeterServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "helloworld.proto",
}

server.go中的RegisterService方法,会判断ServiceServer是否实现sd中描述的HandlerType;如果实现了则调用s.register方法注册。

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
    ht := reflect.TypeOf(sd.HandlerType).Elem()
    st := reflect.TypeOf(ss)
    if !st.Implements(ht) { // 判断ServiceServer是否实现sd中描述的HandlerType
        grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
    }
    s.register(sd, ss)
}

register根据Method创建对应的map,并将名称作为键,方法描述(指针)作为值,添加到相应的map中。
最后将{服务名称:服务}添加到服务端。

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.printf("RegisterService(%q)", sd.ServiceName)
    if s.serve {// 服务端是否启动
        grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
    }
    if _, ok := s.m[sd.ServiceName]; ok {//服务是否已经注册
        grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
    }
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc), // map of Methods
        sd:     make(map[string]*StreamDesc), // map of Stream
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv // 添加服务到服务端
}

服务调用

服务端注册了服务之后,接下来就是调用服务。

客户端调用服务

helloworld.pb.go中的SayHello方法中使用c.cc.Invoke远程调用服务端:

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
    out := new(HelloReply)
    // 远程调用服务端的方法
    err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

Invoke的定义如下,"/helloworld.Greeter/SayHello"对应的是method参数。

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error

method被传入后,依次被invokenewClientStream方法调用,保存在cs.callHdr.Method;后续在newAttemptLocked方法中生成cs.attempt.t
cs.SendMsg最终通过csAttempt.sendMsg方法中调用a.t.Write方法写出请求req

最后cs.RecvMsg轮询获取返回结果。

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

服务端执行调用

handleStream方法会处理请求中的Method字段,获得服务名与方法名。调用s.processUnaryRPC, 调用md.Handler方法(即_Greeter_SayHello_Handler方法)。
最终调用s.sendResponse方法返回结果。

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(GreeterServer).SayHello(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/helloworld.Greeter/SayHello",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
    }
    return interceptor(ctx, in, info, handler)
}

本文来自:Segmentfault

感谢作者:onequid

查看原文:跟着gRPC源码简单学习RPC原理

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:muxilin131420 备注:入群;或加QQ群:977810755

208 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传