之前几篇说了类似广播的fanout类型的Exchange,支持分类的direct类型的Exchange。在使用direct类型的Exchange中使用了log的例子,我们可以区分info, debug, warn, error类型的log。但是实际中可能还会有更进一步的需求类似,我希望看到系统内核的error日志信息,希望看到请求耗时最长的接口的debug日志。对于这样的需求就可以使用topic类型的exchange。
使用topic类型的exchange方法如下
(1)在producer和consumer中都声明相同topic类型的exchange
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
(2)在producer中发布消息时指定routing key
err = ch.Publish(
"logs_topic", // exchange
"error.kernel", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
(3)在consumer中绑定queue到exchange
err = ch.QueueBind(
q.Name, // queue name
"error.kernel", // routing key
"logs_topic", // exchange
false,
nil)
routing key定义
routing key可以明确指定明确的匹配也可使用通配符进行匹配。
通配符"#"代表零个或多个words
通配符"*"代表一个词(word)
routing key如果指定为"#"则接受所有的消息,类似fanout类型的exchange
不带通配符"#"或"*"的routing key则接受指定的消息,类似direct类型的exchange
routing key使用以"."分割的层级结构。
例如:
"info.payment.vip" 是只消费vip支付的info消息。
"info.payment.*" 接受所有支付的info消息
具体代码如下
conf.go
consumer.go
package main
import (
config "binTest/rabbitmqTest/t1/l5/conf"
"fmt"
"log"
"os"
"github.com/streadway/amqp"
)
/*
./consumer "#" info.payment.* *.log debug.payment.#
*/
func main() {
conn, err := amqp.Dial(config.RMQADDR)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
forever := make(chan bool)
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
config.EXCHANGENAME, //exchange name
"topic", //exchange kind
true, //durable
false, //autodelete
false,
false,
nil,
)
failOnError(err, "Failed to declare exchange")
if len(os.Args) < 2 {
log.Println(len(os.Args))
log.Println(`"Arguments error(Example: ./consumer "#" info.payment.* *.log debug.payment.#"`)
return
}
topics := os.Args[1:]
topicsCnt := len(topics)
for routing := 0; routing < topicsCnt; routing++ {
go func(routingNum int) {
q, err := ch.QueueDeclare(
"",
false, //durable
false, //delete when unused
true, //exclusive
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name,
topics[routingNum],
config.EXCHANGENAME,
false,
nil,
)
failOnError(err, "Failed to bind exchange")
msgs, err := ch.Consume(
q.Name,
"",
true, //Auto Ack
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
for msg := range msgs {
log.Printf("In %s consume a message: %s\n", topics[routingNum], msg.Body)
}
}(routing)
}
<-forever
}
func failOnError(err error, msg string) {
if err != nil {
fmt.Printf("%s: %s\n", msg, err)
}
}
producer.go
package main
import (
config "binTest/rabbitmqTest/t1/l5/conf"
"fmt"
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial(config.RMQADDR)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
config.EXCHANGENAME, //exchange name
"topic", //exchange kind
true, //durable
false, //autodelete
false,
false,
nil,
)
failOnError(err, "Failed to declare exchange")
if len(os.Args) < 3 {
fmt.Println("Arguments error(ex:producer topic msg1 msg2 msg3")
return
}
routingKey := os.Args[1]
msgs := os.Args[2:]
msgNum := len(msgs)
for cnt := 0; cnt < msgNum; cnt++ {
msgBody := msgs[cnt]
err = ch.Publish(
config.EXCHANGENAME, //exchange
routingKey, //routing key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msgBody),
})
log.Printf(" [x] Sent %s", msgBody)
}
failOnError(err, "Failed to publish a message")
}
func failOnError(err error, msg string) {
if err != nil {
fmt.Printf("%s: %s\n", msg, err)
}
}
运行结果
consumer
producer
根据结果可以看出
consumer指定了4个routing key
- "#"接收所有消息。 producer的所有消息都会接收
- "info.payment.*" 接受所有info.payment的消息。
producer发出的如下消息会被接收
./producer info.payment.googlepay "start payment at 16:00" - "*.log" 接受所有第二层级为log的消息
producer发出的如下消息会被接收
./producer info.log "start recording payment log" - "debug.payment.#" 接收所有发往routing key为debug.payment开头的消息
producer发出的如下消息会被接收
./producer debug.payment.Alipay "10 yuan payment at 15:25" "1000 dollors payment at 15:26" "10 pounds payment at 15:27"
如下的消息也会被接收(因为#代表零个或多个words)
./producer debug.payment.payinfo.Alipay "test payment info"
详细代码在如下
https://github.com/BinWang-sh...
有疑问加站长微信联系(非本文作者)