并发请求量限制组件分享

uuid · · 737 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

## 背景 关于限流Go官方通过一个采用令牌池的算法的实现:golang.org/x/time/rate,但是,这个限制的是每秒的请求数,有的时候我们希望限制的是系统并发处理的请求数量,类似线程池的功能,需求如下: 1. 设置一个最大的请求处理数量,当请求超过时,后续请求将等待,直到有请求处理完后被唤醒。 2. 请求的等待时间能够指定,超出等待时间就返回,提示给客户端。 3. 等待请求的个数需要能够限制,数量超过时就直接返回,提示给客户端。 ## 设计 设计思路是实现一个Ticket池(NumLimiter),每个请求首先需要向NumLimiter申请一个ticket,当请求处理结束后,需要被回收。 获取不到ticket的请求就等待现有的ticket释放,所以会有两个核心对象: 1. NumLimiter:数量限制器(ticket 池) 2. Ticket:入场券,请求需要先申请一个Ticket 先不考虑细节,可以设计如下: ```go package numlimiter // 数量限制器 type NumLimiter struct { maxTicket int // 最大请求数 maxWait int // 最大等待数 ... } // 释放Ticket func (r *NumLimiter) releaseTicket(t *Ticket) bool { ... } // 预订Ticket func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) { ... } // 创建一个tocket池 func New(maxTicket) *NumLimiter { l := &NumLimiter{ maxTicket: maxTicket, } return l } // 入场券 type Ticket struct { l *NumLimiter reqKey int64 } // 释放入场券 func (r *Ticket) Close() { r.l.releaseTicket(r) } ``` **NumLimiter有两个核心的方法:** 1. Reserve - 申请Ticket:每个请求处理前需要先调用该方法获取一个ticket,如果当前颁发的ticket数已经是大于等于 maxTicket时,请求就pending等待Ticket释放。 该方法接收一个context,作用是传递外部超时或取消的信号,结束等待。 2. releaseTicket - 释放Ticket:当请求处理完就需要把持有的ticket释放,该方法不直接暴露给外部,提供给ticket的Close方法调用。 **Ticket就只有一个Close方法:** 1. Close:调用NumLimiter的releaseTicket释放Ticket **客户端使用**: 每次处理请求需要先调用Reserve获取Ticket,获取到后才执行具体的业务逻辑,执行完毕后调用Close方法释放Ticket ```go l := numlimiter.New(2) func Do(req Request) error { // 模拟请求request tk, err := l.Reserve(context.Background()) // 申请Ticket if err != nil { // 异常 return err } defer tk.Close() // 释放Ticket // 处理请求req ... } ``` **整个框架定义好了,接着开始撸具体实现** 首先,需要给每个ticket标识一个唯一标识,我们定义一个reqKey序列,通过nextReqKeyLocked方法自增,调用时需要加锁,保证在NumLimiter实例生成的key是唯一,代码如下: ```go type NumLimiter struct { nextKey int64 // 下一个请求的Key ... } // 每次调用nextKey自动+1,调用的时候需要加锁,保证协程安全 func (r *NumLimiter) nextReqKeyLocked() int64 { next := r.nextKey r.nextKey++ return next } ``` 接着,我们开始实现核心的Reserve()方法,梳理后的逻辑如下: 1. 当颁发的Ticket数量小于maxTicket时,创建一个Ticket直接返回。 2. 如果Ticket数量大于等于maxTicket,就先判断当前wait请求数是否超过maxWait,如果”是“,直接返回相应的error。 3. 如果wait数没超过,就pending等待Ticket释放,同时还得监听是否超时。 **实现逻辑之前需要考虑:** 1. Ticket如何管理。想要**统一管理**已经发放的Ticket数量,就需要有地方**存储**,还能对NumLimiter中**所有方法可见**,所以在NumLimiter中增加一个**tickets属性**,类型为 :map[int64]*Ticket(注:key 为请求的key,value对应的是已经颁发的Ticket) 2. 管理**等待**Ticket。同样等待Ticket的请求需要被存储,并且能够**被唤醒**。于是也可以在NumLimiter增加一个属性:**waitTickets**,类型为:**map[int64]chan struct{}**(注:key同样是请求的key,值比较特殊,使用chan,目的是为了**其他协程**能**安全**访问,当没数据时读取会**pending**,被close后会继续,chan的类型我们不关注,所以直接使用空结构体struct{}) 3. 另外,为了**保护**这些共享资源,还需要一个锁:mu sync.Mutex: ```go type NumLimiter struct { maxTicket int // 最大请求数 maxWait int // 最大等待数量 mu sync.Mutex nextKey int64 // 下一个请求的Key tickets map[int64]*Ticket waitTickets map[int64]chan struct{} ... } ``` 接下来就可以开始实现**Reserve方法**: ```go func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) { r.mu.Lock() reqKey := r.nextReqKeyLocked() t := &Ticket{l: r, reqKey: reqKey, lg: r.lg, create: time.Now()} // 当请求数量大于maxTicket就放到waitTickets中等待 if len(r.tickets) >= r.maxTicket { if len(waitTickets) > r.maxWait { return nil, errors.New("waiting exceed max wait") } req := make(chan struct{}) now := time.Now() r.lg.Warnf("request num exceed %d, reqkey [%d] waiting for ticket, req processing num = %d, total wait num = %d", r.maxTicket, reqKey, len(r.tickets), len(r.waitTickets)+1) r.waitTickets[reqKey] = req r.mu.Unlock() // 需要立即解锁,否则会导致其他协程调用Reserve或releaseTicket方法获取不到锁 select { case <-ctx.Done(): r.lg.Errorf("limiter wait timeout: key = %d, cost = %f", reqKey, time.Now().Sub(now).Seconds()) r.mu.Lock() delete(r.waitTickets, reqKey) r.mu.Unlock() select { default: case <-req: t.Close() // 返回ticket } return nil, ctx.Err() case <-req: r.mu.Lock() r.tickets[reqKey] = t r.mu.Unlock() r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds()) return t, nil } } r.tickets[reqKey] = t r.mu.Unlock() return t, nil } ``` 虽然代码看着比较长,但是整个实现没太多复杂逻辑,核心代码就是等待ticket和被唤醒部分: ```go req := make(chan struct{}) r.waitTickets[reqKey] = req r.mu.Unlock() // 需要立即解锁,否则会导致其他协程调用Reserve或releaseTicket方法获取不到锁 select { ... case <-req: r.mu.Lock() r.tickets[reqKey] = t r.mu.Unlock() r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds()) return t, nil } ``` 这里是利用chan特性,当要pending等待时,会创建一个请求chan:req := make(chan struct{}),然后放到waitTickets后就立即**解锁**(目的是让其他协程能**获取到锁**),chan在没数据写入或chan**没有被关闭**的情况下会pending,如果一旦有ticket释放,会通过close这个chan方式通知继续。 另外,超时的实现是借助**context**来实现,通过监听**ctx.Done()**方法,同时还要注意**并发问题**,超时的时候还是有可能**获取到锁**,所以还是得再检查一下case <-req是否成立,成立就说明超时的同时也正好**获取到ticket**,但是由于超时了,ticket就没用了,直接释放t.Close()。 **接着,我们来实现ticket释放逻辑**: 1. 删除tickets中对应的数据。(从tickets移除了,所以相当于将ticket释放了) 2. 如果waitTickets没有数据就直接返回。len(tickets)数量已经-1,相当于ticket释放到池中。 3. 如果waitTickets有等待ticket的请求,就直接通知其中的一个等待ticket的请求可以继续,然后等待请求从waitTickets删除,相当于将要释放的ticket直接移交给等待ticket的请求。 ```go func (r *NumLimiter) releaseTicket(t *Ticket) bool { r.mu.Lock() defer r.mu.Unlock() // 删除tickets中对应的数据 releaseSuccess := true if _, ok := r.tickets[t.reqKey]; ok { delete(r.tickets, t.reqKey) } else { releaseSuccess = false } // 如果waitTickets有等待ticket的请求 if len(r.waitTickets) > 0 { var req chan struct{} var reqKey int64 // 取出一条 for reqKey, req = range r.waitTickets { break } close(req) // 通过close方式,通知等待ticket的协程继续 delete(r.waitTickets, reqKey)// 从waitTickets删除 } return releaseSuccess } ``` 这里的通知方式采用close(req)的方式传输信号,相应在Reserve()方法的select case <-req等待的请求就会收到信号,继续执行,同时将获取到的ticket保存在tickets中,返回对应的ticket后,客户端获取到ticket就可以继续请求的处理。 另外,实际上releaseTicket方法是不直接暴露给客户端,而是提供给ticket的close方法调用: ```go func (r *Ticket) Close() { if !r.l.releaseTicket(r) { r.lg.Errorf("limiter ticket release error: req key = %d", r.reqKey) } } ``` 这样当获得到ticket后,客户端可以把这ticket对象传到方法,释放的时候就直接调用ticket的close方法,就不需要管NumLimiter对象。 最后增加一个**初始化方法**,方便实例化NumLimiter: ```go func New(maxTicket, maxWait int) *NumLimiter { l := &NumLimiter{ waitTickets: map[int64]chan struct{}{}, tickets: map[int64]*Ticket{}, maxTicket: maxTicket, maxWait: maxWait, } return l } ``` 这样一个完整限量的功能就完成了。 ## 总结 限量的实现是参考[database/sql](https://itart.cn/blogs/2021/explore/database-sql-pool.html) 设计,核心的思想是如何合理管理ticket,超出时借助chan实现等待,还有context实现超时,当ticket释放,通过close chan来实现广播,通知对应的等待请求可以继续。 我的博客:https://itart.cn/blogs/2022/practice/num-limiter-library.html

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

737 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传