- 实现支持异步和并发的高性能客户端
Call
- 使用Call封装通道来实现异步通知
- 封装结构体Call用于承载一次客户端RPC调用所需信息
Golang结束channel和WaitGroup可以实现非常特殊且有别于传统函数回调的结果返回方式,常用于相对耗时运算的结果获取。核心思想是利用延时信号通知来返回,可称为Call回调或完成通知模式。
Call是一个公共可访问的结构体定义,用于包装用户请求与结果,通过内部额外地加入的channel封装实现异步结果的返回。对于使用者而言,只需要知道Call的channel属性,以及框架定义的异步回调方法。
Call的基本定义是对外部使用者的请求、返回、异步使用进行封装。
$ vim codec/call.go
package codec
//Call 承载一次客户端RPC调用所需的消息
type Call struct{
Seq uint64//请求调用序列号
ServiceName string //远程服务名称.方法名称
Request interface{} //服务请求参数
Reply interface{}//服务返回值
Error error//错误消息
Done chan *Call//支持异步调用,当调用结束时会调用call.done()方法通知调用方。
}
//done 当调用结束时会调用call.done()来通知调用方
func (c *Call) done(){
select{
case c.Done <- c:
default:
}
}
参数 | 类型 | 描述 |
---|---|---|
Seq | uint64 | 调用请求序列号,每个请求唯一且自增。 |
ServiceName | string | 远程服务,格式为<服务名称>.<方法名称> |
Request | interface{} | RPC客户端请求参数 |
Reply | interfac{} | RPC服务端返回参数 |
Error | error | 错误消息 |
Done | chan *Call | 异步回调,当调用结束时需调用done()方法来通知调用方。 |
- Done用于结果返回时,指向自己的指针。
- Done用于结果返回时使用消息通知,使用者必须依靠这个来获取真正的结果。
Done具体使用时,通过框架或平台提供的方法先获取call实例。
//Go 异步调用函数 返回Call实例
func (c *Client) Go(serviceName string, request, reply interface{}, done chan *Call) *Call
框架定义的异步调用方法,最后一个参数为chan,能够为nil。
call := Go(serviceName, request, reply, nil)
获取异步执行结果,此处会发生阻塞。
result := <- call.Done
Call
是对Go
的封装,会阻塞call.Done
,等待响应返回,是一个同步接口。
//Call 同步调用具名函数 会阻塞call.Done等待响应返回
func (c *Client) Call(serviceName string, request, reply interface{}) error{
call := <- c.Go(serviceName, request, reply, make(chan *Call, 1)).Done
return call.Error
}
测试
$ vim test/call_test.go
package test
import (
"fmt"
"log"
"testing"
"time"
)
//Call 承载一次客户端RPC调用所需的消息
type Call struct{
Seq uint64//请求调用序列号
ServiceName string //远程服务名称.方法名称
Request interface{} //服务请求参数
Reply interface{}//服务返回参数
Error error//错误消息
Done chan *Call//支持异步调用,当调用结束时会调用call.done()方法通知调用方。
}
//done 当调用结束时会调用call.done()来通知调用方
func (c *Call) done(){
select{
case c.Done <- c:
default:
}
}
//业务代码 耗时运算 异步处理 存在读写竞争
func Cal(call *Call){
//假定运算一次需要1秒
time.Sleep(time.Second)
result := call.Request.(int) * 5
call.Reply = &result
call.done()
}
func Do(req int, reply *int, done chan *Call) *Call{
//判断结束结果
if done == nil{
done = make(chan *Call, 0)
}else{
if cap(done) == 0{
log.Println("chan容量为0,无法返回结果,退出此次计算。")
return nil
}
}
//创建调用
call := &Call{Request:req, Reply:reply, Done:done}
//业务代码 耗时运算 异步处理
go Cal(call)
return call
}
func TestCall(t *testing.T){
for i:=0; i<100; i++{
var reply *int
call := Do(i, reply, nil)//获取了call,但此时call.Reply还不是运算结果
//计算前结果
fmt.Printf("i = %d, call.Reply = %v\n", i, call.Reply.(*int))
//等待结束通知,此时call.Reply会发生变化。
result := <- call.Done
fmt.Printf("i = %v, call.Reply = %v, result = %v\n", i, *(call.Reply.(*int)), *(result.Reply.(*int)))
}
}
$ go test call_test.go -v -run TestCall
WaitGroup
对于耗时的操作只要方法相同、输入参数相同,则计算的结果一定是相同的,也就是意味着是重复的操作。如何才能够避免相同的多次操作呢?可采用包装WaitGroup来实现。
type Call struct{
WG sync.WaitGroup
Error error
Value interface{}
}
type Group struct{
mutex sync.Mutex
calls map[string]*Call
}
func (g *Group) Do(key string, fn func()(interface{}, error)) (interface{}, error){
g.mutex.Lock()
//创建时初始化以提高性能
if g.calls == nil{
g.calls = make(map[string]*Call)
}
//判断参数是否存在
if call,ok := g.calls[key]; ok{
g.mutex.Unlock()
call.WG.Wait()
return call.Value, call.Error
}else{
//执行回调
call := new(Call)
call.WG.Add(1)
g.calls[key] = call
g.mutex.Unlock()
//获取计算结果
call.Value, call.Error = fn()
call.WG.Done()
//删除键
g.mutex.Lock()
delete(g.calls, key)
g.mutex.Unlock()
//返回结果
return call.Value, call.Error
}
}
有疑问加站长微信联系(非本文作者)