简单设计go-amqp

inight · · 820 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

仓库地址

go get -u github.com/lazychanger/go-amqp

注意事项

  • rabbitmq将连接与管道分开,即connectionchannelconnection是实体连接,而channel是逻辑连接。所以当我们多线程使用时候,应该是单connection+多channel
  • connection闪断以后,channel也会失效。所以重连以后,还需要重新建立channel
  • 所有的推送消费都是基于channel,所以channel重连以后,还需要重新开启推送消费
  • connection闪断以后。推送是无效的,这个时候再去推送,肯定会报错。所以我们重连机制,必须要保证,connection在闪断时候,也要保证推送接口正常

channel管理

思考

我们先定义,channel到底代表什么?我认为单个channel操作单条队列的发送与消费,并且需要对主服务提供重启。既然要提供重启,那就要保证单个channel对象的消费与推送,不允许直接与channel操作,需要进行操作隔离,防止闪断与重启期间,操作无效的channel。要完成操作隔离,隔离消费比较简单,收集好相关操作的handle,重启以后,重新接入handle即可。但是隔离推送比较困难,因为对于用户来说,推送是实时的,而且我们也无法保证重连时间是多久,所以让用户等待重连是不可行的,那么该怎么操作呢?这里我选择的是内置一条缓存队列,用户推送数据,先进入缓存队列,再由缓存队列的消费者取出交由真正的队列,同时也要注意重连时间过长导致内存溢出、服务端重启导致交换队列还清空。所以我们设计的channel应该满足以下几点

  • 内置推送交换队列
  • 隔离消费
  • 提供重启机制

    • 重启channel
    • 重启消费
    • 重启推送
  • 提供优雅关闭(关机时候需要保障推送交换队列完全推送完毕)

代码案例


package go_amqp

import (
    "encoding/json"
    "errors"
    "fmt"
    "github.com/lazychanger/go-amqp/tools"
    "github.com/streadway/amqp"
    "log"
    "sync"
    "time"
)

type Queue struct {
    sync.Mutex
    // 队列名
    name string
    // 交换机名
    exchangeName string
    // 队列绑定交换机路由KEY
    routingKey string
    // 交换机模式
    exchange string
    // 最大内置队列长度
    maxOverstock int64

    conn    *amqp.Connection
    channel *amqp.Channel

    // 1.为什么不采用slice。因为golang的slice我们不方便作为队列操作,每次入出都会引发大量的内存操作。所以采用 map 类型
    // 2.为什么采用sync.map。因为入与出是同时执行,异步操作,如果直接采用map类型,会导致脏读、幻读等问题,我们需要对map类型的读写需要锁,而golang有内置完整的读写锁map模型
    // 3.采用map类型以后,需要我们自己维护头出尾入以及长度,所以这里有qMaxI、qMinI、ql进行数据记录
    q *sync.Map
    // 最大index
    qMaxI int64
    // 最小index
    qMinI int64
    // 内置队列长度
    ql int64
    // 是否启动释放库存
    isReleaseStock bool
    // 是否重启消费
    isReloadConsume int
    // 是否停止消费
    isStopConsume bool
    // 是否准备好
    already int

    close bool
    // 消费者
    cs []consume
}

type MessageStatus int8

const (
    AlreadyStop    = 0
    AlreadyReload  = 1
    AlreadySucceed = 2

    MessageStatusSucceed MessageStatus = 1
    MessageStatusError   MessageStatus = 2
    MessageStatusRequeue MessageStatus = 3
)

// 提供重启服务
func (q *Queue) Reload(conn *amqp.Connection) error {
    // 先关闭状态
    q.conn = conn
    // 标识重启中
    q.already = AlreadyReload
    return q.init()
}

// 初始化操作
func (q *Queue) init() error {
    var (
        ch  *amqp.Channel
        err error
    )
    log.Println("[amqp] queue init start")
    q.Lock()
    ch, err = q.conn.Channel()
    if err != nil {
        return err
    }

    // 创建交换机
    if err = ch.ExchangeDeclare(q.exchangeName, q.exchange, false, false, false, false, nil); err != nil {
        return errors.New(fmt.Sprintf("[amqp] exchange declare failed, err: %s", err))
    }
    // 创建队列
    if _, err = ch.QueueDeclare(q.name, false, false, false, false, nil); err != nil {
        return errors.New(fmt.Sprintf("[amqp] queue declare failed, err: %s", err))
    }
    // 交换机绑定队列
    if err = ch.QueueBind(q.name, q.routingKey, q.exchangeName, false, nil); err != nil {
        return errors.New(fmt.Sprintf("[amqp] queue bind failed, err: %s", err))
    }
    // 管道交换
    q.channel = ch
    // 告知已经准备完毕
    if q.already == AlreadyReload {
        q.already = AlreadySucceed

        // 重新触发消费
        q.reloadConsume()

        if !q.isReleaseStock && q.ql > 0 {
            log.Println("init release stock")
            // 重新触发
            go q.releaseStock()
        }
    }

    q.Unlock()

    log.Println("[amqp] queue init end")

    return nil
}

