上一篇我们介绍了rpc
最基本的应用,今天我们来看看rpc
的另外一个数据交互方式streaming rpc
,也就是流式接口。
streaming rpc
相比于simple rpc
来说可以很好的解决一个接口发送大量数据的场景。
比如一个订单导出的接口有20万条记录,如果使用simple rpc
来实现的话。那么我们需要一次性接收到20万记录才能进行下一步的操作。但是如果我们使用streaming rpc
那么我们就可以接收一条记录处理一条记录,直到所以的数据传输完毕。这样可以较少服务器的瞬时压力,也更有及时性
下面们来看看streaming rpc
具体是怎么交互的。
IDL
syntax = "proto3";
package proto;
message Order {
int32 id = 1;
string orderSn = 2;
string date = 3;
}
message OrderList{
Order order = 1;
}
message OrderSearchParams {
}
message Image{
string fileName = 1;
string file = 2;
}
message ImageList{
Image image = 1;
}
message uploadResponse{
}
message SumData{
int32 number = 1;
}
service StreamService {
rpc OrderList(OrderSearchParams) returns (stream OrderList){}; //服务端流式
rpc UploadFile(stream ImageList) returns (uploadResponse){}; //客户端流式
rpc SumData(stream SumData) returns (stream SumData){}; //双向流式
}
这里定义了三个方法
- OrderList 服务器流式,客户端普通rpc调用
- UploadFile 客户端流式,服务端普通rpc
- SumData 双向流式
基础结构
server
package main
import (
"google.golang.org/grpc"
"iris-grpc-example/proto"
"log"
"net"
)
type StreamServices struct {}
func main() {
server := grpc.NewServer()
proto.RegisterStreamServiceServer(server, &StreamServices{})
lis, err := net.Listen("tcp", "127.0.0.1:9528")
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
server.Serve(lis)
}
func (services *StreamServices)OrderList(params *proto.OrderSearchParams,stream proto.StreamService_OrderListServer) error {
return nil
}
func (services *StreamServices)UploadFile(stream proto.StreamService_UploadFileServer) error {
return nil
}
func (services *StreamServices)SumData(stream proto.StreamService_SumDataServer) error {
return nil
}
clietn
package main
import (
"github.com/kataras/iris/v12"
"google.golang.org/grpc"
"iris-grpc-example/proto"
"log"
)
var streamClient proto.StreamServiceClient
func main() {
app := iris.New()
app.Logger().SetLevel("debug") //debug
app.Handle("GET", "/testOrderList", orderList)
app.Handle("GET", "/testUploadImage", uploadImage)
app.Handle("GET", "/testSumData", sumData)
app.Run(iris.Addr("127.0.0.1:8080"))
}
func init() {
connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure())
if err != nil {
log.Fatalln(err)
}
streamClient = proto.NewStreamServiceClient(connect)
}
func orderList(ctx iris.Context) {
}
func uploadImage(ctx iris.Context) {
}
func sumData(ctx iris.Context) {
}
按照proto
中的约定,先实现接口并注册一个服务。接下来我们依次来实现三个不同的流式方法。
服务端流式 orderList
server
func (services *StreamServices) OrderList(params *proto.OrderSearchParams, stream proto.StreamService_OrderListServer) error {
for i := 0; i <= 10; i++ {
order := proto.Order{
Id: int32(i),
OrderSn: time.Now().Format("20060102150405") + "order_sn",
Date: time.Now().Format("2006-01-02 15:04:05"),
}
err := stream.Send(&proto.StreamOrderList{
Order: &order,
})
if err != nil {
return err
}
}
return nil
}
gRPC为我们提供一个流的发送方法。send
,这样我们可以很简单的以流的方式传递数据。
现在我们来查看streaming.pb.go
中的send
func (x *streamServiceOrderListServer) Send(m *StreamOrderList) error {
return x.ServerStream.SendMsg(m)
}
可以看到最终是使用ServerStream.SendMsg
,查看源码,可以发现,最终是使用了一个结构体。
type serverStream struct {
ctx context.Context
......
maxReceiveMessageSize int
maxSendMessageSize int
......
}
这里我们关心两个值,最大可接收大小,最大发送大小。而再SendMsg
中也有对于的大小判断,所以发送的消息大小不是无限制的。
// TODO(dfawley): should we be checking len(data) instead?
if len(payload) > ss.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
}
我们可以服务端创建server
的时候通过server := grpc.NewServer(grpc.MaxSendMsgSize())
来指定大小,有可以在客服端创建client
的时候通过connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure(),grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize()))
的时候来指定,具体大家可以下来自己详细了解下配置项。
client
func orderList(ctx iris.Context) {
stream, err := streamClient.OrderList(context.Background(), &proto.OrderSearchParams{})
if err != nil {
ctx.JSON(map[string]string{
"err": err.Error(),
})
return
}
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
ctx.JSON(map[string]string{
"err": err.Error(),
})
return
}
ctx.JSON(res)
log.Println(res)
}
}
这里在for
循环中去读取数据,直到取到一个io.EOF
的结束错误占位符。
客户端流式 uploadImage
server
func (services *StreamServices) UploadFile(stream proto.StreamService_UploadFileServer) error {
for {
res,err := stream.Recv()
//接收消息结束,发送结果,并关闭
if err == io.EOF {
return stream.SendAndClose(&proto.UploadResponse{})
}
if err !=nil {
return err
}
fmt.Println(res)
}
return nil
}
可以看到这里我们同样使用for
结合stream.Recv()
来接收数据流,但是这里我们多一个SendAndClose
,表示服务器已经接收消息结束,并发生一个正确的响应给客户端。
client
func uploadImage(ctx iris.Context) {
stream,err := streamClient.UploadFile(context.Background())
if err != nil {
ctx.JSON(map[string]string{
"err": err.Error(),
})
return
}
for i:=1;i<=10 ; i++ {
img := &proto.Image{FileName:"image"+strconv.Itoa(i),File:"file data"}
images := &proto.StreamImageList{Image:img}
err := stream.Send(images)
if err != nil {
ctx.JSON(map[string]string{
"err": err.Error(),
})
return
}
}
//发送完毕 关闭并获取服务端返回的消息
resp, err := stream.CloseAndRecv()
if err != nil {
ctx.JSON(map[string]string{
"err": err.Error(),
})
return
}
ctx.JSON(map[string]interface{}{"result": resp,"message":"success"})
log.Println(resp)
}
而在客户端发送数据完毕的时候需要使用CloseAndRecv
需要接收服务端接收完毕的通知以及关闭当前通道。
双向流式 uploadImage
server
func (services *StreamServices) SumData(stream proto.StreamService_SumDataServer) error {
i := 0
for {
err := stream.Send(&proto.StreamSumData{Number: int32(i)})
if err != nil {
return err
}
res, err := stream.Recv()
if err == io.EOF {
return nil
}
log.Printf("res:%d,i:%d,sum:%d\r\n", res.Number, i, int32(i)+res.Number)
i++
}
}
服务端在发送消息的同时,并接收服务端发送的消息。
client
func sumData(ctx iris.Context) {
stream, err := streamClient.SumData(context.Background())
if err != nil {
ctx.JSON(map[string]string{
"err": err.Error(),
})
return
}
for i := 1; i <= 10; i++ {
err = stream.Send(&proto.StreamSumData{Number: int32(i)})
if err == io.EOF {
break
}
if err != nil {
return
}
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return
}
log.Printf("res number:%d", res.Number)
}
stream.CloseSend()
return
}
上面我们可以看到。客户端有一个执行断开连接的标识CloseSend()
,而服务器没有,因为服务端断开连接是隐式的,我们只需要退出循环即可断开连接。可以灵活的控制。
上面就是go-gRPC
的流式接口。只是一个简单的例子。如有不妥的地方欢迎指出。感谢。
期待一起交流
有疑问加站长微信联系(非本文作者)