Kafka源码之组消费模式
组消费模式指的是在消费者消费消息的时候,使⽤组协调器的再平衡机制⾃动分配要消费的分区(们)。
此时需要在消费者的配置中指定消费组ID,同时如果需要,设置偏移量重置的策略。
然后消费者订阅主题,就可以消费消息了。
1 | Map<String, Object> configs = new HashMap<>(); |
consumer.subscribe ⽅法的实现:
上⾯⽅法中第⼀个参数是订阅的主题集合,第⼆个参数是⼀个监听器,当发送再平衡的时候消费者想要执⾏的操作。
默认是NoOpConsumerRebalanceListener,即什么都不做:
NoOpConsumerRebalanceListener的实现:
订阅⽅法的实现:
subscriptions的订阅操作实现:
就是对SubscriptionState的操作:
⽤户的poll的操作调⽤pollOnce⽅法:
pollOnce的实现:
coordinator.poll负责周期性地向broker提交偏移量信息。
上⾯⽅法中updateFetchPositions⽅法表示:如果订阅的主题分区没有偏移量信息,则更新主题分区的偏移量信息,这样就知道消费的时候从哪⾥开始消费了:
上图中的fetcher.resetOffsetsIfNeeded⽅法的实现:
resetOffsets的具体实现:
上述的实现表示:⾸先根据重置策略重置主题分区的偏移量请求类型,然后发送请求,真正从主题的分区中获取偏移量。
其中上图中的
需要向broker发请求,获取主题分区的偏移量,更新偏移量的值:
发送请求的实现:
发送的请求是ListOffsetRequest请求:
该请求在Broker中的处理:
具体处理:
该⽅法的实现:
如果是最晚的,直接设置最晚的偏移量,如果不是最晚的,则需要根据主题分区以及时间戳查找:
查找的逻辑:
对于消费者,向指定的broker发送ListOffsetRequest请求,获取指定主题分区的偏移量和时间戳信息:
调⽤handleListOffsetResponse处理获取的偏移量信息:
complete⽅法⽤于完成请求。当complete⽅法调⽤之后,successed⽅法返回true。
同时偏移量信息可以通过value⽅法获取:
即:变量offsetsByTimes的值就是下图中future.value()的值。此时各个主题分区的偏移量已经设置好了:
pollOnce⽅法:
在更新主题分区的偏移量之后,就可以发送请求消费消息了:
对于组消费,还需要定期将偏移量提交到 __consumer_offsets 主题中:
poll⽅法的实现:
如果是⾃动提交消费者偏移量到broker的 __consumer_offsets 主题,则maybeAutoCommitOffsetsAsync的实现:
doAutoCommitOffsetsAsync的实现:
commitOffsetsAsync的实现:
在异步提交消费者偏移量的时候,如果组协调器已知,直接发送; 如果未知,则异步提交等待,查找组协调器,等找到之后,异步提交消费者偏移量:
上图中sendOffsetCommitRequest的实现:
-
⾸先查找消费组协调器
-
然后创建偏移量提交请求对象
-
发送请求
在KafkaServer处理的时候:
handleOffsetCommitRequest的实现:
消费组协调器的处理:
doCommitOffsets的实现:
storeOffsets的实现:
其中:
appendForGroup的实现如下,将当前消费组的偏移量消息追加到 __consumer_offsets 的指定分区中: