副本管理器的启动和ISR的收缩和扩展

在启动KafkaServer的时候,运⾏KafkaServer的startup⽅法。在该⽅法中实例化ReplicaManager,并调⽤ReplicaManager的startup⽅法:


ReplicaManager的startup⽅法:


处理ISR收缩的情况:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
case Some(leaderReplica) =>
// 获取ISR中的不同步副本
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
// 如果该集合不为空,则需要收缩ISR
if(outOfSyncReplicas.nonEmpty) {
// 从ISR中除去⾮同步副本
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.nonEmpty)
info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
newInSyncReplicas.map(_.brokerId).mkString(",")))
// 在缓存和zk中更新ISR
updateIsr(newInSyncReplicas)
// 标记ISR收缩事件
replicaManager.isrShrinkRate.mark()
// 由于ISR可能发⽣变化,变为1,如果ISR发⽣变化,需要增加HW
maybeIncrementLeaderHW(leaderReplica)
} else {
false
}
// 如果leader不在当前broker,则什么都不做
case None => false
}
}
// 当更新HW之后,尝试完成因ISR收缩⽽阻塞的操作。
if (leaderHWIncremented)
tryCompleteDelayedRequests()
}

对于在ISR集合中的副本,检查有没有需要从ISR中移除的,两种情况需要从ISR中移除:

  1. 卡主的Follower:如果副本的LEO经过maxLagMs毫秒还没有更新,则Follower卡主了,需要从ISR移除

  2. 慢Follower:如果副本从maxLagMs毫秒之前到现在还没有读到leader的LEO,则Follower落后,需要从ISR移除。


处理ISR变动事件⼴播,同时startup⽅法中周期性地调⽤maybePropagateIsrChanges()⽅法。

该函数周期性运⾏,检查ISR是否需要扩展。两种情况发⽣ISR的⼴播:

  1. 有尚未⼴播的ISR变动

  2. 最近5s没有发⽣ISR变动,或者上次ISR⼴播已经过去60s了。

该⽅法保证在ISR偶尔发⽣变动时,⼏秒之内即可将ISR变动⼴播出去。

避免了当发⽣⼤量ISR变更时压垮controller和其他broker。


处理⽇志⽬录异常的失败。


follower副本如何与leader同步消息

副本管理器类。


副本管理器类在实例化的时候创建ReplicaFetcherManager对象,该对象是负责从leader拉取消息与leader保持同步的线程管理器。


创建负责从leader拉取消息与leader保持同步的线程管理器。



副本拉取管理器中实现了createFetcherThread⽅法,该⽅法返回ReplicaFetcherThread对象。


ReplicaFetcherThread线程负责从Leader副本拉取消息进⾏同步。


AbstractFetcherManager中的addFetcherForPartitions⽅法中的嵌套⽅法addAndStartFetcherThread创建并启动拉取线程。⽽其中⽤到的createFetcherThread⽅法便是在AbstractFetcherManager的实现类ReplicaFetcherManager中实现的。


抽象类AbstractFetcherThread从同⼀个远程broker上为当前broker上的多个分区follower副本拉取消息。即,在远程同⼀个broker上有多个leader副本的follower副本在当前broker上。


ReplicaFetcherThread的start⽅法实际上就是AbstractFetcherThread中的start⽅法。在AbstractFetcherThread中没有start⽅法,在其⽗类ShutdownableThread也没有start⽅法:


但是ShutdownableThread继承⾃Thread,Thread中有start⽅法,并且start⽅法要调⽤run⽅法,在ShutdownableThread中有run⽅法。


该run⽅法重复调⽤doWork⽅法进⾏数据的拉取。doWork⽅法是抽象⽅法,没有实现。其实现在ShutdownableThread的实现类AbstractFetcherThread中。


上图中的doWork⽅法会反复调⽤,上图中的⽅法创建拉取请求对象,然后调⽤processFetchRequest⽅法进⾏请求的发送和结果的处理。



fetch⽅法的实现在AbstractFetcherThread的⼦类ReplicaFetcherThread中。


sendRequest⽅法在ReplicaFetcherBlockingSend中。


通过NetworkClientUtils发送请求,并等待请求的响应。


KafkaApis对Fetch的处理。



该⽅法中,Leader从本地⽇志读取数据,返回。


总结

当KafkaServer启动的时候,会实例化副本管理器。


副本管理器实例化的时候会实例化副本拉取器管理器。


副本管理器中有实现createFetcherThread⽅法,创建副本拉取器对象。


拉取线程启动起来之后不断地从leader副本所在的broker拉取消息,以便Follower与leader保持消息的同步。