概述
RPC是远程过程调用[1](Remote Procedure Call)的缩写形式。RPC调用的原理其实很简单,它类似于三层构架的C/S系统,第三方的客户程序通过接口调用RPC内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。
TinyRPC 是丸子基于Go语言标准库 net/rpc 扩展的远程过程调用框架,它具有以下特性:
- 基于TCP传输层协议
- 支持多种压缩格式:gzip、snappy、zlib;
- 基于二进制的 Protocol Buffer 序列化协议:具有协议编码小及高扩展性和跨平台性;
- 支持生成工具:TinyRPC提供的 protoc-gen-tinyrpc 插件可以帮助开发者快速定义自己的服务;
TinyRPC 的源代码仅有一千行左右,通过学习 TinyRPC ,开发者可以得到以下收获:
- 代码简洁规范
- 涵盖大多数 Go 语言基础用法和高级特性
- 单元测试编写技巧
- TCP流中处理数据包的技巧
- RPC框架的设计理念
地址:github.com/zehuamama/tinyrpc
基于TCP的TinyRPC协议
在TinyRPC中,请求消息由TinyRPC客户端的应用程序发出,在TCP的字节流中,请求消息分为三部分:
- 由可变长量编码的 uint 类型用来标识请求头的长度;
- 基于 Protocol Buffer 协议编码的请求头部信息
- 基于 Protocol Buffer 协议编码的请求体,见图所示:
TinyRPC请求消息结构
在TinyRPC中,响应消息由TinyRPC服务端的应用程序响应,在TCP的字节流中,响应消息分为三部分:
- 由可变长量编码的 uint 类型用来标识响应头的长度;
- 基于 Protocol Buffer 协议编码的响应头部信息
- 基于 Protocol Buffer 协议编码的响应体,见图所示:
TinyRPC响应消息结构
- 其中ID为RPC调用的序号,以便在并发调用时,客户端根据响应的ID序号来判断RPC的调用结果;
- Error message为调用时发生错误的消息,若该内容为空则表示未出现RPC调用错误;
- 在请求I/O流中,请求体(Request Body)表示RPC的参数内容;而在响应I/O流中,响应体(Response Body)则表示RPC调用的结果,这些Body在TinyRPC中均采用 Protocol Buffer 协议编码。
头部(Header)消息编码
由于TinyRPC是 Protocol Buffer 协议编码,相对应地,它的Header内容同样也是由 Protocol Buffer 协议编码的,所以我们需要先定义TinyRPC的头文件,即 header.proto:
syntax = "proto3";
package header;
enum Compress{
Raw = 0;
Gzip = 1;
Snappy = 2;
Zlib = 3;
}
message RequestHeader {
Compress compress_type = 1;
string method = 2;
uint64 id = 3;
uint32 request_len = 4;
uint32 checksum = 5;
}
message ResponseHeader {
Compress compress_type = 1;
uint64 id = 2;
string error = 3;
uint32 response_len = 4;
uint32 checksum = 5;
}
从上述的 header.proto 由几个部分组成:
- 枚举类型的 compress_type ,它表示RPC的协议内容的压缩类型,TinyRPC支持四种压缩类型,Raw、Gzip、Snappy、Zlib,其中Raw表示不对报文进行压缩;
- 字符串类型的 method ,它表示远程过程调用的名称;
- 整型变量 id ,它表示RPC调用的序号,在远程过程调用时,请求的 id 与响应的 id 需要进行相对应;
- request_len 表示请求体的长度,若 compress_type 不为Raw,request_len 表示压缩后请求体的长度;
- response_len 表示响应体的长度,若 compress_type 不为Raw,request_len 表示压缩后响应体的长度;
- error 表示远程过程调用调用出现的错误,若 error 为空,则表示远程过程调用没有出现错误
- checksum 表示为校验码,在请求头部和响应头部均存在 checksum ,它分别对请求体和响应体的报文通过CRC32散列算法进行校验。
我们通过 protoc 工具,可以将 header.proto 生成Go语言文件 header/header.pb.go:
protoc --go_out=. header.proto
头部(Header)消息对象池
为了减少创建请求头部对象 RequestHeader 和响应头部对象 ResponseHeader 的次数,我们通过为这两个结构体建立对象池,以便可以进行复用。
同时我们为 RequestHeader 和 ResponseHeader 都实现了ResetHeader方法,当每次使用完这些对象时,我们调用ResetHeader让结构体内容初始化,随后再把它们丢回对象池里。
代码 header/pool.go 如下:
package header
import "sync"
var (
RequestPool sync.Pool
ResponsePool sync.Pool
)
func init() {
RequestPool = sync.Pool{New: func() any {
return &RequestHeader{}
}}
ResponsePool = sync.Pool{New: func() any {
return &ResponseHeader{}
}}
}
// ResetHeader reset request header
func (h *RequestHeader) ResetHeader() {
h.Id = 0
h.Checksum = 0
h.Method = ""
h.CompressType = 0
h.RequestLen = 0
}
// ResetHeader reset response header
func (h *ResponseHeader) ResetHeader() {
h.Error = ""
h.Id = 0
h.CompressType = 0
h.Checksum = 0
h.ResponseLen = 0
}
IO操作
TinyRPC的IO操作函数在codec/io.go中,其中 sendFrame 函数会向IO中写入uvarint类型的 size ,表示要发送数据的长度,随后将该字节slice类型的数据 data 写入IO流中。
- 若写入数据的长度为 0 ,此时sendFrame 函数会向IO流写入uvarint类型的 0 值;
- 若写入数据的长度大于 0 ,此时sendFrame 函数会向IO流写入uvarint类型的 len(data) 值,随后将该字节串的数据 data 写入IO流中。
代码如下所示:
func sendFrame(w io.Writer, data []byte) (err error) {
var size [binary.MaxVarintLen64]byte
if data == nil || len(data) == 0 {
n := binary.PutUvarint(size[:], uint64(0))
if err = write(w, size[:n]); err != nil {
return
}
return
}
n := binary.PutUvarint(size[:], uint64(len(data)))
if err = write(w, size[:n]); err != nil {
return
}
if err = write(w, data); err != nil {
return
}
return
}
func write(w io.Writer, data []byte) error {
for index := 0; index < len(data); {
n, err := w.Write(data[index:])
if _, ok := err.(net.Error); !ok {
return err
}
index += n
}
return nil
}
recvFrame 函数与sendFrame 函数类似,首先会向IO中读入uvarint类型的 size ,表示要接收数据的长度,随后将该从IO流中读取该 size 长度字节串。
注意,由于 codec 层会传入一个bufio类型的结构体,bufio类型实现了有缓冲的IO操作,以便减少IO在用户态与内核态拷贝的次数。
- 若 recvFrame 函数从IO流读取uvarint类型的 size 值大于0,随后 recvFrame 将该从IO流中读取该 size 长度字节串。
func recvFrame(r io.Reader) (data []byte, err error) {
size, err := binary.ReadUvarint(r.(*bufio.Reader))
if err != nil {
return nil, err
}
if size != 0 {
data = make([]byte, size)
if err = read(r, data); err != nil {
return nil, err
}
}
return data, nil
}
func read(r io.Reader, data []byte) error {
for index := 0; index < len(data); {
n, err := r.Read(data[index:])
if err != nil {
if _, ok := err.(net.Error); !ok {
return err
}
}
index += n
}
return nil
}
实现ClientCodec接口
由于TinyRPC是基于标准库net/rpc扩展的,所以TinyRPC在codec层需要实现net/rpc的ClientCodec接口,我们先看看ClientCodec的代码:
// 文件 src/net/rpc/server.go
type ClientCodec interface {
WriteRequest(*Request, any) error
ReadResponseHeader(*Response) error
ReadResponseBody(any) error
Close() error
}
// Request 标准库里的请求体结构
type Request struct {
ServiceMethod string
Seq uint64
next *Request
}
// Response 标准库里的响应结构
type Response struct {
ServiceMethod string
Seq uint64
Error string
next *Response
}
其中ClientCodec接口包括写请求、读响应头部和读响应体,我们建立一个clientCode的结构体用来实现ClientCodec接口:
代码 codec/client.go 如下:
type clientCodec struct {
r io.Reader
w io.Writer
c io.Closer
compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
response header.ResponseHeader // rpc response header
mutex sync.Mutex // protect pending map
pending map[uint64]string
}
其中 compressor 表示压缩类型(这个后续再讲),response 是响应的头部,mutex 是用于保护 pending 的互斥锁;
func NewClientCodec(conn io.ReadWriteCloser,
compressType compressor.CompressType) rpc.ClientCodec {
return &clientCodec{
r: bufio.NewReader(conn),
w: bufio.NewWriter(conn),
c: conn,
compressor: compressType,
pending: make(map[uint64]string),
这里的读写IO分别使用 bufio.NewReader 和 bufio.NewWriter 构造,通过缓冲IO来提高RPC的读写性能;
首先 clientCode 结构体实现了 ClientCodec 接口的WriteRequest 方法:
func (c *clientCodec) WriteRequest(r *rpc.Request, param any) error {
c.mutex.Lock()
// 将请求序号和请求方法关联起来
c.pending[r.Seq] = r.ServiceMethod
c.mutex.Unlock()
err := writeRequest(c.w, r, c.compressor, param)
if err != nil {
return err
}
return nil
}
func writeRequest(w io.Writer, r *rpc.Request,
compressType compressor.CompressType, param any) error {
var request proto.Message
if param != nil {
var ok bool
// 断言该接口是否实现了proto.Message接口的方法
if request, ok = param.(proto.Message); !ok {
return errs.NotImplementProtoMessageError
}
}
// 判断压缩类型是否存在
if _, ok := compressor.Compressors[compressType]; !ok {
return errs.NotFoundCompressorError
}
var pbRequest []byte
// 将RPC的参数Marshal为protocol buffer协议
if request != nil {
var err error
pbRequest, err = proto.Marshal(request)
if err != nil {
return err
}
}
// 对协议进行压缩
var compressedPbRequest []byte
compressedPbRequest, err := compressor.Compressors[compressType].Zip(pbRequest)
if err != nil {
return err
}
// 从请求头部对象池中取出请求头部结构体
h := header.RequestPool.Get().(*header.RequestHeader)
defer func() {
h.ResetHeader()
header.RequestPool.Put(h)
}()
h.Id = r.Seq
h.Method = r.ServiceMethod
h.RequestLen = uint32(len(compressedPbRequest))
h.CompressType = header.Compress(compressType)
h.Checksum = crc32.ChecksumIEEE(compressedPbRequest)
// 将请求头部Marshal为protocol buffer协议
pbHeader, err := proto.Marshal(h)
if err != err {
return err
}
// 发送请求头部
if err := sendFrame(w, pbHeader); err != nil {
return err
}
// 发送RPC参数
if write(w, compressedPbRequest); err != nil {
return err
}
// 记得flush,将缓冲IO的数据刷掉
w.(*bufio.Writer).Flush()
return nil
}
实现 ClientCodec 接口的 ReadResponseHeader 方法:
func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
c.response.ResetHeader() // 重置clientCodec的响应头部
err := readResponseHeader(c.r, &c.response)
if err != nil {
return err
}
c.mutex.Lock()
r.Seq = c.response.Id // 填充 r.Seq
r.Error = c.response.Error // 填充 r.Error
r.ServiceMethod = c.pending[r.Seq] // 根据序号填充 r.ServiceMethod
delete(c.pending, r.Seq) // 删除pending里的序号
c.mutex.Unlock()
return nil
}
func readResponseHeader(r io.Reader, h *header.ResponseHeader) error {
pbHeader, err := recvFrame(r) //从IO中读取响应头部
if err != nil {
return err
}
err = proto.Unmarshal(pbHeader, h)//将字节串Unmarshal成ResponseHeader结构类型
if err != nil {
return err
}
return nil
}
实现 ClientCodec 接口的 ReadResponseHeader 方法:
func (c *clientCodec) ReadResponseBody(x any) error {
if x == nil {
if c.response.ResponseLen != 0 { //根据响应头部的长度,读取该长度的字节串
if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
return err
}
}
return nil
}
err := readResponseBody(c.r, &c.response, x)
if err != nil {
return nil
}
return nil
}
func readResponseBody(r io.Reader, h *header.ResponseHeader, x any) error {
var response proto.Message
if x != nil {
var ok bool
// 断言该接口是否实现了proto.Message接口的方法
response, ok = x.(proto.Message)
if !ok {
return errs.NotImplementProtoMessageError
}
}
// 读取RPC调用的响应体
pbResponse := make([]byte, h.ResponseLen)
err := read(r, pbResponse)
if err != nil {
return err
}
// 进行校验
if h.Checksum != 0 {
if crc32.ChecksumIEEE(pbResponse) != h.Checksum {
return errs.UnexpectedChecksumError
}
}
// 判断压缩器是否存在
if _, ok := compressor.Compressors[compressor.CompressType(h.CompressType)]; !ok {
return errs.NotFoundCompressorError
}
// 解压响应体的字节串
var resp []byte
resp, err = compressor.Compressors[compressor.CompressType(h.CompressType)].Unzip(pbResponse)
if err != nil {
return err
}
// 反序列化成 protocol buffer 结构体
if response != nil {
err = proto.Unmarshal(resp, response)
if err != nil {
return err
}
}
return nil
}
实现ServerCodec接口
TinyRPC在codec层还需要实现net/rpc的ServerCodec接口
ServerCodec 的接口和 ClientCodec 接口十分类似:
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(any) error
WriteResponse(*Response, any) error
Close() error
}
其中 ServerCodec 接口包括写响应、读请求头部和读请求体,我们建立一个 serverCodec 的结构体用来实现 ServerCodec 接口,代码codec/server.go:
type serverCodec struct {
r io.Reader
w io.Writer
c io.Closer
request header.RequestHeader
mutex sync.Mutex // 保护 seq, pending
seq uint64 // 一个自增的序号
pending map[uint64]uint64
}
// NewServerCodec Create a new server codec
func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
return &serverCodec{
r: bufio.NewReader(conn),
w: bufio.NewWriter(conn),
c: conn,
pending: make(map[uint64]uint64),
}
}
是不是和刚才的 clientCode 结构体神似?
首先, serverCodec 结构体实现了 ServerCodec 接口的 ReadRequestHeader方法:
func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
s.request.ResetHeader() // 重置serverCodec结构体的请求头部
err := readRequestHeader(s.r, &s.request)
if err != nil {
return err
}
s.mutex.Lock()
s.seq++ // 序号自增
s.pending[s.seq] = s.request.Id // 自增序号与请求头部的ID进行绑定
r.ServiceMethod = s.request.Method // 填充 r.ServiceMethod
r.Seq = s.seq // 填充 r.Seq
s.mutex.Unlock()
return nil
}
func readRequestHeader(r io.Reader, h *header.RequestHeader) (err error) {
pbHeader, err := recvFrame(r) // 读取请求头部文件
if err != nil {
return err
}
err = proto.Unmarshal(pbHeader, h) //将字节串Unmarshal成RequestHeader结构类型
if err != nil {
return err
}
return nil
}
实现 ServerCodec 接口的 ReadRequestBody 方法:
func (s *serverCodec) ReadRequestBody(x any) error {
if x == nil {
if s.request.RequestLen != 0 { // 根据请求头部的RequestLen读取该长度的字节串
if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
return err
}
}
return nil
}
// 断言该接口是否实现了proto.Message接口的方法
request, ok := x.(proto.Message)
if !ok {
return errors.NotImplementProtoMessageError
}
err := readRequestBody(s.r, &s.request, request)
if err != nil {
return nil
}
return nil
}
func readRequestBody(r io.Reader, h *header.RequestHeader, request proto.Message) error {
requestBody := make([]byte, h.RequestLen)
err := read(r, requestBody) // 读取该长度的请求体
if err != nil {
return err
}
// 进行校验
if h.Checksum != 0 {
if crc32.ChecksumIEEE(requestBody) != h.Checksum {
return errors.UnexpectedChecksumError
}
}
// 判断压缩器是否存在
if _, ok := compressor.Compressors[compressor.CompressType(h.CompressType)]; !ok {
return errs.NotFoundCompressorError
}
// 压缩字节串
var pbRequest []byte
pbRequest, err = compressor.Compressors[compressor.CompressType(h.CompressType)].Unzip(requestBody)
if err != nil {
return err
}
// 反序列化成 protocol buffer 结构体
if request != nil {
err = proto.Unmarshal(pbRequest, request)
if err != nil {
return err
}
}
return nil
}
实现 ServerCodec 接口的 WriteResponse 方法:
func (s *serverCodec) WriteResponse(r *rpc.Response, x any) error {
var response proto.Message
if x != nil {
var ok bool
if response, ok = x.(proto.Message); !ok {
if _, ok = x.(struct{}); !ok {
s.mutex.Lock()
delete(s.pending, r.Seq)
s.mutex.Unlock()
return errors.NotImplementProtoMessageError
}
}
}
s.mutex.Lock()
id, ok := s.pending[r.Seq]
if !ok {
s.mutex.Unlock()
return errors.InvalidSequenceError
}
delete(s.pending, r.Seq)
s.mutex.Unlock()
err := writeResponse(s.w, id, r.Error, compressor.CompressType(s.request.CompressType), response)
if err != nil {
return err
}
return nil
}
func writeResponse(w io.Writer, id uint64, serr string,
compressType compressor.CompressType, response proto.Message) (err error) {
// 若rpc存在错误,将响应体设置为nil
if serr != "" {
response = nil
}
// 判断压缩器是否存在
if _, ok := compressor.Compressors[compressType]; !ok {
return errs.NotFoundCompressorError
}
// 将响应体Marshal成字节串
var pbResponse []byte
if response != nil {
pbResponse, err = proto.Marshal(response)
if err != nil {
return err
}
}
var compressedPbResponse []byte
// 压缩响应体
compressedPbResponse, _ = compressor.Compressors[compressType].Zip(pbResponse)
// 从对象池中取出一个响应头部
h := header.ResponsePool.Get().(*header.ResponseHeader)
defer func() {
h.ResetHeader()
header.ResponsePool.Put(h)
}()
h.Id = id
h.Error = serr
h.ResponseLen = uint32(len(compressedPbResponse))
h.Checksum = crc32.ChecksumIEEE(compressedPbResponse)
h.CompressType = header.Compress(compressType)
// 将响应头部Marshal成字节串
pbHeader, err := proto.Marshal(h)
if err != err {
return
}
// 发送响应头部
pbHeader, err := proto.Marshal(h)
if err != err {
return
}
// 发送响应体
if err = sendFrame(w, pbHeader); err != nil {
return
}
if err = write(w, compressedPbResponse); err != nil {
return
}
// 记得flush一下
w.(*bufio.Writer).Flush()
return nil
}
TinyRPC的压缩器
TinyRPC的压缩器很短,Raw、Gzip、Snappy、Zlib压缩均实现了Compressor 接口,代码compressor/compressor.go:
package compressor
type CompressType int32
const (
Raw CompressType = iota
Gzip
Snappy
Zlib
)
// Compressors 四种压缩器的实现
var Compressors = map[CompressType]Compressor{
Raw: RawCompressor{},
Gzip: GzipCompressor{},
Snappy: SnappyCompressor{},
Zlib: ZlibCompressor{},
}
// Compressor 压缩器接口
type Compressor interface {
Zip([]byte) ([]byte, error)
Unzip([]byte) ([]byte, error)
}
TinyRPC的Server
TinyRPC的服务端非常简单,把标准库 net/rpc 的 Server 结构包装了一层,其中 ServeCodec 使用的是TinyRPC的编解码器,代码server.go:
type Server struct {
*rpc.Server
}
func NewServer() *Server {
return &Server{&rpc.Server{}}
}
...
func (s *Server) Serve(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("tinyrpc.Serve: accept:", err.Error())
return
}
go s.Server.ServeCodec(codec.NewServerCodec(conn)) // 使用TinyRPC的解码器
}
}
TinyRPC的Client
TinyRPC的客户端也很简单,把标准库 net/rpc 的 Client 结构包装了一层,其中 ClientCodec 使用的是TinyRPC的编解码器,代码client.go:
注意:TinyRPC Client使用一种Go语言常用的设计模式:功能选项模式
type Client struct {
*rpc.Client
}
// Option 选项接口
type Option interface {
apply(*options)
}
type options struct {
compressType compressor.CompressType
}
type compressOption compressor.CompressType
func (c compressOption) apply(opt *options) {
opt.compressType = compressor.CompressType(c)
}
// WithCompress 设置压缩类型
func WithCompress(c compressor.CompressType) Option {
return compressOption(c)
}
func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
options := options{
compressType: compressor.Raw,
}
for _, o := range opts { // 应用选项
o.apply(&options)
}
return &Client{rpc.NewClientWithCodec(
codec.NewClientCodec(conn, options.compressType))} // 使用TinyRPC的解码器
}
func (c *Client) Call(serviceMethod string, args any, reply any) error {
return c.Client.Call(serviceMethod, args, reply)
}
func (c *Client) AsyncCall(serviceMethod string, args any, reply any) chan *rpc.Call {
return c.Go(serviceMethod, args, reply, nil).Done
}
欢迎点赞收藏,作者接下来将推出下一个小项目 TinyBalancer ~
有疑问加站长微信联系(非本文作者)