RabbitMQ功能实现1- 红包未领取退回

李昊天 · · 653 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

生产者代码:

package main

import (
    uuid "github.com/satori/go.uuid"
    "github.com/streadway/amqp"
    "github.com/wonderivan/logger"
    "rmq/db/rmq"
    "time"
)

const (
    DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机
    DeadLettersQueueName    = "dlx_queue_packet"    // 死信队列

    QueueName    = "queue_packet"    // 目标队列
    ExchangeName = "exchange_packet" // 目标交换机
)

var (
    ch       *amqp.Channel
    err      error
    conn     *amqp.Connection
    queue    amqp.Queue
    dlxQueue amqp.Queue
)

func main() {

    if conn, err = rmq.GetConn(); err != nil {
        logger.Error("连接RabbitMQ服务器失败:%s", err.Error())
        return
    }

    defer conn.Close()

    if ch, err = conn.Channel(); err != nil {
        logger.Error("获取Channel失败:%s", err.Error())
        return
    }

    defer ch.Close()

    // 声明队列交换机
    if err = ch.ExchangeDeclare(ExchangeName, amqp.ExchangeFanout, true, false, false, false, nil); err != nil {
        logger.Error("声明业务交换机失败:%s", err.Error())
        return
    }

    // 创建死信交换机
    if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {
        logger.Error("创建死信交换机:%s", err.Error())
        return
    }

    // 创建死信队列
    if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {
        logger.Error("创建死信队列失败:%s", err.Error())
        return
    }

    // 创建业务队列
    if queue, err = ch.QueueDeclare(QueueName, true, false, false, false, amqp.Table{
        "x-message-ttl":          6000,                    // 消息过期时间 毫秒
        "x-dead-letter-exchange": DeadLettersExchangeName, // 死信交换机
        // "x-dead-letter-routing-key": "dlxKey",       // 死信路由key
    }); err != nil {
        logger.Warn("创建业务队列失败:%s", err.Error())
        return
    }

    // 业务队列绑定交换机
    if err = ch.QueueBind(queue.Name, "", ExchangeName, false, nil); err != nil {
        logger.Error("绑定业务交换机失败:%s", err.Error())
        return
    }

    // 死信队列绑定死信交换机
    if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {
        logger.Error("绑定死信交换机失败:%s", err.Error())
    }

    for i := 1; i <= 10; i++ {
        msg := amqp.Publishing{
            MessageId:   uuid.NewV4().String(),
            ContentType: "text/plain",
            Body:        []byte("红包退回"),
        }

        // 发布消息
        err = ch.Publish(
            ExchangeName,
            "",
            false,
            false,
            msg,
        )

        if err != nil {
            logger.Error("发送失败: %s", err.Error())
            return
        } else {
            logger.Info("发送成功:%s", msg.MessageId)
        }
    }
}

消费者代码

package main

import (
    uuid "github.com/satori/go.uuid"
    "github.com/streadway/amqp"
    "github.com/wonderivan/logger"
    "rmq/db/rmq"
    "time"
)

const (
    DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机
    DeadLettersQueueName    = "dlx_queue_packet"    // 死信队列

    QueueName    = "queue_packet"    // 目标队列
    ExchangeName = "exchange_packet" // 目标交换机
)

var (
    ch       *amqp.Channel
    err      error
    conn     *amqp.Connection
    queue    amqp.Queue
    dlxQueue amqp.Queue
)


func main() {
    if conn, err = rmq.GetConn(); err != nil {
        logger.Error("连接RabbitMQ服务器失败:%s", err.Error())
        return
    }

    defer conn.Close()

    if ch, err = conn.Channel(); err != nil {
        logger.Error("获取Channel失败:%s", err.Error())
        return
    }

    defer ch.Close()

    // 创建死信交换机
    if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {
        logger.Error("创建死信交换机:%s", err.Error())
        return
    }

    // 创建死信队列
    if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {
        logger.Error("创建死信队列失败:%s", err.Error())
        return
    }

    // 死信队列绑定死信交换机
    if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {
        logger.Error("绑定死信交换机失败:%s", err.Error())
    }

    msgList, err := ch.Consume(dlxQueue.Name, "", false, false, false, false, nil)
    if err != nil {
        logger.Error("消费者监听失败:%s", err.Error())
        return
    }

    for {
        select {
        case message, ok := <-msgList:
            if !ok {
                continue
            }

            go func(msg amqp.Delivery) {
                logger.Info("messageID: %s", msg.MessageId)
                logger.Info("messageBody: %s", msg.Body)
                if err = msg.Ack(false); err != nil {
                    logger.Error("确认消息失败")
                }
            }(message)
        case <-time.After(time.Second):

        }
    }
}

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:李昊天

查看原文:RabbitMQ功能实现1- 红包未领取退回

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

653 次点击  ∙  1 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传