这是测试生产者
**代码**
```
func Send(queueName string, msg interface{}) error {
if ch == nil {
if err := InitRabbitmq(Url); err != nil {
log.Println(err)
return err
}
}
_, err := ch.QueueDeclarePassive(queueName, true, false, false, true, nil)
if err != nil {
q, err = ch.QueueDeclare(queueName, true, false, false, false, nil)
if err != nil {
log.Println(err)
return err
}
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
count++
log.Println(q, count)
err = ch.Publish(queueName,
q.Name,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: data,
})
if err != nil {
log.Println(err)
return err
}
return nil
}
```
```
func InitRabbitmq(url string) error {
var (
err error
)
conn, err = amqp.Dial(url)
if err != nil {
log.Println("Failed to connect to RabbitMq")
return err
}
ch, err = conn.Channel()
if err != nil {
defer conn.Close()
log.Println("Failed to open a channel")
return err
}
mqsync.Lock()
defer mqsync.Unlock()
q, err = ch.QueueDeclare("hello_world", true, false, false, false, nil)
if err != nil {
log.Println("Failed to declare queue:", err.Error())
return err
}
log.Println("rabbitmq init ok!")
return nil
}
```
```
var Url = `amqp://guest:guest@0.0.0.0:5672/`
func main(){
log.SetFlags(log.Lshortfile)
err := InitRabbitmq(Url)
if err != nil {
log.Println(err)
}
go func(){
data :=map[string]interface{}{
"1":hello,
}
for {
err := Send(data)
if err != nil {
log.Println(err)
}
time.Sleep(1 * time.Second)
}
}
go func(){
data :=map[string]interface{}{
"2":world,
}
for {
err := Send(data)
if err != nil {
log.Println(err)
}
time.Sleep(1 * time.Second)
}
}
select {}
}
```
**问题:**
![image.png](https://static.studygolang.com/190428/be8ecfc43c4152d4375aa87e087316cf.png)
**错误信息**
![image.png](https://static.studygolang.com/190428/d282c528631991e34e0d8bc2032b866d.png)
**分析**
>测试发现不是消费者好着,通过再次启动此程序发现
询问还有其他问题影响吗?
找到一个和我类似的错误:http://www.itkeyword.com/doc/8522470228588686135/golang-rabbitmq-channel-connection-is-not-open
我将代码中的close 关闭了也是一样的。
#1
更多评论
**解决办法**
>跑了3个小时了 目前没有发现连接断开。我先总结下这次优化的问题。
![image.png](https://static.studygolang.com/190428/3673dbfbf3e965b2d905f10486d72d94.png)
看到客户端连接rabbitmq 有closed 字段用来检验是断开连接。源码中也使用closed 字段检查连接性。于是发现AMQP的管道同样有一个closed。于是参考源码封装出来判断连接的函数。
```
connect.go
func (c *Connection) IsClosed() bool {
return (atomic.LoadInt32(&c.closed) == 1)
}
channel.go
func (c *Channel) IsClosed() bool {
return (atomic.LoadInt32(&c.closed) == 1)
}
```
修改重连为:
```
if conn.IsClosed() || ch.IsClosed() {
log.Println("连接断开,重新连接")
err = InitRabbitmq(Url)
log.Println(err)
}
```
**修改后稳定性增强,自己的问题没有仔细看源码。**
#3