当前broker被选为新的controller的时候,执⾏如下操作:

  1. 注册controller epoch事件监听

  2. controller epoch +1

  3. 初始化controller上下⽂对象,该上下⽂对象缓存当前所有主题、活跃broker以及所有分区leader的信息

  4. 启动controller channel manager

  5. 启动副本状态机

  6. 启动分区状态机

如果注册为controller的过程中发⽣了异常,重新注册当前broker为controller,如此则触发新⼀轮controller选举,以保证永远有⼀个活跃的controller。

启动Kafka服务器的脚本:


main⽅法中创建KafkaServerStartable对象。


该类中包含KakfaServer对象,startup⽅法调⽤的是KafkaServer的startup⽅法


KafkaServer中的startup⽅法调⽤了kafkaController的startup⽅法。


KafkaController的startup⽅法中,将Startup样例类设置到eventManager中,然后调⽤eventManager的start⽅法:


上图中的eventManager.put(Startup)⽅法实现。


上图中的⽅法将Startup样例类放到queue中。

queue的实现:


Startup样例类,其中的process⽅法执⾏controller的选举:


上图中1的代码表示当session超时的时候的处理逻辑,也就是controller到zk连接超时重连,触发该逻辑:


⽅法的实现:


当Controller到zk的连接过期重连的时候,调⽤⽅法:


样例类:Reelect



上图中2的代码,表示当controller发⽣变化的时候的处理逻辑:


⽅法的实现:


当controller发⽣变化的时候的处理逻辑(subscribeDataChanges):



上图中3处的代码表示执⾏controller的选举:


KafkaController的startup⽅法中,调⽤eventManager的start⽅法。



thread是ControllerEventThread对象:


ShutdownableThread的实现:


其中的run⽅法:


只要系统正常运⾏,就会不断调⽤doWork⽅法:


样例类ControllerChange中:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
* required to clean up internal controller data structures
*/
def onControllerResignation() {
debug("Resigning")
// 取消注册ISR变化通知监听器
deregisterIsrChangeNotificationListener()
// 取消注册分区重新分配监听器
deregisterPartitionReassignmentListener()
// 取消注册带偏向的副本leader选举监听器
deregisterPreferredReplicaElectionListener()
// 取消注册log.dirs事件通知监听器
deregisterLogDirEventNotificationListener()
// 重置主题删除管理器
topicDeletionManager.reset()
// 关闭Kafka的leader再平衡调度器
kafkaScheduler.shutdown()
offlinePartitionCount = 0
preferredReplicaImbalanceCount = 0
globalTopicCount = 0
globalPartitionCount = 0
// 取消注册分区再平衡ISR变化监听器
deregisterPartitionReassignmentIsrChangeListeners()
// 关闭分区状态机
partitionStateMachine.shutdown()
// 取消注册主题变化监听器
deregisterTopicChangeListener()
// 取消注册⼀堆分区修改监听器
partitionModificationsListeners.keys.foreach(deregisterPartitionModificationsListener)
// 取消注册主题删除监听器
deregisterTopicDeletionListener()
// 关闭副本状态机
replicaStateMachine.shutdown()
// 取消注册broker变化监听器
deregisterBrokerChangeListener()
// 重置controller上下⽂
resetControllerContext()
// ⽇志:controller辞职不⼲了
info("Resigned")
}