用Go轻松实现一个高性能RPC

zehuamama · · 1911 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

概述

RPC是远程过程调用[1]Remote Procedure Call)的缩写形式。RPC调用的原理其实很简单,它类似于三层构架的C/S系统,第三方的客户程序通过接口调用RPC内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。

TinyRPC 是丸子基于Go语言标准库 net/rpc 扩展的远程过程调用框架,它具有以下特性:

  • 基于TCP传输层协议
  • 支持多种压缩格式:gzip、snappy、zlib;
  • 基于二进制的 Protocol Buffer 序列化协议:具有协议编码小及高扩展性和跨平台性;
  • 支持生成工具:TinyRPC提供的 protoc-gen-tinyrpc 插件可以帮助开发者快速定义自己的服务;

TinyRPC 的源代码仅有一千行左右,通过学习 TinyRPC ,开发者可以得到以下收获:

  • 代码简洁规范
  • 涵盖大多数 Go 语言基础用法和高级特性
  • 单元测试编写技巧
  • TCP流中处理数据包的技巧
  • RPC框架的设计理念

地址:github.com/zehuamama/tinyrpc

基于TCP的TinyRPC协议

在TinyRPC中,请求消息由TinyRPC客户端的应用程序发出,在TCP的字节流中,请求消息分为三部分:

  • 由可变长量编码的 uint 类型用来标识请求头的长度;
  • 基于 Protocol Buffer 协议编码的请求头部信息
  • 基于 Protocol Buffer 协议编码的请求体,见图所示:

TinyRPC请求消息结构

在TinyRPC中,响应消息由TinyRPC服务端的应用程序响应,在TCP的字节流中,响应消息分为三部分:

  • 由可变长量编码的 uint 类型用来标识响应头的长度;
  • 基于 Protocol Buffer 协议编码的响应头部信息
  • 基于 Protocol Buffer 协议编码的响应体,见图所示:

TinyRPC响应消息结构

  • 其中ID为RPC调用的序号,以便在并发调用时,客户端根据响应的ID序号来判断RPC的调用结果;
  • Error message为调用时发生错误的消息,若该内容为空则表示未出现RPC调用错误;
  • 在请求I/O流中,请求体(Request Body)表示RPC的参数内容;而在响应I/O流中,响应体(Response Body)则表示RPC调用的结果,这些Body在TinyRPC中均采用 Protocol Buffer 协议编码。

头部(Header)消息编码

由于TinyRPC是 Protocol Buffer 协议编码,相对应地,它的Header内容同样也是由 Protocol Buffer 协议编码的,所以我们需要先定义TinyRPC的头文件,即 header.proto

syntax = "proto3";

package header;

enum Compress{
	Raw = 0;
	Gzip = 1;
	Snappy = 2;
	Zlib = 3;
}

message RequestHeader {
	Compress  compress_type = 1;
	string 	method = 2;
	uint64  id = 3;
	uint32  request_len = 4;
	uint32  checksum = 5;
}

message ResponseHeader {
	Compress compress_type = 1;
	uint64 id = 2;
	string error = 3;
	uint32 response_len = 4;
	uint32 checksum = 5;
}

从上述的 header.proto 由几个部分组成:

  • 枚举类型的 compress_type 它表示RPC的协议内容的压缩类型,TinyRPC支持四种压缩类型,Raw、Gzip、Snappy、Zlib,其中Raw表示不对报文进行压缩;
  • 字符串类型的 method ,它表示远程过程调用的名称;
  • 整型变量 id ,它表示RPC调用的序号,在远程过程调用时,请求的 id 与响应的 id 需要进行相对应;
  • request_len 表示请求体的长度,若 compress_type 不为Raw,request_len 表示压缩后请求体的长度;
  • response_len 表示响应体的长度,若 compress_type 不为Raw,request_len 表示压缩后响应体的长度;
  • error 表示远程过程调用调用出现的错误,若 error 为空,则表示远程过程调用没有出现错误
  • checksum 表示为校验码,在请求头部和响应头部均存在 checksum ,它分别对请求体和响应体的报文通过CRC32散列算法进行校验。

