⽇志存储

概述

Kafka 消息是以主题为单位进⾏归类,各个主题之间是彼此独⽴的,互不影响。

每个主题⼜可以分为⼀个或多个分区。

每个分区各⾃存在⼀个记录消息数据的⽇志⽂件。


图中,创建了⼀个 tp_demo_01 主题,其存在6个 Parition,对应的每个Parition下存在⼀个 [Topic-Parition] 命名的消息⽇志⽂件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区⽇志⽂件中,你会发现很多类型的⽂件,⽐如: .index、.timestamp、.log、.snapshot 等。

其中,⽂件名⼀致的⽂件集合就称为 LogSement。


  • LogSegment

    1. 分区⽇志⽂件中包含很多的 LogSegment

    2. Kafka ⽇志追加是顺序写⼊的

    3. LogSegment 可以减⼩⽇志⽂件的⼤⼩

    4. 进⾏⽇志删除的时候和数据查找的时候可以快速定位。

    5. ActiveLogSegment 是活跃的⽇志分段,拥有⽂件拥有写⼊权限,其余的 LogSegment 只有只读的权限。

    6. ⽇志⽂件存在多种后缀⽂件,重点需要关注 .index、.timestamp、.log 三种类型。

    后缀名 说明
    .index 偏移量索引⽂件
    .timestamp 时间戳索引⽂件
    .log ⽇志⽂件
    .snapshot 快照⽂件
    .deleted 预删除文件
    .cleaned ⽇志清理时临时⽂件
    .swap ⽇志压缩之后的临时⽂件
    leader-epoch-checkpoint

    每个 LogSegment 都有⼀个基准偏移量,表示当前 LogSegment 中第⼀条消息的 offset。

    偏移量是⼀个 64 位的⻓整形数,固定是20位数字,⻓度未达到,⽤ 0 进⾏填补,索引⽂件和⽇志⽂件都由该作为⽂件名命名规则(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。

    如果⽇志⽂件名为 00000000000000000121.log ,则当前⽇志⽂件的⼀条数据偏移量就是 121(偏移量从 0 开始)。

    配置条⽬ 默认值 说明
    log.index.interval.bytes 4096(4K) 增加索引项字节间隔密度,会影响索引⽂件中的区间密度和查询效率
    log.segment.bytes 1073741824(1G) ⽇志⽂件最⼤值
    log.roll.ms 当前⽇志分段中消息的最⼤时间戳与当前系统的时间戳的差值允许的最⼤范围,单位毫秒
    log.roll.hours 168(7天) 当前⽇志分段中消息的最⼤时间戳与当前系统的时间戳的差值允许的最⼤范围,单位⼩时
    log.index.size.max.bytes 10485760(10MB) 触发偏移量索引⽂件或时间戳索引⽂件分段字节限额

    偏移量索引⽂件⽤于记录消息偏移量与物理地址之间的映射关系。

    时间戳索引⽂件则根据时间戳查找对应的偏移量。

    Kafka 中的索引⽂件是以稀疏索引的⽅式构造消息的索引,并不保证每⼀个消息在索引⽂件中都有对应的索引项。

    每当写⼊⼀定量的消息时,偏移量索引⽂件和时间戳索引⽂件分别增加⼀个偏移量索引项和时间戳索引项。

    通过修改 log.index.interval.bytes 的值,改变索引项的密度。

  • 切分⽂件

    当满⾜如下⼏个条件中的其中之⼀,就会触发⽂件的切分:

    1. 当前⽇志分段⽂件的⼤⼩超过了 broker 端参数 log.segment.bytes 配置的值。 log.segment.bytes 参数的默认值为 1073741824,即 1GB。

    2. 当前⽇志分段中消息的最⼤时间戳与当前系统的时间戳的差值⼤于 log.roll.ms 或 log.roll.hours 参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级⾼。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。

    3. 偏移量索引⽂件或时间戳索引⽂件的⼤⼩达到 broker 端参数 log.index.size.max.bytes 配置的值。 log.index.size.max.bytes 的默认值为 10485760,即 10MB。

    4. 追加的消息的偏移量与当前⽇志分段的偏移量之间的差值⼤于 Integer.MAX_VALUE ,即要追加的消息的偏移量不能转变为相对偏移量。

    • 为什么是 Integer.MAX_VALUE ?

      大小 1024 * 1024 * 1024=1073741824

      在偏移量索引⽂件中,每个索引项共占⽤ 8 个字节,并分为两部分。相对偏移量和物理地址。

      相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节

      物理地址:消息在⽇志分段⽂件中对应的物理位置,也占 4 个字节

      4 个字节刚好对应 Integer.MAX_VALUE ,如果⼤于 Integer.MAX_VALUE ,则不能⽤ 4 个字节进⾏表示了。

  • 索引⽂件切分过程

    索引⽂件会根据 log.index.size.max.bytes 值进⾏预先分配空间,即⽂件创建的时候就是最⼤值。

    当真正的进⾏索引⽂件切分的时候,才会将其裁剪到实际数据⼤⼩的⽂件。

    这⼀点是跟⽇志⽂件有所区别的地⽅。其意义降低了代码逻辑的复杂性。

存储

索引

偏移量索引⽂件⽤于记录消息偏移量与物理地址之间的映射关系。时间戳索引⽂件则根据时间戳查找对应的偏移量。

查看⼀个topic分区⽬录下的内容,发现有log、index和timeindex三个⽂件:

  1. log⽂件名是以⽂件中第⼀条message的offset来命名的,实际offset⻓度是64位,但是这⾥只使⽤了20位,应付⽣产是⾜够的。

  2. ⼀组index+log+timeindex⽂件的名字是⼀样的,并且log⽂件默认写满1G后,会进⾏log rolling形成⼀个新的组合来记录消息,这个是通过broker端 log.segment.bytes =1073741824指定的。

  3. index和timeindex在刚使⽤时会分配10M的⼤⼩,当进⾏ log rolling 后,它会修剪为实际的⼤⼩。


  • 1、创建主题

    1
    [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_05 --partitions 1 --replication-factor 1 --config segment.bytes=104857600
  • 2、创建消息⽂件

    1
    [root@node1 ~]# for i in `seq 10000000`; do echo "hello lagou $i" >> nmm.txt; done

  • 3、将⽂本消息⽣产到主题中

    1
    [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_demo_05 < nmm.txt
  • 4、查看存储⽂件


  • 5、查看log⽂件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    [root@node1 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log | head
    Dumping 00000000000000000000.log
    Starting offset: 0
    baseOffset: 0 lastOffset: 716 baseSequence: -1 lastSequence: -1 producerId: -1
    producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0
    CreateTime: 1596513421661 isvalid: true size: 16380 magic: 2 compresscodec: NONE crc:
    2973274901
    baseOffset: 717 lastOffset: 1410 baseSequence: -1 lastSequence: -1 producerId: -1
    producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 16380
    CreateTime: 1596513421715 isvalid: true size: 16371 magic: 2 compresscodec: NONE crc:
    1439993110
    baseOffset: 1411 lastOffset: 2092 baseSequence: -1 lastSequence: -1 producerId: -1
    producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 32751
    CreateTime: 1596513421747 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
    3528903590
    baseOffset: 2093 lastOffset: 2774 baseSequence: -1 lastSequence: -1 producerId: -1
    producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 49116
    CreateTime: 1596513421791 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
    763876977
    baseOffset: 2775 lastOffset: 3456 baseSequence: -1 lastSequence: -1 producerId: -1
    producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 65481
    CreateTime: 1596513421795 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
    2218198476
    baseOffset: 3457 lastOffset: 4138 baseSequence: -1 lastSequence: -1 producerId: -1
    producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 81846
    CreateTime: 1596513421798 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
    4018065070
    baseOffset: 4139 lastOffset: 4820 baseSequence: -1 lastSequence: -1 producerId: -1
    producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 98211
    CreateTime: 1596513421802 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
    3073882858
    baseOffset: 4821 lastOffset: 5502 baseSequence: -1 lastSequence: -1 producerId: -1
    producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 114576
    CreateTime: 1596513421819 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
    207330377
    • offset是逐渐增加的整数,每个offset对应⼀个消息的偏移量。

    • position:消息批字节数,⽤于计算物理地址。

    • CreateTime:时间戳。

    • magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型。

    • compresscodec:None说明没有指定压缩类型,kafka⽬前提供了4种可选择,0-None、1-GZIP、2-snappy、3-lz4。

    • crc:对所有字段进⾏校验后的crc值。

  • 消息存储

    1. 消息内容保存在log⽇志⽂件中。

    2. 消息封装为Record,追加到log⽇志⽂件末尾,采⽤的是顺序写模式

    3. ⼀个topic的不同分区,可认为是queue,顺序写⼊接收到的消息。


    消费者有offset。下图中,消费者A消费的offset是9,消费者B消费的offset是11,不同的消费者offset是交给⼀个内部公共topic来记录的。


    时间戳索引⽂件,它的作⽤是可以让⽤户查询某个时间段内的消息,它⼀条数据的结构是时间戳(8byte) +相对offset(4byte),如果要使⽤这个索引⽂件,⾸先需要通过时间范围,找到对应的相对offset,然后再去对应的 index⽂件找到position信息,然后才能遍历log⽂件,它也是需要使⽤上⾯说的index⽂件的。

    但是由于producer⽣产消息可以指定消息的时间戳,这可能将导致消息的时间戳不⼀定有先后顺序,因此尽量不要⽣产消息时指定时间戳

偏移量
  1. 位置索引保存在index⽂件中

  2. log⽇志默认每写⼊4K(log.index.interval.bytes设定的),会写⼊⼀条索引信息到index⽂件中,因此索引⽂件是稀疏索引,它不会为每条⽇志都建⽴索引信息。

  3. log⽂件中的⽇志,是顺序写⼊的,由message+实际offset+position组成

  4. 索引⽂件的数据结构则是由相对offset(4byte)+position(4byte)组成,由于保存的是相对第⼀个消息的相对offset,只需要4byte就可以了,可以节省空间,在实际查找后还需要计算回实际的offset,这对⽤户是透明的。

稀疏索引,索引密度不⾼,但是offset有序,⼆分查找的时间复杂度为O(lgN),如果从头遍历时间复杂度是O(N)。

示意图如下:


偏移量索引由相对偏移量和物理地址组成。


可以通过如下命令解析 .index ⽂件

1
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --print-data-log | head

注意:offset 与 position 没有直接关系,因为会删除数据和清理⽇志。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
[root@node1 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000003925423.log --print-data-log | head
Dumping 00000000000003925423.log
Starting offset: 3925423
baseOffset: 3925423 lastOffset: 3926028 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0
CreateTime: 1596513434779 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
4049330741
baseOffset: 3926029 lastOffset: 3926634 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 16359
CreateTime: 1596513434786 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
2290699169
baseOffset: 3926635 lastOffset: 3927240 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 32718
CreateTime: 1596513434787 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
368995405
baseOffset: 3927241 lastOffset: 3927846 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 49077
CreateTime: 1596513434788 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
143415655
baseOffset: 3927847 lastOffset: 3928452 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 65436
CreateTime: 1596513434789 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
572340120
baseOffset: 3928453 lastOffset: 3929058 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 81795
CreateTime: 1596513434790 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
1029643347
baseOffset: 3929059 lastOffset: 3929664 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 98154
CreateTime: 1596513434791 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
2163818250
baseOffset: 3929665 lastOffset: 3930270 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 114513
CreateTime: 1596513434792 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
3747213735

在偏移量索引⽂件中,索引数据都是顺序记录 offset ,但时间戳索引⽂件中每个追加的索引时间戳必须⼤于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息元数据中存在若⼲的时间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增⻓。反之如果是CreateTime 则⽆法保证顺序。

注意:timestamp⽂件中的 offset 与 index ⽂件中的 relativeOffset 不是⼀⼀对应的。因为数据的写⼊是各⾃追加。

如何查看偏移量为23的消息?

Kafka 中存在⼀个 ConcurrentSkipListMap 来保存在每个⽇志分段,通过跳跃表⽅式,定位到在00000000000000000000.index ,通过⼆分法在偏移量索引⽂件中找到不⼤于 23 的最⼤索引项,即 offset 20 那栏,然后从⽇志分段⽂件中的物理位置为320 开始顺序查找偏移量为 23 的消息。

时间戳

在偏移量索引⽂件中,索引数据都是顺序记录 offset ,但时间戳索引⽂件中每个追加的索引时间戳必须⼤于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若⼲的时间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增⻓。反之如果是 CreateTime 则⽆法保证顺序。

通过时间戳⽅式进⾏查找消息,需要通过查找时间戳索引偏移量索引两个⽂件。

时间戳索引索引格式:前⼋个字节表示时间戳,后四个字节表示偏移量



查找时间戳为 1557554753430 开始的消息?

  1. 查找该时间戳应该在哪个⽇志分段中。将1557554753430和每个⽇志分段中最⼤时间戳largestTimeStamp逐⼀对⽐,直到找到不⼩于1557554753430所对应的⽇志分段。⽇志分段中的largestTimeStamp的计算是:先查询该⽇志分段所对应时间戳索引⽂件,找到最后⼀条索引项,若最后⼀条索引项的时间戳字段值⼤于0,则取该值,否则取该⽇志分段的最近修改时间。

  2. 查找该⽇志分段的偏移量索引⽂件,查找该偏移量对应的物理地址。

  3. ⽇志⽂件中从 320 的物理位置开始查找不⼩于 1557554753430 数据。

注意:timestamp⽂件中的 offset 与 index ⽂件中的 relativeOffset 不是⼀⼀对应的,因为数据的写⼊是各⾃追加。

清理

Kafka 提供两种⽇志清理策略:

⽇志删除:按照⼀定的删除策略,将不满⾜条件的数据进⾏数据删除。

⽇志压缩:针对每个消息的 Key 进⾏整合,对于有相同 Key 的不同 Value 值,只保留最后⼀个版本。

Kafka 提供 log.cleanup.policy 参数进⾏相应配置,默认值: delete ,还可以选择 compact 。

主题级别的配置项是 cleanup.policy 。

⽇志删除
  • 基于时间

    ⽇志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定⽇志保留的时间节点。如果超过该设定值,就需要进⾏删除。默认是 7 天, log.retention.ms 优先级最⾼。

    Kafka 依据⽇志分段中最⼤的时间戳进⾏定位。

    ⾸先要查询该⽇志分段所对应的时间戳索引⽂件,查找时间戳索引⽂件中最后⼀条索引项,若最后⼀条索引项的时间戳字段值⼤于 0,则取该值,否则取最近修改时间。

    为什么不直接选最近修改时间呢?

    因为⽇志⽂件可以有意⽆意的被修改,并不能真实的反应⽇志分段的最⼤时间信息。

    删除过程

    1. 从⽇志对象中所维护⽇志分段的跳跃表中移除待删除的⽇志分段,保证没有线程对这些⽇志分段进⾏读取操作。

    2. 这些⽇志分段所有⽂件添加 上 .delete 后缀。

    3. 交由⼀个以 “delete-file” 命名的延迟任务来删除这些 .delete 为后缀的⽂件。延迟执⾏时间可以通过file.delete.delay.ms 进⾏设置

    如果活跃的⽇志分段中也存在需要删除的数据时?

    Kafka 会先切分出⼀个新的⽇志分段作为活跃⽇志分段,该⽇志分段不删除,删除原来的⽇志分段。先腾出地⽅,再删除。

  • 基于⽇志⼤⼩

    ⽇志删除任务会检查当前⽇志的⼤⼩是否超过设定值。设定项为 log.retention.bytes ,单个⽇志分段的⼤⼩由 log.segment.bytes 进⾏设定。

    删除过程

    1. 计算需要被删除的⽇志总⼤⼩ (当前⽇志⽂件⼤⼩(所有分段)减去retention值)。

    2. 从⽇志⽂件第⼀个 LogSegment 开始查找可删除的⽇志分段的⽂件集合。

    3. 执⾏删除。

  • 基于偏移量

    根据⽇志分段的下⼀个⽇志分段的起始偏移量是否⼤于等于⽇志⽂件的起始偏移量,若是,则可以删除此⽇志分段。

    注意:⽇志⽂件的起始偏移量并不⼀定等于第⼀个⽇志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。


    删除过程

    1. 从头开始遍历每个⽇志分段,⽇志分段1的下⼀个⽇志分段的起始偏移量为21,⼩于logStartOffset,将⽇志分段1加⼊到删除队列中

    2. ⽇志分段 2 的下⼀个⽇志分段的起始偏移量为35,⼩于 logStartOffset,将 ⽇志分段 2 加⼊到删除队列中

    3. ⽇志分段 3 的下⼀个⽇志分段的起始偏移量为57,⼩于logStartOffset,将⽇志分段3加⼊删除集合中

    4. ⽇志分段4的下⼀个⽇志分段的其实偏移量为71,⼤于logStartOffset,则不进⾏删除。

⽇志压缩
    1. 概念

    ⽇志压缩是Kafka的⼀种机制,可以提供较为细粒度的记录保留,⽽不是基于粗粒度的基于时间的保留。

    对于具有相同的Key,⽽数据不同,只保留最后⼀条数据,前⾯的数据在合适的情况下删除。

    1. 应⽤场景

    ⽇志压缩特性,就实时计算来说,可以在异常容灾⽅⾯有很好的应⽤途径。⽐如,我们在Spark、Flink中做实时计算时,需要⻓期在内存⾥⾯维护⼀些数据,这些数据可能是通过聚合了⼀天或者⼀周的⽇志得到的,这些数据⼀旦由于异常因素(内存、⽹络、磁盘等)崩溃了,从头开始计算需要很⻓的时间。⼀个⽐较有效可⾏的⽅式就是定时将内存⾥的数据备份到外部存储介质中,当崩溃出现时,再从外部存储介质中恢复并继续计算。

    使⽤⽇志压缩来替代这些外部存储有哪些优势及好处呢?这⾥为⼤家列举并总结了⼏点:

    • Kafka即是数据源⼜是存储⼯具,可以简化技术栈,降低维护成本

    • 使⽤外部存储介质的话,需要将存储的Key记录下来,恢复的时候再使⽤这些Key将数据取回,实现起来有⼀定的⼯程难度和复杂度。使⽤Kafka的⽇志压缩特性,只需要把数据写进Kafka,等异常出现恢复任务时再读回到内存就可以了

    • Kafka对于磁盘的读写做了⼤量的优化⼯作,⽐如磁盘顺序读写。相对于外部存储介质没有索引查询等⼯作量的负担,可以实现⾼性能。同时,Kafka的⽇志压缩机制可以充分利⽤廉价的磁盘,不⽤依赖昂贵的内存来处理,在性能相似的情况下,实现⾮常⾼的性价⽐(这个观点仅仅针对于异常处理和容灾的场景来说)

    1. 实现细节

    主题的 cleanup.policy 需要设置为compact。

    Kafka的后台线程会定时将Topic遍历两次:

    1. 记录每个key的hash值最后⼀次出现的偏移量

    2. 第⼆次检查每个offset对应的Key是否在后⾯的⽇志中出现过,如果出现了就删除对应的⽇志。

    ⽇志压缩允许删除,除最后⼀个key之外,删除先前出现的所有该key对应的记录。在⼀段时间后从⽇志中清理,以释放空间。

    注意:⽇志压缩与key有关,确保每个消息的key不为null。

    压缩是在Kafka后台通过定时重新打开Segment来完成的,Segment的压缩细节如下图所示:


    ⽇志压缩可以确保:

    • 任何保持在⽇志头部以内的使⽤者都将看到所写的每条消息,这些消息将具有顺序偏移量。可以使⽤Topic的min.compaction.lag.ms属性来保证消息在被压缩之前必须经过的最短时间。也就是说,它为每个消息在(未压缩)头部停留的时间提供了⼀个下限。可以使⽤Topic的max.compaction.lag.ms属性来保证从收到消息到消息符合压缩条件之间的最⼤延时

    • 消息始终保持顺序,压缩永远不会重新排序消息,只是删除⼀些⽽已

    • 消息的偏移量永远不会改变,它是⽇志中位置的永久标识符

    • 从⽇志开始的任何使⽤者将⾄少看到所有记录的最终状态,按记录的顺序写⼊。另外,如果使⽤者在⽐Topic的log.cleaner.delete.retention.ms短的时间内到达⽇志的头部,则会看到已删除记录的所有delete标记。保留时间默认是24⼩时。

默认情况下,启动⽇志清理器,若需要启动特定Topic的⽇志清理,请添加特定的属性。配置⽇志清理器,这⾥为⼤家总结了以下⼏点:

  1. log.cleanup.policy 设置为 compact ,Broker的配置,影响集群中所有的Topic。

  2. log.cleaner.min.compaction.lag.ms ,⽤于防⽌对更新超过最⼩消息进⾏压缩,如果没有设置,除最后⼀个Segment之外,所有Segment都有资格进⾏压缩

  3. log.cleaner.max.compaction.lag.ms ,⽤于防⽌低⽣产速率的⽇志在⽆限制的时间内不压缩。

Kafka的⽇志压缩原理并不复杂,就是定时把所有的⽇志读取两遍,写⼀遍,⽽CPU的速度超过磁盘完全不是问题,只要⽇志的量对应的读取两遍和写⼊⼀遍的时间在可接受的范围内,那么它的性能就是可以接受的

磁盘存储

零拷⻉

kafka⾼性能,是多⽅⾯协同的结果,包括宏观架构、分布式partition存储、ISR数据同步、以及“⽆所不⽤其极”的⾼效利⽤磁盘/操作系统特性。

零拷⻉并不是不需要拷⻉,⽽是减少不必要的拷⻉次数。通常是说在IO读写过程中。

nginx的⾼性能也有零拷⻉的身影。

传统IO,⽐如:读取⽂件,socket发送。

1
2
buffer = File.read
Socket.send(buffer)

传统⽅式实现:先读取、再发送,实际经过1~4四次copy。


  1. 第⼀次:将磁盘⽂件,读取到操作系统内核缓冲区;

  2. 第⼆次:将内核缓冲区的数据,copy到application应⽤程序的buffer;

  3. 第三步:将application应⽤程序buffer中的数据,copy到socket⽹络发送缓冲区(属于操作系统内核的缓冲区);

  4. 第四次:将socket buffer的数据,copy到⽹络协议栈,由⽹卡进⾏⽹络传输。

实际IO读写,需要进⾏IO中断,需要CPU响应中断(内核态到⽤户态转换),尽管引⼊DMA(Direct Memory Access,直接存储器访问)来接管CPU的中断请求,但四次copy是存在“不必要的拷⻉”的。

实际上并不需要第⼆个和第三个数据副本。数据可以直接从读缓冲区传输到套接字缓冲区。

kafka的两个过程:

  1. ⽹络数据持久化到磁盘 (Producer 到 Broker)

  2. 磁盘⽂件通过⽹络发送(Broker 到 Consumer)

数据落盘通常都是⾮实时的,Kafka的数据并不是实时的写⼊硬盘,它充分利⽤了现代操作系统分⻚存储来利⽤内存提⾼I/O效率。

  • 磁盘⽂件通过⽹络发送(Broker 到 Consumer)

    磁盘数据通过DMA(Direct Memory Access,直接存储器访问)拷⻉到内核态 Buffer 直接通过 DMA 拷⻉到 NIC Buffer(socket buffer),⽆需 CPU 拷⻉。

    除了减少数据拷⻉外,整个读⽂件 ==> ⽹络发送由⼀个 sendfile 调⽤完成,整个过程只有两次上下⽂切换,因此⼤⼤提⾼了性能。

    Java NIO对sendfile的⽀持就是FileChannel.transferTo()/transferFrom()。

    把磁盘⽂件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;底层就是sendfile。消费者从broker读取数据,就是由此实现。

    具体来看,Kafka 的数据传输通过 TransportLayer 来完成,其⼦类 PlaintextTransportLayer 通过 Java NIO 的 FileChannel 的 transferTo 和 transferFrom ⽅法实现零拷⻉。


    注: transferTo 和 transferFrom 并不保证⼀定能使⽤零拷⻉,需要操作系统⽀持。

    Linux 2.4+ 内核通过 sendfile 系统调⽤,提供了零拷⻉。

⻚缓存

⻚缓存是操作系统实现的⼀种主要的磁盘缓存,以此⽤来减少对磁盘 I/O 的操作。

具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。

Kafka接收来⾃socket buffer的⽹络数据,应⽤进程不需要中间处理、直接进⾏持久化时。可以使⽤mmap内存⽂件映射。

Memory Mapped Files,简称mmap,简单描述其作⽤就是:将磁盘⽂件映射到内存, ⽤户通过修改内存就能修改磁盘⽂件。

它的⼯作原理是直接利⽤操作系统的Page来实现磁盘⽂件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。


通过mmap,进程像读写硬盘⼀样读写内存(当然是虚拟机内存)。使⽤这种⽅式可以获取很⼤的I/O提升,省去了⽤户空间到内核空间复制的开销。

mmap也有⼀个很明显的缺陷:不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调⽤flush的时候才把数据真正的写到硬盘。

Kafka提供了⼀个参数 producer.type 来控制是不是主动flush;

如果Kafka写⼊到mmap之后就⽴即flush然后再返回Producer叫同步(sync);写⼊mmap之后⽴即返回Producer不调⽤flush叫异步(async)。

Java NIO,提供了⼀个MappedByteBuffer 类可以⽤来实现内存映射。

MappedByteBuffer只能通过调⽤FileChannel的map()取得,再没有其他⽅式。

FileChannel.map()是抽象⽅法,具体实现是在 FileChannelImpl.map()可⾃⾏查看JDK源码,其map0()⽅法就是调⽤了Linux内核的mmap的API


使⽤ MappedByteBuffer类要注意的是: mmap的⽂件映射,在full gc时才会进⾏释放。当close时,需要⼿动清除内存映射⽂件,可以反射调⽤sun.misc.Cleaner⽅法。

当⼀个进程准备读取磁盘上的⽂件内容时:

  1. 操作系统会先查看待读取的数据所在的⻚ (page)是否在⻚缓存(pagecache)中,如果存在(命中)则直接返回数据,从⽽避免了对物理磁盘的 I/O 操作。

  2. 如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据⻚存⼊⻚缓存,之后再将数据返回给进程。

如果⼀个进程需要将数据写⼊磁盘:

  1. 操作系统也会检测数据对应的⻚是否在⻚缓存中,如果不存在,则会先在⻚缓存中添加相应的⻚,最后将数据写⼊对应的⻚。

  2. 被修改过后的⻚也就变成了脏⻚,操作系统会在合适的时间把脏⻚中的数据写⼊磁盘,以保持数据的⼀致性。

对⼀个进程⽽⾔,它会在进程内部缓存处理所需的数据,然⽽这些数据有可能还缓存在操作系统的⻚缓存中,因此同⼀份数据有可能被缓存了两次。并且,除⾮使⽤Direct I/O的⽅式, 否则⻚缓存很难被禁⽌。

当使⽤⻚缓存的时候,即使Kafka服务重启, ⻚缓存还是会保持有效,然⽽进程内的缓存却需要重建。这样也极⼤地简化了代码逻辑,因为维护⻚缓存和⽂件之间的⼀致性交由操作系统来负责,这样会⽐进程内维护更加安全有效。

Kafka中⼤量使⽤了⻚缓存,这是 Kafka 实现⾼吞吐的重要因素之⼀。

消息先被写⼊⻚缓存,由操作系统负责刷盘任务。

顺序写⼊

操作系统可以针对线性读写做深层次的优化,⽐如预读(read-ahead,提前将⼀个⽐较⼤的磁盘块读⼊内存) 和后写(write-behind,将很多⼩的逻辑写操作合并起来组成⼀个⼤的物理写操作)技术。


Kafka 在设计时采⽤了⽂件追加的⽅式来写⼊消息,即只能在⽇志⽂件的尾部追加新的消息,并且也不允许修改已写⼊的消息,这种⽅式属于典型的顺序写盘的操作,所以就算 Kafka 使⽤磁盘作为存储介质,也能承载⾮常⼤的吞吐量。

mmap和sendfile:

  1. Linux内核提供、实现零拷⻉的API;

  2. sendfile 是将读到内核空间的数据,转到socket buffer,进⾏⽹络发送。

  3. mmap将磁盘⽂件映射到内存,⽀持读和写,对内存的操作会反映在磁盘⽂件上。

  4. RocketMQ 在消费消息时,使⽤了 mmap。kafka 使⽤了 sendFile。

Kafka速度快是因为:

  1. partition顺序读写,充分利⽤磁盘特性,这是基础。

  2. Producer⽣产的数据持久化到broker,采⽤mmap⽂件映射,实现顺序的快速写⼊。

  3. Customer从broker读取数据,采⽤sendfile,将磁盘⽂件读到OS内核缓冲区后,直接转到socket buffer进⾏⽹络发送。