基于AMQP实现的golang消息队列MaxQ

饿了么 · · 3506 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

背景

饿厂此前一直是重度rabbitmq使用者,在使用的过程中遭遇了大量的问题,性能问题、故障排查问题等。Rabbitmq是用erlang开发的,该语言过于小众,实在无力在其之上再做运维和开发。痛定思痛,我们于是决定自研一个消息队列,为了降低业务层的接入难度,所以该消息队列需要兼容AMQP协议,这样就可以在业务层完全无感知的情况下接入MaxQ。

什么是AMQP协议?

AMQP(Advanced Message Queuing Protocol),是一套消息队列的七层应用协议标准,由摩根大通和iMatrix在2004年开始着手制定,于2006年发布规范,目前最新版是AMQP 1.0,MaxQ基于AMQP 0.9.1实现。相比zeroMQ这类无Broker的模型,AMQP是一种有Broker的协议模型,也就是需要运行单独AMQP中间件服务,生产者客户端和消费者客户端通过AMQP SDK与AMQP中间件服务通讯。像kafka、JMS这类以topic为核心的Broker,生产者发送key和数据到Broker,由Broker比较key之后决定给那个消费者;而AMQP中淡化了topic的概念,引入了Exchange模型,生产者发送key和数据到Exchange,Exchange根据消费者订阅queue的路由规则路由到对应的queue,也就是AMQP解耦了key和queue,即解藕了生产者和消费者,使得生产者和消费者之间的关系更灵活,消费者可自由控制消费关系。另外,AMQP是同时支持消息Push和Pull的模型,对于Push模型,消费者还可通过设置Qos达到流控的目的。

下图是AMQP 0.9.1规范中给出的AMQP架构模型图:

下面简单介绍下AMQP中的一些基本概念:

  • Broker:接收和分发消息的应用,MaxQ就是基于AMQP协议实现的Message Broker。
  • Connection:publisher / consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
  • Channel:如果客户端每次和Broker通信都需要建议一条连接,在并大量连接并发的情况下建立TCP Connection的开销将是巨大的,效率也较低。于是,AMQP引入了channel的概念,channel是connection之上应用层建立的逻辑连接,Broker在实现中可创建单独的thread/协程来实现channel的并发,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
  • Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个Broker提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue等。
  • Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
  • Queue:消息最终会落到queue里,消息会由Broker push给消费者,或者消费者主动去pull queue上的消息;一个message可以被同时分发到多个queue中。
  • Binding:exchange和queue之间的消息路由策略,binding中可以包含routing key。Binding信息被保存到exchange中的路由表中,用于message的分发依据。

MaxQ - AMQP实现架构

MaxQ对AMQP协议的实现,主要做了以下几件事:

按照协议spec自动生产frame encode/decode,这里采用了golang的text/template包,将AMQP spec抽象成固定的json和对应的代码模版,如:

