rabbitmq 客户端golang实战

hiker_urey · · 12114 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

rabbitmq消息模式

rabbitmq中进行消息控制的组建可以分为以下几部分:

  1. exchange:rabbitmq中的路由部件,控制消息的转发路径;
  2. queue:rabbitmq的消息队列,可以有多个消费者从队列中读取消息;
  3. consumer:消息的消费者;

rabbitmq在使用过程中可以单独使用queue进行消息传递(例如celery就可以使用单个queue进行多对多的消息传递),也利用exchange与queue构建多种消息模式,主要包括fanout、direct和topic方式,模式的使用方式在此放一张图,不再此做详细解释。

rabbitmq

我在使用的rabbitmq的过程中,主要是进行消息的广播及主题订阅:

[producer] -> [exchange] ->fanout-> [queue of consumer] -> [consumer]
       |                             /|\
       ------->[exchange] ->topic------

不同的设备连接到rabbitmq中创建自己的queue,将queue绑定的两个不同的exchange,分别接收广播消息及主题消息。通过配置queue的持久化及消息过期时间,则可以在设备短暂下线的情况下,将消息缓存在queue中,之后上线后再从queue中读取消息。

rabbitmq客户端

rabbitmq客户端本质上是实现amqp协议的通信过程,golang的基础package使用的是github.com/streadway/amqp

在此主要对客户端构建中的一些问题进行陈述,详细的客户端构建代码请参见:rabbitmq_client.go

创建queue

exchange和queue实际上都是通过amqp协议进行创建的,如果在创建过程时,rabbitmq中已经有相同名称的exchange或queue但属性不则会创建失败。通常情况下exchange的属性不会变化,但是queue可能会修改过期时间、消息TTL等属性,因此实现过程中,若queue创建不成功则进行删除后再创建(在我的应用场景中queue与消费者绑定,因此不存在误删在使用中的queue的问题):

func (clt *Client) queInit(server *broker, ifFresh bool) (err error) {

	var num int
	ch := clt.ch

	if ifFresh {
		num, err = ch.QueueDelete(
			server.quePrefix+"."+clt.device,
			false,
			false,
			false,
		)
		if err != nil {
			return
		}
		log.Println("[RABBITMQ_CLIENT]", clt.device, "queue deleted with", num, "message purged")
	}

	args := make(amqp.Table)
	args["x-message-ttl"] = messageTTL
	args["x-expires"] = queueExpire
	q, err := ch.QueueDeclare(
		server.quePrefix+"."+clt.device, // name
		true,  // durable
		false, // delete when usused
		false, // exclusive
		false, // no-wait
		args,  // arguments
	)
    // 注意在此配置的两个参数,详细用意请参见 http://next.rabbitmq.com/ttl.html
	if err != nil {
		return
	}

	for _, topic := range server.topics {
		err = ch.QueueBind(
			q.Name,
			topic.keyPrefix+"."+clt.device,
			topic.chanName,
			false,
			nil,
		)
		if err != nil {
			return
		}
	}

	clt.que = q
	return
}

消息接收

对于消费者消息的接收过程如下所示:

msgs, err := clt.ch.Consume(
		clt.que.Name, // queue
		clt.device,   // consumer
		false,        // auto ack
		false,        // exclusive
		false,        // no local
		false,        // no wait
		nil,          // args
	)
	if err != nil {
		clt.Close()
		log.Println("[RABBITMQ_CLIENT]", "Start consume ERROR:", err)
		return nil
	}

	clt.msgs = msgs
	clt.pubChan = make(chan *publishMsg, 4)

	go func() {
		cc := make(chan *amqp.Error)
		e := <-clt.ch.NotifyClose(cc)
		log.Println("[RABBITMQ_CLIENT]", "channel close error:", e.Error())
		clt.cancel()
	}()

	go func() {
		for d := range msgs {
			msg := d.Body
			msgProcess(d.Exchange, msg)
			d.Ack(false)
		}
	}()

通过ch.Consume调用可以得到一个接收消息的msgs channel,在此没有配置auto ack,而是在消息处理结束之后,通过调用d.Ack(false)反馈ACK,这样可以保证消息在被处理之后,再进行确认。消费过程中,还调用ch.NotifyClose(cc)amqp.Channel的关闭进行侦听。

