[Go语言]一种用于网游服务器的支持多路复用的网络协议处理框架

stevewang · · 4694 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

简介:
本文描述了使用Go语言实现的、适应于Go语言并发模型的一种支持多路复用的网络协议处理框架,并提供了框架的代码实现。作者将这种框架用于网络游戏服务器中的协议处理,但也可用于其他领域。

应用背景:
在网络游戏服务器设计中,一般都会遇到协议多路复用的场景。比如登录服务器和玩家客户端之间有1:N的多个TCP连接;登录服务器和游戏服务器之间是1:1的TCP连接。玩家登录游戏的大致流程是这样的:
  1. 玩家连接登录服务器
  2. 登录服务器向数据库请求玩家数据
  3. 登录服务器获取到玩家数据,把玩家数据转发给游戏服务器进行加载包括创建玩家对象等
  4. 登录服务器获取到加载成功回应后,通知玩家客户端可以进入游戏世界
在3和4中,因为登录服务器和游戏服务器通常只有一个TCP连接,所有玩家数据都是通过这个连接进行传输,所以需要从协议包中区分出是哪个玩家的数据。通常这个区分的依据可以是玩家的角色名,但是也可以更通用一些,用一个数字ID来区分,这样就把协议包的分发处理和协议包中与游戏逻辑有关的内容分离开来。

协议说明:
通常网游的网络协议都是报文的形式,即使底层是使用TCP,也会用一些方法把数据拆分成一个个的报文(本文中称为协议包)。因此,本文也基于这一假设,但是对于具体的协议包格式,本文没有特别限制,只是要求协议包中能够容纳一个32字节的ID。
协议包的处理大概可以分为以下两种类型。其他更复杂的会话可以由以下两种类型组合而成。
  1. 发送一个数据包并等待回应。比如登录服务器等待游戏服务器加载玩家数据的结果通知。
  2. 发送一个数据包,不需要回应。比如游戏服务器加载玩家数据后,给登录服务器发送结果通知。

框架说明:
Go语言是一种支持高并发的编程语言,它支持高并发的方式是大量轻量级的goroutine并发执行。在每个goroutine中的操作基本上都是同步阻塞的,这样可以极大地简化程序逻辑,使得代码清晰易读,容易维护。基于这点,本文实现的框架的调用接口也是使用同步方式的。
  1. 如果一个协议包需要等待回应,就在调用函数上阻塞等待。这个调用的签名为:
    func (p *Connection) Query(data []byte) ([]byte, error)
    注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。
  2. 如果发送一个协议包是对于接收到的某个协议包的回应,则调用:
    func (p *Connection) Reply(query, answer []byte) error
    注意:answer的控制权会转交给框架,因此函数调用后不能修改answer的内容。
  3. 如果一个协议包不需要回应,就直接调用发送函数:
    func (p *Connection) Write(data []byte) error
    注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。
  4. 调用者需要实现的接口:
  • Socket用于协议包的收发。基本上是net.TCPConn的简单封装,在头部加上一个协议包的长度。
  • DataHandler。用于协议处理,即没有通过Query返回的协议包会分发给此接口处理。
  • ErrorHandler。用于错误处理。当断线时,会调用此接口。
  • IdentityHandler。用于读取和设置会话ID。
 5. 关于goroutine安全的说明:
ErrorHandler和DataHandler的函数实现中不能直接调用(*Connection).Close,否则会导致死锁。

导出类型、函数和接口:

type Connection
func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, eh ErrorHandler) *Connection
func (p *Connection) Start()
func (p *Connection) Close()
func (p *Connection) Query(data []byte) (res []byte, err error)
func (p *Connection) Reply(query, answer []byte) error
func (p *Connection) Write(data []byte) error

type Socket interface {
	Read() ([]byte, error)
	Write([]byte) error
	Close()
}

type DataHandler interface {
	Process([]byte)
}

type ErrorHandler interface {
	OnError(error)
}

type IdentityHandler interface {
	GetIdentity([]byte) uint32
	SetIdentity([]byte, uint32)
}
完整的代码实现:

