Kafka特性之控制器和可靠性保证
控制器
Kafka集群包含若⼲个broker,broker.id指定broker的编号,编号不要重复。
Kafka集群上创建的主题,包含若⼲个分区。
每个分区包含若⼲个副本,副本因⼦包括了Follower副本和Leader副本。
副本⼜分为ISR(同步副本分区)和OSR(⾮同步副本分区)。
控制器就是⼀个broker。
控制器除了⼀般broker的功能,还负责Leader分区的选举。
broker选举
集群⾥第⼀个启动的broker在Zookeeper中创建临时节点
其他broker在该控制器节点创建Zookeeper watch对象,使⽤Zookeeper的监听机制接收该节点的变更。即:Kafka通过Zookeeper的分布式锁特性选举集群控制器。
下图中,节点 /myKafka/controller 是⼀个zookeeper临时节点,其中 “brokerid”:0 ,表示当前控制器是broker.id为0的broker。
每个新选出的控制器通过 Zookeeper 的条件递增操作获得⼀个全新的、数值更⼤的 controller epoch。
其他broker 在知道当前 controller epoch 后,如果收到由控制器发出的包含较旧 epoch 的消息,就会忽略它们,以防⽌“脑裂”。
⽐如当⼀个Leader副本分区所在的broker宕机,需要选举新的Leader副本分区,有可能两个具有不同纪元数字的控制器都选举了新的Leader副本分区,如果选举出来的Leader副本分区不⼀样,听谁的?脑裂了。有了纪元数字,直接使⽤纪元数字最新的控制器结果。
当控制器发现⼀个 broker 已经离开集群,那些失去Leader副本分区的Follower分区需要⼀个新Leader(这些分区的⾸领刚好是在这个 broker 上)。
-
控制器需要知道哪个broker宕机了?
-
控制器需要知道宕机的broker上负责的时候哪些分区的Leader副本分区?
下图中,
集群控制器负责监听 ids 节点,⼀旦节点⼦节点发送变化,集群控制器得到通知。
控制器遍历这些Follower副本分区,并确定谁应该成为新Leader分区,然后向所有包含新Leader分区和现有Follower的 broker 发送请求。该请求消息包含了谁是新Leader副本分区以及谁是Follower副本分区的信息。随后,新Leader分区开始处理来⾃⽣产者和消费者的请求,⽽跟随者开始从新Leader副本分区消费消息。
当控制器发现⼀个 broker 加⼊集群时,它会使⽤ broker ID 来检查新加⼊的 broker 是否包含现有分区的副本。如果有,控制器就把变更通知发送给新加⼊的 broker 和其他 broker,新 broker上的副本分区开始从Leader分区那⾥消费消息,与Leader分区保持同步。
结论:
-
Kafka使⽤ Zookeeper 的分布式锁选举控制器,并在节点加⼊集群或退出集群时通知控制器。
-
控制器负责在节点加⼊或离开集群时进⾏分区Leader选举。
-
控制器使⽤epoch来避免“脑裂”。“脑裂”是指两个节点同时认为⾃⼰是当前的控制器。
可靠性保证
概念
-
创建Topic的时候可以指定 --replication-factor 3 ,表示分区的副本数,不要超过broker的数量。
-
Leader是负责读写的节点,⽽其他副本则是Follower。Producer只把消息发送到Leader,Follower定期地到Leader上Pull数据。
-
ISR是Leader负责维护的与其保持同步的Replica列表,即当前活跃的副本列表。如果⼀个Follow落后太多,Leader会将它从ISR中移除。落后太多意思是该Follow复制的消息Follow⻓时间没有向Leader发送fetch请求(参数: replica.lag.time.max.ms 默认值:10000)。
-
为了保证可靠性,可以设置 acks=all 。Follower收到消息后,会像Leader发送ACK。⼀旦Leader收到了ISR中所有Replica的ACK,Leader就commit,那么Leader就向Producer发送ACK
副本的分配
当某个topic的 --replication-factor 为N(N>1)时,每个Partition都有N个副本,称作replica。原则上是将replica均匀的分配到整个集群上。不仅如此,partition的分配也同样需要均匀分配,为了更好的负载均衡。
副本分配的三个⽬标:
-
- 均衡地将副本分散于各个broker上
-
- 对于某个broker上分配的分区,它的其他副本在其他broker上
-
- 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。
在不考虑机架信息的情况下:
-
- 第⼀个副本分区通过轮询的⽅式挑选⼀个broker,进⾏分配。该轮询从broker列表的随机位置进⾏轮询。
-
- 其余副本通过增加偏移进⾏分配。
失效副本
失效副本的判定
replica.lag.time.max.ms 默认⼤⼩为10000。 当ISR中的⼀个Follower副本滞后Leader副本的时间超过参数 replica.lag.time.max.ms 指定的值时即判定为副本失效,需要将此Follower副本剔出除ISR。
具体实现原理:当Follower副本将Leader副本的LEO之前的⽇志全部同步时,则认为该Follower副本已经追赶上Leader副本,此时更新该副本的lastCaughtUpTimeMs标识。
Kafka的副本管理器(ReplicaManager)启动时会启动⼀个副本过期检测的定时任务,⽽这个定时任务会定时检 查当前时间与副本的lastCaughtUpTimeMs差值是否⼤于参数 replica.lag.time.max.ms 指定的值。
Kafka源码注释中说明了⼀般有两种情况会导致副本失效:
-
Follower副本进程卡住,在⼀段时间内没有向Leader副本发起同步请求,⽐如频繁的Full GC。
-
Follower副本进程同步过慢,在⼀段时间内都⽆法追赶上Leader副本,⽐如IO开销过⼤。
如果通过⼯具增加了副本因⼦,那么新增加的副本在赶上Leader副本之前也都是处于失效状态的。
如果⼀个Follower副本由于某些原因(⽐如宕机)⽽下线,之后⼜上线,在追赶上Leader副本之前也是出于失效状态。
失效副本的分区个数是⽤于衡量Kafka性能指标的重要部分。Kafka本身提供了⼀个相关的指标,即UnderReplicatedPartitions,这个可以通过JMX访问:
1 | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions 1 |
取值范围是⼤于等于0的整数。注意:如果Kafka集群正在做分区迁移(kafka-reassign-partitions.sh)的时候,这个值也会⼤于0。
副本复制
⽇志复制算法(log replication algorithm)必须提供的基本保证是,如果它告诉客户端消息已被提交,⽽当前Leader出现故障,新选出的Leader也必须具有该消息。在出现故障时,Kafka会从挂掉Leader的ISR⾥⾯选择⼀个Follower作为这个分区新的Leader。
每个分区的 leader 会维护⼀个in-sync replica(同步副本列表,⼜称 ISR)。当Producer向broker发送消息,消息先写⼊到对应Leader分区,然后复制到这个分区的所有副本中。ACKS=ALL时,只有将消息成功复制到所有同步副本(ISR)后,这条消息才算被提交。
什么情况下会导致⼀个副本与 leader 失去同步
⼀个副本与 leader 失去同步的原因有很多,主要包括:
-
慢副本(Slow replica):follower replica 在⼀段时间内⼀直⽆法赶上 leader 的写进度。造成这种情况的最常⻅原因之⼀是 follower replica 上的 I/O瓶颈,导致它持久化⽇志的时间⽐它从 leader 消费消息的时间要⻓;
-
卡住副本(Stuck replica):follower replica 在很⻓⼀段时间内停⽌从 leader 获取消息。这可能是以为GC 停顿,或者副本出现故障;
-
刚启动副本(Bootstrapping replica):当⽤户给某个主题增加副本因⼦时,新的 follower replicas 是不同步的,直到它跟上 leader 的⽇志。
当副本落后于 leader 分区时,这个副本被认为是不同步或滞后的。在 Kafka中,副本的滞后于Leader是根据 replica.lag.time.max.ms 来衡量。
如何确认某个副本处于滞后状态
通过 replica.lag.time.max.ms 来检测卡住副本(Stuck replica)在所有情况下都能很好地⼯作。它跟踪follower 副本没有向 leader 发送获取请求的时间,通过这个可以推断 follower 是否正常。另⼀⽅⾯,使⽤消息数量检测不同步慢副本(Slow replica)的模型只有在为单个主题或具有同类流量模式的多个主题设置这些参数时才能很好地⼯作,但我们发现它不能扩展到⽣产集群中所有主题。