Consumer Example

The fundamental purpose of KafkaConsumer is to pull messages from the Kafka brokers and hand them to the business logic for processing.

Developers do not need to worry about low-level concerns such as managing the network connections to the Kafka brokers, heartbeat detection, or request timeout and retry, nor about details such as the number of partitions of the subscribed topics, the network topology of the partition leader replicas, or consumer group rebalances. In addition, KafkaConsumer provides automatic offset committing.

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {

    public static void main(String[] args) throws InterruptedException {
        // whether to auto-commit offsets
        Boolean autoCommit = false;
        // whether to commit synchronously (only used when auto-commit is disabled)
        Boolean isSync = true;
        Properties props = new Properties();
        // Kafka address list in the form host1:port1,host2:port2,...; there is no need to list every broker,
        // the client discovers the rest of the cluster from the ones given (provide several in case one is down)
        props.put("bootstrap.servers", "localhost:9092");
        // consumer group
        props.put("group.id", "test");
        // enable/disable automatic offset commits
        props.put("enable.auto.commit", autoCommit.toString());
        // auto-commit every 1s (only relevant when auto-commit is enabled)
        props.put("auto.commit.interval.ms", "1000");
        // session timeout: if the group coordinator receives no heartbeat within this time,
        // the consumer is considered dead or failed and is removed from the group
        props.put("session.timeout.ms", "60000");
        // maximum delay between two poll() calls; exceeding it triggers a rebalance,
        // so it must be longer than the time spent processing one batch
        props.put("max.poll.interval.ms", "300000");
        // where to start when there is no valid committed offset: latest (default, records produced
        // after the consumer starts) or earliest (the smallest valid offset)
        props.put("auto.offset.reset", "latest");
        // maximum number of bytes the consumer pulls in a single fetch
        props.put("fetch.max.bytes", "1024000");
        // key deserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "lagou_edu";
        // subscribe to the topic list
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            // pull messages
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            if (!autoCommit) {
                if (isSync) {
                    // after processing the batch, commit the current offsets synchronously;
                    // commitSync() retries until it succeeds or hits an unrecoverable error
                    consumer.commitSync();
                } else {
                    // asynchronous commit; the callback receives a null exception on success
                    consumer.commitAsync((offsets, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                        System.out.println(offsets.size());
                    });
                }
            }
            TimeUnit.SECONDS.sleep(3);
        }
    }
}

The Kafka brokers do not track a consumer's position themselves; it is up to the consumer to decide how to save and record its consumed offsets. The brokers do provide an internal topic named "__consumer_offsets" to store the offsets that consumers commit. When a consumer comes online or goes offline, the Consumer Group performs a Rebalance operation that reassigns the partitions; once the Rebalance completes, a consumer can read the offsets recorded in that topic and resume consuming from them. Using this topic to record consumer offsets is only the default, however: developers can store offsets in other storage as their business requires.
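As a hedged illustration of storing offsets outside of Kafka, the sketch below reuses the consumer from the example above together with a hypothetical OffsetStore of your own (not a Kafka API): offsets are saved when partitions are revoked and restored with seek() when partitions are assigned.

consumer.subscribe(Arrays.asList("lagou_edu"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // persist the current positions before giving up ownership of the partitions
        for (TopicPartition tp : partitions) {
            offsetStore.save(tp, consumer.position(tp)); // offsetStore is a hypothetical external store
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // resume from the externally stored positions instead of __consumer_offsets
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, offsetStore.load(tp));
        }
    }
});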

During consumption, when offsets are committed matters a great deal, because it determines where the consumer resumes after a failure and restart. In the example above, setting enable.auto.commit to true turns on automatic offset commits, and auto.commit.interval.ms sets the commit interval. Each call to KafkaConsumer.poll() checks whether an automatic commit is due and, if so, commits the offset of the last message returned by the previous poll(). To avoid losing messages, it is recommended to finish processing all messages from the previous poll() before calling poll() again. KafkaConsumer also offers two methods for committing offsets manually, commitSync() and commitAsync(); both can commit explicitly specified offsets, and the difference is that the former commits synchronously while the latter commits asynchronously.
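Since both methods accept explicit offsets, here is a minimal sketch (reusing the consumer from the example above) of committing per-partition offsets built up while processing; the committed offset should point at the next record to be read, hence the +1:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
    // remember the position of the next record to read for this partition
    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
}
// synchronous commit of the explicit offsets
consumer.commitSync(currentOffsets);
// asynchronous variant committing the same offsets, without a callback
// consumer.commitAsync(currentOffsets, null);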

KafkaConsumer Instantiation

Now that we have seen the basic usage of KafkaConsumer, let's dig into its principles and implementation, starting with the core logic of the constructor:

private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
        // get client.id; if it is empty, generate a default one such as consumer-1
        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
        if (clientId.isEmpty())
            clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
        this.clientId = clientId;
        // get the consumer group id
        String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
        LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
        this.log = logContext.logger(getClass());
        log.debug("Initializing the Kafka consumer");
        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
        int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
            throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        this.time = Time.SYSTEM;
        // same metrics setup as the producer
        Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)).timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))).tags(metricsTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
        // consumer interceptors
        // load interceptors and make sure they get clientId
        Map<String, Object> userProvidedConfigs = config.originals();
        userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
        this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);

        // key deserializer
        if (keyDeserializer == null) {
            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.keyDeserializer.configure(config.originals(), true);
        } else {
            config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
            this.keyDeserializer = keyDeserializer;
        }
        // value deserializer
        if (valueDeserializer == null) {
            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.valueDeserializer.configure(config.originals(), false);
        } else {
            config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
            this.valueDeserializer = valueDeserializer;
        }
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), true, false, clusterResourceListeners);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
        String metricGrpPrefix = "consumer";
        ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
        // transaction isolation level
        IsolationLevel isolationLevel = IsolationLevel.valueOf(
                config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
        // network component
        NetworkClient netClient = new NetworkClient(new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                metrics, time, metricGrpPrefix, channelBuilder, logContext), this.metadata, clientId,
                100, // a fixed large enough value will suffice for max inflight requests
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                time, true, new ApiVersions(), throttleTimeSensor, logContext);
        // consumer-side network client
        this.client = new ConsumerNetworkClient(logContext, netClient, metadata, time, retryBackoffMs, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
        // offset reset strategy (auto.offset.reset), defaults to latest
        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
        this.subscriptions = new SubscriptionState(offsetResetStrategy);
        this.assignors = config.getConfiguredInstances(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, PartitionAssignor.class);
        // consumer coordinator (group membership and offset commits)
        this.coordinator = new ConsumerCoordinator(logContext, this.client, groupId,
                config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                assignors, this.metadata, this.subscriptions, metrics, metricGrpPrefix, this.time, retryBackoffMs,
                config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), this.interceptors,
                config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
                config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
        // fetcher
        this.fetcher = new Fetcher<>(logContext, this.client,
                config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                this.keyDeserializer, this.valueDeserializer, this.metadata, this.subscriptions,
                metrics, metricsRegistry.fetcherMetrics, this.time, this.retryBackoffMs, isolationLevel);
        // log configs the user set but that were never used
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
        log.debug("Kafka consumer initialized");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}
    1. Initialize the configuration: client.id, group.id, consumer interceptors, the key/value deserializers, and the transaction isolation level
    2. Initialize the network client NetworkClient
    3. Initialize the consumer network client ConsumerNetworkClient
    4. Initialize the offset commit strategy (auto commit by default)
    5. Initialize the consumer coordinator ConsumerCoordinator
    6. Initialize the fetcher Fetcher

Subscribing to Topics

Let's first look at what the subscribe method does:

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    // lightweight lock
    acquireAndEnsureOpen();
    try {
        if (topics == null) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        } else if (topics.isEmpty()) {
            // an empty topics collection means unsubscribe
            this.unsubscribe();
        } else {
            // validate the topics: a null or blank entry throws immediately
            for (String topic : topics) {
                if (topic == null || topic.trim().isEmpty())
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }
            // throw if no partition assignors are configured
            throwIfNoAssignorsConfigured();
            log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
            // start subscribing
            this.subscriptions.subscribe(new HashSet<>(topics), listener);
            // update the metadata; if it does not yet contain all of these topics, mark it for a forced update
            metadata.setTopics(subscriptions.groupSubscription());
        }
    } finally {
        release();
    }
}

public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListener cannot be null");
    // subscribe by topic name; partitions are assigned automatically
    setSubscriptionType(SubscriptionType.AUTO_TOPICS);
    // rebalance listener
    this.listener = listener;
    // update the subscription
    changeSubscription(topics);
}

private void changeSubscription(Set<String> topicsToSubscribe) {
    if (!this.subscription.equals(topicsToSubscribe)) {
        // in AUTO_TOPICS or AUTO_PATTERN mode this set records all topics this consumer subscribes to
        this.subscription = topicsToSubscribe;
        // the consumer group leader uses this set to record the topics subscribed to by every member
        // of the group, while followers only keep their own subscriptions in it
        this.groupSubscription.addAll(topicsToSubscribe);
    }
}
    1. KafkaConsumer is not a thread-safe class, so it first takes the lightweight lock. A null topics argument throws an exception, an empty collection triggers unsubscription, each topic in the collection is then validated, and it is checked that partition assignors are configured before the subscription is made. The listener defaults to NoOpConsumerRebalanceListener, which is a no-op.
    • Lightweight lock: acquire() and release() record the id of the thread currently using the KafkaConsumer and a reentrancy count. This is not a real lock; it merely detects concurrent access to the KafkaConsumer by multiple threads (see the sketch after this list).
    2. Every KafkaConsumer instance owns a SubscriptionState object. subscribe() delegates to SubscriptionState.subscribe(), which records the subscription information; subscribing again overwrites the old data.
    3. Update the metadata: if the metadata does not contain the current groupSubscription, it is marked for update (the update itself happens later), and topics do not expire on the consumer side.
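The sketch below illustrates the idea behind the lightweight lock referenced above (an illustration of the mechanism, not necessarily the exact KafkaConsumer source): an AtomicLong stores the id of the owning thread and a counter tracks reentrancy, so concurrent access fails fast instead of blocking.

private static final long NO_CURRENT_THREAD = -1L;
// id of the thread currently inside the consumer, or NO_CURRENT_THREAD if none
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// reentrancy counter for the owning thread
private int refCount = 0;

private void acquire() {
    long threadId = Thread.currentThread().getId();
    // CAS the owner thread id; if another thread already holds it, fail immediately
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    refCount++;
}

private void release() {
    // only clear the owner when the outermost call returns
    if (--refCount == 0)
        currentThread.set(NO_CURRENT_THREAD);
}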

Message Consumption Flow

Next, let's see how KafkaConsumer's core method poll pulls messages; look at the code below first:

poll

public ConsumerRecords<K, V> poll(long timeout) {
    // use the lightweight lock to detect whether another thread is using this KafkaConsumer
    acquireAndEnsureOpen();
    try {
        // a negative timeout throws an exception
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
        // throw if the subscription type is NONE, i.e. the consumer has neither subscribed to any
        // topic nor been assigned any partition
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            // core method: pull the messages
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                // if records were fetched, send the next fetch request before returning, without blocking
                // or being interrupted, so the user's next poll does not have to wait for the responses
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                    client.pollNoWakeup();
                // run the records through the interceptors before returning them
                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
            long elapsed = time.milliseconds() - start;
            // stop once the timeout has elapsed
            remaining = timeout - elapsed;
        } while (remaining > 0);
        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
  1. Use the lightweight lock to detect whether another thread is using the KafkaConsumer

  2. Check that the timeout is not negative; if it is, throw an exception and stop consuming

  3. Check that this consumer has subscribed to topics or been assigned topic-partitions

  4. Call pollOnce() to obtain the records

  5. Before returning the records, send the next fetch request so that the user's thread does not block inside pollOnce() on the next call

  6. If no records become available within the given timeout, return an empty result

As you can see, the real work of poll is done in pollOnce; poll simply obtains the available data through pollOnce.

pollOnce

// besides fetching new data, this also performs any necessary offset-commit and reset-offset work
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    client.maybeTriggerWakeup();
    // 1. look up and connect to the GroupCoordinator, join the group, sync the group and auto-commit;
    //    the group rebalances during join/sync
    coordinator.poll(time.milliseconds(), timeout);
    // 2. update the offsets of the subscribed topic-partitions that do not yet have a valid fetch position
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());
    // 3. return any records the fetcher has already pulled
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;
    // 4. send fetch requests; data is pulled from multiple topic-partitions (as long as the corresponding
    //    topic-partition has no outstanding request)
    fetcher.sendFetches();
    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
    // 5. call poll to actually send the requests (the low-level network interface)
    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });
    // 6. if the group needs to rebalance, return empty data immediately so the group reaches a stable state sooner
    if (coordinator.needRejoin())
        return Collections.emptyMap();
    // return the fetched results
    return fetcher.fetchedRecords();
}

pollOnce can be broken down into six steps, which do the following:

  • coordinator.poll()

    Finds the address of the GroupCoordinator, establishes the TCP connection, and sends the join-group and sync-group requests; only then has the consumer truly joined a group, at which point it obtains the list of topic-partitions it should consume. If auto commit is enabled, offsets are also committed in this step. For a newly created group, the group state goes Empty -> PreparingRebalance -> AwaitingSync -> Stable:

    1. Find the GroupCoordinator's address and establish the TCP connection;

    2. Send the join-group request, after which the group rebalances;

    3. Send the sync-group request; only after this has the consumer really joined the group, and the response tells it which topic-partitions it should consume;

    4. If auto commit is enabled, offsets are committed in this step as well

  • updateFetchPositions()

    This method updates the fetch-offset information of the topic-partitions this consumer instance subscribes to. The goal is to obtain the position of each subscribed topic-partition so that the Fetcher knows from which offset to start pulling that topic-partition's data

    private void updateFetchPositions(Set<TopicPartition> partitions) {
        // first reset the offsets of partitions on which seekToBeginning/seekToEnd was called,
        // i.e. set their fetch position
        fetcher.resetOffsetsIfNeeded(partitions);
        if (!subscriptions.hasAllFetchPositions(partitions)) {
            // fetch the committed offsets of all assigned partitions and store them in each
            // TopicPartitionState's committed offset
            coordinator.refreshCommittedOffsetsIfNeeded();
            // if the fetch position is still invalid, use the committed offset obtained above as the fetch position
            fetcher.updateFetchPositions(partitions);
        }
    }

    Every topic-partition that this consumer instance subscribes to has a corresponding TopicPartitionState object (kept by SubscriptionState), which records the following:

    private static class TopicPartitionState {
        // the offset the Fetcher will use for its next fetch
        private Long position; // last consumed position
        // the high watermark from the last fetch
        private Long highWatermark; // the high watermark from last fetch
        private Long lastStableOffset;
        // the offset of the latest message the consumer has finished processing; updated when the
        // consumer explicitly commits offsets
        private OffsetAndMetadata committed; // last committed position
        // whether consumption of this partition is paused
        private boolean paused; // whether this partition has been paused by the user
        // the strategy to use if this topic-partition's offset needs resetting; set back to null
        // after the reset so it is not applied again
        private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
    }
  • fetcher.fetchedRecords()

    Returns the fetched records and advances the fetch-position offset; the committed offset is only updated when offsets are committed (with auto commit, this happens in step 1);

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
        // max.poll.records caps how many records a single poll may return
        int recordsRemaining = maxPollRecords;
        try {
            while (recordsRemaining > 0) {
                if (nextInLineRecords == null || nextInLineRecords.isFetched) {
                    // peek at the head of the queue without removing it; null if the queue is empty
                    CompletedFetch completedFetch = completedFetches.peek();
                    if (completedFetch == null) break;
                    // parse the next completed fetch into nextInLineRecords
                    nextInLineRecords = parseCompletedFetch(completedFetch);
                    completedFetches.poll();
                } else {
                    // drain records and update the position
                    List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
                    TopicPartition partition = nextInLineRecords.partition;
                    if (!records.isEmpty()) {
                        List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                        if (currentRecords == null) {
                            fetched.put(partition, records);
                        } else {
                            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                            newRecords.addAll(currentRecords);
                            newRecords.addAll(records);
                            fetched.put(partition, newRecords);
                        }
                        recordsRemaining -= records.size();
                    }
                }
            }
        } catch (KafkaException e) {
            if (fetched.isEmpty()) throw e;
        }
        return fetched;
    }

    private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
        if (!subscriptions.isAssigned(partitionRecords.partition)) {
            log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
        } else {
            long position = subscriptions.position(partitionRecords.partition);
            // this tp cannot be consumed right now, e.g. pause() was called on it
            if (!subscriptions.isFetchable(partitionRecords.partition)) {
                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
            } else if (partitionRecords.nextFetchOffset == position) {
                // get this tp's records and update partitionRecords' fetchOffset (used to verify ordering)
                List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
                long nextOffset = partitionRecords.nextFetchOffset;
                log.trace("Returning fetched records at offset {} for assigned partition {} and update " + "position to {}", position, partitionRecords.partition, nextOffset);
                // advance the consumed offset (the fetch position)
                subscriptions.position(partitionRecords.partition, nextOffset);
                // compute the lag (the gap between position and the HW); it is only null when the HW is null
                Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
                if (partitionLag != null) this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
                return partRecords;
            } else {
                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", partitionRecords.partition, partitionRecords.nextFetchOffset, position);
            }
        }
        partitionRecords.drain();
        return emptyList();
    }
  • fetcher.sendFetches()

    As long as a subscribed topic-partition has no outstanding fetch request, a fetch request is sent for it; when actually sending, the requests are grouped per node, so topic-partitions whose leader is the same node are merged into one request;

    // send a fetch request to the leader of every subscribed partition (as long as that leader has no fetch request in flight)
    public int sendFetches() {
        // 1. build the fetch requests
        Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
        for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
            final FetchRequest.Builder request = fetchEntry.getValue();
            final Node fetchTarget = fetchEntry.getKey();
            log.debug("Sending {} fetch for partitions {} to broker {}", isolationLevel, request.fetchData().keySet(), fetchTarget);
            // 2. send the fetch requests
            client.send(fetchTarget, request).addListener(new RequestFutureListener<ClientResponse>() {
                @Override
                public void onSuccess(ClientResponse resp) {
                    FetchResponse response = (FetchResponse) resp.responseBody();
                    if (!matchesRequestedPartitions(request, response)) {
                        log.warn("Ignoring fetch response containing partitions {} since it does not match " + "the requested partitions {}", response.responseData().keySet(), request.fetchData().keySet());
                        return;
                    }
                    Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                    FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
                    for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                        TopicPartition partition = entry.getKey();
                        long fetchOffset = request.fetchData().get(partition).fetchOffset;
                        FetchResponse.PartitionData fetchData = entry.getValue();
                        log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", isolationLevel, fetchOffset, partition, fetchData);
                        completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, resp.requestHeader().apiVersion()));
                    }
                    sensors.fetchLatency.record(resp.requestLatencyMs());
                }

                @Override
                public void onFailure(RuntimeException e) {
                    log.debug("Fetch request {} to {} failed", request.fetchData(), fetchTarget, e);
                }
            });
        }
        return fetchRequestMap.size();
    }
    1. createFetchRequests(): builds a fetch request for every subscribed topic-partition that has no request still in flight; the requests are still built at the node level;

    2. client.send(): sends the fetch requests and registers a listener on each; when a request succeeds, its response is added to completedFetches, broken down per topic-partition, which makes later processing easier.

    From this you can see that each call to fetcher.sendFetches sends fetch requests to every topic-partition that can currently be fetched, and the data pulled by one call may take several pollOnce iterations to process. Because the fetch responses are completed asynchronously in the background, the user's processing thread is blocked as little as possible; when the Fetcher has no data ready to hand back, the user's thread blocks inside the poll method.

  • client.poll()

    Calls the interface provided by the underlying NetworkClient to send the pending requests;

  • coordinator.needRejoin()

    If the list of topic-partitions assigned to this instance has changed, the consumer group needs to rebalance

Auto Commit

The simplest way to commit is to let the consumer commit offsets automatically. If enable.auto.commit is set to true, the consumer automatically commits the largest offsets received from poll(). The commit interval is controlled by auto.commit.interval.ms, which defaults to 5s. Like everything else in the consumer, automatic commits happen inside polling (poll()): on every poll the consumer checks whether it is time to commit and, if so, commits the offsets returned by the previous poll.

However, this convenience also brings some problems. Consider the following example:

Suppose we keep the default 5s commit interval and a rebalance happens 3s after the most recent commit. After the rebalance, consumers start reading from the last committed offset, which is already 3s behind, so the messages that arrived during those 3s will be processed again. You can commit more frequently to shrink the window in which duplicates can appear, but the situation cannot be avoided completely.
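A minimal sketch of the configuration being discussed, using the default interval (the properties go into the same Properties object as in the first example):

// let the consumer commit the offsets returned by poll() automatically
props.put("enable.auto.commit", "true");
// default 5s; a smaller interval shrinks (but never closes) the window of duplicates after a rebalance
props.put("auto.commit.interval.ms", "5000");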

Manual Commit

Synchronous Commit

Disable automatic commits by setting enable.auto.commit to false and let the application decide when to commit offsets. Using commitSync() is the simplest and most reliable way: this API commits the latest offsets returned by poll(), returns as soon as the commit succeeds, and throws an exception if the commit fails.

while (true) {
    // pull messages
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    // after processing the batch, commit the current offsets; an exception is thrown if the commit fails
    consumer.commitSync();
}

Asynchronous Commit

Synchronous commits have one drawback: the application blocks until the broker responds to the commit request, which limits throughput. We could raise throughput by committing less often, but then a rebalance would create more duplicates. In that case we can use the asynchronous commit API: we just send the commit request and do not wait for the broker's response.

while (true) {
    // pull messages
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    // asynchronous commit; the callback receives a null exception on success
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        }
        System.out.println(offsets.size());
    });
}
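A common follow-up pattern (a hedged sketch, not shown above, and assuming a running flag you control to stop the loop) is to combine the two: commit asynchronously inside the loop for throughput, then make one final synchronous commit before closing so the last offsets are not lost.

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        // fast, non-blocking commit during normal operation
        consumer.commitAsync();
    }
} finally {
    try {
        // one last blocking commit to make sure the final offsets are recorded
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}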