Topic Creation

There are two ways to create a topic: automatic creation and manual creation.

Automatic Creation

When auto.create.topics.enable=true is configured in server.properties, Kafka automatically creates a topic with the default configuration when it finds that the topic does not exist. Automatic creation is triggered in the following two situations; a short producer sketch of the first case follows the list:

  1. A producer writes messages to a topic that does not exist

  2. A consumer reads messages from a topic that does not exist
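
For example, with auto-creation enabled, merely producing to a topic that does not yet exist is enough to have the broker create it with its default settings (num.partitions and default.replication.factor). A minimal sketch, assuming the kafka-clients library on the classpath, a broker at localhost:9092 and a made-up topic name:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object AutoCreateByProduce {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try {
      // Write to a topic that does not exist yet; with auto-creation enabled the
      // broker creates it using its default partition/replication settings.
      producer.send(new ProducerRecord[String, String]("not_yet_created_topic", "hello")).get()
    } finally {
      producer.close()
    }
  }
}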

Manual Creation

When auto.create.topics.enable=false, the topic must be created manually, otherwise sending messages will fail. A topic is created manually as follows (a programmatic alternative is sketched after the option descriptions):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic kafka_test

--replication-factor: the number of replicas

--partitions: the number of partitions

--topic: the topic name
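
As an aside, the same creation can also be done programmatically. A minimal sketch using the AdminClient API, assuming kafka-clients on the classpath and a broker at localhost:9092, mirroring the shell command above (10 partitions, replication factor 1):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopicWithAdminClient {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)
    try {
      // 10 partitions, replication factor 1, same as the kafka-topics.sh example.
      val topic = new NewTopic("kafka_test", 10, 1.toShort)
      admin.createTopics(Collections.singleton(topic)).all().get()
    } finally {
      admin.close()
    }
  }
}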

Topic Command Entry Point

Looking at the script file kafka-topics.sh:


exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

It ultimately calls the TopicCommand class: it first checks that the arguments are not empty and that exactly one of create, list, alter, describe and delete is present, validates the arguments, and creates a ZooKeeper connection; if the arguments contain create it proceeds to create the topic, and the other actions are handled in the same way.


def main(args: Array[String]): Unit = {

  val opts = new TopicCommandOptions(args)

  // check the number of arguments
  if(args.length == 0)
    CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")

  // only one of create, list, alter, describe, delete may be present
  // should have exactly one action
  val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
  if(actions != 1)
    CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")

  // validate the arguments
  opts.checkArgs()
  // initialize the ZooKeeper connection
  val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, JaasUtils.isZkSecurityEnabled())
  var exitCode = 0
  try {
    if(opts.options.has(opts.createOpt))
      // create the topic
      createTopic(zkUtils, opts)
    else if(opts.options.has(opts.alterOpt))
      // alter the topic
      alterTopic(zkUtils, opts)
    else if(opts.options.has(opts.listOpt))
      // list all topics, bin/kafka-topics.sh --list --zookeeper localhost:2181
      listTopics(zkUtils, opts)
    else if(opts.options.has(opts.describeOpt))
      // describe a topic, bin/kafka-topics.sh --describe --zookeeper localhost:2181
      describeTopic(zkUtils, opts)
    else if(opts.options.has(opts.deleteOpt))
      // delete the topic
      deleteTopic(zkUtils, opts)
  } catch {
    case e: Throwable =>
      println("Error while executing topic command : " + e.getMessage)
      error(Utils.stackTrace(e))
      exitCode = 1
  } finally {
    zkUtils.close()
    Exit.exit(exitCode)
  }
}

Creating the Topic

def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  // get the topic name
  val topic = opts.options.valueOf(opts.topicOpt)
  val configs = parseTopicConfigsToBeAdded(opts)
  val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
  if (Topic.hasCollisionChars(topic))
    println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
  try {
    // If the client specified the replica assignment of the topic's partitions, persist the topic
    // metadata to ZooKeeper directly:
    // the topic's properties are written under /config/topics/{topic},
    // the topic's partition assignment is written under /brokers/topics/{topic}
    if (opts.options.has(opts.replicaAssignmentOpt)) {
      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
    } else {
      // otherwise the topic's partition assignment has to be generated automatically
      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
      // number of partitions
      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
      // replication factor
      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
      // Since 0.10.x, Kafka supports specifying rack information for brokers. If racks are specified,
      // the assignment tries to spread each partition's replicas across different racks as much as possible.
      // The rack is configured via the broker.rack parameter in config/server.properties.
      val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                          else RackAwareMode.Enforced
      AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
    }
    println("Created topic \"%s\".".format(topic))
  } catch {
    case e: TopicExistsException => if (!ifNotExists) throw e
  }
}
    1. If the client specified the replica assignment of the topic's partitions, the topic metadata is persisted to ZooKeeper directly: the topic's properties are written under /config/topics/{topic}, and the topic's partition assignment is written under /brokers/topics/{topic}.
    2. Otherwise, the partition assignment is generated automatically from the number of partitions, the replication factor, and whether rack information is specified.
    3. Next, let's look at the AdminUtils.createTopic method:
def createTopic(zkUtils: ZkUtils, topic: String, partitions: Int, replicationFactor: Int, topicConfig: Properties = new Properties, rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
  // get the brokerId and rack information of every broker in the cluster, which the assignment below relies on
  val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
  // generate the assignment, depending on whether rack awareness is disabled
  val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
  // create or update the topic's partition assignment path in ZooKeeper
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
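
As a rough usage sketch of the method above (not part of the source walkthrough): assuming the Kafka core jar of the version discussed here is on the classpath and a ZooKeeper instance is running at localhost:2181, calling AdminUtils.createTopic directly could look like the following, reusing the same connection values TopicCommand uses.

import java.util.Properties
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.utils.ZkUtils
import org.apache.kafka.common.security.JaasUtils

object CreateTopicSketch {
  def main(args: Array[String]): Unit = {
    // Same ZooKeeper connection parameters as TopicCommand.main above.
    val zkUtils = ZkUtils("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled())
    try {
      // Same arguments as the kafka-topics.sh example: 10 partitions, 1 replica.
      AdminUtils.createTopic(zkUtils, "kafka_test", 10, 1, new Properties(), RackAwareMode.Enforced)
    } finally {
      zkUtils.close()
    }
  }
}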
Next, let's look at the AdminUtils.assignReplicasToBrokers method:
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata], nPartitions: Int, replicationFactor: Int, fixedStartIndex: Int = -1, startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    // the number of partitions must be greater than 0
    throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
  if (replicationFactor <= 0)
    // the replication factor must be greater than 0
    throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
  if (replicationFactor > brokerMetadatas.size)
    // the replication factor cannot exceed the number of brokers
    throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
  if (brokerMetadatas.forall(_.rack.isEmpty))
    // no rack information specified
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex, startPartitionId)
  else {
    // rack information specified, which is a bit more involved
    if (brokerMetadatas.exists(_.rack.isEmpty))
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex, startPartitionId)
  }
}
      1. The rack-unaware strategy
private def assignReplicasToBrokersRackUnaware(nPartitions: Int, replicationFactor: Int, brokerList: Seq[Int], fixedStartIndex: Int, startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  // pick a random starting broker index unless a fixed one is given
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex
    else rand.nextInt(brokerArray.length)
  // partitions are assigned in order starting from partition 0 by default
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex
    else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    // bump the shift each time a full round over all brokers has been completed
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    // the first replica is placed by simple round robin
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    // the remaining replicas are placed relative to the first one via replicaIndex()
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}

For each partition, replicationFactor broker ids are selected from brokerArray (the list of broker ids) and assigned to that partition.

A mutable Map is created to hold the result the method will return, i.e. the mapping from each partition to its assigned replicas.

Since fixedStartIndex is -1, startIndex is a random number used to compute the broker at which assignment starts; and since startPartitionId is -1, currentPartitionId is 0, which shows that by default topic creation always assigns partitions in order starting from partition 0. nextReplicaShift is the shift of the next replica assignment relative to the previous one, which is a bit hard to grasp from the name alone, so here is an example. Suppose the cluster has 3 broker nodes (the brokerArray in the code) and a topic is created with 3 replicas and 6 partitions. Assignment starts with the partition whose partitionId is 0. Suppose the random draws (rand.nextInt(brokerArray.length)) give nextReplicaShift = 2 and startIndex = 2. Then for partition 0, the position of the first replica (an index into brokerArray) is firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length = (0+2) % 3 = 2, and the position of the second replica is replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length) = replicaIndex(2, 2, 0, 3) = ?

Continuing the calculation, replicaIndex(2, 2, 0, 3) = (2 + (1 + (2+0) % (3-1))) % 3 = 0. The next replica's position is replicaIndex(2, 2, 1, 3) = (2 + (1 + (2+1) % (3-1))) % 3 = 1. So the list of replica positions for partition 0 is [2,0,1]; if brokerArray happens to be numbered from 0 with no gaps, i.e. brokerArray is [0,1,2], then the replica assignment for partition 0 is [2,0,1]. If the broker ids do not start at 0 and are not contiguous (perhaps some brokers went offline earlier) and brokerArray ends up being [2,5,8], then the replica assignment for partition 0 is [8,2,5]. To keep the discussion simple, assume brokerArray is just [0,1,2].

The next partition, partitionId 1, is computed the same way. nextReplicaShift is still 2, since the condition for incrementing it has not been met. For this partition firstReplicaIndex = (1+2) % 3 = 0. The position of the second replica is replicaIndex(0, 2, 0, 3) = (0 + (1 + (2+0) % (3-1))) % 3 = 1, and the position of the third replica is replicaIndex(0, 2, 1, 3) = 2, so the final assignment for partition 1 is [0,1,2].
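
To make the arithmetic above reproducible, here is a small self-contained sketch (not Kafka source) that re-implements the rack-unaware loop together with the replicaIndex formula used in the calculations, with startIndex and nextReplicaShift fixed to 2 instead of being drawn at random:

object RackUnawareAssignmentSketch {
  // Same formula as used in the calculations above:
  // shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  def assign(brokers: IndexedSeq[Int], nPartitions: Int, replicationFactor: Int,
             startIndex: Int, initialShift: Int): Map[Int, Seq[Int]] = {
    var nextReplicaShift = initialShift
    (0 until nPartitions).map { partition =>
      // bump the shift after every full round over the brokers, as in the Kafka code
      if (partition > 0 && partition % brokers.length == 0) nextReplicaShift += 1
      val firstReplicaIndex = (partition + startIndex) % brokers.length
      val replicas = brokers(firstReplicaIndex) +:
        (0 until replicationFactor - 1).map(j => brokers(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokers.length)))
      partition -> replicas
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // brokerArray = [0,1,2], 6 partitions, 3 replicas, startIndex = 2, nextReplicaShift = 2
    assign(IndexedSeq(0, 1, 2), 6, 3, startIndex = 2, initialShift = 2)
      .toSeq.sortBy(_._1).foreach { case (p, r) => println(s"partition $p -> ${r.mkString("[", ",", "]")}") }
    // Prints [2,0,1] for partition 0 and [0,1,2] for partition 1, matching the example above.
  }
}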

      2. The rack-aware strategy
private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                             replicationFactor: Int,
                                             brokerMetadatas: Seq[BrokerMetadata],
                                             fixedStartIndex: Int,
                                             startPartitionId: Int): Map[Int, Seq[Int]] = {
  val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
    id -> rack
  }.toMap
  val numRacks = brokerRackMap.values.toSet.size
  val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
  val numBrokers = arrangedBrokerList.size
  val ret = mutable.Map[Int, Seq[Int]]()
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
    val leader = arrangedBrokerList(firstReplicaIndex)
    val replicaBuffer = mutable.ArrayBuffer(leader)
    val racksWithReplicas = mutable.Set(brokerRackMap(leader))
    val brokersWithReplicas = mutable.Set(leader)
    var k = 0
    for (_ <- 0 until replicationFactor - 1) {
      var done = false
      while (!done) {
        val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
        val rack = brokerRackMap(broker)
        // Skip this broker if
        // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
        //    that do not have any replica, or
        // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
        if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
            && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
          replicaBuffer += broker
          racksWithReplicas += rack
          brokersWithReplicas += broker
          done = true
        }
        k += 1
      }
    }
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
    1. assignReplicasToBrokersRackUnaware is executed only when none of the brokers has rack information configured, whereas assignReplicasToBrokersRackAware requires every broker to have rack information. If some brokers have rack information and others do not, an AdminOperationException is thrown; to still create the topic in this case, add the --disable-rack-aware option.
    2. The first step builds brokerRackMap, the mapping from brokerId to rack, and then getRackAlternatedBrokerList() processes it further into a list of broker ids. For example, suppose there are 3 racks, rack1, rack2 and rack3, and 9 brokers, mapped as follows:
        rack1: 0, 1, 2
        rack2: 3, 4, 5
        rack3: 6, 7, 8

After getRackAlternatedBrokerList() this becomes the list [0, 3, 6, 1, 4, 7, 2, 5, 8], which is clearly produced by round-robining over the brokers of each rack (a small sketch of this interleaving follows the two conditions below). From then on you can simply treat this list as the list of broker ids, corresponding to brokerArray in assignReplicasToBrokersRackUnaware(), except that it now encodes simple rack placement information. The remaining steps are similar to the rack-unaware algorithm, with the same notions of startIndex, currentPartitionId and nextReplicaShift, looping over the partitions and assigning replicas to each. Apart from the first replica, each further replica is also obtained through the replicaIndex method; but unlike assignReplicasToBrokersRackUnaware(), the chosen broker is not simply appended to the current partition's replica list. It first has to pass a filter, and a broker that satisfies either of the following conditions is skipped and not added to the current partition's replica list:

  1. A broker in the same rack as this broker already holds a replica of the partition, while at least one rack still has no replica of the partition. In the code this is the failure of the first half of the acceptance test, (!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks).
  2. This broker already holds a replica of the partition, while at least one broker still has no replica of the partition. In the code this is the failure of the second half, (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers).
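
To illustrate the interleaving described above, here is a small sketch (not the actual getRackAlternatedBrokerList implementation, whose ordering details differ) that reproduces the [0, 3, 6, 1, 4, 7, 2, 5, 8] list for the rack mapping in the example by taking one broker from each rack in turn:

object RackAlternationSketch {
  def rackAlternatedBrokerList(brokerRackMap: Map[Int, String]): Seq[Int] = {
    // Group broker ids by rack, keeping each rack's brokers in ascending id order.
    val brokersByRack: Seq[Seq[Int]] =
      brokerRackMap.groupBy(_._2).toSeq.sortBy(_._1).map(_._2.keys.toSeq.sorted)
    val maxLen = brokersByRack.map(_.size).max
    // Take the i-th broker of every rack, round after round.
    (0 until maxLen).flatMap(i => brokersByRack.flatMap(_.lift(i)))
  }

  def main(args: Array[String]): Unit = {
    val brokerRackMap = Map(
      0 -> "rack1", 1 -> "rack1", 2 -> "rack1",
      3 -> "rack2", 4 -> "rack2", 5 -> "rack2",
      6 -> "rack3", 7 -> "rack3", 8 -> "rack3")
    println(rackAlternatedBrokerList(brokerRackMap))
    // Prints Vector(0, 3, 6, 1, 4, 7, 2, 5, 8), matching the example above.
  }
}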
Whether or not rack information is used, the upper-level method AdminUtils.assignReplicasToBrokers() ends up with a replica assignment of type Map[Int, Seq[Int]], which is ultimately stored as the data of the Kafka ZooKeeper node /brokers/topics/{topic-name}. That completes the walkthrough of Kafka topic creation. Some readers may wonder why this post (and the previous one) talks about nothing but how to assign replicas, and why the result is merely an assignment plan with no step that actually creates the replicas. That observation is entirely correct: creating a topic through the kafka-topics.sh script only produces a replica assignment plan and creates the corresponding nodes in Kafka's ZooKeeper. The Kafka brokers register a watcher on the /brokers/topics/ path; when a new node appears there they are notified and create the actual replicas according to the data in that node, i.e. the topic's partition replica assignment plan.
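
For reference, the data stored at /brokers/topics/{topic-name} is essentially a JSON rendering of that Map[Int, Seq[Int]]. The sketch below is only an illustration, not Kafka code; it assumes the commonly documented {"version":1,"partitions":{...}} shape of that znode and renders an assignment map in this form:

object ZkAssignmentSketch {
  // Render an assignment map in the assumed znode shape, e.g.
  // {"version":1,"partitions":{"0":[2,0,1],"1":[0,1,2]}}
  def toZkJson(assignment: Map[Int, Seq[Int]]): String = {
    val partitions = assignment.toSeq.sortBy(_._1)
      .map { case (p, replicas) => "\"" + p + "\":[" + replicas.mkString(",") + "]" }
      .mkString(",")
    "{\"version\":1,\"partitions\":{" + partitions + "}}"
  }

  def main(args: Array[String]): Unit =
    println(toZkJson(Map(0 -> Seq(2, 0, 1), 1 -> Seq(0, 1, 2))))
}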