安装erlang 因为RabbitMQ是基于erlang开发的
安装RabbiMQ
上述步骤自行百度windows linux都支持
安装注意事项:
erlang要和RabbiMQ版本对应上
RabbiMQ windows安装需要将.erlang.cookie改成一样的
-
rabbitMQ常用的命令
启动监控管理器:rabbitmq-plugins enable rabbitmq_management关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
启动rabbitmq:net start RabbitMQ
关闭rabbitmq:net stop RabbitMQ
查看所有的队列:rabbitmqctl list_queues
清除所有的队列:rabbitmqctl reset
用户和权限设置
查看已有用户及用户的角色:rabbitmqctl.bat list_users
添加用户:rabbitmqctl add_user username password
删除用户:rabbitmqctl.bat delete_user username
分配管理员角色:rabbitmqctl set_user_tags username administrator
新增虚拟主机:rabbitmqctl add_vhost vhost_name
将新虚拟主机授权给新用户:rabbitmqctl set_permissions -p vhost_name username '.' '.' '.*'
设置用户权限:rabbitmqctl set_permissions -p VHostPath username ConfP WriteP ReadP
查看(指定hostpath)所有用户的权限信息:rabbitmqctl list_permissions [-p VHostPath]
查看指定用户的权限信息:rabbitmqctl list_user_permissions username
清除用户的权限信息:rabbitmqctl clear_permissions [-p VHostPath] username
- 角色说明
none 最小权限角色
management 管理员角色
policymaker 决策者
monitoring 监控
administrator 超级管理员
golang 简单操作RabbitMQ
link.go
package RabbitMQ
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
//连接密码
const MQURL = "amqp://用户名:密码@127.0.0.1:5672/imooc"
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
//队列名称
QueueName string
//交换机
Exchange string
//key
key string
//连接信息
Mqurl string
}
//创建RabbitMQ结构体实例
func NewRabbitMQ(queueName, exchange, key string) *RabbitMQ {
rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, key: key, Mqurl: MQURL}
var err error
//创建RabbitMQ连接
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "创建连接错误!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "获取channel失败!")
return rabbitmq
}
//断开channel和connection
func (r *RabbitMQ) Destroy() {
r.channel.Close()
r.conn.Close()
}
//错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s,%s", message, err))
}
}
simple.go
package RabbitMQ
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
//创建简单模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
return NewRabbitMQ(queueName, "", "")
}
//简单模式下生产代码
func (r *RabbitMQ) PublishSimple(message string) {
//1.申请队列,如果队列不存在会自动创建,如果存在则跳过创建
//保证队列存在,消息队列能发送到队列中
_, err := r.channel.QueueDeclare(
r.QueueName,
//是否持久化
false,
//是否为自动删除
false,
//是否具有排他性
false,
//是否阻塞
false,
//额外属性
nil,
)
if err != nil {
fmt.Println("QueueDeclare:", err)
}
//2.发送消息到队列中
err = r.channel.Publish(
r.Exchange,
r.QueueName,
//如果为true,根据exchange类型和routekey规则,如果无法找到符合条件的队列那么会把发送的消息返回给发送者
false,
//如果为true,当exchange发送消息队列到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
if err != nil {
fmt.Println("Publish:", err)
}
}
//简单模式下消费代码
func (r *RabbitMQ) ConsumeSimple() {
//1.申请队列,如果队列不存在会自动创建,如果存在则跳过创建
//保证队列存在,消息队列能发送到队列中
_, err := r.channel.QueueDeclare(
r.QueueName,
//是否持久化
false,
//是否为自动删除
false,
//是否具有排他性
false,
//是否阻塞
false,
//额外属性
nil)
if err != nil {
fmt.Println("QueueDeclare:", err)
}
//2.接受消息
msgs, err := r.channel.Consume(
r.QueueName,
//用来区分多个消费者
"",
//是否自动应答
true,
//是否具有排他性
false,
//如果设置为true,表示不能将同一个connection中发送消息传递给这个connection中的消费者
false,
//队列消费是否阻塞
false,
nil)
if err != nil {
fmt.Println("Consume:", err)
}
forever := make(chan bool)
//3.启用协程处理消息
go func() {
for d := range msgs {
//实现我们要处理的逻辑函数
log.Printf("Received a message:%s", d.Body)
}
}()
log.Printf("[*] Waiting for messages,To exit press CTRL+C\n")
<-forever
}
work模式
生产者 work.go
package main
import (
"fmt"
"owennew/RabbitMQ"
"strconv"
"time"
)
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
for i := 0; i <= 100; i++ {
rabbitmq.PublishSimple("Hello test!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
消费者 workclent1.go
package main
import "owennew/RabbitMQ"
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
rabbitmq.ConsumeSimple()
}
消费者 workclent2.go
package main
import "owennew/RabbitMQ"
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple")
rabbitmq.ConsumeSimple()
}
有疑问加站长微信联系(非本文作者)