我们都知道,自从Kafka诞生之际,就一直使用Zookeeper服务来进行kafka集群的元数据和状态管理,虽然在KIP-500中有提议未来将移除Zookeeper的依赖,使用Raft协议来实现新的元数据和状态管理,但在这之前,我们仍然需要对kafka集群的整个元数据和状态有一定理解,才能更好的维护和保障kafka集群。
前言
在kafka集群中,ZooKeeper集群用于存放集群元数据
、成员管理
、Controller 选举
,以及其他一些管理类任务
。
-
存放元数据
: 是指主题分区的所有数据都保存在 ZooKeeper 中,且以它保存的数据为权威,其他“人” 都要与它保持对齐。 -
成员管理
: 是指 Broker 节点的注册、注销以及属性变更。 -
Controller 选举
: 是指选举集群 Controller,而其他管理类任务包括但不限于主题删除、参数配置。
Zookeeper服务
在开始之前,我们首先需要对Zookeeper服务有一定的了解。在官方文档中,Zookeeper有如下能力:
- 配置管理: 其实就是单纯的K/V存储,可以用来配置的存储和管理,实现对分布式系统组件的集中式的配置管理
- 命名: 命令空间管理,在zookeeper中配置可通过不同的根路径来实现简单的命名空间隔离(简单的chroot隔离)
- 分布式的同步服务: 可保证分布式的同步
- 分组服务: 可以将一组服务配置进行分组管理,其实在Dubbo的注册中心中,就是使用分组和命名来统一进行生产者的注册和发现
在Zookeeper中所有的命名空间和文件系统比较类似,每个节点都有一个唯一的路径
,如下图:
为什么Zookeeper服务可以满足配置管理和协调服务呢?主要是因为它的节点属性,通常也是经常可能会忽略的问题,即在Zookeeper中存在两种节点: 持久节点
和临时节点
,节点也称之为znode
,而znode本身除了数据之外,还会存储一些额外的信息:
- 数据变更的版本号
- ACL变更的版本号
- 时间戳
每次znode中的数据有变更,版本号都会递增,并且client在向znode获取数据时,不仅会获取实际数据,而且会获取到数据的版本,这在整个配置管理领域会相当有用。
前面说到的持久节点
只要znode创建后,便一直会存在,这种节点主要用于配置的持久化存储,只要zookeeper集群整体可用,那该节点也一直可用(及时集群异常后恢复,该节点依旧存在);而临时节点
随着某个创建 znode 的会话的开始,被创建,而一旦这个会话被撤除掉,这个 znode 就会自动被 ZooKeeper 删除。
对于临时节点
来讲,经常会被用在如下场景:
- dubbo中的服务注册
- 其他分布式服务中的leader选举
而真正使得Zookeeper在Java语言生态中有如此大影响还依赖于另外一个功能: watches
,Client 可以对某个 znode 设置watch
当这个 znode 有变更的时候,就会产生watch
事件,这个事件会由 ZooKeeper 通知至 Client, 即是 Client 会收到来自 Server 的通知,可以实现数据的动态变更。(之所以说是Java语言生态,是因为在云原生领域,Etcd几乎是Zookeeper的替代品,不同的是它是Golang语言生态体系中的)
kafka集群数据在zookeeper的存储
前面对Zookeeper服务进行了大概的说明,接下来我们一起看看kafka的集群数据在zookeeper中是如何存储和分布的。
链接zk:
# 前面说到zookeeper支持命名空间,因此我们可以使用一个zookeeper集群来管理多个kafka集群
# 只需要在启动kafka时指定zk的chroot环境,比如/kafka 在zookeeper中将存储名称为`kafka`的集群的相关数据
sh-4.2# sh zkCli.sh -server 127.0.0.1:2181
Connecting to 127.0.0.1:2181
....
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[kafka, zookeeper]
查看kafka数据存储结构:
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
kafka元数据存储目录
- admin: 存储管理员接口操作的相关信息,主要为topic删除事件,分区迁移事件,优先副本选举,信息(一般为临时节点)
- brokers: 主要存储broker相关的信息,broker节点以及节点上的topic相关信息
- cluster: 存储kafka集群信息
- config: 存储broker,client,topic,user以及changer相关的配置信息
- consumers: 存放消费者相关信息(一般为空)
- controller: 用于存放控制节点信息(注意: 该节点是一个临时节点,用于controller节点注册)
- controller_epoch: 用于存放controller节点当前的年龄
- isr_change_notification: 用于存储isr的变更通知(临时节点,当有isr进行变动时,会用于事件通知,可进行watch获取集群isr状态变更)
- latest_producer_id_block: 该节点用于存储处理事务相关的pid范围
- log_dir_event_notification: 日志目录事件通知
admin目录结构
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /kafka/admin
[delete_topics]
[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka/admin/delete_topics
[]
[zk: 127.0.0.1:2181(CONNECTED) 4] get /kafka/admin/delete_topics
null
brokers目录结构
# broker和topic列表数据
[zk: 127.0.0.1:2181(CONNECTED) 5] ls /kafka/brokers
[ids, seqid, topics]
# 表示当前集群有3个节点
[zk: 127.0.0.1:2181(CONNECTED) 6] ls /kafka/brokers/ids
[1, 2, 3]
[zk: 127.0.0.1:2181(CONNECTED) 7] ls /kafka/brokers/seqid
[]
# 表示集群当前的topic信息
[zk: 127.0.0.1:2181(CONNECTED) 8] ls /kafka/brokers/topics
[__consumer_offsets, appjsonlog_heartbeat, nginx_log,ingress_log ]
# broker详情
[zk: 127.0.0.1:2181(CONNECTED) 10] get /kafka/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.1:9092"],"jmx_port":9999,"host":"10.0.0.1","timestamp":"1588925944886","port":9092,"version":4}
# topic详情
[zk: 127.0.0.1:2181(CONNECTED) 11] get /kafka/brokers/topics/__consumer_offsets
{"version":2,"partitions":{"1":[2,1,3],"0":[3,2,1],"2":[1,2,3],"adding_replicas":{},"removing_replicas":{}}
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets
[partitions]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/__consumer_offsets/partitions
[0, 1, 2]
# 查看topic某个分区的状态详情
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/__consumer_offsets/partitions/1
[state]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/__consumer_offsets/partitions/1/state
[]
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/__consumer_offsets/partitions/1/state
{"controller_epoch":3,"leader":1,"version":1,"leader_epoch":12,"isr":[3,1]}
cluster目录结构
[zk: 127.0.0.1:2181(CONNECTED) 5] get /kafka/cluster/id
{"version":"1","id":"5C-JZf4vRdqKzlca7Lv7pA"}
cZxid = 0x100000018
ctime = Fri Mar 30 22:21:07 CST 2018
mZxid = 0x100000018
mtime = Fri Mar 30 22:21:07 CST 2018
pZxid = 0x100000018
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 45
numChildren = 0
config目录结构
[zk: 127.0.0.1:2181(CONNECTED) 12] ls /kafka/config
[brokers, changes, clients, topics, users]
# 需要注意的是,config目录用来存放各种实体的配置,用于使用kafka相关工具对实体进行的配置变更存储
# 因此,一般在kafka集群运行后,如不设置相关动态参数,该目录下的配置一般为空
[zk: 127.0.0.1:2181(CONNECTED) 19] ls /kafka/config/brokers
[]
[zk: 127.0.0.1:2181(CONNECTED) 20] ls /kafka/config/changes
[]
[zk: 127.0.0.1:2181(CONNECTED) 21] ls /kafka/config/clients
[]
[zk: 127.0.0.1:2181(CONNECTED) 22] ls /kafka/config/topics
[__consumer_offsets]
[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka/config/users
[]
# 可以看到,存储消费者偏移量的topic在配置中
# 是因为该topic有一些额外的topic级别参数
# 如果我们对topic参数有过动态变更,将会在这里存储
[zk: 127.0.0.1:2181(CONNECTED) 24] get /kafka/config/topics/__consumer_offsets
{"version":1,"config":{"segment.bytes":"104857600","compression.type":"producer","cleanup.policy":"compact"}}
controller目录结构
# 可以看到当前broker-1为集群的controller节点
[zk: 127.0.0.1:2181(CONNECTED) 30] get /kafka/controller
{"version":1,"brokerid":1,"timestamp":"1588833869354"}
controller_epoch 目录结构
# 可以看到controller的年龄是2,说明controller经历过2次变更了
[zk: 127.0.0.1:2181(CONNECTED) 32] get /kafka/controller_epoch
2
其他几个目录结构
# 存储事务相关的pid范围数据
# broker启动时提前预分配一段 PID,当前是 0~999,即提前分配出 1000 个 PID 来
# 一旦 PID 超过了 999,则目前会按照 1000 的步长重新分配
# 集群中所有 broker 启动时都会启动一个叫 TransactionCoordinator 的组件,该组件能够执行预分配 PID 块和分配 PID 的工作,而所有 broker 都使用 /latest_producer_id_block 节点来保存 PID 块
[zk: 127.0.0.1:2181(CONNECTED) 37] get /kafka/latest_producer_id_block
{"version":1,"broker":1,"block_start":"19000","block_end":"19999"}
[zk: 127.0.0.1:2181(CONNECTED) 39] ls /kafka/isr_change_notification
[]
[zk: 127.0.0.1:2181(CONNECTED) 40] get /kafka/isr_change_notification
null
[zk: 127.0.0.1:2181(CONNECTED) 41] ls /kafka/log_dir_event_notification
[]
[zk: 127.0.0.1:2181(CONNECTED) 42] get /kafka/log_dir_event_notification
null
以上就是整个kafka元数据在zookeeper集群中的整体存储结构,下面附上一张kafka的元数据存储结构图。
kafka元数据在zk中的分布
有疑问加站长微信联系(非本文作者)