Golang RabbitMQ学习笔记(2 : 实现工作队列)

18393910396 · · 810 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。
***生产者*** ```go package main import ( "github.com/streadway/amqp" "log" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "连接RabbitMQ失败") defer conn.Close() ch, err := conn.Channel() failOnError(err, "conn打开一个Channel失败") defer ch.Close() q, err := ch.QueueDeclare( "task_queue", // name true, // durable 消息持久性。虽然这个命令本身是正确的,但是在我们目前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列 , // RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列,并且会向任何尝试这样做的程序返回一个错误。但有一个快速的解决方法 - 让我们声明一个不同名称的队列 false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "定义队列失败") body := "你好.工作队列.Work Queue." err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, amqp.Publishing{ DeliveryMode: amqp.Persistent, //消息持久化 ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "发送消息失败") log.Printf("Sent %s", body) } /* 工作队列 背后的主要思想是避免立即执行资源密集型任务,必须等待完成。 相反,我们安排稍后完成任务。我们把一个任务封装 成一个消息并发送给一个队列。 在后台运行的工作进程将弹出任务并最终执行作业。当你运行多个消费者时,任务将在他们之间共享。 使用任务队列的优点之一是能够轻松地平行工作。如果我们积压工作, 我们可以增加更多的工人,这样可以轻松扩展。 循环调度 消息确认————为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发回确认, 告诉RabbitMQ已经收到,处理了一个特定的消息,并且RabbitMQ可以自由删除它。 没有任何消息超时; 当消费者死亡时,RabbitMQ将重新传递消息。即使处理消息需要非常很长的时间也没关系。 消息持久性 。利用消息去确认可以防止由于消费者宕机等原因导致的消息丢失, 但当RabbitMQ崩溃的时候,为了防止消息丢失,我们需要将队列和消息标记为持久性 amqp.Publishing {DeliveryMode:amqp.Persistent ...} 公平派遣 */ ``` ***消费者*** ```go package main import ( "github.com/streadway/amqp" "log" "time" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "打开RabbitMQ失败!") defer conn.Close() ch, err := conn.Channel() failOnError(err, "打开Channel失败!") defer ch.Close() q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "定义Queue失败!") // 公平分配,每次只给一个消费者分配一个。 err = ch.Qos( 1, // prefetch count //预取数量 0, // prefetch size false, // global ) failOnError(err, "设置QoS失败!") msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack 消息确认----修改为手动确认 false, // exclusive false, // no-local false, // no-wait 消息持久性 nil, // args ) failOnError(err, "注册消费者失败!") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) time.Sleep(3 * time.Second) log.Printf("Done") d.Ack(false) // 通过为“auto-ack”参数传递false来使用手动消息确认,然后使用d.Ack(false)发送一个正确的确认 } }() log.Printf(" Waiting for messages.") <-forever // 阻塞当前进程 } /* autoAck 消息自动确认 :false 消息忘记确认的后果,这是一个常犯的错误,但后果是严重的。当你的消费者退出时, 消息将被重新传递,但是RabbitMQ将会消耗越来越多的内存。 虽然这个命令本身是正确的,但是在我们目前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列 , 这个队列并不耐用。RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列,并且会向任何尝试这样做的程序返 回一个错误。但有一个快速的解决方法 - 让我们声明一个不同名称的队列 */ ```
第 1 条附言  · 
刚刚学习。如有问题,希望大家可以交流反馈。
810 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传