手撸golang GO与微服务 grpc
缘起
最近阅读 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
GRPC
gRPC是跨平台、跨语言并且效率非常高的RPC方式。
gRPC默认使用protobuf。
可以用proto files创建gRPC服务,
用protobuf消息类型来定义方法参数和返回类型。
建议在gRPC里使用proto3,
因为这样可以使用gRPC支持的全部语言,
并且能避免proto2客户端与proto3服务端交互时出现兼容性问题。
在实战项目中使用gRPC时,
一定要注意服务器的防火墙必须支持HTTP/2,
因为gRPC是基于HTTP/2设计的。
目标
- 测试验证gRPC的四种通讯模式:
- 请求-应答模式 (SimpleRequest)
- 客户端推流模式 (ClientStream, 客户端连续发送, 服务端最后应答一次)
- 客户端拉流模式 (ServerStream, 客户端请求一次, 服务端连续推送)
- 双向流模式 (DualStream)
设计
- hello.proto: 定义gRPC通讯协议
- HelloServer.go: gRPC服务端的实现
- pb/hello.pb.go: protoc生成的代码,源码略
- logger/logger.go: 搜集运行日志以便诊断, 源码略
单元测试
grpc_test.go,依次在四种gRPC通讯模式下发送/接收1000条消息,并对所有消息日志进行校验
package grpc
import (
"context"
"fmt"
"google.golang.org/grpc"
"io"
g "learning/gooop/grpc"
"learning/gooop/grpc/logger"
"learning/gooop/grpc/pb"
"strconv"
"sync"
"testing"
"time"
)
// Test_HelloServer starts a HelloServer on a local TCP port, connects a gRPC
// client to it, and exercises the four RPCs in turn: SimpleRequest,
// ClientStream, ServerStream and DualStream. Each mode transfers
// iTotalMsgCount messages; the streaming modes are verified by counting the
// expected lines in the shared logger.
func Test_HelloServer(t *testing.T) {
	// fnAssertTrue aborts the test with msg when b is false. It must only be
	// called from the test goroutine because it uses t.Fatal.
	fnAssertTrue := func(b bool, msg string) {
		t.Helper()
		if !b {
			t.Fatal(msg)
		}
	}

	logger.Verbose(false)
	serverPort := 3333
	serverAddress := fmt.Sprintf("127.0.0.1:%d", serverPort)
	iTotalMsgCount := 1000

	// start server
	srv := new(g.HelloServer)
	err := srv.BeginServeTCP(serverPort)
	if err != nil {
		t.Fatal(err)
	}
	// Serving runs in a background goroutine; give it a moment to start.
	time.Sleep(100 * time.Millisecond)

	// connect to grpc server
	conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	// create grpc client
	client := pb.NewHelloServerClient(conn)

	// test SimpleRequest: every request must be echoed with a "reply " prefix.
	ctx := context.Background()
	for i := 0; i < iTotalMsgCount; i++ {
		msg := &pb.HelloMessage{
			Msg: fmt.Sprintf("SimpleRequest %d", i),
		}
		reply, err := client.SimpleRequest(ctx, msg)
		if err != nil {
			t.Fatal(err)
		}
		fnAssertTrue(reply.Msg == "reply "+msg.Msg, "invalid SimpleRequest response")
	}
	t.Log("passed SimpleRequest")

	// test ClientStream: send all messages, then read the single reply.
	clientStream, err := client.ClientStream(ctx)
	if err != nil {
		t.Fatal(err)
	}
	for i := 0; i < iTotalMsgCount; i++ {
		msg := &pb.HelloMessage{
			Msg: fmt.Sprintf("ClientStream %08d", i),
		}
		err = clientStream.Send(msg)
		if err != nil {
			t.Fatal(err)
		}
	}
	reply, err := clientStream.CloseAndRecv()
	if err != nil {
		t.Fatal(err)
	}
	fnAssertTrue(reply.Msg == "reply ClientStream", "invalid ClientStream response")

	// The server logs "HelloServer.ClientStream, recv <msg>" for each message
	// it receives; each expected line must appear exactly once.
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("HelloServer.ClientStream, recv ClientStream %08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed ClientStream")

	// test ServerStream: the request carries the number of messages to push.
	serverStream, err := client.ServerStream(ctx, &pb.HelloMessage{Msg: strconv.Itoa(iTotalMsgCount)})
	if err != nil {
		t.Fatal(err)
	}
	for {
		msg, err := serverStream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		logger.Logf("ServerStream.Recv %s", msg.Msg)
	}
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("ServerStream.Recv ServerStream-%08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed ServerStream")

	// test DualStream: one goroutine sends while another receives.
	dualStream, err := client.DualStream(ctx)
	if err != nil {
		// Without this check a failed call would leave dualStream nil and the
		// goroutines below would dereference it.
		t.Fatal(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// Sender. t.Fatal/t.FailNow may only be called from the test goroutine,
	// so failures here are recorded with t.Errorf and the goroutine returns.
	go func() {
		defer wg.Done()
		for i := 0; i < iTotalMsgCount; i++ {
			msg := &pb.HelloMessage{
				Msg: fmt.Sprintf("DualStream.Send %08d", i),
			}
			if err := dualStream.Send(msg); err != nil {
				t.Errorf("DualStream.Send: %v", err)
				return
			}
		}
		// A goroutine-local err avoids writing to the outer err variable
		// from a second goroutine.
		if err := dualStream.CloseSend(); err != nil {
			t.Errorf("DualStream.CloseSend: %v", err)
		}
	}()

	// Receiver: logs every reply until the server ends the stream.
	go func() {
		defer wg.Done()
		for {
			msg, err := dualStream.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				t.Errorf("DualStream.Recv: %v", err)
				return
			}
			logger.Logf("DualStream.Recv %s", msg.Msg)
		}
	}()

	wg.Wait()
	if t.Failed() {
		// Stop here, on the test goroutine, if either worker reported an error.
		t.FailNow()
	}

	// The server replies with "reply " + msg.Msg and the receiver above logs
	// "DualStream.Recv <reply>"; each expected line must appear exactly once.
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("DualStream.Recv reply DualStream.Send %08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed DualStream")
}
测试输出
$ go test -v grpc_test.go
=== RUN Test_HelloServer
grpc_test.go:60: passed SimpleRequest
grpc_test.go:87: passed ClientStream
grpc_test.go:110: passed ServerStream
grpc_test.go:159: passed DualStream
--- PASS: Test_HelloServer (0.79s)
PASS
ok command-line-arguments 0.791s
hello.proto
定义四种通讯模式的rpc接口
// Protocol definition for the HelloServer service: one rpc per gRPC
// communication mode, all exchanging the same HelloMessage type.
syntax = "proto3";
package pb;
option go_package="./pb";
service HelloServer {
// Unary: one request, one response.
rpc SimpleRequest(HelloMessage) returns (HelloMessage);
// Client streaming: a stream of requests, one response.
rpc ClientStream(stream HelloMessage) returns (HelloMessage);
// Server streaming: one request, a stream of responses.
rpc ServerStream(HelloMessage) returns (stream HelloMessage);
// Bidirectional streaming: a stream of requests and a stream of responses.
rpc DualStream(stream HelloMessage) returns (stream HelloMessage);
}
// HelloMessage is the only payload type; it carries a single text field.
message HelloMessage {
string msg = 1;
}
HelloServer.go
gRPC服务端的实现
package grpc
import (
"context"
"fmt"
"google.golang.org/grpc"
"io"
"learning/gooop/grpc/logger"
"learning/gooop/grpc/pb"
"net"
"strconv"
)
// HelloServer is the receiver type for the gRPC service methods defined in
// this file. None of the methods shown here read its underlying int value.
type HelloServer int
// SimpleRequest handles the unary request-reply RPC. It returns a message
// whose Msg is the request's Msg prefixed with "reply ".
//
// The reply is a newly allocated message; the request is left unmodified so
// that a caller invoking this method directly does not see its argument
// change. This matches how DualStream builds its replies.
func (me *HelloServer) SimpleRequest(ctx context.Context, msg *pb.HelloMessage) (*pb.HelloMessage, error) {
	//logger.Logf("HelloServer.SimpleRequest, %s", msg.Msg)
	reply := &pb.HelloMessage{
		Msg: "reply " + msg.Msg,
	}
	return reply, nil
}
// ClientStream handles the client-streaming RPC. It logs every message
// received from the client until the client closes its side (io.EOF), then
// sends the single reply "reply ClientStream" and closes the stream.
//
// It returns the receive error if reading fails, or the SendAndClose error if
// the reply cannot be delivered, so the RPC is not reported as successful
// when the response was never sent.
func (me *HelloServer) ClientStream(stream pb.HelloServer_ClientStreamServer) error {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			// The client finished sending; fall through to the reply.
			logger.Logf("HelloServer.ClientStream, EOF")
			break
		}
		if err != nil {
			logger.Logf("HelloServer.ClientStream, err=%v", err)
			return err
		}
		logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)
	}

	err := stream.SendAndClose(&pb.HelloMessage{
		Msg: "reply ClientStream",
	})
	if err != nil {
		logger.Logf("HelloServer.ClientStream, SendAndClose err=%v", err)
		return err
	}
	return nil
}
// ServerStream handles the server-streaming RPC. The request's Msg is parsed
// as a decimal count, and that many messages named "ServerStream-<index>"
// (index zero-padded to 8 digits, starting at 0) are pushed to the client.
//
// It returns the parse error when Msg is not an integer, or the first error
// reported by stream.Send.
func (me *HelloServer) ServerStream(msg *pb.HelloMessage, stream pb.HelloServer_ServerStreamServer) error {
	total, parseErr := strconv.Atoi(msg.Msg)
	if parseErr != nil {
		return parseErr
	}

	for seq := 0; seq < total; seq++ {
		out := &pb.HelloMessage{Msg: fmt.Sprintf("ServerStream-%08d", seq)}
		if sendErr := stream.Send(out); sendErr != nil {
			return sendErr
		}
	}
	return nil
}
// DualStream handles the bidirectional-streaming RPC. Each message received
// from the client is logged and answered immediately with a message whose Msg
// is "reply " followed by the received Msg.
//
// It returns nil once the client closes its sending side (io.EOF), or the
// first receive/send error, which is logged before being returned.
func (me *HelloServer) DualStream(stream pb.HelloServer_DualStreamServer) error {
	for {
		in, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			return nil
		case recvErr != nil:
			logger.Logf("HelloServer.DualStream, recv err=%v", recvErr)
			return recvErr
		}

		logger.Logf("HelloServer.DualStream, recv msg=%v", in.Msg)

		out := &pb.HelloMessage{Msg: "reply " + in.Msg}
		if sendErr := stream.Send(out); sendErr != nil {
			logger.Logf("HelloServer.DualStream, send err=%v", sendErr)
			return sendErr
		}
	}
}
// BeginServeTCP opens a TCP listener on 0.0.0.0:port, registers me as the
// HelloServer service on a new gRPC server, and starts serving in a
// background goroutine.
//
// The listener is opened before the method returns, so a bind failure (for
// example, a port already in use) is returned to the caller. A nil return
// means the listener is open and serving has been handed to the goroutine.
//
// If Serve later fails with a non-nil error the goroutine panics. A nil
// return from Serve is treated as a normal exit and does not panic.
func (me *HelloServer) BeginServeTCP(port int) error {
	listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
	if err != nil {
		return err
	}

	server := grpc.NewServer()
	pb.RegisterHelloServerServer(server, me)

	go func() {
		// Only a real serve failure should abort the process; the original
		// code panicked on whatever Serve returned, including nil.
		if serveErr := server.Serve(listener); serveErr != nil {
			panic(serveErr)
		}
	}()
	return nil
}
(end)
有疑问加站长微信联系(非本文作者)