启动kafka

kafka-server-start.sh内容如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Launches a Kafka broker by delegating to kafka-run-class.sh with the kafka.Kafka main class.
# Usage: kafka-server-start.sh [-daemon] server.properties [--override property=value]*

# At least one argument is required.
if [ $# -lt 1 ]; then
  echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
  exit 1
fi

# Directory containing this script; "$0" is quoted so a path with spaces is not word-split.
base_dir=$(dirname "$0")

# Default the log4j configuration only when the caller has not set KAFKA_LOG4J_OPTS.
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

# Default the JVM heap options only when the caller has not set KAFKA_HEAP_OPTS.
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

# Keep a caller-supplied EXTRA_ARGS (even an empty one); fall back to the defaults only when unset.
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

# A leading -daemon is consumed here and forwarded to kafka-run-class.sh through EXTRA_ARGS.
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

# The script path is quoted; EXTRA_ARGS is intentionally left unquoted so that it splits into
# separate options for kafka-run-class.sh.
exec "$base_dir/kafka-run-class.sh" $EXTRA_ARGS kafka.Kafka "$@"

查看 kafka.Kafka 源码


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Entry point: builds a KafkaServerStartable from the command-line arguments, starts it and
 * blocks until it shuts down. Exits with status 1 if anything is thrown, otherwise with status 0.
 *
 * @param args command-line arguments, handed to getPropsFromArgs
 */
def main(args: Array[String]): Unit = {
  try {
    // Turn the command-line arguments into the server properties.
    val props = getPropsFromArgs(args)
    // Wrap a KafkaServer in a startable built from those properties.
    val startable = KafkaServerStartable.fromProps(props)

    // register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c)
    registerLoggingSignalHandler()

    // attach shutdown handler to catch terminating signals as well as normal termination
    val shutdownHook = new Thread("kafka-shutdown-hook") {
      override def run(): Unit = startable.shutdown()
    }
    Runtime.getRuntime.addShutdownHook(shutdownHook)

    // Start the server, then block until it has shut down.
    startable.startup()
    startable.awaitShutdown()
  } catch {
    case e: Throwable =>
      fatal(e)
      Exit.exit(1)
  }
  Exit.exit(0)
}

上面的 kafkaServerStartable 封装了 KafkaServer,最终执行 startup 的是 KafkaServer,如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Wrapper that owns a KafkaServer and converts startup/shutdown failures into process exit.
 *
 * @param serverConfig configuration handed to the wrapped KafkaServer
 * @param reporters    metrics reporters passed to the wrapped KafkaServer as kafkaMetricsReporters
 */
class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
  private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)

  /** Creates a startable with no metrics reporters. */
  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)

  /** Starts the wrapped broker; on any failure logs a fatal message and exits with status 1. */
  def startup(): Unit = {
    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }

  /** Shuts the wrapped broker down; on any failure logs a fatal message and halts with status 1. */
  def shutdown(): Unit = {
    try server.shutdown()
    catch {
      case _: Throwable =>
        fatal("Halting Kafka.")
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Exit.halt(1)
    }
  }

  /**
   * Allow setting broker state from the startable.
   * This is needed when a custom kafka server startable want to emit new states that it introduces.
   *
   * @param newState the state value forwarded to the wrapped server's brokerState
   */
  def setServerState(newState: Byte): Unit = {
    server.brokerState.newState(newState)
  }

  /** Delegates to the wrapped server's awaitShutdown(). */
  def awaitShutdown(): Unit = server.awaitShutdown()

}

下面来看一下 KafkaServer 的 startup 方法,启动了很多东西,后面都会用到,代码中也加入了注释:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/**
 * Starts this Kafka broker by bringing up its components in a fixed order.
 *
 * Throws IllegalStateException if the server is still shutting down, and returns immediately if
 * startup has already completed. Any Throwable raised while starting is logged as fatal, triggers
 * shutdown(), and is rethrown to the caller.
 */
