消费者如何提交偏移量?

  • ⾃动提交

  • ⼿动提交

    • 同步提交

    • 异步提交

客户端提交偏移量,交给KafkaApis的handle⽅法,handle⽅法使⽤模式匹配,调⽤handleOffsetCommitRequest⽅法进⾏处理:


handleOffsetCommitRequest的实现。


如果apiVersion的值是0,则交给zookeeper保存偏移量信息。


否则调⽤组协调器负责处理偏移量提交请求。


handleCommitOffsets的实现:⾸先根据groupId查找消费组元数据,如果没有找到消费组元数据,则要么该消费组不依赖Kafka进⾏消费组管理,允许提交;要么提交的偏移量信息是消费组再平衡之前的偏移量,旧请求,拒绝。

正常情况就是最后的分⽀:找到了消费组元数据,调⽤doCommitOffsets处理。偏移量提交的请求。


doCommitOffsets的实现,该⽅法判断消费组的状态:

  1. 如果是Dead,则响应错误信息。

  2. 如果消费组还在等待消费者同步,则响应错误信息。

  3. 如果消费组中没有这个消费者,则响应错误信息。

  4. 如果请求中的纪元数字和消费组当前纪元数字不符,则响应错误信息。

  5. 如果仅使⽤Kafka存储偏移量,⽽不需要管理,则直接保存偏移量。

  6. 正常情况下,找到了消费组,消费组中有这个消费者,同时消费组⼯作正常,则保存偏移量信息。


storeOffsets⽅法的实现。


需要先计算当前消费组的偏移量需要提交到 __consumer_offsets 主题的哪个分区中。


将消息追加到 __consumer_offsets 主题的指定分区中。


其中计算 __consumer_offsets 分区的实现。


上图中的函数,计算⽅式如下:获取消费组ID的散列值,取绝对值,然后将此绝对值对 __consumer_offsets 主题分区个数取模得到。

appendForGroup⽅法的实现:调⽤副本管理器的⽅法将消息追加到 __consumer_offsets 主题的指定分区⽇志中。


如果偏移量消息追加成功,则调⽤callback响应客户端。


缓存偏移量信息。




responseCallback最终是KafkaApis中的308⾏(有可能不是,因为我加注释了,差不多这么多⾏):该函数将消费者提交的偏移量追加到⽇志中并添加到消费组缓存中之后,返回结果给消费者客户端。


消费者提交偏移量:KafkaApis,KafkaApis->GroupCoordinator的⽅法-> GroupMetadata

不仅需要将消费组的偏移量提交到⽇志中,还需要在内存维护该偏移量信息。

其实对于消费者,获取结果后,也需要在消费者客户端解析该响应,将消费者的偏移量缓存到消费者客户端。

消费者客户端消费消息的⽅法:KafkaConsumer.poll(1_000)。

调⽤poll⽅法拉取消息,该⽅法调⽤pollOnce进⾏消息的拉取。


pollOnce⽅法会调⽤coordinator的poll⽅法周期性地提交偏移量。


其中poll⽅法的实现。


poll⽅法中,最后会判断是否需要⾃动提交偏移量。






invokeCompletedOffsetCommitCallbacks⽅法⽤于轮询偏移量提交后broker端的响应信息。




onCommitCompleted的实现。


lastCommittedOffsets为:


KafkaConsumer -> Broker -> KafkaApis -handle-> GroupCoordinator -> GroupMetadataManager -> GroupMetadata -> ReplicaManager -> log-> KafkaConsumer -> lastCommittedOffsets集合。

在Kafka 1.0.2之前的版本中有⼀个OffsetManager负责偏移量的处理。

OffsetManager主要提供对offset的保存和读取,kafka管理topic的偏移量有2种⽅式:

  1. zookeeper,即把偏移量提交⾄zk上;

  2. kafka,即把偏移量提交⾄kafka内部,主要由offsets.storage参数决定。1.0.2版本中默认是kafka。也就是说如果配置offsets.storage= kafka,则kafka会把这种offsetcommit请求转变为⼀种Producer,保存⾄topic为 __consumer_offsets 的log⾥⾯。

1
2
3
4
5
6
7
8
9
10
11
class OffsetManager(val config: OffsetManagerConfig,
replicaManager: ReplicaManager,
zkClient: ZkClient,
scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
//通过offsetsCache提供对GroupTopicPartition的查询
private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
//把过时的偏移量刷⼊磁盘,因为这些偏移量⻓时间没有被更新,意味着消费者可能不再消费了,也就不需要了,因此刷⼊到磁盘
scheduler.schedule(name = "offsets-cache-compactor",
fun = compact,
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)

主要完成2件事情:

  1. 提供对topic偏移量的查询

  2. 将偏移量消息刷⼊__consumer_offsets主题的log中。