使用nsq的时候,handler阻塞导致整个阻塞

阅读 1182 次  1 条评论
使用nsq的时候遇到这样一个问题: AddConcurrentHandlers(handler, 8)的时候,已经有多个协程处理收到的消息。然而,当其中一个handle发生阻塞时,会导致整个Consumer阻塞,runtime.GOMAXPROCS(runtime.NumCPU()) 代码调用了。下面的代码也看不出问题,请教一下为什么会发生阻塞? ``` // AddConcurrentHandlers sets the Handler for messages received by this Consumer. It // takes a second argument which indicates the number of goroutines to spawn for // message handling. // // This panics if called after connecting to NSQD or NSQ Lookupd // // (see Handler or HandlerFunc for details on implementing this interface) func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) { if atomic.LoadInt32(&r.connectedFlag) == 1 { panic("already connected") } atomic.AddInt32(&r.runningHandlers, int32(concurrency)) for i := 0; i < concurrency; i++ { go r.handlerLoop(handler) } } func (r *Consumer) handlerLoop(handler Handler) { r.log(LogLevelDebug, "starting Handler") for { message, ok := <-r.incomingMessages if !ok { goto exit } if r.shouldFailMessage(message, handler) { message.Finish() continue } err := handler.HandleMessage(message) if err != nil { r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID) if !message.IsAutoResponseDisabled() { message.Requeue(-1) } continue } if !message.IsAutoResponseDisabled() { message.Finish() } } exit: r.log(LogLevelDebug, "stopping Handler") if atomic.AddInt32(&r.runningHandlers, -1) == 0 { r.exit() } } ```

1条回复

主题回复:

(您需要 登录 后才能回复 没有账号 ?)
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet