如何限制http.HandleFunc并发数量?

leenzhu · 2023-09-22 09:40:59 · 4377 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2023-09-22 09:40:59 的主题,其中的信息可能已经有所发展或是发生改变。

func xHandle(w http.ResponseWriter, r *http.Request) {
    fmt.Printf("new request: %p\n", r)
    w.Write([]byte("Hello\n"))
    for {}
}

func main() {
    http.HandleFunc("/", xHandle)
    http.ListenAndServe(":8080", nil)
}

对于每一个http请求,http服务都会启动一个go routine 来调用xHandle函数。目前搜到的资料都是在xHandle内部做并发数控制,就是说等http服务启go routine后,发现超限了再直接退出。

我想问的是如何从源头上控制http服务的go routine数量,也是当go routine到达上限后,都不用启动go routine回调xHandle,直接在底层响应500给客户端?


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

4377 次点击  ∙  1 赞  
加入收藏 微博
13 回复  |  直到 2023-12-28 23:32:27
GGXXLL
GGXXLL · #1 · 大约1年之前
package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
    "time"

    "golang.org/x/time/rate"
)

type LimiterOption struct {
    lm *rate.Limiter
}

func WithLimiter(duration time.Duration, count int) func(o *LimiterOption) {
    return func(o *LimiterOption) {
        o.lm = rate.NewLimiter(rate.Every(duration), count)
    }
}

// LimiterWrap 每个 handler 单独限制
func LimiterWrap(f http.HandlerFunc, opts ...func(o *LimiterOption)) http.HandlerFunc {
    o := &LimiterOption{
        lm: rate.NewLimiter(rate.Every(100*time.Millisecond), 10),
    }
    for _, opt := range opts {
        opt(o)
    }

    return func(w http.ResponseWriter, r *http.Request) {
        if !o.lm.Allow() {
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
        f(w, r)
    }
}

func xHandle(w http.ResponseWriter, r *http.Request) {
    w.Write([]byte("Hello\n"))
}

func main() {
    // 单独控制一个接口
    http.HandleFunc("/", LimiterWrap(xHandle, WithLimiter(100*time.Millisecond, 1)))

    // 所有接口都走同一个限流
    // http.ListenAndServe(":8080", LimiterWrap(http.DefaultServeMux.ServeHTTP))

    for i := 0; i < 10; i++ {
        w := httptest.NewRecorder()
        req, _ := http.NewRequest(http.MethodGet, "/", nil)
        http.DefaultServeMux.ServeHTTP(w, req)
        fmt.Println(i, w.Code)
        time.Sleep(time.Millisecond * 20)
    }
    //output
    //0 200
    //1 500
    //2 500
    //3 500
    //4 500
    //5 200
    //6 500
    //7 500
    //8 500
    //9 500
}
lxzan
lxzan · #2 · 大约1年之前
package main

func success() {}

func fail() {}

func main() {
    c := make(chan struct{}, 16)

    select {
    case c <- struct{}{}:
        success()
        <-c
    default:
        fail()
    }
}
leenzhu
leenzhu · #3 · 大约1年之前
GGXXLLGGXXLL #1 回复

``` package main import ( "fmt" "net/http" "net/http/httptest" "time" "golang.org/x/time/rate" ) type LimiterOption struct { lm *rate.Limiter } func WithLimiter(duration time.Duration, count int) func(o *LimiterOption) { return func(o *LimiterOption) { o.lm = rate.NewLimiter(rate.Every(duration), count) } } // LimiterWrap 每个 handler 单独限制 func LimiterWrap(f http.HandlerFunc, opts ...func(o *LimiterOption)) http.HandlerFunc { o := &LimiterOption{ lm: rate.NewLimiter(rate.Every(100*time.Millisecond), 10), } for _, opt := range opts { opt(o) } return func(w http.ResponseWriter, r *http.Request) { if !o.lm.Allow() { w.WriteHeader(http.StatusInternalServerError) return } f(w, r) } } func xHandle(w http.ResponseWriter, r *http.Request) { w.Write([]byte("Hello\n")) } func main() { // 单独控制一个接口 http.HandleFunc("/", LimiterWrap(xHandle, WithLimiter(100*time.Millisecond, 1))) // 所有接口都走同一个限流 // http.ListenAndServe(":8080", LimiterWrap(http.DefaultServeMux.ServeHTTP)) for i := 0; i < 10; i++ { w := httptest.NewRecorder() req, _ := http.NewRequest(http.MethodGet, "/", nil) http.DefaultServeMux.ServeHTTP(w, req) fmt.Println(i, w.Code) time.Sleep(time.Millisecond * 20) } //output //0 200 //1 500 //2 500 //3 500 //4 500 //5 200 //6 500 //7 500 //8 500 //9 500 } ```

@GGXXLL 感谢指点,我没有描述清我的问题。我看了下源码,我的需求该是无解的。我想要的效果在资源超限时阻止创建go routine,也就是下面代码最后一行不要执行:go c.serve(connCtx)

现在看来就是来了一个请求,只能先执行这行代码创建go routine,进入limit-handler,然后再退出了。

func (srv *Server) Serve(l net.Listener) error {
    if fn := testHookServerServe; fn != nil {
        fn(srv, l) // call hook with unwrapped listener
    }

    origListener := l
    l = &onceCloseListener{Listener: l}
    defer l.Close()

    if err := srv.setupHTTP2_Serve(); err != nil {
        return err
    }

    if !srv.trackListener(&l, true) {
        return ErrServerClosed
    }
    defer srv.trackListener(&l, false)

    baseCtx := context.Background()
    if srv.BaseContext != nil {
        baseCtx = srv.BaseContext(origListener)
        if baseCtx == nil {
            panic("BaseContext returned a nil context")
        }
    }

    var tempDelay time.Duration // how long to sleep on accept failure

    ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    for {
        rw, err := l.Accept()
        if err != nil {
            if srv.shuttingDown() {
                return ErrServerClosed
            }
            if ne, ok := err.(net.Error); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
                time.Sleep(tempDelay)
                continue
            }
            return err
        }
        connCtx := ctx
        if cc := srv.ConnContext; cc != nil {
            connCtx = cc(connCtx, rw)
            if connCtx == nil {
                panic("ConnContext returned nil")
            }
        }
        tempDelay = 0
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew, runHooks) // before Serve can return
        go c.serve(connCtx)
    }
}
leenzhu
leenzhu · #4 · 大约1年之前