package multiplexer

import (
	"errors"
	"sync"
	"sync/atomic"
)

var (
	ERR_EXIT = errors.New("exit")
)

type Socket interface {
	Read() ([]byte, error)
	Write([]byte) error
	Close()
}
type DataHandler interface {
	Process([]byte)
}
type ErrorHandler interface {
	OnError(error)
}
type IdentityHandler interface {
	GetIdentity([]byte) uint32
	SetIdentity([]byte, uint32)
}
type Connection struct {
	conn       Socket
	wg         sync.WaitGroup
	mutex      sync.Mutex
	applicants map[uint32]chan []byte
	chexit     chan bool
	chsend     chan []byte
	chch       chan chan []byte
	dh         DataHandler
	ih         IdentityHandler
	eh         ErrorHandler
	identity   uint32
}

func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, eh ErrorHandler) *Connection {
	count := maxcount
	if count < 1024 {
		count = 1024
	}
	chch := make(chan chan []byte, count)
	for i := 0; i < count; i++ {
		chch <- make(chan []byte, 1)
	}
	return &Connection{
		conn:       conn,
		applicants: make(map[uint32]chan []byte, count),
		chsend:     make(chan []byte, count),
		chexit:     make(chan bool),
		chch:       chch,
		dh:         dh,
		ih:         ih,
		eh:         eh,
	}
}
func (p *Connection) Start() {
	p.wg.Add(2)
	go func() {
		defer p.wg.Done()
		p.recv()
	}()
	go func() {
		defer p.wg.Done()
		p.send()
	}()
}
func (p *Connection) Close() {
	close(p.chexit)
	p.conn.Close()
	p.wg.Wait()
}
func (p *Connection) Query(data []byte) (res []byte, err error) {
	var ch chan []byte
	select {
	case <-p.chexit:
		return nil, ERR_EXIT
	case ch = <-p.chch:
		defer func() {
			p.chch <- ch
		}()
	}
	id := p.newIdentity()
	p.ih.SetIdentity(data, id)
	p.addApplicant(id, ch)
	defer func() {
		if err != nil {
			p.popApplicant(id)
		}
	}()
	if err := p.Write(data); err != nil {
		return nil, err
	}
	select {
	case <-p.chexit:
		return nil, ERR_EXIT
	case res = <-ch:
		break
	}
	return res, nil
}
func (p *Connection) Reply(query, answer []byte) error {
	// put back the identity attached to the query
	id := p.ih.GetIdentity(query)
	p.ih.SetIdentity(answer, id)
	return p.Write(answer)
}
func (p *Connection) Write(data []byte) error {
	select {
	case <-p.chexit:
		return ERR_EXIT
	case p.chsend <- data:
		break
	}
	return nil
}
func (p *Connection) send() {
	for {
		select {
		case <-p.chexit:
			return
		case data := <-p.chsend:
			if p.conn.Write(data) != nil {
				return
			}
		}
	}
}
func (p *Connection) recv() (err error) {
	defer func() {
		if err != nil {
			select {
			case <-p.chexit:
				err = nil
			default:
				p.eh.OnError(err)
			}
		}
	}()
	for {
		select {
		case <-p.chexit:
			return nil
		default:
			break
		}
		data, err := p.conn.Read()
		if err != nil {
			return err
		}
		if id := p.ih.GetIdentity(data); id > 0 {
			ch, ok := p.popApplicant(id)
			if ok {
				ch <- data
				continue
			}
		}
		p.dh.Process(data)
	}
	return nil
}
func (p *Connection) newIdentity() uint32 {
	return atomic.AddUint32(&p.identity, 1)
}
func (p *Connection) addApplicant(identity uint32, ch chan []byte) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	p.applicants[identity] = ch
}
func (p *Connection) popApplicant(identity uint32) (chan []byte, bool) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	ch, ok := p.applicants[identity]
	if !ok {
		return nil, false
	}
	delete(p.applicants, identity)
	return ch, true
}


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

4694 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传