def startup() {
  try {
    info("starting")

    // Refuse to start while a previous shutdown is still in progress
    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    // Nothing to do if startup has already completed
    if (startupComplete.get)
      return

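    // Begin startup and set the "starting up" flag; only the caller that wins the compare-and-set proceeds
    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      // Mark the broker as starting
      brokerState.newState(Starting)
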
      /* start scheduler */
      kafkaScheduler.startup()

      /* setup zookeeper */
      // Initialize the ZooKeeper connection
      zkUtils = initZk()

      /* Get or create cluster_id */
      // The cluster id is stored in ZooKeeper and generated there if it does not exist yet
      _clusterId = getOrGenerateClusterId(zkUtils)
      info(s"Cluster ID = $clusterId")

      /* generate brokerId */
      // Resolve the broker id from the configuration, along with any offline log directories
      val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
      config.brokerId = brokerId
      // Logging context: prefixes log messages with the broker id
      logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
      this.logIdent = logContext.logPrefix

      /* create and configure metrics */
      // Instantiate the MetricsReporter implementations named in the configuration
      val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
        Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
      // A JMX reporter is always added by default
      reporters.add(new JmxReporter(jmxPrefix))
      val metricConfig = KafkaServer.metricConfig(config)
      // Create the Metrics object
      metrics = new Metrics(metricConfig, reporters, time, true)

      /* register broker metrics */
      _brokerTopicStats = new BrokerTopicStats
      // Initialize the quota managers, which can cap the produce or consume rate of each producer or consumer
      quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
      // Notify the cluster listeners
      notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

      /* start log manager */
      // Create the log manager. On creation it checks the log directories for a .kafka_cleanshutdown file;
      // if the file is missing, the broker enters the RecoveringFromUncleanShutdown state
      logManager = LogManager(config, initialOfflineDirs, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
      logManager.startup()

      // Create the metadata cache
      metadataCache = new MetadataCache(config.brokerId)
      // Create the credential provider
      credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)

      // Create and start the socket server acceptor threads so that the bound port is known.
      // Delay starting processors until the end of the initialization sequence to ensure
      // that credentials have been loaded before processing authentications.
      // Note: because startupProcessors = false, requests are not processed until
      // socketServer.startProcessors() is called at the end of startup.
      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      socketServer.startup(startupProcessors = false)

      /* start replica manager */
      // Create the replica manager and start it
      replicaManager = createReplicaManager(isShuttingDown)
      replicaManager.startup()

      /* start kafka controller */
      // Create the Kafka controller and start it. Once started, the broker tries to create
      // a node in ZooKeeper to compete for the controller role
      kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
      kafkaController.startup()

      // Create the admin manager (cluster administration component)
      adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)

      /* start group coordinator */
      // Create the group coordinator and start it
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
      groupCoordinator.startup()

      /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      transactionCoordinator = TransactionCoordinator(config, replicaManager,
        new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
        zkUtils, metrics, metadataCache, Time.SYSTEM)
      transactionCoordinator.startup()

      /* Get the authorizer and initialize it if one is specified.*/
      authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
        val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
        authZ.configure(config.originals())
        authZ
      }

      /* start processing requests */
      // Build the KafkaApis component, which dispatches each request type to its handler
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
        kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
        brokerTopicStats, clusterId, time)

      // Request handler thread pool
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
        config.numIoThreads)

      /* start dynamic config manager */
      // One handler per type of dynamic configuration
      dynamicConfigHandlers = Map[String, ConfigHandler](
        ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
        ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
        ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
        ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

      // Create the config manager. start listening to notifications
      dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      /* tell everyone we are alive */
      // Replace any advertised port of 0 with the port the socket server actually bound to
      val listeners = config.advertisedListeners.map { endpoint =>
        if (endpoint.port == 0)
          endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
        else
          endpoint
      }
      // Kafka health check component
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
        config.interBrokerProtocolVersion)
      kafkaHealthcheck.startup()

      // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
      checkpointBrokerId(config.brokerId)

      // Start the request processors and update the broker state
      socketServer.startProcessors()
      brokerState.newState(RunningAsBroker)
      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
      info("started")
    }
  } catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}
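
The three flags that guard `startup()` (`isShuttingDown`, `isStartingUp`, `startupComplete`) can be hard to follow amid the component wiring. Below is a minimal sketch of that guard pattern in isolation. It is not Kafka code: `LifecycleSketch`, `initCount`, `isStarted`, and the `init` parameter are hypothetical names introduced for illustration, and the `shutdown()` call that the real catch block makes is left out.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for KafkaServer: only the lifecycle flags are modeled.
class LifecycleSketch {
  private val isShuttingDown = new AtomicBoolean(false)
  private val isStartingUp = new AtomicBoolean(false)
  private val startupComplete = new AtomicBoolean(false)
  @volatile private var shutdownLatch: CountDownLatch = null

  // Counts how many times the init body actually ran (illustration only).
  val initCount = new AtomicInteger(0)

  def isStarted: Boolean = startupComplete.get

  // `init` stands in for the long component-wiring sequence in startup().
  def startup(init: () => Unit): Unit = {
    try {
      if (isShuttingDown.get)
        throw new IllegalStateException("still shutting down, cannot re-start")

      // Skip if already started; otherwise only the caller that wins the CAS proceeds.
      if (!startupComplete.get && isStartingUp.compareAndSet(false, true)) {
        init()
        initCount.incrementAndGet()
        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
      }
    } catch {
      case e: Throwable =>
        // Reset the flag so a later attempt is not blocked, then rethrow.
        isStartingUp.set(false)
        throw e
    }
  }
}
```

Calling `startup` a second time on a started instance is a no-op, and a failure inside `init` leaves `isStarted` false while clearing `isStartingUp`, so a retry can win the compare-and-set again.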