管理

使⽤kafka-topics.sh脚本:

选项 说明
–config <String: name=value> 为创建的或修改的主题指定配置信息。⽀持下述配置条⽬如下方详细列表
–create 创建⼀个新主题
–delete 删除⼀个主题
–delete-config <String: name> 删除现有主题的⼀个主题配置条⽬。这些条⽬就是在 --config 中给出的配置条⽬。
–alter 更改主题的分区数量,副本分配和/或配置条⽬。
–describe 列出给定主题的细节。
–disable-rack-aware 禁⽤副本分配的机架感知。
–force 抑制控制台提示信息。
–help 打印帮助信息。
–if-exists 如果指定了该选项,则在修改或删除主题的时候,只有主题存在才可以执⾏。
–if-not-exists 在创建主题的时候,如果指定了该选项,则只有主题不存在的时候才可以执⾏命令。
–list 列出所有可⽤的主题。
–partitions <Integer: # of partitions> 要创建或修改主题的分区数。
–replica-assignment <String:broker_id_for_part1_replica1 :broker_id_for_part1_replica2,broker_id_for_part2_replica1 :broker_id_for_part2_replica2 , …> 当创建或修改主题的时候⼿动指定partition-to-broker的分配关系。
–replication-factor <Integer:replication factor> 要创建的主题分区副本数。1表示只有⼀个副本,也就是Leader副本。
–topic <String: topic> 要创建、修改或描述的主题名称。除了创建,修改和描述在这⾥还可以使⽤正则表达式。
–topics-with-overrides 如果在描述主题时设置,则仅显示已重写配置的主题
–unavailable-partitions 如果在描述主题时设置,则仅显示leader不可用的分区
–under-replicated-partitions 如果在描述主题时设置,则仅显示下属复制分区
–zookeeper <String: urls> 必需的参数:连接zookeeper的字符串,逗号分隔的多个host:port列表。多个URL可以故障转移。

主题中可以使⽤的参数定义:

属性 默认值 服务器默认属性 说明
cleanup.policy delete log.cleanup.policy 要么是”delete“要么是”compact“; 这个字符串指明了针对旧⽇志部分的利⽤⽅式;
默认⽅式(“delete”)将会丢弃旧的部分当他们的回收时间或者尺⼨限制到达时。”compact“将会进⾏⽇志压缩
compression.type none producer⽤于压缩数据的压缩类型。默认是⽆压缩。正确的选项值是none、gzip、snappy。
压缩最好⽤于批量处理,批量处理消息越多,压缩性能越好。
delete.retention.ms 86400000(24hours) log.cleaner.delete.retention.ms 对于压缩⽇志保留的最⻓时间,也是客户端消费消息的最⻓时间,通log.retention.minutes的区别在于⼀个控制未压缩数据,⼀个控制压缩后的数据。此项配置可以在topic创建时的置顶参数覆盖
flush.ms None log.flush.interval.ms 此项配置⽤来置顶强制进⾏fsync⽇志到磁盘的时间间隔;例如,如果设置为1000,那么每1000ms就需要进⾏⼀次fsync。⼀般不建议使⽤这个选项
flush.messages None log.flush.interval.messages 此项配置指定时间间隔:强制进⾏fsync⽇志。
例如,如果这个选项设置为1,那么每条消息之后都需要进⾏fsync,如果设置为5,则每5条消息就需要进⾏⼀次fsync。
⼀般来说,建议你不要设置这个值。此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过⼤,将会导致每次"fsync"的时间较⻓(IO阻塞),如果此值过⼩,将会导致"fsync"的次数较多,这也意味着整体的client请求有⼀定的延迟.物理server故障,将会导致没有fsync的消息丢失。
index.interval.bytes 4096 log.index.interval.bytes 默认设置保证了我们每4096个字节就对消息添加⼀个索引,更多的索引使得阅读的消息更加靠近,但是索引规模却会由此增⼤;⼀般不需要改变这个选项。
max.message.bytes 1000000 max.message.bytes kafka追加消息的最⼤尺⼨。注意如果你增⼤这个尺⼨,你也必须增⼤你consumer的fetch 尺⼨,这样consumer才能fetch到这些最⼤尺⼨的消息。
min.cleanable.dirty.ratio 0.5 min.cleanable.dirty.ratio 此项配置控制log压缩器试图进⾏清除⽇志的频率。默认情况下,将避免清除压缩率超过50%的⽇志。这个⽐率避免了最⼤的空间浪费。
min.insync.replicas 1 min.insync.replicas 当producer设置request.required.acks为-1时,min.insync.replicas指定replicas的最⼩数⽬(必须确认每⼀个repica的写数据都是成功的),如果这个数⽬没有达到,producer会产⽣异常。
retention.bytes None log.retention.bytes 如果使⽤“delete”的retention 策略,这项配置就是指在删除⽇志之前,⽇志所能达到的最⼤尺⼨。默认情况下,没有尺⼨限制⽽只有时间限制。
retention.ms 7 days log.retention.minutes 如果使⽤“delete”的retention策略,这项配置就是指删除⽇志前⽇志保存的时间。
segment.bytes 1GB log.segment.bytes kafka中log⽇志是分成⼀块块存储的,此配置是指log⽇志划分成块的⼤⼩。
segment.index.bytes 10MB log.index.size.max.bytes 此配置是有关offsets和⽂件位置之间映射的索引⽂件的⼤⼩;⼀般不需要修改这个配置。
segment.jitter.ms 0 log.roll.jitter.{ms,hours} The maximum jitter to subtract from logRollTimeMillis.
segment.ms 7 days log.roll.hours 即使log的分块⽂件没有达到需要删除、压缩的⼤⼩,⼀旦log 的时间达到这个上限,就会强制新建⼀个log分块⽂件。
unclean.leader.election.enable true 指明了是否能够使不在ISR中replicas设置⽤来作为leader