我们通过 protoc 工具,可以将 header.proto 生成Go语言文件 header/header.pb.go

protoc --go_out=. header.proto

头部(Header)消息对象池

为了减少创建请求头部对象 RequestHeader 和响应头部对象 ResponseHeader 的次数我们通过为这两个结构体建立对象池,以便可以进行复用。

同时我们为 RequestHeader 和 ResponseHeader 都实现了ResetHeader方法,当每次使用完这些对象时,我们调用ResetHeader让结构体内容初始化,随后再把它们丢回对象池里。

代码 header/pool.go 如下:

package header

import "sync"

var (
	RequestPool  sync.Pool
	ResponsePool sync.Pool
)

func init() {
	RequestPool = sync.Pool{New: func() any {
		return &RequestHeader{}
	}}
	ResponsePool = sync.Pool{New: func() any {
		return &ResponseHeader{}
	}}
}

// ResetHeader reset request header
func (h *RequestHeader) ResetHeader() {
	h.Id = 0
	h.Checksum = 0
	h.Method = ""
	h.CompressType = 0
	h.RequestLen = 0
}

// ResetHeader reset response header
func (h *ResponseHeader) ResetHeader() {
	h.Error = ""
	h.Id = 0
	h.CompressType = 0
	h.Checksum = 0
	h.ResponseLen = 0
}

IO操作

TinyRPC的IO操作函数在codec/io.go中,其中 sendFrame 函数会向IO中写入uvarint类型的 size ,表示要发送数据的长度,随后将该字节slice类型的数据 data 写入IO流中。

  • 若写入数据的长度为 0 ,此时sendFrame 函数会向IO流写入uvarint类型的 0 值;
  • 若写入数据的长度大于 0 ,此时sendFrame 函数会向IO流写入uvarint类型的 len(data) 值,随后将该字节串的数据 data 写入IO流中。

代码如下所示:

func sendFrame(w io.Writer, data []byte) (err error) {
	var size [binary.MaxVarintLen64]byte

	if data == nil || len(data) == 0 {
		n := binary.PutUvarint(size[:], uint64(0))
		if err = write(w, size[:n]); err != nil {
			return
		}
		return
	}

	n := binary.PutUvarint(size[:], uint64(len(data)))
	if err = write(w, size[:n]); err != nil {
		return
	}
	if err = write(w, data); err != nil {
		return
	}
	return
}

func write(w io.Writer, data []byte) error {
	for index := 0; index < len(data); {
		n, err := w.Write(data[index:])
		if _, ok := err.(net.Error); !ok {
			return err
		}
		index += n
	}
	return nil
}

recvFrame 函数与sendFrame 函数类似,首先会向IO中读入uvarint类型的 size ,表示要接收数据的长度,随后将该从IO流中读取该 size 长度字节串。

注意,由于 codec 层会传入一个bufio类型的结构体,bufio类型实现了有缓冲的IO操作,以便减少IO在用户态与内核态拷贝的次数。
  • 若 recvFrame 函数从IO流读取uvarint类型的 size 值大于0,随后 recvFrame 将该从IO流中读取该 size 长度字节串。
func recvFrame(r io.Reader) (data []byte, err error) {
	size, err := binary.ReadUvarint(r.(*bufio.Reader))
	if err != nil {
		return nil, err
	}
	if size != 0 {
		data = make([]byte, size)
		if err = read(r, data); err != nil {
			return nil, err
		}
	}
	return data, nil
}

func read(r io.Reader, data []byte) error {
	for index := 0; index < len(data); {
		n, err := r.Read(data[index:])
		if err != nil {
			if _, ok := err.(net.Error); !ok {
				return err
			}
		}
		index += n
	}
	return nil
}

