概念

消费者和消费组

消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主题中。

消费者还可以将⾃⼰的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper。

推荐使⽤Kafka存储消费者的偏移量。因为Zookeeper不适合⾼并发。

多个从同⼀个主题消费的消费者可以加⼊到⼀个消费组中。

消费组中的消费者共享group_id。

group_id通过消费者的配置指定:configs.put(“group.id”, “xxx”);

group_id⼀般设置为应⽤的逻辑名称。⽐如多个订单处理程序组成⼀个消费组,可以设置group_id为"order_process"。

消费组均衡地给消费者分配分区,每个分区只由消费组中⼀个消费者消费。

⼀个拥有四个分区的主题,包含⼀个消费者的消费组。此时,消费组中的消费者消费主题中的所有分区。并且没有重复的可能。


如果在消费组中添加⼀个消费者2,则每个消费者分别从两个分区接收消息。


如果消费组有四个消费者,则每个消费者可以分配到⼀个分区。


如果向消费组中添加更多的消费者,超过主题分区数量,则有⼀部分消费者就会闲置,不会接收任何消息。


向消费组添加消费者是横向扩展消费能⼒的主要⽅式。必要时,需要为主题创建⼤量分区,在负载增⻓时可以加⼊更多的消费者。但是不要让消费者的数量超过主题分区的数量。


除了通过增加消费者来横向扩展单个应⽤的消费能⼒之外,经常出现多个应⽤程序从同⼀个主题消费的情况。此时,每个应⽤都可以获取到所有的消息。只要保证每个应⽤都有⾃⼰的消费组,就可以让它们获取到主题所有的消息。

横向扩展消费者和消费组不会对性能造成负⾯影响。

为每个需要获取⼀个或多个主题全部消息的应⽤创建⼀个消费组,然后向消费组添加消费者来横向扩展消费能⼒和应⽤的处理能⼒,则每个消费者只处理⼀部分消息。

⼼跳机制

消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。


由于broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置。


Kafka 的⼼跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer才会发送⼼跳。

Consumer 和 Rebalance 相关的 2 个配置参数:

参数 字段
session.timeout.ms MemberMetadata.sessionTimeoutMs
max.poll.interval.ms MemberMetadata.rebalanceTimeoutMs
  • broker 端,sessionTimeoutMs 参数

    broker 处理⼼跳的逻辑在 GroupCoordinator 类中:如果⼼跳超期, broker coordinator 会把消费者从 group 中移除,并触发 rebalance。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member:MemberMetadata) {
    // complete current heartbeat expectation
    member.latestHeartbeat = time.milliseconds()
    val memberKey = MemberKey(member.groupId, member.memberId)
    heartbeatPurgatory.checkAndComplete(memberKey)
    // reschedule the next heartbeat expiration deadline
    // 计算⼼跳截⽌时刻
    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
    val delayedHeartbeat = new DelayedHeartbeat(this, group, member,newHeartbeatDeadline, member.sessionTimeoutMs)
    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
    }
    // ⼼跳过期
    def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata,heartbeatDeadline: Long) {
    group.inLock {
    if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
    info(s"Member ${member.memberId} in group ${group.groupId} has failed,
    removing it from the group")
    removeMemberAndUpdateGroup(group, member)
    }
    }
    }
  • consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 参数

如果客户端发现⼼跳超期,客户端会标记 coordinator 为不可⽤,并阻塞⼼跳线程;如果超过了 poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发 rebalance

如:org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
if (coordinatorUnknown()) {
if (findCoordinatorFuture != null || lookupCoordinator().failed())
// the immediate future check ensures that we backoff properly in the case that no
// brokers are available to connect to.
AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
markCoordinatorUnknown();
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
heartbeat.receiveHeartbeat(time.milliseconds());
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
if (e instanceof RebalanceInProgressException) {
// it is valid to continue heartbeating while the group is rebalancing. This
// ensures that the coordinator keeps the member in the group for as long
// as the duration of the rebalance timeout. If we stop sending heartbeats,
// however, then the session timeout may expire before we can rejoin.
heartbeat.receiveHeartbeat(time.milliseconds());
} else {
heartbeat.failHeartbeat();
// wake up the thread if it's sleeping to reschedule the heartbeat
AbstractCoordinator.this.notify();
}
}
}
});
}

消息接收

必要参数配置

参数 说明
bootstrap.servers 向Kafka集群建⽴初始连接⽤到的host/port列表。
客户端会使⽤这⾥列出的所有服务器进⾏集群其他服务器的发现,⽽不管是否指定了哪个服务器⽤作引导。这个列表仅影响⽤来发现集群所有服务器的初始主机。
字符串形式:host1:port1,host2:port2,…
由于这组服务器仅⽤于建⽴初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这⾥。
⼀般最好两台,以防其中⼀台宕掉。
key.deserializer key的反序列化类,该类需要实现 org.apache.kafka.common.serialization.Deserializer 接⼝。
value.deserializer 实现了 org.apache.kafka.common.serialization.Deserializer 接⼝的反序列化器,⽤于对消息的value进⾏反序列化。
client.id 当从服务器消费消息的时候向服务器发送的id字符串。
在ip/port基础上提供应⽤的逻辑名称,记录在服务端的请求⽇志中,⽤于追踪请求的源。
group.id ⽤于唯⼀标志当前消费者所属的消费组的字符串。
如果消费者使⽤组管理功能如subscribe(topic)或使⽤基于Kafka的偏移量管理策略,该项必须设置。
auto.offset.reset 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理?
earliest:⾃动重置偏移量到最早的偏移量
latest:⾃动重置偏移量为最新的偏移量
none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常
anything:向消费者抛异常
enable.auto.commit 如果设置为true,消费者会⾃动周期性地向服务器提交偏移量。

