1. kafka⽇志管理⼦系统的⼊⼝。⽇志管理器负责⽇志的创建、抽取、和清理。

  2. 所有的读写操作都代理给具体的Log实例。

  3. ⽇志管理器在⼀个或多个⽬录维护⽇志。新的⽇志创建到拥有最少log的⽬录中。

  4. 分区不移动。

  5. 通过⼀个后台线程通过定期截断多余的⽇志段来处理⽇志保留。

启动Kafka服务器的脚本。


main⽅法中创建KafkaServerStartable对象。


该类中包含KakfaServer对象,startup⽅法调⽤的是KafkaServer的startup⽅法。


KafkaServer的startup⽅法中,启动了LogManager。



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @param logDirs 主题分区⽬录的File对象
* @param initialOfflineDirs
* @param topicConfigs 主题配置
* @param defaultConfig 主题的默认配置
* @param cleanerConfig ⽇志清理器配置
* @param ioThreads IO线程数
* @param flushCheckMs
* @param flushRecoveryOffsetCheckpointMs
* @param flushStartOffsetCheckpointMs
* @param retentionCheckMs 检查⽇志保留的时间
* @param maxPidExpirationMs
* @param scheduler
* @param brokerState
* @param brokerTopicStats
* @param logDirFailureChannel
* @param time 时间
*/

LogManager的startup⽅法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 启动后台线程们⽤于将⽇志刷盘以及⽇志的清理
*/
def startup() {
if (scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
// ⽤于清除⽇志⽚段的调度任务,没有压缩,周期性执⾏
scheduler.schedule("kafka-log-retention",
cleanupLogs _,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
// ⽤于⽇志⽚段刷盘的调度任务,周期性执⾏
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs _,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
// ⽤于将当前broker上各个分区的恢复点写到⽂本⽂件的调度任务,周期性执⾏
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointLogRecoveryOffsets _,
delay = InitialTaskDelayMs,
period = flushRecoveryOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
// ⽤于将当前broker上各个分区起始偏移量写到⽂本⽂件的调度任务,周期性执⾏
scheduler.schedule("kafka-log-start-offset-checkpoint",
checkpointLogStartOffsets _,
delay = InitialTaskDelayMs,
period = flushStartOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-delete-logs",
deleteLogs _,
delay = InitialTaskDelayMs,
period = defaultConfig.fileDeleteDelayMs,
TimeUnit.MILLISECONDS)
}
// 如果配置了⽇志的清理,则启动清理任务
if (cleanerConfig.enableCleaner)
cleaner.startup()
}

清除⽇志⽚段


cleanupLogs的具体实现。


deleteOldSegments()的实现。



⾸先找到所有可以删除的⽇志⽚段,然后执⾏删除。



该⽅法执⾏⽇志⽚段的异步删除。步骤如下:

  1. 将⽇志⽚段的信息从map集合移除,之后再也不读了

  2. 在⽇志⽚段的索引和log⽂件名称后追加.deleted,加标记⽽已

  3. 调度异步删除操作,执⾏.deleted⽂件的真正删除。

异步删除允许在读取⽂件的同时执⾏删除,⽽不需要进⾏同步,避免了在读取⼀个⽂件的同时物理删除引起的冲突。

该⽅法不需要将IOException转换为KafkaStorageException,因为该⽅法要么在所有⽇志加载之前调⽤,要么在使⽤中由调⽤者处理IOException。


根据⽇志⽚段⼤⼩进⾏删除。


shouldDelete是⼀个函数,作为deleteOldSegments删除⽇志⽚段的判断条件。

根据偏移量删除⽇志⽚段:对于当前⽇志⽚段是否需要删除,要看它的下⼀个⽇志⽚段的baseOffset是否⼩于等于⽇志对外暴露给消费者的⽇志偏移量,如果⼩,消费者不⽤读取,当前⽇志⽚段就可以删除。


⽇志⽚段刷盘

在LogManager的startup中,启动了刷盘的线程:调⽤flushDirtyLogs⽅法进⾏⽇志刷盘处理。


Kafka推荐让操作系统后台进⾏刷盘,使⽤副本保证数据⾼可⽤,这样效率更⾼。

因此此种⽅式不推荐。


执⾏刷盘的⽅法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* ⽇志⽚段刷盘到offset-1的偏移量位置。
*
* @param offset 从上⼀个恢复点开始刷盘到该偏移量-1的位置。offset偏移量的不刷盘。
* offset是新的恢复点值。
*/
def flush(offset: Long) : Unit = {
maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
// 如果偏移量⼩于等于该⽇志的恢复点,则不需要刷盘
if (offset <= this.recoveryPoint)
return
debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime,
current time: ${time.milliseconds()}, " + s"unflushed: $unflushedMessages")
// 遍历需要刷盘的⽇志⽚段
for (segment <- logSegments(this.recoveryPoint, offset))
// 执⾏刷盘
segment.flush()
lock synchronized {
// 检查MMAP是否关闭
checkIfMemoryMappedBufferClosed()
// 如果偏移量⼤于恢复点
if (offset > this.recoveryPoint) {
// 设置新的恢复点,表示到达这个偏移量位置的消息都已经刷盘了
this.recoveryPoint = offset
// 设置当前时间为刷盘的时间
lastflushedTime.set(time.milliseconds)
}
}
}
}

将当前broker上各个分区的恢复点写到⽂本⽂件




将当前broker上各个分区起始偏移量写到⽂本⽂件




删除⽇志⽚段


对标记为删除的⽇志执⾏删除的动作。



clearner

如果配置了⽇志清理,则启动清理任务。



cleaners是多个CleanerThread集合。


最终执⾏清理的是,压缩。