## 背景
关于限流Go官方通过一个采用令牌池的算法的实现:golang.org/x/time/rate,但是,这个限制的是每秒的请求数,有的时候我们希望限制的是系统并发处理的请求数量,类似线程池的功能,需求如下:
1. 设置一个最大的请求处理数量,当请求超过时,后续请求将等待,直到有请求处理完后被唤醒。
2. 请求的等待时间能够指定,超出等待时间就返回,提示给客户端。
3. 等待请求的个数需要能够限制,数量超过时就直接返回,提示给客户端。
## 设计
设计思路是实现一个Ticket池(NumLimiter),每个请求首先需要向NumLimiter申请一个ticket,当请求处理结束后,需要被回收。
获取不到ticket的请求就等待现有的ticket释放,所以会有两个核心对象:
1. NumLimiter:数量限制器(ticket 池)
2. Ticket:入场券,请求需要先申请一个Ticket
先不考虑细节,可以设计如下:
```go
package numlimiter
// 数量限制器
type NumLimiter struct {
maxTicket int // 最大请求数
maxWait int // 最大等待数
...
}
// 释放Ticket
func (r *NumLimiter) releaseTicket(t *Ticket) bool {
...
}
// 预订Ticket
func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) {
...
}
// 创建一个tocket池
func New(maxTicket) *NumLimiter {
l := &NumLimiter{
maxTicket: maxTicket,
}
return l
}
// 入场券
type Ticket struct {
l *NumLimiter
reqKey int64
}
// 释放入场券
func (r *Ticket) Close() {
r.l.releaseTicket(r)
}
```
**NumLimiter有两个核心的方法:**
1. Reserve - 申请Ticket:每个请求处理前需要先调用该方法获取一个ticket,如果当前颁发的ticket数已经是大于等于 maxTicket时,请求就pending等待Ticket释放。 该方法接收一个context,作用是传递外部超时或取消的信号,结束等待。
2. releaseTicket - 释放Ticket:当请求处理完就需要把持有的ticket释放,该方法不直接暴露给外部,提供给ticket的Close方法调用。
**Ticket就只有一个Close方法:**
1. Close:调用NumLimiter的releaseTicket释放Ticket
**客户端使用**:
每次处理请求需要先调用Reserve获取Ticket,获取到后才执行具体的业务逻辑,执行完毕后调用Close方法释放Ticket
```go
l := numlimiter.New(2)
func Do(req Request) error { // 模拟请求request
tk, err := l.Reserve(context.Background()) // 申请Ticket
if err != nil { // 异常
return err
}
defer tk.Close() // 释放Ticket
// 处理请求req
...
}
```
**整个框架定义好了,接着开始撸具体实现**
首先,需要给每个ticket标识一个唯一标识,我们定义一个reqKey序列,通过nextReqKeyLocked方法自增,调用时需要加锁,保证在NumLimiter实例生成的key是唯一,代码如下:
```go
type NumLimiter struct {
nextKey int64 // 下一个请求的Key
...
}
// 每次调用nextKey自动+1,调用的时候需要加锁,保证协程安全
func (r *NumLimiter) nextReqKeyLocked() int64 {
next := r.nextKey
r.nextKey++
return next
}
```
接着,我们开始实现核心的Reserve()方法,梳理后的逻辑如下:
1. 当颁发的Ticket数量小于maxTicket时,创建一个Ticket直接返回。
2. 如果Ticket数量大于等于maxTicket,就先判断当前wait请求数是否超过maxWait,如果”是“,直接返回相应的error。
3. 如果wait数没超过,就pending等待Ticket释放,同时还得监听是否超时。
**实现逻辑之前需要考虑:**
1. Ticket如何管理。想要**统一管理**已经发放的Ticket数量,就需要有地方**存储**,还能对NumLimiter中**所有方法可见**,所以在NumLimiter中增加一个**tickets属性**,类型为 :map[int64]*Ticket(注:key 为请求的key,value对应的是已经颁发的Ticket)
2. 管理**等待**Ticket。同样等待Ticket的请求需要被存储,并且能够**被唤醒**。于是也可以在NumLimiter增加一个属性:**waitTickets**,类型为:**map[int64]chan struct{}**(注:key同样是请求的key,值比较特殊,使用chan,目的是为了**其他协程**能**安全**访问,当没数据时读取会**pending**,被close后会继续,chan的类型我们不关注,所以直接使用空结构体struct{})
3. 另外,为了**保护**这些共享资源,还需要一个锁:mu sync.Mutex:
```go
type NumLimiter struct {
maxTicket int // 最大请求数
maxWait int // 最大等待数量
mu sync.Mutex
nextKey int64 // 下一个请求的Key
tickets map[int64]*Ticket
waitTickets map[int64]chan struct{}
...
}
```
接下来就可以开始实现**Reserve方法**:
```go
func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) {
r.mu.Lock()
reqKey := r.nextReqKeyLocked()
t := &Ticket{l: r, reqKey: reqKey, lg: r.lg, create: time.Now()}
// 当请求数量大于maxTicket就放到waitTickets中等待
if len(r.tickets) >= r.maxTicket {
if len(waitTickets) > r.maxWait {
return nil, errors.New("waiting exceed max wait")
}
req := make(chan struct{})
now := time.Now()
r.lg.Warnf("request num exceed %d, reqkey [%d] waiting for ticket, req processing num = %d, total wait num = %d", r.maxTicket, reqKey, len(r.tickets), len(r.waitTickets)+1)
r.waitTickets[reqKey] = req
r.mu.Unlock() // 需要立即解锁,否则会导致其他协程调用Reserve或releaseTicket方法获取不到锁
select {
case <-ctx.Done():
r.lg.Errorf("limiter wait timeout: key = %d, cost = %f", reqKey, time.Now().Sub(now).Seconds())
r.mu.Lock()
delete(r.waitTickets, reqKey)
r.mu.Unlock()
select {
default:
case <-req:
t.Close() // 返回ticket
}
return nil, ctx.Err()
case <-req:
r.mu.Lock()
r.tickets[reqKey] = t
r.mu.Unlock()
r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds())
return t, nil
}
}
r.tickets[reqKey] = t
r.mu.Unlock()
return t, nil
}
```
虽然代码看着比较长,但是整个实现没太多复杂逻辑,核心代码就是等待ticket和被唤醒部分:
```go
req := make(chan struct{})
r.waitTickets[reqKey] = req
r.mu.Unlock() // 需要立即解锁,否则会导致其他协程调用Reserve或releaseTicket方法获取不到锁
select {
...
case <-req:
r.mu.Lock()
r.tickets[reqKey] = t
r.mu.Unlock()
r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds())
return t, nil
}
```
这里是利用chan特性,当要pending等待时,会创建一个请求chan:req := make(chan struct{}),然后放到waitTickets后就立即**解锁**(目的是让其他协程能**获取到锁**),chan在没数据写入或chan**没有被关闭**的情况下会pending,如果一旦有ticket释放,会通过close这个chan方式通知继续。
另外,超时的实现是借助**context**来实现,通过监听**ctx.Done()**方法,同时还要注意**并发问题**,超时的时候还是有可能**获取到锁**,所以还是得再检查一下case <-req是否成立,成立就说明超时的同时也正好**获取到ticket**,但是由于超时了,ticket就没用了,直接释放t.Close()。
**接着,我们来实现ticket释放逻辑**:
1. 删除tickets中对应的数据。(从tickets移除了,所以相当于将ticket释放了)
2. 如果waitTickets没有数据就直接返回。len(tickets)数量已经-1,相当于ticket释放到池中。
3. 如果waitTickets有等待ticket的请求,就直接通知其中的一个等待ticket的请求可以继续,然后等待请求从waitTickets删除,相当于将要释放的ticket直接移交给等待ticket的请求。
```go
func (r *NumLimiter) releaseTicket(t *Ticket) bool {
r.mu.Lock()
defer r.mu.Unlock()
// 删除tickets中对应的数据
releaseSuccess := true
if _, ok := r.tickets[t.reqKey]; ok {
delete(r.tickets, t.reqKey)
} else {
releaseSuccess = false
}
// 如果waitTickets有等待ticket的请求
if len(r.waitTickets) > 0 {
var req chan struct{}
var reqKey int64
// 取出一条
for reqKey, req = range r.waitTickets {
break
}
close(req) // 通过close方式,通知等待ticket的协程继续
delete(r.waitTickets, reqKey)// 从waitTickets删除
}
return releaseSuccess
}
```
这里的通知方式采用close(req)的方式传输信号,相应在Reserve()方法的select case <-req等待的请求就会收到信号,继续执行,同时将获取到的ticket保存在tickets中,返回对应的ticket后,客户端获取到ticket就可以继续请求的处理。
另外,实际上releaseTicket方法是不直接暴露给客户端,而是提供给ticket的close方法调用:
```go
func (r *Ticket) Close() {
if !r.l.releaseTicket(r) {
r.lg.Errorf("limiter ticket release error: req key = %d", r.reqKey)
}
}
```
这样当获得到ticket后,客户端可以把这ticket对象传到方法,释放的时候就直接调用ticket的close方法,就不需要管NumLimiter对象。
最后增加一个**初始化方法**,方便实例化NumLimiter:
```go
func New(maxTicket, maxWait int) *NumLimiter {
l := &NumLimiter{
waitTickets: map[int64]chan struct{}{},
tickets: map[int64]*Ticket{},
maxTicket: maxTicket,
maxWait: maxWait,
}
return l
}
```
这样一个完整限量的功能就完成了。
## 总结
限量的实现是参考[database/sql](https://itart.cn/blogs/2021/explore/database-sql-pool.html) 设计,核心的思想是如何合理管理ticket,超出时借助chan实现等待,还有context实现超时,当ticket释放,通过close chan来实现广播,通知对应的等待请求可以继续。
我的博客:https://itart.cn/blogs/2022/practice/num-limiter-library.html
有疑问加站长微信联系(非本文作者))