⼯作流程如下:

配置存储于/config/entityType/entityName,如/config/topics/<topic_name>以及/config/clients/,默认配置存储与各⾃的节点中,上述节点中保存的是覆盖默认配置的数据,以properties的格式。

可以使⽤分级路径同时指定多个实体的名称,如:/config/users//clients/

设置通知路径/config/changes,避免对所有主题进⾏监控,有事通知。DynamicConfigManager监控该路径。

更新配置的第⼀步是更新配置的properties。

之后,在/config/changes/下创建⼀个新的序列znode,类似于/config/changes/config_change_12231,该节点保存了实体类型和实体名称。

序列znode包含的数据形式:{“version” : 1, “entity_type”:“topic/client”, “entity_name” : “topic_name/client_id”}

这只是⼀个通知,真正的配置数据存储于/config/entityType/entityName节点

版本2的通知格式:{“version” : 2, “entity_path”:“entity_type/entity_name”}

可以使⽤分级路径指定多个实体:如,users//clients/

该类对所有的broker设置监视器。监视器⼯作流程如下:

  1. 监视器读取所有的配置更改通知。

  2. 监视器跟踪它应⽤过的后缀数字最⾼的配置更新。

  3. 监视器先前处理过的通知,15min之后监视器将其删除。

  4. 对于新的更改,监视器读取新的配置,将新的配置和默认配置整合,然后更新现有的配置。

配置永远从zk配置路径读取,通知仅⽤于触发该动作。

如果⼀个broker宕机,错过了⼀个更新,没问题——当broker重启的时候,加载所有的配置。

注意:如果有两个连续的配置更新,可能只有最后⼀个会处理(因为在broker读取配置信息的时候,可能两个更新都处理过了)。

此时,broker不需要进⾏两次配置更新,虽然⼈畜⽆害。

DynamicConfigManager重启的时候,重新处理所有的通知。可能有点⼉浪费资源,但是它避免了丢失配置更新。

但要避免在启动时出现任何竞争情况, 因为这些情况可能会丢失初始配置加载与注册更改通知之间的更改。

KafkaServer启动的时候,在startup⽅法中,配置动态配置管理器,并启动动态配置管理器:


DynamicConfigManager的startup⽅法的逻辑:在动态配置管理器启动的时候,⾸先执⾏⼀遍配置更新。


configChangeListener.init()⽅法的具体实现:


上图中68⾏订阅⼦节点个数变化监听器,具体实现:


上图中标红框的是订阅⼦节点个数变化监听器,只要⼦节点个数发⽣变化,就回调listener。

listener是哪个?NodeChangeListener


NodeChangeListener的具体实现:


