Kafka Source Code: How a Topic Is Created
Topic creation
There are two ways to create a topic: automatic creation and manual creation.
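Automatic creation
When auto.create.topics.enable=true is set in server.properties, Kafka automatically creates a topic with the broker's default settings (such as num.partitions and default.replication.factor) as soon as it finds that the topic does not exist. Automatic creation is triggered in two cases:
- A producer writes messages to a topic that does not exist
- A consumer reads messages from a topic that does not exist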
Manual creation
When auto.create.topics.enable=false, topics must be created manually; otherwise sending messages to them fails. A topic is created manually as follows:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic kafka_test
--replication-factor: number of replicas
--partitions: number of partitions
--topic: topic name
Finding the entry point
Look at the script file kafka-topics.sh:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
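So the script ends up calling the TopicCommand class. TopicCommand first checks that the argument list is not empty and that exactly one of create, list, alter, describe and delete is given, validates the remaining arguments, and opens a ZooKeeper connection. If the arguments contain create, it starts creating the topic; the other actions are dispatched the same way.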
def main(args: Array[String]): Unit = {
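As a rough illustration of the argument check described above (exactly one of create, list, alter, describe and delete), here is a self-contained Scala sketch. The object and function names are hypothetical, and the plain scan over the argument array stands in for TopicCommand's real option parser; this is not Kafka's implementation.

```scala
// Hypothetical sketch of the "exactly one action" check; not TopicCommand's real parsing code.
object ActionCheck {
  // The five mutually exclusive actions named in the text above.
  val Actions: Seq[String] = Seq("--create", "--list", "--alter", "--describe", "--delete")

  // Returns Right(action) when exactly one action flag is present, otherwise Left(error message).
  def selectAction(args: Array[String]): Either[String, String] =
    if (args.isEmpty) Left("No arguments given")
    else Actions.filter(args.contains(_)) match {
      case Seq(single) => Right(single)
      case found       => Left(s"Command must include exactly one action, found: ${found.mkString(", ")}")
    }
}
```

For example, `--create --topic kafka_test` selects `--create`, while passing both `--create` and `--delete`, or no action at all, is rejected.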
Creating the topic
def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
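- If the client specifies the replica assignment for the topic's partitions, all of the topic's metadata is persisted directly to ZooKeeper: the topic's properties are written to /config/topics/{topic}, and the topic's partition assignment to /brokers/topics/{topic}.
- Otherwise, the partition assignment is generated automatically from the number of partitions, the replication factor, and whether rack awareness is in effect.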
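The two branches above can be modeled in a few lines. In this hypothetical sketch, a returned path-to-data map stands in for the ZooKeeper writes, and the string encodings of the config and the assignment are invented for readability (Kafka stores its own formats). It shows that, in this model, both branches end in the same two writes to /config/topics/{topic} and /brokers/topics/{topic}; only the source of the assignment differs.

```scala
// Hypothetical model of the two branches of TopicCommand.createTopic described above.
// Returns the ZooKeeper writes as path -> data instead of talking to a real ZooKeeper.
object CreateTopicSketch {
  def createTopic(topic: String,
                  userAssignment: Option[Map[Int, Seq[Int]]],
                  generateAssignment: () => Map[Int, Seq[Int]],
                  topicConfig: Map[String, String]): Map[String, String] = {
    // Branch 1: use the client's assignment as-is; branch 2: generate one (the AdminUtils.createTopic path).
    // getOrElse is by-name, so the generator only runs when no assignment was supplied.
    val assignment = userAssignment.getOrElse(generateAssignment())
    // Illustrative string encodings only, not Kafka's stored formats.
    val configData = topicConfig.toSeq.sorted.map { case (k, v) => s"$k=$v" }.mkString(",")
    val assignmentData = assignment.toSeq.sortBy(_._1)
      .map { case (p, rs) => s"$p=${rs.mkString("[", ",", "]")}" }
      .mkString(",")
    Map(s"/config/topics/$topic" -> configData, s"/brokers/topics/$topic" -> assignmentData)
  }
}
```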
Next, let's look at the AdminUtils.createTopic method:
def createTopic(zkUtils: ZkUtils,
                topic: String,
                partitions: Int,
                replicationFactor: Int,
                topicConfig: Properties = new Properties,
                rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
  // Get the brokerId and rack information of every broker in the cluster, used by the assignment below
  val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
  // Generate the replica assignment (rack-aware or not, depending on whether rack awareness was disabled)
  val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
  // Create or update the topic's partition assignment path in ZooKeeper
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
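getBrokerMetadatas is called above with rackAwareMode, but its body is not shown in this article. The following is a simplified, hypothetical model of how such a mode could decide which rack information reaches assignReplicasToBrokers. The case class and the two modes are local stand-ins (Kafka's own RackAwareMode also defines a Safe mode), and Disabled mirrors the effect of the --disable-rack-aware option mentioned later in this article.

```scala
// Simplified, hypothetical model; BrokerMeta and RackMode are local stand-ins for Kafka's types.
object BrokerMetadataSketch {
  final case class BrokerMeta(id: Int, rack: Option[String])

  sealed trait RackMode
  case object Enforced extends RackMode // keep rack info; a mix of racked and unracked brokers is an error
  case object Disabled extends RackMode // ignore rack info entirely

  def brokerMetadatas(brokers: Seq[BrokerMeta], mode: RackMode): Seq[BrokerMeta] = {
    val withRack = brokers.count(_.rack.nonEmpty)
    val result = mode match {
      case Disabled => brokers.map(_.copy(rack = None))
      case Enforced =>
        if (withRack > 0 && withRack < brokers.size)
          throw new IllegalStateException("Not all brokers have rack information")
        brokers
    }
    // Sorted by broker id so the assignment input is deterministic.
    result.sortBy(_.id)
  }
}
```

In this model, a cluster where only some brokers have a rack fails under Enforced, while Disabled strips all rack information so the rack-unaware assignment can proceed.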
Next, let's look at the AdminUtils.assignReplicasToBrokers method:
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  // The number of partitions must be greater than 0
  if (nPartitions <= 0)
    throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
  // The replication factor must be greater than 0
  if (replicationFactor <= 0)
    throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
  // The replication factor cannot exceed the number of brokers
  if (replicationFactor > brokerMetadatas.size)
    throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
  if (brokerMetadatas.forall(_.rack.isEmpty))
    // No broker has rack information
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex, startPartitionId)
  else {
    // Brokers carry rack information; this case is a bit more involved
    if (brokerMetadatas.exists(_.rack.isEmpty))
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex, startPartitionId)
  }
}
Rack-unaware assignment
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
The method iterates over the partitions and, for each one, picks replicationFactor brokerIds from brokerArray (the list of brokerIds) to host its replicas.
First, a mutable Map is created to hold the method's result: the mapping from each partition to the brokers assigned its replicas.
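Because fixedStartIndex is -1, startIndex is a random number that determines the broker where assignment starts. Because startPartitionId is -1, currentPartitionId starts at 0, so by default topic creation always assigns partitions in round-robin order starting from partition 0. nextReplicaShift is the shift of the following replicas relative to the first one. That is hard to grasp from the name alone, so here is an example. Suppose the cluster has 3 brokers (brokerArray in the code) and we create a topic with 3 replicas and 6 partitions. Assignment starts with the partition whose partitionId is 0. Suppose the random draws (rand.nextInt(brokerArray.length)) give nextReplicaShift = 2 and startIndex = 2. Then the position of partition 0's first replica (an index into brokerArray) is firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length = (0+2)%3 = 2, and the position of its second replica is replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length) = replicaIndex(2, 2, 0, 3). replicaIndex(firstReplicaIndex, secondReplicaShift, j, n) returns (firstReplicaIndex + 1 + (secondReplicaShift + j) % (n-1)) % n, so it never lands on the first replica's own position.
So replicaIndex(2, 2, 0, 3) = (2+(1+(2+0)%(3-1)))%3 = 0, and the next replica is at replicaIndex(2, 2, 1, 3) = (2+(1+(2+1)%(3-1)))%3 = 1. The replica positions for partition 0 are therefore [2,0,1]. If brokerArray happens to be numbered from 0 without gaps, i.e. [0,1,2], partition 0's replicas are on brokers [2,0,1]. If the brokerIds neither start at 0 nor are consecutive (for example because some brokers in the cluster went offline) and brokerArray ends up as [2,5,8], partition 0's replicas are on brokers [8,2,5]. To keep things simple, assume brokerArray is [0,1,2].
Partition 1 is computed the same way. nextReplicaShift is still 2, because the increment condition (currentPartitionId > 0 && currentPartitionId % brokerArray.length == 0) is not met. For this partition firstReplicaIndex = (1+2)%3 = 0, the second replica is at replicaIndex(0,2,0,3) = (0+(1+(2+0)%(3-1)))%3 = 1, and the third at replicaIndex(0,2,1,3) = 2, so the assignment for partitionId 1 is [0,1,2].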
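The walkthrough above can be reproduced with a small Scala sketch. It keeps the loop and the replicaIndex formula of assignReplicasToBrokersRackUnaware, but, as an assumption made so the example's random draws can be fixed, takes startIndex and the initial nextReplicaShift as explicit parameters, starts at partition 0, and returns an immutable Map.

```scala
import scala.collection.mutable

// Sketch of the rack-unaware assignment with the random draws replaced by parameters.
object RackUnawareSketch {
  // shift = 1 + (secondReplicaShift + j) % (nBrokers - 1), so the result never equals firstReplicaIndex.
  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, j: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + j) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  def assign(nPartitions: Int, replicationFactor: Int, brokerArray: IndexedSeq[Int],
             startIndex: Int, initialShift: Int): Map[Int, Seq[Int]] = {
    val n = brokerArray.length
    val ret = mutable.Map[Int, Seq[Int]]()
    var nextReplicaShift = initialShift
    for (partitionId <- 0 until nPartitions) {
      // After every full round over the brokers, shift the followers one step further.
      if (partitionId > 0 && partitionId % n == 0) nextReplicaShift += 1
      val firstReplicaIndex = (partitionId + startIndex) % n
      val replicas = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicas += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, n))
      ret.put(partitionId, replicas.toSeq)
    }
    ret.toMap
  }
}
```

With brokerArray [0,1,2], startIndex 2 and an initial shift of 2 it yields 0→[2,0,1], 1→[0,1,2], 2→[1,2,0], 3→[2,1,0], 4→[0,2,1], 5→[1,0,2]. At partition 3 the shift grows to 3 and the followers swap order; in this example every broker ends up leading two partitions and holding the second replica of two partitions.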
Rack-aware assignment
private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                             replicationFactor: Int,
                                             brokerMetadatas: Seq[BrokerMetadata],
                                             fixedStartIndex: Int,
                                             startPartitionId: Int): Map[Int, Seq[Int]] = {
  val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
    id -> rack
  }.toMap
  val numRacks = brokerRackMap.values.toSet.size
  val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
  val numBrokers = arrangedBrokerList.size
  val ret = mutable.Map[Int, Seq[Int]]()
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
    val leader = arrangedBrokerList(firstReplicaIndex)
    val replicaBuffer = mutable.ArrayBuffer(leader)
    val racksWithReplicas = mutable.Set(brokerRackMap(leader))
    val brokersWithReplicas = mutable.Set(leader)
    var k = 0
    for (_ <- 0 until replicationFactor - 1) {
      var done = false
      while (!done) {
        val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
        val rack = brokerRackMap(broker)
        // Skip this broker if
        // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
        //    that do not have any replica, or
        // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
        if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
            && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
          replicaBuffer += broker
          racksWithReplicas += rack
          brokersWithReplicas += broker
          done = true
        }
        k += 1
      }
    }
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
assignReplicasToBrokersRackUnaware is used only when none of the brokers has rack information, and assignReplicasToBrokersRackAware only when all of them do. If some brokers have rack information and others do not, an AdminOperationException is thrown; to create the topic anyway, add --disable-rack-aware to the command.
The first step builds brokerRackMap, the mapping from brokerId to rack, and then calls getRackAlternatedBrokerList() to turn brokerRackMap into a list of brokerIds. For example, suppose there are 3 racks (rack1, rack2 and rack3) and 9 brokers distributed as follows:
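rack1: 0, 1, 2
rack2: 3, 4, 5
rack3: 6, 7, 8
After getRackAlternatedBrokerList() this becomes the list [0, 3, 6, 1, 4, 7, 2, 5, 8], which is clearly produced by taking brokers from the racks in turn. From here on you can treat this list simply as a list of brokerIds, the counterpart of brokerArray in assignReplicasToBrokersRackUnaware(), except that it also encodes basic rack placement. The remaining steps resemble the rack-unaware algorithm: the same startIndex, currentPartitionId and nextReplicaShift concepts appear, and replicas are assigned partition by partition in a loop. Every replica after the first is also obtained through replicaIndex (here with the shift nextReplicaShift * numRacks), but unlike in assignReplicasToBrokersRackUnaware(), the broker is not simply appended to the partition's replica list. It must first pass a filter, and a broker that meets either of the following conditions is skipped: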
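- The broker's rack already contains a broker holding a replica of this partition, while at least one other rack holds no replica of it yet. This is what (!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks) rules out in the code.
- The broker already holds a replica of this partition, while at least one other broker holds none yet. This is what (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers) rules out in the code.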
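The rack-aware steps can also be sketched in Scala. This is a simplified re-creation, not Kafka's code: rackAlternated is a hypothetical helper that reproduces the interleaving of the getRackAlternatedBrokerList() example under the assumption that racks and broker ids are taken in sorted order, partition ids start at 0, and startIndex and the initial nextReplicaShift are passed in explicitly instead of being drawn at random. The filter is the two-condition check described above.

```scala
import scala.collection.mutable

object RackAwareSketch {
  // Same formula as the rack-unaware case: never lands on the first replica's own index.
  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, j: Int, nBrokers: Int): Int =
    (firstReplicaIndex + 1 + (secondReplicaShift + j) % (nBrokers - 1)) % nBrokers

  // Hypothetical helper: in each round, take the next broker of every rack (racks and ids sorted).
  def rackAlternated(brokerRack: Map[Int, String]): IndexedSeq[Int] = {
    val byRack: Map[String, Seq[Int]] =
      brokerRack.toSeq.groupBy(_._2).map { case (rack, pairs) => rack -> pairs.map(_._1).sorted }
    val racks = byRack.keys.toIndexedSeq.sorted
    val rounds = byRack.values.map(_.size).max
    for {
      i <- 0 until rounds
      rack <- racks
      brokers = byRack(rack)
      if i < brokers.size
    } yield brokers(i)
  }

  def assign(nPartitions: Int, replicationFactor: Int, brokerRack: Map[Int, String],
             startIndex: Int, initialShift: Int): Map[Int, Seq[Int]] = {
    val arranged = rackAlternated(brokerRack)
    val numBrokers = arranged.size
    val numRacks = brokerRack.values.toSet.size
    val ret = mutable.Map[Int, Seq[Int]]()
    var nextReplicaShift = initialShift
    for (partitionId <- 0 until nPartitions) {
      if (partitionId > 0 && partitionId % numBrokers == 0) nextReplicaShift += 1
      val firstReplicaIndex = (partitionId + startIndex) % numBrokers
      val leader = arranged(firstReplicaIndex)
      val replicas = mutable.ArrayBuffer(leader)
      val racksWithReplicas = mutable.Set(brokerRack(leader))
      val brokersWithReplicas = mutable.Set(leader)
      var k = 0
      for (_ <- 0 until replicationFactor - 1) {
        var done = false
        while (!done) {
          val broker = arranged(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, numBrokers))
          val rack = brokerRack(broker)
          // Accept only if the rack is new (or every rack already holds a replica)
          // and the broker is new (or every broker already holds a replica).
          if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks) &&
              (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
            replicas += broker
            racksWithReplicas += rack
            brokersWithReplicas += broker
            done = true
          }
          k += 1
        }
      }
      ret.put(partitionId, replicas.toSeq)
    }
    ret.toMap
  }
}
```

For the 9-broker example, rackAlternated returns [0, 3, 6, 1, 4, 7, 2, 5, 8]; with startIndex 0 and an initial shift of 0, partition 0 gets [0, 3, 6] and partition 1 gets [3, 6, 1], one replica per rack.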
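Whether or not rack information is used, the caller AdminUtils.assignReplicasToBrokers() ends up with a replica assignment of type Map[Int, Seq[Int]], which is finally stored as the data of the ZooKeeper node /brokers/topics/{topic-name}. That completes the walkthrough of Kafka topic creation. Some readers may be puzzled: the whole article (including the previous one) is about how replicas are assigned, and the end result is merely an assignment plan, with no step that actually creates the replicas. That observation is entirely correct. When a topic is created with the kafka-topics.sh script that ships with Kafka, the script only produces a replica assignment plan and creates the corresponding nodes in ZooKeeper. On the broker side, the controller (one elected broker) watches the /brokers/topics/ path for changes; when a new node is created it is notified and, based on the node's data (the topic's partition replica assignment), has the brokers create the corresponding replicas.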
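To make this hand-off concrete, here is a hypothetical sketch of both halves: encoding an assignment as JSON node data for /brokers/topics/{topic-name}, and spotting newly created topics by comparing the current children of /brokers/topics/ with the topics already known. The JSON shape {"version":1,"partitions":{...}} is the one used by ZooKeeper-based Kafka releases of this era, though the exact fields vary between versions; the object and function names are illustrative only.

```scala
// Hypothetical sketch; names are illustrative, and real Kafka versions may add JSON fields.
object TopicZnodeSketch {
  // Encodes e.g. Map(0 -> Seq(2, 0, 1)) as {"version":1,"partitions":{"0":[2,0,1]}}.
  def assignmentJson(assignment: Map[Int, Seq[Int]]): String = {
    val parts = assignment.toSeq.sortBy(_._1).map { case (p, replicas) =>
      "\"" + p + "\":" + replicas.mkString("[", ",", "]")
    }
    "{\"version\":1,\"partitions\":{" + parts.mkString(",") + "}}"
  }

  // A child watch only says "the children changed"; new topics are the current children minus the known ones.
  def newTopics(known: Set[String], currentChildren: Seq[String]): Set[String] =
    currentChildren.toSet -- known
}
```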