@GGXXLL 感谢指点,我没有描述清我的问题。我看了下源码,我的需求该是无解的。我想要的效果在资源超限时阻止创建go routine,也就是下面代码最后一行不要执行:go c.serve(connCtx)

现在看来就是来了一个请求,只能先执行这行代码创建go routine,进入limit-handler,然后再退出了。

func (srv *Server) Serve(l net.Listener) error {
    if fn := testHookServerServe; fn != nil {
        fn(srv, l) // call hook with unwrapped listener
    }

    origListener := l
    l = &onceCloseListener{Listener: l}
    defer l.Close()

    if err := srv.setupHTTP2_Serve(); err != nil {
        return err
    }

    if !srv.trackListener(&l, true) {
        return ErrServerClosed
    }
    defer srv.trackListener(&l, false)

    baseCtx := context.Background()
    if srv.BaseContext != nil {
        baseCtx = srv.BaseContext(origListener)
        if baseCtx == nil {
            panic("BaseContext returned a nil context")
        }
    }

    var tempDelay time.Duration // how long to sleep on accept failure

    ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    for {
        rw, err := l.Accept()
        if err != nil {
            if srv.shuttingDown() {
                return ErrServerClosed
            }
            if ne, ok := err.(net.Error); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
                time.Sleep(tempDelay)
                continue
            }
            return err
        }
        connCtx := ctx
        if cc := srv.ConnContext; cc != nil {
            connCtx = cc(connCtx, rw)
            if connCtx == nil {
                panic("ConnContext returned nil")
            }
        }
        tempDelay = 0
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew, runHooks) // before Serve can return
        go c.serve(connCtx)
    }
}
GGXXLL
GGXXLL · #5 · 大约1年之前

@leenzhu 确实。实际上即使起了 goroutine 调用 handle 了,也直接返回了,开销很小,所以不太懂为啥要限制这个呀。

hcnyf
hcnyf · #6 · 大约1年之前

可以用一个定长的channel配合select实现吧

leenzhu
leenzhu · #7 · 大约1年之前
GGXXLLGGXXLL #5 回复

@leenzhu 确实。实际上即使起了 goroutine 调用 handle 了,也直接返回了,开销很小,所以不太懂为啥要限制这个呀。

假设我的limit-handler没写好,不能正常退出或者处理速度过慢,那么当请求快速源源不断到来时,就有可能导致系统内有无数个go-routine。如果不能即时消化这些go-routine可能就会导致内存爆掉了。

不过如果limit-handler写得足够健壮,这个担心应该是多余的:smile:

lxj15398019970
lxj15398019970 · #8 · 大约1年之前

fasthttt的话,可以设置 Concurrency

lxj15398019970
lxj15398019970 · #9 · 大约1年之前

fasthttp的话,可以设置 Concurrency

fuhuizn
fuhuizn · #10 · 大约1年之前
// 限制1000个连接
const limitN = 1000
var limit = make(chan int8, limitN)

func init(){
    for i:=0;i
fuhuizn
fuhuizn · #11 · 大约1年之前
// 限制1000个连接
const limitN = 1000
var limit = make(chan int8, limitN)

func init(){
    for i:=0;i
xiaoxia2020
xiaoxia2020 · #12 · 大约1年之前

楼主的那个效果只能改源码

aperer
aperer · #13 · 大约1年之前

channel 应该可以实现,在goroutine启动之前从定长的channel里取值,取到就会继续执行,没取到就会阻塞,需要限制多少并发,就建多大的goroutine

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传