go语言完善的基础设施为编写网络程序提供了极大的便利.只需要少量代码就可以编写一个高性能,稳定的异步网络程序.
本文介绍一个迷你的,基于事件回调的异步网络库.
首先简单介绍一下并发模型.
go提供了基于goroutine的同步网络接口,所以对每个网络连接可以创建一个单独的goroutine用于接收网络数据.这个goroutine是执行一个死循环,不断的recv数据,解包然后将完整的逻辑包发送到一个每连接唯一的chan中,供逻辑消费.
除了网络接收goroutine之外,每个连接还有一个专门的处理逻辑消息的goroutine,它的工作就是不断的从关联的chan中提取逻辑包并处理.
与之前的chuck-lua类似,为了让使用者可以方便定制自己的包结构,我提供了packet和decoder的抽象.
packet.go
package packet
const (
RAWPACKET = 1
RPACKET = 2
WPACKET = 3
EPACKET = 4
)
type Packet interface{
MakeWrite()(*Packet)
MakeRead() (*Packet)
Clone() (*Packet)
PkLen() (uint32)
DataLen() (uint32)
Buffer() (*ByteBuffer)
GetType() (byte)
}
decoder.go
package packet
import(
"net"
"encoding/binary"
"fmt"
"io"
)
var (
ErrPacketTooLarge = fmt.Errorf("Packet too Large")
ErrEOF = fmt.Errorf("Eof")
)
type Decoder interface{
DoRecv(Conn net.Conn)(Packet,error)
}
type RPacketDecoder struct{
maxpacket uint32
}
func NewRPacketDecoder(maxpacket uint32)(RPacketDecoder){
return RPacketDecoder{maxpacket:maxpacket}
}
func (this RPacketDecoder)DoRecv(Conn net.Conn)(Packet,error){
header := make([]byte,4)
n, err := io.ReadFull(Conn, header)
if n == 0 && err == io.EOF {
return nil,ErrEOF
}else if err != nil {
return nil,err
}
size := binary.LittleEndian.Uint32(header)
if size > this.maxpacket {
return nil,ErrPacketTooLarge
}
buf := make([]byte,size+4)
copy(buf[:],header[:])
n, err = io.ReadFull(Conn,buf[4:])
if n == 0 && err == io.EOF {
return nil,ErrEOF
}else if err != nil {
return nil,err
}
return NewRPacket(NewBufferByBytes(buf,(uint32)(len(buf)))),nil
}
type RawDecoder struct{
}
func NewRawDecoder()(RawDecoder){
return RawDecoder{}
}
func (this RawDecoder)DoRecv(Conn net.Conn)(Packet,error){
buff := make([]byte,4096)
n,err := Conn.Read(buff)
if n == 0 && err == io.EOF {
return nil,ErrEOF
}else if err != nil {
return nil,err
}
return NewRawPacket(NewBufferByBytes(buff,(uint32)(n))),nil
}
内置了2种包的类型,分别是rawpacket,rpacket/wpacket.
rawpacket其实就是原始二进制数据流,没有区分逻辑界限.
rpacket/wpacket则提供了一种4字节包头,的二进制流包结构.
eventpacket则作为内部使用,目前用来向逻辑通告连接关闭,错误等事件.
下面就是整个网络库的核心部分tcpsession,它提供了对tcp连接的处理.
package tcpsession
import(
"net"
packet "kendynet-go/packet"
"fmt"
)
var (
ErrUnPackError = fmt.Errorf("TcpSession: UnpackError")
ErrSendClose = fmt.Errorf("send close")
ErrSocketClose = fmt.Errorf("socket close")
)
type Tcpsession struct{
Conn net.Conn
Packet_que chan packet.Packet
decoder packet.Decoder
socket_close bool
ud interface{}
}
func (this *Tcpsession) SetUd(ud interface{}){
this.ud = ud
}
func (this *Tcpsession) Ud()(interface{}){
return this.ud
}
func dorecv(session *Tcpsession){
for{
p,err := session.decoder.DoRecv(session.Conn)
if session.socket_close{
break
}
if err != nil {
session.Packet_que <- packet.NewEventPacket(err)
break
}
session.Packet_que <- p
}
close(session.Packet_que)
}
func ProcessSession(tcpsession *Tcpsession,decoder packet.Decoder,
process_packet func (*Tcpsession,packet.Packet,error))(error){
if tcpsession.socket_close{
return ErrSocketClose
}
tcpsession.decoder = decoder
go dorecv(tcpsession)
for{
msg,ok := <- tcpsession.Packet_que
if !ok {
//log error
return nil
}
if packet.EPACKET == msg.GetType(){
process_packet(tcpsession,nil,msg.(packet.EventPacket).GetError())
}else{
process_packet(tcpsession,msg,nil)
}
if tcpsession.socket_close{
return nil
}
}
}
func NewTcpSession(conn net.Conn)(*Tcpsession){
session := new(Tcpsession)
session.Conn = conn
session.Packet_que = make(chan packet.Packet,1024)
session.socket_close = false
return session
}
func (this *Tcpsession)Send(wpk packet.Packet)(error){
if this.socket_close{
return ErrSocketClose
}
idx := (uint32)(0)
for{
buff := wpk.Buffer().Bytes()
end := wpk.PkLen()
n,err := this.Conn.Write(buff[idx:end])
if err != nil || n < 0 {
return ErrSendClose
}
idx += (uint32)(n)
if idx >= (uint32)(end){
break
}
}
return nil
}
func (this *Tcpsession)Close(){
if this.socket_close{
return
}
this.socket_close = true
this.Conn.Close()
}
代码十分简短只有一百行出头点,这里关键地方时,dorecv,Send和ProcessSession三个函数.
dorecv所做的就是不断调用decoder.DoRecv从网络中提取网络包,然后将其写入到Packet_que中.
Send函数则保证需要发送的数据被写入到内湖缓冲或出错才会返回.
ProcessSession则是整个库的核心所在,接收到新连接之后,用新建连接作为参数调用ProcessSession,当网络包到达或出错时将会回调使用者提供的回调函数.从实现看它只是简单的创建一个goroutine执行dorecv,然后在一个for循环中不断读取到达的网络包然后调用回调函数.
下面是一个使用示例,更多的示例请参考:https://github.com/sniperHW/kendynet-go
package main
import(
"net"
tcpsession "kendynet-go/tcpsession"
packet "kendynet-go/packet"
"fmt"
)
func main(){
service := ":8010"
tcpAddr,err := net.ResolveTCPAddr("tcp4", service)
if err != nil{
fmt.Printf("ResolveTCPAddr")
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil{
fmt.Printf("ListenTCP")
}
for {
conn, err := listener.Accept()
if err != nil {
continue
}
session := tcpsession.NewTcpSession(conn)
fmt.Printf("a client comming\n")
go tcpsession.ProcessSession(session,packet.NewRawDecoder(),
func (session *tcpsession.Tcpsession,rpk packet.Packet,errno error){
if rpk == nil{
fmt.Printf("%s\n",errno)
session.Close()
return
}
session.Send(rpk)
})
}
}
有疑问加站长微信联系(非本文作者)