连接 RabbitMQ 的代码
消费者断线重连：
package main
import (
	"database/sql"
	"log"
	"time"

	_ "github.com/go-sql-driver/mysql"
	"github.com/streadway/amqp"
)
// insert stores one message body as a new row in the rabbit table.
//
// db is the database handle to write to and a is the raw message body, bound
// to the single placeholder of the INSERT statement. Failures are logged and
// not returned, so a bad message does not stop the caller.
func insert(db *sql.DB, a []byte) {
	// db.Exec reports any failure through a single error return, so the
	// error is always checked before a result is relied on. The previous
	// code called Exec on the statement before checking the Prepare error,
	// which panics on a nil statement.
	if _, err := db.Exec("INSERT INTO rabbit (name) VALUES(?)", a); err != nil {
		log.Println(err)
	}
}
// failOnError terminates the program via log.Fatalf when err is non-nil,
// logging msg followed by the error text. A nil err is a no-op.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}
// main opens the MySQL handle, verifies it with a ping, and then consumes the
// "hello" queue forever, re-establishing the RabbitMQ connection whenever a
// consuming session ends.
func main() {
	// Open MySQL.
	db, err := sql.Open("mysql", "root:0022....hh@tcp(127.0.0.1:3306)/rabbitmq?charset=utf8")
	if err != nil {
		log.Fatalf("Open database error: %s\n", err)
	}
	defer db.Close()
	err = db.Ping()
	if err != nil {
		log.Fatal(err)
	}

	// Pause between sessions so an unreachable broker is not dialed in a
	// tight loop.
	const retryDelay = 5 * time.Second

	// Connect to RabbitMQ; each call blocks for the lifetime of one
	// connection, so a new connection is only dialed after the previous
	// session has ended.
	for {
		consumeOnce(db)
		log.Printf("Reconnecting to RabbitMQ in %s", retryDelay)
		time.Sleep(retryDelay)
	}
}

// consumeOnce runs a single consuming session: it dials RabbitMQ, opens a
// channel, declares the durable "hello" queue, and stores every delivery via
// insert until the delivery channel is closed. Setup failures are logged and
// the function returns so the caller can retry. The connection and channel
// are released on return, because the defers fire once per session.
func consumeOnce(db *sql.DB) {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Printf("%s: %s", "Failed to connect to RabbitMQ", err)
		return
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Printf("%s: %s", "Failed to open a channel", err)
		return
	}
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		true,    // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	if err != nil {
		log.Printf("%s: %s", "Failed to declare a queue", err)
		return
	}

	// Create the consumer.
	// NOTE(review): auto-ack is kept from the original; presumably a message
	// whose insert fails is then not redelivered. Confirm this is acceptable.
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		log.Printf("%s: %s", "Failed to register a consumer", err)
		return
	}

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

	// Process deliveries in this goroutine. The range ends when the delivery
	// channel is closed, which is the signal to go back and reconnect.
	for d := range msgs {
		log.Printf("Received a message: %s", d.Body)
		insert(db, d.Body)
	}
	log.Printf("RabbitMQ delivery channel closed")
}
有疑问加站长微信联系（非本文作者）