Golang并发操作RabbitMQ

三月的风雨 · · 1688 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

send.go

通过amqp连接RabbitMQ,在通过协程发送信息

package main

import (
    "github.com/streadway/amqp"
    "log"
    "rabbitmqTest/utils"
    "sync"
)



func main() {
    //TODO 连接地址改为自己主机地址
    conn, err := amqp.Dial("amqp://guest:guest@192.168.100.101:5672/")
    utils.FailOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    utils.FailOnError(err, "Failed to open a channel")
    defer ch.Close()

    bodyMap := make(map[string]string)

    bodyMap["test1"] = "a"
    bodyMap["test2"] = "b"
    bodyMap["test3"] = "c"

    var wg sync.WaitGroup
    errList := make(chan error, 2 * len(bodyMap))
    for name, body := range bodyMap {
        wg.Add(1)
        go func(name, body string) {
            defer wg.Done()
            q, err := ch.QueueDeclare(
                name, // name
                false,   // durable
                false,   // delete when unused
                false,   // exclusive
                false,   // no-wait
                nil,     // arguments
            )

            if err != nil {
                errList <- err
                return
            }

            err = ch.Publish(
                "",     // exchange
                q.Name, // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                    ContentType: "text/plain",
                    Body:        []byte(body),
                })
            if err != nil {
                errList <- err
                return
            }
            log.Printf(" [x] Sent %s", body)
        }(name, body)
    }
    wg.Wait()
    close(errList)


    if len(errList) > 0{
        for err := range errList {
            utils.FailOnError(err, "Failed send message")
        }
    }


}

recv.go

package main

import (
    "log"
    "github.com/streadway/amqp"
    "rabbitmqTest/utils"
    "sync"
)



func main() {
    //TODO 连接地址改为自己主机地址
    conn, err := amqp.Dial("amqp://guest:guest@192.168.100.101:5672/")
    utils.FailOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    utils.FailOnError(err, "Failed to open a channel")
    defer ch.Close()

    nameList := []string{
        "test1",
        "test2",
        "test3",
    }
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    var wg sync.WaitGroup
    errList := make(chan error, 2 * len(nameList))
    for _, name := range nameList {
        go func(name string) {
            q, err := ch.QueueDeclare(
                name, // name
                false,   // durable
                false,   // delete when unused
                false,   // exclusive
                false,   // no-wait
                nil,     // arguments
            )
            utils.FailOnError(err, "Failed to declare a queue")
            msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
            )
            utils.FailOnError(err, "Failed to register a consumer")
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
            }
        }(name)
    }
    wg.Wait()
    close(errList)

    if len(errList) > 0{
        for err := range errList {
            utils.FailOnError(err, "Failed send message")
        }
    }

    forever := make(chan bool)


    <-forever
}

log.go

package utils
import (
    "log"
)
func FailOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:三月的风雨

查看原文:Golang并发操作RabbitMQ

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1688 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传