/**
 * Applies a changed set of topic-level configuration overrides.
 *
 * Two steps are performed, in this order:
 *  1. If the log manager currently holds any partition logs for `topic`, a new `LogConfig` is built
 *     from the log manager's default config overlaid with the entries of `topicConfig` (except the
 *     names returned by `excludedConfigs`) and assigned to each of those logs.
 *  2. The leader and follower replication throttle lists for `topic` are either set from the
 *     corresponding property in `topicConfig` or removed when that property is absent or empty.
 *
 * @param topic       name of the topic whose configuration changed
 * @param topicConfig the topic-level overrides to apply
 */
def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
  // Validate the configurations: find the override entries that must not be applied.
  val configNamesToExclude = excludedConfigs(topic, topicConfig)

  // Collect the logs of all partitions of this topic; toBuffer materialises the filtered view.
  val logs = logManager.logsByTopicPartition.filterKeys(_.topic == topic).values.toBuffer

  if (logs.nonEmpty) {
    // Combine the default properties with the topic overrides to create the new LogConfig.
    val props = new Properties()
    props ++= logManager.defaultConfig.originals.asScala

    // Overlay every override that is not excluded; this replaces a default with the same key.
    topicConfig.asScala.foreach { case (key, value) =>
      if (!configNamesToExclude.contains(key)) props.put(key, value)
    }

    val logConfig = LogConfig(props)

    // Only warn when this change actually touched one of the two settings being compared.
    val retentionOrTimestampDiffChanged =
      topicConfig.containsKey(LogConfig.RetentionMsProp) ||
        topicConfig.containsKey(LogConfig.MessageTimestampDifferenceMaxMsProp)

    if (retentionOrTimestampDiffChanged && logConfig.retentionMs < logConfig.messageTimestampDifferenceMaxMs)
      warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}. It is smaller than " +
        s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${logConfig.messageTimestampDifferenceMaxMs}. " +
        s"This may result in frequent log rolling.")

    // Point every partition log of this topic at the new configuration.
    logs.foreach(_.config = logConfig)
  }

  /**
   * Marks the partitions listed under `prop` as throttled in `quotaManager`, or removes the
   * topic's throttle when `prop` is missing or has no usable value.
   */
  def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager): Unit = {
    // Properties.getProperty returns null when the key maps to a non-String value, so wrap it in
    // Option instead of calling .length directly; such a value is treated like an empty one.
    val hasThrottleValue =
      topicConfig.containsKey(prop) && Option(topicConfig.getProperty(prop)).exists(_.nonEmpty)

    if (hasThrottleValue) {
      val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop)
      quotaManager.markThrottled(topic, partitions)
      logger.debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
    } else {
      quotaManager.removeThrottle(topic)
      logger.debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
    }
  }

  updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)
  updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp, quotas.follower)
}