Kafka特性之主题
管理
使⽤kafka-topics.sh脚本:
选项 | 说明 |
---|---|
–config <String: name=value> | 为创建的或修改的主题指定配置信息。⽀持下述配置条⽬如下方详细列表 |
–create | 创建⼀个新主题 |
–delete | 删除⼀个主题 |
–delete-config <String: name> | 删除现有主题的⼀个主题配置条⽬。这些条⽬就是在 --config 中给出的配置条⽬。 |
–alter | 更改主题的分区数量,副本分配和/或配置条⽬。 |
–describe | 列出给定主题的细节。 |
–disable-rack-aware | 禁⽤副本分配的机架感知。 |
–force | 抑制控制台提示信息。 |
–help | 打印帮助信息。 |
–if-exists | 如果指定了该选项,则在修改或删除主题的时候,只有主题存在才可以执⾏。 |
–if-not-exists | 在创建主题的时候,如果指定了该选项,则只有主题不存在的时候才可以执⾏命令。 |
–list | 列出所有可⽤的主题。 |
–partitions <Integer: # of partitions> | 要创建或修改主题的分区数。 |
–replica-assignment <String:broker_id_for_part1_replica1 :broker_id_for_part1_replica2,broker_id_for_part2_replica1 :broker_id_for_part2_replica2 , …> | 当创建或修改主题的时候⼿动指定partition-to-broker的分配关系。 |
–replication-factor <Integer:replication factor> | 要创建的主题分区副本数。1表示只有⼀个副本,也就是Leader副本。 |
–topic <String: topic> | 要创建、修改或描述的主题名称。除了创建,修改和描述在这⾥还可以使⽤正则表达式。 |
–topics-with-overrides | 如果在描述主题时设置,则仅显示已重写配置的主题 |
–unavailable-partitions | 如果在描述主题时设置,则仅显示leader不可用的分区 |
–under-replicated-partitions | 如果在描述主题时设置,则仅显示下属复制分区 |
–zookeeper <String: urls> | 必需的参数:连接zookeeper的字符串,逗号分隔的多个host:port列表。多个URL可以故障转移。 |
主题中可以使⽤的参数定义:
属性 | 默认值 | 服务器默认属性 | 说明 |
---|---|---|---|
cleanup.policy | delete | log.cleanup.policy | 要么是”delete“要么是”compact“; 这个字符串指明了针对旧⽇志部分的利⽤⽅式; 默认⽅式(“delete”)将会丢弃旧的部分当他们的回收时间或者尺⼨限制到达时。”compact“将会进⾏⽇志压缩 |
compression.type | none | producer⽤于压缩数据的压缩类型。默认是⽆压缩。正确的选项值是none、gzip、snappy。 压缩最好⽤于批量处理,批量处理消息越多,压缩性能越好。 |
|
delete.retention.ms | 86400000(24hours) | log.cleaner.delete.retention.ms | 对于压缩⽇志保留的最⻓时间,也是客户端消费消息的最⻓时间,通log.retention.minutes的区别在于⼀个控制未压缩数据,⼀个控制压缩后的数据。此项配置可以在topic创建时的置顶参数覆盖 |
flush.ms | None | log.flush.interval.ms | 此项配置⽤来置顶强制进⾏fsync⽇志到磁盘的时间间隔;例如,如果设置为1000,那么每1000ms就需要进⾏⼀次fsync。⼀般不建议使⽤这个选项 |
flush.messages | None | log.flush.interval.messages | 此项配置指定时间间隔:强制进⾏fsync⽇志。 例如,如果这个选项设置为1,那么每条消息之后都需要进⾏fsync,如果设置为5,则每5条消息就需要进⾏⼀次fsync。 ⼀般来说,建议你不要设置这个值。此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过⼤,将会导致每次"fsync"的时间较⻓(IO阻塞),如果此值过⼩,将会导致"fsync"的次数较多,这也意味着整体的client请求有⼀定的延迟.物理server故障,将会导致没有fsync的消息丢失。 |
index.interval.bytes | 4096 | log.index.interval.bytes | 默认设置保证了我们每4096个字节就对消息添加⼀个索引,更多的索引使得阅读的消息更加靠近,但是索引规模却会由此增⼤;⼀般不需要改变这个选项。 |
max.message.bytes | 1000000 | max.message.bytes | kafka追加消息的最⼤尺⼨。注意如果你增⼤这个尺⼨,你也必须增⼤你consumer的fetch 尺⼨,这样consumer才能fetch到这些最⼤尺⼨的消息。 |
min.cleanable.dirty.ratio | 0.5 | min.cleanable.dirty.ratio | 此项配置控制log压缩器试图进⾏清除⽇志的频率。默认情况下,将避免清除压缩率超过50%的⽇志。这个⽐率避免了最⼤的空间浪费。 |
min.insync.replicas | 1 | min.insync.replicas | 当producer设置request.required.acks为-1时,min.insync.replicas指定replicas的最⼩数⽬(必须确认每⼀个repica的写数据都是成功的),如果这个数⽬没有达到,producer会产⽣异常。 |
retention.bytes | None | log.retention.bytes | 如果使⽤“delete”的retention 策略,这项配置就是指在删除⽇志之前,⽇志所能达到的最⼤尺⼨。默认情况下,没有尺⼨限制⽽只有时间限制。 |
retention.ms | 7 days | log.retention.minutes | 如果使⽤“delete”的retention策略,这项配置就是指删除⽇志前⽇志保存的时间。 |
segment.bytes | 1GB | log.segment.bytes | kafka中log⽇志是分成⼀块块存储的,此配置是指log⽇志划分成块的⼤⼩。 |
segment.index.bytes | 10MB | log.index.size.max.bytes | 此配置是有关offsets和⽂件位置之间映射的索引⽂件的⼤⼩;⼀般不需要修改这个配置。 |
segment.jitter.ms | 0 | log.roll.jitter.{ms,hours} | The maximum jitter to subtract from logRollTimeMillis. |
segment.ms | 7 days | log.roll.hours | 即使log的分块⽂件没有达到需要删除、压缩的⼤⼩,⼀旦log 的时间达到这个上限,就会强制新建⼀个log分块⽂件。 |
unclean.leader.election.enable | true | 指明了是否能够使不在ISR中replicas设置⽤来作为leader |
创建主题
1 | kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x --partitions 1 --replication-factor 1 |
查看主题
1 | kafka-topics.sh --zookeeper localhost:2181/myKafka --list |
修改主题
1 | kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1 |
删除主题
1 | kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x 1 |
给主题添加删除的标记:如
要过⼀段时间删除。
增加分区
通过命令⾏⼯具操作,主题的分区只能增加,不能减少。否则报错:
1 | ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions,1 would not be an increase. |
通过–alter修改主题的分区数,增加分区。
1 | kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 --partitions 2 |
分区副本的分配
副本分配的三个⽬标:
-
均衡地将副本分散于各个broker上。
-
对于某个broker上分配的分区,它的其他副本在其他broker上。
-
如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。
在不考虑机架信息的情况下:
-
第⼀个副本分区通过轮询的⽅式挑选⼀个broker,进⾏分配。该轮询从broker列表的随机位置进⾏轮询。
-
其余副本通过增加偏移进⾏分配。
考虑到机架信息,⾸先为每个机架创建⼀个broker列表。如:
三个机架(rack1,rack2,rack3),六个broker(0,1,2,3,4,5)
brokerID -> rack
0 -> “rack1”, 1 -> “rack3”, 2 -> “rack3”, 3 -> “rack2”, 4 -> “rack2”, 5 -> “rack1”
rack1:0,5
rack2:3,4
rack3:1,2
这broker列表为rack1的0,rack2的3,rack3的1,rack1的5,rack2的4,rack3的2,即:0, 3, 1, 5, 4, 2
每个分区副本在分配的时候在上⼀个分区第⼀个副本开始分配的位置右移⼀位。
六个broker,六个分区,正好最后⼀个分区的第⼀个副本分配的位置是该broker列表的最后⼀个。
如果有更多的分区需要分配,则算法开始对follower副本进⾏移位分配。
这主要是为了避免每次都得到相同的分配序列。
此时,如果有⼀个分区等待分配(分区6),这按照如下⽅式分配:
6 -> 0,4,2 (⽽不是像分区0那样重复0,3,1)
跟机架相关的副本分配中,永远在机架相关的broker列表中轮询地分配第⼀个副本。
其余的副本,倾向于机架上没有副本的broker进⾏副本分配,除⾮每个机架有⼀个副本。
然后其他的副本⼜通过轮询的⽅式分配给broker。
结果是,如果副本的个数⼤于等于机架数,保证每个机架最少有⼀个副本。否则每个机架最多保有⼀个副本。
如果副本的个数和机架的个数相同,并且每个机架包含相同个数的broker,可以保证副本在机架和broker之间均匀分布。
上图,tp_eagle_01主题的分区0分配信息:leader分区在broker1上,同步副本分区是1和2,也就是在broker1和broker2上的两个副本分区是同步副本分区,其中⼀个是leader分区。
KafkaAdminClient应⽤
除了使⽤Kafka的bin⽬录下的脚本⼯具来管理Kafka,还可以使⽤管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采⽤Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,⼜多了⼀个AdminClient,在kafka-client包下,⼀个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient。
KafkaAdminClient包含了⼀下⼏种功能(以Kafka1.0.2版本为准):
-
创建主题: createTopics(final Collection
newTopics, final CreateTopicsOptions options) -
删除主题: deleteTopics(final Collection
topicNames, DeleteTopicsOptions options) -
列出所有主题: listTopics(final ListTopicsOptions options)
-
查询主题: describeTopics(final Collection
topicNames, DescribeTopicsOptions options) -
查询集群信息: describeCluster(DescribeClusterOptions options)
-
查询配置信息: describeConfigs(Collection
configResources, final DescribeConfigsOptions options) -
修改配置信息: alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)
-
修改副本的⽇志⽬录: alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)
-
查询节点的⽇志⽬录信息: describeLogDirs(Collection
brokers, DescribeLogDirsOptions options) -
查询副本的⽇志⽬录信息: describeReplicaLogDirs(Collection
replicas,DescribeReplicaLogDirsOptions options) -
增加分区: createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)
主要操作步骤:
-
客户端根据⽅法的调⽤创建相应的协议请求,⽐如创建Topic的createTopics⽅法,其内部就是发送CreateTopicRequest请求。
-
客户端发送请求⾄Kafka Broker。
-
Kafka Broker处理相应的请求并回执,⽐如与CreateTopicRequest对应的是CreateTopicResponse。
-
客户端接收相应的回执并进⾏解析处理。
-
和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和响应类的两个⽗类。
1 | package com.lagou.kafka.demo; |
偏移量管理
Kafka 1.0.2,__consumer_offsets主题中保存各个消费组的偏移量。早期由zookeeper管理消费组的偏移量。
通过原⽣ kafka 提供的⼯具脚本进⾏查询,⼯具脚本的位置与名称为 bin/kafka-consumer-groups.sh 。
⾸先运⾏脚本,查看帮助:
参数 | 说明 |
---|---|
–all-topics | 将所有关联到指定消费组的主题都划归到 reset-offsets 操作范围。 |
–bootstrap-server <String: server to connect to> | 必须:(基于消费组的新的消费者): 要连接的服务器地址。 |
–by-duration <String: duration> | 距离当前时间戳的⼀个时间段。格式:‘PnDTnHnMnS’ |
–command-config <String: command config property file> | 指定配置⽂件,该⽂件内容传递给Admin Client和消费者。 |
–delete | 传值消费组名称,删除整个消费组与所有主题的各个分区偏移量和所有者关系。如: --group g1 --group g2 。 传值消费组名称和单个主题,仅删除该消费组到指定主题的分区偏移量和所属关系。如: --group g1 --group g2 --topic t1 。 传值⼀个主题名称,仅删除指定主题与所有消费组分区偏移量以及所属关系。如: --topic t1 注意:消费组的删除仅对基于ZK保存偏移量的消费组有效,并且要⼩⼼使⽤,仅删除不活跃的消费组。 |
–describe | 描述给定消费组的偏移量差距(有多少消息还没有消费)。 |
–execute | 执⾏操作。⽀持的操作: reset-offsets 。 |
–export | 导出操作的结果到CSV⽂件。⽀持的操作: reset-offsets 。 |
–from-file <String: path to CSV file> | 重置偏移量到CSV⽂件中定义的值。 |
–group <String: consumer group> | ⽬标消费组。 |
–list | 列出所有消费组。 |
–new-consumer | 使⽤新的消费者实现。这是默认值。随后的发⾏版中会删除这⼀操作。 |
–reset-offsets | 重置消费组的偏移量。当前⼀次操作只⽀持⼀个消费组,并且该消费组应该是不活跃的。 有三个操作选项 1. (默认)plan:要重置哪个偏移量。 2. execute:执⾏ reset-offsets 操作。 3. process:配合 --export 将操作结果导出到CSV格式。 可以使⽤如下选项:–to-datetime,–by-period,–to-earliest,–to-latest,–shift-by,–from-file,–to-current 。 必须选择⼀个选项使⽤。 要定义操作的范围,使⽤:–all-topics,–topic 。 必须选择⼀个,除⾮使⽤ --from-file 选项。 |
–shift-by <Long: number-of-offsets> | 重置偏移量n个。n可以是正值,也可以是负值。 |
–timeout <Long: timeout(ms)> | 对某些操作设置超时时间。 如:对于描述指定消费组信息,指定毫秒值的最⼤等待时间,以获取正常数据(如刚创建的消费组,或者消费组做了⼀些更改操作)。默认时间: 5000 。 |
–to-current | 重置到当前的偏移量。 |
–to-datetime <String: datetime> | 重置偏移量到指定的时间戳。格式:‘YYYY-MM-DDTHH:mm:SS.sss’ |
–to-earliest | 重置为最早的偏移量 |
–to-latest | 重置为最新的偏移量 |
–to-offset <Long: offset> | 重置到指定的偏移量。 |
–topic <String: topic> | 指定哪个主题的消费组需要删除,或者指定哪个主题的消费组需要包含到 resetoffsets 操作中。对于 reset-offsets 操作,还可以指定分区: topic1:0,1,2 。其中0,1,2表示要包含到操作中的分区号。重置偏移量的操作⽀持多个主题⼀起操作。 |
–zookeeper <String: urls> | 必须,它的值,你懂的。 --zookeeper node1:2181/myKafka 。 |
我们先启动消费者,再启动⽣产者, 再通过 bin/kafka-consumer-groups.sh 进⾏消费偏移量查询。
由于kafka 消费者记录group的消费偏移量有两种⽅式 :
-
kafka ⾃维护 (新)
-
zookpeer 维护 (旧) ,已经逐渐被废弃
所以 ,脚本只查看由broker维护的,由zookeeper维护的可以将 --bootstrap-server 换成 --zookeeper 即可
-
- 查看有那些 group ID 正在进⾏消费
1
2
3
4[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
group注意:1. 这⾥⾯是没有指定 topic,查看的是所有topic消费者的 group.id 的列表。2. 注意: 重名的 group.id 只会显示⼀次
-
2.查看指定group.id 的消费者消费情况
1
2
3
4
5
6[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp_demo_02 0 923 923 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1
tp_demo_02 1 872 872 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1
tp_demo_02 2 935 935 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1如果消费者停⽌,查看偏移量信息:
将偏移量设置为最早的:
将偏移量设置为最新的:
分别将指定主题的指定分区的偏移量向前移动10个消息:
-
生产者的多线程单例模式配置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117package com.lagou.kafka.demo.producer;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.Random;
public class KafkaProducerSingleton {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerSingleton.class);
private static KafkaProducer<String, String> kafkaProducer;
private Random random = new Random();
private String topic;
private int retry;
private KafkaProducerSingleton() {
}
/**
* 静态内部类
*/
private static class LazyHandler {
private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
}
/**
* 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例
* @return
*/
public static final KafkaProducerSingleton getInstance() {
return LazyHandler.instance;
}
/**
* kafka生产者进行初始化
*
* @return KafkaProducer
*/
public void init(String topic, int retry) {
this.topic = topic;
this.retry = retry;
if (null == kafkaProducer) {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducer = new KafkaProducer<String, String>(props);
}
}
/**
* 通过kafkaProducer发送消息
* @param message
*/
public void sendKafkaMessage(final String message) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
topic, random.nextInt(3), "", message);
kafkaProducer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata,Exception exception) {
if (null != exception) {
LOGGER.error("kafka发送消息失败:" + exception.getMessage(), exception);
retryKakfaMessage(message);
}
}
});
}
/**
* 当kafka消息发送失败后,重试
*
* @param retryMessage
*/
private void retryKakfaMessage(final String retryMessage) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
topic, random.nextInt(3), "", retryMessage);
for (int i = 1; i <= retry; i++) {
try {
kafkaProducer.send(record);
return;
} catch (Exception e) {
LOGGER.error("kafka发送消息失败:" + e.getMessage(), e);
retryKakfaMessage(retryMessage);
}
}
}
/**
* kafka实例销毁
*/
public void close() {
if (null != kafkaProducer) {
kafkaProducer.close();
}
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getRetry() {
return retry;
}
public void setRetry(int retry) {
this.retry = retry;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26package com.lagou.kafka.demo.producer;
public class ProducerHandler implements Runnable {
private String message;
public ProducerHandler(String message) {
this.message = message;
}
public void run() {
KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton.getInstance();
kafkaProducerSingleton.init("tp_demo_02", 3);
int i = 0;
while (true) {
try {
System.out.println("当前线程:" + Thread.currentThread().getName()
+ "\t获取的kafka实例:" + kafkaProducerSingleton);
kafkaProducerSingleton.sendKafkaMessage("发送消息: " + message + " " + (++i));
Thread.sleep(100);
} catch (Exception e) {
}
}
}
}1
2
3
4
5
6
7
8package com.lagou.kafka.demo.producer;
public class MyProducer {
public static void main(String[] args){
Thread thread = new Thread(new ProducerHandler("hello lagou "));
thread.start();
}
} -
消费者的多线程配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65package com.lagou.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class KafkaConsumerAuto {
/**
* kafka消费者不是线程安全的
*/
private final KafkaConsumer<String, String> consumer;
private ExecutorService executorService;
public KafkaConsumerAuto() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
// 打开自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put("auto.commit.interval.ms", "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(props);
// 订阅主题
consumer.subscribe(Collections.singleton("tp_demo_02"));
}
public void execute() throws InterruptedException {
executorService = Executors.newFixedThreadPool(2);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(2_000);
if (null != records) {
executorService.submit(new ConsumerThreadAuto(records, consumer));
}
Thread.sleep(1000);
}
}
public void shutdown() {
try {
if (consumer != null) {
consumer.close();
}
if (executorService != null) {
executorService.shutdown();
}
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("关闭线程池超时。。。");
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27package com.lagou.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerThreadAuto implements Runnable {
private ConsumerRecords<String, String> records;
private KafkaConsumer<String, String> consumer;
public ConsumerThreadAuto(ConsumerRecords<String, String> records,
KafkaConsumer<String, String> consumer) {
this.records = records;
this.consumer = consumer;
}
public void run() {
for(ConsumerRecord<String,String> record : records){
System.out.println("当前线程:" + Thread.currentThread()
+ "\t主题:" + record.topic()
+ "\t偏移量:" + record.offset() + "\t分区:" + record.partition()
+ "\t获取的消息:" + record.value());
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15package com.lagou.kafka.demo.consumer;
public class ConsumerAutoMain {
public static void main(String[] args) {
KafkaConsumerAuto kafka_consumerAuto = new KafkaConsumerAuto();
try {
kafka_consumerAuto.execute();
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
kafka_consumerAuto.shutdown();
}
}
}