订阅

consumer 采⽤ pull 模式从 broker 中读取数据。

采⽤ pull 模式,consumer 可⾃主控制消费消息的速率, 可以⾃⼰控制消费⽅式(批量消费/逐条消费),还可以选择不同的提交⽅式从⽽实现不同的传输语义。

consumer.subscribe(“tp_demo_01,tp_demo_02”)

反序列化

Kafka的broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进⾏反序列化处理,然后才能交给⽤户程序消费处理。

消费者的反序列化器包括key的和value的反序列化器。如:key.deserializer,value.deserializer。

消费者从订阅的主题拉取消息:consumer.poll(3_000);在Fetcher类中,对拉取到的消息⾸先进⾏反序列化处理。


Kafka默认提供了⼏个反序列化的实现:org.apache.kafka.common.serialization 包下包含了这⼏个实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package org.apache.kafka.common.serialization;

import java.util.Map;

public class ByteArrayDeserializer implements Deserializer<byte[]> {
public ByteArrayDeserializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public byte[] deserialize(String topic, byte[] data) {
return data;
}

public void close() {
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package org.apache.kafka.common.serialization;

import java.nio.ByteBuffer;
import java.util.Map;

public class ByteBufferDeserializer implements Deserializer<ByteBuffer> {
public ByteBufferDeserializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public ByteBuffer deserialize(String topic, byte[] data) {
return data == null ? null : ByteBuffer.wrap(data);
}

public void close() {
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package org.apache.kafka.common.serialization;

import java.util.Map;
import org.apache.kafka.common.utils.Bytes;

public class BytesDeserializer implements Deserializer<Bytes> {
public BytesDeserializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public Bytes deserialize(String topic, byte[] data) {
return data == null ? null : new Bytes(data);
}

public void close() {
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package org.apache.kafka.common.serialization;

import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;

public class DoubleDeserializer implements Deserializer<Double> {
public DoubleDeserializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public Double deserialize(String topic, byte[] data) {
if (data == null) {
return null;
} else if (data.length != 8) {
throw new SerializationException("Size of data received by Deserializer is not 8");
} else {
long value = 0L;
byte[] arr$ = data;
int len$ = data.length;

for(int i$ = 0; i$ < len$; ++i$) {
byte b = arr$[i$];
value <<= 8;
value |= (long)(b & 255);
}

return Double.longBitsToDouble(value);
}
}

public void close() {
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

public class FloatDeserializer implements Deserializer<Float> {
public FloatDeserializer() {
}

public Float deserialize(String topic, byte[] data) {
if (data == null) {
return null;
} else if (data.length != 4) {
throw new SerializationException("Size of data received by Deserializer is not 4");
} else {
int value = 0;
byte[] var4 = data;
int var5 = data.length;

for(int var6 = 0; var6 < var5; ++var6) {
byte b = var4[var6];
value <<= 8;
value |= b & 255;
}

return Float.intBitsToFloat(value);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package org.apache.kafka.common.serialization;

import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;

public class IntegerDeserializer implements Deserializer<Integer> {
public IntegerDeserializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public Integer deserialize(String topic, byte[] data) {
if (data == null) {
return null;
} else if (data.length != 4) {
throw new SerializationException("Size of data received by IntegerDeserializer is not 4");
} else {
int value = 0;
byte[] arr$ = data;
int len$ = data.length;

for(int i$ = 0; i$ < len$; ++i$) {
byte b = arr$[i$];
value <<= 8;
value |= b & 255;
}

return value;
}
}

public void close() {
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

public class LongDeserializer implements Deserializer<Long> {
public LongDeserializer() {
}

public Long deserialize(String topic, byte[] data) {
if (data == null) {
return null;
} else if (data.length != 8) {
throw new SerializationException("Size of data received by LongDeserializer is not 8");
} else {
long value = 0L;
byte[] var5 = data;
int var6 = data.length;

for(int var7 = 0; var7 < var6; ++var7) {
byte b = var5[var7];
value <<= 8;
value |= (long)(b & 255);
}

return value;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

public class ShortDeserializer implements Deserializer<Short> {
public ShortDeserializer() {
}

public Short deserialize(String topic, byte[] data) {
if (data == null) {
return null;
} else if (data.length != 2) {
throw new SerializationException("Size of data received by ShortDeserializer is not 2");
} else {
short value = 0;
byte[] var4 = data;
int var5 = data.length;

for(int var6 = 0; var6 < var5; ++var6) {
byte b = var4[var6];
value = (short)(value << 8);
value = (short)(value | b & 255);
}

return value;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package org.apache.kafka.common.serialization;

import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;

public class StringDeserializer implements Deserializer<String> {
private String encoding = "UTF8";

public StringDeserializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("deserializer.encoding");
}

if (encodingValue != null && encodingValue instanceof String) {
this.encoding = (String)encodingValue;
}

}

public String deserialize(String topic, byte[] data) {
try {
return data == null ? null : new String(data, this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + this.encoding);
}
}

public void close() {
}
}

⾃定义反序列化

    1. 实现 org.apache.kafka.common.serialization.Deserializer 接⼝。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    package com.lagou.kafka.demo.deserializer;

    import com.lagou.kafka.demo.entity.User;
    import org.apache.kafka.common.serialization.Deserializer;

    import java.nio.ByteBuffer;
    import java.util.Map;

    public class UserDeserializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public User deserialize(String topic, byte[] data) {
    ByteBuffer buffer = ByteBuffer.allocate(data.length);

    buffer.put(data);
    buffer.flip();

    final int userId = buffer.getInt();
    final int usernameLength = buffer.getInt();

    String username = new String(data, 8, usernameLength);

    return new User(userId, username);
    }

    @Override
    public void close() {

    }
    }
    1. 在Consumer中设置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    package com.lagou.kafka.demo.consumer;

    import com.lagou.kafka.demo.deserializer.UserDeserializer;
    import com.lagou.kafka.demo.entity.User;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    public class UserConsumer {

    public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // 设置自定义的反序列化器
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);

    configs.put(ConsumerConfig.GROUP_ID_CONFIG, "user_consumer");
    configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_id");
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KafkaConsumer<String, User> consumer = new KafkaConsumer<String, User>(configs);

    // 订阅主题
    consumer.subscribe(Collections.singleton("tp_user_01"));

    final ConsumerRecords<String, User> records = consumer.poll(Long.MAX_VALUE);

    records.forEach(new Consumer<ConsumerRecord<String, User>>() {
    @Override
    public void accept(ConsumerRecord<String, User> record) {
    System.out.println(record.value());
    }
    });

    // 关闭消费者
    consumer.close();

    }

    }

位移提交

  1. Consumer需要向Kafka记录⾃⼰的位移数据,这个汇报过程称为 提交位移(Committing Offsets)

  2. Consumer 需要为分配给它的每个分区提交各⾃的位移数据

  3. 位移提交的由Consumer端负责的,Kafka只负责保管。__consumer_offsets

  4. 位移提交分为⾃动提交和⼿动提交

  5. 位移提交分为同步提交和异步提交

⾃动提交

开启⾃动提交: enable.auto.commit=true; 配置⾃动提交间隔:Consumer端: auto.commit.interval.ms ,默认 5s。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "node1:9092");
configs.put("group.id", "mygrp");
// 设置偏移量⾃动提交。⾃动提交是默认值。这⾥做示例。
configs.put("enable.auto.commit", "true");
// 偏移量⾃动提交的时间间隔
configs.put("auto.commit.interval.ms", "3000");
configs.put("key.deserializer", StringDeserializer.class);
configs.put("value.deserializer", StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Collections.singleton("tp_demo_01"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic()
+ "\t" + record.partition()
+ "\t" + record.offset()
+ "\t" + record.key()
+ "\t" + record.value());
}
}
  • ⾃动提交位移的顺序

    • 配置 enable.auto.commit = true

    • Kafka会保证在开始调⽤poll⽅法时,提交上次poll返回的所有消息

    • 因此⾃动提交不会出现消息丢失,但会重复消费

  • 重复消费举例

    • Consumer 每 5s 提交 offset

    • 假设提交 offset 后的 3s 发⽣了 Rebalance

    • Rebalance 之后的所有 Consumer 从上⼀次提交的 offset 处继续消费

    • 因此 Rebalance 发⽣前 3s 的消息会被重复消费

手动同步提交

使⽤ KafkaConsumer#commitSync():会提交 KafkaConsumer#poll() 返回的最新 offset。

该⽅法为同步操作,等待直到 offset 被成功提交才返回。

1
2
3
4
5
6
7
8
9
10
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}

commitSync 在处理完所有消息之后

⼿动同步提交可以控制offset提交的时机和频率

  • ⼿动同步提交会:

    • 调⽤ commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果

    • 会影响 TPS

    • 可以选择拉⻓提交间隔,但有以下问题

      • 会导致 Consumer 的提交频率下降

      • Consumer 重启后,会有更多的消息被消费

手动异步提交

使用 KafkaConsumer#commitAsync()

1
2
3
4
5
6
7
8
9
while (true) {
ConsumerRecords<String, String> records = consumer.poll(3_000);
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
handle(exception);
}
});
}

commitAsync出现问题不会⾃动重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try {
while(true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使⽤异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后⼀次提交使⽤同步阻塞式提交
} finally {
consumer.close();
}
}

消费者位移管理

Kafka中,消费者根据消息的位移顺序消费消息。

消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题__consumer_offsets中。

Kafka提供了消费者API,让消费者可以管理⾃⼰的位移。

API 说明
public void assign(Collection partitions) 给当前消费者⼿动分配⼀系列主题分区。
⼿动分配分区不⽀持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。
如果给出的主题分区是空的,则等价于调⽤unsubscribe⽅法。
⼿动分配主题分区的⽅法不使⽤消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡。
⼿动分区分配assign(Collection)不能和⾃动分区分配subscribe(Collection,ConsumerRebalanceListener)⼀起使⽤。
如果启⽤了⾃动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进⾏异步提交。
public Set assignment() 获取给当前消费者分配的分区集合。如果订阅是通过调⽤assign⽅法直接分配主题分区,则返回相同的集合。
如果使⽤了主题订阅,该⽅法返回当前分配给该消费者的主题分区集合。
如果分区订阅还没开始进⾏分区分配,或者正在重新分配分区,则会返回none。
public Map<String, List> listTopics() 获取对⽤户授权的所有主题分区元数据。该⽅法会对服务器发起远程调⽤。
public List partitionsFor(String topic) 获取指定主题的分区元数据。如果当前消费者没有关于该主题的元数据,就会对服务器发起远程调⽤。
public Map<TopicPartition, Long> beginningOffsets(Collection partitions) 对于给定的主题分区,列出它们第⼀个消息的偏移量。
注意,如果指定的分区不存在,该⽅法可能会永远阻塞。
该⽅法不改变分区的当前消费者偏移量。
public void seekToEnd(Collection partitions) 将偏移量移动到每个给定分区的最后⼀个。
该⽅法延迟执⾏,只有当调⽤过poll⽅法或position⽅法之后才可以使⽤。
如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。
如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移动到最后⼀个稳定的偏移量,即下⼀个要消费的消息现在还是未提交状态的事务消息。
public void seek(TopicPartition partition, long offset) 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下⼀条要消费的消息偏移量。
若该⽅法多次调⽤,则最后⼀次的覆盖前⾯的。
如果在消费中间随意使⽤,可能会丢失数据。
public long position(TopicPartition partition) 检查指定主题分区的消费偏移量。
public void seekToBeginning(Collection partitions) 将给定每个分区的消费者偏移量移动到它们的起始偏移量。
该⽅法懒执⾏,只有当调⽤过poll⽅法或position⽅法之后才会执⾏。
如果没有提供分区,则将所有分配给当前消费者的分区消费偏移量移动到起始偏移量。
    1. 准备数据
    1
    2
    3
    4
    5
    6
    7
    8
    # ⽣成消息⽂件
    [root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> nm.txt; done

    # 创建主题,三个分区,每个分区⼀个副本
    [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1

    # 将消息⽣产到主题中
    [root@node1 ~]# kafka-console-producer.sh --broker-list 192.168.91.121:9092 --topic tp_demo_01 < nm.txt
    1. API实战
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    package com.lagou.kafka.demo.consumer;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.util.*;

    /**
    * 消费者位移管理
    */
    public class MyConsumerMgr1 {
    public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String> (configs);

    /**
    * 给当前消费者⼿动分配⼀系列主题分区。
    * ⼿动分配分区不⽀持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。
    * 如果给出的主题分区是空的,则等价于调⽤unsubscribe⽅法。
    * ⼿动分配主题分区的⽅法不使⽤消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡。
    * ⼿动分区分配assign(Collection)不能和⾃动分区分配subscribe(Collection,ConsumerRebalanceListener)⼀起使⽤。
    * 如果启⽤了⾃动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进⾏异步提交。
    */

    // consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0)));
    // Set<TopicPartition> assignment = consumer.assignment();
    // for (TopicPartition topicPartition : assignment) {
    // System.out.println(topicPartition);
    // }

    // 获取对⽤户授权的所有主题分区元数据。该⽅法会对服务器发起远程调⽤。
    // Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
    // stringListMap.forEach((k, v) -> {
    // System.out.println("主题:" + k);
    // v.forEach(info -> {
    // System.out.println(info);
    // });
    // });

    // Set<String> strings = consumer.listTopics().keySet();
    // strings.forEach(topicName -> {
    // System.out.println(topicName);
    // });

    // List<PartitionInfo> partitionInfos = consumer.partitionsFor("tp_demo_01");
    // for (PartitionInfo partitionInfo : partitionInfos) {
    // Node leader = partitionInfo.leader();
    // System.out.println(leader);
    // System.out.println(partitionInfo);
    // // 当前分区在线副本
    // Node[] nodes = partitionInfo.inSyncReplicas();
    // // 当前分区下线副本
    // Node[] nodes1 = partitionInfo.offlineReplicas();
    // }

    // ⼿动分配主题分区给当前消费者
    consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0),new TopicPartition("tp_demo_01", 1),new TopicPartition("tp_demo_01", 2)));

    // 列出当前主题分配的所有主题分区
    // Set<TopicPartition> assignment = consumer.assignment();
    // assignment.forEach(k -> {
    // System.out.println(k);
    // });

    // 对于给定的主题分区,列出它们第⼀个消息的偏移量。
    // 注意,如果指定的分区不存在,该⽅法可能会永远阻塞。
    // 该⽅法不改变分区的当前消费者偏移量。
    // Map<TopicPartition, Long> topicPartitionLongMap = consumer.beginningOffsets(consumer.assignment());
    // topicPartitionLongMap.forEach((k, v) -> {
    // System.out.println("主题:" + k.topic() + "\t分区:" + k.partition() + "偏移量\t" + v);
    // });

    // 将偏移量移动到每个给定分区的最后⼀个。
    // 该⽅法延迟执⾏,只有当调⽤过poll⽅法或position⽅法之后才可以使⽤。
    // 如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。
    // 如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移动到
    // 最后⼀个稳定的偏移量,即下⼀个要消费的消息现在还是未提交状态的事务消息。
    // consumer.seekToEnd(consumer.assignment());

    // 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下⼀条要消费的消息偏移量。
    // 若该⽅法多次调⽤,则最后⼀次的覆盖前⾯的。
    // 如果在消费中间随意使⽤,可能会丢失数据。
    // consumer.seek(new TopicPartition("tp_demo_01", 1), 10);

    // 检查指定主题分区的消费偏移量
    // long position = consumer.position(new TopicPartition("tp_demo_01", 1));
    // System.out.println(position);

    consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 1)));
    // 检查指定主题分区的消费偏移量
    long position = consumer.position(new TopicPartition("tp_demo_01", 1));
    System.out.println(position);
    // 关闭⽣产者
    consumer.close();
    }
    }

再均衡

再均衡其实就是⼀个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每⼀个分区。⽐如⼀个topic有100个分区,⼀个消费者组内有20个消费者,在协调者的控制下让组内每⼀个消费者分配到5个分区,这个分配的过程就是再均衡。

再均衡的触发条件主要有三个:

    1. 消费者组内成员发⽣变更,这个变更包括了增加和减少消费者,⽐如消费者宕机退出消费组。

    可以查看心跳机制,有相关例图。

    1. 主题的分区数发⽣变更,kafka⽬前只⽀持增加分区,当增加的时候就会触发再均衡

    1. 订阅的主题发⽣变化,当消费者组使⽤正则表达式订阅主题,⽽恰好⼜新建了对应的主题,就会触发再均衡

为什么说再均衡为⼈诟病呢?

因为再均衡过程中,消费者⽆法从kafka消费消息,这对kafka的TPS影响极⼤,⽽如果kafka集内节点较多,⽐如数百个,那再均衡可能会耗时极多。数分钟到数⼩时都有可能,⽽这段时间kafka基本处于不可⽤状态。所以在实际环境中,应该尽量避免再均衡发⽣。

  • 避免重平衡

    要说完全避免重平衡,是不可能,因为你⽆法完全保证消费者不会故障。⽽消费者故障其实也是最常⻅的引发重平衡的地⽅,所以我们需要保证尽⼒避免消费者故障。

    ⽽其他⼏种触发重平衡的⽅式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制。

    如果消费者真正挂掉了,就没办法了,但实际中,会有⼀些情况,kafka错误地认为⼀个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。

    • ⾸先要知道哪些情况会出现错误判断挂掉的情况。

      在分布式系统中,通常是通过⼼跳来维持分布式系统的,kafka也不例外。

      在分布式系统中,由于⽹络问题你不清楚没接收到⼼跳,是因为对⽅真正挂了还是只是因为负载过重没来得及发⽣⼼跳或是⽹络堵塞。所以⼀般会约定⼀个时间,超时即判定对⽅挂了。⽽在kafka消费者场景中,session.timout.ms参数就是规定这个超时时间是多少。

      还有⼀个参数,heartbeat.interval.ms,这个参数控制发送⼼跳的频率,频率越⾼越不容易被误判,但也会消耗更多资源。

      此外,还有最后⼀个参数,max.poll.interval.ms,消费者poll数据后,需要⼀些处理,再进⾏拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过 max.poll.interval.ms 这个参数的值。这个参数的默认值是5分钟,⽽如果消费者接收到数据后会执⾏耗时的操作,则应该将其设置得⼤⼀些。

      三个参数:session.timout.ms控制⼼跳超时时间,heartbeat.interval.ms控制⼼跳发送频率,max.poll.interval.ms控制poll的间隔。

      这⾥给出⼀个相对较为合理的配置,如下:

      session.timout.ms:设置为6s

      heartbeat.interval.ms:设置2s

      max.poll.interval.ms:推荐为消费者处理消息最⻓耗时再加1分钟

消费者拦截器

消费者在拉取了分区消息之后,要⾸先经过反序列化器对key和value进⾏反序列化处理。

处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应⽤程序进⾏处理。


系统提供 org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V> 接⼝。

  1. ⼀个可插拔接⼝,允许拦截甚⾄更改消费者接收到的消息。⾸要的⽤例在于将第三⽅组件引⼊消费者应⽤程序,⽤于定制的监控、⽇志处理等。

  2. 该接⼝的实现类通过configre⽅法获取消费者配置的属性,如果消费者配置中没有指定clientID,还可以获取KafkaConsumer⽣成的clientId。获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产⽣冲突。

  3. ConsumerInterceptor⽅法抛出的异常会被捕获、记录,但是不会向下传播。如果⽤户配置了错误的key或value类型参数,消费者不会抛出异常,⽽仅仅是记录下来。

  4. ConsumerInterceptor回调发⽣在org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)⽅法同⼀个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;

public interface ConsumerInterceptor<K, V> extends Configurable {
/**
*
* 该⽅法在poll⽅法返回之前调⽤。调⽤结束后poll⽅法就返回消息了。
*
* 该⽅法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或⽣成新的消息。
* 如果有多个拦截器,则该⽅法按照KafkaConsumer的configs中配置的顺序调⽤。
*
* @param records 由上个拦截器返回的由客户端消费的消息。
*/
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

/**
* 当消费者提交偏移量时,调⽤该⽅法。
* 该⽅法抛出的任何异常调⽤者都会忽略。
*/
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

public void close();
}
  • 实现 org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V> 接⼝

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    package com.lagou.kafka.demo.interceptor;

    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Map;

    public class OneInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
    // poll方法返回结果之前最后要调用的方法
    System.out.println("One -- 开始");

    // 消息不做处理,直接返回
    return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    // 消费者提交偏移量的时候,经过该方法
    System.out.println("One -- 结束");
    }

