仓库地址
go get -u github.com/lazychanger/go-amqp
注意事项
- rabbitmq将连接与管道分开,即
connection
与channel
。connection
是实体连接,而channel
是逻辑连接。所以当我们多线程使用时候,应该是单connection
+多channel
connection
闪断以后,channel
也会失效。所以重连以后,还需要重新建立channel
- 所有的
推送
,消费
都是基于channel
,所以channel
重连以后,还需要重新开启推送
、消费
- 在
connection
闪断以后。推送
是无效的,这个时候再去推送,肯定会报错。所以我们重连机制,必须要保证,connection
在闪断时候,也要保证推送
接口正常
单channel
管理
思考
我们先定义,channel
到底代表什么?我认为单个channel
操作单条队列的发送与消费,并且需要对主服务提供重启。既然要提供重启,那就要保证单个channel
对象的消费与推送,不允许直接与channel
操作,需要进行操作隔离,防止闪断与重启期间,操作无效的channel
。要完成操作隔离,隔离消费
比较简单,收集好相关操作的handle
,重启以后,重新接入handle
即可。但是隔离推送
比较困难,因为对于用户来说,推送
是实时的,而且我们也无法保证重连时间是多久,所以让用户等待重连是不可行的,那么该怎么操作呢?这里我选择的是内置一条缓存队列,用户推送
数据,先进入缓存队列,再由缓存队列的消费者取出交由真正的队列,同时也要注意重连时间过长导致内存溢出、服务端重启导致交换队列还清空。所以我们设计的channel
应该满足以下几点
- 内置推送交换队列
- 隔离消费
提供重启机制
- 重启
channel
- 重启
消费
- 重启
推送
- 重启
- 提供优雅关闭(关机时候需要保障推送交换队列完全推送完毕)
代码案例
package go_amqp
import (
"encoding/json"
"errors"
"fmt"
"github.com/lazychanger/go-amqp/tools"
"github.com/streadway/amqp"
"log"
"sync"
"time"
)
type Queue struct {
sync.Mutex
// 队列名
name string
// 交换机名
exchangeName string
// 队列绑定交换机路由KEY
routingKey string
// 交换机模式
exchange string
// 最大内置队列长度
maxOverstock int64
conn *amqp.Connection
channel *amqp.Channel
// 1.为什么不采用slice。因为golang的slice我们不方便作为队列操作,每次入出都会引发大量的内存操作。所以采用 map 类型
// 2.为什么采用sync.map。因为入与出是同时执行,异步操作,如果直接采用map类型,会导致脏读、幻读等问题,我们需要对map类型的读写需要锁,而golang有内置完整的读写锁map模型
// 3.采用map类型以后,需要我们自己维护头出尾入以及长度,所以这里有qMaxI、qMinI、ql进行数据记录
q *sync.Map
// 最大index
qMaxI int64
// 最小index
qMinI int64
// 内置队列长度
ql int64
// 是否启动释放库存
isReleaseStock bool
// 是否重启消费
isReloadConsume int
// 是否停止消费
isStopConsume bool
// 是否准备好
already int
close bool
// 消费者
cs []consume
}
type MessageStatus int8
const (
AlreadyStop = 0
AlreadyReload = 1
AlreadySucceed = 2
MessageStatusSucceed MessageStatus = 1
MessageStatusError MessageStatus = 2
MessageStatusRequeue MessageStatus = 3
)
// 提供重启服务
func (q *Queue) Reload(conn *amqp.Connection) error {
// 先关闭状态
q.conn = conn
// 标识重启中
q.already = AlreadyReload
return q.init()
}
// 初始化操作
func (q *Queue) init() error {
var (
ch *amqp.Channel
err error
)
log.Println("[amqp] queue init start")
q.Lock()
ch, err = q.conn.Channel()
if err != nil {
return err
}
// 创建交换机
if err = ch.ExchangeDeclare(q.exchangeName, q.exchange, false, false, false, false, nil); err != nil {
return errors.New(fmt.Sprintf("[amqp] exchange declare failed, err: %s", err))
}
// 创建队列
if _, err = ch.QueueDeclare(q.name, false, false, false, false, nil); err != nil {
return errors.New(fmt.Sprintf("[amqp] queue declare failed, err: %s", err))
}
// 交换机绑定队列
if err = ch.QueueBind(q.name, q.routingKey, q.exchangeName, false, nil); err != nil {
return errors.New(fmt.Sprintf("[amqp] queue bind failed, err: %s", err))
}
// 管道交换
q.channel = ch
// 告知已经准备完毕
if q.already == AlreadyReload {
q.already = AlreadySucceed
// 重新触发消费
q.reloadConsume()
if !q.isReleaseStock && q.ql > 0 {
log.Println("init release stock")
// 重新触发
go q.releaseStock()
}
}
q.Unlock()
log.Println("[amqp] queue init end")
return nil
}
// 对推送简单封装一下,使json对象推送更加简便
func (q *Queue) PublishJson(v interface{}) error {
body, err := json.Marshal(v)
if err != nil {
return err
}
return q.Publish(body)
}
// 原始推送,要求[]byte
func (q *Queue) Publish(data []byte) error {
// 检查库存,防止溢出
if q.maxOverstock > 0 && q.ql > q.maxOverstock {
return publishOverstock
}
if q.close {
return errors.New("service closing")
}
// 启动锁,防止释放库存时候,共同操作导致脏写
q.Lock()
// 防止并发map创建
if q.q == nil {
q.q = &sync.Map{}
}
// 增加最大下标
q.qMaxI++
// 增加最大长度
q.ql++
// 存储值
q.q.Store(q.qMaxI, data)
q.Unlock()
// 检查释放库存是否启动,未启动并且channel已经准备完毕,就启动库存释放
if !q.isReleaseStock && q.already == AlreadySucceed {
go q.releaseStock()
}
log.Printf("published,now %d", q.ql)
return nil
}
// 内置队列消费,库存释放
func (q *Queue) releaseStock() {
// 判断是否重复启动
q.Lock()
if q.isReleaseStock {
q.Unlock()
log.Println("[amqp] release stock already run")
return
}
// 标记服务已经启动
q.isReleaseStock = true
q.Unlock()
log.Println("[amqp] release stock")
for {
// 如果库存为空或者channel还未准备好就关闭循环
if q.ql == 0 || q.already != AlreadySucceed {
break
}
// 先将当前长度取出,防止循环时候修改,变成脏读
l := q.ql
// 实际启动当前轮次库存释放
for i := int64(0); i < l; i++ {
if q.already != AlreadySucceed {
break
}
// 对库存
q.Lock()
log.Printf("internal queues length: %d", q.ql)
// 对库存最小下标进行+1
q.qMinI++
// 减少库存最大数
q.ql--
// 锁期间,顶部索引不变
min := q.qMinI
q.Unlock()
// 读取内容
body, has := q.q.Load(min)
// 预防脏读
if has && body != nil {
// 推送
_ = q.publish(body.([]byte))
} else {
log.Println("[amqp] data error")
}
// 释放map空间
q.q.Delete(min)
}
// 本轮库存释放已经结束,延迟执行3秒后执行下一轮
ticker := time.NewTicker(time.Second * 3)
select {
case <-ticker.C:
ticker.Stop()
}
}
// 标记关闭
q.isReleaseStock = false
}
// 实际channel向队列发送数据
func (q *Queue) publish(data []byte) error {
return q.channel.Publish(q.exchangeName, q.routingKey, false, false, amqp.Publishing{
Body: data,
})
}
// 添加消费内容,先存储,等待服务启动以后触发
func (q *Queue) Consume(name string, consumeFunc ConsumeFunc, repeat int) error {
q.cs = append(q.cs, consume{
repeat: tools.IF(repeat <= 0, 1, repeat).(int),
consumeFunc: consumeFunc,
name: name,
})
// 尝试启动消费
q.reloadConsume()
return nil
}
// 暂停消费
func (q *Queue) StopConsume() {
q.isStopConsume = true
}
// 启动消费
func (q *Queue) StartConsume() {
q.isStopConsume = false
q.reloadConsume()
}
// 实际触发消费
func (q *Queue) reloadConsume() {
// 如果未启动,直接返回
if q.already != AlreadySucceed || q.isStopConsume {
return
}
// 推送重启消费
q.isReloadConsume++
// 记录当前消费重启值
reloadConsume := q.isReloadConsume
// 记录当前channel重启值
for i, c := range q.cs {
// 并发消费
for l := 0; l < c.repeat; l++ {
name := fmt.Sprintf("%s_%d-%d", c.name, i, l)
msgs, err := q.channel.Consume(q.name, name, false, false, false, false, nil)
if err != nil {
log.Fatalf("[AMQP] customer register err;name: %s, %s", name, err)
} else {
go func(c ConsumeFunc, consumeName string, reloadConsume int) {
for msg := range msgs {
switch c(msg.Body, consumeName) {
case MessageStatusSucceed:
case MessageStatusError:
_ = msg.Ack(true)
break
case MessageStatusRequeue:
_ = msg.Reject(true)
break
}
// 如果channel重启或者消费重启,都结束当前消费,防止溢出,或者正在关闭
if q.already != AlreadySucceed || q.isReloadConsume != reloadConsume || q.close || q.isStopConsume {
break
}
}
}(c.consumeFunc, name, reloadConsume)
}
}
}
}
// 优雅重启
func (q *Queue) Close() error {
// 先标记关闭
q.close = true
retry := 0
for {
if q.ql > 0 {
if q.already == AlreadySucceed {
if q.isReleaseStock == false {
q.releaseStock()
}
} else {
retry++
// 如果channel没有准备好,内置队列也没有释放完,则重试三次,三次还没有处理好,就放弃重试
if retry > 3 {
break
}
}
} else {
break
}
ticker := time.NewTicker(time.Second / 2)
select {
case <-ticker.C:
ticker.Stop()
}
}
return q.channel.Close()
}
type consume struct {
name string
consumeFunc ConsumeFunc
repeat int
}
type ConsumeFunc func(data []byte, name string) MessageStatus
多channel
管理
思考
上面介绍了单个channel
内部状态维护,那么现在就要开始对这些channel
进行管理。管理内容如下:
channel
注册- 断线重启
- 优雅关闭
代码
package go_amqp
import (
"github.com/streadway/amqp"
"log"
"sync"
"time"
)
type Connection struct {
// 配置
config *config
// amqp连接
conn *amqp.Connection
// 连接状态
isConnected bool
// 关机提示
done chan bool
// amqp闪断通知
notifyClose chan *amqp.Error
// 多队列(channel),此处认为一个channel管理一条队列
qs []*Queue
}
type config struct {
// 连接AMQP DSN构建驱动
driver Driver
// 最大消息重发次数
maxSendRetries int
// 最大重连次数
maxReconnects int
// 重连延迟时间
reconnectDelay time.Duration
// 最大发送积压数
maxOverstock int64
}
func New(opts ...Option) (*Connection, error) {
// 建立默认连接对象
conn := &Connection{
// 生成默认配置
config: &config{
reconnectDelay: time.Second,
maxReconnects: 0,
maxSendRetries: 3,
},
}
for _, opt := range opts {
// 配置写入
opt(conn.config)
}
// 基础必须配置检查
if conn.config.driver == nil {
return nil, missingConnectionDsnDriver
}
// 断线重连
go conn.handleReconnect()
return conn, nil
}
func (c *Connection) handleReconnect() {
tryConnects := 0
for {
c.isConnected = false
if err := c.connect(); err != nil {
if c.config.maxReconnects > 0 && tryConnects > c.config.maxReconnects {
log.Fatalf("[AMQP] Reconnection times exceeded!(%d)", tryConnects)
return
}
tryConnects += 1
log.Printf("Failed to connect. %s Retrying...(%d)", err, tryConnects)
time.Sleep(c.config.reconnectDelay)
} else {
// clear try connect
tryConnects = 0
}
// 等待下一步信号通知
select {
case <-c.done:
return
case <-c.notifyClose:
}
}
}
//
func (c *Connection) connect() error {
url := c.config.driver.Url()
log.Printf("[amqp] connected. %s", url)
conn, err := amqp.Dial(url)
if err != nil {
return err
}
c.conn = conn
c.notifyClose = make(chan *amqp.Error)
c.conn.NotifyClose(c.notifyClose)
c.isConnected = true
// channel重连
c.queueReconnect()
return nil
}
// channel重连
// 当连接闪断以后,需要重新建立新的连接,所以,所有的channel也需要进行新的连接
func (c *Connection) queueReconnect() {
for _, q := range c.qs {
if err := q.Reload(c.conn); err != nil {
log.Println(err)
}
}
}
// 优雅关闭,
// 关闭channel
// 关闭闪断重连机制
// 关闭connection
func (c *Connection) Close() error {
if c.isConnected {
return alreadyClosed
}
var wg sync.WaitGroup
// 批量关闭
for i := 0; i < len(c.qs); i++ {
wg.Add(1)
go func(idx int, group *sync.WaitGroup) {
_ = c.qs[idx].Close()
group.Done()
}(i, &wg)
}
wg.Wait()
_ = c.conn.Close()
// 关闭
close(c.done)
return nil
}
// 生成单Channel管理,让一个channel管理一条队列的发送与消费
func (c *Connection) Queue(name, exchange, routingKey string) (*Queue, error) {
q := &Queue{
name: name,
exchange: amqp.ExchangeDirect,
exchangeName: exchange,
routingKey: routingKey,
maxOverstock: c.config.maxOverstock,
}
if c.isConnected == true {
if err := q.Reload(c.conn); err != nil {
return nil, err
}
}
c.qs = append(c.qs, q)
return q, nil
}
注
刚开始写一些技术分享的,很多地方或者结构可能写的比较粗糙,欢迎各位执教、交流。如果有不了解或者新的想法,也可以评论留言沟通。thanks!
有疑问加站长微信联系(非本文作者)