主要基于官网介绍的文档总结而来。
需要先了解 protocol buffers
为什么使用gRPC
通过gPRC,我们可以仅仅定义一次service 到.proto文件中,然后使用gRPC支持的任何开发语言开发客户端或服务器。
样例代码和环境的建立
首先要确保golang开发环境的正确配置,go1.5+。
$ go get -u -v google.golang.org/grpc
本人在测试中遇到报错,主要原因在于样例需要
"golang.org/x/net"
"golang.org/x/text"
的支持,本人的解决方法如下
到
$GOPATH/src/golang.org/x/
目录下,如果golang.org/x/ 不存在则手动创建一个。
然后
git clone https://github.com/golang/net.git
git clone https://github.com/golang/text.git
样例测试
$ cd $GOPATH/src/google.golang.org/grpc/examples/route_guide
$ go run server/server.go
$ go run client/client.go
下面对样例的代码进行分析
服务定义
gRPC使用 protocol buffers定义服务。
要定义服务,需要在.proto文件中做service定义如下:
service RouteGuide {
...
}
然后可以在servie的定义rpc方法,指定对应的request和response类型。gPRC允许开发者定义4中service方法,这4中方法在样例RouteGuide 中都有用到。
-
最简单的RPC方法,客户端通过调用该方法发送request到服务端,等待服务器的response,类似正常的函数调用。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}
-
服务端单边stream的RPC( server-side streaming RPC):客户端调用该方法到服务端,服务器返回一个stream,客户端从这个stream中读取数据直到没有数据可读。从样例代码中可以看到该方法的主要特点是在response类型前加stream。
// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
-
客户端单边stream的RPC(A client-side streaming RPC):客户端通过使用stream将一系列的数据发送到服务端。客户端数据发送完毕后就等待服务端把数据全部读完后发送相应过来。从样例代码中可以看到该方法主要特点是在request类型前面加stream.:
// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
- 双边stream RPC(bidirectional streaming RPC)。客户端和服务端都通过读写流(read-write stream)向对方发送一系列的消息。这两个streams是完全独立的,所以客户端和服务端可以随意的进行读写操作:例如,服务端可以等待客户端的是数据都接收完毕后再往response里写数据,或者可以先读取一条消息再写入一条信息或者是其他的一些读写组合方式。从样例代码中可以看到该方法的主要特点就是在request和response前面都加stream。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
样例中的.proto文件包含了服务端方法中使用的request和response类型所使用的类型的协议池消息类型定义( protocol buffer message type definitions )。
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客户端和服务端代码
根据.proto文件生成客户端和服务端所需的gRPC接口代码
protoc -I routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide
创建服务端
服务端代码主要做两方面的工作:
- 实现上一步骤.proto生成的服务端接口。
- 运行一个gRPC服务来监听客户端的请求并且把请求分发到正确的服务端实现里。
实现RouteGuide
As you can see, our server has a routeGuideServer struct type that implements the generated RouteGuideServer interface:
可以看出我们的服务端有一个routeGuideServer 的结构体类型实现了RouteGuideServer 的接口。
type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
...
Simple RPC
GetFeature,从客户端获取一个Point然后从数据库中返回对应的特征信息。
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
// No feature was found, return an unnamed feature
return &pb.Feature{"", point}, nil
}
这个方法输入参数是一个RPC的context对象以及客户端发过来的点协议池(Point protocol buffer)请求。这个方法返回一个特征协议池(Feature protocol buffer)对象,对象中包含响应信息和错误。在这个方法中,我们为Feature转入了正确的信息然后和nil error一起返回,告诉gRPC服务器已经完成对RPC的处理,Feature可以返回给客户端了。
Server-side streaming RPC
ListFeatures是一个服务端stream的RPC,所以我们需要返回多个Features到客户端。
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
可以看出,该方法获取一个request对象以及一个特殊的RouteGuide_ListFeaturesServer 来写相应。这个方法中我们用Send方法把所有需要返回的Feature特征写入到RouteGuide_ListFeaturesServer 中。最后返回一个nil error告诉gRPC服务端已经写好相应。如果期间有什么错误发生,我们返回一个非nil的error,gRPC会转换为正确的RPC状态发送到线路中。
Client-side streaming RPC
.
客户端流方法RecordRoute中,我们从客户端获取一系列的Point然后返回一个RouteSummary 对象包含旅行信息。从代码中可以看到该方法里面没有任何的请求参数,而是一个RouteGuide_RecordRouteServer 流对象。服务端可以用Rev()方法从RouteGuide_RecordRouteServer 对象中读取消息并使用Write()方法往里面写消息。
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
在这个方法中,我们使用RouteGuide_RecordRouteServer’s 的Recv方法不停的从客户端的请求中读取数据到requesst对象直到没有数据可读。服务器需要检测每次Recv返回的error,如果是nil,表示这个stream正常可以继续读,如果是io.EOF表示流已经停止了此时服务端可以返回RouteSummary。
Bidirectional streaming RPC
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
... // look for notes to be sent to client
for _, note := range s.routeNotes[key] {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
这个方法中使用RouteGuide_RouteChatServer 流对象,可以用来读消息和写消息。然而这次我们通过流返回数据的同时客户端仍然在往他们的消息流中写消息。
该方法中往消息流中写消息使用的是Send() 方法而不是 SendAndClose()
官网中介绍原因如下:具体意思暂时没有搞明白。
TODO:The syntax for reading and writing here is very similar to our client-streaming method, except the server uses the stream’s Send() method rather than SendAndClose() because it’s writing multiple responses. Although each side will always get the other’s messages in the order they were written, both the client and server can read and write in any order — the streams operate completely independently.
Starting the server
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterRouteGuideServer(grpcServer, &routeGuideServer{})
... // determine whether to use TLS
grpcServer.Serve(lis)
如代码所示,我们创建和启动一个服务器需要下面4个步骤:
- 指定端口号,用来监听客户端的请求,使用
err := net.Listen("tcp", fmt.Sprintf(":%d", *port)).
- 创建一个gRPC服务器实例
grpc.NewServer().
- 创建一个gRPC服务器实例
-
注册服务器实现到上一步骤创建的gRPC服务器实例上。
-
调用Serve启动服务,阻塞等待直到该进程被杀死或服务器的stop被调用。
使用TLS
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
grpclog.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
if *tls {
creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
if err != nil {
grpclog.Fatalf("Failed to generate credentials %v", err)
}
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)
}
Creating the client
创建客户端
flag.Parse()
var opts []grpc.DialOption
if *tls {
var sn string
if *serverHostOverride != "" {
sn = *serverHostOverride
}
var creds credentials.TransportCredentials
if *caFile != "" {
var err error
creds, err = credentials.NewClientTLSFromFile(*caFile, sn)
if err != nil {
grpclog.Fatalf("Failed to create TLS credentials %v", err)
}
} else {
creds = credentials.NewClientTLSFromCert(nil, sn)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {
grpclog.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
client := pb.NewRouteGuideClient(conn)
为了能够调用服务端的方法,我们首先创建一个gRPC通道来和服务端沟通。通过传入服务器地址和端口号给grpc.Dial()来创建。如代码,我们还可以使用DialOptions来设置grpc中的认证方法。
一旦gRPC通道建立起来后,我们需要一个客户端来执行RPC,通过.proto创建的pb包中提供的NewRouteGuideClient方法来创建。
Calling service methods
对应服务端的四种方法,客户端也要采用不同的调用方法。
Simple RPC
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
...
}
从代码中看出,客户端调用方法GetFeature(在),传递协议池(protocol buffer object)对象pb.Point作为参数,同时传递一个context.Context 对象,可以让我们方便的改变RPC的行为,例如超时或取消RPC。
Server-side streaming RPC
rect := &pb.Rectangle{ ... } // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
...
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
cient.ListFeaturens参见.proto生成的route_guide.pb.go
func (c *routeGuideClient) ListFeatures(ctx context.Context, in *Rectangle, opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient, error) {
在这个方法中,同样的传递一个context对象和一个请求,但是返回一个RouteGuide_ListFeaturesClient实例,客户端可以从这个实例中读取得到服务端的响应。
我们使用RouteGuide_ListFeaturesClient的Recv方法来从服务端的响应中读取到协议池对象Feature中直到没有数据可读。同样的客户端在读取时需要检测返回的err,如果为nil,说明此时stream是正常的继续可读,如果为io.EOF表示数据已经到结尾了。
Client-side streaming RPC
// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
同样参见route_guide.pb.go中RecordRoute的定义
func (c *routeGuideClient) RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error) {
stream, err := grpc.NewClientStream(ctx, &_RouteGuide_serviceDesc.Streams[1], c.cc, "/routeguide.RouteGuide/RecordRoute", opts...)
RecordRoute方法仅仅需要传递一个context参数,然后返回一个RouteGuide_RecordRouteClient流对象用于客户端写消息和读消息。
RouteGuide_RecordRouteClient的Send()方法用于向客户端发送请求,一旦完成客户端的所有请求,客户端需要调用CloseAndRecv方法来让gRPC知道客户端已经完成请求并且期望获得一个响应。
如果CloseAndRecv()返回的err不为nil,那么返回的第一个值就是一个有效的服务端响应。
Bidirectional streaming RPC
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
和RecordRoute类型,方法RouteChat仅需要传递一个context对象,返回一个RouteGuide_RouteChatClient用于客户端读消息和写消息。
func (c *routeGuideClient) RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error) {
stream, err := grpc.NewClientStream(ctx, &_RouteGuide_serviceDesc.Streams[2], c.cc, "/routeguide.RouteGuide/RouteChat", opts...)
不过和RecordRoute不同的是,客户端在往客户端的stream里写消息的同时,服务端也在往服务端的stream中写消息。另外,该方法中客户端中读和写是分开独立运行的,没有先后顺序,还有就是客户端写消息完毕后使用CloseSend而不是CloseAndRecv
后记
之前一直在CSDN上写文章,后面会逐步转换到简书上,还请大家多多支持。
有疑问加站长微信联系(非本文作者)