实现ClientCodec接口

由于TinyRPC是基于标准库net/rpc扩展的,所以TinyRPC在codec层需要实现net/rpcClientCodec接口,我们先看看ClientCodec的代码:

// 文件 src/net/rpc/server.go

type ClientCodec interface {
	WriteRequest(*Request, any) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(any) error

	Close() error
}
// Request 标准库里的请求体结构
type Request struct {
	ServiceMethod string 
	Seq           uint64 
	next          *Request 
}
// Response 标准库里的响应结构
type Response struct {
	ServiceMethod string 
	Seq           uint64 
	Error         string 
	next          *Response 
}

其中ClientCodec接口包括写请求读响应头部读响应体,我们建立一个clientCode的结构体用来实现ClientCodec接口:

代码 codec/client.go 如下:

type clientCodec struct {
	r io.Reader
	w io.Writer
	c io.Closer

	compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
	response   header.ResponseHeader   // rpc response header
	mutex      sync.Mutex              // protect pending map
	pending    map[uint64]string
}

其中 compressor 表示压缩类型(这个后续再讲),response 是响应的头部,mutex 是用于保护 pending 的互斥锁;

func NewClientCodec(conn io.ReadWriteCloser,
	compressType compressor.CompressType) rpc.ClientCodec {

	return &clientCodec{
		r:          bufio.NewReader(conn),
		w:          bufio.NewWriter(conn),
		c:          conn,
		compressor: compressType,
		pending:    make(map[uint64]string),
	

这里的读写IO分别使用 bufio.NewReader 和 bufio.NewWriter 构造,通过缓冲IO来提高RPC的读写性能;

首先 clientCode 结构体实现了 ClientCodec 接口的WriteRequest 方法:

func (c *clientCodec) WriteRequest(r *rpc.Request, param any) error {
	c.mutex.Lock()
        // 将请求序号和请求方法关联起来
	c.pending[r.Seq] = r.ServiceMethod
	c.mutex.Unlock()
	err := writeRequest(c.w, r, c.compressor, param)
	if err != nil {
		return err
	}
	return nil
}

func writeRequest(w io.Writer, r *rpc.Request,
	compressType compressor.CompressType, param any) error {
	var request proto.Message
	if param != nil {
		var ok bool
                // 断言该接口是否实现了proto.Message接口的方法 
		if request, ok = param.(proto.Message); !ok {   
			return errs.NotImplementProtoMessageError
		}
	}
        // 判断压缩类型是否存在
	if _, ok := compressor.Compressors[compressType]; !ok {
		return errs.NotFoundCompressorError
	}
 
	var pbRequest []byte
        // 将RPC的参数Marshal为protocol buffer协议
	if request != nil {
		var err error
		pbRequest, err = proto.Marshal(request)
		if err != nil {
			return err
		}
	}
        // 对协议进行压缩        
	var compressedPbRequest []byte
	compressedPbRequest, err := compressor.Compressors[compressType].Zip(pbRequest)
	if err != nil {
		return err
	}
        // 从请求头部对象池中取出请求头部结构体
	h := header.RequestPool.Get().(*header.RequestHeader)
	defer func() {
		h.ResetHeader()
		header.RequestPool.Put(h)
	}()
	h.Id = r.Seq
	h.Method = r.ServiceMethod
	h.RequestLen = uint32(len(compressedPbRequest))
	h.CompressType = header.Compress(compressType)
	h.Checksum = crc32.ChecksumIEEE(compressedPbRequest)
        // 将请求头部Marshal为protocol buffer协议
	pbHeader, err := proto.Marshal(h)
	if err != err {
		return err
	}
        // 发送请求头部
	if err := sendFrame(w, pbHeader); err != nil {
		return err
	}
        // 发送RPC参数
	if write(w, compressedPbRequest); err != nil {
		return err
	}
        // 记得flush,将缓冲IO的数据刷掉
	w.(*bufio.Writer).Flush()
	return nil
}

实现 ClientCodec 接口的 ReadResponseHeader 方法:

func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
	c.response.ResetHeader()  // 重置clientCodec的响应头部
	err := readResponseHeader(c.r, &c.response)
	if err != nil {
		return err
	}
	c.mutex.Lock()
	r.Seq = c.response.Id // 填充 r.Seq  
	r.Error = c.response.Error // 填充 r.Error
	r.ServiceMethod = c.pending[r.Seq] // 根据序号填充 r.ServiceMethod
	delete(c.pending, r.Seq)  // 删除pending里的序号  
	c.mutex.Unlock()
	return nil
}

func readResponseHeader(r io.Reader, h *header.ResponseHeader) error {
	pbHeader, err := recvFrame(r) //从IO中读取响应头部
	if err != nil {
		return err
	}
	err = proto.Unmarshal(pbHeader, h)//将字节串Unmarshal成ResponseHeader结构类型
	if err != nil {
		return err
	}
	return nil
}

实现 ClientCodec 接口的 ReadResponseHeader 方法:

func (c *clientCodec) ReadResponseBody(x any) error {
	if x == nil {
		if c.response.ResponseLen != 0 {  //根据响应头部的长度,读取该长度的字节串
			if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
				return err
			}
		}
		return nil
	}

	err := readResponseBody(c.r, &c.response, x) 
	if err != nil {
		return nil
	}
	return nil
}