    @Override
    public void close() {
    // 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
    }

    @Override
    public void configure(Map<String, ?> configs) {
    // 用于获取消费者的设置参数
    configs.forEach((k, v) -> {
    System.out.println(k + "\t" + v);
    });
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    package com.lagou.kafka.demo.interceptor;

    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Map;

    public class TwoInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
    // poll方法返回结果之前最后要调用的方法
    System.out.println("Two -- 开始");

    // 消息不做处理,直接返回
    return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    // 消费者提交偏移量的时候,经过该方法
    System.out.println("Two -- 结束");
    }

    @Override
    public void close() {
    // 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
    }

    @Override
    public void configure(Map<String, ?> configs) {
    // 用于获取消费者的设置参数
    configs.forEach((k, v) -> {
    System.out.println(k + "\t" + v);
    });
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    package com.lagou.kafka.demo.interceptor;

    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Map;

    public class ThreeInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
    // poll方法返回结果之前最后要调用的方法
    System.out.println("Three -- 开始");

    // 消息不做处理,直接返回
    return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    // 消费者提交偏移量的时候,经过该方法
    System.out.println("Three -- 结束");
    }

    @Override
    public void close() {
    // 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
    }

    @Override
    public void configure(Map<String, ?> configs) {
    // 用于获取消费者的设置参数
    configs.forEach((k, v) -> {
    System.out.println(k + "\t" + v);
    });
    }
    }
  • consumer类中配置自定义的拦截器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    package com.lagou.kafka.demo.consumer;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Collections;
    import java.util.Properties;

    public class MyConsumer {
    public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mygrp");
    // props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "myclient");
    // 如果在kafka中找不到当前消费者的偏移量,则设置为最旧的
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // 配置拦截器
    // One -> Two -> Three,接收消息和发送偏移量确认都是这个顺序
    props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
    "com.lagou.kafka.demo.interceptor.OneInterceptor" +
    ",com.lagou.kafka.demo.interceptor.TwoInterceptor" +
    ",com.lagou.kafka.demo.interceptor.ThreeInterceptor"
    );

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

    // 订阅主题
    consumer.subscribe(Collections.singleton("tp_demo_01"));

    while (true) {
    final ConsumerRecords<String, String> records = consumer.poll(3_000);
    records.forEach(record -> {
    System.out.println(record.topic()
    + "\t" + record.partition()
    + "\t" + record.offset()
    + "\t" + record.key()
    + "\t" + record.value());
    });
    // consumer.commitAsync();
    // consumer.commitSync();
    }
    // consumer.close();
    }
    }

