KafkaRequestHandlerPool的作⽤是创建numThreads个KafkaRequestHandler实例,使⽤numThreads个线程启动KafkaRequestHandler。

每个KafkaRequestHandler包含了id,brokerId,线程数,请求的channel,处理请求的api等信息。

只要该类进⾏实例化,就执⾏创建KafkaRequestHandler实例并启动的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @param brokerId
* @param requestChannel
* @param apis 处理具体请求和响应的api
* @param time
* @param numThreads 运⾏KafkaRequestHandler的线程数
*/
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
// 创建包含numThreads个元素的数组
val runnables = new Array[KafkaRequestHandler](numThreads)
// 循环numThreads次,初始化KafkaRequestHandler实例numThreads个
for(i <- 0 until numThreads) {
// 赋值:每个KafkaRequestHandler中包含了KafkaRequestHandler的id,brokerId,线程数,请求的channel,处理请求的api等。
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
// 启动这些KafkaRequestHandler线程⽤于请求的处理
KafkaThread.daemon("kafka-request-handler-" + i, runnables(i)).start()
}
}

KafkaThread的start⽅法即是调⽤Thread的start⽅法,⽽start执⾏run⽅法,即此处执⾏的是KafkaThread的run⽅法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def run() {
while(true) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = time.nanoseconds
// 获取请求
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
req match {
case RequestChannel.ShutdownRequest =>
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
latch.countDown()
return
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
// 对于其他请求,直接交给apis来负责处理。
apis.handle(request)
} catch {
case e: FatalExitError => latch.countDown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
} finally {
request.releaseBuffer()
}
case null => // continue
}
}
}

该类包含了关闭KafkaRequestHandler的⽅法。


具体的⽅法:


⾸先发送停⽌的请求,等待⽤户请求处理的结束 latch.await() 。

优雅停机。


将请求直接放到requestQueue中。

其中处理ShutdownRequest的处理逻辑: