背景
饿厂此前一直是重度rabbitmq使用者,在使用的过程中遭遇了大量的问题,性能问题、故障排查问题等。Rabbitmq是用erlang开发的,该语言过于小众,实在无力在其之上再做运维和开发。痛定思痛,我们于是决定自研一个消息队列,为了降低业务层的接入难度,所以该消息队列需要兼容AMQP协议,这样就可以在业务层完全无感知的情况下接入MaxQ。
什么是AMQP协议?
AMQP(Advanced Message Queuing Protocol),是一套消息队列的七层应用协议标准,由摩根大通和iMatrix在2004年开始着手制定,于2006年发布规范,目前最新版是AMQP 1.0,MaxQ基于AMQP 0.9.1实现。相比zeroMQ这类无Broker的模型,AMQP是一种有Broker的协议模型,也就是需要运行单独AMQP中间件服务,生产者客户端和消费者客户端通过AMQP SDK与AMQP中间件服务通讯。像kafka、JMS这类以topic为核心的Broker,生产者发送key和数据到Broker,由Broker比较key之后决定给那个消费者;而AMQP中淡化了topic的概念,引入了Exchange模型,生产者发送key和数据到Exchange,Exchange根据消费者订阅queue的路由规则路由到对应的queue,也就是AMQP解耦了key和queue,即解藕了生产者和消费者,使得生产者和消费者之间的关系更灵活,消费者可自由控制消费关系。另外,AMQP是同时支持消息Push和Pull的模型,对于Push模型,消费者还可通过设置Qos达到流控的目的。
下图是AMQP 0.9.1规范中给出的AMQP架构模型图:
下面简单介绍下AMQP中的一些基本概念:
- Broker:接收和分发消息的应用,MaxQ就是基于AMQP协议实现的Message Broker。
- Connection:publisher / consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
- Channel:如果客户端每次和Broker通信都需要建议一条连接,在并大量连接并发的情况下建立TCP Connection的开销将是巨大的,效率也较低。于是,AMQP引入了channel的概念,channel是connection之上应用层建立的逻辑连接,Broker在实现中可创建单独的thread/协程来实现channel的并发,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
- Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个Broker提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue等。
- Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
- Queue:消息最终会落到queue里,消息会由Broker push给消费者,或者消费者主动去pull queue上的消息;一个message可以被同时分发到多个queue中。
- Binding:exchange和queue之间的消息路由策略,binding中可以包含routing key。Binding信息被保存到exchange中的路由表中,用于message的分发依据。
MaxQ - AMQP实现架构
MaxQ对AMQP协议的实现,主要做了以下几件事:
按照协议spec自动生产frame encode/decode,这里采用了golang的text/template包,将AMQP spec抽象成固定的json和对应的代码模版,如:
{
"id": 50,
"methods": [{"id": 10,
"arguments": [{"type": "short", "name": "ticket", "default-value": 0},
{"type": "shortstr", "name": "queue", "default-value": ""},
{"type": "bit", "name": "passive", "default-value": false},
{"type": "bit", "name": "durable", "default-value": false},
{"type": "bit", "name": "exclusive", "default-value": false},
{"type": "bit", "name": "auto-delete", "default-value": false},
{"type": "bit", "name": "nowait", "default-value": false},
{"type": "table", "name": "arguments", "default-value": {}}],
"name": "declare",
"synchronous" : true},
{"id": 11,
"arguments": [{"type": "shortstr", "name": "queue"},
{"type": "long", "name": "message-count"},
{"type": "long", "name": "consumer-count"}],
"name": "declare-ok"},
{"id": 20,
"arguments": [{"type": "short", "name": "ticket", "default-value": 0},
{"type": "shortstr", "name": "queue", "default-value": ""},
{"type": "shortstr", "name": "exchange"},
{"type": "shortstr", "name": "routing-key", "default-value": ""},
{"type": "bit", "name": "nowait", "default-value": false},
{"type": "table", "name": "arguments", "default-value": {}}],
"name": "bind",
"synchronous" : true},
{"id": 21,
"arguments": [],
"name": "bind-ok"},
{"id": 30,
"arguments": [{"type": "short", "name": "ticket", "default-value": 0},
{"type": "shortstr", "name": "queue", "default-value": ""},
{"type": "bit", "name": "nowait", "default-value": false}],
"name": "purge",
"synchronous" : true},
{"id": 31,
"arguments": [{"type": "long", "name": "message-count"}],
"name": "purge-ok"},
{"id": 40,
"arguments": [{"type": "short", "name": "ticket", "default-value": 0},
{"type": "shortstr", "name": "queue", "default-value": ""},
{"type": "bit", "name": "if-unused", "default-value": false},
{"type": "bit", "name": "if-empty", "default-value": false},
{"type": "bit", "name": "nowait", "default-value": false}],
"name": "delete",
"synchronous" : true},
{"id": 41,
"arguments": [{"type": "long", "name": "message-count"}],
"name": "delete-ok"},
{"id": 50,
"arguments": [{"type": "short", "name": "ticket", "default-value": 0},
{"type": "shortstr", "name": "queue", "default-value": ""},
{"type": "shortstr", "name": "exchange"},
{"type": "shortstr", "name": "routing-key", "default-value": ""},
{"type": "table", "name": "arguments", "default-value": {}}],
"name": "unbind",
"synchronous" : true},
{"id": 51,
"arguments": [],
"name": "unbind-ok"}
],
"name": "queue"
},
代码生成模版:
type {{$struct}} struct {
{{range .Fields}}
{{.Literal}} {{$.TypeOfFieldLiteral .}}
{{end}}
{{if .Content}}
// Content
properties *Properties
body []byte
{{end}}
}
// Name returns the string representation of the Method, implements
// Method.Name().
func (m *{{$struct}}) Name() string {
return "{{.NameLiteral $class.Name}}"
}
// ID returns the AMQP index number of the Method, implements Method.ID().
func (m *{{$struct}}) ID() uint16 {
return {{.ID}}
}
// Class returns a instance of the Class of this method, implements Method.Class().
func (m *{{$struct}}) Class() Class {
return &{{$classStruct}}{}
}
// String returns the string representation for the Method.
func (m *{{$struct}}) String() string {
return {{.StringLiteral $class.Name}}
}
Vhost API实现:
func (v *VHost) QueueDeclare(node, name string, durable, exclusive, autoDelete bool, args amqp.Table) ...
func (v *VHost) QueueInspect(name string) ...
func (v *VHost) QueueBind(name, key, exchange string, args amqp.Table) ...
func (v *VHost) QueueUnbind(name, key, exchange string, args amqp.Table) ...
func (v *VHost) QueuePurge(name string) ...
func (v *VHost) QueueDelete(name string, ifUnused, ifEmpty bool) ...
func (v *VHost) ExchangeDeclare(name string, exType string, durable bool, autoDelete bool, internal bool, arguments amqp.Table)
func (v *VHost) ExchangeDelete(name string, ifUnused bool) ...
func (v *VHost) ExchangeBind(destination, key, source string, args amqp.Table) ...
func (v *VHost) ExchangeUnbind(destination, key, source string, args amqp.Table) ...
func (v *VHost) Publish(exchange, key string, mandatory, immediate bool, props *amqp.Properties, body []byte)
Exchange接口化,实现4种Exchange路由模式
// Exchange publisher
type publisher interface {
bind(b *Binding) (exists bool)
unbind(b *Binding)
bindingsCount() int
allBindings() []*Binding
publish(msg *Message, routingKey string) (count int, err error)
}
type directPublisher struct {
...
}
type fanoutPublisher struct {
...
}
type topicPublisher struct {
...
}
type headersPublisher struct {
...
}
Queue接口化——MaxQ集群
- Normal Queue: queue功能的具体实现,包括Publish、Consume、Cancel、Ack、Get等,单机版MaxQ会实例化此queue。
- Master Queue: Normal Queue的超集,集群模式下会实例化此queue,在HA镜像策略下会与Slave Queue同步消息。
- Virtual Queue: 负责远程调用Master Queue的API,主要是用作消息转发。
- Slave Queue: Virtual Queue的超集,除了消息转发,还和Master Queue进行消息同步,在Master Queue down掉后,会被选取为新的Master Queue。
MaxQ - 生产实现架构
如果要将MaxQ应用到生产,还需要更多工作要做:
- MaxQ集群化,集群间的元数据通过zookeeper存储和同步,消息通过grpc进行通信。
- 通过四层Proxy,生产者或消费者客户端可以采用官方或第三方的AMQP SDK与MaxQ集群通讯。
- 集群管理,由于集群信息和元数据信息都存储在zookeeper上,因此通过zookeeper可以实现集群节点管理、扩容缩容和集群切换;
同时MaxQ本身提供了HTTP API管理和统计接口,因此可对集群进行监控统计、资源分配等。
MaxQ相关特性
1. 消息可靠性
- Publishing可靠性,生产者设置confirm服务端确认机制,确认服务端成功接收到生产者消息。
- 消息Routing可靠性,生产者设置Publish mandatory,确认消息路由到queue。
- Consuming可靠性,消费者设置手工Ack,服务端在收到消息Ack后才清除本地消息。
- Persisting可靠性, 采用RAID1存储持久化消息;
- 分布式下的可靠性,设置queue的镜像模式,启动Slave Queue,与Master Queue进行消息同步,在aster Queue down掉后,Slave Queue可被选举为Master Queue。
2. 容错性
- zookeeper不可用
1. 元数据已缓存在内存中,不会有任何影响,生产方和消费方仍可正常生产和消费
2. 服务会自动降级,元数据不可变更
3. zookeeper恢复,服务自愈
- 节点故障
通过zookeeper进行Master Queue选举:
1.NodeA和NodeB收到NodeC挂掉的事件,NodeA和NodeB成为Master queue的候选节点
2.NodeA和NodeB各自上报同步的offset到zookeeper
3.NodeA和NodeB各自决策,offset最新的NodeA选为Master queue
4.NodeA将Master信息同步至zookeeper
5.NodeB更新新的Master信息,并同步数据
6.NodeC恢复,成为Slave queue,并与新的Master同步数据
- 网络分区
3. 扩展性
- HA、Exchange和Queue动态扩展属性参数
- Exchange、Binding、Queue支持自定义扩展, 如:x-message-ttl、x-expires、x-max-length、x-dead-letter-exchange
使用场景和案例
下面介绍下MaxQ作为消息队列的经典三种使用场景和使用案例:
1. 异步解耦
订单系统与消息通知系统解耦
1.用户订单支付成功,直接向MaxQ推送下单成功通知,主流程迅速返回
2.消息通知系统异步接收通知消息, 发送短信通知或应用通知
2. 削峰填谷
SQL-autoreview系统分析优化SQL语句,并将结果落DB,属于慢消费, 生产高峰期处理能力不够,可利用MaxQ的堆积能力,匀速消费和处理。
3. 发布订阅
DC数据变更发布和订阅
1.DRC将DC的数据变更记录发布至MaxQ
2.各业务系统订阅相关的数据变更,并进一步做业务处理
未来的展望
1. Sharding Queue支持Queue的水平扩展,让单Queue的性能不再成为瓶颈;
2. 支持消息巨量堆积,让消息堆积不再成为问题;
3. 延时队列,支持按单消息延时,让消息延时变的简单,无需再通过ttl+deadletter exchange做延时推送;
4. 历史消息trace,追溯查询已经消费掉的消息,让生产方和消费方不再因消息是否生产了或消费了而发生扯皮。
作者介绍:
张培培,2015年加入饿了么,现任饿了么框架工具部架构师,负责饿了么消息队列MaxQ。
参考文档
3. 消息队列的流派之争
有疑问加站长微信联系(非本文作者)