使用nsq的时候遇到这样一个问题:
AddConcurrentHandlers(handler, 8)的时候,已经有多个协程处理收到的消息。然而,当其中一个handle发生阻塞时,会导致整个Consumer阻塞,runtime.GOMAXPROCS(runtime.NumCPU()) 代码调用了。下面的代码也看不出问题,请教一下为什么会发生阻塞?
```
// AddConcurrentHandlers sets the Handler for messages received by this Consumer. It
// takes a second argument which indicates the number of goroutines to spawn for
// message handling.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
if atomic.LoadInt32(&r.connectedFlag) == 1 {
panic("already connected")
}
atomic.AddInt32(&r.runningHandlers, int32(concurrency))
for i := 0; i < concurrency; i++ {
go r.handlerLoop(handler)
}
}
func (r *Consumer) handlerLoop(handler Handler) {
r.log(LogLevelDebug, "starting Handler")
for {
message, ok := <-r.incomingMessages
if !ok {
goto exit
}
if r.shouldFailMessage(message, handler) {
message.Finish()
continue
}
err := handler.HandleMessage(message)
if err != nil {
r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
if !message.IsAutoResponseDisabled() {
message.Requeue(-1)
}
continue
}
if !message.IsAutoResponseDisabled() {
message.Finish()
}
}
exit:
r.log(LogLevelDebug, "stopping Handler")
if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
r.exit()
}
}
```
这是因为你在AddConcurrentHandlers()中添加的handler没有主动调用msg.Finish()造成的,一个handler接收玩消息后应该及时调用Finish()否则别的goroutine取不到下一条消息,所以就阻塞了,跟nsq的设计理念有关.
#1