一、MQ是什么(/s/1nuNmSglAnkXCxP0PZZvkMw 提取码:wl2c )
MQ全称为Message Queue,即消息队列 ,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生 产、存储、消费全过程的软件系统,遵循FIFO原则。
二、为什么用MQ
上下班高峰期使用天府通刷码的人非常多,以为做并发量很高,一个出站请求到后台需要做费用结算,或者积分赠送等业务。由于并发很高,并且费用结算和积分等业务本来就耗时,况且支付服务也不一定能承担那么大的请求量。
当服务器线程耗尽,后续请求会等待变慢,再加上高并发请求就会导致后续请求越来越慢,请求长时间等待,导致大量请求超时。并发太高,可能会导致服务器的内存上升,CPU使用率急速上升,甚至导致服务器宕掉。
加入MQ后的效果
高并发请求在MQ中排队,达到了消除峰值的目的,不会有大量的请求同时怼到支付系统
服务异步调用,“天府通出站API” 把结算消息放入MQ就可以返回“出站成功,费用稍后结算”给用户,响应时间很快
服务彻底解耦,即使支付服务挂掉,也不影响“天府通出站API”正常工作,当支付系统再启动仍然可以继续消费MQ中的消息。
三、MQ的使用场景
1 异步&解耦
笔者曾经负责某电商公司的用户服务,该服务提供用户注册,查询,修改等基础功能。用户注册成功之后,需要给用户发送短信。
2 消峰
高并发场景下,面对突然出现的请求峰值,非常容易导致系统变得不稳定,比如大量请求访问数据库,会对数据库造成极大的压力,或者系统的资源 CPU 、IO 出现瓶颈。
3 消息总线
所谓总线,就是像主板里的数据总线一样, 具有数据的传递和交互能力,各方不直接通信,使用总线作为标准通信接口。
笔者曾经服务于某彩票公司订单团队,在彩票订单的生命周期里,经过创建,拆分子订单,出票,算奖等诸多环节。每一个环节都需要不同的服务处理,每个系统都有自己独立的表,业务功能也相对独立。假如每个应用都去修改订单主表的信息,那就会相当混乱了。
4 延时任务
用户在美团 APP 下单,假如没有立即支付,进入订单详情会显示倒计时,如果超过支付时间,订单就会被自动取消。
非常优雅的方式是:使用消息队列的延时消息。
四、RabbitMQ主要特性:
1. 可靠性: 提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制;
2. 灵活的路由:消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用;
3. 消息集群:在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用;
4. 队列高可用:队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全;
5. 多种协议的支持:支持多种消息队列协议;
6. 服务器端用Erlang语言编写,支持只要是你能想到的所有编程语言;
7. 管理界面: RabbitMQ有一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面;
8. 跟踪机制:如果消息异常,RabbitMQ提供消息跟踪机制,使用者可以找出发生了什么;
9. 插件机制:提供了许多插件,来从多方面进行扩展,也可以编写自己的插件
五、生产者代码示例
import pika
# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='hello')
# 发送消息
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
# 关闭连接
connection.close()
六、消费者代码示例
import pika
# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='hello')
# 定义消息处理函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 设置消费者
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
有疑问加站长微信联系(非本文作者)