上一篇文章我们讲了net/rpc中client部分的实现,我本机源码安装路径在/usr/local/go,这net/rpc(golang 1.4版本)涉及到的相关代码主要有:
server.go
方法注册:
因为从client我们知道是复用的socket来实现并发调用rpc方法,我们先从方法注册来看源码部分:
// Server对象大都是保存方法存根,保证对象互斥的
type Server struct {
mu sync.RWMutex // protects the serviceMap
serviceMap map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
func NewServer() *Server {
return &Server{serviceMap: make(map[string]*service)}
}
// rpc.Register默认使用了一个Server,只对serviceMap进行了初始化
var DefaultServer = NewServer()
// rpc的service包括方法名、方法反射,类型等
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}
// 无论是RegisterName、Register最终都调用了register的内部方法
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
// 保证注册服务安全,先加锁
server.mu.Lock()
defer server.mu.Unlock()
// 如果服务为空,默认注册一个
if server.serviceMap == nil {
server.serviceMap = make(map[string]*service)
}
// 获取注册服务的反射信息
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
// 可以使用自定义名称
sname := reflect.Indirect(s.rcvr).Type().Name()
if useName {
sname = name
}
if sname == "" {
s := "rpc.Register: no service name for type " + s.typ.String()
log.Print(s)
return errors.New(s)
}
// 方法必须是暴露的,既服务名首字符大写
if !isExported(sname) && !useName {
s := "rpc.Register: type " + sname + " is not exported"
log.Print(s)
return errors.New(s)
}
// 不允许重复注册
if _, present := server.serviceMap[sname]; present {
return errors.New("rpc: service already defined: " + sname)
}
s.name = sname
// 开始注册rpc struct内部的方法存根
s.method = suitableMethods(s.typ, true)
// 如果struct内部一个方法也没,那么直接报错,错误信息还非常详细
if len(s.method) == 0 {
str := ""
// To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(s.typ), false)
if len(method) != 0 {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
} else {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
}
log.Print(str)
return errors.New(str)
}
// 保存在server的serviceMap中
server.serviceMap[s.name] = s
return nil
}
// 上文提到了服务还需要方法存根的注册
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
// 根据方法名创建保存内部方法map
methods := make(map[string]*methodType)
// 获取rpc struct内部的方法
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// 之前对这行代码觉得比较奇葩,方法是否是暴露,是看是否有PkgPath的,如果是私有方法,PkgPath显示包名
if method.PkgPath != "" {
continue
}
// 判断是否是三个参数:第一个是结构本身,第二个是参数,第三个是返回值
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 {
if reportErr {
log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
}
continue
}
// args是指针类型
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
if reportErr {
log.Println(mname, "argument type not exported:", argType)
}
continue
}
// reply是指针类型
// Second arg must be a pointer.
replyType := mtype.In(2)
if replyType.Kind() != reflect.Ptr {
if reportErr {
log.Println("method", mname, "reply type not a pointer:", replyType)
}
continue
}
// Reply type must be exported.
// reply必须是可暴露的
if !isExportedOrBuiltinType(replyType) {
if reportErr {
log.Println("method", mname, "reply type not exported:", replyType)
}
continue
}
// Method needs one out.
// 必须有一个返回值,而且要是error
if mtype.NumOut() != 1 {
if reportErr {
log.Println("method", mname, "has wrong number of outs:", mtype.NumOut())
}
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
if reportErr {
log.Println("method", mname, "returns", returnType.String(), "not error")
}
continue
}
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
请求调用:
方法已经被注册成功,接下来我们看看是如何客户端发送请求调用的:
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Fatal("rpc.Serve: accept:", err.Error()) // TODO(r): exit?
}
// accept连接以后,打开一个goroutine处理请求
go server.ServeConn(conn)
}
}
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
// 根据指定的codec进行协议解析
server.ServeCodec(srv)
}
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
for {
// 解析请求
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
// 如果当前请求错误了,我们应该返回信息,然后继续处理
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
// 因为需要继续处理后续请求,所以开一个gorutine处理rpc方法
go service.call(server, sending, mtype, req, argv, replyv, codec)
}
// 如果连接关闭了需要释放资源
codec.Close()
}
func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mtype *methodType, req *Request, keepReading bool, err error) {
// 解析头部,如果失败,直接返回了
req = server.getRequest()
err = codec.ReadRequestHeader(req)
if err != nil {
req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
err = errors.New("rpc: server cannot decode request: " + err.Error())
return
}
if debugLog {
log.Printf("rpc: [trace:%v]\n", req.Tracer)
}
// We read the header successfully. If we see an error now,
// we can still recover and move on to the next request.
keepReading = true
// 获取请求中xxx.xxx中.的位置
dot := strings.LastIndex(req.ServiceMethod, ".")
if dot < 0 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
return
}
// 拿到struct名字和方法名字
serviceName := req.ServiceMethod[:dot]
methodName := req.ServiceMethod[dot+1:]
// Look up the request.
// 加读锁,获取对象
server.mu.RLock()
service = server.serviceMap[serviceName]
server.mu.RUnlock()
if service == nil {
err = errors.New("rpc: can't find service " + req.ServiceMethod)
return
}
// 获取反射类型,看见rpc中的发射其实是预先放入map中的
mtype = service.method[methodName]
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)
}
return
}
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
if err != nil {
if !keepReading {
return
}
// discard body
codec.ReadRequestBody(nil)
return
}
// 解析请求中的args
argIsValue := false // if true, need to indirect before calling.
if mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(mtype.ArgType.Elem())
} else {
argv = reflect.New(mtype.ArgType)
argIsValue = true
}
// argv guaranteed to be a pointer now.
if err = codec.ReadRequestBody(argv.Interface()); err != nil {
return
}
if argIsValue {
argv = argv.Elem()
}
// 初始化reply类型
replyv = reflect.New(mtype.ReplyType.Elem())
return
}
func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
// 这里是真正调用rpc方法的地方
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
// 处理返回请求了
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}
func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
resp := server.getResponse()
// Encode the response header
resp.ServiceMethod = req.ServiceMethod
if errmsg != "" {
resp.Error = errmsg
reply = invalidRequest
}
// 上一文提到,客户端是根据序号来定位请求的,所以需要原样返回
resp.Seq = req.Seq
sending.Lock()
err := codec.WriteResponse(resp, reply)
if debugLog && err != nil {
log.Println("rpc: writing response:", err)
}
sending.Unlock()
server.freeResponse(resp)
}
资源重用:
上面把大致的rpc请求都说明了,server有一个技巧是重用对象,这里使用的是链表方式处理的:
// 可以看出使用一个free list链表,来避免Request以及Response对象频繁创建,导致GC压力
func (server *Server) getRequest() *Request {
server.reqLock.Lock()
req := server.freeReq
if req == nil {
req = new(Request)
} else {
server.freeReq = req.next
*req = Request{}
}
server.reqLock.Unlock()
return req
}
func (server *Server) freeRequest(req *Request) {
server.reqLock.Lock()
req.next = server.freeReq
server.freeReq = req
server.reqLock.Unlock()
}
func (server *Server) getResponse() *Response {
server.respLock.Lock()
resp := server.freeResp
if resp == nil {
resp = new(Response)
} else {
server.freeResp = resp.next
*resp = Response{}
}
server.respLock.Unlock()
return resp
}
func (server *Server) freeResponse(resp *Response) {
server.respLock.Lock()
resp.next = server.freeResp
server.freeResp = resp
server.respLock.Unlock()
}
最后,sending这把锁的目的是避免同一个套接字快速请求中避免返回包写入乱序,因此避免一个包完整写入完毕才允许下一个返回写入套接字。通过rpc包源码解析,可以看到标准库中的核心思想还是channel+mutex实现复用对象,以及各种方式的复用,避免GC压力,在我们以后写高性能服务端可以借鉴的地方。
有疑问加站长微信联系(非本文作者)