Golang-TCP异步框架Tao分析

wiseAaron · · 6086 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

TCP异步框架

Golang 编程风格

  • Go语言面向对象编程的风格是多用组合,少用继承,以匿名嵌入的方式实现继承。

掌握Go语言,要把握一个中心,两个基本点

  • 一个中心是Go语言并发模型,即不要通过共享内存来通信,要通过通信来共享内存
  • 两个基本点是Go语言的并发模型的两大基石:channelgo-routine

不要通过共享内存来通信,要通过通信来共享内存

这句话的大概解释是: 不要通过共享内存来实现通信,这是因为在复杂的分布式、多线程和多进程之间通过加锁等控制并发方式来保证数据的正确性,是非常困难和低效的。建议线程之间通过通道channel来实现通知,降低数据的竞争,提高系统的可靠性和正确性。

1. 服务启动开始

1.1 启动心跳定时器循环

func (s *Server) timeOutLoop() {
    defer s.wg.Done()

    for {
        select {
        case <-s.ctx.Done():
            return

        case timeout := <-s.timing.TimeOutChannel():
            netID := timeout.Ctx.Value(netIDCtx).(int64)
            if v, ok := s.conns.Load(netID); ok {
                sc := v.(*ServerConn)
                sc.timerCh <- timeout
            } else {
                holmes.Warnf("invalid client %d", netID)
            }
        }
    }
}

当服务开始的时候就开始了定时器循环timeOutLoop来维护Clinet连接服务的应用层心跳,在一个goroutine中通过select一直监控服务中名为timeOutChan定时任务的channel
如果有定时任务到来,通过context上下文获取netIDCtx,这是TCP连接唯一标识ID,根据这个ID我们可以找到相应的ServerConn
ServerConn:这是对于TCP连接,上层又一次的连接封装。其中主要包含三个重要的channel,分别是sendCh,handlerChtimerCh,下面会详细介绍)。

这样就可以把定时到期任务放到相应ServerConntimeCh中了,由该连接处理定时到期任务的执行。

1.2 服务启动限制处理

  • 如果服务器在接受客户端连接请求的时候发生了临时错误,那么服务器将等待最多1秒的时间再重新尝试接受请求。
  • 如果现有的连接数超过了MaxConnections(默认1000),就拒绝并关闭连接,否则启动一个新的连接开始工作。

2. 网络连接处理模块

func (sc *ServerConn) Start() {
    holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
    onConnect := sc.belong.opts.onConnect
    if onConnect != nil {
        onConnect(sc)
    }

    loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop}
    for _, l := range loopers {
        looper := l
        sc.wg.Add(1)
        go looper(sc, sc.wg)
    }
}

在别的编程语言中,采用Reactor模式编写的服务器往往需要在一个IO线程异步地通过epoll进行多路复用。而因为Go线程的开销廉价,Go语言可以对每一个网络连接创建三个goroutine

  • readLoop()负责读取数据并反序列化成消息。
  • writeLoop()负责序列化消息并发送二进制字节流。
  • handleLoop()负责调用消息处理函数。

这三个协程在连接创建并启动时就会各自独立运行。

2.1 ReadLoop 实现细节

    for {
        select {
        case <-cDone: // connection closed
            holmes.Debugln("receiving cancel signal from conn")
            return
        case <-sDone: // server closed
            holmes.Debugln("receiving cancel signal from server")
            return
        default:
            msg, err = codec.Decode(rawConn)
            if err != nil {
                holmes.Errorf("error decoding message %v\n", err)
                if _, ok := err.(ErrUndefined); ok {
                    // update heart beats
                    setHeartBeatFunc(time.Now().UnixNano())
                    continue
                }
                return
            }
            setHeartBeatFunc(time.Now().UnixNano())
            handler := GetHandlerFunc(msg.MessageNumber())
            if handler == nil {
                if onMessage != nil {
                    holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
                    onMessage(msg, c.(WriteCloser))
                } else {
                    holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
                }
                continue
            }
            handlerCh <- MessageHandler{msg, handler}
        }
    }