创建主题

1
2
3
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x --partitions 1 --replication-factor 1

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760

查看主题

1
2
3
4
5
kafka-topics.sh --zookeeper localhost:2181/myKafka --list

kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x

kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides --describe

修改主题

1
2
3
4
5
6
7
8
9
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1

kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576

kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01

kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760

kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01

删除主题

1
kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x 1

给主题添加删除的标记:如


要过⼀段时间删除。

增加分区

通过命令⾏⼯具操作,主题的分区只能增加,不能减少。否则报错:

1
ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions,1 would not be an increase.

通过–alter修改主题的分区数,增加分区。

1
kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 --partitions 2

分区副本的分配

副本分配的三个⽬标:

  1. 均衡地将副本分散于各个broker上。

  2. 对于某个broker上分配的分区,它的其他副本在其他broker上。

  3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  1. 第⼀个副本分区通过轮询的⽅式挑选⼀个broker,进⾏分配。该轮询从broker列表的随机位置进⾏轮询。

  2. 其余副本通过增加偏移进⾏分配。

考虑到机架信息,⾸先为每个机架创建⼀个broker列表。如:

三个机架(rack1,rack2,rack3),六个broker(0,1,2,3,4,5)

brokerID -> rack

0 -> “rack1”, 1 -> “rack3”, 2 -> “rack3”, 3 -> “rack2”, 4 -> “rack2”, 5 -> “rack1”

rack1:0,5

rack2:3,4

rack3:1,2

这broker列表为rack1的0,rack2的3,rack3的1,rack1的5,rack2的4,rack3的2,即:0, 3, 1, 5, 4, 2


每个分区副本在分配的时候在上⼀个分区第⼀个副本开始分配的位置右移⼀位。

六个broker,六个分区,正好最后⼀个分区的第⼀个副本分配的位置是该broker列表的最后⼀个。

如果有更多的分区需要分配,则算法开始对follower副本进⾏移位分配。

这主要是为了避免每次都得到相同的分配序列。

