我们以hello world来大概分析一下golang中的thrift包,并且扒一扒网络上有关thrift的一些坑
查看源码,服务器定义如下:(详见simple_server.go文件)
type TSimpleServer struct { quit chan struct{} stopped int64 processorFactory TProcessorFactory //实质是一个handler,用来相应客户端的请求 serverTransport TServerTransport //实质是一个socket inputTransportFactory TTransportFactory //实质是传输协议的具体操作类(详细可见transport.go文件中TTransport结构体) outputTransportFactory TTransportFactory // inputProtocolFactory TProtocolFactory //实质是传输协议(有compact、simplejson、json、binary四种协议,默认是binary)
tputProtocolFactory TProtocolFactory // }
在go语言中,创建一个thrift服务器有三种方法:(详见simple_server.go文件)
func NewTSimpleServer2(processor TProcessor, serverTransport TServerTransport) *TSimpleServer { return NewTSimpleServerFactory2(NewTProcessorFactory(processor), serverTransport) } func NewTSimpleServer4(processor TProcessor, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer { return NewTSimpleServerFactory4(NewTProcessorFactory(processor), serverTransport, transportFactory, protocolFactory, ) } func NewTSimpleServer6(processor TProcessor, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer { return NewTSimpleServerFactory6(NewTProcessorFactory(processor), serverTransport, inputTransportFactory, outputTransportFactory, inputProtocolFactory, outputProtocolFactory, ) }
这三个函数分别调用了工厂函数
NewTSimpleServerFactory2;
NewTSimpleServerFactory4;
NewTSimpleServerFactory6;
func NewTSimpleServerFactory2(processorFactory TProcessorFactory, serverTransport TServerTransport) *TSimpleServer { return NewTSimpleServerFactory6(processorFactory, serverTransport, NewTTransportFactory(), NewTTransportFactory(), NewTBinaryProtocolFactoryDefault(), NewTBinaryProtocolFactoryDefault(), ) } func NewTSimpleServerFactory4(processorFactory TProcessorFactory, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer { return NewTSimpleServerFactory6(processorFactory, serverTransport, transportFactory, transportFactory, protocolFactory, protocolFactory, ) } func NewTSimpleServerFactory6(processorFactory TProcessorFactory, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer { return &TSimpleServer{ processorFactory: processorFactory, serverTransport: serverTransport, inputTransportFactory: inputTransportFactory, outputTransportFactory: outputTransportFactory, inputProtocolFactory: inputProtocolFactory, outputProtocolFactory: outputProtocolFactory, quit: make(chan struct{}, 1), } }
好啦!现在假如我们需要创建一个以二进制协议传输的thrift服务器,那么可以用如下代码简单实现:
serverTransport, err := thrift.NewTServerSocket("127.0.0.1:8808") if err != nil { fmt.Println("Error!", err) return } handler := &rpcService{} processor := rpc.NewRpcServiceProcessor(handler) server := thrift.NewTSimpleServer2(processor, serverTransport) fmt.Println("thrift server in localhost") server.Serve()
另外我在网上查看这方面资料的时候,发现大家都用的NewTSimpleServer4这个函数,然后自己又创建一遍NewTTransportFactory以及NewTBinaryProtocolFactoryDefault。
现在我们分析一下源码,发现此举实乃多此一举。这是第一坑。
接下来说说如何用golang thrift编写客户端,查看网络上的一些写法,发现根本用不了,服务器会阻塞住!还是从源码来分析:
在thrift自动生成的代码中,会生成一个关于客户端的示例。
// Autogenerated by Thrift Compiler (0.9.3) // DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING package main import ( "flag" "fmt" "git.apache.org/thrift.git/lib/go/thrift" "math" "net" "net/url" "os" "strconv" "strings" "vic/rpc" ) func Usage() { fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:") flag.PrintDefaults() fmt.Fprintln(os.Stderr, "\nFunctions:") fmt.Fprintln(os.Stderr, " Video request(string vid, string cid, string platform, string url, string clientVersion)") fmt.Fprintln(os.Stderr) os.Exit(0) } func main() { flag.Usage = Usage var host string var port int var protocol string var urlString string var framed bool var useHttp bool var parsedUrl url.URL var trans thrift.TTransport _ = strconv.Atoi _ = math.Abs flag.Usage = Usage flag.StringVar(&host, "h", "localhost", "Specify host and port") flag.IntVar(&port, "p", 9090, "Specify port") flag.StringVar(&protocol, "P", "binary", "Specify the protocol (binary, compact, simplejson, json)") flag.StringVar(&urlString, "u", "", "Specify the url") flag.BoolVar(&framed, "framed", false, "Use framed transport") flag.BoolVar(&useHttp, "http", false, "Use http") flag.Parse() if len(urlString) > 0 { parsedUrl, err := url.Parse(urlString) if err != nil { fmt.Fprintln(os.Stderr, "Error parsing URL: ", err) flag.Usage() } host = parsedUrl.Host useHttp = len(parsedUrl.Scheme) <= 0 || parsedUrl.Scheme == "http" } else if useHttp { _, err := url.Parse(fmt.Sprint("http://", host, ":", port)) if err != nil { fmt.Fprintln(os.Stderr, "Error parsing URL: ", err) flag.Usage() } } cmd := flag.Arg(0) var err error if useHttp { trans, err = thrift.NewTHttpClient(parsedUrl.String()) } else { portStr := fmt.Sprint(port) if strings.Contains(host, ":") { host, portStr, err = net.SplitHostPort(host) if err != nil { fmt.Fprintln(os.Stderr, "error with host:", err) os.Exit(1) } } trans, err = thrift.NewTSocket(net.JoinHostPort(host, portStr)) if err != nil { fmt.Fprintln(os.Stderr, "error resolving address:", err) os.Exit(1) } if framed { trans = thrift.NewTFramedTransport(trans) } } if err != nil { fmt.Fprintln(os.Stderr, "Error creating transport", err) os.Exit(1) } defer trans.Close() var protocolFactory thrift.TProtocolFactory switch protocol { case "compact": protocolFactory = thrift.NewTCompactProtocolFactory() break case "simplejson": protocolFactory = thrift.NewTSimpleJSONProtocolFactory() break case "json": protocolFactory = thrift.NewTJSONProtocolFactory() break case "binary", "": protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() break default: fmt.Fprintln(os.Stderr, "Invalid protocol specified: ", protocol) Usage() os.Exit(1) } client := rpc.NewVideoServiceClientFactory(trans, protocolFactory) if err := trans.Open(); err != nil { fmt.Fprintln(os.Stderr, "Error opening socket to ", host, ":", port, " ", err) os.Exit(1) } switch cmd { case "request": if flag.NArg()-1 != 5 { fmt.Fprintln(os.Stderr, "Request requires 5 args") flag.Usage() } argvalue0 := flag.Arg(1) value0 := argvalue0 argvalue1 := flag.Arg(2) value1 := argvalue1 argvalue2 := flag.Arg(3) value2 := argvalue2 argvalue3 := flag.Arg(4) value3 := argvalue3 argvalue4 := flag.Arg(5) value4 := argvalue4 fmt.Print(client.Request(value0, value1, value2, value3, value4)) fmt.Print("\n") break case "": Usage() break default: fmt.Fprintln(os.Stderr, "Invalid function ", cmd) } }
我们一部分一部分来分析分析:
flag.Usage = Usage var host string var port int var protocol string var urlString string var framed bool var useHttp bool var parsedUrl url.URL var trans thrift.TTransport _ = strconv.Atoi _ = math.Abs flag.Usage = Usage flag.StringVar(&host, "h", "localhost", "Specify host and port") flag.IntVar(&port, "p", 9090, "Specify port") flag.StringVar(&protocol, "P", "binary", "Specify the protocol (binary, compact, simplejson, json)") flag.StringVar(&urlString, "u", "", "Specify the url") flag.BoolVar(&framed, "framed", false, "Use framed transport") flag.BoolVar(&useHttp, "http", false, "Use http") flag.Parse()
这些代码是设置了一些程序的启动命令,例如默认地址是loacalhost,我们可以根据client.exe -h xxx.xxx.xxx.xxx之类的命令来修改
我们发现这些代码都不是我们需要的,pass,继续看
if len(urlString) > 0 { parsedUrl, err := url.Parse(urlString) if err != nil { fmt.Fprintln(os.Stderr, "Error parsing URL: ", err) flag.Usage() } host = parsedUrl.Host useHttp = len(parsedUrl.Scheme) <= 0 || parsedUrl.Scheme == "http" } else if useHttp { _, err := url.Parse(fmt.Sprint("http://", host, ":", port)) if err != nil { fmt.Fprintln(os.Stderr, "Error parsing URL: ", err) flag.Usage() } } cmd := flag.Arg(0) var err error if useHttp { trans, err = thrift.NewTHttpClient(parsedUrl.String()) } else { portStr := fmt.Sprint(port) if strings.Contains(host, ":") { host, portStr, err = net.SplitHostPort(host) if err != nil { fmt.Fprintln(os.Stderr, "error with host:", err) os.Exit(1) } } trans, err = thrift.NewTSocket(net.JoinHostPort(host, portStr)) if err != nil { fmt.Fprintln(os.Stderr, "error resolving address:", err) os.Exit(1) } if framed { trans = thrift.NewTFramedTransport(trans) } }
这部分主要作用是解析url参数,从中取得host以及port。并且用于生成一个TTransport,上面红线加粗的函数定义在源码中如下:
func NewTHttpClient(urlstr string) (TTransport, error) { return NewTHttpClientWithOptions(urlstr, THttpClientOptions{}) } func NewTSocket(hostPort string) (*TSocket, error) { return NewTSocketTimeout(hostPort, 0) }
细心的朋友们可能发现了端倪,第二个函数的返回值是一个TSocket指针,并不是TTransport,是不是有啥问题?不急,我们看看这两个结构体的定义就知道了:
type TTransport interface { io.ReadWriteCloser Flusher ReadSizeProvider // Opens the transport for communication Open() error // Returns true if the transport is open IsOpen() bool }
原来TTransport是一个接口类型,而TSocket则实现了该接口!
目前为止,我们获得了创建客户端所需要的关键代码:
trans, err = thrift.NewTHttpClient(parsedUrl.String())
trans, err = thrift.NewTSocket(net.JoinHostPort(host, portStr))
OK,继续分析示例!
var protocolFactory thrift.TProtocolFactory switch protocol { case "compact": protocolFactory = thrift.NewTCompactProtocolFactory() break case "simplejson": protocolFactory = thrift.NewTSimpleJSONProtocolFactory() break case "json": protocolFactory = thrift.NewTJSONProtocolFactory() break case "binary", "": protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() break default: fmt.Fprintln(os.Stderr, "Invalid protocol specified: ", protocol) Usage() os.Exit(1) } client := rpc.NewVideoServiceClientFactory(trans, protocolFactory) if err := trans.Open(); err != nil { fmt.Fprintln(os.Stderr, "Error opening socket to ", host, ":", port, " ", err) os.Exit(1) }
switch语句是根据我们所输入的参数,选择传输协议。最后通过NewVideoServiceClientFactory函数 完成客户端的创建
最后,总结一下,假如我们要创建一个以二进制为传输协议,那么我们可以编写如下代码来完成:
transport, err := thrift.NewTSocket(net.JoinHostPort("127.0.0.1", "8808")) if err != nil { fmt.Fprintln(os.Stderr, "error resolving address:", err) os.Exit(1) } protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() client := NewRpcServiceClientFactory(transport, protocolFactory) if err := transport.Open(); err != nil { fmt.Fprintln(os.Stderr, "Error opening socket to 127.0.0.1:8808", " ", err) os.Exit(1) } defer transport.Close() res, _ := client.SayHi(“World”)
有疑问加站长微信联系(非本文作者)