TCP异步框架
Golang 编程风格
- Go语言面向对象编程的风格是多用组合,少用继承,以匿名嵌入的方式实现继承。
掌握Go语言,要把握一个中心,两个基本点。
- 一个中心是Go语言并发模型,即不要通过共享内存来通信,要通过通信来共享内存;
-
两个基本点是Go语言的并发模型的两大基石:
channel
和go-routine
。
不要通过共享内存来通信,要通过通信来共享内存
这句话的大概解释是: 不要通过共享内存来实现通信,这是因为在复杂的分布式、多线程和多进程之间通过加锁等控制并发方式来保证数据的正确性,是非常困难和低效的。建议线程之间通过通道channel
来实现通知,降低数据的竞争,提高系统的可靠性和正确性。
1. 服务启动开始
1.1 启动心跳定时器循环
func (s *Server) timeOutLoop() {
defer s.wg.Done()
for {
select {
case <-s.ctx.Done():
return
case timeout := <-s.timing.TimeOutChannel():
netID := timeout.Ctx.Value(netIDCtx).(int64)
if v, ok := s.conns.Load(netID); ok {
sc := v.(*ServerConn)
sc.timerCh <- timeout
} else {
holmes.Warnf("invalid client %d", netID)
}
}
}
}
当服务开始的时候就开始了定时器循环timeOutLoop
来维护Clinet
连接服务的应用层心跳,在一个goroutine
中通过select
一直监控服务中名为timeOutChan
定时任务的channel
,
如果有定时任务到来,通过context
上下文获取netIDCtx
,这是TCP连接唯一标识ID,根据这个ID我们可以找到相应的ServerConn
(ServerConn:这是对于TCP连接,上层又一次的连接封装。其中主要包含三个重要的channel
,分别是sendCh
,handlerCh
和timerCh
,下面会详细介绍)。
这样就可以把定时到期任务放到相应ServerConn
的timeCh
中了,由该连接处理定时到期任务的执行。
1.2 服务启动限制处理
- 如果服务器在接受客户端连接请求的时候发生了临时错误,那么服务器将等待最多1秒的时间再重新尝试接受请求。
- 如果现有的连接数超过了MaxConnections(默认1000),就拒绝并关闭连接,否则启动一个新的连接开始工作。
2. 网络连接处理模块
func (sc *ServerConn) Start() {
holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
onConnect := sc.belong.opts.onConnect
if onConnect != nil {
onConnect(sc)
}
loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop}
for _, l := range loopers {
looper := l
sc.wg.Add(1)
go looper(sc, sc.wg)
}
}
在别的编程语言中,采用Reactor
模式编写的服务器往往需要在一个IO线程异步地通过epoll
进行多路复用。而因为Go线程的开销廉价,Go语言可以对每一个网络连接创建三个goroutine。
-
readLoop()
负责读取数据并反序列化成消息。 -
writeLoop()
负责序列化消息并发送二进制字节流。 -
handleLoop()
负责调用消息处理函数。
这三个协程在连接创建并启动时就会各自独立运行。
2.1 ReadLoop 实现细节
for {
select {
case <-cDone: // connection closed
holmes.Debugln("receiving cancel signal from conn")
return
case <-sDone: // server closed
holmes.Debugln("receiving cancel signal from server")
return
default:
msg, err = codec.Decode(rawConn)
if err != nil {
holmes.Errorf("error decoding message %v\n", err)
if _, ok := err.(ErrUndefined); ok {
// update heart beats
setHeartBeatFunc(time.Now().UnixNano())
continue
}
return
}
setHeartBeatFunc(time.Now().UnixNano())
handler := GetHandlerFunc(msg.MessageNumber())
if handler == nil {
if onMessage != nil {
holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
onMessage(msg, c.(WriteCloser))
} else {
holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
}
continue
}
handlerCh <- MessageHandler{msg, handler}
}
}
在Readloop 循环中通过codec
来读取网络rawConn
连接中数据包,并且返回的是解析后的数据。
而codec
使用的解析函数是在服务启动的时候注册的,注册的还有该类型数据的执行函数,以消息类型为key
保存在message.go
包中。
解析成功后再获取该消息的执行函数,二者封装成MessageHandler
发送到handlerCh
中。供HandleLoop
循环执行。
2.2 HandleLoop 实现细节
for {
select {
case <-cDone: // connectin closed
holmes.Debugln("receiving cancel signal from conn")
return
case <-sDone: // server closed
holmes.Debugln("receiving cancel signal from server")
return
case msgHandler := <-handlerCh:
msg, handler := msgHandler.message, msgHandler.handler
if handler != nil {
if askForWorker {
err = WorkerPoolInstance().Put(netID, func() {
handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
})
if err != nil {
holmes.Errorln(err)
}
addTotalHandle()
} else {
handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
}
}
case timeout := <-timerCh:
if timeout != nil {
timeoutNetID := NetIDFromContext(timeout.Ctx)
if timeoutNetID != netID {
holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", timeoutNetID, netID)
}
if askForWorker {
err = WorkerPoolInstance().Put(netID, func() {
timeout.Callback(time.Now(), c.(WriteCloser))
})
if err != nil {
holmes.Errorln(err)
}
} else {
timeout.Callback(time.Now(), c.(WriteCloser))
}
}
}
}
在HandleLoop
循环中,主要监听handlerCh
和timerCh
,一个是消息执行channel
,一个是定时任务到期channel
。
-
handlerCh
处理都是ReadLoop循环中发送过来的数据,通过异步任务池来执行任务。 -
timerCh
处理的该连接下定时任务的执行,也是通过异步任务池来执行任务。
2.2 WriteLoop 实现细节
for {
select {
case <-cDone: // connection closed
holmes.Debugln("receiving cancel signal from conn")
return
case <-sDone: // server closed
holmes.Debugln("receiving cancel signal from server")
return
case pkt = <-sendCh:
if pkt != nil {
if _, err = rawConn.Write(pkt); err != nil {
holmes.Errorf("error writing data %v\n", err)
return
}
}
}
}
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
msg := tao.MessageFromContext(ctx).(Message)
holmes.Infof("receving message %s\n", msg.Content)
conn.Write(msg)
}
在WriteLoop
循环中,主要监听sendCh
,它会非阻塞地将sendCh中的消息全部发送完毕再退出,避免漏发消息。
而sendCh
消息的传入是在服务开始的时候message
注册的,在ProcessMessage
中通过Write
异步写入到 sendCh
中。
3. 总结
在Tao
框架中三大循环ReadLoop
、HandleLoop
和WriteLoop
是整个的核心代码,这三个Loop
中是通过channel
来实现数据的传递,而每一个TCP连接都会实现这三个goroutine
。每一个goroutine
都是独立运行。
框架支持通过tao.TLSCredsOption()函数提供传输层安全的TLS Server
而在在我们开发不同的业务中,编写业务代码是在自定义
message
当中。需要实现DeserializeMessage
解析该类型数据包函数和ProcessMessage
只用该消息的函数。在框架中使用
context
联系程序上线文,使得程序能够优雅的退出。Context
的使用在另一篇文章中Golang并发模型。至于在框架中定时器的实现分析在另一篇文章中 Golang-基于TimeingWheel定时器。
4. 感谢
感谢leesper
为开源社区做出的贡献,提供我们学习。
有疑问加站长微信联系(非本文作者)