此时,如果有⼀个分区等待分配(分区6),这按照如下⽅式分配:

6 -> 0,4,2 (⽽不是像分区0那样重复0,3,1)

跟机架相关的副本分配中,永远在机架相关的broker列表中轮询地分配第⼀个副本。

其余的副本,倾向于机架上没有副本的broker进⾏副本分配,除⾮每个机架有⼀个副本。

然后其他的副本⼜通过轮询的⽅式分配给broker。

结果是,如果副本的个数⼤于等于机架数,保证每个机架最少有⼀个副本。否则每个机架最多保有⼀个副本。

如果副本的个数和机架的个数相同,并且每个机架包含相同个数的broker,可以保证副本在机架和broker之间均匀分布。


上图,tp_eagle_01主题的分区0分配信息:leader分区在broker1上,同步副本分区是1和2,也就是在broker1和broker2上的两个副本分区是同步副本分区,其中⼀个是leader分区。

KafkaAdminClient应⽤

除了使⽤Kafka的bin⽬录下的脚本⼯具来管理Kafka,还可以使⽤管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采⽤Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,⼜多了⼀个AdminClient,在kafka-client包下,⼀个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient。

KafkaAdminClient包含了⼀下⼏种功能(以Kafka1.0.2版本为准):

  1. 创建主题: createTopics(final Collection newTopics, final CreateTopicsOptions options)

  2. 删除主题: deleteTopics(final Collection topicNames, DeleteTopicsOptions options)

  3. 列出所有主题: listTopics(final ListTopicsOptions options)

  4. 查询主题: describeTopics(final Collection topicNames, DescribeTopicsOptions options)

  5. 查询集群信息: describeCluster(DescribeClusterOptions options)

  6. 查询配置信息: describeConfigs(Collection configResources, final DescribeConfigsOptions options)

  7. 修改配置信息: alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)

  8. 修改副本的⽇志⽬录: alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)

  9. 查询节点的⽇志⽬录信息: describeLogDirs(Collection brokers, DescribeLogDirsOptions options)

  10. 查询副本的⽇志⽬录信息: describeReplicaLogDirs(Collection replicas,DescribeReplicaLogDirsOptions options)

  11. 增加分区: createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)

主要操作步骤:

  1. 客户端根据⽅法的调⽤创建相应的协议请求,⽐如创建Topic的createTopics⽅法,其内部就是发送CreateTopicRequest请求。

  2. 客户端发送请求⾄Kafka Broker。

  3. Kafka Broker处理相应的请求并回执,⽐如与CreateTopicRequest对应的是CreateTopicResponse。

  4. 客户端接收相应的回执并进⾏解析处理。

  5. 和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和响应类的两个⽗类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package com.lagou.kafka.demo;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class MyAdminClient {

private KafkaAdminClient client;

@Before
public void before() {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "node1:9092");
configs.put("client.id", "admin_001");
client = (KafkaAdminClient) KafkaAdminClient.create(configs);
}

@After
public void after() {
// 关闭admin客户端
client.close();
}

@Test
public void testListTopics() throws ExecutionException, InterruptedException {
// 列出主题
// final ListTopicsResult listTopicsResult = client.listTopics();

ListTopicsOptions options = new ListTopicsOptions();
// 列出内部主题
options.listInternal(true);
// 设置请求超时时间,单位是毫秒
options.timeoutMs(500);

final ListTopicsResult listTopicsResult = client.listTopics(options);

// final Set<String> strings = listTopicsResult.names().get();
// strings.forEach(name -> {
// System.out.println(name);
// });

// 将请求变成同步的请求,直接获取结果
final Collection<TopicListing> topicListings = listTopicsResult.listings().get();
topicListings.forEach(new Consumer<TopicListing>() {
@Override
public void accept(TopicListing topicListing) {
// 该主题是否是内部主题
final boolean internal = topicListing.isInternal();
// 该主题的名字
final String name = topicListing.name();

System.out.println("主题是否是内部主题:" + internal);
System.out.println("主题的名字:" + name);
System.out.println(topicListing);
System.out.println("=====================================");
}
});
}

@Test
public void testCreateTopic() throws ExecutionException, InterruptedException {
Map<String, String> configs = new HashMap<>();
configs.put("max.message.bytes", "1048576");
configs.put("segment.bytes", "1048576000");
NewTopic newTopic = new NewTopic("adm_tp_01", 2, (short) 1);
newTopic.configs(configs);
CreateTopicsResult topics = client.createTopics(Collections.singleton(newTopic));
KafkaFuture<Void> all = topics.all();
Void aVoid = all.get();
System.out.println(aVoid);
}

@Test
public void testDeleteTopic() throws ExecutionException, InterruptedException {
DeleteTopicsOptions options = new DeleteTopicsOptions();
options.timeoutMs(500);
DeleteTopicsResult deleteResult = client.deleteTopics(Collections.singleton("adm_tp_01"), options);
deleteResult.all().get();
}

@Test
public void testAlterTopic() throws ExecutionException, InterruptedException {
NewPartitions newPartitions = NewPartitions.increaseTo(5);
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put("adm_tp_01", newPartitions);
CreatePartitionsOptions option = new CreatePartitionsOptions();
// Set to true if the request should be validated without creating new partitions.
// 如果只是验证,⽽不创建分区,则设置为true, option.validateOnly(true);
CreatePartitionsResult partitionsResult = client.createPartitions(newPartitionsMap, option);
Void aVoid = partitionsResult.all().get();
}

@Test
public void testDescribeTopics() throws ExecutionException, InterruptedException {
DescribeTopicsOptions options = new DescribeTopicsOptions();
options.timeoutMs(3000);
DescribeTopicsResult topicsResult = client.describeTopics(Collections.singleton("adm_tp_01"), options);
Map<String, TopicDescription> stringTopicDescriptionMap = topicsResult.all().get();
stringTopicDescriptionMap.forEach((k, v) -> {
System.out.println(k + "\t" + v);
System.out.println("=======================================");
System.out.println(k);
boolean internal = v.isInternal();
String name = v.name();
List<TopicPartitionInfo> partitions = v.partitions();
String partitionStr = Arrays.toString(partitions.toArray());
System.out.println("内部的?" + internal);
System.out.println("topic name = " + name);
System.out.println("分区:" + partitionStr);
partitions.forEach(partition -> {
System.out.println(partition);
});
});
}

@Test
public void testDescribeCluster() throws ExecutionException, InterruptedException {
DescribeClusterResult describeClusterResult = client.describeCluster();
KafkaFuture<String> stringKafkaFuture = describeClusterResult.clusterId();
String s = stringKafkaFuture.get();
System.out.println("cluster name = " + s);
KafkaFuture<Node> controller = describeClusterResult.controller();
Node node = controller.get();
System.out.println("集群控制器:" + node);
Collection<Node> nodes = describeClusterResult.nodes().get();
nodes.forEach(node1 -> {
System.out.println(node1);
});
}

@Test
public void testDescribeConfigs() throws ExecutionException, InterruptedException, TimeoutException {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singleton(configResource));
Map<ConfigResource, Config> configMap = describeConfigsResult.all().get(15, TimeUnit.SECONDS);
configMap.forEach(new BiConsumer<ConfigResource, Config>() {
@Override
public void accept(ConfigResource configResource, Config config) {
ConfigResource.Type type = configResource.type();
String name = configResource.name();
System.out.println("资源名称:" + name);
Collection<ConfigEntry> entries = config.entries();
entries.forEach(new Consumer<ConfigEntry>() {
@Override
public void accept(ConfigEntry configEntry) {
boolean aDefault = configEntry.isDefault();
boolean readOnly = configEntry.isReadOnly();
boolean sensitive = configEntry.isSensitive();
String name1 = configEntry.name();
String value = configEntry.value();
System.out.println("是否默认:" + aDefault + "\t是否只读?" +
readOnly + "\t是否敏感?" + sensitive
+ "\t" + name1 + " --> " + value);
}
});
ConfigEntry retries = config.get("retries");
if (retries != null) {
System.out.println(retries.name() + " -->" + retries.value());
} else {
System.out.println("没有这个属性");
}
}
});
}

