Initializing and starting the health check:

The startup method of KafkaServer instantiates and starts the health check:


Execution logic of the health check's startup method:


How the state listener is registered:


Implementation of subscribeStateChanges(listener): it calls a method on the ZooKeeper client that adds the listener object to the _stateListener Set.
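The effect of keeping listeners in a Set can be shown with a minimal sketch. This is not ZkClient's code: `SketchStateListener`, `SketchClient`, `CountingListener`, `listenerCount` and `fireNewSession` are hypothetical names introduced for illustration, and only the idea of storing listeners in a Set comes from the text above.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a state listener interface.
trait SketchStateListener {
  def handleNewSession(): Unit
}

// Hypothetical stand-in for the client; only the Set-based bookkeeping mirrors the text.
class SketchClient {
  private val stateListeners = mutable.LinkedHashSet.empty[SketchStateListener]

  // Adding the same listener object twice leaves a single entry in the Set.
  def subscribeStateChanges(listener: SketchStateListener): Unit =
    stateListeners.synchronized { stateListeners += listener }

  def unsubscribeStateChanges(listener: SketchStateListener): Unit =
    stateListeners.synchronized { stateListeners -= listener }

  def listenerCount: Int = stateListeners.synchronized { stateListeners.size }

  // Simulates the client detecting a new session and notifying each listener once.
  def fireNewSession(): Unit = {
    val snapshot = stateListeners.synchronized { stateListeners.toList }
    snapshot.foreach(_.handleNewSession())
  }
}

// Test helper that counts how many new-session callbacks it received.
class CountingListener extends SketchStateListener {
  var newSessions = 0
  def handleNewSession(): Unit = newSessions += 1
}
```

Because the collection is a Set, subscribing the same listener repeatedly does not cause duplicate callbacks, which matters when the callback performs a registration.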


ZooKeeper client callback: a new-session event triggers the listener.


If a reconnect to ZooKeeper occurs, the current broker must be registered in ZooKeeper again.


A session establishment error triggers the listener:


The connection to ZooKeeper cannot be established:


A state change triggers the listener method:


Whenever the state changes, the occurrence of the event is marked. This is used for monitoring.
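A rough sketch of "marking" a state change for monitoring follows. It assumes a plain per-state counter instead of a real metrics library; `SketchSessionState`, its case objects, and `StateChangeCounters` are hypothetical names, not Kafka or ZkClient types.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical session states, used for illustration only.
sealed trait SketchSessionState
case object SketchDisconnected extends SketchSessionState
case object SketchConnected extends SketchSessionState
case object SketchExpired extends SketchSessionState

// Hypothetical counter registry standing in for a metrics library.
class StateChangeCounters {
  private val counters = new ConcurrentHashMap[SketchSessionState, AtomicLong]()

  // Called on every state change: it only records that the event happened.
  def mark(state: SketchSessionState): Unit = {
    counters.putIfAbsent(state, new AtomicLong(0L))
    counters.get(state).incrementAndGet()
    ()
  }

  // Number of times the given state has been observed so far.
  def count(state: SketchSessionState): Long =
    Option(counters.get(state)).map(_.get()).getOrElse(0L)
}
```

The listener does no recovery work in this path; it only records the event so that an operator can see how often the session changes state.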


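Logic of the register method: it resolves the host name and port of each endpoint, then calls a zkUtil method to register the current broker's information in ZooKeeper.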
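The host-name resolution step can be sketched as below. It mirrors the null-or-empty check in the register method of the KafkaHealthcheck class listed at the end of this section; `AdvertisedHostSketch`, `resolve` and the by-name `fallback` parameter are hypothetical names added so the rule can be exercised without a network lookup.

```scala
import java.net.InetAddress

object AdvertisedHostSketch {
  // Returns the configured host when it is non-blank; otherwise evaluates the fallback.
  // The fallback is passed by name so it is only computed when it is actually needed.
  def resolve(configured: String, fallback: => String): String =
    if (configured == null || configured.trim.isEmpty) fallback else configured

  // Fallback used in the KafkaHealthcheck listing: the canonical local host name.
  def localHostName: String = InetAddress.getLocalHost.getCanonicalHostName
}
```

A caller would typically write `AdvertisedHostSketch.resolve(advertisedHost, AdvertisedHostSketch.localHostName)`, so the local lookup only runs when no host was configured.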


Logic of registerBrokerInZk:

/**
 * If Kafka's apiVersion is at least 0.10.0.X, register the broker using the json v4 format
 * (which includes multiple endpoints and the rack). Otherwise register using the json v2 format.
 *
 * The json v4 format includes a default endpoint for compatibility with old clients.
 *
 * @param id broker ID
 * @param host broker host name
 * @param port broker port
 * @param advertisedEndpoints endpoints on which the broker serves clients
 * @param jmxPort jmx port
 * @param rack rack the broker is located in
 * @param apiVersion Kafka version the broker is running as
 */
def registerBrokerInZk(id: Int,
                       host: String,
                       port: Int,
                       advertisedEndpoints: Seq[EndPoint],
                       jmxPort: Int,
                       rack: Option[String],
                       apiVersion: ApiVersion) {
  // /brokers/ids/<broker.id>
  val brokerIdPath = BrokerIdsPath + "/" + id
  // see method documentation for reason why we do this
  val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
  val json = Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack)
  // Register the broker information at the given path; the znode's value is the JSON string.
  // By default the znode is created under /brokers/ids.
  registerBrokerInZk(brokerIdPath, json)
  info("Registered broker %d at path %s with addresses: %s".format(id,
    brokerIdPath, advertisedEndpoints.mkString(",")))
}
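The format selection in the listing above can be isolated in a small sketch. `SketchApiVersion` is a hypothetical stand-in for Kafka's ApiVersion, and the integer ids below are illustrative values chosen only to give the versions an order; the JSON body itself is left out because its exact layout is produced by Broker.toJson.

```scala
// Hypothetical stand-in for Kafka's ApiVersion: versions are ordered by an integer id.
final case class SketchApiVersion(name: String, id: Int) extends Ordered[SketchApiVersion] {
  def compare(that: SketchApiVersion): Int = Integer.compare(id, that.id)
}

object RegistrationFormatSketch {
  // Illustrative ids, not Kafka's real values.
  val V0_9_0: SketchApiVersion = SketchApiVersion("0.9.0", 1)
  val V0_10_0_IV1: SketchApiVersion = SketchApiVersion("0.10.0-IV1", 2)
  val V0_10_1: SketchApiVersion = SketchApiVersion("0.10.1", 3)

  // Same rule as the listing: json v4 from 0.10.0-IV1 onward, json v2 before that.
  def jsonVersion(apiVersion: SketchApiVersion): Int =
    if (apiVersion >= V0_10_0_IV1) 4 else 2

  // Builds the znode path the same way the listing does.
  def brokerIdPath(brokerIdsPath: String, id: Int): String =
    brokerIdsPath + "/" + id
}
```

Tying the format to the configured apiVersion lets a cluster keep writing the older format until every broker has been upgraded.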

Implementation of registering the broker in ZooKeeper:


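The registration mainly creates the broker's information under the /brokers/ids/[0…N] path in ZooKeeper. The node is an ephemeral node, so when the broker goes offline its node disappears from ZooKeeper, and other brokers can promptly detect the failure.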
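The ephemeral-node behavior, and the reason a broker has to register again after its session is replaced, can be sketched with an in-memory model. `EphemeralStoreSketch` and its methods are hypothetical; the sketch assumes only that an ephemeral node is removed when the session that created it ends.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for ZooKeeper: each node remembers its creating session.
class EphemeralStoreSketch {
  private val owners = mutable.Map.empty[String, Long]   // path -> owning session id
  private val values = mutable.Map.empty[String, String] // path -> node value

  def createEphemeral(sessionId: Long, path: String, value: String): Unit = {
    owners(path) = sessionId
    values(path) = value
  }

  // When a session ends, every node it created is removed.
  def expireSession(sessionId: Long): Unit = {
    val gone = owners.filter(_._2 == sessionId).keys.toList
    gone.foreach { path =>
      owners -= path
      values -= path
    }
  }

  def exists(path: String): Boolean = values.contains(path)

  // Paths of all brokers currently considered alive, in sorted order.
  def liveBrokerPaths: List[String] = values.keys.toList.sorted
}
```

In this model, expiring a broker's session removes its node, and the node only reappears when the broker calls `createEphemeral` again under a new session id. That second call corresponds to what the session listener's new-session callback does by invoking register().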

import kafka.utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
import java.net.InetAddress

class KafkaHealthcheck(private val brokerId: Int,
                       private val advertisedHost: String,
                       private val advertisedPort: Int,
                       private val zkSessionTimeoutMs: Int,
                       private val zkClient: ZkClient) extends Logging {
  val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
  val sessionExpireListener = new SessionExpireListener

  def startup() {
    zkClient.subscribeStateChanges(sessionExpireListener)
    register()
  }

  def shutdown() {
    zkClient.unsubscribeStateChanges(sessionExpireListener)
    ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
  }

  def register() {
    // Fall back to the canonical local host name when no advertised host is configured.
    val advertisedHostName =
      if(advertisedHost == null || advertisedHost.trim.isEmpty)
        InetAddress.getLocalHost.getCanonicalHostName
      else
        advertisedHost
    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
    // Store the broker's basic information (port, IP address, timestamp, etc.) under /brokers/ids/.
    // All of it lives on an ephemeral node, so once the broker's ZooKeeper session ends
    // (for example after a lost connection times out) the node is removed.
    ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName,
      advertisedPort, zkSessionTimeoutMs, jmxPort)
  }

  // SessionExpireListener re-creates the broker's node, so that the node is not lost
  // for good after a brief loss of the connection to ZooKeeper.
  class SessionExpireListener() extends IZkStateListener {
    @throws(classOf[Exception])
    def handleStateChanged(state: KeeperState) {
      // do nothing, since zkclient will do reconnect for us.
    }

    def handleNewSession() {
      info("re-registering broker info in ZK for broker " + brokerId)
      register()
      info("done re-registering broker")
      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
    }
  }
}