golang thrift 总结一下网络上的一些坑

ka200812 · · 2971 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

我们以hello world来大概分析一下golang中的thrift包,并且扒一扒网络上有关thrift的一些坑

 

查看源码,服务器定义如下:(详见simple_server.go文件)

type TSimpleServer struct {
    quit    chan struct{}
    stopped int64

    processorFactory       TProcessorFactory   //实质是一个handler,用来相应客户端的请求
    serverTransport        TServerTransport    //实质是一个socket
    inputTransportFactory  TTransportFactory   //实质是传输协议的具体操作类(详细可见transport.go文件中TTransport结构体)
    outputTransportFactory TTransportFactory   //
    inputProtocolFactory   TProtocolFactory    //实质是传输协议(有compact、simplejson、json、binary四种协议,默认是binary)
tputProtocolFactory TProtocolFactory // }

 

在go语言中,创建一个thrift服务器有三种方法:(详见simple_server.go文件)

func NewTSimpleServer2(processor TProcessor, serverTransport TServerTransport) *TSimpleServer {
    return NewTSimpleServerFactory2(NewTProcessorFactory(processor), serverTransport)
}

func NewTSimpleServer4(processor TProcessor, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
    return NewTSimpleServerFactory4(NewTProcessorFactory(processor),
        serverTransport,
        transportFactory,
        protocolFactory,
    )
}

func NewTSimpleServer6(processor TProcessor, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
    return NewTSimpleServerFactory6(NewTProcessorFactory(processor),
        serverTransport,
        inputTransportFactory,
        outputTransportFactory,
        inputProtocolFactory,
        outputProtocolFactory,
    )
}

这三个函数分别调用了工厂函数

NewTSimpleServerFactory2;
NewTSimpleServerFactory4;
NewTSimpleServerFactory6;
func NewTSimpleServerFactory2(processorFactory TProcessorFactory, serverTransport TServerTransport) *TSimpleServer {
    return NewTSimpleServerFactory6(processorFactory,
        serverTransport,
        NewTTransportFactory(),
        NewTTransportFactory(),
        NewTBinaryProtocolFactoryDefault(),
        NewTBinaryProtocolFactoryDefault(),
    )
}

func NewTSimpleServerFactory4(processorFactory TProcessorFactory, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
    return NewTSimpleServerFactory6(processorFactory,
        serverTransport,
        transportFactory,
        transportFactory,
        protocolFactory,
        protocolFactory,
    )
}

func NewTSimpleServerFactory6(processorFactory TProcessorFactory, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
    return &TSimpleServer{
        processorFactory:       processorFactory,
        serverTransport:        serverTransport,
        inputTransportFactory:  inputTransportFactory,
        outputTransportFactory: outputTransportFactory,
        inputProtocolFactory:   inputProtocolFactory,
        outputProtocolFactory:  outputProtocolFactory,
        quit: make(chan struct{}, 1),
    }
}

好啦!现在假如我们需要创建一个以二进制协议传输的thrift服务器,那么可以用如下代码简单实现:

    serverTransport, err := thrift.NewTServerSocket("127.0.0.1:8808")
    if err != nil {
        fmt.Println("Error!", err)
        return
    }
    handler := &rpcService{}
    processor := rpc.NewRpcServiceProcessor(handler)
    server := thrift.NewTSimpleServer2(processor, serverTransport)
    fmt.Println("thrift server in localhost")

    server.Serve()

另外我在网上查看这方面资料的时候,发现大家都用的NewTSimpleServer4这个函数,然后自己又创建一遍NewTTransportFactory以及NewTBinaryProtocolFactoryDefault。

现在我们分析一下源码,发现此举实乃多此一举。这是第一坑。

 

接下来说说如何用golang thrift编写客户端,查看网络上的一些写法,发现根本用不了,服务器会阻塞住!还是从源码来分析:

在thrift自动生成的代码中,会生成一个关于客户端的示例。

// Autogenerated by Thrift Compiler (0.9.3)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING

package main

import (
    "flag"
    "fmt"
    "git.apache.org/thrift.git/lib/go/thrift"
    "math"
    "net"
    "net/url"
    "os"
    "strconv"
    "strings"
    "vic/rpc"
)

func Usage() {
    fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:")
    flag.PrintDefaults()
    fmt.Fprintln(os.Stderr, "\nFunctions:")
    fmt.Fprintln(os.Stderr, "  Video request(string vid, string cid, string platform, string url, string clientVersion)")
    fmt.Fprintln(os.Stderr)
    os.Exit(0)
}

func main() {
    flag.Usage = Usage
    var host string
    var port int
    var protocol string
    var urlString string
    var framed bool
    var useHttp bool
    var parsedUrl url.URL
    var trans thrift.TTransport
    _ = strconv.Atoi
    _ = math.Abs
    flag.Usage = Usage
    flag.StringVar(&host, "h", "localhost", "Specify host and port")
    flag.IntVar(&port, "p", 9090, "Specify port")
    flag.StringVar(&protocol, "P", "binary", "Specify the protocol (binary, compact, simplejson, json)")
    flag.StringVar(&urlString, "u", "", "Specify the url")
    flag.BoolVar(&framed, "framed", false, "Use framed transport")
    flag.BoolVar(&useHttp, "http", false, "Use http")
    flag.Parse()

    if len(urlString) > 0 {
        parsedUrl, err := url.Parse(urlString)
        if err != nil {
            fmt.Fprintln(os.Stderr, "Error parsing URL: ", err)
            flag.Usage()
        }
        host = parsedUrl.Host
        useHttp = len(parsedUrl.Scheme) <= 0 || parsedUrl.Scheme == "http"
    } else if useHttp {
        _, err := url.Parse(fmt.Sprint("http://", host, ":", port))
        if err != nil {
            fmt.Fprintln(os.Stderr, "Error parsing URL: ", err)
            flag.Usage()
        }
    }

    cmd := flag.Arg(0)
    var err error
    if useHttp {
        trans, err = thrift.NewTHttpClient(parsedUrl.String())
    } else {
        portStr := fmt.Sprint(port)
        if strings.Contains(host, ":") {
            host, portStr, err = net.SplitHostPort(host)
            if err != nil {
                fmt.Fprintln(os.Stderr, "error with host:", err)
                os.Exit(1)
            }
        }
        trans, err = thrift.NewTSocket(net.JoinHostPort(host, portStr))
        if err != nil {
            fmt.Fprintln(os.Stderr, "error resolving address:", err)
            os.Exit(1)
        }
        if framed {
            trans = thrift.NewTFramedTransport(trans)
        }
    }
    if err != nil {
        fmt.Fprintln(os.Stderr, "Error creating transport", err)
        os.Exit(1)
    }
    defer trans.Close()
    var protocolFactory thrift.TProtocolFactory
    switch protocol {
    case "compact":
        protocolFactory = thrift.NewTCompactProtocolFactory()
        break
    case "simplejson":
        protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
        break
    case "json":
        protocolFactory = thrift.NewTJSONProtocolFactory()
        break
    case "binary", "":
        protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
        break
    default:
        fmt.Fprintln(os.Stderr, "Invalid protocol specified: ", protocol)
        Usage()
        os.Exit(1)
    }
    client := rpc.NewVideoServiceClientFactory(trans, protocolFactory)
    if err := trans.Open(); err != nil {
        fmt.Fprintln(os.Stderr, "Error opening socket to ", host, ":", port, " ", err)
        os.Exit(1)
    }

    switch cmd {
    case "request":
        if flag.NArg()-1 != 5 {
            fmt.Fprintln(os.Stderr, "Request requires 5 args")
            flag.Usage()
        }
        argvalue0 := flag.Arg(1)
        value0 := argvalue0
        argvalue1 := flag.Arg(2)
        value1 := argvalue1
        argvalue2 := flag.Arg(3)
        value2 := argvalue2
        argvalue3 := flag.Arg(4)
        value3 := argvalue3
        argvalue4 := flag.Arg(5)
        value4 := argvalue4
        fmt.Print(client.Request(value0, value1, value2, value3, value4))
        fmt.Print("\n")
        break
    case "":
        Usage()
        break
    default:
        fmt.Fprintln(os.Stderr, "Invalid function ", cmd)
    }
}
View Code

我们一部分一部分来分析分析:

flag.Usage = Usage
    var host string
    var port int
    var protocol string
    var urlString string
    var framed bool
    var useHttp bool
    var parsedUrl url.URL
    var trans thrift.TTransport
    _ = strconv.Atoi
    _ = math.Abs
    flag.Usage = Usage
    flag.StringVar(&host, "h", "localhost", "Specify host and port")
    flag.IntVar(&port, "p", 9090, "Specify port")
    flag.StringVar(&protocol, "P", "binary", "Specify the protocol (binary, compact, simplejson, json)")
    flag.StringVar(&urlString, "u", "", "Specify the url")
    flag.BoolVar(&framed, "framed", false, "Use framed transport")
    flag.BoolVar(&useHttp, "http", false, "Use http")
    flag.Parse()

这些代码是设置了一些程序的启动命令,例如默认地址是loacalhost,我们可以根据client.exe -h xxx.xxx.xxx.xxx之类的命令来修改

我们发现这些代码都不是我们需要的,pass,继续看

if len(urlString) > 0 {
        parsedUrl, err := url.Parse(urlString)
        if err != nil {
            fmt.Fprintln(os.Stderr, "Error parsing URL: ", err)
            flag.Usage()
        }
        host = parsedUrl.Host
        useHttp = len(parsedUrl.Scheme) <= 0 || parsedUrl.Scheme == "http"
    } else if useHttp {
        _, err := url.Parse(fmt.Sprint("http://", host, ":", port))
        if err != nil {
            fmt.Fprintln(os.Stderr, "Error parsing URL: ", err)
            flag.Usage()
        }
    }

    cmd := flag.Arg(0)
    var err error
    if useHttp {
        trans, err = thrift.NewTHttpClient(parsedUrl.String())
    } else {
        portStr := fmt.Sprint(port)
        if strings.Contains(host, ":") {
            host, portStr, err = net.SplitHostPort(host)
            if err != nil {
                fmt.Fprintln(os.Stderr, "error with host:", err)
                os.Exit(1)
            }
        }
        trans, err = thrift.NewTSocket(net.JoinHostPort(host, portStr))
        if err != nil {
            fmt.Fprintln(os.Stderr, "error resolving address:", err)
            os.Exit(1)
        }
        if framed {
            trans = thrift.NewTFramedTransport(trans)
        }
    }

 

这部分主要作用是解析url参数,从中取得host以及port。并且用于生成一个TTransport,上面红线加粗的函数定义在源码中如下:

func NewTHttpClient(urlstr string) (TTransport, error) {
    return NewTHttpClientWithOptions(urlstr, THttpClientOptions{})
}

func NewTSocket(hostPort string) (*TSocket, error) {
    return NewTSocketTimeout(hostPort, 0)
}

细心的朋友们可能发现了端倪,第二个函数的返回值是一个TSocket指针,并不是TTransport,是不是有啥问题?不急,我们看看这两个结构体的定义就知道了:

    type TTransport interface {
    io.ReadWriteCloser
    Flusher
    ReadSizeProvider

    // Opens the transport for communication
    Open() error

    // Returns true if the transport is open
    IsOpen() bool
}

原来TTransport是一个接口类型,而TSocket则实现了该接口!

目前为止,我们获得了创建客户端所需要的关键代码:

trans, err = thrift.NewTHttpClient(parsedUrl.String())

trans, err = thrift.NewTSocket(net.JoinHostPort(host, portStr))

OK,继续分析示例!

    var protocolFactory thrift.TProtocolFactory
    switch protocol {
    case "compact":
        protocolFactory = thrift.NewTCompactProtocolFactory()
        break
    case "simplejson":
        protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
        break
    case "json":
        protocolFactory = thrift.NewTJSONProtocolFactory()
        break
    case "binary", "":
        protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
        break
    default:
        fmt.Fprintln(os.Stderr, "Invalid protocol specified: ", protocol)
        Usage()
        os.Exit(1)
    }
    client := rpc.NewVideoServiceClientFactory(trans, protocolFactory)
    if err := trans.Open(); err != nil {
        fmt.Fprintln(os.Stderr, "Error opening socket to ", host, ":", port, " ", err)
        os.Exit(1)
    }

switch语句是根据我们所输入的参数,选择传输协议。最后通过NewVideoServiceClientFactory函数 完成客户端的创建

最后,总结一下,假如我们要创建一个以二进制为传输协议,那么我们可以编写如下代码来完成:

    transport, err := thrift.NewTSocket(net.JoinHostPort("127.0.0.1", "8808"))
    if err != nil {
        fmt.Fprintln(os.Stderr, "error resolving address:", err)
        os.Exit(1)
    }
    
    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()

    client := NewRpcServiceClientFactory(transport, protocolFactory)
    if err := transport.Open(); err != nil {
        fmt.Fprintln(os.Stderr, "Error opening socket to 127.0.0.1:8808", " ", err)
        os.Exit(1)
    }
    defer transport.Close()
    res, _ := client.SayHi(“World”)

 


有疑问加站长微信联系(非本文作者)

本文来自:博客园

感谢作者:ka200812

查看原文:golang thrift 总结一下网络上的一些坑

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2971 次点击  
加入收藏 微博
上一篇:alpha—go
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传