go语言实现自己的RPC:go rpc codec

掘金 · · 795 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前言

RPC是远程过程调用(Remote Procedure Call)的简称,通过RPC我们可以像调用本地方法一样调用位于其他位置的函数。大家更常见的可能是HTTP API调用,简单来对比的话,RPC比起HTTP调用封装更完善,调用者不必手动处理序列化和反序列化,使用成本更低一些(虽然学习成本可能会更高)。

出于学习目的,这次的目标是使用go语言来实现一个自己的RPC。在现实世界里,对于一个RPC工具,除了方法调用以外,人们更看重的是其他功能比如服务发现、负载均衡、熔断降级之类的功能,这里暂时不会涉及,而是仅关注实现一个可以工作的方法调用。

之前的文章里大致了解了go语言自带的rpc框架,其中就提到go rpc预留了codec接口,可以让用户在go rpc使用自己的序列化协议,这次就尝试实现一个自己的codec来实现自己的RPC。

准备工作

序列化协议

要实现一个RPC,基本的元素大概有这几个:序列化协议、网络模型和线程模型。而go rpc里的codec基本上实现的就是序列化协议。

本来想着用比较熟悉的thrift协议,但是使用thrift本身实现了RPC流程,所以它并不是一个单纯的序列化协议,它的序列化逻辑可能无法和go rpc很好的契合,再加上还需要书写IDL定义,增加复杂度。本来就是为了熟悉go,所以这里先从简单的开始,于是选择messagepack作为序列化协议。

messagepack是一个比较轻量级的序列化协议,它的逻辑和json类似,但是使用的是二进制形式,所以比json序列化更快,序列化后产生的数据也更小,基本上可以认为是一个二进制版本的json。

创建类定义

要实现自己的codec,需要分别实现go rpc中提供个两个接口:ServerCodec和ClientCodec,很明显他们分别表示服务端和客户端的逻辑,两个接口的定义具体如下:

type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(interface{}) error
	WriteResponse(*Response, interface{}) error
	Close() error
}
type ClientCodec interface {
	WriteRequest(*Request, interface{}) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(interface{}) error
	Close() error
}
复制代码

可以看到,go rpc将一次请求/响应抽象成了header+body的形式,读取数据时分为读取head和读取body,写入数据时只需写入body部分,go rpc会替我们加上head部分。 接下来我们定义两个结构,用来表示一次请求/响应的完整数据:

type MsgpackReq struct {
	rpc.Request  //head
	Arg interface{} //body
}

type MsgpackResp struct {
	rpc.Response  //head
	Reply interface{}  //body
}
复制代码

这里的msgpackReq和msgpackResp直接内嵌了go rpc里自带的Request和Response,自带的Request和Response定义了序号、方法名等信息。

接下来就是自定义Codec的声明:

type MessagePackServerCodec struct {
	rwc    io.ReadWriteCloser //用于读写数据,实际是一个网络连接
	req    MsgpackReq //用于缓存解析到的请求
	closed bool  //标识codec是否关闭
}

type MessagePackClientCodec struct {
	rwc    io.ReadWriteCloser
	resp   MsgpackResp  //用于缓存解析到的请求
	closed bool
}

func NewServerCodec(conn net.Conn) *MessagePackServerCodec {
	return &MessagePackServerCodec{conn, MsgpackReq{}, false}
}

func NewClientCodec(conn net.Conn) *MessagePackClientCodec {
	return &MessagePackClientCodec{conn, MsgpackResp{}, false}
}
复制代码

在之前的文章里提到了,codec需要包含一个数据源用于读写数据,这里直接将网路连接传递进去。

实现Codec方法

实现思路

接下来是具体的方法实现,出于简单起见,这里将反序列化部分的两步合并为一步,在读取head部分时就将所有的数据解析好并缓存起来,读取body时直接返回缓存的结果。具体的思路就是:

  1. 客户端在发送请求时,将数据包装成一个MsgpackReq,然后用messagepack序列化并发送出去
  2. 服务端在读取请求head部分时,将收到的数据用messagepack反序列化成一个MsgpackReq,并将得到的结果缓存起来
  3. 服务端在读取请求body部分时,从缓存的MsgpackReq中获取到Arg字段并返回
  4. 服务端在发送响应时,将数据包装成一个MsgpackResp,然后用messagepack序列化并发送出去
  5. 客户端在读取响应head部分时,将收到的数据用messagepack反序列化成一个MsgpackResp,并将得到的结果缓存起来
  6. 客户端在读取响应body部分时,从缓存的MsgpackResp中获取到Reply或者Error字段并返回

Client实现

这里直接上代码:

