golang:实现thrift的client端协程安全

郝冠伟 · · 14766 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前言

Golang作为我们服务端开发的主要语言,实现了很多基础服务,比如oauth2,账户系统,支付,客服等。而在早期开发阶段,也尝试使用golang做页面展示,但每种语言都有自己最擅长的领域,让golang来搞前端实在是有点疼,最终我们选择php+golang的方式来作为整体的服务架构。

那么问题来了,php和golang这对好基友如何愉快的玩耍呢?结论是thrift是块好肥皂!

抛砖

市面上肥皂一大堆,最著名的是舒肤佳,那么我们为毛不用舒肤佳,而选择thrift呢。。。因为够酸爽!

这种酸爽,只有牙口好,才能吃嘛嘛香。众所周知,thrift有多种型号(传输协议),比如家用型的TDebugProtocol,持久型TBinaryProtocol还有爆炸型TCompactProtocol。

而我们使用初始,想当然的选择了爆炸型TCompactProtocol这种更能让酸爽感提升百分之10的型号。但是php牙口不太好,遇到golang搓出的64位int时,1234567890硬是给爆成了1234567891(此处只是举个例子,php在处理golang返回的int64会出现错误的结果)。所以,php和golang这对好基友,thrift爆炸酸爽好,不如thrift持久好。(据说thrift下一个版本会修复这个bug,敬请关注吧)

引玉

乱扯一通,引据经典,发现Thrift生成的server端是thread safe的,但client端不是。所以需要多个thread和server端通信,则每个thread需要init一个自己的client实例。

那么问题来了,golang是如何实现thrift的client端协程安全呢?

实践

首先,thrift实现golang的server端,依托golang牛叉的goroutine,只实现了一种类似TThreadedServer的服务模型,所以毛老师再也不用担心我滴使用了。

func (p *TSimpleServer) AcceptLoop() error {
    for {
        select {
        case <-p.quit:
            return nil
        default:
        }

        client, err := p.serverTransport.Accept()
        if err != nil {
            log.Println("Accept err: ", err)
        }
        if client != nil {
            go func() {// 起新routine处理
                if err := p.processRequests(client); err != nil {
                    log.Println("error processing request:", err)
                }
            }()
        }
    }
}

其次,thrift的client端都是线程不安全的,那么问题来了,重新实现Transport好搞,还是在现有Transport的基础上使用pool好?

还在我思考如何修改Transport的实现时,毛老师已经搞定了pool,那么结论来了,在Transport基础上使用pool好。。。即便重新实现也无非是加pool,这样一来还得改thrift的client实现,真是费时费力又不讨好。thrift默认实现的Transport有基础的读写功能,丢到pool里照样游来游去。

以下是毛老师实现的pool,有基本的超时检查,最大激活和空闲数等功能。

type Pool struct {

    // Dial is an application supplied function for creating new connections.
    Dial func() (interface{}, error)

    // Close is an application supplied functoin for closeing connections.
    Close func(c interface{}) error

    // TestOnBorrow is an optional application supplied function for checking
    // the health of an idle connection before the connection is used again by
    // the application. Argument t is the time that the connection was returned
    // to the pool. If the function returns an error, then the connection is
    // closed.
    TestOnBorrow func(c interface{}, t time.Time) error

    // Maximum number of idle connections in the pool.
    MaxIdle int

    // Maximum number of connections allocated by the pool at a given time.
    // When zero, there is no limit on the number of connections in the pool.
    MaxActive int

    // Close connections after remaining idle for this duration. If the value
    // is zero, then idle connections are not closed. Applications should set
    // the timeout to a value less than the server's timeout.
    IdleTimeout time.Duration

    // mu protects fields defined below.
    mu     sync.Mutex
    closed bool
    active int

    // Stack of idleConn with most recently used at the front.
    idle list.List
}

type idleConn struct {
    c interface{}
    t time.Time
}

// New creates a new pool. This function is deprecated. Applications should
// initialize the Pool fields directly as shown in example.
func New(dialFn func() (interface{}, error), closeFn func(c interface{}) error, maxIdle int) *Pool {
    return &Pool{Dial: dialFn, Close: closeFn, MaxIdle: maxIdle}
}

