Kafka源码之Consumer消费者流程
Consumer示例
KafkaConsumer,消费者的根本⽬的是从Kafka服务端拉取消息,并交给业务逻辑进⾏处理。
开发⼈员不必关⼼与Kafka服务端之间⽹络连接的管理、⼼跳检测、请求超时重试等底层操作也不必关⼼订阅Topic的分区数量、分区Leader副本的⽹络拓扑以及消费组的Rebalance等细节,另外还提供了⾃动提交offset的功能。
1 | public static void main(String[] args) throws InterruptedException { |
Kafka服务端并不会记录消费者的消费位置,⽽是由消费者⾃⼰决定如何保存如何记录其消费的offset。在Kafka服务端中添加了⼀个名为“__consumer_offsets"的内部topic来保存消费者提交的offset,当出现消费者上、下线时会触发Consumer Group进⾏Rebalance操作,对分区进⾏重新分配,待Rebalance操作完成后。消费者就可以读取该topic中记录的offset,并从此offset位置继续消费。当然,使⽤该topic记录消费者的offset只是默认选项,开发⼈员可以根据业务需求将offset记录在别的存储中。
在消费者消费消息的过程中,提交offset的时机⾮常重要,因为它决定了消费者故障重启后的消费位置。在上⾯的示例中,我们通过将 enable.auto.commit 选项设置为true可以起到⾃动提交offset的功能, auto.commit.interval.ms 选项则设置了⾃动提交的时间间隔。每次在调⽤ KafkaConsumer.poll() ⽅法时都会检测是否需要⾃动提交,并提交上次 poll() ⽅法返回的最后⼀个消息的offset。为了避免消息丢失,建议poll()⽅法之前要处理完上次poll()⽅法拉取的全部消息。KafkaConsumer中还提供了两个⼿动提交offset的⽅法,分别是 commitSync() 和 commitAsync() ,它们都可以指定提交的offset值,区别在于前者是同步提交,后者是异步提交。
KafkaConsumer实例化
了解了 KafkaConsumer 的基本使⽤,开始深⼊了解 KafkaConsumer 原理和实现,先看⼀下构造⽅法核⼼逻辑
1 | private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { |
-
- 初始化参数配置
-
- 初始化⽹络客户端 NetworkClient
-
- 初始化消费者⽹络客户端 ConsumerNetworkClient
-
- 初始化offset提交策略,默认⾃动提交
-
- 初始化消费者协调器 ConsumerCoordinator
-
- 初始化拉取器 Fetcher
订阅Topic
下⾯我们先来看⼀下subscribe⽅法都有哪些逻辑:
1 | public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { |
-
- KafkaConsumer不是线程安全类,开启轻量级锁,topics为空抛异常,topics是空集合开始取消订阅,再次判断topics集合中是否有⾮法数据,判断消费者协调者是否为空。开始订阅对应topic。listener默认为 NoOpConsumerRebalanceListener ,⼀个空操作
- 轻量级锁:分别记录了当前使⽤KafkaConsumer的线程id和重⼊次数,KafkaConsumer的acquire()和release()⽅法实现了⼀个”轻量级锁“,它并⾮真正的锁,仅是检测是否有多线程并发操作KafkaConsumer⽽已
-
- 每⼀个KafkaConsumer实例内部都拥有⼀个SubscriptionState对象,subscribe内部调⽤了subscribe⽅法,subscribe⽅法订阅信息记录到 SubscriptionState ,多次订阅会覆盖旧数据。
-
- 更新metadata,判断如果metadata中不包含当前groupSubscription,开始标记更新(后⾯会有更新的逻辑),并且消费者侧的topic不会过期
消息消费过程
下⾯KafkaConsumer的核⼼⽅法poll是如何拉取消息的,先来看⼀下下⾯的代码:
poll
1 | public ConsumerRecords<K, V> poll(long timeout) { |
-
使⽤轻量级锁检测kafkaConsumer是否被其他线程使⽤
-
检查超时时间是否⼩于0,⼩于0抛出异常,停⽌消费
-
检查这个 consumer 是否订阅的相应的 topic-partition
-
调⽤ pollOnce() ⽅法获取相应的 records
-
在返回获取的 records 前,发送下⼀次的 fetch 请求,避免⽤户在下次请求时线程 block 在 pollOnce() ⽅法中
-
如果在给定的时间(timeout)内获取不到可⽤的 records,返回空数据
这⾥可以看出,poll ⽅法的真正实现是在 pollOnce ⽅法中,poll ⽅法通过 pollOnce ⽅法获取可⽤的数据
pollOnce
1 | // 除了获取新数据外,还会做⼀些必要的 offset-commit和reset-offset的操作 |
pollOnce 可以简单分为6步来看,其作⽤分别如下:
-
coordinator.poll()
获取 GroupCoordinator 的地址,并建⽴相应 tcp 连接,发送 join-group、sync-group,之后才真正加⼊到了⼀个 group 中,这时会获取其要消费的 topic-partition 列表,如果设置了⾃动 commit,也会在这⼀步进⾏ commit。总之,对于⼀个新建的 group,group 状态将会从 Empty –> PreparingRebalance –> AwaiSync –> Stable;
-
获取 GroupCoordinator 的地址,并建⽴相应 tcp 连接;
-
发送 join-group 请求,然后 group 将会进⾏ rebalance;
-
发送 sync-group 请求,之后才正在加⼊到了⼀个 group 中,这时会通过请求获取其要消费的 topicpartition 列表;
-
如果设置了⾃动 commit,也会在这⼀步进⾏ commit offset
-
-
updateFetchPositions()
这个⽅法主要是⽤来更新这个 consumer 实例订阅的 topic-partition 列表的 fetch-offset 信息。⽬的就是为了获取其订阅的每个 topic-partition 对应的 position,这样 Fetcher 才知道从哪个 offset 开始去拉取这个 topic-partition 的数据
1
2
3
4
5
6
7
8
9
10private void updateFetchPositions(Set<TopicPartition> partitions) {
// 先重置那些调⽤ seekToBegin 和 seekToEnd 的 offset 的 tp,设置其 the fetch position 的 offset
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
// 获取所有分配 tp 的 offset, 即 committed offset, 更新到 TopicPartitionState中的 committed offset 中
coordinator.refreshCommittedOffsetsIfNeeded();
// 如果 the fetch position 值⽆效,则将上步获取的 committed offset 设置为 the fetch position
fetcher.updateFetchPositions(partitions);
}
}在 Fetcher 中,这个 consumer 实例订阅的每个 topic-partition 都会有⼀个对应的 TopicPartitionState 对象,在这个对象中会记录以下这些内容:
1
2
3
4
5
6
7
8
9
10
11
12
13private static class TopicPartitionState {
// Fetcher 下次去拉取时的 offset,Fecher 在拉取时需要知道这个值
private Long position; // last consumed position
// 最后⼀次获取的⾼⽔位标记
private Long highWatermark; // the high watermark from last fetch
private Long lastStableOffset;
// consumer 已经处理完的最新⼀条消息的 offset,consumer 主动调⽤ offset-commit 时会更新这个值;
private OffsetAndMetadata committed; // last committed position
// 是否暂停
private boolean paused; // whether this partition has been paused by the user
// 这 topic-partition offset 重置的策略,重置之后,这个策略就会改为 null,防⽌再次操作
private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
} -
fetcher.fetchedRecords()
返回其 fetched records,并更新其 fetch-position offset,只有在 offset-commit 时(⾃动 commit 时,是在第⼀步实现的),才会更新其 committed offset;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
// 在 max.poll.records 中设置单词最⼤的拉取条数
int recordsRemaining = maxPollRecords;
try {
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isFetched) {
// 从队列中获取但不移除此队列的头;如果此队列为空,返回null
CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) break;
// 获取下⼀个要处理的 nextInLineRecords
nextInLineRecords = parseCompletedFetch(completedFetch);
completedFetches.poll();
} else {
// 拉取records,更新 position
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
TopicPartition partition = nextInLineRecords.partition;
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
fetched.put(partition, records);
} else {
List<ConsumerRecord<K, V>> newRecords = new ArrayList<> (records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
} catch (KafkaException e) {
if (fetched.isEmpty()) throw e;
}
return fetched;
}
private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
if (!subscriptions.isAssigned(partitionRecords.partition)) {
log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
} else {
long position = subscriptions.position(partitionRecords.partition);
// 这个 tp 不能来消费了,⽐如调⽤ pause⽅法暂停消费
if (!subscriptions.isFetchable(partitionRecords.partition)) {
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
} else if (partitionRecords.nextFetchOffset == position) {
// 获取该 tp 对应的records,并更新 partitionRecords 的 fetchOffset(⽤于判断是否顺序)
List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
long nextOffset = partitionRecords.nextFetchOffset;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " + "position to {}", position, partitionRecords.partition, nextOffset);
// 更新消费的到 offset( the fetch position)
subscriptions.position(partitionRecords.partition, nextOffset);
// 获取 Lag(即 position与 hw 之间差值),hw 为 null 时,才返回 null
Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
if (partitionLag != null) this.sensors.recordPartitionLag(partitionRecords.partition,partitionLag);
return partRecords;
} else {
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", partitionRecords.partition, partitionRecords.nextFetchOffset, position);
}
}
partitionRecords.drain();
return emptyList();
} -
fetcher.sendFetches()
只要订阅的 topic-partition list 没有未处理的 fetch 请求,就发送对这个 topic-partition 的 fetch 请求,在真正发送时,还是会按 node 级别去发送,leader 是同⼀个 node 的 topic-partition 会合成⼀个请求去发送;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36// 向订阅的所有 partition (只要该 leader 暂时没有拉取请求)所在 leader 发送 fetch 请求
public int sendFetches() {
// 1. 创建 Fetch Request
Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
final FetchRequest.Builder request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();
log.debug("Sending {} fetch for partitions {} to broker {}", isolationLevel, request.fetchData().keySet(), fetchTarget);
// 2 发送 Fetch Request
client.send(fetchTarget, request).addListener(new RequestFutureListener<ClientResponse>() {
public void onSuccess(ClientResponse resp) {
FetchResponse response = (FetchResponse) resp.responseBody();
if (!matchesRequestedPartitions(request, response)) {
log.warn("Ignoring fetch response containing partitions {} since it does not match " + "the requested partitions {}", response.responseData().keySet(), request.fetchData().keySet());
return;
}
Set<TopicPartition> partitions = new HashSet<> (response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = request.fetchData().get(partition).fetchOffset;
FetchResponse.PartitionData fetchData = entry.getValue();
log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", isolationLevel, fetchOffset, partition, fetchData);
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, resp.requestHeader().apiVersion()));
}
sensors.fetchLatency.record(resp.requestLatencyMs());
}
public void onFailure(RuntimeException e) {
log.debug("Fetch request {} to {} failed", request.fetchData(), fetchTarget, e);
}
});
}
return fetchRequestMap.size();
}-
createFetchRequests():为订阅的所有 topic-partition list 创建 fetch 请求(只要该topic-partition 没有还在处理的请求),创建的 fetch 请求依然是按照 node 级别创建的;
-
client.send():发送 fetch 请求,并设置相应的 Listener,请求处理成功的话,就加⼊到 completedFetches 中,在加⼊这个 completedFetches 集合时,是按照 topic-partition 级别去加⼊,这样也就⽅便了后续的处理。
从这⾥可以看出,在每次发送 fetch 请求时,都会向所有可发送的 topic-partition 发送 fetch 请求,调⽤⼀次fetcher.sendFetches,拉取到的数据,可需要多次 pollOnce 循环才能处理完,因为 Fetcher 线程是在后台运⾏,这也保证了尽可能少地阻塞⽤户的处理线程,因为如果 Fetcher 中没有可处理的数据,⽤户的线程是会阻塞在 poll ⽅法中的
-
-
client.poll()
调⽤底层 NetworkClient 提供的接⼝去发送相应的请求;
-
coordinator.needRejoin()
如果当前实例分配的 topic-partition 列表发送了变化,那么这个 consumer group 就需要进⾏ rebalance
⾃动提交
最简单的提交⽅式是让悄费者⾃动提交偏移量。如果enable.auto.commit被设为 true,消费者会⾃动把从 poll() ⽅法接收到的最⼤偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者⾥的其他东⻄ ⼀样,⾃动提交也是在轮询(poll() )⾥进⾏的。消费者每次在进⾏轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上⼀次轮询返回的偏移量。
不过,这种简便的⽅式也会带来⼀些问题,来看⼀下下⾯的例⼦:
假设我们仍然使⽤默认的 5s提交时间间隔,在最近⼀次提交之后的 3s发⽣了再均衡,再 均衡之后,消费者从最后⼀次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减⼩可能出现重复消息的时间窗,不过这种情况是⽆也完全避免的。
⼿动提交
同步提交
取消⾃动提交,把 auto.commit.offset 设为 false,让应⽤程序决定何时提交 偏 移量。使⽤ commitSync() 提交偏移量最简单也最可靠。这个 API会提交由 poll() ⽅法返回 的最新偏移量,提交成 功后⻢上返回,如果提交失败就抛出异常。
1 | while (true) { |
异步提交
同步提交有⼀个不⾜之处,在 broker对提交请求作出回应之前,应⽤程序会⼀直阻塞,这样会限制应⽤程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发⽣了再均衡, 会增加重复消息的数量。这个时候可以使⽤异步提交 API。我们只管发送提交请求,⽆需等待 broker的响应。
1 | while (true) { |