func (c *MessagePackClientCodec) WriteRequest(r *rpc.Request, arg interface{}) error {
	//先判断codec是否已经关闭,如果是则直接返回
	if c.closed {
		return nil
	}
	//将r和arg组装成一个MsgpackReq并序列化
	request := &MsgpackReq{*r, arg}
	reqData, err := msgpack.Marshal(request)
	if err != nil {
		panic(err)
		return err
	}
	//先发送数据长度
	head := make([]byte, 4)
	binary.BigEndian.PutUint32(head, uint32(len(reqData)))
	_, err = c.rwc.Write(head)
	//再将序列化产生的数据发送出去
	_, err = c.rwc.Write(reqData)
	return err
}

func (c *MessagePackClientCodec) ReadResponseHeader(r *rpc.Response) error {
	//先判断codec是否已经关闭,如果是则直接返回
	if c.closed {
		return nil
	}
	//读取数据
	data, err := readData(c.rwc)
	if err != nil {
		//client一旦初始化就会开始轮询数据,所以要处理连接close的情况
		if strings.Contains(err.Error(), "use of closed network connection") {
			return nil
		}
		panic(err) //简单起见,出现异常直接panic
	}

	//将读取到的数据反序列化成一个MsgpackResp
	var response MsgpackResp
	err = msgpack.Unmarshal(data, &response)

	if err != nil {
		panic(err) //简单起见,出现异常直接panic
	}

	//根据读取到的数据设置request的各个属性
	r.ServiceMethod = response.ServiceMethod
	r.Seq = response.Seq
	//同时将读取到的数据缓存起来
	c.resp = response

	return nil
}

func (c *MessagePackClientCodec) ReadResponseBody(reply interface{}) error {
	//这里直接用缓存的数据返回即可

	if "" != c.resp.Error {//如果返回的是异常
		return errors.New(c.resp.Error)
	}
	if reply != nil {
		//正常返回,通过反射将结果设置到reply变量,因为reply一定是指针类型,所以不必检查CanSet
		reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(c.resp.Reply))
	}
	return nil
}


func (c *MessagePackClientCodec) Close() error {
	c.closed = true //关闭时将closed设置为true
	if c.rwc != nil {
		return c.rwc.Close()
	}
	return nil
}
复制代码

以上就是client部分的实现,值得注意的有几点:

  1. 读写数据前,需要检查codec是否已经关闭了
  2. 读写数据时需要处理拆包粘包(通过readData函数处理)

Server实现

同样直接上代码:

func (c *MessagePackServerCodec) WriteResponse(r *rpc.Response, reply interface{}) error {
	//先判断codec是否已经关闭,如果是则直接返回
	if c.closed {
		return nil
	}
	//将r和reply组装成一个MsgpackResp并序列化
	response := &MsgpackResp{*r, reply}

	respData, err := msgpack.Marshal(response)
	if err != nil {
		panic(err)
		return err
	}
	head := make([]byte, 4)
	binary.BigEndian.PutUint32(head, uint32(len(respData)))
	_, err = c.rwc.Write(head)
	//将序列化产生的数据发送出去
	_, err = c.rwc.Write(respData)
	return err
}

func (c *MessagePackServerCodec) ReadRequestHeader(r *rpc.Request) error {
	//先判断codec是否已经关闭,如果是则直接返回
	if c.closed {
		return nil
	}
	//读取数据
	data, err := readData(c.rwc)
	if err != nil {
		//这里不能直接panic,需要处理EOF和reset的情况
		if err == io.EOF {
			return err
		}
		if strings.Contains(err.Error(), "connection reset by peer") {
			return err
		}
		panic(err) //其他异常直接panic
	}
	//将读取到的数据反序列化成一个MsgpackReq
	var request MsgpackReq
	err = msgpack.Unmarshal(data, &request)

	if err != nil {
		panic(err) //简单起见,出现异常直接panic
	}

	//根据读取到的数据设置request的各个属性
	r.ServiceMethod = request.ServiceMethod
	r.Seq = request.Seq
	//同时将解析到的数据缓存起来
	c.req = request

	return nil
}

func (c *MessagePackServerCodec) ReadRequestBody(arg interface{}) error {
	if arg != nil {
		//参数不为nil,通过反射将结果设置到arg变量
		reflect.ValueOf(arg).Elem().Set(reflect.ValueOf(c.req.Arg))
	}
	return nil
}

func (c *MessagePackServerCodec) Close() error {
	c.closed = true
	if c.rwc != nil {
		return c.rwc.Close()
	}
	return nil
}
复制代码

实际上server端的实现几乎和client端的逻辑一样,只是request和response的角色不同而已。其中有几点需要注意:

  1. server端读取数据时需要处理EOF和连接reset的情况
  2. server在返回数据时没有显式处理接口产生的error,只是将reply传递了回去,这是因为error在rpc.Request里存着,不用codec处理

处理拆包粘包