{
    "id": 50,
    "methods": [{"id": 10,
                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
                   {"type": "shortstr", "name": "queue", "default-value": ""},
                               {"type": "bit", "name": "passive", "default-value": false},
                               {"type": "bit", "name": "durable", "default-value": false},
                               {"type": "bit", "name": "exclusive", "default-value": false},
                               {"type": "bit", "name": "auto-delete", "default-value": false},
                               {"type": "bit", "name": "nowait", "default-value": false},
                               {"type": "table", "name": "arguments", "default-value": {}}],
                 "name": "declare",
                 "synchronous" : true},
                {"id": 11,
                 "arguments": [{"type": "shortstr", "name": "queue"},
                               {"type": "long", "name": "message-count"},
                               {"type": "long", "name": "consumer-count"}],
                 "name": "declare-ok"},
                {"id": 20,
                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
                               {"type": "shortstr", "name": "queue", "default-value": ""},
                               {"type": "shortstr", "name": "exchange"},
                               {"type": "shortstr", "name": "routing-key", "default-value": ""},
                               {"type": "bit", "name": "nowait", "default-value": false},
                               {"type": "table", "name": "arguments", "default-value": {}}],
                 "name": "bind",
                 "synchronous" : true},
                {"id": 21,
                 "arguments": [],
                 "name": "bind-ok"},
                {"id": 30,
                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
                               {"type": "shortstr", "name": "queue", "default-value": ""},
                               {"type": "bit", "name": "nowait", "default-value": false}],
                 "name": "purge",
                 "synchronous" : true},
                {"id": 31,
                 "arguments": [{"type": "long", "name": "message-count"}],
                 "name": "purge-ok"},
                {"id": 40,
                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
                               {"type": "shortstr", "name": "queue", "default-value": ""},
                               {"type": "bit", "name": "if-unused", "default-value": false},
                               {"type": "bit", "name": "if-empty", "default-value": false},
                               {"type": "bit", "name": "nowait", "default-value": false}],
                 "name": "delete",
                 "synchronous" : true},
                {"id": 41,
                 "arguments": [{"type": "long", "name": "message-count"}],
                 "name": "delete-ok"},
                {"id": 50,
                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
                               {"type": "shortstr", "name": "queue", "default-value": ""},
                               {"type": "shortstr", "name": "exchange"},
                               {"type": "shortstr", "name": "routing-key", "default-value": ""},
                               {"type": "table", "name": "arguments", "default-value": {}}],
                 "name": "unbind",
                 "synchronous" : true},
                {"id": 51,
                 "arguments": [],
                 "name": "unbind-ok"}
                ],
    "name": "queue"
},

代码生成模版:

type {{$struct}} struct {
    {{range .Fields}}
        {{.Literal}} {{$.TypeOfFieldLiteral .}}
    {{end}}
    {{if .Content}}
        // Content
        properties *Properties
        body []byte
    {{end}}
}
// Name returns the string representation of the Method, implements
// Method.Name().
func (m *{{$struct}}) Name() string {
    return "{{.NameLiteral $class.Name}}"
}
// ID returns the AMQP index number of the Method, implements Method.ID().
func (m *{{$struct}}) ID() uint16 {
    return {{.ID}}
}
// Class returns a instance of the Class of this method, implements Method.Class().
func (m *{{$struct}}) Class() Class {
    return &{{$classStruct}}{}
}
// String returns the string representation for the Method.
func (m *{{$struct}}) String() string {
    return {{.StringLiteral $class.Name}}
}


Vhost API实现:

func (v *VHost) QueueDeclare(node, name string, durable, exclusive, autoDelete bool, args amqp.Table) ...
func (v *VHost) QueueInspect(name string) ...
func (v *VHost) QueueBind(name, key, exchange string, args amqp.Table) ...
func (v *VHost) QueueUnbind(name, key, exchange string, args amqp.Table) ...
func (v *VHost) QueuePurge(name string) ...
func (v *VHost) QueueDelete(name string, ifUnused, ifEmpty bool) ...
func (v *VHost) ExchangeDeclare(name string, exType string, durable bool, autoDelete bool, internal bool, arguments amqp.Table)
func (v *VHost) ExchangeDelete(name string, ifUnused bool) ...
func (v *VHost) ExchangeBind(destination, key, source string, args amqp.Table) ...
func (v *VHost) ExchangeUnbind(destination, key, source string, args amqp.Table) ...
func (v *VHost) Publish(exchange, key string, mandatory, immediate bool, props *amqp.Properties, body []byte) 


Exchange接口化,实现4种Exchange路由模式

// Exchange publisher
type publisher interface {
    bind(b *Binding) (exists bool)
    unbind(b *Binding)
    bindingsCount() int
    allBindings() []*Binding
    publish(msg *Message, routingKey string) (count int, err error)
}
 
type directPublisher struct {
 ...
}
  
type fanoutPublisher struct {
 ...
}
  
type topicPublisher struct {
 ...
}
  
type headersPublisher struct {
 ...
}


Queue接口化——MaxQ集群

  • Normal Queue: queue功能的具体实现,包括Publish、Consume、Cancel、Ack、Get等,单机版MaxQ会实例化此queue。
  • Master Queue: Normal Queue的超集,集群模式下会实例化此queue,在HA镜像策略下会与Slave Queue同步消息。
  • Virtual Queue: 负责远程调用Master Queue的API,主要是用作消息转发。
  • Slave Queue: Virtual Queue的超集,除了消息转发,还和Master Queue进行消息同步,在Master Queue down掉后,会被选取为新的Master Queue。


