在Golang 游戏leaf系列(三) NewAgent在chanrpc和skeleton中怎么通讯(下文简称系列三)中,主要是分析了Server结构体,并没有涉及Client。本文将深入分析剩余部分。
//chanrpc.go
type RetInfo struct {
ret interface{}
err error
cb interface{}
}
type CallInfo struct {
f interface{}
args []interface{}
chanRet chan *RetInfo
cb interface{}
}
type Server struct {
functions map[interface{}]interface{}
ChanCall chan *CallInfo
}
type Client struct {
s *Server
chanSyncRet chan *RetInfo
ChanAsynRet chan *RetInfo
pendingAsynCall int
}
看Client里的属性,有同步调用chan,异步调用chan,还有一个异步记数器。然后就是一些方法:
func (s *Server) Open(l int) *Client {
c := NewClient(l)
c.Attach(s)
return c
}
func NewClient(l int) *Client {
c := new(Client)
c.chanSyncRet = make(chan *RetInfo, 1)
c.ChanAsynRet = make(chan *RetInfo, l)
return c
}
func (c *Client) Attach(s *Server) {
c.s = s
}
也就是Open方法传入的参数,决定了这两个通道的长度。
一、结合example_test.go来看一下使用方式
1.支持的参数类型
在源码中,有很多f0,f1,fn,相应的也有call0,call1,calln这些写法。实际上,针对的是不同参数类型:
//参数为[]interface,无返回值
func([]interface{})
//参数为[]interface,返回值为interface
func([]interface{}) interface{}
//参数为[]interface,返回值为[]interface
func([]interface{}) []interface{}
2.注册
s := chanrpc.NewServer(10)
var wg sync.WaitGroup
wg.Add(1)
// goroutine 1
go func() {
s.Register("f0", func(args []interface{}) {
})
s.Register("f1", func(args []interface{}) interface{} {
return 1
})
s.Register("fn", func(args []interface{}) []interface{} {
return []interface{}{1, 2, 3}
})
s.Register("add", func(args []interface{}) interface{} {
n1 := args[0].(int)
n2 := args[1].(int)
return n1 + n2
})
wg.Done()
for {
s.Exec(<-s.ChanCall)
}
}()
wg.Wait()
wg.Add(1)
“add”那个,也算是一个Call1类型的,然后s.Exec(<-s.ChanCall)
就相当于在等着ChanCall来数据了。
3.call系列
// goroutine 2
go func() {
c := s.Open(10)
// sync
err := c.Call0("f0")
if err != nil {
fmt.Println(err)
}
r1, err := c.Call1("f1")
if err != nil {
fmt.Println(err)
} else {
fmt.Println(r1)
}
rn, err := c.CallN("fn")
if err != nil {
fmt.Println(err)
} else {
fmt.Println(rn[0], rn[1], rn[2])
}
ra, err := c.Call1("add", 1, 2)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ra)
}
// asyn
c.AsynCall("f0", func(err error) {
if err != nil {
fmt.Println(err)
}
})
c.AsynCall("f1", func(ret interface{}, err error) {
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ret)
}
})
c.AsynCall("fn", func(ret []interface{}, err error) {
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ret[0], ret[1], ret[2])
}
})
c.AsynCall("add", 1, 2, func(ret interface{}, err error) {
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ret)
}
})
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
// go
s.Go("f0")
wg.Done()
}()
wg.Wait()
二、call系列
func (c *Client) Call0(id interface{}, args ...interface{}) error {
f, err := c.f(id, 0)
if err != nil {
return err
}
err = c.call(&CallInfo{
f: f,
args: args,
chanRet: c.chanSyncRet,
}, true)
if err != nil {
return err
}
ri := <-c.chanSyncRet
return ri.err
}
c.f(id,0)这个在call系列里都有,主要工作就是确认通过Register注册的function,参数的数量和Call系列是否一致。
然后就是真的要去执行了,这时候对比发现,call系列的执行都是一样的:
err = c.call(&CallInfo{
f: f,
args: args,
chanRet: c.chanSyncRet,
}, true)
func (c *Client) call(ci *CallInfo, block bool) (err error) {
defer func() {
if r := recover(); r != nil {
err = r.(error)
}
}()
if block {
c.s.ChanCall <- ci
} else {
select {
case c.s.ChanCall <- ci:
default:
err = errors.New("chanrpc channel full")
}
}
return
}
看一下CallInfo
type CallInfo struct {
f interface{}
args []interface{}
chanRet chan *RetInfo
cb interface{}
}
发现call系列没有给cb,这说明它们是不需要回调的。然后block传的全是true,说明call系列会直接向ChanCall写入callinfo。
example_test.go里call的太多了,可以先尝试运行其中一个
ra, err := c.Call1("add", 1, 2)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ra)
}
输出的是3,上面一直没提返回值的事情,就Call1来看,ra返回的正是结果3。其实看源码,是通过ri := <-c.chanSyncRet
拿到这个值的。也就是说,s.Exec(<-s.ChanCall)
在拿到callinfo后,会写到chanSyncRet里。
在exec里有这样的代码:
// execute
switch ci.f.(type) {
case func([]interface{}):
ci.f.(func([]interface{}))(ci.args)
return s.ret(ci, &RetInfo{})
case func([]interface{}) interface{}:
ret := ci.f.(func([]interface{}) interface{})(ci.args)
return s.ret(ci, &RetInfo{ret: ret})
case func([]interface{}) []interface{}:
ret := ci.f.(func([]interface{}) []interface{})(ci.args)
return s.ret(ci, &RetInfo{ret: ret})
}
ret方法是这样的:
func (s *Server) ret(ci *CallInfo, ri *RetInfo) (err error) {
if ci.chanRet == nil {
return
}
defer func() {
if r := recover(); r != nil {
err = r.(error)
}
}()
ri.cb = ci.cb
ci.chanRet <- ri
return
}
三、异步AsynCall
现在来看一下异步调用AsynCall
c.AsynCall("add", 1, 2, func(ret interface{}, err error) {
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ret)
}
})
这里在参数的最末尾,传入一个回调function,然后流程和同步类似
func (c *Client) asynCall(id interface{}, args []interface{}, cb interface{}, n int) {
f, err := c.f(id, n)
if err != nil {
c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
return
}
err = c.call(&CallInfo{
f: f,
args: args,
chanRet: c.ChanAsynRet,
cb: cb,
}, false)
if err != nil {
c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
return
}
}
只不过black参数是false的。
if block {
c.s.ChanCall <- ci
} else {
select {
case c.s.ChanCall <- ci:
default:
err = errors.New("chanrpc channel full")
}
}
再来对比一下,如果s.ChanCall读取太慢,已经写不进去了,在同步模式下,这个callinfo会一直阻塞等在那里;而异步模式,会走到default分支,也就是报个错。然后异步模式会把结果都存到ChanAsynRet里
if err != nil {
c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
return
}
当然如果存不下,还在调用AsynCall,它会直接在AsynCall里执行回调
...
// too many calls
if c.pendingAsynCall >= cap(c.ChanAsynRet) {
execCb(&RetInfo{err: errors.New("too many calls"), cb: cb})
return
}
c.asynCall(id, args, cb, n)
c.pendingAsynCall++
...
func execCb(ri *RetInfo) {
defer func() {
if r := recover(); r != nil {
if conf.LenStackBuf > 0 {
buf := make([]byte, conf.LenStackBuf)
l := runtime.Stack(buf, false)
log.Error("%v: %s", r, buf[:l])
} else {
log.Error("%v", r)
}
}
}()
// execute
switch ri.cb.(type) {
case func(error):
ri.cb.(func(error))(ri.err)
case func(interface{}, error):
ri.cb.(func(interface{}, error))(ri.ret, ri.err)
case func([]interface{}, error):
ri.cb.(func([]interface{}, error))(assert(ri.ret), ri.err)
default:
panic("bug")
}
return
}
func (c *Client) Cb(ri *RetInfo) {
c.pendingAsynCall--
execCb(ri)
}
也就是说,异步模式会把结果存起来。什么时候执行Cb呢,别忘了skeleton.Run啊
// leaf\module\skeleton.go
func (s *Skeleton) Run(closeSig chan bool) {
for {
select {
case <-closeSig:
s.commandServer.Close()
s.server.Close()
for !s.g.Idle() || !s.client.Idle() {
s.g.Close()
s.client.Close()
}
return
case ri := <-s.client.ChanAsynRet:
s.client.Cb(ri)
// 等待来自通道的数据
case ci := <-s.server.ChanCall:
s.server.Exec(ci)
case ci := <-s.commandServer.ChanCall:
s.commandServer.Exec(ci)
case cb := <-s.g.ChanCb:
s.g.Cb(cb)
case t := <-s.dispatcher.ChanTimer:
t.Cb()
}
}
}
四、总结
ChanRPC 的调用方有 3 种调用模式:
- 同步模式,调用并等待 ChanRPC 返回
- 异步模式,调用并提供回调函数,回调函数会在 ChanRPC 返回后被调用
- Go 模式,调用并立即返回,忽略任何返回值和错误
看下来,还是Go模式最简单,没有回调,也没有ret返回信息。而同步模式和异步模式,暂时还没有看到使用示例。
有疑问加站长微信联系(非本文作者)