RPCX使用案例

lsr199461 · · 10061 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

花了一个礼拜整理了一下 RPCX的使用方法,大致过程如下:

服务器端开发

首先,我们需要实现自己的服务,这很简单,就是定义普通的方法即可:

package example

import (
	"context"
	"fmt"
)

type Args struct {
	A int
	B int
}

type Reply struct {
	C int
}

type Arith int

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
	reply.C = args.A * args.B
	fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
	return nil
}

func (t *Arith) Add(ctx context.Context, args *Args, reply *Reply) error {
	reply.C = args.A + args.B
	fmt.Printf("call: %d + %d = %d\n", args.A, args.B, reply.C)
	return nil
}

func (t *Arith) Say(ctx context.Context, args *string, reply *string) error {
	*reply = "hello " + *args
	return nil
}

一.点对点

点对点是最简单的一种注册中心的方式,事实上没有注册中心,客户端直接得到唯一的服务器的地址。

服务器

服务器并没有配置注册中心,而是直接启动。

package main

import (
	"flag"
	"github.com/smallnest/rpcx/server"
	"github.com/rpcx-ecosystem/rpcx-examples3"
)

var (
	addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
	flag.Parse()
	s := server.Server{}
	s.RegisterName("Arith", new(example.Arith), "")
	go s.Serve("tcp", *addr)
	select {}
}

 

客户端

客户端直接配置了服务器的地址,格式是network@ipaddress:port的格式,并没有通过第三方组件来查找。

package main

import (
	"github.com/smallnest/rpcx/client"
	"log"
	"service"
	"context"
	"flag"
)

var (
	addr       = flag.String("addr", "127.0.0.1:8972", "server address")
)

func main() {
	Peer2Peer()
}
func Peer2Peer() {
	flag.Parse()
	d := client.NewPeer2PeerDiscovery("tcp@" + *addr, "")
	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
	defer xclient.Close()

	args := &service.Args{
		A: 10,
		B: 20,
	}

	reply := &service.Reply{}
	err := xclient.Call(context.Background(), "Mul", args, reply)
	if err != nil {
		log.Fatalf("failed to call: %v", err)
	}

	log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}

 

二.点对多

 

MultipleServers

上面的方式只能访问一台服务器,假设我们有固定的几台服务器提供相同的服务,我们可以采用这种方式。

服务器

服务器还是和上面的代码一样,只需启动自己的服务,不需要做额外的配置。下面这个例子启动了两个服务:

package main

import (
	"flag"
	"github.com/smallnest/rpcx/server"
	"github.com/rpcx-ecosystem/rpcx-examples3"
)

var (
	addr1 = flag.String("addr1", "localhost:8972", "server1 address")
	addr2 = flag.String("addr2", "localhost:8973", "server2 address")
)

func main() {
	flag.Parse()
	go createServer(*addr1)
	go createServer(*addr2)
	select {}
}
func createServer(addr string) {
	s := server.NewServer()
	s.RegisterName("Arith", new(example.Arith), "")
	s.Serve("tcp", addr)
}

 

客户端

客户端需要使用MultipleServersDiscovery来配置同一个服务的多个服务器地址,这样客户端就能基于规则从中选择一个进行调用。

可以看到,除了初始化XClient有所不同外,实际调用服务是一样的, 后面介绍的注册中心也是一样,只有初始化客户端有所不同,后续的调用都一样。

package main

import (
	"github.com/smallnest/rpcx/client"
	"log"
	"service"
	"context"
	"flag"
	"github.com/rpcx-ecosystem/rpcx-examples3"
)

var (
	addr1 = flag.String("addr1", "localhost:8972", "server1 address")
	addr2 = flag.String("addr2", "localhost:8973", "server2 address")
)

func main() {
	Peer2Many()
}

func Peer2Many(){
	flag.Parse()

	d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key:*addr1}, {Key:*addr2}})
	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
	defer xclient.Close()

	args := &example.Args{
		A: 10,
		B: 20,
	}

	reply := &example.Reply{}
	err := xclient.Call(context.Background(), "Mul",args, reply)
	if err != nil {
		log.Fatalf("failed to call: %v", err)
	}

	log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}

 

