Kafka Development in Practice
Sending and Receiving Messages
The producer's two main classes are KafkaProducer and ProducerRecord.
KafkaProducer is the class that sends messages; ProducerRecord encapsulates a single Kafka message.
Creating a KafkaProducer requires the following parameters:
Parameter | Description |
---|---|
bootstrap.servers | Configures how the producer makes its initial connection to the brokers. When connecting to a Kafka cluster, list a few of the cluster's brokers here rather than all of them; once the producer has connected to one of the listed brokers, it discovers the remaining cluster nodes through that connection. |
key.serializer | Serializer class for the message key. May be given as the class name or as the Class object. |
value.serializer | Serializer class for the message value. May be given as the class name or as the Class object. |
acks | Default: 1 for the 1.0.2 client used here (acks=all became the default in Kafka 3.0). acks=0: the producer does not wait for the broker to confirm the message; as soon as the message is placed in the send buffer it is considered sent. There is no guarantee the broker actually received it, the retries setting has no effect, and the offset returned for the message is always -1. acks=1: the message only needs to be written to the leader partition before the client is answered; follower replicas are not waited on. If the leader fails right after confirming, before the followers have replicated the message, the message is lost. acks=all (equivalent to acks=-1): the leader waits until every ISR replica has confirmed the record. As long as at least one ISR replica survives, the message is not lost; this is Kafka's strongest durability guarantee. |
retries | Number of retries: when a send fails, the producer resends the message, just as if the client itself had resent it on receiving the error. If retries are enabled and message order must be preserved, also set MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1; otherwise, while one failed message is being retried, later messages may already have been sent successfully ahead of it. |
The remaining parameters can be found in org.apache.kafka.clients.producer.ProducerConfig; we will come back to them in later sections.
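For reference, the same producer settings can also be written with the ProducerConfig constants instead of string literals, which guards against typos in the keys. A minimal sketch (the broker address is a placeholder):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class ProducerConfigDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // The constants resolve to the same keys as the string literals above
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.92.121:9092"); // placeholder address
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.ACKS_CONFIG, "all");
        configs.put(ProducerConfig.RETRIES_CONFIG, 3);

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
        producer.close();
    }
}
```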
After the producer sends a message, it needs an acknowledgment from the broker; the acknowledgment can be awaited synchronously or handled asynchronously.
Synchronous acknowledgment is less efficient; asynchronous acknowledgment is more efficient but requires registering a callback object.
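In outline, the two acknowledgment styles look as follows. This is a sketch assuming a producer and record already configured as in the full examples below:

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AckStyles {
    // Synchronous: send() returns a Future; get() blocks until the broker responds
    static void syncSend(KafkaProducer<Integer, String> producer,
                         ProducerRecord<Integer, String> record) throws Exception {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("acknowledged at offset " + metadata.offset());
    }

    // Asynchronous: send() returns immediately; the callback fires when the broker answers
    static void asyncSend(KafkaProducer<Integer, String> producer,
                          ProducerRecord<Integer, String> record) {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                }
            }
        });
    }
}
```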
- Create a Maven project and add the Kafka client dependency to the pom file:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <!-- Newer clients are backward compatible, but we use the same version as the broker -->
        <version>1.0.2</version>
    </dependency>
</dependencies>
```
- Producer: create the MyProducer1 class
```java
package com.lagou.kafka.demo.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MyProducer1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        // Broker address used for the initial connection
        configs.put("bootstrap.servers", "192.168.92.121:9092");
        // Serializer class for keys
        configs.put("key.serializer", IntegerSerializer.class);
        // Serializer class for values
        configs.put("value.serializer", StringSerializer.class);
        // configs.put("acks", "all");
        // configs.put("retries", "3");

        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);

        // User-defined message header fields
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));

        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                "topic_1",
                0,
                0,
                "hello lagou 0",
                headers
        );

        // Synchronous acknowledgment
        // final Future<RecordMetadata> future = producer.send(record);
        // final RecordMetadata metadata = future.get();
        // System.out.println("topic: " + metadata.topic());
        // System.out.println("partition: " + metadata.partition());
        // System.out.println("offset: " + metadata.offset());

        // Asynchronous acknowledgment
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("topic: " + metadata.topic());
                    System.out.println("partition: " + metadata.partition());
                    System.out.println("offset: " + metadata.offset());
                } else {
                    System.out.println("send failed: " + exception.getMessage());
                }
            }
        });

        // Close the producer
        producer.close();
    }
}
```
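Before running MyProducer1, make sure topic_1 exists on the broker: either create it ahead of time with the kafka-topics.sh tool, or rely on the first send creating it implicitly if the broker keeps the default auto.create.topics.enable=true.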
- Consumer: create the MyConsumer1 class
```java
package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class MyConsumer1 {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // node1 resolves to the broker IP through a manual hosts-file entry on Windows
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.92.121:9092");
        // Use the ConsumerConfig constants instead of hand-written strings; key deserializer
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        // Value deserializer
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Consumer group ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_demo1");
        // If no valid offset exists for this consumer, reset to the very beginning;
        // "latest" would reset to the end of the partition instead
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);

        // Subscribe first, then consume
        consumer.subscribe(Arrays.asList("topic_1"));

        // while (true) {
        //     final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);
        // }

        // If the topic has no consumable messages, this call can be placed in a while
        // loop that re-polls every 3 seconds; the timeout keeps the loop from issuing
        // back-to-back poll calls.
        // Pull a batch of messages from the topic's partitions
        final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);

        // Iterate over the batch pulled from the topic's partitions
        consumerRecords.forEach(new Consumer<ConsumerRecord<Integer, String>>() {
            public void accept(ConsumerRecord<Integer, String> record) {
                System.out.println(record.topic() + "\t"
                        + record.partition() + "\t"
                        + record.offset() + "\t"
                        + record.key() + "\t"
                        + record.value());
            }
        });

        consumer.close();
    }
}
```
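Run the producer first and then the consumer: since the consumer_demo1 group has no committed offset yet, auto.offset.reset=earliest makes it start from the beginning of topic_1. Because the group may still be joining when the single poll above times out, an empty first batch is possible; the commented-out while loop is the more robust pattern.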
- Rework the producer and consumer above so that they produce and consume in a loop: MyProducer2 first, then MyConsumer2.
```java
package com.lagou.kafka.demo.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class MyProducer2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        // Broker address used for the initial connection
        configs.put("bootstrap.servers", "192.168.100.101:9092");
        // Serializer class for keys
        configs.put("key.serializer", IntegerSerializer.class);
        // Serializer class for values
        configs.put("value.serializer", StringSerializer.class);
        // configs.put("acks", "all");
        // configs.put("retries", "3");

        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);

        // User-defined message header fields
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));

        for (int i = 0; i < 100; i++) {
            ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                    "topic_1",
                    0,
                    i,
                    "hello lagou " + i,
                    headers
            );

            // Synchronous acknowledgment
            // final Future<RecordMetadata> future = producer.send(record);
            // final RecordMetadata metadata = future.get();
            // System.out.println("topic: " + metadata.topic());
            // System.out.println("partition: " + metadata.partition());
            // System.out.println("offset: " + metadata.offset());

            // Asynchronous acknowledgment
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic: " + metadata.topic());
                        System.out.println("partition: " + metadata.partition());
                        System.out.println("offset: " + metadata.offset());
                    } else {
                        System.out.println("send failed: " + exception.getMessage());
                    }
                }
            });
        }

        // Close the producer
        producer.close();
    }
}
```

```java
package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class MyConsumer2 {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // node1 resolves to 192.168.100.101 through a manual hosts-file entry on Windows
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        // Use the ConsumerConfig constants instead of hand-written strings; key deserializer
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        // Value deserializer
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Consumer group ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_demo2");
        // If no valid offset exists for this consumer, reset to the very beginning;
        // "latest" would reset to the end of the partition instead
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);

        // Subscribe first, then consume
        consumer.subscribe(Arrays.asList("topic_1"));

        while (true) {
            // If the topic has no consumable messages, poll blocks for up to 3 seconds
            // before returning, which keeps the loop from issuing back-to-back polls.
            // Pull a batch of messages from the topic's partitions
            final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);
            // Iterate over the batch pulled from the topic's partitions
            consumerRecords.forEach(new Consumer<ConsumerRecord<Integer, String>>() {
                public void accept(ConsumerRecord<Integer, String> record) {
                    System.out.println(record.topic() + "\t"
                            + record.partition() + "\t"
                            + record.offset() + "\t"
                            + record.key() + "\t"
                            + record.value());
                }
            });
        }
        // consumer.close();
    }
}
```
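consumer.close() is left commented out because the while (true) loop never exits. If the loop ever needs to stop cleanly, the usual pattern is to call consumer.wakeup() from another thread, catch the resulting WakeupException around poll, and close the consumer in a finally block.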
Integrating Spring Boot with Kafka
- Create a Spring Boot project and add the Spring Web and Spring for Apache Kafka dependencies; the pom file looks like this:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lagou.kafka.demo</groupId>
    <artifactId>demo-02-springboot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-02-springboot-kafka</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
- Edit the application.properties file:
```properties
spring.application.name=springboot-kafka-02
server.port=8080

# Kafka configuration
spring.kafka.bootstrap-servers=192.168.92.121:9092

# Producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Maximum size in bytes of each producer batch (16 KB here)
spring.kafka.producer.batch-size=16384
# Total send buffer available to the producer, 32 MB here
spring.kafka.producer.buffer-memory=33554432

# Consumer configuration
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer02
# If Kafka has no offset for this consumer, reset to the earliest offset
spring.kafka.consumer.auto-offset-reset=earliest
# Whether offsets are committed automatically or manually; automatic here
spring.kafka.consumer.enable-auto-commit=true
# Interval between automatic offset commits
spring.kafka.consumer.auto-commit-interval=1000
```
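Spring Boot maps these spring.kafka.* entries onto the same ProducerConfig/ConsumerConfig keys used in the plain-client examples above; with enable-auto-commit=true, the consumer commits its offsets automatically every second (auto-commit-interval=1000 ms).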
- Create a KafkaSyncProducerController that sends messages synchronously:
```java
package com.lagou.kafka.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class KafkaSyncProducerController {
    @Autowired
    private KafkaTemplate<Integer, String> template;

    // The mapping path was lost in the original; "send/sync/{message}" is assumed here
    @RequestMapping("send/sync/{message}")
    public String send(@PathVariable String message) {
        final ListenableFuture<SendResult<Integer, String>> future =
                template.send("topic-spring-01", 0, 0, message);
        // Send the message synchronously: block until the broker acknowledges
        try {
            final SendResult<Integer, String> sendResult = future.get();
            final RecordMetadata metadata = sendResult.getRecordMetadata();
            System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
```
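Assuming the send/sync/{message} mapping above, a request such as http://localhost:8080/send/sync/hello returns "success" only after the broker has acknowledged the record, because future.get() blocks the request thread until the SendResult arrives.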
- Create a KafkaAsyncProducerController that sends messages asynchronously:
```java
package com.lagou.kafka.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaAsyncProducerController {
    @Autowired
    private KafkaTemplate<Integer, String> template;

    // The mapping path was lost in the original; "send/async/{message}" is assumed here
    @RequestMapping("send/async/{message}")
    public String send(@PathVariable String message) {
        final ListenableFuture<SendResult<Integer, String>> future =
                this.template.send("topic-spring-01", 0, 1, message);
        // Register a callback and wait asynchronously for the broker's response
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            public void onFailure(Throwable throwable) {
                System.out.println("send failed: " + throwable.getMessage());
            }

            public void onSuccess(SendResult<Integer, String> result) {
                final RecordMetadata metadata = result.getRecordMetadata();
                System.out.println("send succeeded: " + metadata.topic() + "\t"
                        + metadata.partition() + "\t" + metadata.offset());
            }
        });
        return "success";
    }
}
```
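Unlike the synchronous controller, this handler returns "success" immediately; the callback runs later on a Kafka client thread when the acknowledgment (or failure) arrives, so the HTTP request thread is never blocked waiting on the broker.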
- Create the MyConsumer listener:
```java
package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {
    // The @KafkaListener topic was lost in the original; "topic-spring-01",
    // the topic the controllers send to, is assumed here
    @KafkaListener(topics = "topic-spring-01")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println("received: "
                + record.topic() + "\t"
                + record.partition() + "\t"
                + record.offset() + "\t"
                + record.key() + "\t"
                + record.value());
    }
}
```
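No explicit KafkaConsumer is created here: spring-kafka detects the @KafkaListener method, builds a listener container for it from the spring.kafka.consumer.* settings in application.properties, and polls in the background, invoking onMessage for each record.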
- Override the defaults from KafkaAutoConfiguration by declaring custom beans in a KafkaConfig class:
```java
package com.lagou.kafka.demo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {
    // Topic created on application startup
    @Bean
    public NewTopic topic1() {
        return new NewTopic("nptc-01", 3, (short) 1);
    }

    // Topic created on application startup
    @Bean
    public NewTopic topic2() {
        return new NewTopic("nptc-02", 5, (short) 1);
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "node1:9092");
        KafkaAdmin admin = new KafkaAdmin(configs);
        return admin;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        // Override settings inherited from the ProducerFactory
        Map<String, Object> configsOverride = new HashMap<>();
        configsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<Integer, String>(
                producerFactory, configsOverride
        );
        return template;
    }
}
```
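On startup, the KafkaAdmin bean scans the application context for NewTopic beans and creates nptc-01 and nptc-02 on the broker if they do not already exist. The KafkaTemplate bean declared here replaces the auto-configured one, and its batch.size override (200 bytes) applies only to producers created for this template, taking precedence over the 16384 set in application.properties.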