用Go实现的简易TCP通信框架

concurrency · · 15365 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

接触到GO之后,GO的网络支持非常令人喜欢。GO实现了在语法层面上可以保持同步语义,但是却又没有牺牲太多性能,底层一样使用了IO路径复用,比如在LINUX下用了EPOLL,在WINDOWS下用了IOCP。

但是在开发服务端程序的时候,很多都是被动触发的,都是客户端发送来的请求需要处理。天生就是一个event-based的程序。而在GO下,因为并发是作为语言的一部分,goroutine, channel等特性则很容易的使程序员在实现功能时从容的在同步与异步之间进行转换。

因为自己的需要,我针对event-based场景的服务端做了简易的封装。具体代码见这里.

设计原则

因为GO的IO机制和并发原语的原生支持,再加上对网络API的封装,程序员可以简单的实现一个高效的服务端或者客户端程序。一般的实现就是调用net.Listen(“tcp4”, address)得到一个net.Listener,然后无限循环调用net.Listener.Accept,之后就可以得到一个net.Conn,可以调用net.Conn的接口设置发送和接收缓冲区大小,可以设置KEEPALIVE等。因为TCP的双工特性,所以可以针对一个net.Conn可以专门启动一个goroutine去无限循环接收对端发来的数据,然后解包等。

我的想法是在这个简单实现的基础上做一层薄薄的封装,使其尽量的精简,但是又不失灵活。希望能够适应不同的协议,对使用者造成尽量小的约束。

Session对象

该对象就是对net.Conn的一个简易封装,可以通过swnet.Server.AcceptLoop得到,也可以通过swnet.NewSession创建新的对象,这种一般是客户端情境下使用。得到Session对象后,可以调用Start方法开始工作。之所以还暴露出一个方法叫Start是因为在服务端下,可能会有某些需求,比如针对IP设置了ACL,那么,把Start行为交给使用者决定如何调用。但是这里需要注意的是,如果使用者不想Start,使用者有责任自己Close掉,否则会造成资源泄露。

Start后,会启动两个goroutine,一个用于专门接收对端发来的数据,一个专门用来发送数据到对端。想发送数据到对端,可以用AsyncSend方法,该方法会把要发送的数据排队到发送通道。这里使用通道的原因是因为在服务端情境下,有必要对发送的数据进行排队,防止发送很快,但是对端接收很慢,或者过多的调用AsyncSend方法,导致堆积了太多的数据,增加了内存的压力。通过channel来控制发送速率我认为是比较合理的。同时,还提供了方法可以用来修改channel的长度,一是调用NewSession时传入指定大小,二是调用Session.SetSendChannelSize设置大小,但是要注意的是,调用此方法时必须在Start之前完成,否则会产生错误。这样做的原因也是因为没必要动态更改发送通道大小。

如果发送channel满了,AsyncSend方法会返回ErrSendChanBlocking。增加这个错误类型也是因为上面的设计导致的。不返回这个错误,就没有办法让使用者得到处理该问题的机会。使用者如果拿到该错误,可以自己试着分析问题的原因,或者可以尝试循环发送,或者直接丢弃该次的发送数据。总之能够让使用者得到自己处理的机会。

如果Session对象已经Close了,那么调用AsyncSend会返回ErrStoped错误。除此之外,因为AsyncSend是把数据排队到发送channel中,那么使用者有责任确保发送的数据在发送完成前不会修改。

如果数据发送失败,或者其他原因,我的实现是直接粗暴的Close掉该Session。

还有就是,可能有些用例情景下,会发送比较大的数据包,比如64K大小,或者32K大小的数据等,未了避免反复申请内存,特此为Session增加了SetSendCallback方法。可以设置一个回调函数,用于在发送完成后可以调用该回调,给予使用者回收数据对象的机会,比如可以配合sync.Pool使用。虽然我自己测试时并没有太大的效果。

为了方便使用者设置一些net.Conn参数,增加了一个RawConn方法,可以获取到net.Conn 的实例。这里其实是挺纠结的。因为暴露出这个内部资源后,会给予使用者一个非常大的灵活度。它可以直接绕过Session的发送channel,自己玩自己的。不过出于方便使用者使用的目的,我还是这么做了。使用者自己承担相应的责任。其实这里还可以像net.HTTP那样增加一个Hijack方法,让使用者自己接管net.Conn,自己玩自己的。

Session中的很多SET/GET方法都是没有加锁的。一方面是因为很多操作在Start前一次完成,或者是GET的数据不是那么紧密的。

有些时候,如果一个Session被关闭了,可能需要知道这个行为。所以提供了SetCloseCallback方法,可以设置该方法。不设置也没有关系。调用closeCallback时会确保只调用一次。

协议序列化抽象

因为目标之一就是能够隔离具体协议格式。所以对协议做了抽象。只需要实现PacketProtocol接口即可:

