RPC(Remote Procedure Call/远程过程调用)是一种服务间交互的方式,达到像本地方法一样调用远程方法的目的。
看看源码,简单了解gRPC是怎么实现RPC的,以gRPC官方代码示例helloworld
为例。
服务注册
先看一下服务端怎么注册服务。
在helloworld.pb.go
文件中,会有RegisterGreeterServer
方法以及_Greeter_serviceDesc
变量,_Greeter_serviceDesc
描述了服务的属性。RegisterGreeterServer
方法会向gRPC服务端s
注册服务srv
。
//...
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
s.RegisterService(&_Greeter_serviceDesc, srv)
}
//...
var _Greeter_serviceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter", // ServiceName包括`.proto`文件中的package名称
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _Greeter_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "helloworld.proto",
}
server.go
中的RegisterService
方法,会判断ServiceServer是否实现sd中描述的HandlerType;如果实现了则调用s.register
方法注册。
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) { // 判断ServiceServer是否实现sd中描述的HandlerType
grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
s.register(sd, ss)
}
register
根据Method
创建对应的map,并将名称作为键,方法描述(指针)作为值,添加到相应的map中。
最后将{服务名称:服务}添加到服务端。
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {// 服务端是否启动
grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.m[sd.ServiceName]; ok {//服务是否已经注册
grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
srv := &service{
server: ss,
md: make(map[string]*MethodDesc), // map of Methods
sd: make(map[string]*StreamDesc), // map of Stream
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
srv.sd[d.StreamName] = d
}
s.m[sd.ServiceName] = srv // 添加服务到服务端
}
服务调用
服务端注册了服务之后,接下来就是调用服务。
客户端调用服务
helloworld.pb.go
中的SayHello
方法中使用c.cc.Invoke
远程调用服务端:
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
// 远程调用服务端的方法
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
Invoke
的定义如下,"/helloworld.Greeter/SayHello"
对应的是method
参数。
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
method
被传入后,依次被invoke
和newClientStream
方法调用,保存在cs.callHdr.Method
;后续在newAttemptLocked
方法中生成cs.attempt.t
。cs.SendMsg
最终通过csAttempt.sendMsg
方法中调用a.t.Write
方法写出请求req
。
最后cs.RecvMsg
轮询获取返回结果。
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
服务端执行调用
handleStream
方法会处理请求中的Method
字段,获得服务名与方法名。调用s.processUnaryRPC
, 调用md.Handler
方法(即_Greeter_SayHello_Handler
方法)。
最终调用s.sendResponse
方法返回结果。
func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HelloRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GreeterServer).SayHello(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/helloworld.Greeter/SayHello",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
}
return interceptor(ctx, in, info, handler)
}
有疑问加站长微信联系(非本文作者)