***生产者***
```go
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "连接RabbitMQ失败")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "conn打开一个Channel失败")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable 消息持久性。虽然这个命令本身是正确的,但是在我们目前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列 ,
// RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列,并且会向任何尝试这样做的程序返回一个错误。但有一个快速的解决方法 - 让我们声明一个不同名称的队列
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "定义队列失败")
body := "你好.工作队列.Work Queue."
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, //消息持久化
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "发送消息失败")
log.Printf("Sent %s", body)
}
/*
工作队列 背后的主要思想是避免立即执行资源密集型任务,必须等待完成。
相反,我们安排稍后完成任务。我们把一个任务封装 成一个消息并发送给一个队列。
在后台运行的工作进程将弹出任务并最终执行作业。当你运行多个消费者时,任务将在他们之间共享。
使用任务队列的优点之一是能够轻松地平行工作。如果我们积压工作,
我们可以增加更多的工人,这样可以轻松扩展。
循环调度
消息确认————为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发回确认,
告诉RabbitMQ已经收到,处理了一个特定的消息,并且RabbitMQ可以自由删除它。
没有任何消息超时; 当消费者死亡时,RabbitMQ将重新传递消息。即使处理消息需要非常很长的时间也没关系。
消息持久性 。利用消息去确认可以防止由于消费者宕机等原因导致的消息丢失,
但当RabbitMQ崩溃的时候,为了防止消息丢失,我们需要将队列和消息标记为持久性
amqp.Publishing {DeliveryMode:amqp.Persistent ...}
公平派遣
*/
```
***消费者***
```go
package main
import (
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "打开RabbitMQ失败!")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "打开Channel失败!")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "定义Queue失败!")
// 公平分配,每次只给一个消费者分配一个。
err = ch.Qos(
1, // prefetch count //预取数量
0, // prefetch size
false, // global
)
failOnError(err, "设置QoS失败!")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack 消息确认----修改为手动确认
false, // exclusive
false, // no-local
false, // no-wait 消息持久性
nil, // args
)
failOnError(err, "注册消费者失败!")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
time.Sleep(3 * time.Second)
log.Printf("Done")
d.Ack(false) // 通过为“auto-ack”参数传递false来使用手动消息确认,然后使用d.Ack(false)发送一个正确的确认
}
}()
log.Printf(" Waiting for messages.")
<-forever // 阻塞当前进程
}
/*
autoAck 消息自动确认 :false
消息忘记确认的后果,这是一个常犯的错误,但后果是严重的。当你的消费者退出时,
消息将被重新传递,但是RabbitMQ将会消耗越来越多的内存。
虽然这个命令本身是正确的,但是在我们目前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列 ,
这个队列并不耐用。RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列,并且会向任何尝试这样做的程序返
回一个错误。但有一个快速的解决方法 - 让我们声明一个不同名称的队列
*/
```
有疑问加站长微信联系(非本文作者)