# 一个基于消息队列的的Go语言RPC框架
项目地址: https://github.com/yc90s/xrpc.git
## 前言
RPC作为分布式系统中的基础组件, 使用非常广泛。大多数的RPC框架都是基于点对点的网络连接, 比如golang原生的rpc框架、grpc等.
点对点连接的通讯方式, 随着集群节点的增加, 会导致集群的拓扑结构越来越复杂, 服务之间的耦合度越来越高, 服务的扩展性和可维护性都会受到影响.
而消息队列的通讯方式, 可以很好的解决这个问题。每个服务只需要关注自己订阅的消息, 不需要关心消息的发送者是谁, 也不需要关心消息的接收者是谁.
XRPC设计的原则是为了实现一套基于消息队列的、易于拓展和易于使用的轻量级RPC框架.
## XRPC的特性
除了上面提到的使用消息队列作为RPC的通道之外, XRPC还有以下几个比较实用的特点
- 支持任意参数数量的远程调用, 不需要把接口的参数都打包成一个结构体再调用, 可以像调用本地函数一样
- 支持`Call`和`Cast`两种远程调用方式, `Call`会一直阻塞直到接收到返回值或者超时, 而`Cast`适用于不需要等待返回值的情况
- 代码生成, 实现了一套IDL, 最大程度贴近go语法, 用来定义rpc服务的接口信息, 自动生成接口代码
此外, XRPC的核心代码非常精简, 而且非常容易拓展.
## XRPC的实现
一个RPC框架可以大体分为三个部分:通信、编码/解码、服务调用. 我从这三个方面分别介绍XRPC是怎么做的.
XRPC的每个服务都会订阅一个主题, 等待接收远程调用的消息, 每个服务可以注册多个接口. 客户端将要调用的接口和参数序列化后发布到对应的主题, 服务端收到消息后利用反射调用对应接口, 并将结果通过消息队列返回客户端.
### 通信
XRPC抽象出了一套消息队列接口
```
type MQueen interface {
GenerateSubj() string
Publish(string, []byte) error
Subscribe(string, MQCallback) error
UnSubscribe() error
}
```
- `GenerateSubj` 生成一个唯一的订阅主题名
- `Publish` 发布一条消息到指定的主题
- `Subscribe` 订阅指定的主题
- `UnSubscribe` 取消订阅
要拓展使用其他的消息队列, 只需要实现`MQueen`接口即可, 目前实现的有nats.
### 编码/解码
编码/解码部分XRPC也抽象了一个接口
```
type Codec interface
{
Unmarshal(b []byte, dst any) error
Marshal(v any) ([]byte, error)
}
```
同样只要实现这个接口就可以拓展自己的序列化方式, 目前实现的有gob、protobuf, 默认采用gob
### 服务调用
XRPC的支持注册任意数量参数的接口, 以及`Call`和`Cast`两种远程调用方式.
并且实现了一套IDL, 最大程度贴近go语法, 用来定义rpc接口信息, 并自动生成相关代码, 下面是一个简单的例子.
首先定义我们的RPC服务接口`hello.service`
```
package main
service HelloService {
Hello(string) (string, error)
}
```
可以看到语法和Go非常类似, 它里面定义了一个名叫`HelloService`服务, 包含一个`Hello`方法, 有一个参数两个返回值.
一个文件里面可以定义多个服务, 每个服务可以定义多个接口, 接口支持任意数量的参数.
然后生成接口代码, 执行下面的命令会在当前目录生成一个`hello.service.go`文件
```
xrpc hello.service
```
接下来就可以实现我们的RPC服务
```
type HelloRPCService struct {
*xrpc.RPCServer
}
func (s *HelloRPCService) Hello(request string) (string, error) {
reply := "hello:" + request
return reply, nil
}
// 创建服务
func newHelloService(nc *nats.Conn) *HelloRPCService {
s := &HelloRPCService{
RPCServer: xrpc.NewRPCServer(
xrpc.SetMQ(natsmq.NewMQueen(nc)),
xrpc.SetSubj("hello_server"),
),
}
RegisterHelloServiceServer(s.RPCServer, s)
return s
}
```
- `HelloRPCService` 实现了我们定义的RPC接口,
- `Hello` 接口将收到的消息添加"hello:"前缀并将结果返回
- `newHelloService` 方法用来创建RPC服务, 采用`nats`消息队列, 指定订阅的主题为`hello_server`
接下来创建RPC客户端
```
func newHelloRPCServiceClient(nc *nats.Conn) *HelloServiceClient {
return NewHelloServiceClient(xrpc.NewRPCClient(
xrpc.SetMQ(natsmq.NewMQueen(nc)),
xrpc.SetSubj("hello_client"),
))
}
```
现在我们就可以调用远程接口了
```
func main() {
nc, err := nats.Connect("nats://127.0.0.1:4222", nats.MaxReconnects(1000))
if err != nil {
panic(err)
}
defer nc.Close()
// 启动RPC服务
s := newHelloService(nc)
err = s.Start()
if err != nil {
panic(err)
}
defer s.Stop()
// 创建RPC客户端
c := newHelloRPCServiceClient(nc)
defer c.Close()
// 调用Hello发放, 第一个参数是rpc服务的名称
reply, err := c.Hello("hello_server", "yc90s")
if err != nil {
panic(err)
}
fmt.Println(reply) // 输出: hello:yc90s
}
```
`Hello`接口的第一个参数是RPC服务订阅的主题名, 例子里是`hello_server`, 后面的参数是传递给远程接口实际调用的参数, 程序最后输出"hello:yc90s"
## 总结
RPC作为分布式系统中基础又重要的一个组件, 所以我将其单独开源出来, 后续会再xrpc基础上, 再开源一个分布式服务器框架, 欢迎感兴趣的同学一起交流
有疑问加站长微信联系(非本文作者))