Go Call

JunChow520 · · 88 次点击 · · 开始浏览    

  • 实现支持异步和并发的高性能客户端

Call

  • 使用Call封装通道来实现异步通知
  • 封装结构体Call用于承载一次客户端RPC调用所需信息

Golang结束channel和WaitGroup可以实现非常特殊且有别于传统函数回调的结果返回方式,常用于相对耗时运算的结果获取。核心思想是利用延时信号通知来返回,可称为Call回调或完成通知模式。

Call是一个公共可访问的结构体定义,用于包装用户请求与结果,通过内部额外地加入的channel封装实现异步结果的返回。对于使用者而言,只需要知道Call的channel属性,以及框架定义的异步回调方法。

Call的基本定义是对外部使用者的请求、返回、异步使用进行封装。

$ vim codec/call.go
package codec

//Call 承载一次客户端RPC调用所需的消息
type Call struct{
    Seq uint64//请求调用序列号
    ServiceName string //远程服务名称.方法名称
    Request interface{} //服务请求参数
    Reply interface{}//服务返回值
    Error error//错误消息
    Done chan *Call//支持异步调用,当调用结束时会调用call.done()方法通知调用方。
}
//done 当调用结束时会调用call.done()来通知调用方
func (c *Call) done(){
    select{
    case c.Done <- c:
    default:
    }
}
参数 类型 描述
Seq uint64 调用请求序列号,每个请求唯一且自增。
ServiceName string 远程服务,格式为<服务名称>.<方法名称>
Request interface{} RPC客户端请求参数
Reply interfac{} RPC服务端返回参数
Error error 错误消息
Done chan *Call 异步回调,当调用结束时需调用done()方法来通知调用方。
  • Done用于结果返回时,指向自己的指针。
  • Done用于结果返回时使用消息通知,使用者必须依靠这个来获取真正的结果。

Done具体使用时,通过框架或平台提供的方法先获取call实例。

//Go 异步调用函数 返回Call实例
func (c *Client) Go(serviceName string, request, reply interface{}, done chan *Call) *Call

框架定义的异步调用方法,最后一个参数为chan,能够为nil。

call := Go(serviceName, request, reply, nil)

获取异步执行结果,此处会发生阻塞。

result := <- call.Done

Call是对Go的封装,会阻塞call.Done,等待响应返回,是一个同步接口。

//Call 同步调用具名函数 会阻塞call.Done等待响应返回
func (c *Client) Call(serviceName string, request, reply interface{}) error{
    call := <- c.Go(serviceName, request, reply, make(chan *Call, 1)).Done
    return call.Error
}

测试

$ vim test/call_test.go
package test

import (
    "fmt"
    "log"
    "testing"
    "time"
)

//Call 承载一次客户端RPC调用所需的消息
type Call struct{
    Seq uint64//请求调用序列号
    ServiceName string //远程服务名称.方法名称
    Request interface{} //服务请求参数
    Reply interface{}//服务返回参数
    Error error//错误消息
    Done chan *Call//支持异步调用,当调用结束时会调用call.done()方法通知调用方。
}
//done 当调用结束时会调用call.done()来通知调用方
func (c *Call) done(){
    select{
    case c.Done <- c:
    default:
    }
}

//业务代码 耗时运算 异步处理 存在读写竞争
func Cal(call *Call){
    //假定运算一次需要1秒
    time.Sleep(time.Second)

    result := call.Request.(int) * 5

    call.Reply = &result
    call.done()
}

func Do(req int, reply *int, done chan *Call) *Call{
    //判断结束结果
    if done == nil{
        done = make(chan *Call, 0)
    }else{
        if cap(done) == 0{
            log.Println("chan容量为0,无法返回结果,退出此次计算。")
            return nil
        }
    }
    //创建调用
    call := &Call{Request:req, Reply:reply, Done:done}
    //业务代码 耗时运算 异步处理
    go Cal(call)
    return call
}

func TestCall(t *testing.T){
    for i:=0; i<100; i++{
        var reply *int
        call := Do(i, reply, nil)//获取了call,但此时call.Reply还不是运算结果
        //计算前结果
        fmt.Printf("i = %d, call.Reply = %v\n", i, call.Reply.(*int))
        //等待结束通知,此时call.Reply会发生变化。
        result := <- call.Done
        fmt.Printf("i = %v, call.Reply = %v, result = %v\n", i, *(call.Reply.(*int)), *(result.Reply.(*int)))
    }
}
$ go test call_test.go -v -run TestCall

WaitGroup

对于耗时的操作只要方法相同、输入参数相同,则计算的结果一定是相同的,也就是意味着是重复的操作。如何才能够避免相同的多次操作呢?可采用包装WaitGroup来实现。

type Call struct{
    WG sync.WaitGroup
    Error error
    Value interface{}
}

type Group struct{
    mutex sync.Mutex
    calls map[string]*Call
}
func (g *Group) Do(key string, fn func()(interface{}, error)) (interface{}, error){
    g.mutex.Lock()
    //创建时初始化以提高性能
    if g.calls == nil{
        g.calls = make(map[string]*Call)
    }
    //判断参数是否存在
    if call,ok := g.calls[key]; ok{
        g.mutex.Unlock()
        call.WG.Wait()
        return call.Value, call.Error
    }else{
        //执行回调
        call := new(Call)
        call.WG.Add(1)
        g.calls[key] = call
        g.mutex.Unlock()
        //获取计算结果
        call.Value, call.Error = fn()
        call.WG.Done()
        //删除键
        g.mutex.Lock()
        delete(g.calls, key)
        g.mutex.Unlock()
        //返回结果
        return call.Value, call.Error
    }
}

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:JunChow520

查看原文:Go Call

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077

88 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传