golang rabbitmq的使用(三)

麦穗儿 · · 350 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

在第二篇中讲到了fanout类型的exchange,fanout类型exchange会将消息发送到所有和其连接的queue中,不会去区分和过滤。但现实中我们往往需要有过滤条件将不同的消息区分开来发到不同的queue。举个简单的例子,系统的log。我们需要将收到的log根据info,debug,warn,error发送到不同的queue,由不同的consumer来处理这些不同的log。
为了实现这个功能我们可以

  • ExchangeDeclare是指定type为direct
    err = ch.ExchangeDeclare(
        "syslog_direct", //exchange name
        "direct",        //exchange type
        true,            //durable
        false,           //autodelete
        false,
        false,
        nil,
    )
  • Producer在Publish时指定routing key
err = ch.Publish(
            "syslog_direct", //exchange
            routingKey,      //routing key
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(msgBody),
            })
  • Consumer在QueueBind时指定routing key
err = ch.QueueBind(
                q.Name,
                routingKey,      //routing key
                "syslog_direct", //exchange
                false,
                nil,
            )

具体测试代码如下:
conf.go

package config

const (
    RMQADDR      = "amqp://guest:guest@172.17.84.205:5672/"
    EXCHANGENAME = "syslog_direct"
    CONSUMERCNT  = 4
)

var (
    RoutingKeys [4]string = [4]string{"info", "debug", "warn", "error"}
)

producer.go

package main

import (
    config "binTest/rabbitmqTest/t1/l4/conf"
    "fmt"
    "log"
    "os"

    "github.com/streadway/amqp"
)

func main() {

    conn, err := amqp.Dial(config.RMQADDR)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        config.EXCHANGENAME, //exchange name
        "direct",            //exchange type
        true,                //durable
        false,               //autodelete
        false,
        false,
        nil,
    )

    failOnError(err, "Failed to declare exchange")

    if len(os.Args) < 3 {
        fmt.Println("Arguments error(ex:producer info/debug/warn/error msg1 msg2 msg3")
        return
    }

    routingKey := os.Args[1]

    validKey := false
    for _, item := range config.RoutingKeys {
        if routingKey == item {
            validKey = true
            break
        }
    }

    if validKey == false {
        fmt.Println("Arguments error, no valid routing key specified.(ex:producer info/debug/warn/error msg1 msg2 msg3")
        return
    }
    msgs := os.Args[2:]

    msgNum := len(msgs)

    for cnt := 0; cnt < msgNum; cnt++ {
        msgBody := msgs[cnt]
        err = ch.Publish(
            config.EXCHANGENAME, //exchange
            routingKey,          //routing key
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(msgBody),
            })

        log.Printf(" [x] Sent %s", msgBody)
    }
    failOnError(err, "Failed to publish a message")

}

func failOnError(err error, msg string) {
    if err != nil {
        fmt.Printf("%s: %s\n", msg, err)
    }
}

consumer.go

package main

import (
    config "binTest/rabbitmqTest/t1/l4/conf"
    "fmt"
    "log"

    "github.com/streadway/amqp"
)

func main() {

    conn, err := amqp.Dial(config.RMQADDR)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    forever := make(chan bool)

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        config.EXCHANGENAME, //exchange name
        "direct",            //exchange type
        true,                //durable
        false,               //autodelete
        false,
        false,
        nil,
    )

    failOnError(err, "Failed to declare exchange")

    for routing := 0; routing < config.CONSUMERCNT; routing++ {
        go func(routingNum int) {

            q, err := ch.QueueDeclare(
                "",
                false, //durable
                false,
                true,
                false,
                nil,
            )

            failOnError(err, "Failed to declare a queue")

            err = ch.QueueBind(
                q.Name,
                config.RoutingKeys[routingNum],
                config.EXCHANGENAME,
                false,
                nil,
            )
            failOnError(err, "Failed to bind exchange")

            msgs, err := ch.Consume(
                q.Name,
                "",
                true, //Auto Ack
                false,
                false,
                false,
                nil,
            )

            if err != nil {
                log.Fatal(err)
            }

            for msg := range msgs {
                log.Printf("In %s consume a message: %s\n", config.RoutingKeys[routingNum], msg.Body)
            }

        }(routing)
    }

    <-forever
}

func failOnError(err error, msg string) {
    if err != nil {
        fmt.Printf("%s: %s\n", msg, err)
    }
}

详细代码可以从这里取到(l4):
https://github.com/BinWang-sh...


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:麦穗儿

查看原文:golang rabbitmq的使用(三)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

350 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传