当nsq跑起来之后, 我们可能会遇到以下问题
- 分布式部署
- 处理错误(何时requeue)
- 如何使用golang lib
抱着不应该只停留在入门的态度, 笔者粗浅的研究了一下这几个问题, 希望也对有同样疑问的人有帮助.
部署
由于NSQ的分布式网络结构, NSQD必须广播(到lookup)自己的地址并让消费者连接, 那么多个NSQD无法做透明负载均衡.
就必须为每一个NSQD分配单独的IP(或host)以保证消费者(在lookup找到NSQ节点)能够正确的连接. 这样部署起来可能会麻烦一些 但问题不大, 注意一下就好.
ps: 如果有更好的方法请告诉我, 小弟感激不尽.
NSQ Requeue And Backoff
建议结合官方文档来看
requeue(重试)
用于当错误发生, 需要重试时.
backoff(避退)
backoff能降低消费者吞吐量以让消费者从错误中恢复.
当消费者在backoff状态时, 这个消费者将不再处理任何消息, 直到backoff超时
当触发backoff时控制台将打印:
// 进入backoff状态, RDY设置为0代表准备接收0条消息(不接收消息) (协议详情看 https://nsq.io/clients/tcp_protocol_spec.html)
WRN 1 [test/test] backing off for 1m4s (backoff level 6), setting all to RDY 0
// 时间到了将设置RDY为1接收1条消息以测试状态, 官方将这个状态称为`tests the waters`
WRN 1 [test/test] (DESKTOP-HELJ7V4:4150) backoff timeout expired, sending RDY 1
当有多个消费者竞争时, 出错的消费者应当主动backoff不再处理消息(以让出更多的机会给其他消费者).
如果只有一个消费者, 则消费者会等到backoff超时后才开始处理消息(空出时间让消费者恢复).
避退是存在于整个消费者上的, 所以消费者每当一个消息处理失败了之后都会增加这个消费者的backoff level. 这会影响这个消费者的处理能力.
到底需不需要用backoff, 就要看业务了:
- 消息是用来更新数据库订单状态的, 这是一个不容易出错的逻辑, 如果需要requeue则需要backoff让出优先级, 让其他消费者来做, 尽量以挽救这个订单.
- 消息是用来通知第三方(如支付宝支付成功的http回调)的, 一般requeue是发生在第三方端响应不满足预期的响应, 这不是我方消费者的错误, 应当不使用backoff, 避免阻塞消息消费.
参考:
golang lib
nsq提供golang的client lib. 支持全部特性.
本着不重复造轮子原则, 我也想尽大可能的使用nsq lib里的代码逻辑来实现需求, 但有些需求它实现不了, 我也只好自己写代码了.
先看看它原有的几个逻辑
消息自动重试
// Handler is the message processing interface for Consumer
//
// Implement this interface for handlers that return whether or not message
// processing completed successfully.
//
// When the return value is nil Consumer will automatically handle FINishing.
//
// When the returned value is non-nil Consumer will automatically handle REQueing.
type Handler interface {
HandleMessage(message *Message) error
}
消息自动重试与判断失败
func (r *Consumer) handlerLoop(handler Handler) {
r.log(LogLevelDebug, "starting Handler")
for {
message, ok := <-r.incomingMessages
if !ok {
goto exit
}
if r.shouldFailMessage(message, handler) {
message.Finish()
continue
}
err := handler.HandleMessage(message)
if err != nil {
r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
if !message.IsAutoResponseDisabled() {
message.Requeue(-1)
}
continue
}
if !message.IsAutoResponseDisabled() {
message.Finish()
}
}
exit:
r.log(LogLevelDebug, "stopping Handler")
if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
r.exit()
}
}
判断失败
func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
// message passed the max number of attempts
if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
message.ID, message.Attempts)
logger, ok := handler.(FailedMessageLogger)
if ok {
logger.LogFailedMessage(message)
}
return true
}
return false
}
requeue
可以看到当handler返回的error不为空时, nsq将自动requeue, 这种重试是很方便但是
使用这个重试机制的坏处是:
- 不能自定义requeue的等待时间(默认等待时间=config.DefaultRequeueDelay*Attempts)
- 会在控制台打印一个ERR(不能自定义格式, 而且有一些err不应该打印到控制台), 这点可能有洁癖的开发者受不了.
- 一些错误不应该重试, 如入参不合法, 再怎么重试也是徒劳. 这时候应该
直接失败
.
所以我建议不要使用这个err机制, 而应当手动使用msg.Requeue(-1)
或者msg.RequeueWithoutBackoff(-1)
来显示指定requeue.
shouldFailMessage
我们可以使用 FailedMessageLogger interface自定义当消息失败时的处理方式.
但它的shouldFailMessage又有什么需求满足不了呢?
- 在失败的时候拿到最后一次错误信息
- shouldFailMessage只能判断处理重试次数过多的失败, 不能处理
直接失败
的消息.
所以又只有自己实现啦:
我们直接在Handler中判断Attempts来实现错误处理.
但为了保证我们的消息不被shouldFailMessage处理, 需要配置MaxAttempts为0或者一个比较大的数.
有疑问加站长微信联系(非本文作者)