rabbitmq消息模式
rabbitmq中进行消息控制的组建可以分为以下几部分:
- exchange:rabbitmq中的路由部件,控制消息的转发路径;
- queue:rabbitmq的消息队列,可以有多个消费者从队列中读取消息;
- consumer:消息的消费者;
rabbitmq在使用过程中可以单独使用queue进行消息传递(例如celery就可以使用单个queue进行多对多的消息传递),也利用exchange与queue构建多种消息模式,主要包括fanout、direct和topic方式,模式的使用方式在此放一张图,不再此做详细解释。
我在使用的rabbitmq的过程中,主要是进行消息的广播及主题订阅:
[producer] -> [exchange] ->fanout-> [queue of consumer] -> [consumer]
| /|\
------->[exchange] ->topic------
不同的设备连接到rabbitmq中创建自己的queue,将queue绑定的两个不同的exchange,分别接收广播消息及主题消息。通过配置queue的持久化及消息过期时间,则可以在设备短暂下线的情况下,将消息缓存在queue中,之后上线后再从queue中读取消息。
rabbitmq客户端
rabbitmq客户端本质上是实现amqp协议的通信过程,golang的基础package使用的是github.com/streadway/amqp
。
在此主要对客户端构建中的一些问题进行陈述,详细的客户端构建代码请参见:rabbitmq_client.go
创建queue
exchange和queue实际上都是通过amqp协议进行创建的,如果在创建过程时,rabbitmq中已经有相同名称的exchange或queue但属性不则会创建失败。通常情况下exchange的属性不会变化,但是queue可能会修改过期时间、消息TTL等属性,因此实现过程中,若queue创建不成功则进行删除后再创建(在我的应用场景中queue与消费者绑定,因此不存在误删在使用中的queue的问题):
func (clt *Client) queInit(server *broker, ifFresh bool) (err error) {
var num int
ch := clt.ch
if ifFresh {
num, err = ch.QueueDelete(
server.quePrefix+"."+clt.device,
false,
false,
false,
)
if err != nil {
return
}
log.Println("[RABBITMQ_CLIENT]", clt.device, "queue deleted with", num, "message purged")
}
args := make(amqp.Table)
args["x-message-ttl"] = messageTTL
args["x-expires"] = queueExpire
q, err := ch.QueueDeclare(
server.quePrefix+"."+clt.device, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
args, // arguments
)
// 注意在此配置的两个参数,详细用意请参见 http://next.rabbitmq.com/ttl.html
if err != nil {
return
}
for _, topic := range server.topics {
err = ch.QueueBind(
q.Name,
topic.keyPrefix+"."+clt.device,
topic.chanName,
false,
nil,
)
if err != nil {
return
}
}
clt.que = q
return
}
消息接收
对于消费者消息的接收过程如下所示:
msgs, err := clt.ch.Consume(
clt.que.Name, // queue
clt.device, // consumer
false, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
clt.Close()
log.Println("[RABBITMQ_CLIENT]", "Start consume ERROR:", err)
return nil
}
clt.msgs = msgs
clt.pubChan = make(chan *publishMsg, 4)
go func() {
cc := make(chan *amqp.Error)
e := <-clt.ch.NotifyClose(cc)
log.Println("[RABBITMQ_CLIENT]", "channel close error:", e.Error())
clt.cancel()
}()
go func() {
for d := range msgs {
msg := d.Body
msgProcess(d.Exchange, msg)
d.Ack(false)
}
}()
通过ch.Consume
调用可以得到一个接收消息的msgs channel,在此没有配置auto ack,而是在消息处理结束之后,通过调用d.Ack(false)
反馈ACK,这样可以保证消息在被处理之后,再进行确认。消费过程中,还调用ch.NotifyClose(cc)
对amqp.Channel
的关闭进行侦听。
注意:在一个gorontinue中同时对msgs和notifyClose两个channel进行读取可能会导致死锁。因为msgs被关闭就会结束相应的gorontinue,此时notifyClose因为没有接收者,而在amqp.channel
关闭的过程中出现死锁。
消息发送
在amqp的消息发送过程中,其对于消息的确认机制略有些蛋疼。因为在发送的时候不可配置发送的消息id,但在接收确认时,消息id是按照自然数递增的,也就是说发送者需要按照自然数递增的顺序自己维护发送的消息id。相关代码如下所示:
func (clt *Client) publishProc() {
ticker := time.NewTicker(tickTime)
deliveryMap := make(map[uint64]*publishMsg)
defer func() {
atomic.AddInt32(&clt.onPublish, -1)
ticker.Stop()
for _, msg := range deliveryMap {
msg.ackErr = errCancel
msg.cancel()
}
}()
var deliveryTag uint64 = 1
var ackTag uint64 = 1
var pMsg *publishMsg
for {
select {
case <-clt.ctx.Done():
return
case pMsg = <-clt.pubChan:
pMsg.startTime = time.Now()
err := clt.sendPublish(pMsg.topicId, pMsg.keySuffix, pMsg.msg, pMsg.expire)
if err != nil {
pMsg.ackErr = err
pMsg.cancel()
}
deliveryMap[deliveryTag] = pMsg
deliveryTag++
case c, ok := <-clt.confirm:
if !ok {
log.Println("[RABBITMQ_CLIENT]", "client Publish notify channel error")
return
}
pMsg = deliveryMap[c.DeliveryTag]
// fmt.Println("DeliveryTag:", c.DeliveryTag)
delete(deliveryMap, c.DeliveryTag)
if c.Ack {
pMsg.ackErr = nil
pMsg.cancel()
} else {
pMsg.ackErr = errNack
pMsg.cancel()
}
case <-ticker.C:
now := time.Now()
for {
if len(deliveryMap) == 0 {
break
}
pMsg = deliveryMap[ackTag]
if pMsg != nil {
if now.Sub(pMsg.startTime.Add(pubTime)) > 0 {
pMsg.ackErr = errTimeout
pMsg.cancel()
delete(deliveryMap, ackTag)
} else {
break
}
}
ackTag++
}
}
}
}
发送过程的构造要点:
- 使用一个
map[uint64]*publishMsg
存储已经发送的消息,map的键为消息的id; - 接收到确认消息后,通过消息的反馈机制反馈确认信息,并从map中删除消息;
- 在每一个tick,按照递增的id检查map中是否有超时消息,通过消息的反馈机制反馈超时信息;
- 在协程退出时向每个消息发送反馈信息,并删除消息。
需要注意的是,消息反馈并没有使用channel,因为消息的接收者可能因为超时不再侦听channel,从而导致发送过程出现阻塞。可以用长度不为0的反馈channel使得发送过程不阻塞,但是着需要等待gc后才能释放反馈channel的内存。因此在此并没有使用channel接收反馈,而是通过context
的事件来告知发送方消息发送过程结束,反馈信息则提前写在publishMsg
的ackErr
中。
总结
作为golang的入门级选手,在实现rabbitmq客户端过程中还是踩了一些坑,最后的实现还是可以算是高效可靠。rabbitmq的库本身有心跳机制来维持与服务器之间的连接,但依据实现mqtt客户端的经验,还是自己实现了心跳来保障客户端上层连接的可靠性。因此在接收和发送两方面,该客户端实现还是经受住了考验,欢迎大家参考。