[完结14章]MQ大牛成长课--从0到1手写分布式消息队列中间件

okkaandw · · 391 次点击 · 开始浏览    置顶

一、MQ是什么(/s/1nuNmSglAnkXCxP0PZZvkMw 提取码:wl2c ) MQ全称为Message Queue,即消息队列 ,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生 产、存储、消费全过程的软件系统,遵循FIFO原则。 二、为什么用MQ 上下班高峰期使用天府通刷码的人非常多,以为做并发量很高,一个出站请求到后台需要做费用结算,或者积分赠送等业务。由于并发很高,并且费用结算和积分等业务本来就耗时,况且支付服务也不一定能承担那么大的请求量。 当服务器线程耗尽,后续请求会等待变慢,再加上高并发请求就会导致后续请求越来越慢,请求长时间等待,导致大量请求超时。并发太高,可能会导致服务器的内存上升,CPU使用率急速上升,甚至导致服务器宕掉。 加入MQ后的效果 高并发请求在MQ中排队,达到了消除峰值的目的,不会有大量的请求同时怼到支付系统 服务异步调用,“天府通出站API” 把结算消息放入MQ就可以返回“出站成功,费用稍后结算”给用户,响应时间很快 服务彻底解耦,即使支付服务挂掉,也不影响“天府通出站API”正常工作,当支付系统再启动仍然可以继续消费MQ中的消息。 三、MQ的使用场景 1 异步&解耦 笔者曾经负责某电商公司的用户服务,该服务提供用户注册,查询,修改等基础功能。用户注册成功之后,需要给用户发送短信。 2 消峰 高并发场景下,面对突然出现的请求峰值,非常容易导致系统变得不稳定,比如大量请求访问数据库,会对数据库造成极大的压力,或者系统的资源 CPU 、IO 出现瓶颈。 3 消息总线 所谓总线,就是像主板里的数据总线一样, 具有数据的传递和交互能力,各方不直接通信,使用总线作为标准通信接口。 笔者曾经服务于某彩票公司订单团队,在彩票订单的生命周期里,经过创建,拆分子订单,出票,算奖等诸多环节。每一个环节都需要不同的服务处理,每个系统都有自己独立的表,业务功能也相对独立。假如每个应用都去修改订单主表的信息,那就会相当混乱了。 4 延时任务 用户在美团 APP 下单,假如没有立即支付,进入订单详情会显示倒计时,如果超过支付时间,订单就会被自动取消。 非常优雅的方式是:使用消息队列的延时消息。 四、RabbitMQ主要特性:  1. 可靠性: 提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制;  2. 灵活的路由:消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用; 3. 消息集群:在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用; 4. 队列高可用:队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全;  5. 多种协议的支持:支持多种消息队列协议;  6. 服务器端用Erlang语言编写,支持只要是你能想到的所有编程语言;  7. 管理界面: RabbitMQ有一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面;  8. 跟踪机制:如果消息异常,RabbitMQ提供消息跟踪机制,使用者可以找出发生了什么;  9. 插件机制:提供了许多插件,来从多方面进行扩展,也可以编写自己的插件 五、生产者代码示例 import pika # 连接到 RabbitMQ 服务器 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 声明一个队列 channel.queue_declare(queue='hello') # 发送消息 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") # 关闭连接 connection.close() 六、消费者代码示例 import pika # 连接到 RabbitMQ 服务器 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 声明一个队列 channel.queue_declare(queue='hello') # 定义消息处理函数 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 设置消费者 channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

391 次点击  ∙  1 赞  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传