处理通知的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 处理给定的通知列表中的所有通知
*/
private def processNotifications(notifications: Seq[String]) {
// 如果通知⾮空
if (notifications.nonEmpty) {
info(s"Processing notification(s) to $seqNodeRoot")
try {
val now = time.milliseconds
// 遍历通知集合
for (notification <- notifications) {
// 获取通知的编号
val changeId = changeNumber(notification)
// 对⽐最后执⾏的修改通知编号,如果当前通知编号⼤于上次执⾏的,就执⾏配置更新
if (changeId > lastExecutedChange) {
// /config/changes/config_change_12121
val changeZnode = seqNodeRoot + "/" + notification
// 读取该通知节点的内容
val data = zkUtils.readDataMaybeNull(changeZnode)._1.orNull
if (data != null) {
// 如果有需要更改的数据,则执⾏配置的更新
notificationHandler.processNotification(data)
} else {
logger.warn(s"read null data from $changeZnode when processing
notification $notification")
}
// 修改上次已执⾏编号为当前节点编号
lastExecutedChange = changeId
}
}
// 移除过期的通知
purgeObsoleteNotifications(now, notifications)
} catch {
case e: ZkInterruptedException =>
if (!isClosed.get)
throw e
}
}
}

上⾯代码中第22⾏的实现:

⾸先,notificationHandler是哪个?


该类在哪⾥实例化?


即notificationHandler就是ConfigChangedNotificationHandler类。

1
notificationHandler.processNotification(data)

上⾯代码的具体实现:



如果版本1,则:


如果版本2,则:


具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def processConfigChanges(topic: String, topicConfig: Properties) {
// Validate the configurations.
// 找出需要排除的配置条⽬
val configNamesToExclude = excludedConfigs(topic, topicConfig)
// 过滤出当前指定主题的所有分区⽇志
val logs = logManager.logsByTopicPartition.filterKeys(_.topic == topic).values.toBuffer
// 如果⽇志⾮空
if (logs.nonEmpty) {
// 整合默认配置和zk中覆盖默认的配置,创建新的Log配置信息
val props = new Properties()
// 添加默认配置
props ++= logManager.defaultConfig.originals.asScala
// 遍历覆盖默认配置的条⽬,如果该条⽬不在要排除的集合中,则直接put到props中
// 该操作会覆盖默认相同key的配置
topicConfig.asScala.foreach { case (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
// 实例化新的logConfig
val logConfig = LogConfig(props)
if ((topicConfig.containsKey(LogConfig.RetentionMsProp)
|| topicConfig.containsKey(LogConfig.MessageTimestampDifferenceMaxMsProp))
&& logConfig.retentionMs < logConfig.messageTimestampDifferenceMaxMs)
warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}. It is smaller than " +
s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value
${logConfig.messageTimestampDifferenceMaxMs}. " +
s"This may result in frequent log rolling.")
// 更新当前主题所有分区⽇志的配置信息
logs.foreach(_.config = logConfig)
}
def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager) = {
if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0) {
val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop)
quotaManager.markThrottled(topic, partitions)
logger.debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
} else {
quotaManager.removeThrottle(topic)
logger.debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
}
}
updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)
updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp, quotas.follower)
}

删除过期配置更新通知节点。通过时间对⽐,过期时间为:15min。



获取指定实体类型中各个实体的配置信息:



getEntityConfigRootPath(entityType)的具体实现:


其中,主题配置管理器TopicConfigHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def processConfigChanges(topic: String, topicConfig: Properties) {
// Validate the configurations.
// 找出需要排除的配置条⽬
val configNamesToExclude = excludedConfigs(topic, topicConfig)
// 过滤出当前指定主题的所有分区⽇志
val logs = logManager.logsByTopicPartition.filterKeys(_.topic == topic).values.toBuffer
// 如果⽇志⾮空
if (logs.nonEmpty) {
// 整合默认配置和zk中覆盖默认的配置,创建新的Log配置信息
val props = new Properties()
// 添加默认配置
props ++= logManager.defaultConfig.originals.asScala
// 遍历覆盖默认配置的条⽬,如果该条⽬不在要排除的集合中,则直接put到props中
// 该操作会覆盖默认相同key的配置
topicConfig.asScala.foreach { case (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
// 实例化新的logConfig
val logConfig = LogConfig(props)
if ((topicConfig.containsKey(LogConfig.RetentionMsProp)
|| topicConfig.containsKey(LogConfig.MessageTimestampDifferenceMaxMsProp))
&& logConfig.retentionMs < logConfig.messageTimestampDifferenceMaxMs)
warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to
${logConfig.retentionMs}. It is smaller than " +
s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value
${logConfig.messageTimestampDifferenceMaxMs}. " +
s"This may result in frequent log rolling.")
// 更新当前主题所有分区⽇志的配置信息
logs.foreach(_.config = logConfig)
}

def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager) = {
if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0) {
val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop)
quotaManager.markThrottled(topic, partitions)
logger.debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
} else {
quotaManager.removeThrottle(topic)
logger.debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic
$topic")
}
}
updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)
updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp, quotas.follower)
}