Golang如何使用消息队列rabbitmq

mb6066e504cce6f · · 1023 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

AMQP协议

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。

RabbitMQ 就是 amqp 协议的Erlang的实现。

AMQP的模型架构的主要角色,生产者、消费者、交换器、队列。

生产者、消费者、服务节点

  • 生产者(Producter) 消息投递方

  • 消费者(Consumer) 消息接收方

  • 服务节点(Broker) 消息的服务节点,基本上可以简单的把一个broker看成一台消息服务器

图片

交换器、队列、绑定

绑定

Rabbitmq中需要路由键和绑定键联合使用才能使生产者成功投递到队列中去。

  • RoutingKey 生产者发送给交换器绑定的Key

  • BindingKey 交换器和队列绑定的Key

生产者将消息投递到交换器,通过交换器绑定的队列,最终投递到对应的队列中去。
图片

交换器

Rabbitmq共有4种交换器

  • fanout 把消息投递到所有与此交换器绑定的队列中

  • direct 把消息投递到 BindingKey 和 RoutingKey 完全匹配的队列中

  • topic 规则匹配,BindingKey中存在两种特殊字符

    • *匹配零个或多个单词

    • #匹配一个单词

  • header 不依赖于RoutingKey而是通过消息体中的headers属性来进行匹配绑定,通过headers中的key和BindingKey完全匹配,由于性能较差一般用的比较少。

基本使用

在Golang中创建rabbitmq 生产者基本步骤是:

  1. 连接Connection

  2. 创建Channel

  3. 创建或连接一个交换器

  4. 创建或连接一个队列

  5. 交换器绑定队列

  6. 投递消息

  7. 关闭Channel

  8. 关闭Connection

连接

// connection
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
   panic(err)
}

// channel
channel, err := connection.Channel()
if err != nil {
   panic(err)
}

创建一个交换器

if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil {
   panic(err)
}

参数解析:

  • name 交换机名称

  • kind 交换机类型

  • durable 持久化

  • autoDelete 是否自动删除

  • internal 是否是内置交换机

  • noWait 是否等待服务器确认

  • args 其它配置

参数说明要点:

  • autoDelete:

自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下,处于不再使用的时候才会自动删除,
如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。

  • internal:

内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,
只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,
内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。

  • noWait

当noWait为true时,声明时无需等待服务器的确认。
该通道可能由于错误而关闭。添加一个NotifyClose侦听器应对任何异常。

创建交换器还有一个差不多的方法(ExchangeDeclarePassive),他主要是假定交换已存在,并尝试连接到
不存在的交换将导致RabbitMQ引发异常,可用于检测交换的存在。

创建一个队列

if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil {
   panic(err)
}

参数解析:

  • name 队列名称

  • durable 持久化

  • autoDelete 自动删除

  • exclusive 排他

  • noWait 是否等待服务器确认

  • args Table

参数说明要点:

  • exclusive 排他

排他队列只对首次创建它的连接可见,排他队列是基于连接(Connection)可见的,并且该连接内的所有信道(Channel)都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。
同一连接中不允许建立同名的排他队列的
这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。
非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。

  • autoDelete 设置是否自动删除。

为true则设置队列为自动删除。
自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
不能把这个参数错误地理解为:"当连接到此队列的所有客户端断开时,这个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。

创建队列还有一个差不多的方法(QueueDeclarePassive),他主要是假定队列已存在,并尝试连接到
不存在的队列将导致RabbitMQ引发异常,可用于检测队列的存在。

绑定交换器和队列

if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil {
   panic(err)
}

参数解析:

  • name 队列名称

  • key BindingKey 根据交换机类型来设定

  • exchange 交换机名称

  • noWait 是否等待服务器确认

  • args Table

绑定交换器

if err = channel.ExchangeBind("dest", "q1Key", "src", false, nil); err != nil {
   panic(err)
}