消费者参数配置补充

配置项 说明
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为true,则该值定义了消费者偏移量向Kafka提交的频率。
fetch.min.bytes 服务器对每个拉取消息的请求返回的数据量最⼩值。
如果数据量达不到这个值,请求等待,以让更多的数据累积,达到这个值之后响应请求。
默认设置是1个字节,表示只要有⼀个字节的数据,就⽴即响应请求,或者在没有数据的时候请求超时。
将该值设置为⼤⼀点⼉的数字,会让服务器等待稍微⻓⼀点⼉的时间以累积数据。
如此则可以提⾼服务器的吞吐量,代价是额外的延迟时间。
fetch.max.wait.ms 如果服务器端的数据量达不到 fetch.min.bytes 的话,服务器端不能⽴即响应请求。
该时间⽤于配置服务器端阻塞请求的最⼤时⻓。
fetch.max.bytes 服务器给单个拉取请求返回的最⼤数据量。
消费者批量拉取消息,如果第⼀个⾮空消息批次的值⽐该值⼤,消息批也会返回,以让消费者可以接着进⾏。
即该配置并不是绝对的最⼤值。
broker可以接收的消息批最⼤值通过message.max.bytes (broker配置) 或 max.message.bytes (主题配置)来指定。
需要注意的是,消费者⼀般会并发拉取请求。
connections.max.idle.ms 在这个时间之后关闭空闲的连接。
check.crcs ⾃动计算被消费的消息的CRC32校验值。
可以确保在传输过程中或磁盘存储过程中消息没有被破坏。
它会增加额外的负载,在追求极致性能的场合禁⽤。
exclude.internal.topics 是否内部主题应该暴露给消费者。
如果该条⽬设置为true,则只能先订阅再拉取。
isolation.level 控制如何读取事务消息。
如果设置了 read_committed ,消费者的poll()⽅法只会返回已经提交的事务消息。
如果设置了 read_uncommitted (默认值),消费者的poll⽅法返回所有的消息,即使是已经取消的事务消息。
⾮事务消息以上两种情况都返回。
消息总是以偏移量的顺序返回。
read_committed 只能返回到达LSO的消息。
在LSO之后出现的消息只能等待相关的事务提交之后才能看到结果, read_committed 模式,如果有为提交的事务,消费者不能读取到直到HW的消息。
read_committed 的seekToEnd⽅法返回LSO。
heartbeat.interval.ms 当使⽤消费组的时候,该条⽬指定消费者向消费者协调器发送⼼跳的时间间隔。
⼼跳是为了确保消费者会话的活跃状态,同时在消费者加⼊或离开消费组的时候⽅便进⾏再平衡。
该条⽬的值必须⼩于 session.timeout.ms ,也不应该⾼于 session.timeout.ms 的1/3。
可以将其调整得更⼩,以控制正常重新平衡的预期时间。
session.timeout.ms 当使⽤Kafka的消费组的时候,消费者周期性地向broker发送⼼跳数据,表明⾃⼰的存在。
如果经过该超时时间还没有收到消费者的⼼跳,则broker将消费者从消费组移除,并启动再平衡。
该值必须在broker配置 group.min.session.timeout.msgroup.max.session.timeout.ms 之间。
max.poll.records ⼀次调⽤poll()⽅法返回的记录最⼤数量。
max.poll.interval.ms 使⽤消费组的时候调⽤poll()⽅法的时间间隔。
该条⽬指定了消费者调⽤poll()⽅法的最⼤时间间隔。
如果在此时间内消费者没有调⽤poll()⽅法,则broker认为消费者失败,触发再平衡,将分区分配给消费组中其他消费者。
max.partition.fetch.bytes 对每个分区,服务器返回的最⼤数量。消费者按批次拉取数据。
如果⾮空分区的第⼀个记录⼤于这个值,批处理依然可以返回,以保证消费者可以进⾏下去。
broker接收批的⼤⼩由 message.max.bytes (broker参数)或 max.message.bytes (主题参数)指定。
fetch.max.bytes ⽤于限制消费者单次请求的数据量。
send.buffer.bytes ⽤于TCP发送数据时使⽤的缓冲⼤⼩(SO_SNDBUF),-1表示使⽤OS默认的缓冲区⼤⼩。
retry.backoff.ms 在发⽣失败的时候如果需要重试,则该配置表示客户端等待多⻓时间再发起重试。
该时间的存在避免了密集循环。
request.timeout.ms 客户端等待服务端响应的最⼤时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败。
reconnect.backoff.ms 重新连接主机的等待时间。避免了重连的密集循环。
该等待时间应⽤于该客户端到broker的所有连接。
reconnect.backoff.max.ms 重新连接到反复连接失败的broker时要等待的最⻓时间(以毫秒为单位)。
如果提供此选项,则对于每个连续的连接失败,每台主机的退避将成倍增加,直⾄达到此最⼤值。
在计算退避增量之后,添加20%的随机抖动以避免连接⻛暴。
receive.buffer.bytes TCP连接接收数据的缓存(SO_RCVBUF)。-1表示使⽤操作系统的默认值。
partition.assignment.strategy 当使⽤消费组的时候,分区分配策略的类名。
metrics.sample.window.ms 计算指标样本的时间窗⼝。
metrics.recording.level 指标的最⾼记录级别。
metrics.num.samples ⽤于计算指标⽽维护的样本数量
interceptor.classes 拦截器类的列表。默认没有拦截器。
拦截器是消费者的拦截器,该拦截器需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接⼝。
拦截器可⽤于对消费者接收到的消息进⾏拦截处理。

