Kafka特性之⽣产者
消息发送
数据⽣产流程解析
-
Producer创建时,会创建⼀个Sender线程并设置为守护线程。
-
⽣产消息时,内部其实是异步流程;⽣产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
-
批次发送的条件为:缓冲区数据⼤⼩达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。
-
批次发送后,发往指定分区,然后落盘到broker;如果⽣产者配置了retrires参数⼤于0并且失败原因允许重试,那么客户端内部会对该消息进⾏重试。
-
落盘到broker成功,返回⽣产元数据给⽣产者。
-
元数据返回有两种⽅式:⼀种是通过阻塞直接返回,另⼀种是通过回调返回。
必要参数配置
属性 | 说明 | 重要性 |
---|---|---|
bootstrap.servers | ⽣产者客户端与broker集群建⽴初始连接需要的broker地址列表,由该初始连接发现Kafka集群中其他的所有broker。该地址列表不需要写全部的Kafka集群中broker的地址,但也不要写⼀个,以防该节点宕机的时候不可⽤。形式为: host1:port1,host2:port2,… | high |
key.serializer | 实现了接⼝ org.apache.kafka.common.serialization.Serializer 的key序列化类。 | high |
value.serializer | 实现了接⼝ org.apache.kafka.common.serialization.Serializer 的value序列化类。 | high |
acks | 该选项控制着已发送消息的持久性。 acks=0 :⽣产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该消息, retries 设置也不起作⽤,因为客户端不关⼼消息是否发送失败。客户端收到的消息偏移量永远是-1。 acks=1 :leader将记录写到它本地⽇志,就响应客户端确认消息,⽽不等待follower副本的确认。如果leader确认了消息就宕机,则可能会丢失消息,因为follower副本可能还没来得及同步该消息。 acks=all :leader等待所有同步的副本确认该消息。保证了只要有⼀个同步副本存在,消息就不会丢失。这是最强的可⽤性保证。等价于 acks=-1 。 默认值为1,字符串。可选值:[all, -1, 0, 1] |
high |
compression.type | ⽣产者⽣成数据的压缩格式。默认是none(没有压缩)。允许的值: none , gzip , snappy 和 lz4 。压缩是对整个消息批次来讲的。消息批的效率也影响压缩的⽐例。消息批越⼤,压缩效率越好。字符串类型的值。默认是none。 | high |
retries | 设置该属性为⼀个⼤于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常重新发送并⽆⼆⾄。允许重试但是不设置 max.in.flight.requests.per.connection 为1,存在消息乱序的可能,因为如果两个批次发送到同⼀个分区,第⼀个失败了重试,第⼆个成功了,则第⼀个消息批在第⼆个消息批后。int类型的值,默认:0,可选值:[0,…,2147483647] | high |
序列化器
由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。
序列化器的作⽤就是⽤于序列化要发送的消息的。
Kafka使⽤ org.apache.kafka.common.serialization.Serializer 接⼝⽤于定义序列化器,将泛型指定类型的数据转换为字节数组。
1 | package org.apache.kafka.common.serialization; |
系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.ByteArraySerializer
1 | package org.apache.kafka.common.serialization; |
系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.ByteBufferSerializer
1 | package org.apache.kafka.common.serialization; |
系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.BytesSerializer
1 | package org.apache.kafka.common.serialization; |
系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.DoubleSerializer
1 | package org.apache.kafka.common.serialization; |
系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.FloatSerializer
1 | package org.apache.kafka.common.serialization; |
系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.IntegerSerializer
1 | package org.apache.kafka.common.serialization; |
系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.StringSerializer
1 | package org.apache.kafka.common.serialization; |
系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.LongSerializer
1 | package org.apache.kafka.common.serialization; |
系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.ShortSerializer
1 | package org.apache.kafka.common.serialization; |
⾃定义序列化器
数据的序列化⼀般⽣产中使⽤avro。
⾃定义序列化器需要实现org.apache.kafka.common.serialization.Serializer
-
- 创建实体类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25package com.lagou.kafka.demo.entity;
/**
* 用户自定义的封装消息的实体类
*/
public class User {
private Integer userId;
private String username;
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
} -
- 创建序列化类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57package com.lagou.kafka.demo.serialization;
import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
public class UserSerializer implements Serializer<User> {
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
// 用于接收对序列化器的配置参数,并对当前序列化器进行配置和初始化的
}
public byte[] serialize(String topic, User data) {
try {
if (data == null) {
return null;
} else {
final Integer userId = data.getUserId();
final String username = data.getUsername();
if (userId != null) {
if (username != null) {
final byte[] bytes = username.getBytes("UTF-8");
int length = bytes.length;
// 第一个4个字节用于存储userId的值
// 第二个4个字节用于存储username字节数组的长度int值
// 第三个长度,用于存放username序列化之后的字节数组
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
// 设置userId
buffer.putInt(userId);
// 设置username字节数组长度
buffer.putInt(length);
// 设置username字节数组
buffer.put(bytes);
// 以字节数组形式返回user对象的值
return buffer.array();
}
}
}
} catch (Exception e) {
throw new SerializationException("数据序列化失败");
}
return null;
}
public void close() {
// do nothing
// 用于关闭资源等操作。需要幂等,即多次调用,效果是一样的。
}
} -
- 创建⽣产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53package com.lagou.kafka.demo.producer;
import com.lagou.kafka.demo.entity.User;
import com.lagou.kafka.demo.serialization.UserSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 设置自定义的序列化器
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
KafkaProducer<String, User> producer = new KafkaProducer<String, User>(configs);
User user = new User();
// user.setUserId(1001);
// user.setUsername("张三");
// user.setUsername("李四");
// user.setUsername("王五");
user.setUserId(400);
user.setUsername("赵四");
ProducerRecord<String, User> record = new ProducerRecord<String, User>(
"tp_user_01", // topic
user.getUsername(), // key
user // value
);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("消息发送异常");
} else {
System.out.println("主题:" + metadata.topic() + "\t"
+ "分区:" + metadata.partition() + "\t"
+ "生产者偏移量:" + metadata.offset());
}
}
});
// 关闭生产者
producer.close();
}
}
分区器
默认(DefaultPartitioner)分区计算:
-
如果record提供了分区号,则使⽤record提供的分区号
-
如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
-
如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。
kafka 使⽤ org.apache.kafka.clients.producer.Partitioner 接⼝⽤于定义分区器。
1 | package org.apache.kafka.clients.producer; |
系统提供了该接⼝的默认⼦接⼝以及实现类:org.apache.kafka.clients.producer.internals.DefaultPartitioner
1 | package org.apache.kafka.clients.producer.internals; |
⾃定义分区器
-
- 实现Partitioner接⼝⾃定义分区器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28package com.lagou.kafka.demo.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 自定义分区器
*/
public class MyPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 此处可以计算分区的数字。
// 我们直接指定为2
return 2;
}
public void close() {
}
public void configure(Map<String, ?> configs) {
}
} -
- 在⽣产者中配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47package com.lagou.kafka.demo.producer;
import com.lagou.kafka.demo.partitioner.MyPartitioner;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 指定自定义的分区器
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
// 此处不要设置partition的值
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"tp_part_01",
"mykey",
"myvalue"
);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("消息发送失败");
} else {
System.out.println(metadata.topic());
System.out.println(metadata.partition());
System.out.println(metadata.offset());
}
}
});
// 关闭生产者
producer.close();
}
}
拦截器
Producer端interceptor和Consumer端Interceptor是在Kafka 0.10版本被引⼊的,主要⽤于实现Client端的定制化控制逻辑。
对于Producer⽽⾔,Interceptor使得⽤户在消息发送前以及Producer回调逻辑前有机会对消息做⼀些定制化需求,⽐如修改消息等。同时,Producer允许⽤户指定多个Interceptor按序作⽤于同⼀条消息从⽽形成⼀个拦截链(interceptor chain)。
Intercetpor的实现接⼝是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的⽅法包括:
onSend(ProducerRecord):该⽅法封装进KafkaProducer.send⽅法中,即运⾏在⽤户主线程中。Producer确保在消息被序列化以计算分区前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响⽬标分区的计算。
onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运⾏在Producer的IO线程中,因此不要在该⽅法中放⼊很重的逻辑,否则会拖慢Producer的消息发送效率。
close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。
如前所述,Interceptor可能被运⾏在多个线程中,因此在具体实现时⽤户需要⾃⾏确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调⽤它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误⽇志中⽽⾮在向上传递。这在使⽤过程中要特别留意。
⾃定义拦截器
-
- 实现ProducerInterceptor接⼝
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorOne implements ProducerInterceptor<Integer, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.out.println("拦截器1 -- go");
// 消息发送的时候,经过拦截器,调用该方法
// 要发送的消息内容
final String topic = record.topic();
final Integer partition = record.partition();
final Integer key = record.key();
final String value = record.value();
final Long timestamp = record.timestamp();
final Headers headers = record.headers();
// 拦截器拦下来之后根据原来消息创建的新的消息
// 此处对原消息没有做任何改动
ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
topic,
partition,
timestamp,
key,
value,
headers
);
// 传递新的消息
return newRecord;
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("拦截器1 -- back");
// 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
// 会影响kafka生产者的性能。
}
public void close() {
}
public void configure(Map<String, ?> configs) {
final Object classContent = configs.get("classContent");
System.out.println(classContent);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorTwo implements ProducerInterceptor<Integer, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.out.println("拦截器2 -- go");
// 消息发送的时候,经过拦截器,调用该方法
// 要发送的消息内容
final String topic = record.topic();
final Integer partition = record.partition();
final Integer key = record.key();
final String value = record.value();
final Long timestamp = record.timestamp();
final Headers headers = record.headers();
// 拦截器拦下来之后根据原来消息创建的新的消息
// 此处对原消息没有做任何改动
ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
topic,
partition,
timestamp,
key,
value,
headers
);
// 传递新的消息
return newRecord;
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("拦截器2 -- back");
// 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
// 会影响kafka生产者的性能。
}
public void close() {
}
public void configure(Map<String, ?> configs) {
final Object classContent = configs.get("classContent");
System.out.println(classContent);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorThree implements ProducerInterceptor<Integer, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorThree.class);
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.out.println("拦截器3 -- go");
// 消息发送的时候,经过拦截器,调用该方法
// 要发送的消息内容
final String topic = record.topic();
final Integer partition = record.partition();
final Integer key = record.key();
final String value = record.value();
final Long timestamp = record.timestamp();
final Headers headers = record.headers();
// 拦截器拦下来之后根据原来消息创建的新的消息
// 此处对原消息没有做任何改动
ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
topic,
partition,
timestamp,
key,
value,
headers
);
// 传递新的消息
return newRecord;
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("拦截器3 -- back");
// 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
// 会影响kafka生产者的性能。
}
public void close() {
}
public void configure(Map<String, ?> configs) {
final Object classContent = configs.get("classContent");
System.out.println(classContent);
}
} -
- 在Producer的设置中设置⾃定义的拦截器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52package com.lagou.kafka.demo.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 保证等待确认的消息只有设置的这几个。如果设置为1,则只有一个请求在等待响应
// 此时可以保证发送消息即使在重试的情况下也是有序的。
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
// configs.put("max.in.flight.requests.per.connection", 1);
// interceptor.classes
// 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.lagou.kafka.demo.interceptor.InterceptorOne," +
"com.lagou.kafka.demo.interceptor.InterceptorTwo," +
"com.lagou.kafka.demo.interceptor.InterceptorThree");
configs.put("classContent", "this is lagou's kafka class");
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
"tp_inter_01",
0,
1001,
"this is lagou's 1001 message"
);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(metadata.offset());
}
}
});
// 关闭生产者
producer.close();
}
}
原理剖析
由上图可以看出:KafkaProducer有两个基本线程:
-
主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;
-
消息收集器RecoderAccumulator为每个分区都维护了⼀个 Deque
类型的双端队列。 -
ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低⽹络影响;
-
由于⽣产者客户端使⽤ java.io.ByteBuffer 在发送消息之前进⾏消息保存,并维护了⼀个BufferPool 实现 ByteBuffer 的复⽤;该缓存池只针对特定⼤⼩( batch.size 指定)的 ByteBuffer进⾏管理,对于消息过⼤的缓存,不能做到重复利⽤。
-
每次追加⼀条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取⼀个ProducerBatch,判断当前消息的⼤⼩是否可以写⼊该批次中。若可以写⼊则写⼊;若不可以写⼊,则新建⼀个ProducerBatch,判断该消息⼤⼩是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建⽴新的ProducerBatch,这样⽅便进⾏缓存重复利⽤;若超过,则以计算的消息⼤⼩建⽴对应的 ProducerBatch ,缺点就是该内存不能被复⽤了。
-
-
Sender线程:
-
该线程从消息收集器获取缓存的消息,将其处理为 <Node, List
的形式, Node表示集群的broker节点。 -
进⼀步将<Node, List
转化为<Node, Request>形式,此时才可以向服务端发送数据。 -
在发送之前,Sender线程将消息以 Map<NodeId, Deque
> 的形式保存到InFlightRequests 中进⾏缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压⼒最⼩的⼀个,以实现消息的尽快发出。
-
⽣产者参数配置补充
参数名称 | 描述 |
---|---|
retry.backoff.ms | 在向⼀个指定的主题分区重发消息的时候,重试之间的等待时间。⽐如3次重试,每次重试之后等待该时间⻓度,再接着重试。在⼀些失败的场景,避免了密集循环的重新发送请求。long型值,默认100。可选值:[0,…] |
request.timeout.ms | 客户端等待请求响应的最⼤时⻓。如果服务端响应超时,则会重发请求,除⾮达到重试次数。该设置应该⽐replica.lag.time.max.ms (a broker configuration)要⼤,以免在服务器延迟时间内重发消息。int类型值,默认:30000,可选值:[0,…] |
interceptor.classes | 在⽣产者接收到该消息,向Kafka集群传输之前,由序列化器处理之前,可以通过拦截器对消息进⾏处理。要求拦截器类必须实现org.apache.kafka.clients.producer.ProducerInterceptor 接⼝。默认没有拦截器。 Map<String, Object> configs中通过List集合配置多个拦截器类名。 |
batch.size | 当多个消息发送到同⼀个分区的时候,⽣产者尝试将多个记录作为⼀个批来处理。 批处理提⾼了客户端和服务器的处理效率。该配置项以字节为单位控制默认批的⼤⼩。 所有的批⼩于等于该值。 发送给broker的请求将包含多个批次,每个分区⼀个,并包含可发送的数据。 如果该值设置的⽐较⼩,会限制吞吐量(设置为0会完全禁⽤批处理)。 如果设置的很⼤,⼜有⼀点浪费内存,因为Kafka会永远分配这么⼤的内存来参与到消息的批整合中。 |
client.id | ⽣产者发送请求的时候传递给broker的id字符串。 ⽤于在broker的请求⽇志中追踪什么应⽤发送了什么消息。 ⼀般该id是跟业务有关的字符串。 |
send.buffer.bytes | TCP发送数据的时候使⽤的缓冲区(SO_SNDBUF)⼤⼩。如果设置为0,则使⽤操作系统默认的。 |
buffer.memory | ⽣产者可以⽤来缓存等待发送到服务器的记录的总内存字节。如果记录的发送速度超过了将记录发送到服务器的速度,则⽣产者将阻塞 max.block.ms 的时间,此后它将引发异常。 此设置应⼤致对应于⽣产者将使⽤的总内存,但并⾮⽣产者使⽤的所有内存都⽤于缓冲。 ⼀些额外的内存将⽤于压缩(如果启⽤了压缩)以及维护运⾏中的请求。 long型数据。默认值:33554432,可选值:[0,…] |
connections.max.idle.ms | 当连接空闲时间达到这个值,就关闭连接。long型数据,默认:540000 |
linger.ms | ⽣产者在发送请求传输间隔会对需要发送的消息进⾏累积,然后作为⼀个批次发送。 ⼀般情况是消息的发送的速度⽐消息累积的速度慢。 有时客户端需要减少请求的次数,即使是在发送负载不⼤的情况下。 该配置设置了⼀个延迟,⽣产者不会⽴即将消息发送到broker,⽽是等待这么⼀段时间以累积消息,然后将这段时间之内的消息作为⼀个批次发送。 该设置是批处理的另⼀个上限:⼀旦批消息达到了 batch.size 指定的值,消息批会⽴即发送,如果积累的消息字节数达不到 batch.size 的值,可以设置该毫秒值,等待这么⻓时间之后,也会发送消息批。 该属性默认值是0(没有延迟)。 如果设置 linger.ms=5 ,则在⼀个请求发送之前先等待5ms。long型值,默认:0,可选值:[0,…] |
max.block.ms | 控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 阻塞的时⻓。 当缓存满了或元数据不可⽤的时候,这些⽅法阻塞。在⽤户提供的序列化器和分区器的阻塞时间不计⼊。long型值,默认:60000,可选值:[0,…] |
max.request.size | 单个请求的最⼤字节数。该设置会限制单个请求中消息批的消息个数,以免单个请求发送太多的数据。 服务器有⾃⼰的限制批⼤⼩的设置,与该配置可能不⼀样。 int类型值,默认1048576,可选值:[0,…] |
partitioner.class | 实现了接⼝ org.apache.kafka.clients.producer.Partitioner 的分区器实现类。默认值为:org.apache.kafka.clients.producer.internals.DefaultPartitioner |
receive.buffer.bytes | TCP接收缓存(SO_RCVBUF),如果设置为-1,则使⽤操作系统默认的值。int类型值,默认32768,可选值:[-1,…] |
security.protocol | 跟broker通信的协议:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. string类型值,默认:PLAINTEXT |
max.in.flight.requests.per.connection | 单个连接上未确认请求的最⼤数量。达到这个数量,客户端阻塞。如果该值⼤于1,且存在失败的请求,在重试的时候消息顺序不能保证。 int类型值,默认5。可选值:[1,…] |
reconnect.backoff.max.ms | 对于每个连续的连接失败,每台主机的退避将成倍增加,直⾄达到此最⼤值。在计算退避增量之后,添加20%的随机抖动以避免连接⻛暴。 long型值,默认1000,可选值:[0,…] |
reconnect.backoff.ms | 尝试重连指定主机的基础等待时间。避免了到该主机的密集重连。该退避时间应⽤于该客户端到broker的所有连接。 long型值,默认50。可选值:[0,…] |