class KafkaHealthcheck(private val brokerId: Int, private val advertisedHost: String, private val advertisedPort: Int, private val zkSessionTimeoutMs: Int, private val zkClient: ZkClient) extends Logging { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId val sessionExpireListener = new SessionExpireListener def startup() { zkClient.subscribeStateChanges(sessionExpireListener) register() } def shutdown() { zkClient.unsubscribeStateChanges(sessionExpireListener) ZkUtils.deregisterBrokerInZk(zkClient, brokerId) } def register() { val advertisedHostName = if(advertisedHost == null || advertisedHost.trim.isEmpty) InetAddress.getLocalHost.getCanonicalHostName else advertisedHost val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt //在/brokers/ids/路径下存储broker的基本消息,例如端⼝号,ip地址,时间戳等,以上内容均在Ephemeral Node上,只要该broker和zk失去链接,则zk对应⽬录的内容被清空 ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort) }
//该SessionExpireListener的作⽤就是重建broker的节点,防⽌短暂的和zk失去链接之后,该broker对应的节点也全部丢失了 class SessionExpireListener() extends IZkStateListener { @throws(classOf[Exception]) def handleStateChanged(state: KeeperState) { // do nothing, since zkclient will do reconnect for us. } def handleNewSession() { info("re-registering broker info in ZK for broker " + brokerId) register() info("done re-registering broker") info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) } } }