// 对推送简单封装一下,使json对象推送更加简便
func (q *Queue) PublishJson(v interface{}) error {
    body, err := json.Marshal(v)
    if err != nil {
        return err
    }

    return q.Publish(body)
}

// 原始推送,要求[]byte
func (q *Queue) Publish(data []byte) error {
    // 检查库存,防止溢出
    if q.maxOverstock > 0 && q.ql > q.maxOverstock {
        return publishOverstock
    }

    if q.close {
        return errors.New("service closing")
    }

    // 启动锁,防止释放库存时候,共同操作导致脏写
    q.Lock()
    // 防止并发map创建
    if q.q == nil {
        q.q = &sync.Map{}
    }
    // 增加最大下标
    q.qMaxI++
    // 增加最大长度
    q.ql++
    // 存储值
    q.q.Store(q.qMaxI, data)
    q.Unlock()

    // 检查释放库存是否启动,未启动并且channel已经准备完毕,就启动库存释放
    if !q.isReleaseStock && q.already == AlreadySucceed {
        go q.releaseStock()
    }

    log.Printf("published,now %d", q.ql)
    return nil
}

// 内置队列消费,库存释放
func (q *Queue) releaseStock() {
    // 判断是否重复启动
    q.Lock()
    if q.isReleaseStock {
        q.Unlock()
        log.Println("[amqp] release stock already run")
        return
    }
    // 标记服务已经启动
    q.isReleaseStock = true
    q.Unlock()
    log.Println("[amqp] release stock")
    for {
        // 如果库存为空或者channel还未准备好就关闭循环
        if q.ql == 0 || q.already != AlreadySucceed {
            break
        }
        // 先将当前长度取出,防止循环时候修改,变成脏读
        l := q.ql
        // 实际启动当前轮次库存释放
        for i := int64(0); i < l; i++ {
            if q.already != AlreadySucceed {
                break
            }
            // 对库存
            q.Lock()
            log.Printf("internal queues length: %d", q.ql)
            // 对库存最小下标进行+1
            q.qMinI++
            // 减少库存最大数
            q.ql--
            // 锁期间,顶部索引不变
            min := q.qMinI
            q.Unlock()

            // 读取内容
            body, has := q.q.Load(min)
            // 预防脏读
            if has && body != nil {
                // 推送
                _ = q.publish(body.([]byte))
            } else {
                log.Println("[amqp] data error")
            }
            // 释放map空间
            q.q.Delete(min)
        }

        // 本轮库存释放已经结束,延迟执行3秒后执行下一轮
        ticker := time.NewTicker(time.Second * 3)
        select {
        case <-ticker.C:
            ticker.Stop()
        }
    }
    // 标记关闭
    q.isReleaseStock = false
}

// 实际channel向队列发送数据
func (q *Queue) publish(data []byte) error {
    return q.channel.Publish(q.exchangeName, q.routingKey, false, false, amqp.Publishing{
        Body: data,
    })
}

// 添加消费内容,先存储,等待服务启动以后触发
func (q *Queue) Consume(name string, consumeFunc ConsumeFunc, repeat int) error {
    q.cs = append(q.cs, consume{
        repeat:      tools.IF(repeat <= 0, 1, repeat).(int),
        consumeFunc: consumeFunc,
        name:        name,
    })
    // 尝试启动消费
    q.reloadConsume()

    return nil
}

// 暂停消费
func (q *Queue) StopConsume() {
    q.isStopConsume = true
}

// 启动消费
func (q *Queue) StartConsume() {
    q.isStopConsume = false
    q.reloadConsume()
}

// 实际触发消费
func (q *Queue) reloadConsume() {
    // 如果未启动,直接返回
    if q.already != AlreadySucceed || q.isStopConsume {
        return
    }
    // 推送重启消费
    q.isReloadConsume++
    // 记录当前消费重启值
    reloadConsume := q.isReloadConsume
    // 记录当前channel重启值
    for i, c := range q.cs {
        // 并发消费
        for l := 0; l < c.repeat; l++ {
            name := fmt.Sprintf("%s_%d-%d", c.name, i, l)
            msgs, err := q.channel.Consume(q.name, name, false, false, false, false, nil)
            if err != nil {
                log.Fatalf("[AMQP] customer register err;name: %s, %s", name, err)
            } else {
                go func(c ConsumeFunc, consumeName string, reloadConsume int) {
                    for msg := range msgs {
                        switch c(msg.Body, consumeName) {
                        case MessageStatusSucceed:
                        case MessageStatusError:
                            _ = msg.Ack(true)
                            break
                        case MessageStatusRequeue:
                            _ = msg.Reject(true)
                            break
                        }
                        // 如果channel重启或者消费重启,都结束当前消费,防止溢出,或者正在关闭
                        if q.already != AlreadySucceed || q.isReloadConsume != reloadConsume || q.close || q.isStopConsume {
                            break
                        }
                    }
                }(c.consumeFunc, name, reloadConsume)
            }
        }
    }
}

