rabbitMq 消息丢失处理机制 Confirm模式 [go 版本]

forlife · · 1111 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

此篇文章本作者理解的一知半解,还不够深刻,有时间我再补充,今天有点忙,欢迎大家评论讲解,3Q!!!!

参考文章:https://studygolang.com/artic...

生产者 producer.go
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "time"
)

//因:快速实现逻辑,故:不处理错误逻辑
func main() {
    conn, _ := amqp.Dial("amqp://user:password@host:ip/vhost")
    ch, _ := conn.Channel()
    body := "Hello World!! " + time.Now().Format("2006-01-02 15:04:05")
    fmt.Println(body)
    var exchange_name string = "j_exch_head"
    var routing_key string = "jkey"
    var etype string = amqp.ExchangeHeaders

    a := ch.Confirm(false)
    //声明交换器
    ch.ExchangeDeclare(exchange_name, etype, true, false, false, true, nil)

    confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1)) // 处理确认逻辑
    defer confirmOne(confirms) // 处理方法
    ch.Publish(
        exchange_name, // exchange 这里为空则不选择 exchange
        routing_key,   // routing key
        false,         // mandatory
        false,         // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
            Headers:     amqp.Table{"x-match": "any", "mail": "470047253@qq.com", "author": "Jhonny"}, // 头部信息 any:匹配一个即可 all:全部匹配
            //Expiration:  "3000", // 设置过期时间
        },
    )

    fmt.Println(a)
    // defer 关键字
    defer conn.Close() // 压栈 后进先出
    defer ch.Close()   // 压栈 后进先出
}
// 消息确认
func confirmOne(confirms <-chan amqp.Confirmation) {
    if confirmed := <-confirms; confirmed.Ack {
        fmt.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
    } else {
        fmt.Printf("confirmed delivery of delivery tag: %d", confirmed.DeliveryTag)
    }
}
消费者 consumer.go
package main

import (
    "github.com/streadway/amqp"
    "log"
)

func main() {
    conn, _ := amqp.Dial("amqp://user:password@host:ip/vhost")
    ch, _ := conn.Channel()
    var exchange_name string = "j_exch_head"
    var routing_key string = "jkey"
    var queue_name string = "j_queue"
    var etype string = amqp.ExchangeHeaders // 头部交换机
    ch.QueueDeclare(queue_name, true, false, true, false, nil)
    ch.QueueBind(
        queue_name,    // queue name
        routing_key,   // routing key: Headers 头部交换机跟routing_key 没关系
        exchange_name, // exchange
        false,
        amqp.Table{"mail": "470047253@qq.com"}, // 头部信息 any:匹配一个即可 all:全部匹配
    )
    //声明交换器
    ch.ExchangeDeclare(exchange_name, etype, true, false, false, false, nil)
    //监听
    msgs, _ := ch.Consume(
        queue_name, // queue name
        "",         // consumer
        true,       // auto-ack 自动确认
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // args
    )
    forever := make(chan bool)
    go func() {
        for d := range msgs {
            //println("tset")
            log.Printf(" [x] %s", d.Body)
        }
    }()
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:forlife

查看原文:rabbitMq 消息丢失处理机制 Confirm模式 [go 版本]

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1111 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传