参数解析:

  • destination 目的交换器

  • key RoutingKey 路由键

  • source 源交换器

  • noWait 是否等待服务器确认

  • args Table 其它参数

生产者发送消息至交换器source中,交换器source根据路由键找到与其匹配的另一个交换器destination,井把消息转发到destination中,进而存储在.destination绑定的队列queue中,某种程度上来说destination交换器可以看作一个队列。如图:

投递消息

if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
   Timestamp:   time.Now(),
   ContentType: "text/plain",
   Body:        []byte("Hello Golang and AMQP(Rabbitmq)!"),
}); err != nil {
   panic(err)
}

参数解析:

  • exchange 交换器名称

  • key RouterKey

  • mandatory 是否为无法路由的消息进行返回处理

  • immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃

  • msg 消息体

参数说明要点:

  • mandatory

消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,
设置为 true 表示将消息返回到生产者,否则直接丢弃消息。

  • immediate

参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。imrnediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

RabbitMQ 3.0版本开始去掉了对imrnediate参数的支持

消费信息

Rabbitmq消费方式共有2种,分别是推模式和拉模式

推模式是通过持续订阅的方式来消费信息,
Consume将信道(Channel)直为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者。
推送消息的个数还是会受到channel.Qos的限制

deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
if err != nil {
   panic(err)
}

如果ack设置为false则表示需要手动进行ack消费

v, ok := <-deliveries
if ok {
   // 手动ack确认
   // 注意:这里只要调用了ack就是手动确认模式,
   // multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
   // 并不是表示设置为false就不进行当前ack确认
   if err := v.Ack(true); err != nil {
       fmt.Println(err.Error())
   }
} else {
   fmt.Println("Channel close")
}

参数解析:

  • queue 队列名称

  • consumer 消息者名称

  • autoAck 是否确认消费

  • exclusive 排他

  • noLocal

  • noWait bool

  • args Table

参数说明要点:

  • noLocal

设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者

拉模式:
相对来说比较简单,是由消费者主动拉取信息来消费,同样也需要进行ack确认消费

channel.Get(queue string, autoAck bool)

简单示例Demo

下面是一个简单示例,只是为了通信测试,单条数据收发

func Connection() (*amqp.Connection) {
   conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
   if err != nil {
       panic(err)
   }
   return conn
}

func Sample() {
   var wg sync.WaitGroup
   wg.Add(1)
   go SampleConsumption(&wg)

   // 创建连接
   connection := Connection()
   defer connection.Close()

   // 开启 channel
   channel, err := connection.Channel()
   if err != nil {
       panic(err)
   }

   defer channel.Close()

   if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil {
       panic(err)
   }

   if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil {
       panic(err)
   }

   if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil {
       panic(err)
   }

   // mandatory true 未找到队列返回给消费者
   returnChan := make(chan amqp.Return,0)
   channel.NotifyReturn(returnChan)

   // Publish
   if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
       Timestamp:   time.Now(),
       ContentType: "text/plain",
       Body:        []byte("Hello Golang and AMQP(Rabbitmq)!"),
   }); err != nil {
       panic(err)
   }

   //for v := range returnChan{
   //    fmt.Printf("Return %#v\n",v)
   //}
   
   wg.Wait()
}

func SampleConsumption(wg *sync.WaitGroup) {
   connection := Connection()
   defer connection.Close()

   channel, err := connection.Channel()
   if err != nil {
       panic(err)
   }

   defer channel.Close()
   
   deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
   if err != nil {
       panic(err)
   }

   // 这里只取一条,因为product只发一条
   v, ok := <-deliveries
   if ok {
       if err := v.Ack(true); err != nil {
           fmt.Println(err.Error())
       }
   } else {
       fmt.Println("Channel close")
   }
   wg.Done()
}

来源:https://blog.crcms.cn/2019/09/29/go-ioc/



有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:mb6066e504cce6f

查看原文:Golang如何使用消息队列rabbitmq

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1023 次点击  ∙  1 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传