注意:在一个gorontinue中同时对msgs和notifyClose两个channel进行读取可能会导致死锁。因为msgs被关闭就会结束相应的gorontinue,此时notifyClose因为没有接收者,而在amqp.channel关闭的过程中出现死锁。

消息发送

在amqp的消息发送过程中,其对于消息的确认机制略有些蛋疼。因为在发送的时候不可配置发送的消息id,但在接收确认时,消息id是按照自然数递增的,也就是说发送者需要按照自然数递增的顺序自己维护发送的消息id。相关代码如下所示:

func (clt *Client) publishProc() {
	ticker := time.NewTicker(tickTime)
	deliveryMap := make(map[uint64]*publishMsg)

	defer func() {
		atomic.AddInt32(&clt.onPublish, -1)
		ticker.Stop()
		for _, msg := range deliveryMap {
			msg.ackErr = errCancel
			msg.cancel()
		}
	}()

	var deliveryTag uint64 = 1
	var ackTag uint64 = 1
	var pMsg *publishMsg
	for {
		select {

		case <-clt.ctx.Done():
			return

		case pMsg = <-clt.pubChan:
			pMsg.startTime = time.Now()
			err := clt.sendPublish(pMsg.topicId, pMsg.keySuffix, pMsg.msg, pMsg.expire)
			if err != nil {
				pMsg.ackErr = err
				pMsg.cancel()
			}
			deliveryMap[deliveryTag] = pMsg
			deliveryTag++

		case c, ok := <-clt.confirm:
			if !ok {
				log.Println("[RABBITMQ_CLIENT]", "client Publish notify channel error")
				return
			}
			pMsg = deliveryMap[c.DeliveryTag]
			// fmt.Println("DeliveryTag:", c.DeliveryTag)
			delete(deliveryMap, c.DeliveryTag)
			if c.Ack {
				pMsg.ackErr = nil
				pMsg.cancel()
			} else {
				pMsg.ackErr = errNack
				pMsg.cancel()
			}
		case <-ticker.C:
			now := time.Now()
			for {
				if len(deliveryMap) == 0 {
					break
				}
				pMsg = deliveryMap[ackTag]
				if pMsg != nil {
					if now.Sub(pMsg.startTime.Add(pubTime)) > 0 {
						pMsg.ackErr = errTimeout
						pMsg.cancel()
						delete(deliveryMap, ackTag)
					} else {
						break
					}
				}
				ackTag++
			}
		}
	}
}

发送过程的构造要点:

  1. 使用一个map[uint64]*publishMsg存储已经发送的消息,map的键为消息的id;
  2. 接收到确认消息后,通过消息的反馈机制反馈确认信息,并从map中删除消息;
  3. 在每一个tick,按照递增的id检查map中是否有超时消息,通过消息的反馈机制反馈超时信息;
  4. 在协程退出时向每个消息发送反馈信息,并删除消息。

需要注意的是,消息反馈并没有使用channel,因为消息的接收者可能因为超时不再侦听channel,从而导致发送过程出现阻塞。可以用长度不为0的反馈channel使得发送过程不阻塞,但是着需要等待gc后才能释放反馈channel的内存。因此在此并没有使用channel接收反馈,而是通过context的事件来告知发送方消息发送过程结束,反馈信息则提前写在publishMsgackErr中。

总结

作为golang的入门级选手,在实现rabbitmq客户端过程中还是踩了一些坑,最后的实现还是可以算是高效可靠。rabbitmq的库本身有心跳机制来维持与服务器之间的连接,但依据实现mqtt客户端的经验,还是自己实现了心跳来保障客户端上层连接的可靠性。因此在接收和发送两方面,该客户端实现还是经受住了考验,欢迎大家参考。


有疑问加站长微信联系(非本文作者)

本文来自:开源中国博客

感谢作者:hiker_urey

查看原文:rabbitmq 客户端golang实战

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

12114 次点击  
加入收藏 微博
1 回复  |  直到 2019-06-04 22:06:44
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传