参考链接
https://mshk.top/2016/07/ubuntu-rabbitmq-golang/
1、安装Golang 1.10.1 版本
1.1、创建go的工作环境
注意:我将go的工作环境放在了$HOME/gowork
root@rabbitmq-1:~# mkdir -p $HOME/gowork
root@rabbitmq-1:~# cd $HOME/gowork
root@rabbitmq-1:~/gowork# mkdir src pkg bin
1.2、下载安装包并解压至/usr/local/ 下
root@rabbitmq-1:~/gowork# wget [https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz](https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz)
root@rabbitmq-1:~/gowork# tar -C /usr/local -xzf go1.10.1.linux-amd64.tar.gz
1.3、添加环境变量
在~/.bahsrc文件中添加下面内容:
root@rabbitmq-1:/usr/local/go# vim ~/.bashrc
export GOROOT=/usr/local/go //go的安装目录。也就是刚才指定的路径
export GOPATH=$HOME/gowork //这里的路径是你go语言的工作环境,按照自己的路径配置。
export GOBIN=$GOPATH/bin //编译后的可执行文件存放的目录
export PATH=$GOPATH:$GOBIN:$GOROOT:$PATH //添加进PATH路径
注意 :这个地方一定要配置对,配置仔细,要不然在启动网络的时候会出现找不到文件的问题。
使环境变量生效
root@rabbitmq-1:/usr/local/go# source ~/.bashrc
查看是否安装成功
root@rabbitmq-1:/usr/local/go/bin# /usr/local/go/bin/go version
go version go1.10.1 linux/amd64
2、Golang调用RabbitMQ的案例
我们先将包下载到本地,然后就可以直接使用了:
root@rabbitmq-1:~# /usr/local/go/bin/go get github.com/streadway/amqp
2.1、使用Golang来发送第一个hello idoall.org
在第一个教程中,我们写程序从一个命名的队列(test-idoall-queues)中发送和接收消息。
producer_hello.go(消息生产者):
root@rabbitmq-1:~# vim producer_hello.go
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/" # rabbitmq-1为主机名
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues"
//Body of message
bodyMsg string = "hello idoall.org"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
//调用发布消息函数
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
}
//发布者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
consumer_hello.go(消息消费者):
root@rabbitmq-1:~# vim consumer_hello.go
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
//调用消息接收者
consumer(uri, exchangeName, queueName)
}
//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
//订阅消息
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
//创建一个channel
forever := make(chan bool)
//调用gorountine
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
//没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
<-forever
}
查看消息(开打两个控制台console)
Console1(运行producer):
root@rabbitmq-1:~# /usr/local/go/bin/go run producer_hello.go
2018/12/14 03:41:32 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 03:41:32 got Connection, getting Channel
2018/12/14 03:41:32 got queue, declaring "test-idoall-queues"
2018/12/14 03:41:32 declared queue, publishing 16B body ("hello idoall.org")
2018/12/14 03:41:32 published 16B OK
然后运行一下命令,可以看到我们刚才创建的queues在列表中
root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues
Listing queues
test-idoall-queues 1
Console2(运行consumer)打印消息到屏幕,可以看到刚才我们通过producer发送的消息hello idoall.org
root@rabbitmq-1:/usr/sbin# /usr/local/go/bin/go run /root/consumer_hello.go
2018/12/14 03:46:01 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 03:46:01 got Connection, getting Channel
2018/12/14 03:46:01 got queue, declaring "test-idoall-queues"
2018/12/14 03:46:01 Queue bound to Exchange, starting Consume
2018/12/14 03:46:01 [*] Waiting for messages. To exit press CTRL+C
2018/12/14 03:46:01 Received a message: hello idoall.org
此时,web界面上也出现了信息。
2.2、Rabbitmq的任务分发机制
在2.1章节中,我们写程序从一个命名的队列中发送和接收消息。在这个章节中,我们将创建一个工作队列,将用于分配在多个工人之间的耗时的任务。
RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果任务队伍过多,那么只需要创建更多的Consumer来进行任务处理即可。
producer_task.go(消息生产者):
root@rabbitmq-1:~# vim producer_task.go
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-task"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
bodyMsg := bodyFrom(os.Args)
//调用发布消息函数
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
//发布者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
consumer_task.go(消息消费者)
root@rabbitmq-1:~# vim consumer_task.go
package main
import (
"fmt"
"log"
"bytes"
"time"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-task"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
//调用消息接收者
consumer(uri, exchangeName, queueName)
}
//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
//订阅消息
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
//创建一个channel
forever := make(chan bool)
//调用gorountine
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
//没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
<-forever
}
查看结果
Console1(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:28:59 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:28:59 got Connection, getting Channel
2018/12/14 05:28:59 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:28:59 Queue bound to Exchange, starting Consume
2018/12/14 05:28:59 [*] Waiting for messages. To exit press CTRL+C
Console2(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:29:04 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:29:04 got Connection, getting Channel
2018/12/14 05:29:04 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:29:04 Queue bound to Exchange, starting Consume
2018/12/14 05:29:04 [*] Waiting for messages. To exit press CTRL+C
在第三个窗口,这个时候我们使用Producer 来 Publish Message:
root@rabbitmq-1:~# /usr/local/go/bin/go run producer_task.go First message. &&/usr/local/go/bin/go run producer_task.go Second message.. && /usr/local/go/bin/go run producer_task.go Third message... && /usr/local/go/bin/go run producer_task.go Fourth message.... && /usr/local/go/bin/go run producer_task.go Fifth message.....
2018/12/14 05:34:16 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:16 got Connection, getting Channel
2018/12/14 05:34:16 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:16 declared queue, publishing 14B body ("First message.")
2018/12/14 05:34:16 published 14B OK
2018/12/14 05:34:16 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:16 got Connection, getting Channel
2018/12/14 05:34:16 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:16 declared queue, publishing 16B body ("Second message..")
2018/12/14 05:34:16 published 16B OK
2018/12/14 05:34:17 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:17 got Connection, getting Channel
2018/12/14 05:34:17 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:17 declared queue, publishing 16B body ("Third message...")
2018/12/14 05:34:17 published 16B OK
2018/12/14 05:34:17 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:17 got Connection, getting Channel
2018/12/14 05:34:17 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:17 declared queue, publishing 18B body ("Fourth message....")
2018/12/14 05:34:17 published 18B OK
2018/12/14 05:34:18 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:18 got Connection, getting Channel
2018/12/14 05:34:18 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:18 declared queue, publishing 18B body ("Fifth message.....")
2018/12/14 05:34:18 published 18B OK
这时我们再看刚才打开的两个Consumer的结果:
Console1(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:32:36 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:32:36 got Connection, getting Channel
2018/12/14 05:32:36 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:32:36 Queue bound to Exchange, starting Consume
2018/12/14 05:32:36 [*] Waiting for messages. To exit press CTRL+C
2018/12/14 05:34:16 Received a message: Second message..
2018/12/14 05:34:18 Done
2018/12/14 05:34:18 Received a message: Fourth message....
2018/12/14 05:34:22 Done
Console2(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:29:04 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:29:04 got Connection, getting Channel
2018/12/14 05:29:04 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:29:04 Queue bound to Exchange, starting Consume
2018/12/14 05:29:04 [*] Waiting for messages. To exit press CTRL+C
2018/12/14 05:34:16 Received a message: First message.
2018/12/14 05:34:17 Done
2018/12/14 05:34:17 Received a message: Third message...
2018/12/14 05:34:20 Done
2018/12/14 05:34:20 Received a message: Fifth message.....
2018/12/14 05:34:25 Done
默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin,也叫消息轮询
Web页面情况
2.3、Message acknowledgment 消息确认
每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们的代码,一旦RabbitMQ Server发送给Consumer消息后,会立即把这个Message标记为完成,然后从queue中删除。我们将无法再操作这个尚未处理完成的消息。
实际场景中,如果一个Consumer异常退出了,我们希望它处理的数据能够被另外的Consumer处理,这样数据在这种情况下(通道关闭、连接关闭、TCP连接丢失等情况)就不会丢失了。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,ack(nowledgments)是从Consumer消费后发送到一个特定的消息告诉RabbitMQ已经收到、处理结束,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message重新排进队列,发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。
这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。
消息确认默认是关闭的,我们需要通过,d.ACK(false)来告诉RabbitMQ我们已经完成任务。
producer_acknowledgments(消息生产者).go:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-acknowledgments"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
bodyMsg := bodyFrom(os.Args)
//调用发布消息函数
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
//发布者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
consumer_acknowledgments(消息消费者).go
package main
import (
"fmt"
"log"
"bytes"
"time"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-acknowledgments"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
//调用消息接收者
consumer(uri, exchangeName, queueName)
}
//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
//订阅消息
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
//创建一个channel
forever := make(chan bool)
//调用gorountine
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
//没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
<-forever
}
查看结果
我们先使用Producer来发送一列消息:
root@rabbitmq-1:~# /usr/local/go/bin/go run producer_acknowledgments.go First message. &&/usr/local/go/bin/go run producer_acknowledgments.go Second message.. && /usr/local/go/bin/go run producer_acknowledgments.go Third message... && /usr/local/go/bin/go run producer_acknowledgments.go Fourth message.... && /usr/local/go/bin/go run producer_acknowledgments.go Fifth message.....
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:10 got Connection, getting Channel
2018/12/14 07:30:10 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:10 declared queue, publishing 14B body ("First message.")
2018/12/14 07:30:10 published 14B OK
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:10 got Connection, getting Channel
2018/12/14 07:30:10 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:10 declared queue, publishing 16B body ("Second message..")
2018/12/14 07:30:10 published 16B OK
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 16B body ("Third message...")
2018/12/14 07:30:11 published 16B OK
2018/12/14 07:30:11 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 18B body ("Fourth message....")
2018/12/14 07:30:11 published 18B OK
2018/12/14 07:30:11 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 18B body ("Fifth message.....")
2018/12/14 07:30:11 published 18B OK
通过rabbitmqctl命令,来看下messages_unacknowledged的情况:
root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
test-idoall-queues-task 0 0
test-idoall-queues-acknowledgments 5 0
test-idoall-queues 0 0
使用Consumer来订阅消息操作到第四条的时候,我们按CTRL+C退出,这个时候相当于消息已经被读取,但是未发送d.ACK(false):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_acknowledgments.go
2018/12/14 07:30:25 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:25 got Connection, getting Channel
2018/12/14 07:30:25 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:25 Queue bound to Exchange, starting Consume
2018/12/14 07:30:25 [*] Waiting for messages. To exit press CTRL+C
2018/12/14 07:30:25 Received a message: First message.
2018/12/14 07:30:26 Done
2018/12/14 07:30:26 Received a message: Second message..
2018/12/14 07:30:28 Done
2018/12/14 07:30:28 Received a message: Third message...
2018/12/14 07:30:31 Done
2018/12/14 07:30:31 Received a message: Fourth message....
^Csignal: interrupt
再通过rabbitmqctl命令可以看到,还是有2条消息未处理
root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
test-idoall-queues-task 0 0
test-idoall-queues-acknowledgments 2 0
test-idoall-queues 0 0
有疑问加站长微信联系(非本文作者)