RabbitMQ教程(译)-Topics

939496716 · · 2025 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

[原文地址:https://bingjian-zhu.github.io/2019/09/01/RabbitMQ%E6%95%99%E7%A8%8B%EF%BC%88%E8%AF%91%EF%BC%89-Topics/](https://bingjian-zhu.github.io/2019/09/01/RabbitMQ%E6%95%99%E7%A8%8B%EF%BC%88%E8%AF%91%EF%BC%89-Topics/) ## 为什么需要topic交换机? **(使用Go客户端)** <!--more--> [上一篇教程](https://bingjian-zhu.github.io/2019/09/01/RabbitMQ%E6%95%99%E7%A8%8B%EF%BC%88%E8%AF%91%EF%BC%89-Routing/),我们改进了我们的日志系统。我们使用`direct`交换机替代了`fanout`交换机,从只能盲目的广播消息改进为有可能选择性的接收日志。 尽管`direct`交换机能够改善我们的系统,但是它也有它的限制 —— 没办法基于多个标准执行路由操作。 在我们的日志系统中,我们不只希望订阅基于严重程度的日志,同时还希望订阅基于发送来源的日志。Unix工具[syslog](http://en.wikipedia.org/wiki/Syslog)就是同时基于严重程度-severity (info/warn/crit...) 和 设备-facility (auth/cron/kern...)来路由日志的。 如果这样的话,将会给予我们非常大的灵活性,我们既可以监听来源于“cron”的严重程度为“critical errors”的日志,也可以监听来源于“kern”的所有日志。 为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换机 —— topic交换机。 ## topic交换机 发送到`topic`交换机的消息不可以携带随意`routing_key`,它的routing_key必须是一个由`.`分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数可以随意,但是不要超过255字节。 binding key也必须拥有同样的格式。`topic`交换机背后的逻辑跟`direct`交换机很相似 —— 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列。但是它的binding key和routing_key有两个特殊应用方式: - `*` (星号) 用来表示一个单词. - `#` (井号) 用来表示任意数量(零个或多个)单词。 下边用图说明: ![None](http://www.rabbitmq.com/img/tutorials/python-five.png) 这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个`.`分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的: `<celerity>.<colour>.<species>`。 我们创建了三个绑定:Q1的绑定键为 `*.orange.*`,Q2的绑定键为 `*.*.rabbit` 和 `lazy.#` 。 这三个绑定键被可以总结为: - Q1 对*所有的桔黄色动物*都感兴趣。 - Q2 则是对*所有的兔子*和*所有懒惰的动物*感兴趣。 一个携带有 `quick.orange.rabbit` 的消息将会被分别投递给这两个队列。携带着 `lazy.orange.elephant` 的消息同样也会给两个队列都投递过去。另一方面携带有 `quick.orange.fox` 的消息会投递给第一个队列,携带有 `lazy.brown.fox` 的消息会投递给第二个队列。携带有 `lazy.pink.rabbit` 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 `quick.brown.fox` 的消息不会投递给任何一个队列。 如果我们违反约定,发送了一个携带有一个单词或者四个单词(`"orange"` or `"quick.orange.male.rabbit"`)的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。 但是另一方面,即使 `"lazy.orange.male.rabbit"` 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。 >#### Topic交换机 >Topic交换机是很强大的,它可以表现出跟其他交换机类似的行为 >当一个队列的binding key为 "#"(井号) 的时候,这个队列将会无视消息的routing key,接收所有的消息。 >当 `*` (星号) 和 `#` (井号) 这两个特殊字符都未在binding key中出现的时候,此时Topic交换机就拥有的direct交换机的行为。 ## 代码整合 接下来我们会将Topic交换机应用到我们的日志系统中。在开始工作前,我们假设日志的routing key由两个单词组成,routing key看起来是这样的:`<facility>.<severity>` 代码跟[上一篇教程](https://bingjian-zhu.github.io/2019/09/01/RabbitMQ%E6%95%99%E7%A8%8B%EF%BC%88%E8%AF%91%EF%BC%89-Routing/)差不多。 emit_log_topic.go的代码: ```go package main import ( "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_topic", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "anonymous.info" } else { s = os.Args[1] } return s } ``` receive_logs_topic.go的代码: ```go package main import ( "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [binding_key]...", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_topic", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever } ``` 执行下边命令 接收所有日志: go run receive_logs_topic.go "#" 执行下边命令 接收来自”kern“设备的日志: go run receive_logs_topic.go "kern.*" 执行下边命令 只接收严重程度为”critical“的日志: go run receive_logs_topic.go "*.critical" 执行下边命令 建立多个绑定: go run receive_logs_topic.go "kern.*" "*.critical" 执行下边命令 发送路由键为 "kern.critical" 的日志: go run emit_log_topic.go "kern.critical" "A critical kernel error" 执行上边命令试试看效果吧。另外,上边代码不会对路由键和绑定键做任何假设,所以你可以在命令中使用超过两个路由键参数。 ### 如果你现在还没被搞晕,想想下边问题: - 绑定键为 `*` 的队列会取到一个routing key为空的消息吗? - 绑定键为 `#.*` 的队列会获取到一个名为`..`的路由键的消息吗?它会取到一个routing key为单个单词的消息吗? - `a.*.#` 和 `a.#`的区别在哪儿? (完整代码参见[emit_logs_topic.go](https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/emit_log_topic.go) and [receive_logs_topic.go](https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/receive_logs_topic.go)) > 原文:[Topics](https://www.rabbitmq.com/tutorials/tutorial-five-go.html)

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2025 次点击  
加入收藏 微博
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传