一般提到回调,第一反映就是函数回调,太熟悉了。在学习golang的过程中,通过阅读相关源代码,发现golang结合channel和WaitGroup,会有非常特殊的有别于函数回调的结果返回方式,常用于相对耗时运算的结果获取。其核心思路就是利用延时信号通知,来返回。因为暂时没查到中文的定义,暂时叫做Call回调,或者叫完成通知模式。
通道组合
Call就是一个公共的可访问的结构体定义,用于包装用户请求与结果,然后通过内部额外加入的channel封装实现异步结果的返回。对使用者而言,需要知道Call的channel属性,以及框架定义的异步调用方法。
//Call定义
type Call struct {
Request interface{}
Reply interface{}
Done chan *Call //用于结果返回时,消息通知,使用者必须依靠这个来获取真正的结果。
}
//...
//具体使用的时候,通过框架或平台提供的方法先获取到call实例
//方法定义通常为:foo(req ,reply interface{},done chan *Call)*call
call:=foo(req,reply,nil)//框架定义的异步调用方法,最后一个参数为chan,能够为nil
//do something
reply<-call.Done//此处阻塞,等待foo的执行结果
//do another thing
完整示例代码可以查看go语言基本的net\rpc\client.go文件。由于代码中业务比较多,这里给出一个应用的代码抽象:
package main
import (
"fmt"
"time"
)
// Call的基本定义,对外部使用者的请求、返回以及异步使用进行封装。
type Call struct {
Request interface{}
Reply interface{}
Done chan *Call //用于结果返回时,指向自己的指针
}
// 非常重要的异步调用结果返回,供框架内部使用。
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// 阻塞情况处理,这里忽略
}
}
func main(){
for i:=0;i<100;i++{
var reply *int
call:=GO(i,reply,nil) //获取到了call,但此时call.Reply还不是运算结果
//先打印结果还没有计算出来的情况
fmt.Printf("i=%d,运算前:call.Reply=%v \n",i,call.Reply.(*int))
result:=<-call.Done //等待Done的通知,此时call.Reply发生了变化。
fmt.Printf("i=%d,运算后:call.Reply=%v,result=%+v \n",i,*(call.Reply.(*int)),*(result.Reply.(*int)))
}
}
// 供业务调用的异步计算函数封装,用户只需要了解对应参数。
func GO(req int,reply *int,done chan *Call)*Call{
if done==nil{
done=make(chan *Call,10)
}else{
if cap(done)==0{
fmt.Println("chan容量为0,无法返回结果,退出此次计算!")
return nil
}
}
call:=&Call{
Request:req,
Reply:reply,
Done:done,
}
//调用一个可能比较耗时的计算,注意用"go"
go caculate(call)
return call
}
//真正的业务处理代码
//简单示意,其实存在读写竞争。run -race 就会出现提示
func caculate(call *Call){
//假定运算一次需要耗时1秒
time.Sleep(time.Second)
tmp:=call.Request.(int)*5
call.Reply=&tmp
call.done()
}
由此可以看出,适合于运算需要一定时间,但可以在等待过程中,处理其他业务的情况。
wg组合
前面提到了Call封装通道,来实现异步。那么,对于耗时的操作能否避免相同多次操作(方法相同、输入参数相同、结果肯定相同)的重复性呢?这时候,可以考虑采用包装WaitGroup来实现。
在github上,golang的groupcache有一个这样的模块封装,可以直接拿来使用,这里copy如下:
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized,如果要通用,此处string->inteface{}
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
// 性能要高点,可以创建时初始化
if g.m == nil {
g.m = make(map[string]*call)
}
// 输入参数是否已经存在,存在就等待对应结果,不用计算。
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
// 不存在,则开始执行fn
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
//获取到计算结果
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
来尝试调用一下:
func main() {
req := "执行请求"
g := new(Group)
// 借助之前文章提及的waitgroup包装函数,不用也行,运行时加入 '-race'
var wg waitgroup.WaitGroupWrapper
for i := 0; i < 1000; i++ {
wg.Wrap(func() {
j, _ := g.Do(req, NeedSecondTime)
fmt.Println("NeedSecondTime被调用了=j", j)
})
}
wg.Wait()
}
// 每执行一次,都需要1秒
var counter=0
func NeedSecondTime()(interface{},error){
time.Sleep(time.Second)
counter++
return counter,nil
}
毫无疑问,结果是1,NeedSecondTime只执行了一次。