@Test
public void testAlterConfig() throws ExecutionException, InterruptedException {
// 这⾥设置后,原来资源中不冲突的属性也会丢失,直接按照这⾥的配置设置
Map<ConfigResource, Config> configMap = new HashMap<>();
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC,"adm_tp_01");
Config config = new Config(Collections.singleton(new ConfigEntry("segment.bytes", "1048576000")));
configMap.put(resource, config);
AlterConfigsResult alterConfigsResult = client.alterConfigs(configMap);
Void aVoid = alterConfigsResult.all().get();
}

@Test
public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
final DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0));

final Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap
= describeLogDirsResult.all().get();

integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() {
@Override
public void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) {
System.out.println("broker.id = " + integer);
// log.dirs可以设置多个目录
stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() {
@Override
public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) {
System.out.println("logdir = " + s);
final Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfos = logDirInfo.replicaInfos;

replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {
@Override
public void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
System.out.println("主题分区:" + topicPartition.partition());
System.out.println("主题:" + topicPartition.topic());
// final boolean isFuture = replicaInfo.isFuture;
// final long offsetLag = replicaInfo.offsetLag;
// final long size = replicaInfo.size;
}
});
}
});
}
});
}

}

偏移量管理

Kafka 1.0.2,__consumer_offsets主题中保存各个消费组的偏移量。早期由zookeeper管理消费组的偏移量。

通过原⽣ kafka 提供的⼯具脚本进⾏查询,⼯具脚本的位置与名称为 bin/kafka-consumer-groups.sh 。

⾸先运⾏脚本,查看帮助:

参数 说明
–all-topics 将所有关联到指定消费组的主题都划归到 reset-offsets 操作范围。
–bootstrap-server <String: server to connect to> 必须:(基于消费组的新的消费者): 要连接的服务器地址。
–by-duration <String: duration> 距离当前时间戳的⼀个时间段。格式:‘PnDTnHnMnS’
–command-config <String: command config property file> 指定配置⽂件,该⽂件内容传递给Admin Client和消费者。
–delete 传值消费组名称,删除整个消费组与所有主题的各个分区偏移量和所有者关系。如: --group g1 --group g2 。
传值消费组名称和单个主题,仅删除该消费组到指定主题的分区偏移量和所属关系。如: --group g1 --group g2 --topic t1 。
传值⼀个主题名称,仅删除指定主题与所有消费组分区偏移量以及所属关系。如: --topic t1
注意:消费组的删除仅对基于ZK保存偏移量的消费组有效,并且要⼩⼼使⽤,仅删除不活跃的消费组。
–describe 描述给定消费组的偏移量差距(有多少消息还没有消费)。
–execute 执⾏操作。⽀持的操作: reset-offsets 。
–export 导出操作的结果到CSV⽂件。⽀持的操作: reset-offsets 。
–from-file <String: path to CSV file> 重置偏移量到CSV⽂件中定义的值。
–group <String: consumer group> ⽬标消费组。
–list 列出所有消费组。
–new-consumer 使⽤新的消费者实现。这是默认值。随后的发⾏版中会删除这⼀操作。
–reset-offsets 重置消费组的偏移量。当前⼀次操作只⽀持⼀个消费组,并且该消费组应该是不活跃的。
有三个操作选项
1. (默认)plan:要重置哪个偏移量。
2. execute:执⾏ reset-offsets 操作。
3. process:配合 --export 将操作结果导出到CSV格式。
可以使⽤如下选项:–to-datetime,–by-period,–to-earliest,–to-latest,–shift-by,–from-file,–to-current 。
必须选择⼀个选项使⽤。
要定义操作的范围,使⽤:–all-topics,–topic 。
必须选择⼀个,除⾮使⽤ --from-file 选项。
–shift-by <Long: number-of-offsets> 重置偏移量n个。n可以是正值,也可以是负值。
–timeout <Long: timeout(ms)> 对某些操作设置超时时间。
如:对于描述指定消费组信息,指定毫秒值的最⼤等待时间,以获取正常数据(如刚创建的消费组,或者消费组做了⼀些更改操作)。默认时间: 5000 。
–to-current 重置到当前的偏移量。
–to-datetime <String: datetime> 重置偏移量到指定的时间戳。格式:‘YYYY-MM-DDTHH:mm:SS.sss’
–to-earliest 重置为最早的偏移量
–to-latest 重置为最新的偏移量
–to-offset <Long: offset> 重置到指定的偏移量。
–topic <String: topic> 指定哪个主题的消费组需要删除,或者指定哪个主题的消费组需要包含到 resetoffsets 操作中。对于 reset-offsets 操作,还可以指定分区: topic1:0,1,2 。其中0,1,2表示要包含到操作中的分区号。重置偏移量的操作⽀持多个主题⼀起操作。
–zookeeper <String: urls> 必须,它的值,你懂的。 --zookeeper node1:2181/myKafka 。