具体思路参考go语言处理TCP拆包/粘包 ,这里附上readData的实现:

func readData(conn io.ReadWriteCloser) (data []byte, returnError error) {
	const HeadSize = 4 //设定长度部分占4个字节
	headBuf := bytes.NewBuffer(make([]byte, 0, HeadSize))
	headData := make([]byte, HeadSize)
	for {
		readSize, err := conn.Read(headData)
		if err != nil {
			returnError = err
			return
		}
		headBuf.Write(headData[0:readSize])
		if headBuf.Len() == HeadSize {
			break
		} else {
			headData = make([]byte, HeadSize-readSize)
		}
	}
	bodyLen := int(binary.BigEndian.Uint32(headBuf.Bytes()))
	bodyBuf := bytes.NewBuffer(make([]byte, 0, bodyLen))
	bodyData := make([]byte, bodyLen)
	for {
		readSize, err := conn.Read(bodyData)
		if err != nil {
			returnError = err
			return
		}
		bodyBuf.Write(bodyData[0:readSize])
		if bodyBuf.Len() == bodyLen {
			break
		} else {
			bodyData = make([]byte, bodyLen-readSize)
		}
	}
	data = bodyBuf.Bytes()
	returnError = nil
	return
}
复制代码

测试代码

接下来我们通过简单的Echo调用测试一下我们的codec:

//声明接口类
type EchoService struct {}
//定义方法Echo
func (service *EchoService) Echo(arg string, result *string) error {
	*result = arg
	return nil
}
//服务端启动逻辑
func RegisterAndServeOnTcp() {
	err := rpc.Register(&EchoService{})//注册并不是注册方法,而是注册EchoService的一个实例
	if err != nil {
		log.Fatal("error registering", err)
		return
	}
	tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
	if err != nil {
		log.Fatal("error resolving tcp", err)
	}
	listener, err := net.ListenTCP("tcp", tcpAddr)

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal("error accepting", err)
		} else {
			//这里先通过NewServerCodec获得一个实例,然后调用rpc.ServeCodec来启动服务
			rpc.ServeCodec(msgpk.NewServerCodec(conn))
		}
	}
}
//客户端调用逻辑
func Echo(arg string) (result string, err error) {
	var client *rpc.Client
	conn, err := net.Dial("tcp", ":1234")
	client = rpc.NewClientWithCodec(msgpk.NewClientCodec(conn))

	defer client.Close()

	if err != nil {
		return "", err
	}
	err = client.Call("EchoService.Echo", arg, &result) //通过类型加方法名指定要调用的方法
	if err != nil {
		return "", err
	}
	return result, err
}
//main函数
func main() {
	go server.RegisterAndServeOnTcp() //先启动服务端
	time.Sleep(1e9)
	wg := new(sync.WaitGroup) //waitGroup用于阻塞主线程防止提前退出
	callTimes := 10
	wg.Add(callTimes)
	for i := 0; i < callTimes; i++ {
		go func() {
		        //使用hello world加一个随机数作为参数
			argString := "hello world "+strconv.Itoa(rand.Int())
			resultString, err := client.Echo(argString)
			if err != nil {
				log.Fatal("error calling:", err)
			}
			if resultString != argString {
				fmt.Println("error")
			} else {
				fmt.Printf("echo:%s\n", resultString)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}
复制代码

上面的例子里首先通过go server.RegisterAndServeOnTcp()启动了服务端,然后同时启动了10个go routine来发起请求,客户端在收到响应之后会打印对应的结果。最后执行main函数,控制台会输出结果(后面的随机数可能会不同):

echo:hello world 8674665223082153551
echo:hello world 6129484611666145821
echo:hello world 5577006791947779410
echo:hello world 605394647632969758
echo:hello world 4037200794235010051
echo:hello world 3916589616287113937
echo:hello world 894385949183117216
echo:hello world 1443635317331776148
echo:hello world 2775422040480279449
echo:hello world 6334824724549167320
复制代码

结语

到这里,一个简单的自定义的go语言rpc就已经完成了,虽然自定义部分只有序列化协议部分而已,比如线程模型仍是go rpc自带的逻辑,除此之外也没有前言里提到的各种高级功能。后续再考虑尝试用go语言从零开始实现一个RPC吧。

其他

并发场景

有细心的同学可能已经发现了,这里实现的逻辑当中完全没有考虑并发的问题,缓存数据也是直接放到codec对象。而这样简单的实现也不会导致并发调用失败,其中具体的原因就是go rpc在处理每个codec对象时,读取请求都是顺序的,然后再并发的处理请求并返回结果。


有疑问加站长微信联系(非本文作者)

本文来自:掘金

感谢作者:掘金

查看原文:go语言实现自己的RPC:go rpc codec

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

795 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传