// PacketReader is used to unmarshal a complete packet from buff
type PacketReader interface {
    // Read data from conn and build a complete packet.
    // How to read from conn is up to you. You can set read timeout or other option.
    // If buff's capacity is small, you can make a new buff, then return it,
    // so can reuse to reduce memory overhead.
    ReadPacket(conn net.Conn, buff []byte) (interface{}, []byte, error)
}

// PacketWriter is used to marshal packet into buff
type PacketWriter interface {
    // Build a complete packet. If buff's capacity is too small,  you can make a new one
    // and return it to reuse.
    BuildPacket(packet interface{}, buff []byte) ([]byte, error)

    // How to write data to conn is up to you. So you can set write timeout or other option.
    WritePacket(conn net.Conn, buff []byte) error
}

// PacketProtocol just a composite interface
type PacketProtocol interface {
    PacketReader
    PacketWriter
}

也就是实现PacketReader/PacketWriter两个接口。为了让内存尽量的复用,减少内存压力,所以在ReadPacket方法和BuildPacket方法的返回值中需要返回一个切片。框架会在第一次调用时传入一个默认大小的切片到这两个方法中,如果容量不够,使用者可以自己重新建立切片,然后写入数据后返回该切片。下一次再实用时就使用这个返回出来的切片。

其中ReadPacket方法是在一个专门用于接收数据的goroutine中调用。实现者可以自己根据自己的策略进行读取,因为传入了net.Conn,所以使用者可以自己设置I/O Timeout。实现者有责任返回一个完整的请求包。如果中间出了错误,有必要返回一个error。当发现有error后,会关闭该Session。这样做的原因是当读取或者构建一个请求包失败时,可能是数据错误,可能是链路错误,或者其他原因,总之,个人认为这种情况下没有必要继续处理,直接关闭链接。而且这里还有一个需要注意的事项,返回出来的请求包中的数据如果有包含切片类型的数据,建议重新分配一个切片,然后从buff中拷贝进去,尽量不要对buff切片做复用,否则可能会产生额外的BUG。

BuildPacket方法是在一个专门处理发送的goroutine中调用。当发送goroutine收到数据包后,会调用BuildPacket,实现者就可以按照自己的私有格式进行序列化。同样的,buff不够,就自己重新构造一个buff,然后填充数据,并返回这个buff。

WritePacket是给予实现者自己个性化发送的需求。可能实现者需要设置I/O Timeout.

请求包路由

基于event-based的实现,总是少不了要做的事情就是把一个请求包转发到对应的处理函数中。但是具体怎么转,怎么做是取决于具体的用例情景和实现的。所以我这里做的非常简单,就是定义了一个PacketHandler接口:

// PacketHandler is used to process packet that recved from remote session
type PacketHandler interface {
    // When got a valid packet from PacketReader, you can dispatch it.
    Handle(s *Session, packet interface{})
}

使用者自己实现对应的Handle方法即可。当接收数据的goroutine收到对端发来的数据并调用PacketReader.ReadPacket后,会调用Handle方法 ,传入该Session实例与请求包。传入Session的目的是方便使用者不用去维护一个Session的实例。因为有的程序员要实现的逻辑可能比较简单,他仅仅用Session就满足了他的需求,他只需要实现对应的处理函数就好了。处理完成后,就调用Session.AsyncSend发送回应包。

这里其实可以提供一个简单的默认版本的实现的。但是考虑到协议的不同,那么就导致调度的key的不同,所以还是让使用者自己发挥吧。

使用者其实在这里有很大的自由度,他可以做基于map关系的回调分发逻辑,也可以做一个简单的实现逻辑,然后通过type assert做相应的实现。具体也是看各自的口味而定。我是比较喜欢后者,可以减少很多的Register,实现出Actor Model + Pattern Match味道的东西。

Server对象

这里还要说一下对服务端的一个简易封装。Server的实现非常简单,就是反复的去Accept,然后构造一个Session,之后就是调用用户传入的回调函数,就完活了。使用者可以自己传入net.Listener,可以传入PacketProtocol, PacketHandler以及SendChanSize。这些参数会在构造Session时传入进去,可以减少重复的代码实现。Server.AcceptLoop不会关闭构造出来的Session,使用者负责完成这件事情!

缺点

整体非常简陋,只是搭了一个模制。在我自己未公开的代码里,其实是实现了我所在公司的协议,实现了PacketProtocol。为此还专门写了个代码生成器。

还有就是NewServer需要传入一个net.Listener,比较蛋疼。后面再决定是否干掉。NewSession需要传入net.Conn,其实是妥协的产物,因为net.Listener返回的就是net.Conn,这个实例需要交给Session使用,不得已而为之,但是这里囧的是,客户端使用的时候,需要自己去net.Dial,得到一个net.Conn,也许该提供一个swnet.Dial方法。

总结

我这个发布的代码是在原有的代码基础上进行了修改,从达达的https://github.com/funny/link中得到了一些启发,但是又有很多的不同。再次感谢达达的贡献。


有疑问加站长微信联系(非本文作者)

本文来自:博客园

感谢作者:concurrency

查看原文:用Go实现的简易TCP通信框架

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

15365 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传