RPC介绍
Remote Procedure Call,远程过程调用
解决问题
而一旦踏入公司尤其是大型互联网公司就会发现,公司的系统都由成千上万大大小小的服务组成,各服务部署在不同的机器上,由不同的团队负责。这时就会遇到两个问题:
1)要搭建一个新服务,免不了需要依赖他人的服务,而现在他人的服务都在远端,怎么调用?
2)其它团队要使用我们的服务,我们的服务该怎么发布以便他人调用?
过程
- 服务消费方(client)调用以本地调用方式调用服务;
- client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
- client stub找到服务地址,并将消息发送到服务端;
- server stub收到消息后进行解码;
- server stub根据解码结果调用本地的服务;
- 本地服务执行并将结果返回给server stub;
- server stub将返回结果打包成消息并发送至消费方;
- client stub接收到消息,并进行解码;
- 服务消费方得到最终结果。
RPC的目标就是要2~8这些步骤都封装起来,让用户对这些细节透明。
实现方式
RPC可以通过HTTP来实现(grpc基于http2.0),也可以通过Socket自己实现一套协议来实现.
grpc框架
4种通信方式
- 简单rpc:一个请求对象对应一个返回对象
rpc simpleHello(Person) returns (Result) {}
- 服务端流式rpc :一个请求对象,服务端可以传回多个结果对象
rpc serverStreamHello(Person) returns (stream Result) {}
- 客户端流式rpc:客户端传入多个请求对象,服务端返回一个响应结果
rpc clientStreamHello(stream Person) returns (Result) {}
- 双向流式rpc:可以传入多个对象,返回多个响应对象
rpc biStreamHello(stream Person) returns (stream Result) {}
案例
simple.proto
syntax = "proto3";
package testRPC;
service testRPC {
rpc simple (emit) returns (on) {
}
rpc serverStream (emit) returns (stream on){}
rpc clientStream (stream emit) returns (on){}
rpc bothStream (stream emit) returns (stream on){}
}
message emit{
string type = 1;
string name = 2;
}
message on{
string type = 1;
int32 age = 2;
}
service
package main
import (
"net"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/test/proto"
"google.golang.org/grpc/reflection"
"fmt"
"golang.org/x/net/context"
"io"
)
const (
port = ":50051"
)
type server struct {
}
func (s *server) Simple(ctx context.Context, in *pb.Emit) (*pb.On, error) {
return &pb.On{Type: "Hello " + in.Type, Age: 10}, nil
}
func (s *server) ServerStream(in *pb.Emit, stream pb.TestRPC_ServerStreamServer) (error) {
for i := 1; i < 10; i++ {
if err := stream.Send(&pb.On{Type: "Hello " + in.Type, Age: int32(i)}); err != nil {
return err
}
}
return nil
}
func (s *server) ClientStream(stream pb.TestRPC_ClientStreamServer) (error) {
var pointCount int32
for {
_, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.On{
Type: "end",
Age: pointCount,
})
}
pointCount++
}
}
func (s *server) BothStream(stream pb.TestRPC_BothStreamServer) (error) {
for {
emit, err := stream.Recv()
if err == io.EOF {
return nil
}
fmt.Println(string(emit.Type))
for i := 1; i < 3; i++ {
if err := stream.Send(&pb.On{Type: emit.Type, Age: int32(i)}); err != nil {
return err
}
}
}
return nil
}
func main() {
lis, _ := net.Listen("tcp", port)
s := grpc.NewServer()
pb.RegisterTestRPCServer(s, &server{})
reflection.Register(s)
if err := s.Serve(lis); err != nil {
fmt.Println("error")
}
}
client
package main
import (
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/test/proto"
"golang.org/x/net/context"
"time"
"log"
"io"
)
const (
address = "localhost:50051"
)
func main() {
conn, _ := grpc.Dial(address, grpc.WithInsecure())
defer conn.Close()
rpcClient := pb.NewTestRPCClient(conn)
testSimple(rpcClient)
testServerStream(rpcClient)
testClientStream(rpcClient)
testBothStream(rpcClient)
}
func testSimple(client pb.TestRPCClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// simple
r, _ := client.Simple(ctx, &pb.Emit{Type: "11"})
log.Printf("Greeting: %s", r)
}
func testServerStream(client pb.TestRPCClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ServerStream(ctx, &pb.Emit{Type: "Hello "})
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
}
func testClientStream(client pb.TestRPCClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ClientStream(ctx)
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
var points []pb.Emit
for i := 0; i < 10; i++ {
points = append(points, pb.Emit{Type: "t", Name: string(i)})
}
for _, point := range points {
if err := stream.Send(&point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
}
func testBothStream(client pb.TestRPCClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.BothStream(ctx)
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
var points []pb.Emit
for i := 0; i < 10; i++ {
points = append(points, pb.Emit{Type: "xiaoming"+string(i), Name: "xiaohong"})
}
waitc := make(chan struct{})
go func() {
for {
on, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s = %d)", on.Type, on.Age)
}
}()
for _, point := range points {
if err := stream.Send(&point); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
}
有疑问加站长微信联系(非本文作者)