// 优雅重启
func (q *Queue) Close() error {
    // 先标记关闭
    q.close = true

    retry := 0

    for {

        if q.ql > 0 {
            if q.already == AlreadySucceed {
                if q.isReleaseStock == false {
                    q.releaseStock()
                }
            } else {
                retry++
                // 如果channel没有准备好,内置队列也没有释放完,则重试三次,三次还没有处理好,就放弃重试
                if retry > 3 {
                    break
                }
            }
        } else {
            break
        }

        ticker := time.NewTicker(time.Second / 2)
        select {
        case <-ticker.C:
            ticker.Stop()
        }
    }

    return q.channel.Close()
}

type consume struct {
    name        string
    consumeFunc ConsumeFunc
    repeat      int
}
type ConsumeFunc func(data []byte, name string) MessageStatus

channel管理

思考

上面介绍了单个channel内部状态维护,那么现在就要开始对这些channel进行管理。管理内容如下:

  • channel注册
  • 断线重启
  • 优雅关闭

代码

package go_amqp

import (
    "github.com/streadway/amqp"
    "log"
    "sync"
    "time"
)

type Connection struct {
    // 配置
    config *config
    // amqp连接
    conn *amqp.Connection
    // 连接状态
    isConnected bool
    // 关机提示
    done chan bool
    // amqp闪断通知
    notifyClose chan *amqp.Error
    // 多队列(channel),此处认为一个channel管理一条队列
    qs []*Queue
}


type config struct {
    // 连接AMQP DSN构建驱动
    driver Driver

    // 最大消息重发次数
    maxSendRetries int
    // 最大重连次数
    maxReconnects int
    // 重连延迟时间
    reconnectDelay time.Duration

    // 最大发送积压数
    maxOverstock int64
}


func New(opts ...Option) (*Connection, error) {
    // 建立默认连接对象
    conn := &Connection{
        // 生成默认配置
        config: &config{
            reconnectDelay: time.Second,
            maxReconnects:  0,
            maxSendRetries: 3,
        },
    }

    for _, opt := range opts {
        // 配置写入
        opt(conn.config)
    }

    // 基础必须配置检查
    if conn.config.driver == nil {
        return nil, missingConnectionDsnDriver
    }

    // 断线重连
    go conn.handleReconnect()

    return conn, nil
}

func (c *Connection) handleReconnect() {
    tryConnects := 0
    for {
        c.isConnected = false
        if err := c.connect(); err != nil {
            if c.config.maxReconnects > 0 && tryConnects > c.config.maxReconnects {
                log.Fatalf("[AMQP] Reconnection times exceeded!(%d)", tryConnects)
                return
            }

            tryConnects += 1
            log.Printf("Failed to connect. %s Retrying...(%d)", err, tryConnects)
            time.Sleep(c.config.reconnectDelay)
        } else {
            // clear try connect
            tryConnects = 0
        }
        // 等待下一步信号通知
        select {
        case <-c.done:
            return
        case <-c.notifyClose:
        }
    }
}

//
func (c *Connection) connect() error {
    url := c.config.driver.Url()
    log.Printf("[amqp] connected. %s", url)
    conn, err := amqp.Dial(url)
    if err != nil {
        return err
    }
    c.conn = conn

    c.notifyClose = make(chan *amqp.Error)
    c.conn.NotifyClose(c.notifyClose)

    c.isConnected = true
    // channel重连
    c.queueReconnect()
    return nil
}

// channel重连
// 当连接闪断以后,需要重新建立新的连接,所以,所有的channel也需要进行新的连接
func (c *Connection) queueReconnect() {
    for _, q := range c.qs {
        if err := q.Reload(c.conn); err != nil {
            log.Println(err)
        }
    }
}

// 优雅关闭,
// 关闭channel
// 关闭闪断重连机制
// 关闭connection
func (c *Connection) Close() error {
    if c.isConnected {
        return alreadyClosed
    }

    var wg sync.WaitGroup
    // 批量关闭
    for i := 0; i < len(c.qs); i++ {
        wg.Add(1)
        go func(idx int, group *sync.WaitGroup) {
            _ = c.qs[idx].Close()
            group.Done()
        }(i, &wg)
    }

    wg.Wait()

    _ = c.conn.Close()

    // 关闭
    close(c.done)
    return nil
}

// 生成单Channel管理,让一个channel管理一条队列的发送与消费
func (c *Connection) Queue(name, exchange, routingKey string) (*Queue, error) {
    q := &Queue{
        name:         name,
        exchange:     amqp.ExchangeDirect,
        exchangeName: exchange,
        routingKey:   routingKey,
        maxOverstock: c.config.maxOverstock,
    }

    if c.isConnected == true {
        if err := q.Reload(c.conn); err != nil {
            return nil, err
        }
    }
    c.qs = append(c.qs, q)
    return q, nil
}

刚开始写一些技术分享的,很多地方或者结构可能写的比较粗糙,欢迎各位执教、交流。如果有不了解或者新的想法,也可以评论留言沟通。thanks!


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:inight

查看原文:简单设计go-amqp

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

820 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传