消费组管理

什么是消费者组?

consumer group是kafka提供的可扩展且具有容错性的消费者机制。

三个特性:

  1. 消费组有⼀个或多个消费者,消费者可以是⼀个进程,也可以是⼀个线程。

  2. group.id是⼀个字符串,唯⼀标识⼀个消费组。

  3. 消费组订阅的主题每个分区只能分配给消费组⼀个消费者。

消费者位移(consumer position)

消费者在消费的过程中记录已消费的数据,即消费位移(offset)信息。

每个消费组保存⾃⼰的位移信息,那么只需要简单的⼀个整数表示位置就够了;同时可以引⼊checkpoint机制定期持久化。

位移管理(offset management)

  • ⾃动VS⼿动

    Kafka默认定期⾃动提交位移( enable.auto.commit = true ),也⼿动提交位移。另外kafka会定期把group消费情况保存起来,做成⼀个offset map,如下图所示:


  • 位移提交

    位移是提交到Kafka中的 __consumer_offsets 主题。 __consumer_offsets 中的消息保存了每个消费组某⼀时刻提交的offset信息。

    1
    [root@node1 __consumer_offsets-0]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 192.168.92.121:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /opt/kafka_2.12-1.0.2/config/consumer.properties --from-beginning | head

    上图中,标出来的,表示消费组为 test-consumer-group ,消费的主题为 __consumer_offsets ,消费的分区是4,偏移量为5。

