RocketMQ主从读写分离机制

aside section._1OhGeD · · 1914 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

微信公众号「后端进阶」,专注后端技术分享:Java、Golang、WEB框架、分布式中间件、服务治理等等。

一般来说,选择主从备份实现高可用的架构中,都会具备读写分离机制,比如 MySql 读写分离,客户端可以向主从服务器读取数据,但客户写数据只能通过主服务器。

RocketMQ 的读写分离机制又跟上述描写的不太一致,RocketMQ 有属于自己的一套读写分离逻辑,它会判断主服务器的消息堆积量来决定消费者是否向从服务器拉取消息消费。

决定消费者是否向从服务器拉取消息消费的值存在 GetMessageResult 类中:

org.apache.rocketmq.store.GetMessageResult:

private boolean suggestPullingFromSlave = false;

其默认值为 false,即默认消费者不会消费从服务器,以下逻辑可以改变该值:

org.apache.rocketmq.store.DefaultMessageStore#getMessage:

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);

其中 maxOffsetPy 为当前最大物理偏移量,maxPhyOffsetPulling 为本次消息拉取最大物理偏移量,他们的差即可表示消息堆积量,TOTAL_PHYSICAL_MEMORY_SIZE 表示当前系统物理内存,accessMessageInMemoryMaxRatio 的默认值为 40,以上逻辑即可算出当前消息堆积量是否大于物理内存的 40 %,如果大于则将 suggestPullingFromSlave 设置为 true。

接下来该参数值会在消息拉取逻辑里面产生作用:

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:

if (getMessageResult.isSuggestPullingFromSlave()) {
  responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
  responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}

switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
  case ASYNC_MASTER:
  case SYNC_MASTER:
    break;
  case SLAVE:
    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
      response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
      responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
    }
    break;
}

if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
  // consume too slow ,redirect to another machine
  if (getMessageResult.isSuggestPullingFromSlave()) {
    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
  }
  // consume ok
  else {
    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
  }
} else {
  responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}

如果发现主服务器的消息堆积超过了物理内存的 40%,则会设置 suggestWhichBrokerId 为从服务器 broker ID。

这里还会有个 slaveReadEnable 值来决定是否可以从从服务器拉取消息:

  1. 如果 slaveReadEnable=true,并且堆积量已经超过物理内存 40%时,则建议从从服务器拉取消息,否则还是从主服务器拉取消息;
  2. 如果 slaveReadEnable=false,则消息者只能从主服务器中拉取消息。

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#updatePullFromWhichNode:

public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (null == suggest) {
        this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
    } else {
        suggest.set(brokerId);
    }
}

当消费者收到拉取响应回来的数据后,会将下次建议拉取的 brokerID 缓存起来。下次拉取消息就会从 pullFromWhichNodeTable 中取出拉取 brokerId。

公众号「后端进阶」,专注后端技术分享!

关注公众号回复关键字「后端」免费领取后端开发大礼包!


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:aside section._1OhGeD

查看原文:RocketMQ主从读写分离机制

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1914 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传