func readResponseBody(r io.Reader, h *header.ResponseHeader, x any) error {
	var response proto.Message
	if x != nil {
		var ok bool
                // 断言该接口是否实现了proto.Message接口的方法 
		response, ok = x.(proto.Message)
		if !ok {
			return errs.NotImplementProtoMessageError
		}
	}
        // 读取RPC调用的响应体
	pbResponse := make([]byte, h.ResponseLen)
	err := read(r, pbResponse)
	if err != nil {
		return err
	}

	// 进行校验
	if h.Checksum != 0 {
		if crc32.ChecksumIEEE(pbResponse) != h.Checksum {
			return errs.UnexpectedChecksumError
		}
	}
        // 判断压缩器是否存在
	if _, ok := compressor.Compressors[compressor.CompressType(h.CompressType)]; !ok {
		return errs.NotFoundCompressorError
	}
	// 解压响应体的字节串
	var resp []byte
	resp, err = compressor.Compressors[compressor.CompressType(h.CompressType)].Unzip(pbResponse)
	if err != nil {
		return err
	}
        // 反序列化成 protocol buffer 结构体
	if response != nil {
		err = proto.Unmarshal(resp, response)
		if err != nil {
			return err
		}
	}
	return nil
}

实现ServerCodec接口

TinyRPC在codec层还需要实现net/rpcServerCodec接口

ServerCodec 的接口和 ClientCodec 接口十分类似:

type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(any) error
	WriteResponse(*Response, any) error
	Close() error
}

其中 ServerCodec 接口包括写响应读请求头部读请求体,我们建立一个 serverCodec 的结构体用来实现 ServerCodec 接口,代码codec/server.go

type serverCodec struct {
	r io.Reader
	w io.Writer
	c io.Closer

	request header.RequestHeader

	mutex   sync.Mutex // 保护 seq, pending
	seq     uint64  // 一个自增的序号
	pending map[uint64]uint64
}

// NewServerCodec Create a new server codec
func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
	return &serverCodec{
		r:       bufio.NewReader(conn),
		w:       bufio.NewWriter(conn),
		c:       conn,
		pending: make(map[uint64]uint64),
	}
}

是不是和刚才的 clientCode 结构体神似

首先, serverCodec 结构体实现了 ServerCodec 接口的 ReadRequestHeader方法:

func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
	s.request.ResetHeader() // 重置serverCodec结构体的请求头部
	err := readRequestHeader(s.r, &s.request)
	if err != nil {
		return err
	}
	s.mutex.Lock()
	s.seq++ // 序号自增
	s.pending[s.seq] = s.request.Id  // 自增序号与请求头部的ID进行绑定
	r.ServiceMethod = s.request.Method  // 填充 r.ServiceMethod 
	r.Seq = s.seq  // 填充 r.Seq  
	s.mutex.Unlock()
	return nil
}

func readRequestHeader(r io.Reader, h *header.RequestHeader) (err error) {
	pbHeader, err := recvFrame(r) // 读取请求头部文件
	if err != nil {
		return err
	}
	err = proto.Unmarshal(pbHeader, h) //将字节串Unmarshal成RequestHeader结构类型
	if err != nil {
		return err
	}
	return nil
}

实现 ServerCodec 接口的 ReadRequestBody 方法:

func (s *serverCodec) ReadRequestBody(x any) error {
	if x == nil {
		if s.request.RequestLen != 0 {  // 根据请求头部的RequestLen读取该长度的字节串
			if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
				return err
			}
		}
		return nil
	}
        // 断言该接口是否实现了proto.Message接口的方法 
	request, ok := x.(proto.Message)
	if !ok {
		return errors.NotImplementProtoMessageError
	}

	err := readRequestBody(s.r, &s.request, request)
	if err != nil {
		return nil
	}
	return nil
}

func readRequestBody(r io.Reader, h *header.RequestHeader, request proto.Message) error {
	requestBody := make([]byte, h.RequestLen)

	err := read(r, requestBody) // 读取该长度的请求体  
	if err != nil {
		return err
	}
        // 进行校验
	if h.Checksum != 0 {
		if crc32.ChecksumIEEE(requestBody) != h.Checksum {
			return errors.UnexpectedChecksumError
		}
	}
        // 判断压缩器是否存在
	if _, ok := compressor.Compressors[compressor.CompressType(h.CompressType)]; !ok {
		return errs.NotFoundCompressorError
	}
        // 压缩字节串
	var pbRequest []byte
	pbRequest, err = compressor.Compressors[compressor.CompressType(h.CompressType)].Unzip(requestBody)
	if err != nil {
		return err
	}
        // 反序列化成 protocol buffer 结构体
	if request != nil {
		err = proto.Unmarshal(pbRequest, request)
		if err != nil {
			return err
		}
	}
	return nil
}

实现 ServerCodec 接口的 WriteResponse 方法:

func (s *serverCodec) WriteResponse(r *rpc.Response, x any) error {
	var response proto.Message
	if x != nil {
		var ok bool
		if response, ok = x.(proto.Message); !ok {
			if _, ok = x.(struct{}); !ok {
				s.mutex.Lock()
				delete(s.pending, r.Seq)
				s.mutex.Unlock()
				return errors.NotImplementProtoMessageError
			}
		}
	}

	s.mutex.Lock()
	id, ok := s.pending[r.Seq]
	if !ok {
		s.mutex.Unlock()
		return errors.InvalidSequenceError
	}
	delete(s.pending, r.Seq)
	s.mutex.Unlock()

	err := writeResponse(s.w, id, r.Error, compressor.CompressType(s.request.CompressType), response)
	if err != nil {
		return err
	}

	return nil
}