__consumers_offsets 主题配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的⽇志容量,也能实现保存最新offset的⽬的。

再谈再均衡

再均衡(Rebalance)本质上是⼀种协议,规定了⼀个消费组中所有消费者如何达成⼀致来分配订阅主题的每个分区。

⽐如某个消费组有20个消费组,订阅了⼀个具有100个分区的主题。正常情况下,Kafka平均会为每个消费者分配5个分区。这个分配的过程就叫再均衡。

再均衡的触发条件:

  1. 组成员发⽣变更(新消费者加⼊消费组组、已有消费者主动离开或崩溃了)。

  2. 订阅主题数发⽣变更。如果正则表达式进⾏订阅,则新建匹配正则表达式的主题触发再均衡。

  3. 订阅主题的分区数发⽣变更。

  • 如何进⾏组内分区分配?

    三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor。后⾯讲。

  • 谁来执⾏再均衡和消费组管理?

    Kafka提供了⼀个⻆⾊:Group Coordinator来执⾏对于消费组的管理。

    Group Coordinator——每个消费组分配⼀个消费组协调器⽤于组管理和位移管理。当消费组的第⼀个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。

  • 如何确定coordinator?

    1. 确定消费组位移信息写⼊ __consumers_offsets 的哪个分区。具体计算公式:

    __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)

    注意:groupMetadataTopicPartitionCount由 offsets.topic.num.partitions 指定,默认是50个分区。

    1. 该分区leader所在的broker就是组协调器。
  • Rebalance Generation

    它表示Rebalance之后主题分区到消费组中消费者映射关系的⼀个版本,主要是⽤于保护消费组,隔离⽆效偏移量提交的。

    如上⼀个版本的消费者⽆法提交位移到新版本的消费组中,因为映射关系变了,你消费的或许已经不是原来的那个分区了。每次group进⾏Rebalance之后,Generation号都会加1,表示消费组和分区的映射关系到了⼀个新版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,消费组协调器触发Rebalance,消费组进⼊Generation 2,之后成员4加⼊,再次触发Rebalance,消费组进⼊Generation 3.


  • 协议(protocol)

    kafka提供了5个协议来处理与消费组协调相关的问题:

    • Heartbeat请求:consumer需要定期给组协调器发送⼼跳来表明⾃⼰还活着

    • LeaveGroup请求:主动告诉组协调器我要离开消费组

    • SyncGroup请求:消费组Leader把分配⽅案告诉组内所有成员

    • JoinGroup请求:成员请求加⼊组

    • DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配⽅案,订阅信息等。通常该请求是给管理员使⽤

    组协调器在再均衡的时候主要⽤到了前⾯4种请求。

  • liveness

    消费者如何向消费组协调器证明⾃⼰还活着? 通过定时向消费组协调器发送Heartbeat请求。如果超过了设定的超时时间,那么协调器认为该消费者已经挂了。⼀旦协调器认为某个消费者挂了,那么它就会开启新⼀轮再均衡,并且在当前其他消费者的⼼跳响应中添加“REBALANCE_IN_PROGRESS”,告诉其他消费者:重新分配分区。

  • 再均衡过程

    1. Join, 加⼊组。所有成员都向消费组协调器发送JoinGroup请求,请求加⼊消费组。⼀旦所有成员都发送了JoinGroup请求,协调i器从中选择⼀个消费者担任Leader的⻆⾊,并把组成员信息以及订阅信息发给Leader。

    2. Sync,Leader开始分配消费⽅案,即哪个消费者负责消费哪些主题的哪些分区。⼀旦完成分配,Leader会将这个⽅案封装进SyncGroup请求中发给消费组协调器,⾮Leader也会发SyncGroup请求,只是内容为空。

    消费组协调器接收到分配⽅案之后会把⽅案塞进SyncGroup的response中发给各个消费者。


    注意:在协调器收集到所有成员请求前,它会把已收到请求放⼊⼀个叫purgatory(炼狱)的地⽅。然后是分发分配⽅案的过程,即SyncGroup请求:


    注意:消费组的分区分配⽅案在客户端执⾏。Kafka交给客户端可以有更好的灵活性。Kafka默认提供三种分配策略:range和round-robin和sticky。可以通过消费者的参数: partition.assignment.strategy 来实现⾃⼰分配策略。

  • 消费组状态机

    消费组组协调器根据状态机对消费组做不同的处理:


    1. Dead:组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态响应各种请求都是⼀个response: UNKNOWN_MEMBER_ID

    2. Empty:组内⽆成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求

    3. PreparingRebalance:组准备开启新的rebalance,等待成员加⼊

    4. AwaitingSync:正在等待leader consumer将分配⽅案传给各个成员

    5. Stable:再均衡完成,可以开始消费。