当前broker被选为新的controller的时候,执⾏如下操作:
-
注册controller epoch事件监听
-
controller epoch +1
-
初始化controller上下⽂对象,该上下⽂对象缓存当前所有主题、活跃broker以及所有分区leader的信息
-
启动controller channel manager
-
启动副本状态机
-
启动分区状态机
如果注册为controller的过程中发⽣了异常,重新注册当前broker为controller,如此则触发新⼀轮controller选举,以保证永远有⼀个活跃的controller。
启动Kafka服务器的脚本:
main⽅法中创建KafkaServerStartable对象。
该类中包含KakfaServer对象,startup⽅法调⽤的是KafkaServer的startup⽅法
KafkaServer中的startup⽅法调⽤了kafkaController的startup⽅法。
KafkaController的startup⽅法中,将Startup样例类设置到eventManager中,然后调⽤eventManager的start⽅法:
上图中的eventManager.put(Startup)⽅法实现。
上图中的⽅法将Startup样例类放到queue中。
queue的实现:
Startup样例类,其中的process⽅法执⾏controller的选举:
上图中1的代码表示当session超时的时候的处理逻辑,也就是controller到zk连接超时重连,触发该逻辑:
⽅法的实现:
当Controller到zk的连接过期重连的时候,调⽤⽅法:
样例类:Reelect
上图中2的代码,表示当controller发⽣变化的时候的处理逻辑:
⽅法的实现:
当controller发⽣变化的时候的处理逻辑(subscribeDataChanges):
上图中3处的代码表示执⾏controller的选举:
KafkaController的startup⽅法中,调⽤eventManager的start⽅法。
thread是ControllerEventThread对象:
ShutdownableThread的实现:
其中的run⽅法:
只要系统正常运⾏,就会不断调⽤doWork⽅法:
样例类ControllerChange中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| /** * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is * required to clean up internal controller data structures */ def onControllerResignation() { debug("Resigning") // 取消注册ISR变化通知监听器 deregisterIsrChangeNotificationListener() // 取消注册分区重新分配监听器 deregisterPartitionReassignmentListener() // 取消注册带偏向的副本leader选举监听器 deregisterPreferredReplicaElectionListener() // 取消注册log.dirs事件通知监听器 deregisterLogDirEventNotificationListener() // 重置主题删除管理器 topicDeletionManager.reset() // 关闭Kafka的leader再平衡调度器 kafkaScheduler.shutdown() offlinePartitionCount = 0 preferredReplicaImbalanceCount = 0 globalTopicCount = 0 globalPartitionCount = 0 // 取消注册分区再平衡ISR变化监听器 deregisterPartitionReassignmentIsrChangeListeners() // 关闭分区状态机 partitionStateMachine.shutdown() // 取消注册主题变化监听器 deregisterTopicChangeListener() // 取消注册⼀堆分区修改监听器 partitionModificationsListeners.keys.foreach(deregisterPartitionModificationsListener) // 取消注册主题删除监听器 deregisterTopicDeletionListener() // 关闭副本状态机 replicaStateMachine.shutdown() // 取消注册broker变化监听器 deregisterBrokerChangeListener() // 重置controller上下⽂ resetControllerContext() // ⽇志:controller辞职不⼲了 info("Resigned") }
|