func writeResponse(w io.Writer, id uint64, serr string,
	compressType compressor.CompressType, response proto.Message) (err error) {
        // 若rpc存在错误,将响应体设置为nil
	if serr != "" {
		response = nil
	}
        // 判断压缩器是否存在
	if _, ok := compressor.Compressors[compressType]; !ok {
		return errs.NotFoundCompressorError
	}
        // 将响应体Marshal成字节串
	var pbResponse []byte
	if response != nil {
		pbResponse, err = proto.Marshal(response)
		if err != nil {
			return err
		}
	}

	var compressedPbResponse []byte
        // 压缩响应体
	compressedPbResponse, _ = compressor.Compressors[compressType].Zip(pbResponse)
	// 从对象池中取出一个响应头部
        h := header.ResponsePool.Get().(*header.ResponseHeader)
	defer func() {
		h.ResetHeader()
		header.ResponsePool.Put(h)
	}()
	h.Id = id
	h.Error = serr
	h.ResponseLen = uint32(len(compressedPbResponse))
	h.Checksum = crc32.ChecksumIEEE(compressedPbResponse)
	h.CompressType = header.Compress(compressType)
        // 将响应头部Marshal成字节串
	pbHeader, err := proto.Marshal(h)
	if err != err {
		return
	}
        // 发送响应头部
        pbHeader, err := proto.Marshal(h)
	if err != err {
		return
	}
        // 发送响应体
	if err = sendFrame(w, pbHeader); err != nil {
		return
	}

	if err = write(w, compressedPbResponse); err != nil {
		return
	}
        // 记得flush一下
	w.(*bufio.Writer).Flush()
	return nil
}

TinyRPC的压缩器

TinyRPC的压缩器很短,Raw、Gzip、Snappy、Zlib压缩均实现了Compressor 接口,代码compressor/compressor.go

package compressor

type CompressType int32

const (
	Raw CompressType = iota
	Gzip
	Snappy
	Zlib
)
// Compressors 四种压缩器的实现
var Compressors = map[CompressType]Compressor{
	Raw:    RawCompressor{},
	Gzip:   GzipCompressor{},
	Snappy: SnappyCompressor{},
	Zlib:   ZlibCompressor{},
}
// Compressor 压缩器接口
type Compressor interface {
	Zip([]byte) ([]byte, error)
	Unzip([]byte) ([]byte, error)
}

TinyRPC的Server

TinyRPC的服务端非常简单,把标准库 net/rpc 的 Server 结构包装了一层,其中 ServeCodec 使用的是TinyRPC的编解码器,代码server.go

type Server struct {
	*rpc.Server
}

func NewServer() *Server {
	return &Server{&rpc.Server{}}
}

...

func (s *Server) Serve(lis net.Listener) {
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Print("tinyrpc.Serve: accept:", err.Error())
			return
		}
		go s.Server.ServeCodec(codec.NewServerCodec(conn))  // 使用TinyRPC的解码器
	}
}

TinyRPC的Client

TinyRPC的客户端也很简单,把标准库 net/rpc 的 Client 结构包装了一层,其中 ClientCodec 使用的是TinyRPC的编解码器,代码client.go

注意:TinyRPC Client使用一种Go语言常用的设计模式:功能选项模式
type Client struct {
	*rpc.Client
}

// Option 选项接口
type Option interface {
	apply(*options)
}

type options struct {
	compressType compressor.CompressType
}

type compressOption compressor.CompressType

func (c compressOption) apply(opt *options) {
	opt.compressType = compressor.CompressType(c)
}

// WithCompress 设置压缩类型
func WithCompress(c compressor.CompressType) Option {
	return compressOption(c)
}

func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
	options := options{
		compressType: compressor.Raw,
	}
	for _, o := range opts {  // 应用选项    
		o.apply(&options)
	}
	return &Client{rpc.NewClientWithCodec(
		codec.NewClientCodec(conn, options.compressType))} // 使用TinyRPC的解码器
}

func (c *Client) Call(serviceMethod string, args any, reply any) error {
	return c.Client.Call(serviceMethod, args, reply)
}

func (c *Client) AsyncCall(serviceMethod string, args any, reply any) chan *rpc.Call {
	return c.Go(serviceMethod, args, reply, nil).Done
}

 

欢迎点赞收藏,作者接下来将推出下一个小项目 TinyBalancer ~


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1911 次点击  ∙  1 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传