RabbitMQ 概述
RabbitMQ是采用Erlang编程语言实现了高级消息队列协议AMQP
(Advanced Message Queuing Protocol)的开源消息代理软件(消息队列中间件)
市面上流行的消息队列中间件有很多种,而RabbitMQ只是其中比较流行的一种
我们简单说说消息队列中间件的作用
- 解耦
- 削峰
- 异步处理
- 缓存存储
- 消息通信
- 提高系统拓展性
RabbitMQ 特点
-
可靠性
通过一些机制例如,持久化,传输确认等来确保消息传递的可靠性
-
拓展性
多个RabbitMQ节点可以组成集群
-
高可用性
队列可以在RabbitMQ集群中设置镜像,如此一来即使部分节点挂掉了,但是队列仍然可以使用
-
多种协议
-
丰富的客户端
我们常用的编程语言都支持RabbitMQ
-
管理界面
自带提供一个WEB管理界面
-
插件机制
RabbitMQ 自己提供了很多插件,可以按需要进行拓展 Plugins
RabbitMQ基础概念
总体上看RabbitMQ是一个生产者和消费者的模型,
接收
,存储
,转发
我们看看在RabbitMQ中的几个主要概念
Producer (生产者) : 消息的生产者,投递方
Consumer (消费者) : 消息的消费者
RabbitMQ Broker (RabbitMQ 代理) : RabbitMQ 服务节点(单机情况中,就是代表RabbitMQ服务器)
Queue (队列) : 在RabbitMQ中Queue是存储消息数据的唯一形式
Binding (绑定) : RabbitMQ中绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
-
RoutingKey (路由键) : 消息投递给交换器,通常会指定一个
RoutingKey
,通过这个路由键来明确消息的路由规则RoutingKey 通常是生产者和消费者有协商一致的key策略,消费者就可以合法从生产者手中获取数据。这个RoutingKey主要当Exchange交换机模式为设定为direct和topic模式的时候使用,fanout模式不使用RoutingKey
-
Exchange (交换机) : 生产者将消息发送给交换器(交换机),再由交换器将消息路由导对应的队列中
交换机四种类型 : fanout,direct,topic,headers
-
fanout (扇形交换机) :
将发送到该类型交换机的消息(message)路由到所有的与该交换机绑定的队列中,如同一个"扇"状扩散给各个队列
fanout类型的交换机会忽略
RoutingKey
的存在,将message直接"广播"到绑定的所有队列中 -
-
direct (直连交换机) :
根据消息携带的路由键(RoutingKey) 将消息投递到对应的队列中
direct类型的交换机(exchange)是RabbitMQ Broker的默认类型,它有一个特别的属性对一些简单的应用来说是非常有用的,在使用这个类型的Exchange时,可以不必指定routing key的名字,在此类型下创建的Queue有一个默认的routing key,这个routing key一般同Queue同名.
-
Topic (主题交换机) :
topic类型交换机在
RoutingKey
和BindKey
匹配规则上更加的灵活. 同样是将消息路由到RoutingKey
和BindingKey
相匹配的队列中,但是匹配规则有如下的特点 :规则1:
RoutingKey
是一个使用.
的字符串 例如: "go.log.info" , "java.log.error"规则2:
BingingKey
也会一个使用.
分割的字符串, 但是在BindingKey
中可以使用两种特殊字符*
和#
,其中 "*" 用于匹配一个单词,"#"用于匹配多规格单词(零个或者多个单词)
RoutingKey和BindingKey 是一种"模糊匹配" ,那么一个消息Message可能 会被发送到一个或者多个队列中
无法匹配的消息将会被丢弃或者返回者生产者
-
Headers (头交换机):
Headers类型的交换机使用的不是很多
关于Headers Exchange 摘取一段比较容易理解的解释 :
有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。
头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。
RabbitMQ 工作流程
消息生产流程
- 消息生产者连与
RabbitMQ Broker
建立一个连接,建立好了连接之后,开启一个信道Channel
- 声明一个交换机,并设置其相关的属性(交换机类型,持久化等)
- 声明一个队列并设置其相关属性(排他性,持久化自动删除等)
- 通过路由键将交换机和队列绑定起来
- 消息生产者发送消息给
RabbitMQ Broker
, 消息中包含了路由键,交换机等信息,交换机根据接收的路由键查找匹配对应的队列 - 查找匹配成功,则将消息存储到队列中
- 查找匹配失败,根据生产者配置的属性选择丢弃或者回退给生产者
- 关闭信道
Channel
, 关闭连接
消息消费流程
- 消息消费者连与
RabbitMQ Broker
建立一个连接,建立好了连接之后,开启一个信道Channel
- 消费者向
RabbitMQ Broker
请求消费者相应队列中的消息 - 等待
RabbitMQ Broker
回应并投递相应队列中的消息,消费者接收消息 - 消费者确认(ack) 接收消息,
RabbitMQ Broker
消除已经确认的消息 - 关闭信道Channel ,关闭连接
Golang 操作RabbitMQ
RabbitMQ 支持我们常见的编程语言,此处我们使用 Golang 来操作
Golang操作RabbitMQ的前提我们需要有个RabbitMQ的服务端,至于RabbitMQ的服务怎么搭建我们此处就不详细描述了.
Golang操作RabbitMQ的客户端包,网上已经有一个很流行的了,而且也是RabbitMQ官网比较推荐的,不需要我们再从头开始构建一个RabbitMQ的Go语言客户端包. 详情
go get github.com/streadway/amqp
项目目录
___lib ______commonFunc.go ___producer.go ___comsumer.go
commonFunc.go
package lib
import (
"github.com/streadway/amqp"
"log"
)
// RabbitMQ连接函数
func RabbitMQConn() (conn *amqp.Connection,err error){
// RabbitMQ分配的用户名称
var user string = "admin"
// RabbitMQ用户的密码
var pwd string = "123456"
// RabbitMQ Broker 的ip地址
var host string = "192.168.230.132"
// RabbitMQ Broker 监听的端口
var port string = "5672"
url := "amqp://"+user+":"+pwd+"@"+host+":"+port+"/"
// 新建一个连接
conn,err =amqp.Dial(url)
// 返回连接和错误
return
}
// 错误处理函数
func ErrorHanding(err error, msg string){
if err != nil{
log.Fatalf("%s: %s", msg, err)
}
}
基础队列使用
简单队列模式是RabbitMQ的常规用法,简单理解就是消息生产者发送消息给一个队列,然后消息的消息的消费者从队列中读取消息
当多个消费者订阅同一个队列的时候,队列中的消息是平均分摊给多个消费者处理
定义一个消息的生产者
producer.go
package main
import (
"encoding/json"
"log"
"myDemo/rabbitmq_demo/lib"
"github.com/streadway/amqp"
)
type simpleDemo struct {
Name string `json:"name"`
Addr string `json:"addr"`
}
func main() {
// 连接RabbitMQ服务器
conn, err := lib.RabbitMQConn()
lib.ErrorHanding(err, "Failed to connect to RabbitMQ")
// 关闭连接
defer conn.Close()
// 新建一个通道
ch, err := conn.Channel()
lib.ErrorHanding(err, "Failed to open a channel")
// 关闭通道
defer ch.Close()
// 声明或者创建一个队列用来保存消息
q, err := ch.QueueDeclare(
// 队列名称
"simple:queue", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
lib.ErrorHanding(err, "Failed to declare a queue")
data := simpleDemo{
Name: "Tom",
Addr: "Beijing",
}
dataBytes,err := json.Marshal(data)
if err != nil{
lib.ErrorHanding(err,"struct to json failed")
}
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: dataBytes,
})
log.Printf(" [x] Sent %s", dataBytes)
lib.ErrorHanding(err, "Failed to publish a message")
}
定义一个消息的消费者
comsumer.go
package main
import (
"log"
"myDemo/rabbitmq_demo/lib"
)
func main() {
conn, err := lib.RabbitMQConn()
lib.ErrorHanding(err,"failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
lib.ErrorHanding(err,"failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"simple:queue", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
lib.ErrorHanding(err,"Failed to declare a queue")
// 定义一个消费者
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
lib.ErrorHanding(err,"Failed to register a consume")
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}
工作队列
工作队列也称为
任务队列
任务队列是为了避免等待执行一些耗时的任务,而是将需要执行的任务封装为消息发送给工作队列,后台运行的工作进程将任务消息取出来并执行相关任务 , 多个后台工作进程同时间进行,那么任务在他们之间共享
我们定义一个任务的生产者,用于生产任务消息
task.go
package main
import (
"github.com/streadway/amqp"
"log"
"myDemo/rabbitmq_demo/lib"
"os"
"strings"
)
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "no task"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
// 连接RabbitMQ服务器
conn, err := lib.RabbitMQConn()
lib.ErrorHanding(err, "Failed to connect to RabbitMQ")
// 关闭连接
defer conn.Close()
// 新建一个通道
ch, err := conn.Channel()
lib.ErrorHanding(err, "Failed to open a channel")
// 关闭通道
defer ch.Close()
// 声明或者创建一个队列用来保存消息
q, err := ch.QueueDeclare(
// 队列名称
"task:queue", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
lib.ErrorHanding(err, "Failed to declare a queue")
body := bodyFrom(os.Args)
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
// 将消息标记为持久消息
DeliveryMode: amqp.Persistent,
Body: []byte(body),
})
lib.ErrorHanding(err, "Failed to publish a message")
log.Printf("sent %s", body)
}
定义一个工作者,用于消费掉任务消息
worker.go
package main
import (
"log"
"myDemo/rabbitmq_demo/lib"
)
func main() {
conn, err := lib.RabbitMQConn()
lib.ErrorHanding(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
lib.ErrorHanding(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task:queue", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
lib.ErrorHanding(err, "Failed to declare a queue")
// 将预取计数器设置为1
// 在并行处理中将消息分配给不同的工作进程
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
lib.ErrorHanding(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
lib.ErrorHanding(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
测试
#shell1 go run task.go
#shell2 go run worker.go
#shell3 go run worker.go
RabbitMQ 的用法很多,详情参看官网文档
参考资料
https://www.rabbitmq.com/getstarted.html
http://rabbitmq.mr-ping.com/
https://github.com/streadway/amqp
https://blog.csdn.net/u013256816/category_6532725.html
有疑问加站长微信联系(非本文作者)