原文链接: 高并发系统的限流策略:漏桶和令牌桶(附源码剖析)
前言
哈喽,大家好,我是正在学习PS
技术的asong
,这是我并发编程系列的第5
篇文章,今天与大家聊一聊高并发系统中的限流技术,限流又称为流量控制,是指限制到达系统的并发请求数,当达到限制条件则可以拒绝请求,可以起到保护下游服务,防止服务过载等作用。常用的限流策略有漏桶算法、令牌桶算法、滑动窗口;下文主要与大家一起分析一下漏桶算法和令牌桶算法,滑动窗口就不在这里这介绍了。好啦,废话不多话,开整。
文中测试代码已上传:https://github.com/asong2020/... 欢迎`star
漏桶算法
漏桶算法比较好理解,假设我们现在有一个水桶,我们向这个水桶里添水,虽然我们我们无法预计一次会添多少水,也无法预计水流入的速度,但是可以固定出水的速度,不论添水的速率有多大,都按照固定的速率流出,如果桶满了,溢出的上方水直接抛弃。我们把水当作HTTP
请求,每次都把请求放到一个桶中,然后以固定的速率处理请求,说了这么多,不如看一个图加深理解(图片来自于网络,手残党不会画,多多包涵):
原理其实很简单,就看我们怎么实现它了,uber
团队有一个开源的uber-go/ratelimit
库,这个库就是漏桶的一种实现,下面我们一起来看一看他的实现思路。
样例
学习一个新东西的时候,往往是从会用开始的,慢慢才能明白其实现原理,所以我们先来看看这个库是怎样使用的,这里我们直接提供一个实际使用例子,配合Gin
框架,我们添加一个限流中间件,来达到请求限流的作用,测试代码如下:
// 定义全局限流器对象
var rateLimit ratelimit.Limiter
// 在 gin.HandlerFunc 加入限流逻辑
func leakyBucket() gin.HandlerFunc {
prev := time.Now()
return func(c *gin.Context) {
now := rateLimit.Take()
fmt.Println(now.Sub(prev)) // 为了打印时间间隔
prev = now // 记录上一次的时间,没有这个打印的会有问题
}
}
func main() {
rateLimit = ratelimit.New(10)
r := gin.Default()
r.GET("/ping", leakyBucket(), func(c *gin.Context) {
c.JSON(200, true)
})
r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}
我们简单使用压测工具ab
测试一下:ab -n 10 -c 2 http://127.0.0.1:8080/ping
,执行结果部分如下:
观察结果可知,每次处理请求的时间间隔是10ms,并且后面的请求耗时越来越久,为什么会这样呢? 这里先卖个小关子,看完uber
的实现你就知道了~
源码实现
我们首先来看一下其核心结构:
type limiter struct {
sync.Mutex
last time.Time
sleepFor time.Duration
perRequest time.Duration
maxSlack time.Duration
clock Clock
}
type Limiter interface {
// Take should block to make sure that the RPS is met.
Take() time.Time
}
限制器接口只提供了一个方法take()
,take()
方法会阻塞确保两次请求之间的时间走完,具体实现我们在下面进行分析。实现限制器接口的结构体中各个字段的意义如下:
sync.Mutext
:互斥锁,控制并发的作用last
:记录上一次的时刻sleepFor
:距离处理下一次请求需要等待的时间perRequest
:每次请求的时间间隔maxSlack
:最大松弛量,用来解决突发流量clock
:一个时钟或模拟时钟,提供了now
和sleep
方法,是实例化速率限制器
要是用该限制器,首先需要通过New
方法进行初始化,一个必传的参数是rate
,代表的是每秒请求量(RPS),还有一个可选参数,参数类型option
,也就是我们可以自定义limit
,不过一般使用场景不多,这里就不过多介绍了。我主要看一下他是怎么保证固定速率的,截取New
方法部分代码如下:
l := &limiter{
perRequest: time.Second / time.Duration(rate),
maxSlack: -10 * time.Second / time.Duration(rate),
}
根据我们传入的请求数量,能计算出1
s内要通过n
个请求,每个请求之间的间隔时间是多少,这样在take
方法中就可以根据这个字段来处理请求的固定速率问题,这里还初始化了最大松弛化字段,他的值是负数,默认最大松弛量是10个请求的时间间隔。
接下来我们主要看一下take
方法:
func (t *limiter) Take() time.Time {
t.Lock()
defer t.Unlock()
now := t.clock.Now()
if t.last.IsZero() {
t.last = now
return t.last
}
t.sleepFor += t.perRequest - now.Sub(t.last)
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
return t.last
}
take()
方法的执行步骤如下:
- 为了控制并发,所以进入该方法就需要进行上锁,该锁的粒度比较大,整个方法都加上了锁
- 通过
IsZero
方法来判断当前是否是第一次请求,如果是第一次请求,直接取now
时间即可返回。 - 如果不是第一次请求,就需要计算距离处理下一次请求需要等待的时间,这里有一个要注意点的是累加需要等待的时间,目的是可以给后面的抵消使用
- 如果当前累加需要等待的时间大于最大松弛量了,将等待的时间设置为最大松弛量的时间。
- 如果当前请求多余的时间无法完全抵消此次的所需量,调用
sleep
方法进行阻塞,同时清空等待的时间。如果sleepFor
小于0,说明此次请求时间间隔大于预期间隔,也就说无需等待可以直接处理请求。
步骤其实不是很多,主要需要注意一个知识点 —— 最大松弛量。
漏桶算法有个天然缺陷就是无法应对突发流量(匀速,两次请求 req1
和 req2
之间的延迟至少应该 >=perRequest
),举个例子说明:假设我们现在有三个请求req1
、req2
、req3
按顺序处理,每个请求处理间隔为100ms,req1
请求处理完成之后150ms,req2
请求到来,依据限速策略可以对 req2
立即处理,当 req2
完成后,50ms
后, req3
到来,这个时候距离上次请求还不足 100ms
,因此还需要等待 50ms
才能继续执行, 但是,对于这种情况,实际上这三个请求一共消耗了 250ms
才完成,并不是预期的 200ms
。
对于上面这种情况,我们可以把之前间隔比较长的请求的时间匀给后面的请求判断限流时使用,减少请求等待的时间了,但是当两个请求之间到达的间隔比较大时,就会产生很大的可抵消时间,以至于后面大量请求瞬间到达时,也无法抵消这个时间,那样就已经失去了限流的意义,所以引入了最大松弛量 (maxSlack) 的概念, 该值为负值,表示允许抵消的最长时间,防止以上情况的出现。
以上就是漏桶实现的基本思路了,整体还是很简单的,你学会了吗?
令牌桶算法
令牌桶其实和漏桶的原理类似,令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放 Token,桶满则暂时不放。从网上找了图,表述非常恰当:
关于令牌桶限流算法的实现,Github
有一个高效的基于令牌桶限流算法实现的限流库:github.com/juju/ratelimit
,Golang
的timer/rate
也是令牌桶的一种实现,本文就不介绍juju/ratelimit
库了,有兴趣的自己学习一下的他的实现思想吧,我们主要来看一看time/rate
是如何实现的。
样例
还是老样子,我们还是结合gin
写一个限流中间件看看他是怎么使用的,例子如下:
import (
"net/http"
"time"
"github.com/gin-gonic/gin"
"golang.org/x/time/rate"
)
var rateLimit *rate.Limiter
func tokenBucket() gin.HandlerFunc {
return func(c *gin.Context) {
if rateLimit.Allow() {
c.String(http.StatusOK, "rate limit,Drop")
c.Abort()
return
}
c.Next()
}
}
func main() {
limit := rate.Every(100 * time.Millisecond)
rateLimit = rate.NewLimiter(limit, 10)
r := gin.Default()
r.GET("/ping", tokenBucket(), func(c *gin.Context) {
c.JSON(200, true)
})
r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}
上面的例子我们首先调用NewLimiter
方法构造一个限流器,第一个参数是r limit
,代表每秒可以向Token
桶中产生多少token
,第二个参数是b int
,代表Token
桶的容量大小,对于上面的例子,表示每100ms
往桶中放一个token
,也就是1s
钟产生10
个,桶的容量就是10
。消费token
的方法这里我们使用Allow
方法,Allow 实际上就是 AllowN(time.Now(),1)
,AllowN
方法表示,截止到某一时刻,目前桶中数目是否至少为 n
个,满足则返回 true
,同时从桶中消费 n
个 token
。反之返回不消费 Token
。对应上面的例子,当桶中的数目不足于1
个时,就会丢掉该请求。
源码剖析
Limit类型
time/rate
自定义了一个limit
类型,其实他本质就是float64
的别名,Limit
定了事件的最大频率,表示每秒事件的数据量,0就表示无限制。Inf
是无限的速率限制;它允许所有事件(即使突发为0)。还提供 Every
方法来指定向 Token
桶中放置 Token
的间隔,计算出每秒时间的数据量。
type Limit float64
// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)
// Every converts a minimum time interval between events to a Limit.
func Every(interval time.Duration) Limit {
if interval <= 0 {
return Inf
}
return 1 / Limit(interval.Seconds())
}
Limiter
结构体
type Limiter struct {
mu sync.Mutex
limit Limit
burst int
tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}
各个字段含义如下:
mu
:互斥锁、为了控制并发limit
:每秒允许处理的事件数量,即每秒处理事件的频率burst
:令牌桶的最大数量,如果burst
为0,并且limit == Inf,则允许处理任何事件,否则不允许tokens
:令牌桶中可用的令牌数量last
:记录上次limiter的tokens被更新的时间lastEvent
:lastEvent
记录速率受限制(桶中没有令牌)的时间点,该时间点可能是过去的,也可能是将来的(Reservation
预定的结束时间点)
Reservation
结构体
type Reservation struct {
ok bool
lim *Limiter
tokens int
timeToAct time.Time
// This is the Limit at reservation time, it can change later.
limit Limit
}
各个字段含义如下:
ok
:到截至时间是否可以获取足够的令牌lim
:limiter
对象tokens
:需要获取的令牌数量timeToAct
:需要等待的时间点limit
:代表预定的时间,是可以更改的。
reservation
就是一个预定令牌的操作,timeToAct
是本次预约需要等待到的指定时间点才有足够预约的令牌。
Limiter
消费token
Limite
r有三个token
的消费方法,分别是Allow
、Reserve
和Wait
,最终三种消费方式都调用了 reserveN
、advance
这两个方法来生成和消费 Token
。所以我们主要看看reserveN
、advance
函数的具体实现。
advance
方法的实现:
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
//last不能在当前时间now之后,否则计算出来的elapsed为负数,会导致令牌桶数量减少
last := lim.last
if now.Before(last) {
last = now
}
//根据令牌桶的缺数计算出令牌桶未进行更新的最大时间
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last) //令牌桶未进行更新的时间段
if elapsed > maxElapsed {
elapsed = maxElapsed
}
//根据未更新的时间(未向桶中加入令牌的时间段)计算出产生的令牌数
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta //计算出可用的令牌数
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
advance
方法的作用是更新令牌桶的状态,计算出令牌桶未更新的时间(elapsed),根据elapsed
算出需要向桶中加入的令牌数delta
,然后算出桶中可用的令牌数newTokens
.
reserveN
方法的实现:reserveN
是AllowN
,ReserveN
及WaitN
的辅助方法,用于判断在maxFutureReserve
时间内是否有足够的令牌。
// @param n 要消费的token数量
// @param maxFutureReserve 愿意等待的最长时间
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
// 如果没有限制
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true, //桶中有足够的令牌
lim: lim,
tokens: n,
timeToAct: now,
}
}
//更新令牌桶的状态,tokens为目前可用的令牌数量
now, last, tokens := lim.advance(now)
// 计算取完之后桶还能剩能下多少token
tokens -= float64(n)
var waitDuration time.Duration
// 如果token < 0, 说明目前的token不够,需要等待一段时间
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
ok := n <= lim.burst && waitDuration <= maxFutureReserve
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
// timeToAct表示当桶中满足token数目等于n的时间
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// 更新桶里面的token数目
// 更新last时间
// lastEvent
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
上面的代码我已经进行了注释,这里在总结一下流程:
- 首选判断是否拥有速率限制,没有速率限制也就是桶中一致拥有足够的令牌。
- 计算从上次取 Token 的时间到当前时刻,期间一共新产生了多少
Token
:我们只在取Token
之前生成新的Token
,也就意味着每次取Token
的间隔,实际上也是生成Token
的间隔。我们可以利用tokensFromDuration
, 轻易的算出这段时间一共产生Token
的数目。所以当前Token
数目 = 新产生的Token
数目 + 之前剩余的Token
数目 - 要消费的Token
数目。 - 如果消费后剩余 Token 数目大于零,说明此时 Token 桶内仍不为空,此时 Token 充足,无需调用侧等待。
如果 Token 数目小于零,则需等待一段时间。那么这个时候,我们可以利用durationFromTokens
将当前负值的 Token 数转化为需要等待的时间。 - 将需要等待的时间等相关结果返回给调用方
其实整个过程就是利用了 Token 数可以和时间相互转化 的原理。而如果 Token 数为负,则需要等待相应时间即可。
上面提到了durationFromTokens
、tokensFromDuration
这两个方法,是关键,他们的实现如下:
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
seconds := tokens / float64(limit)
return time.Nanosecond * time.Duration(1e9*seconds)
}
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
// Split the integer and fractional parts ourself to minimize rounding errors.
// See golang.org/issues/34861.
sec := float64(d/time.Second) * float64(limit)
nsec := float64(d%time.Second) * float64(limit)
return sec + nsec/1e9
}
durationFromTokens
:功能是计算出生成N
个新的Token
一共需要多久。tokensFromDuration
:给定一段时长,这段时间一共可以生成多少个 Token。
细心的网友会发现tokensFromDuration
方法既然是计算一段时间一共可以生成多少个 Token
,为什么不直接进行相乘呢?其实Golang
最初的版本就是采用d.Seconds() * float64(limit)
直接相乘实现的,虽然看上去一点问题没有,但是这里是两个小数相乘,会带来精度损失,所以采用现在这种方法实现,分别求出秒的整数部分和小数部分,进行相乘后再相加,这样可以得到最精确的精度。
limiter
归还Token
既然我们可以消费Token
,那么对应也可以取消此次消费,将token
归还,当调用 Cancel()
函数时,消费的 Token 数将会尽可能归还给 Token 桶。归还也并不是那么简单,接下我们我们看看归还token
是如何实现的。
func (r *Reservation) CancelAt(now time.Time) {
if !r.ok {
return
}
r.lim.mu.Lock()
defer r.lim.mu.Unlock()
/*
1.如果无需限流
2. tokens为0 (需要获取的令牌数量为0)
3. 已经过了截至时间
以上三种情况无需处理取消操作
*/
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
return
}
//计算出需要还原的令牌数量
//这里的r.lim.lastEvent可能是本次Reservation的结束时间,也可能是后来的Reservation的结束时间,所以要把本次结束时间点(r.timeToAct)之后产生的令牌数减去
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
// 当小于0,表示已经都预支完了,不能归还了
if restoreTokens <= 0 {
return
}
//从新计算令牌桶的状态
now, _, tokens := r.lim.advance(now)
//还原当前令牌桶的令牌数量,当前的令牌数tokens加上需要还原的令牌数restoreTokens
tokens += restoreTokens
//如果tokens大于桶的最大容量,则将tokens置为桶的最大容量
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}
// update state
r.lim.last = now //记录桶的更新时间
r.lim.tokens = tokens //更新令牌数量
// 如果都相等,说明跟没消费一样。直接还原成上次的状态吧
if r.timeToAct == r.lim.lastEvent {
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(now) {
r.lim.lastEvent = prevEvent
}
}
return
}
注释已经添加,就不在详细解释了,重点是这一行代码:restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
,r.tokens
指的是本次消费的token
数,r.timeToAcr
指的是Token
桶可以满足本次消费数目的时刻,也就是消费的时刻+等待的时长,r.lim.lastEvent
指的是最近一次消费的timeToAct
的值,通过r.limit.tokensFromDuration
方法得出的结果指的是从该次消费到当前时间,一共又消费了多少Token
数目,所以最终得出这一段的代码含义是:
要归还的Token
= 该次消费的Token
- 新消费的token
。
好啦,源码就暂时分析到这了,因为标准库的实现的代码量有点大,还有一部分在这里没有说,留给大家自己去剖析吧~。
总结
本文重点介绍了漏桶算法和令牌桶算法,漏桶算法和令牌桶算法的主要区别在于,"漏桶算法"能够强行限制数据的传输速率(或请求频率),而"令牌桶算法"在能够限制数据的平均传输速率外,还允许某种程度的突发传输。在某些情况下,漏桶算法不能够有效地使用网络资源,因为漏桶的漏出速率是固定的,所以即使网络中没有发生拥塞,漏桶算法也不能使某一个单独的数据流达到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。而令牌桶算法则能够满足这些具有突发特性的流量。通常,漏桶算法与令牌桶算法结合起来为网络流量提供更高效的控制。
文中测试代码已上传:https://github.com/asong2020/... 欢迎star
好啦,这篇文章就到这里啦,素质三连(分享、点赞、在看)都是笔者持续创作更多优质内容的动力!
创建了一个Golang学习交流群,欢迎各位大佬们踊跃入群,我们一起学习交流。入群方式:加我vx拉你入群,或者公众号获取入群二维码
结尾给大家发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,自己也收集了一本PDF,有需要的小伙可以到自行下载。获取方式:关注公众号:[Golang梦工厂],后台回复:[微服务],即可获取。
我翻译了一份GIN中文文档,会定期进行维护,有需要的小伙伴后台回复[gin]即可下载。
翻译了一份Machinery中文文档,会定期进行维护,有需要的小伙伴们后台回复[machinery]即可获取。
我是asong,一名普普通通的程序猿,让我们一起慢慢变强吧。欢迎各位的关注,我们下期见~~~
推荐往期文章:
- Go看源码必会知识之unsafe包
- 源码剖析panic与recover,看不懂你打我好了!
- 详解并发编程基础之原子操作(atomic包)
- 详解defer实现机制
- 真的理解interface了嘛
- Leaf—Segment分布式ID生成系统(Golang实现版本)
- 十张动图带你搞懂排序算法(附go实现代码)
- go参数传递类型
- 手把手教姐姐写消息队列
- 常见面试题之缓存雪崩、缓存穿透、缓存击穿
- 详解Context包,看这一篇就够了!!!
- go-ElasticSearch入门看这一篇就够了(一)
- 面试官:go中for-range使用过吗?这几个问题你能解释一下原因吗
有疑问加站长微信联系(非本文作者)