MaxQ - 生产实现架构

如果要将MaxQ应用到生产,还需要更多工作要做:

  1. MaxQ集群化,集群间的元数据通过zookeeper存储和同步,消息通过grpc进行通信。
  2. 通过四层Proxy,生产者或消费者客户端可以采用官方或第三方的AMQP SDK与MaxQ集群通讯。
  3. 集群管理,由于集群信息和元数据信息都存储在zookeeper上,因此通过zookeeper可以实现集群节点管理、扩容缩容和集群切换;

同时MaxQ本身提供了HTTP API管理和统计接口,因此可对集群进行监控统计、资源分配等。


MaxQ相关特性

1. 消息可靠性

  • Publishing可靠性,生产者设置confirm服务端确认机制,确认服务端成功接收到生产者消息。
  • 消息Routing可靠性,生产者设置Publish mandatory,确认消息路由到queue。
  • Consuming可靠性,消费者设置手工Ack,服务端在收到消息Ack后才清除本地消息。
  • Persisting可靠性, 采用RAID1存储持久化消息;
  • 分布式下的可靠性,设置queue的镜像模式,启动Slave Queue,与Master Queue进行消息同步,在aster Queue down掉后,Slave Queue可被选举为Master Queue。

2. 容错性

  • zookeeper不可用

1. 元数据已缓存在内存中,不会有任何影响,生产方和消费方仍可正常生产和消费

2. 服务会自动降级,元数据不可变更

3. zookeeper恢复,服务自愈


  • 节点故障

通过zookeeper进行Master Queue选举:

1.NodeA和NodeB收到NodeC挂掉的事件,NodeA和NodeB成为Master queue的候选节点

2.NodeA和NodeB各自上报同步的offset到zookeeper

3.NodeA和NodeB各自决策,offset最新的NodeA选为Master queue

4.NodeA将Master信息同步至zookeeper

5.NodeB更新新的Master信息,并同步数据

6.NodeC恢复,成为Slave queue,并与新的Master同步数据


  • 网络分区

3. 扩展性

  1. HA、Exchange和Queue动态扩展属性参数
  2. Exchange、Binding、Queue支持自定义扩展, 如:x-message-ttl、x-expires、x-max-length、x-dead-letter-exchange


使用场景和案例

下面介绍下MaxQ作为消息队列的经典三种使用场景和使用案例:

1. 异步解耦

订单系统与消息通知系统解耦

1.用户订单支付成功,直接向MaxQ推送下单成功通知,主流程迅速返回

2.消息通知系统异步接收通知消息, 发送短信通知或应用通知


2. 削峰填谷

SQL-autoreview系统分析优化SQL语句,并将结果落DB,属于慢消费, 生产高峰期处理能力不够,可利用MaxQ的堆积能力,匀速消费和处理。


3. 发布订阅

DC数据变更发布和订阅

1.DRC将DC的数据变更记录发布至MaxQ

2.各业务系统订阅相关的数据变更,并进一步做业务处理


未来的展望

1. Sharding Queue支持Queue的水平扩展,让单Queue的性能不再成为瓶颈;

2. 支持消息巨量堆积,让消息堆积不再成为问题;

3. 延时队列,支持按单消息延时,让消息延时变的简单,无需再通过ttl+deadletter exchange做延时推送;

4. 历史消息trace,追溯查询已经消费掉的消息,让生产方和消费方不再因消息是否生产了或消费了而发生扯皮。


作者介绍:

张培培,2015年加入饿了么,现任饿了么框架工具部架构师,负责饿了么消息队列MaxQ。


参考文档

1. AMQP 0.9.1 官方协议

2. RabbitMQ与AMQP协议详解

3. 消息队列的流派之争


有疑问加站长微信联系(非本文作者)

本文来自:知乎专栏

感谢作者:饿了么

查看原文:基于AMQP实现的golang消息队列MaxQ

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

3506 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传