****1:生产者****
```go
package main
import (
"github.com/streadway/amqp"
"log"
)
//错误处理
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailOnError(err, "连接RabbitMQ失败")
defer conn.Close()
ch, err := conn.Channel()
FailOnError(err, "打开Channel失败")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", //name
false, //durable 持久性
false, //autoDelete
false, //专享
false, //noWait
nil, //args
)
FailOnError(err, "声明Queue队列失败")
body := "hello,RabbitMQ。你好,消息队列"
err = ch.Publish(
"", //exchange
q.Name, //key
false, //mandatory 强制的,命令的
false, //immediate
amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
log.Printf(" [x] Sent %s", body)
FailOnError(err, "发布消息失败")
}
```
*** 2:消费者***
```go
package main
import (
"github.com/streadway/amqp"
"log"
)
// 处理错误
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
}
}
func main() {
// conn, err := amqp.Dial("amqp://guest:suest@http://localhost:5672")
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailOnError(err, "连接RabbitMQ失败")
defer conn.Close()
ch, err := conn.Channel()
FailOnError(err, "打开Channel失败")
defer ch.Close()
q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
FailOnError(err, "声明队列Queue失败")
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
FailOnError(err, "注册一个接收者失败")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Println("接收一个消息:", string(d.Body))
}
}()
log.Println("等待消息中,退出请按Ctrl+C")
<-forever
}
```
*** 3:搭建erlang环境,安装RabbitMQ客户端***
有疑问加站长微信联系(非本文作者)