一个基于消息队列的的Go语言RPC框架

yc90s · · 744 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

# 一个基于消息队列的的Go语言RPC框架 项目地址: https://github.com/yc90s/xrpc.git ## 前言 RPC作为分布式系统中的基础组件, 使用非常广泛。大多数的RPC框架都是基于点对点的网络连接, 比如golang原生的rpc框架、grpc等. 点对点连接的通讯方式, 随着集群节点的增加, 会导致集群的拓扑结构越来越复杂, 服务之间的耦合度越来越高, 服务的扩展性和可维护性都会受到影响. 而消息队列的通讯方式, 可以很好的解决这个问题。每个服务只需要关注自己订阅的消息, 不需要关心消息的发送者是谁, 也不需要关心消息的接收者是谁. XRPC设计的原则是为了实现一套基于消息队列的、易于拓展和易于使用的轻量级RPC框架. ## XRPC的特性 除了上面提到的使用消息队列作为RPC的通道之外, XRPC还有以下几个比较实用的特点 - 支持任意参数数量的远程调用, 不需要把接口的参数都打包成一个结构体再调用, 可以像调用本地函数一样 - 支持`Call`和`Cast`两种远程调用方式, `Call`会一直阻塞直到接收到返回值或者超时, 而`Cast`适用于不需要等待返回值的情况 - 代码生成, 实现了一套IDL, 最大程度贴近go语法, 用来定义rpc服务的接口信息, 自动生成接口代码 此外, XRPC的核心代码非常精简, 而且非常容易拓展. ## XRPC的实现 一个RPC框架可以大体分为三个部分:通信、编码/解码、服务调用. 我从这三个方面分别介绍XRPC是怎么做的. XRPC的每个服务都会订阅一个主题, 等待接收远程调用的消息, 每个服务可以注册多个接口. 客户端将要调用的接口和参数序列化后发布到对应的主题, 服务端收到消息后利用反射调用对应接口, 并将结果通过消息队列返回客户端. ### 通信 XRPC抽象出了一套消息队列接口 ``` type MQueen interface { GenerateSubj() string Publish(string, []byte) error Subscribe(string, MQCallback) error UnSubscribe() error } ``` - `GenerateSubj` 生成一个唯一的订阅主题名 - `Publish` 发布一条消息到指定的主题 - `Subscribe` 订阅指定的主题 - `UnSubscribe` 取消订阅 要拓展使用其他的消息队列, 只需要实现`MQueen`接口即可, 目前实现的有nats. ### 编码/解码 编码/解码部分XRPC也抽象了一个接口 ``` type Codec interface { Unmarshal(b []byte, dst any) error Marshal(v any) ([]byte, error) } ``` 同样只要实现这个接口就可以拓展自己的序列化方式, 目前实现的有gob、protobuf, 默认采用gob ### 服务调用 XRPC的支持注册任意数量参数的接口, 以及`Call`和`Cast`两种远程调用方式. 并且实现了一套IDL, 最大程度贴近go语法, 用来定义rpc接口信息, 并自动生成相关代码, 下面是一个简单的例子. 首先定义我们的RPC服务接口`hello.service` ``` package main service HelloService { Hello(string) (string, error) } ``` 可以看到语法和Go非常类似, 它里面定义了一个名叫`HelloService`服务, 包含一个`Hello`方法, 有一个参数两个返回值. 一个文件里面可以定义多个服务, 每个服务可以定义多个接口, 接口支持任意数量的参数. 然后生成接口代码, 执行下面的命令会在当前目录生成一个`hello.service.go`文件 ``` xrpc hello.service ``` 接下来就可以实现我们的RPC服务 ``` type HelloRPCService struct { *xrpc.RPCServer } func (s *HelloRPCService) Hello(request string) (string, error) { reply := "hello:" + request return reply, nil } // 创建服务 func newHelloService(nc *nats.Conn) *HelloRPCService { s := &HelloRPCService{ RPCServer: xrpc.NewRPCServer( xrpc.SetMQ(natsmq.NewMQueen(nc)), xrpc.SetSubj("hello_server"), ), } RegisterHelloServiceServer(s.RPCServer, s) return s } ``` - `HelloRPCService` 实现了我们定义的RPC接口, - `Hello` 接口将收到的消息添加"hello:"前缀并将结果返回 - `newHelloService` 方法用来创建RPC服务, 采用`nats`消息队列, 指定订阅的主题为`hello_server` 接下来创建RPC客户端 ``` func newHelloRPCServiceClient(nc *nats.Conn) *HelloServiceClient { return NewHelloServiceClient(xrpc.NewRPCClient( xrpc.SetMQ(natsmq.NewMQueen(nc)), xrpc.SetSubj("hello_client"), )) } ``` 现在我们就可以调用远程接口了 ``` func main() { nc, err := nats.Connect("nats://127.0.0.1:4222", nats.MaxReconnects(1000)) if err != nil { panic(err) } defer nc.Close() // 启动RPC服务 s := newHelloService(nc) err = s.Start() if err != nil { panic(err) } defer s.Stop() // 创建RPC客户端 c := newHelloRPCServiceClient(nc) defer c.Close() // 调用Hello发放, 第一个参数是rpc服务的名称 reply, err := c.Hello("hello_server", "yc90s") if err != nil { panic(err) } fmt.Println(reply) // 输出: hello:yc90s } ``` `Hello`接口的第一个参数是RPC服务订阅的主题名, 例子里是`hello_server`, 后面的参数是传递给远程接口实际调用的参数, 程序最后输出"hello:yc90s" ## 总结 RPC作为分布式系统中基础又重要的一个组件, 所以我将其单独开源出来, 后续会再xrpc基础上, 再开源一个分布式服务器框架, 欢迎感兴趣的同学一起交流

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

744 次点击  
加入收藏 微博
1 回复  |  直到 2024-02-21 11:40:29
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传