Consul

Consul是HashiCorp公司推出的开源工具,用于实现分布式系统的服务发现与配置。Consul是分布式的、高可用的、 可横向扩展的。它具备以下特性:

  • 服务发现: Consul提供了通过DNS或者HTTP接口的方式来注册服务和发现服务。一些外部的服务通过Consul很容易的找到它所依赖的服务。
  • 健康检测: Consul的Client提供了健康检查的机制,可以通过用来避免流量被转发到有故障的服务上。
  • Key/Value存储: 应用程序可以根据自己的需要使用Consul提供的Key/Value存储。 Consul提供了简单易用的HTTP接口,结合其他工具可以实现动态配置、功能标记、领袖选举等等功能。
  • 多数据中心: Consul支持开箱即用的多数据中心. 这意味着用户不需要担心需要建立额外的抽象层让业务扩展到多个区域。

Consul也是使用Go开发的,在Go生态圈也被广泛应用。

服务器

服务器端的开发和zookeeper、etcd和consul类似。

它主要配置几个参数:

  • ServiceAddress: 本机的监听地址, 这个对外暴露的监听地址, 格式为tcp@ipaddress:port
  • ConsulServers: consul集群的地址
  • BasePath: 服务前缀。 如果有多个项目同时使用consul,避免命名冲突,可以设置这个参数,为当前的服务设置命名空间
  • Metrics: 用来更新服务的TPS
  • UpdateInterval: 服务的刷新间隔, 如果在一定间隔内(当前设为2 * UpdateInterval)没有刷新,服务就会从consul中删除

在开始之前,你得确保已经装上了Consul 使用指令 consul agent -dev 开启,更多consul使用方式请自行get

package main

import (
	"flag"
	"github.com/smallnest/rpcx/server"
	"github.com/smallnest/rpcx/serverplugin"
	"time"
	"github.com/rcrowley/go-metrics"
	"log"
	"github.com/rpcx-ecosystem/rpcx-examples3"
)

var (
	addr       = flag.String("addr", "localhost:8972", "server address")
	consulAddr = flag.String("consulAddr", "localhost:8500", "consul address")
	basePath   = flag.String("base", "/rpcx_test", "prefix path")
)

func main() {
	flag.Parse()
	s := server.NewServer()
	addRegistryPlugin(s)
	s.RegisterName("Arith", new(example.Arith), "")
	s.Serve("tcp", *addr)
}

func addRegistryPlugin(s *server.Server) {

	r := &serverplugin.ConsulRegisterPlugin{
		ServiceAddress: "tcp@" + *addr,
		ConsulServers:  []string{*consulAddr},
		BasePath:       *basePath,
		Metrics:        metrics.NewRegistry(),
		UpdateInterval: time.Minute,
	}
	err := r.Start()
	if err != nil {
		log.Fatal(err)
	}
	s.Plugins.Add(r)
}

 

客户端

配置ConsulDiscovery,使用basepath和consul的地址。

package main

import (
	"github.com/smallnest/rpcx/client"
	"log"
	"context"
	"flag"
	"github.com/rpcx-ecosystem/rpcx-examples3"
)

var (
	consulAddr = flag.String("consulAddr", "localhost:8500", "consul address")
	basePath   = flag.String("base", "/rpcx_test/Arith", "prefix path")
)

func ConsulServer() {
	flag.Parse()

	d := client.NewConsulDiscovery(*basePath, "", []string{*consulAddr}, nil)
	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
	defer xclient.Close()

	args := &example.Args{
		A: 10,
		B: 20,
	}

	reply := &example.Reply{}
	err := xclient.Call(context.Background(), "Mul", args, reply)
	if err != nil {
		log.Fatalf("failed to call: %v", err)
	}

	log.Printf("%d * %d = %d", args.A, args.B, reply.C)

}
func main() {
	ConsulServer()
}

 

 


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

10061 次点击  
加入收藏 微博
被以下专栏收入,发现更多相似内容
1 回复  |  直到 2019-05-10 18:28:06
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传