在Readloop 循环中通过codec来读取网络rawConn连接中数据包,并且返回的是解析后的数据。
codec使用的解析函数是在服务启动的时候注册的,注册的还有该类型数据的执行函数,以消息类型为key保存在message.go包中。
解析成功后再获取该消息的执行函数,二者封装成MessageHandler发送到handlerCh中。供HandleLoop循环执行。

2.2 HandleLoop 实现细节

for {
    select {
    case <-cDone: // connectin closed
        holmes.Debugln("receiving cancel signal from conn")
        return
    case <-sDone: // server closed
        holmes.Debugln("receiving cancel signal from server")
        return
    case msgHandler := <-handlerCh:
        msg, handler := msgHandler.message, msgHandler.handler
        if handler != nil {
            if askForWorker {
                err = WorkerPoolInstance().Put(netID, func() {
                    handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
                })
                if err != nil {
                    holmes.Errorln(err)
                }
                addTotalHandle()
            } else {
                handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
            }
        }
    case timeout := <-timerCh:
        if timeout != nil {
            timeoutNetID := NetIDFromContext(timeout.Ctx)
            if timeoutNetID != netID {
                holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", timeoutNetID, netID)
            }
            if askForWorker {
                err = WorkerPoolInstance().Put(netID, func() {
                    timeout.Callback(time.Now(), c.(WriteCloser))
                })
                if err != nil {
                    holmes.Errorln(err)
                }
            } else {
                timeout.Callback(time.Now(), c.(WriteCloser))
            }
        }
    }
}

HandleLoop循环中,主要监听handlerChtimerCh,一个是消息执行channel,一个是定时任务到期channel

  • handlerCh处理都是ReadLoop循环中发送过来的数据,通过异步任务池来执行任务。
  • timerCh 处理的该连接下定时任务的执行,也是通过异步任务池来执行任务。

2.2 WriteLoop 实现细节

for {
    select {
    case <-cDone: // connection closed
        holmes.Debugln("receiving cancel signal from conn")
        return
    case <-sDone: // server closed
        holmes.Debugln("receiving cancel signal from server")
        return
    case pkt = <-sendCh:
        if pkt != nil {
            if _, err = rawConn.Write(pkt); err != nil {
                holmes.Errorf("error writing data %v\n", err)
                return
            }
        }
    }
}
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
    msg := tao.MessageFromContext(ctx).(Message)
    holmes.Infof("receving message %s\n", msg.Content)
    conn.Write(msg)
}

WriteLoop循环中,主要监听sendCh,它会非阻塞地将sendCh中的消息全部发送完毕再退出,避免漏发消息。
sendCh消息的传入是在服务开始的时候message注册的,在ProcessMessage中通过Write异步写入到 sendCh中。

3. 总结

Tao 框架中三大循环ReadLoopHandleLoopWriteLoop是整个的核心代码,这三个Loop中是通过channel来实现数据的传递,而每一个TCP连接都会实现这三个goroutine。每一个goroutine都是独立运行。

  • 框架支持通过tao.TLSCredsOption()函数提供传输层安全的TLS Server

  • 而在在我们开发不同的业务中,编写业务代码是在自定义message当中。需要实现DeserializeMessage解析该类型数据包函数和ProcessMessage 只用该消息的函数。

  • 在框架中使用context联系程序上线文,使得程序能够优雅的退出。

  • Context的使用在另一篇文章中Golang并发模型

  • 至于在框架中定时器的实现分析在另一篇文章中 Golang-基于TimeingWheel定时器

4. 感谢

感谢leesper为开源社区做出的贡献,提供我们学习。

Tao 源码


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:wiseAaron

查看原文:Golang-TCP异步框架Tao分析

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

6086 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传