漏斗算法思想是将所有请求先存到一个桶里。若此刻桶容量没满,表示当前请求是可以访问资源。若满了,则拒绝服务。同时桶会以固定速率取出桶里的请求来处理
具体实现方法可以将请求先暂存到一个队列中,若队列已满,则拒绝该请求。同时有一个周期性定时任务来消费队列里的数据
从实现可以看出,不管请求有多少或者瞬时流量有多大,请求的处理是固定速率的,所以令牌桶油流量整形的功能
相对计数器方法,令牌桶能有效避免抖动的问题,但当瞬时请求量很大时,后续的请求很有可能由于得不到及时处理而超时
import ( "context" "errors" "github.com/google/uuid" "sync" "time" ) type Request interface{} type Command struct { id string request Request } type Micros int64 type Handler func(request Request) (error, interface{}) type Flow interface { Do(ctx context.Context, request Request, timeout time.Duration) (error, interface{}) } type CommandResult struct { err error result interface{} } func NewCommandResult(err error, resp interface{}) *CommandResult { return &CommandResult{ err: err, result: resp, } } type TokenBucket struct { commandInterval Micros commands chan *Command maxBucket int ticker *time.Ticker l sync.Mutex handler Handler dones map[string]chan *CommandResult } func NewCommand(request Request) *Command { return &Command{ id: GetUUID(), request: request, } } func GetUUID() string { uid, err := uuid.NewUUID() if err != nil { panic(errors.New("uuid get fail")) } return uid.String() } func (token *TokenBucket) register(id string) chan *CommandResult { token.l.Lock() defer token.l.Unlock() done := token.dones[id] if done == nil { done = make(chan *CommandResult, 1) token.dones[id] = done } return done } func (token *TokenBucket) isRegister(id string) bool { token.l.Lock() defer token.l.Unlock() _, ok := token.dones[id] return ok } func (token *TokenBucket) trigger(id string, v *CommandResult) { token.l.Lock() ch := token.dones[id] delete(token.dones, id) token.l.Unlock() if ch != nil { ch <- v close(ch) } } func (token *TokenBucket) Do(ctx context.Context, request Request, timeout time.Duration) (error, interface{}) { command := NewCommand(request) done := token.register(command.id) select { case token.commands <- command: default: println("触发限流,time:" + time.Now().String() + "," + request.(string)) return errors.New("触发限流"), nil } cctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() select { case resp := <-done: return resp.err, resp.result case <-cctx.Done(): println("请求超时,time:" + time.Now().String() + "," + request.(string)) token.trigger(command.id, nil) return errors.New("请求超时,time:" + time.Now().String()), nil } } func (token *TokenBucket) start() { for range token.ticker.C { select { case command := <-token.commands: if token.isRegister(command.id) { err, resp := token.handler(command.request) token.trigger(command.id, NewCommandResult(err, resp)) } default: } } } func NewTokenBucket(maxBuctet int /*桶里的最大请求数*/, commandInterval Micros /*请求处理的时间间隔,单位是微秒*/, handler Handler/*请求的处理方法*/) Flow { tokenBucket := &TokenBucket{ commandInterval: commandInterval, commands: make(chan *Command, maxBuctet), maxBucket: maxBuctet, handler: handler, dones: make(map[string]chan *CommandResult), ticker: time.NewTicker(time.Microsecond * time.Duration(commandInterval)), } go tokenBucket.start() return tokenBucket }复制代码
具体使用方法:
token := NewTokenBucket(5, 100000, func(request Request) (e error, i interface{}) { println("handler:" + request.(string) + "," + time.Now().String()) return nil, request }) token.Do(context.Background(), "test"+strconv.Itoa(t), time.Millisecond*500)复制代码
有疑问加站长微信联系(非本文作者)