我们先启动消费者,再启动⽣产者, 再通过 bin/kafka-consumer-groups.sh 进⾏消费偏移量查询。

由于kafka 消费者记录group的消费偏移量有两种⽅式 :

  1. kafka ⾃维护 (新)

  2. zookpeer 维护 (旧) ,已经逐渐被废弃

所以 ,脚本只查看由broker维护的,由zookeeper维护的可以将 --bootstrap-server 换成 --zookeeper 即可

    1. 查看有那些 group ID 正在进⾏消费
    1
    2
    3
    4
    [root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
    Note: This will not show information about old Zookeeper-based consumers.

    group

    注意:1. 这⾥⾯是没有指定 topic,查看的是所有topic消费者的 group.id 的列表。2. 注意: 重名的 group.id 只会显示⼀次

  • 2.查看指定group.id 的消费者消费情况

    1
    2
    3
    4
    5
    6
    [root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group
    Note: This will not show information about old Zookeeper-based consumers.
    TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    tp_demo_02 0 923 923 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1
    tp_demo_02 1 872 872 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1
    tp_demo_02 2 935 935 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1

    如果消费者停⽌,查看偏移量信息:


    将偏移量设置为最早的:


    将偏移量设置为最新的:


    分别将指定主题的指定分区的偏移量向前移动10个消息:


  • 生产者的多线程单例模式配置。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    package com.lagou.kafka.demo.producer;

    import org.apache.kafka.clients.producer.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Properties;
    import java.util.Random;

    public class KafkaProducerSingleton {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerSingleton.class);
    private static KafkaProducer<String, String> kafkaProducer;
    private Random random = new Random();
    private String topic;
    private int retry;

    private KafkaProducerSingleton() {
    }

    /**
    * 静态内部类
    */
    private static class LazyHandler {
    private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
    }

    /**
    * 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例
    * @return
    */
    public static final KafkaProducerSingleton getInstance() {
    return LazyHandler.instance;
    }

    /**
    * kafka生产者进行初始化
    *
    * @return KafkaProducer
    */
    public void init(String topic, int retry) {
    this.topic = topic;
    this.retry = retry;
    if (null == kafkaProducer) {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty(ProducerConfig.ACKS_CONFIG, "1");

    kafkaProducer = new KafkaProducer<String, String>(props);
    }
    }

    /**
    * 通过kafkaProducer发送消息
    * @param message
    */
    public void sendKafkaMessage(final String message) {

    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
    topic, random.nextInt(3), "", message);

    kafkaProducer.send(record, new Callback() {
    public void onCompletion(RecordMetadata recordMetadata,Exception exception) {
    if (null != exception) {
    LOGGER.error("kafka发送消息失败:" + exception.getMessage(), exception);
    retryKakfaMessage(message);
    }
    }
    });
    }

    /**
    * 当kafka消息发送失败后,重试
    *
    * @param retryMessage
    */
    private void retryKakfaMessage(final String retryMessage) {
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
    topic, random.nextInt(3), "", retryMessage);
    for (int i = 1; i <= retry; i++) {
    try {
    kafkaProducer.send(record);
    return;
    } catch (Exception e) {
    LOGGER.error("kafka发送消息失败:" + e.getMessage(), e);
    retryKakfaMessage(retryMessage);
    }
    }
    }

    /**
    * kafka实例销毁
    */
    public void close() {
    if (null != kafkaProducer) {
    kafkaProducer.close();
    }
    }

    public String getTopic() {
    return topic;
    }

    public void setTopic(String topic) {
    this.topic = topic;
    }

    public int getRetry() {
    return retry;
    }

    public void setRetry(int retry) {
    this.retry = retry;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    package com.lagou.kafka.demo.producer;

    public class ProducerHandler implements Runnable {
    private String message;

    public ProducerHandler(String message) {
    this.message = message;
    }

    @Override
    public void run() {
    KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton.getInstance();
    kafkaProducerSingleton.init("tp_demo_02", 3);
    int i = 0;

    while (true) {
    try {
    System.out.println("当前线程:" + Thread.currentThread().getName()
    + "\t获取的kafka实例:" + kafkaProducerSingleton);
    kafkaProducerSingleton.sendKafkaMessage("发送消息: " + message + " " + (++i));
    Thread.sleep(100);
    } catch (Exception e) {
    }
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    package com.lagou.kafka.demo.producer;

    public class MyProducer {
    public static void main(String[] args){
    Thread thread = new Thread(new ProducerHandler("hello lagou "));
    thread.start();
    }
    }
  • 消费者的多线程配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    package com.lagou.kafka.demo.consumer;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class KafkaConsumerAuto {
    /**
    * kafka消费者不是线程安全的
    */
    private final KafkaConsumer<String, String> consumer;

    private ExecutorService executorService;

    public KafkaConsumerAuto() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
    // 打开自动提交
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put("auto.commit.interval.ms", "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

    consumer = new KafkaConsumer<String, String>(props);
    // 订阅主题
    consumer.subscribe(Collections.singleton("tp_demo_02"));
    }

    public void execute() throws InterruptedException {
    executorService = Executors.newFixedThreadPool(2);
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(2_000);
    if (null != records) {
    executorService.submit(new ConsumerThreadAuto(records, consumer));
    }
    Thread.sleep(1000);
    }
    }

    public void shutdown() {
    try {
    if (consumer != null) {
    consumer.close();
    }
    if (executorService != null) {
    executorService.shutdown();
    }
    if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
    System.out.println("关闭线程池超时。。。");
    }
    } catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    package com.lagou.kafka.demo.consumer;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerThreadAuto implements Runnable {
    private ConsumerRecords<String, String> records;
    private KafkaConsumer<String, String> consumer;

    public ConsumerThreadAuto(ConsumerRecords<String, String> records,
    KafkaConsumer<String, String> consumer) {
    this.records = records;
    this.consumer = consumer;
    }

    @Override
    public void run() {

    for(ConsumerRecord<String,String> record : records){
    System.out.println("当前线程:" + Thread.currentThread()
    + "\t主题:" + record.topic()
    + "\t偏移量:" + record.offset() + "\t分区:" + record.partition()
    + "\t获取的消息:" + record.value());
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    package com.lagou.kafka.demo.consumer;

    public class ConsumerAutoMain {
    public static void main(String[] args) {
    KafkaConsumerAuto kafka_consumerAuto = new KafkaConsumerAuto();
    try {
    kafka_consumerAuto.execute();
    Thread.sleep(20000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    kafka_consumerAuto.shutdown();
    }
    }
    }