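The 4 exchange types
Direct exchange: routes a message to the queues whose binding key equals the routing key
Fanout exchange: routes a message to every bound queue and ignores the routing key
Topic exchange: routes a message by matching the routing key against binding patterns
Headers exchange: routes a message by its header attributes instead of the routing key
For a detailed description of each exchange type, see https://www.jianshu.com/p/469...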
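The routing rules of the four types can be illustrated with a small in-memory sketch. It does not talk to RabbitMQ and is not how the broker is implemented; the `binding` type and the `route` and `matchTopic` functions are hypothetical names introduced only for this illustration. For topic patterns it assumes the usual AMQP convention that `*` matches exactly one dot-separated word and `#` matches zero or more words. The headers case is simplified to "every listed header must be equal".

```go
package main

import (
	"fmt"
	"strings"
)

// binding is a hypothetical record tying a queue to an exchange.
type binding struct {
	queue string
	key   string            // binding key, used by direct and topic
	args  map[string]string // header values, used by headers
}

// matchTopic reports whether a routing key matches a topic pattern.
func matchTopic(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// '#' may absorb zero or more words.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		// '*' must absorb exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

// route returns the queues that would receive a message.
func route(kind string, bindings []binding, routingKey string, headers map[string]string) []string {
	var queues []string
	for _, b := range bindings {
		matched := false
		switch kind {
		case "direct":
			matched = b.key == routingKey
		case "fanout":
			matched = true
		case "topic":
			matched = matchTopic(b.key, routingKey)
		case "headers":
			// Simplification: every header listed in the binding must be equal.
			matched = true
			for name, want := range b.args {
				if headers[name] != want {
					matched = false
				}
			}
		}
		if matched {
			queues = append(queues, b.queue)
		}
	}
	return queues
}

func main() {
	bindings := []binding{
		{queue: "q_created", key: "order.created"},
		{queue: "q_orders", key: "order.*"},
		{queue: "q_all", key: "#"},
	}
	for _, kind := range []string{"direct", "fanout", "topic"} {
		fmt.Println(kind, route(kind, bindings, "order.created", nil))
	}
}
```

With the same three bindings, only the exchange type changes which queues receive the message, which is the main point to take away before choosing a type.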
Using a queue without declaring an exchange
For the meaning of the parameters in the code below, see https://segmentfault.com/a/11...
Producer example: producer.go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/streadway/amqp"
)

func main() {
	// Replace user, password, host, port and vhost with your own values.
	conn, err := amqp.Dial("amqp://user:password@host:port/vhost")
	if err != nil {
		log.Fatalf("dial: %s", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("open channel: %s", err)
	}
	defer ch.Close()

	body := "Hello World! " + time.Now().Format("2006-01-02 15:04:05")
	fmt.Println(body)

	// Declare the queue so that it exists before the message is published.
	_, err = ch.QueueDeclare(
		"j_test_delay", // name
		false,          // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		log.Fatalf("declare queue: %s", err)
	}

	err = ch.Publish(
		"",             // exchange: empty means the default exchange is used
		"j_test_delay", // routing key: with the default exchange, the queue name
		false,          // mandatory
		false,          // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	if err != nil {
		log.Fatalf("publish: %s", err)
	}
}
$ go run producer.go
Hello World! 2021-03-17 18:05:39
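The connection string passed to amqp.Dial has the shape amqp://user:password@host:port/vhost. As a rough illustration of how its parts break down, the following sketch splits such a URL with the standard library only. The `amqpParts` type and the `parseAMQPURL` function are hypothetical helpers, not part of the streadway/amqp package, and the fallback to port 5672 (the standard AMQP port) is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// amqpParts is a hypothetical holder for the pieces of an AMQP URL.
type amqpParts struct {
	User, Password, Host, Port, VHost string
}

// parseAMQPURL splits an amqp:// or amqps:// URL into its components.
func parseAMQPURL(raw string) (amqpParts, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return amqpParts{}, err
	}
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return amqpParts{}, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	password, _ := u.User.Password()
	port := u.Port()
	if port == "" {
		port = "5672" // assumed default for plain AMQP
	}
	return amqpParts{
		User:     u.User.Username(),
		Password: password,
		Host:     u.Hostname(),
		Port:     port,
		// The vhost is the path without its leading slash;
		// the vhost "/" is written as %2f in the URL.
		VHost: strings.TrimPrefix(u.Path, "/"),
	}, nil
}

func main() {
	parts, err := parseAMQPURL("amqp://guest:guest@localhost:5672/my_vhost")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", parts)
}
```

A non-numeric port, such as a placeholder left unreplaced, is rejected by url.Parse, so a check like this can catch a malformed connection string before any network call is attempted.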
Consumer example: consumer.go
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	// Replace user, password, host, port and vhost with your own values.
	conn, err := amqp.Dial("amqp://user:password@host:port/vhost")
	if err != nil {
		// Stop here: continuing with a failed connection would panic below.
		log.Fatalf("dial: %s", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("open channel: %s", err)
	}
	defer ch.Close()

	// The queue must already exist, for example because the producer declared it.
	msgs, err := ch.Consume(
		"j_test_delay", // queue
		"",             // consumer
		true,           // auto-ack
		false,          // exclusive
		false,          // no-local
		false,          // no-wait
		nil,            // args
	)
	if err != nil {
		log.Fatal(err)
	}

	forever := make(chan bool)
	go func() {
		// Handle each delivery as it arrives.
		for d := range msgs {
			log.Printf("message: %s", d.Body)
		}
	}()
	log.Printf(" [*] Waiting for queue. To exit press CTRL+C")
	<-forever // block so the process keeps consuming
}
$ go run consumer.go
2021/03/17 18:05:13 [*] Waiting for queue. To exit press CTRL+C
2021/03/17 18:05:20 message: Hello World! 2021-03-17 18:05:20
2021/03/17 18:05:39 message: Hello World! 2021-03-17 18:05:39
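The consumer relies on a common Go pattern: a goroutine ranges over a channel of deliveries while the main goroutine blocks. The sketch below reproduces that pattern with a plain Go channel so it can run without a broker. The `delivery` type and the `consume` function are hypothetical stand-ins for the library's types, and the sketch assumes, as in the consumer above, that the loop should simply end once the deliveries channel is closed.

```go
package main

import "fmt"

// delivery is a hypothetical stand-in for a message received from a queue.
type delivery struct {
	Body []byte
}

// consume reads from msgs until the channel is closed
// and returns the bodies it received, in order.
func consume(msgs <-chan delivery) []string {
	var bodies []string
	for d := range msgs {
		bodies = append(bodies, string(d.Body))
	}
	return bodies
}

func main() {
	msgs := make(chan delivery)
	done := make(chan []string)

	// The consuming goroutine plays the role of the loop in consumer.go.
	go func() {
		done <- consume(msgs)
	}()

	// Simulate two messages arriving, then the channel closing.
	for _, body := range []string{"Hello World! 1", "Hello World! 2"} {
		msgs <- delivery{Body: []byte(body)}
	}
	close(msgs)

	for _, body := range <-done {
		fmt.Println("message:", body)
	}
}
```

Unlike the `forever` channel in consumer.go, which is never written to and therefore blocks until the process is interrupted, the `done` channel here lets the program exit once every message has been handled.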