生产者代码:
package main
import (
uuid "github.com/satori/go.uuid"
"github.com/streadway/amqp"
"github.com/wonderivan/logger"
"rmq/db/rmq"
"time"
)
const (
DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机
DeadLettersQueueName = "dlx_queue_packet" // 死信队列
QueueName = "queue_packet" // 目标队列
ExchangeName = "exchange_packet" // 目标交换机
)
var (
ch *amqp.Channel
err error
conn *amqp.Connection
queue amqp.Queue
dlxQueue amqp.Queue
)
func main() {
if conn, err = rmq.GetConn(); err != nil {
logger.Error("连接RabbitMQ服务器失败:%s", err.Error())
return
}
defer conn.Close()
if ch, err = conn.Channel(); err != nil {
logger.Error("获取Channel失败:%s", err.Error())
return
}
defer ch.Close()
// 声明队列交换机
if err = ch.ExchangeDeclare(ExchangeName, amqp.ExchangeFanout, true, false, false, false, nil); err != nil {
logger.Error("声明业务交换机失败:%s", err.Error())
return
}
// 创建死信交换机
if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {
logger.Error("创建死信交换机:%s", err.Error())
return
}
// 创建死信队列
if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {
logger.Error("创建死信队列失败:%s", err.Error())
return
}
// 创建业务队列
if queue, err = ch.QueueDeclare(QueueName, true, false, false, false, amqp.Table{
"x-message-ttl": 6000, // 消息过期时间 毫秒
"x-dead-letter-exchange": DeadLettersExchangeName, // 死信交换机
// "x-dead-letter-routing-key": "dlxKey", // 死信路由key
}); err != nil {
logger.Warn("创建业务队列失败:%s", err.Error())
return
}
// 业务队列绑定交换机
if err = ch.QueueBind(queue.Name, "", ExchangeName, false, nil); err != nil {
logger.Error("绑定业务交换机失败:%s", err.Error())
return
}
// 死信队列绑定死信交换机
if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {
logger.Error("绑定死信交换机失败:%s", err.Error())
}
for i := 1; i <= 10; i++ {
msg := amqp.Publishing{
MessageId: uuid.NewV4().String(),
ContentType: "text/plain",
Body: []byte("红包退回"),
}
// 发布消息
err = ch.Publish(
ExchangeName,
"",
false,
false,
msg,
)
if err != nil {
logger.Error("发送失败: %s", err.Error())
return
} else {
logger.Info("发送成功:%s", msg.MessageId)
}
}
}
消费者代码
package main
import (
uuid "github.com/satori/go.uuid"
"github.com/streadway/amqp"
"github.com/wonderivan/logger"
"rmq/db/rmq"
"time"
)
const (
DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机
DeadLettersQueueName = "dlx_queue_packet" // 死信队列
QueueName = "queue_packet" // 目标队列
ExchangeName = "exchange_packet" // 目标交换机
)
var (
ch *amqp.Channel
err error
conn *amqp.Connection
queue amqp.Queue
dlxQueue amqp.Queue
)
func main() {
if conn, err = rmq.GetConn(); err != nil {
logger.Error("连接RabbitMQ服务器失败:%s", err.Error())
return
}
defer conn.Close()
if ch, err = conn.Channel(); err != nil {
logger.Error("获取Channel失败:%s", err.Error())
return
}
defer ch.Close()
// 创建死信交换机
if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {
logger.Error("创建死信交换机:%s", err.Error())
return
}
// 创建死信队列
if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {
logger.Error("创建死信队列失败:%s", err.Error())
return
}
// 死信队列绑定死信交换机
if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {
logger.Error("绑定死信交换机失败:%s", err.Error())
}
msgList, err := ch.Consume(dlxQueue.Name, "", false, false, false, false, nil)
if err != nil {
logger.Error("消费者监听失败:%s", err.Error())
return
}
for {
select {
case message, ok := <-msgList:
if !ok {
continue
}
go func(msg amqp.Delivery) {
logger.Info("messageID: %s", msg.MessageId)
logger.Info("messageBody: %s", msg.Body)
if err = msg.Ack(false); err != nil {
logger.Error("确认消息失败")
}
}(message)
case <-time.After(time.Second):
}
}
}
有疑问加站长微信联系(非本文作者)