golang-熔断器

Best博客 · · 604 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

熔断器

go-zero在breaker里面基于google的sre算法实现了熔断器逻辑,并在redis等客户端操作的时候引入了熔断器

算法公式
image.png

go-zero 实现了这个公式,并且作为底层组件很好被嵌入使用,在redis的调用就有很好的表现.

  1. 定义接口
  2. 底层组件实现该接口,
  3. 初始化breaker各属性对象,依赖对象数据类型定义的都是interface
    https://github.com/tal-tech/go-zero/blob/master/core/breaker/breaker.go

使用demo

import (
    "errors"
    "fmt"
    "github.com/stretchr/testify/assert"
    "github.com/tal-tech/go-zero/core/stat"
    "strconv"
    "strings"
    "testing"
)
// 熔断服务会将闭包每次返回的结果统计起来
// 最终在sre公式中通过计算得到是否放行的标志
func TestGoogleBreaker(t *testing.T) {
    br := NewBreaker()
    for i := 0; i < 500; i++ {
        br.Do(
            func() error {

                fmt.Println("如果闭包一直执行那么i就会连续的被打印", i)

                if i > 4 {
                    err := errors.New("err info")
                    return err
                } else {
                    return nil
                }

            },
        )
    }
}
image.png

redis调用使用demo

package redis

import (
    "errors"
    "fmt"
    "strconv"
    "time"

    red "github.com/go-redis/redis"
    "github.com/tal-tech/go-zero/core/breaker"
    "github.com/tal-tech/go-zero/core/mapping"
)
//redis客户端初始化过程中,特意将breaker作为brk依赖注入,使得go-zero底层redis客户端组件自带熔断保护措施
// 参照这我们使用breaker的灵活性就很大了,  闭包。。
func NewRedis(redisAddr, redisType string, redisPass ...string) *Redis {
    var pass string
    for _, v := range redisPass {
        pass = v
    }

    return &Redis{
        Addr: redisAddr,
        Type: redisType,
        Pass: pass,
        brk:  breaker.NewBreaker(),//redis客户端初始化过程中,特意将breaker作为brk依赖注入
    }
}


// 这是找到go-zero的redis客户端del过程
// 看见没 s.brk.DoWithAcceptable, go-zero将所有操作redis的过程都以闭包的形式包入breker中
func (s *Redis) Del(keys ...string) (val int, err error) {
    err = s.brk.DoWithAcceptable(func() error {
        conn, err := getRedis(s)
        if err != nil {
            return err
        }

        if v, err := conn.Del(keys...).Result(); err != nil {
            return err
        } else {
            val = int(v)
            return nil
        }
    }, acceptable)

    return
}

代码分析:


// 闭包是以参数传入该方法的
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
   // 该方法每次在执行闭包之前会先通过 b.accept(); 验证当前是否还能执行闭包 
   if err := b.accept(); err != nil {
        if fallback != nil {
            return fallback(err)
        } else {
            return err
        }
    }

    defer func() {
        if e := recover(); e != nil {
            b.markFailure()
            panic(e)
        }
    }()

    err := req()
    if acceptable(err) {
                // 闭包执行返回的err为nil走这里,进行数据统计
        b.markSuccess()
    } else {
                // 闭包执行返回的err 不为 nil走这里
        b.markFailure()
    }

    return err
}


// sre 公式的真是写照了
// 闭包每次执行之前都会先执行这个方法,看一下还能不能执行闭包
func (b *googleBreaker) accept() error {
    accepts, total := b.history()  //这个方法返回当前时刻 闭包返回nil,与非nil的统计数值,用于sre公式计算
    weightedAccepts := b.k * float64(accepts)
    // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
    dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
    if dropRatio <= 0 {
        return nil
    }

    if b.proba.TrueOnProba(dropRatio) {
        return ErrServiceUnavailable
    }

    return nil
}
// 闭包执行成功,其实还是调用add
func (b *googleBreaker) markSuccess() {
    b.stat.Add(1)
}


func (rw *RollingWindow) Add(v float64) {
    rw.lock.Lock()
    defer rw.lock.Unlock()
    rw.updateOffset()  //核心。。
    rw.win.add(rw.offset, v)
}

// 这里就是计算 sre 算法用到的 参数数字的过程了
// 一开始在初始化breaker的时候,10S为单位分成40份
// 这里用到了一个巧妙方式自动初始化一部分区间的数值,
// 也就是算法一开始所描述的,当错误次数过高熔断被触发,当一段时间过期后便会重新再去调用闭包,
// 只所以会过一段时间去调用,是因为sre公式又成立了,之所以成立了就是因为这里随着时间的推移初始化掉
//了一部分区间的数值
func (rw *RollingWindow) updateOffset() {
        // breaker初始化过程中,|--|--|--|--|--|...   每个区间时间长度为: 10 * time.Second / 40,对应代码为数组的40个下标,不同时刻 闭包执行的统计值的累加结果存在不同下标的对象中进行累加  ,这里的span()就是根据当前时刻决定下标移动的跨度
        span := rw.span()      
        if span > 0 {  
        offset := rw.offset //当前区间的下标
        // reset expired buckets
        start := offset + 1  // 新的开始下标数值
        steps := start + span //新的结束下标数值
        var remainder int
                // 到底了,就从头开始清洗数据
        if steps > rw.size {
            remainder = steps - rw.size
            steps = rw.size
        }
                // 没到底,就将该范围的数值重置为0,这也就是在表达,随着时间的推移,会有一部分数据给归零处理
//使得sre公式又开始成立,也就是上游服务过载让其休息下,过一段时间再来调用看看~
        for i := start; i < steps; i++ {
            rw.win.resetBucket(i)
            offset = i
        }
        for i := 0; i < remainder; i++ {
            rw.win.resetBucket(i)
            offset = i
        }
        rw.offset = offset
        rw.lastTime = timex.Now()
    }
}

 

参考文献

https://blog.csdn.net/jfwan/article/details/109328874


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:Best博客

查看原文:golang-熔断器

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

604 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传