def startup() {
try {
info("starting")
// Refuse to start while a shutdown is still in progress
if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

// Nothing to do if startup has already completed
if (startupComplete.get)
return

// Atomically flip isStartingUp from false to true; the body below runs only if that succeeded
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
// Mark the broker as Starting
brokerState.newState(Starting)

/* start scheduler */
// Start the scheduler
kafkaScheduler.startup()

/* setup zookeeper */
// Initialize zkUtils via initZk()
zkUtils = initZk()

/* Get or create cluster_id */
// Get or generate the cluster id, using zkUtils
_clusterId = getOrGenerateClusterId(zkUtils)
info(s"Cluster ID = $clusterId")

/* generate brokerId */
// Obtain the broker id together with the initially offline log dirs, and store the id in the config
val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
config.brokerId = brokerId
// Build the log context so that the log prefix carries this server's broker id
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
this.logIdent = logContext.logPrefix

/* create and configure metrics */
// Instantiate the MetricsReporter classes configured under KafkaConfig.MetricReporterClassesProp,
// passing the broker id as an extra config entry
val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter], Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
// A JmxReporter is always added on top of the configured reporters
reporters.add(new JmxReporter(jmxPrefix))
val metricConfig = KafkaServer.metricConfig(config)
// Create the Metrics object
metrics = new Metrics(metricConfig, reporters, time, true)

/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
// Instantiate the quota managers
// NOTE(review): presumably used to cap per-client produce/consume rates -- confirm in QuotaFactory
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
// Hand both the Kafka metrics reporters and the reporters created above to notifyClusterListeners
notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

// Create and start the log manager
// NOTE(review): the original note says creation checks the log dirs for a .kafka_cleanshutdown file
// and, if it is missing, moves the broker to the RecoveringFromUncleanShutdown state -- not visible
// here, confirm in LogManager
/* start log manager */
logManager = LogManager(config, initialOfflineDirs, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()

// Create the metadata cache
metadataCache = new MetadataCache(config.brokerId)
// Create the credential provider for the enabled SASL mechanisms
credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)

// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
// The socket server is started with startupProcessors = false; startProcessors() is called at the end
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)

// Create and start the replica manager
/* start replica manager */
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()

// Create and start the Kafka controller
// NOTE(review): the original note says that once started, the broker tries to create a ZooKeeper
// node to compete for the controller role -- not visible here, confirm in KafkaController
/* start kafka controller */
kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
kafkaController.startup()

// Create the admin manager
adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)

// Create and start the group coordinator
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
groupCoordinator.startup()

/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkUtils, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup()

// Build the authorizer; it stays empty when no authorizer class name is configured
/* Get the authorizer and initialize it if one is specified.*/
authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map {
authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}

// Build KafkaApis from the socket server's request channel and the components created above
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, brokerTopicStats, clusterId, time)

// Request handler pool; receives config.numIoThreads
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time, config.numIoThreads)

Mx4jLoader.maybeLoad()

// Config handlers keyed by config type (topic, client, user, broker)
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers), ConfigType.Client -> new ClientIdConfigHandler(quotaManagers), ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider), ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

// Create and start the dynamic config manager with those handlers
// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
dynamicConfigManager.startup()

// Resolve the advertised listeners: an endpoint with port 0 is replaced by a copy that carries
// the port the socket server actually bound for that listener
/* tell everyone we are alive */
val listeners = config.advertisedListeners.map { endpoint =>
if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
else
endpoint
}
// Create and start the health check with the resolved listeners
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, config.interBrokerProtocolVersion)
kafkaHealthcheck.startup()

// Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
checkpointBrokerId(config.brokerId)

// Start the socket server processors, then mark the broker as RunningAsBroker and record that
// startup has completed
socketServer.startProcessors()
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
info("started")
}
}
catch {
// Log the failure, clear the starting flag, shut down, and propagate the error to the caller
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}