Kafka源码之ReplicaManager
副本管理器的启动和ISR的收缩和扩展
在启动KafkaServer的时候,运⾏KafkaServer的startup⽅法。在该⽅法中实例化ReplicaManager,并调⽤ReplicaManager的startup⽅法:
ReplicaManager的startup⽅法:
处理ISR收缩的情况:
1 | def maybeShrinkIsr(replicaMaxLagTimeMs: Long) { |
对于在ISR集合中的副本,检查有没有需要从ISR中移除的,两种情况需要从ISR中移除:
-
卡主的Follower:如果副本的LEO经过maxLagMs毫秒还没有更新,则Follower卡主了,需要从ISR移除
-
慢Follower:如果副本从maxLagMs毫秒之前到现在还没有读到leader的LEO,则Follower落后,需要从ISR移除。
处理ISR变动事件⼴播,同时startup⽅法中周期性地调⽤maybePropagateIsrChanges()⽅法。
该函数周期性运⾏,检查ISR是否需要扩展。两种情况发⽣ISR的⼴播:
-
有尚未⼴播的ISR变动
-
最近5s没有发⽣ISR变动,或者上次ISR⼴播已经过去60s了。
该⽅法保证在ISR偶尔发⽣变动时,⼏秒之内即可将ISR变动⼴播出去。
避免了当发⽣⼤量ISR变更时压垮controller和其他broker。
处理⽇志⽬录异常的失败。
follower副本如何与leader同步消息
副本管理器类。
副本管理器类在实例化的时候创建ReplicaFetcherManager对象,该对象是负责从leader拉取消息与leader保持同步的线程管理器。
创建负责从leader拉取消息与leader保持同步的线程管理器。
副本拉取管理器中实现了createFetcherThread⽅法,该⽅法返回ReplicaFetcherThread对象。
ReplicaFetcherThread线程负责从Leader副本拉取消息进⾏同步。
AbstractFetcherManager中的addFetcherForPartitions⽅法中的嵌套⽅法addAndStartFetcherThread创建并启动拉取线程。⽽其中⽤到的createFetcherThread⽅法便是在AbstractFetcherManager的实现类ReplicaFetcherManager中实现的。
抽象类AbstractFetcherThread从同⼀个远程broker上为当前broker上的多个分区follower副本拉取消息。即,在远程同⼀个broker上有多个leader副本的follower副本在当前broker上。
ReplicaFetcherThread的start⽅法实际上就是AbstractFetcherThread中的start⽅法。在AbstractFetcherThread中没有start⽅法,在其⽗类ShutdownableThread也没有start⽅法:
但是ShutdownableThread继承⾃Thread,Thread中有start⽅法,并且start⽅法要调⽤run⽅法,在ShutdownableThread中有run⽅法。
该run⽅法重复调⽤doWork⽅法进⾏数据的拉取。doWork⽅法是抽象⽅法,没有实现。其实现在ShutdownableThread的实现类AbstractFetcherThread中。
上图中的doWork⽅法会反复调⽤,上图中的⽅法创建拉取请求对象,然后调⽤processFetchRequest⽅法进⾏请求的发送和结果的处理。
fetch⽅法的实现在AbstractFetcherThread的⼦类ReplicaFetcherThread中。
sendRequest⽅法在ReplicaFetcherBlockingSend中。
通过NetworkClientUtils发送请求,并等待请求的响应。
KafkaApis对Fetch的处理。
该⽅法中,Leader从本地⽇志读取数据,返回。
总结
当KafkaServer启动的时候,会实例化副本管理器。
副本管理器实例化的时候会实例化副本拉取器管理器。
副本管理器中有实现createFetcherThread⽅法,创建副本拉取器对象。
拉取线程启动起来之后不断地从leader副本所在的broker拉取消息,以便Follower与leader保持消息的同步。