熔断器
go-zero在breaker里面基于google的sre算法实现了熔断器逻辑,并在redis等客户端操作的时候引入了熔断器
算法公式
go-zero 实现了这个公式,并且作为底层组件很好被嵌入使用,在redis的调用就有很好的表现.
- 定义接口
- 底层组件实现该接口,
- 初始化breaker各属性对象,依赖对象数据类型定义的都是interface
https://github.com/tal-tech/go-zero/blob/master/core/breaker/breaker.go
使用demo
import (
"errors"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stat"
"strconv"
"strings"
"testing"
)
// 熔断服务会将闭包每次返回的结果统计起来
// 最终在sre公式中通过计算得到是否放行的标志
func TestGoogleBreaker(t *testing.T) {
br := NewBreaker()
for i := 0; i < 500; i++ {
br.Do(
func() error {
fmt.Println("如果闭包一直执行那么i就会连续的被打印", i)
if i > 4 {
err := errors.New("err info")
return err
} else {
return nil
}
},
)
}
}
redis调用使用demo
package redis
import (
"errors"
"fmt"
"strconv"
"time"
red "github.com/go-redis/redis"
"github.com/tal-tech/go-zero/core/breaker"
"github.com/tal-tech/go-zero/core/mapping"
)
//redis客户端初始化过程中,特意将breaker作为brk依赖注入,使得go-zero底层redis客户端组件自带熔断保护措施
// 参照这我们使用breaker的灵活性就很大了, 闭包。。
func NewRedis(redisAddr, redisType string, redisPass ...string) *Redis {
var pass string
for _, v := range redisPass {
pass = v
}
return &Redis{
Addr: redisAddr,
Type: redisType,
Pass: pass,
brk: breaker.NewBreaker(),//redis客户端初始化过程中,特意将breaker作为brk依赖注入
}
}
// 这是找到go-zero的redis客户端del过程
// 看见没 s.brk.DoWithAcceptable, go-zero将所有操作redis的过程都以闭包的形式包入breker中
func (s *Redis) Del(keys ...string) (val int, err error) {
err = s.brk.DoWithAcceptable(func() error {
conn, err := getRedis(s)
if err != nil {
return err
}
if v, err := conn.Del(keys...).Result(); err != nil {
return err
} else {
val = int(v)
return nil
}
}, acceptable)
return
}
代码分析:
// 闭包是以参数传入该方法的
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
// 该方法每次在执行闭包之前会先通过 b.accept(); 验证当前是否还能执行闭包
if err := b.accept(); err != nil {
if fallback != nil {
return fallback(err)
} else {
return err
}
}
defer func() {
if e := recover(); e != nil {
b.markFailure()
panic(e)
}
}()
err := req()
if acceptable(err) {
// 闭包执行返回的err为nil走这里,进行数据统计
b.markSuccess()
} else {
// 闭包执行返回的err 不为 nil走这里
b.markFailure()
}
return err
}
// sre 公式的真是写照了
// 闭包每次执行之前都会先执行这个方法,看一下还能不能执行闭包
func (b *googleBreaker) accept() error {
accepts, total := b.history() //这个方法返回当前时刻 闭包返回nil,与非nil的统计数值,用于sre公式计算
weightedAccepts := b.k * float64(accepts)
// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
if dropRatio <= 0 {
return nil
}
if b.proba.TrueOnProba(dropRatio) {
return ErrServiceUnavailable
}
return nil
}
// 闭包执行成功,其实还是调用add
func (b *googleBreaker) markSuccess() {
b.stat.Add(1)
}
func (rw *RollingWindow) Add(v float64) {
rw.lock.Lock()
defer rw.lock.Unlock()
rw.updateOffset() //核心。。
rw.win.add(rw.offset, v)
}
// 这里就是计算 sre 算法用到的 参数数字的过程了
// 一开始在初始化breaker的时候,10S为单位分成40份
// 这里用到了一个巧妙方式自动初始化一部分区间的数值,
// 也就是算法一开始所描述的,当错误次数过高熔断被触发,当一段时间过期后便会重新再去调用闭包,
// 只所以会过一段时间去调用,是因为sre公式又成立了,之所以成立了就是因为这里随着时间的推移初始化掉
//了一部分区间的数值
func (rw *RollingWindow) updateOffset() {
// breaker初始化过程中,|--|--|--|--|--|... 每个区间时间长度为: 10 * time.Second / 40,对应代码为数组的40个下标,不同时刻 闭包执行的统计值的累加结果存在不同下标的对象中进行累加 ,这里的span()就是根据当前时刻决定下标移动的跨度
span := rw.span()
if span > 0 {
offset := rw.offset //当前区间的下标
// reset expired buckets
start := offset + 1 // 新的开始下标数值
steps := start + span //新的结束下标数值
var remainder int
// 到底了,就从头开始清洗数据
if steps > rw.size {
remainder = steps - rw.size
steps = rw.size
}
// 没到底,就将该范围的数值重置为0,这也就是在表达,随着时间的推移,会有一部分数据给归零处理
//使得sre公式又开始成立,也就是上游服务过载让其休息下,过一段时间再来调用看看~
for i := start; i < steps; i++ {
rw.win.resetBucket(i)
offset = i
}
for i := 0; i < remainder; i++ {
rw.win.resetBucket(i)
offset = i
}
rw.offset = offset
rw.lastTime = timex.Now()
}
}
参考文献
https://blog.csdn.net/jfwan/article/details/109328874
有疑问加站长微信联系(非本文作者)