Kafka源码之组消费模式
组消费模式指的是在消费者消费消息的时候,使⽤组协调器的再平衡机制⾃动分配要消费的分区(们)。
此时需要在消费者的配置中指定消费组ID,同时如果需要,设置偏移量重置的策略。
然后消费者订阅主题,就可以消费消息了。
1 | Map<String, Object> configs = new HashMap<>(); |
consumer.subscribe ⽅法的实现:

上⾯⽅法中第⼀个参数是订阅的主题集合,第⼆个参数是⼀个监听器,当发送再平衡的时候消费者想要执⾏的操作。
默认是NoOpConsumerRebalanceListener,即什么都不做:
NoOpConsumerRebalanceListener的实现:

订阅⽅法的实现:

subscriptions的订阅操作实现:

就是对SubscriptionState的操作:

⽤户的poll的操作调⽤pollOnce⽅法:

pollOnce的实现:

coordinator.poll负责周期性地向broker提交偏移量信息。
上⾯⽅法中updateFetchPositions⽅法表示:如果订阅的主题分区没有偏移量信息,则更新主题分区的偏移量信息,这样就知道消费的时候从哪⾥开始消费了:

上图中的fetcher.resetOffsetsIfNeeded⽅法的实现:

resetOffsets的具体实现:


上述的实现表示:⾸先根据重置策略重置主题分区的偏移量请求类型,然后发送请求,真正从主题的分区中获取偏移量。
其中上图中的

需要向broker发请求,获取主题分区的偏移量,更新偏移量的值:

发送请求的实现:

发送的请求是ListOffsetRequest请求:

该请求在Broker中的处理:

具体处理:

该⽅法的实现:

如果是最晚的,直接设置最晚的偏移量,如果不是最晚的,则需要根据主题分区以及时间戳查找:

查找的逻辑:

对于消费者,向指定的broker发送ListOffsetRequest请求,获取指定主题分区的偏移量和时间戳信息:

调⽤handleListOffsetResponse处理获取的偏移量信息:

complete⽅法⽤于完成请求。当complete⽅法调⽤之后,successed⽅法返回true。
同时偏移量信息可以通过value⽅法获取:

即:变量offsetsByTimes的值就是下图中future.value()的值。此时各个主题分区的偏移量已经设置好了:


pollOnce⽅法:

在更新主题分区的偏移量之后,就可以发送请求消费消息了:

对于组消费,还需要定期将偏移量提交到 __consumer_offsets 主题中:

poll⽅法的实现:


如果是⾃动提交消费者偏移量到broker的 __consumer_offsets 主题,则maybeAutoCommitOffsetsAsync的实现:

doAutoCommitOffsetsAsync的实现:

commitOffsetsAsync的实现:

在异步提交消费者偏移量的时候,如果组协调器已知,直接发送; 如果未知,则异步提交等待,查找组协调器,等找到之后,异步提交消费者偏移量:

上图中sendOffsetCommitRequest的实现:
-
⾸先查找消费组协调器
-
然后创建偏移量提交请求对象
-
发送请求

在KafkaServer处理的时候:

handleOffsetCommitRequest的实现:


消费组协调器的处理:

doCommitOffsets的实现:


storeOffsets的实现:
其中:

appendForGroup的实现如下,将当前消费组的偏移量消息追加到 __consumer_offsets 的指定分区中:
