grpc 中的 stream(服务中的长连接,订阅,上传大数据等)

哆啦在这A梦在哪 · · 5015 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前言:
之前我们讲了 grpc 怎么简单的使用 ,这次讲讲 grpc 中的 stream,srteam 顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。

stream的种类:
客户端推送 服务端 rpc GetStream (StreamReqData) returns (stream StreamResData){}
服务端推送 客户端 rpc PutStream (stream StreamReqData) returns (StreamResData){}
客户端与 服务端 互相 推送 rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
其实这个流 已经 基本退化成 tcp了,grpc 底层为我们 分包了,所以真的很方便。

protobuf的定义:
syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc

//声明 包名
package pro;

//声明grpc服务
service Greeter {
/*
以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。
*/
rpc GetStream (StreamReqData) returns (stream StreamResData){}
rpc PutStream (stream StreamReqData) returns (StreamResData){}
rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
}

//stream请求结构
message StreamReqData {
string data = 1;
}
//stream返回结构
message StreamResData {
string data = 1;
}
我们在 protobuf 里面 定义 要提供的服务,如果 你想把哪个数据 源源不断的 推送 就在前面加个stream 就好了,定义好记得编译。

服务端的实现:
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"grpc/pro"
"log"
"net"
"sync"
"time"
)

const PORT = ":50051"

type server struct {
}

//服务端 单向流
func (s *server)GetStream(req pro.StreamReqData, res pro.Greeter_GetStreamServer) error{
i:= 0
for{
i++
res.Send(&pro.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())})
time.Sleep(1
time.Second)
if i >10 {
break
}
}
return nil
}

//客户端 单向流
func (this *server) PutStream(cliStr pro.Greeter_PutStreamServer) error {

for {
    if tem, err := cliStr.Recv(); err == nil {
        log.Println(tem)
    } else {
        log.Println("break, err :", err)
        break
    }
}

return nil

}

//客户端服务端 双向流
func(this *server) AllStream(allStr pro.Greeter_AllStreamServer) error {

wg := sync.WaitGroup{}
wg.Add(2)
go func() {
    for {
        data, _ := allStr.Recv()
        log.Println(data)
    }
    wg.Done()
}()

go func() {
    for {
        allStr.Send(&pro.StreamResData{Data:"ssss"})
        time.Sleep(time.Second)
    }
    wg.Done()
}()

wg.Wait()
return nil

}

func main(){
//监听端口
lis,err := net.Listen("tcp",PORT)
if err != nil{
return
}
//创建一个grpc 服务器
s := grpc.NewServer()
//注册事件
pro.RegisterGreeterServer(s,&server{})
//处理链接
s.Serve(lis)
}
知识点:

每个函数都对应着 完成了 protobuf 里面的 定义。
每个函数 形参都有对应的 推送 或者 接收 对象,我们只要 不断循环 Recv(),或者 Send() 就能接收或者推送了!
当return出函数,就说明此次 推送 或者 接收 结束了,client 会 对应的 收到消息!
客户端调用:
package main

import (
"google.golang.org/grpc"

"grpc/pro"
"log"
"context"
"time"
_ "google.golang.org/grpc/balancer/grpclb"

)

const (
ADDRESS = "localhost:50051"
)

func main(){
//通过grpc 库 建立一个连接
conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure())
if err != nil{
return
}
defer conn.Close()
//通过刚刚的连接 生成一个client对象。
c := pro.NewGreeterClient(conn)
//调用服务端推送流
reqstreamData := &pro.StreamReqData{Data:"aaa"}
res,_ := c.GetStream(context.Background(),reqstreamData)
for {
aa,err := res.Recv()
if err != nil {
log.Println(err)
break
}
log.Println(aa)
}
//客户端 推送 流
putRes, _ := c.PutStream(context.Background())
i := 1
for {
i++
putRes.Send(&pro.StreamReqData{Data:"ss"})
time.Sleep(time.Second)
if i > 10 {
break
}
}
//服务端 客户端 双向流
allStr,_ := c.AllStream(context.Background())
go func() {
for {
data,_ := allStr.Recv()
log.Println(data)
}
}()

go func() {
    for {
        allStr.Send(&pro.StreamReqData{Data:"ssss"})
        time.Sleep(time.Second)
    }
}()

select {
}

}
client 调用 流的函数, 就会 返回一个 流对象,只要 不断地 对它进行读取或者写入,对应方就能收到。

总结:
grpc 的 stream 和 go的协程 配合 简直完美。通过流 我们 可以更加 灵活的 实现自己的业务。如 订阅,大数据传输等。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:哆啦在这A梦在哪

查看原文:grpc 中的 stream(服务中的长连接,订阅,上传大数据等)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

5015 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传