// Get gets a connection. The application must close the returned connection.
// This method always returns a valid connection so that applications can defer
// error handling to the first use of the connection.
func (p *Pool) Get() (interface{}, error) {
    p.mu.Lock()
    // if closed
    if p.closed {
        p.mu.Unlock()
        return nil, ErrPoolClosed
    }
    // Prune stale connections.
    if timeout := p.IdleTimeout; timeout > 0 {
        for i, n := 0, p.idle.Len(); i < n; i++ {
            e := p.idle.Back()
            if e == nil {
                break
            }
            ic := e.Value.(idleConn)
            if ic.t.Add(timeout).After(nowFunc()) {
                break
            }
            p.idle.Remove(e)
            p.active -= 1
            p.mu.Unlock()
            // ic.c.Close()
            p.Close(ic.c)
            p.mu.Lock()
        }
    }
    // Get idle connection.
    for i, n := 0, p.idle.Len(); i  0 && p.active >= p.MaxActive {
        p.mu.Unlock()
        return nil, ErrPoolExhausted
    }
    // No idle connection, create new.
    dial := p.Dial
    p.active += 1
    p.mu.Unlock()
    c, err := dial()
    if err != nil {
        p.mu.Lock()
        p.active -= 1
        p.mu.Unlock()
        c = nil
    }
    return c, err
}

// Put adds conn back to the pool, use forceClose to close the connection forcely
func (p *Pool) Put(c interface{}, forceClose bool) error {
    if !forceClose {
        p.mu.Lock()
        if !p.closed {
            p.idle.PushFront(idleConn{t: nowFunc(), c: c})
            if p.idle.Len() > p.MaxIdle {
                // remove exceed conn
                c = p.idle.Remove(p.idle.Back()).(idleConn).c
            } else {
                c = nil
            }
        }
        p.mu.Unlock()
    }
    // close exceed conn
    if c != nil {
        p.mu.Lock()
        p.active -= 1
        p.mu.Unlock()
        return p.Close(c)
    }
    return nil
}

// ActiveCount returns the number of active connections in the pool.
func (p *Pool) ActiveCount() int {
    p.mu.Lock()
    active := p.active
    p.mu.Unlock()
    return active
}

// Relaase releases the resources used by the pool.
func (p *Pool) Release() error {
    p.mu.Lock()
    idle := p.idle
    p.idle.Init()
    p.closed = true
    p.active -= idle.Len()
    p.mu.Unlock()
    for e := idle.Front(); e != nil; e = e.Next() {
        p.Close(e.Value.(idleConn).c)
    }
    return nil
}

最后,在实际使用thrift相关的设置貌似只有超时时间,那么问题来了,pool下,thrift的超时时间如何是好?

由于在使用pool之前,使用每个routine创建一个client的方式,超时时间设置的都很短,server端和client都是15秒。换了pool使用方式之后,时间没变,也就是说我们把超时交给thrift自己管理,但发现经常性的出现EOF的I/O错误。经过跟踪发现,在请求量小的情况下,15秒就显得太短了,pool里会easy的出现空闲时间超过15秒的连接,而当我们get出来使用时,因为超时,导致了EOF。

经过实践,server端的时间一定要足够长,我们设置了8h,client端的超时则交给pool管理,不然pool里还有可能出现超时的连接。

// server
    transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
    serverTransport, err := thrift.NewTServerSocketTimeout(bind, thriftCallTimeOut)
    if err != nil {
        log.Exitf("start thrift rpc error(%v)", err)
    }
    // thrift rpc service
    handler := NewThriftRPC()
    processor := thriftRpc.NewRpcServiceProcessor(handler)
    server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory)
    thriftServer = append(thriftServer, server)
    log.Info("start thrift rpc listen addr: %s", bind)
    go server.Serve()

// client
thriftPool = &pool.Pool{
    Dial: func() (interface{}, error) {
        addr := conf.MyConf.ThriftOAuth2Addr[rand.Intn(len(conf.MyConf.ThriftOAuth2Addr))]
        sock, err := thrift.NewTSocket(addr)  // client端不设置超时
        if err != nil {
            log.Error("thrift.NewTSocketTimeout(%s) error(%v)", addr, err)
            return nil, err
        }
        tF := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
        pF := thrift.NewTBinaryProtocolFactoryDefault()
        client := rpc.NewRpcServiceClientFactory(tF.GetTransport(sock), pF)
        if err = client.Transport.Open(); err != nil {
            log.Error("client.Transport.Open() error(%v)", err)
            return nil, err
        }
        return client, nil
    },
    Close: func(v interface{}) error {
        v.(*rpc.RpcServiceClient).Transport.Close()
        return nil
    },
    MaxActive:   conf.MyConf.ThriftMaxActive,
    MaxIdle:     conf.MyConf.ThriftMaxIdle,
    IdleTimeout: conf.MyConf.ThriftIdleTimeout,
}

pool.idleTimeout 7h // pool最大空闲时间,设置比server端小,都设置8h,也有可能出现超时连接

有疑问加站长微信联系(非本文作者)

本文来自:猎豹移动技术博客

感谢作者:郝冠伟

查看原文:golang:实现thrift的client端协程安全

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

14766 次点击  
加入收藏 微博
3 回复  |  直到 2000-01-01 00:00:00
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传