消息发送

数据⽣产流程解析


  1. Producer创建时,会创建⼀个Sender线程并设置为守护线程。

  2. ⽣产消息时,内部其实是异步流程;⽣产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。

  3. 批次发送的条件为:缓冲区数据⼤⼩达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。

  4. 批次发送后,发往指定分区,然后落盘到broker;如果⽣产者配置了retrires参数⼤于0并且失败原因允许重试,那么客户端内部会对该消息进⾏重试。

  5. 落盘到broker成功,返回⽣产元数据给⽣产者。

  6. 元数据返回有两种⽅式:⼀种是通过阻塞直接返回,另⼀种是通过回调返回。

必要参数配置

属性 说明 重要性
bootstrap.servers ⽣产者客户端与broker集群建⽴初始连接需要的broker地址列表,由该初始连接发现Kafka集群中其他的所有broker。该地址列表不需要写全部的Kafka集群中broker的地址,但也不要写⼀个,以防该节点宕机的时候不可⽤。形式为: host1:port1,host2:port2,… high
key.serializer 实现了接⼝ org.apache.kafka.common.serialization.Serializer 的key序列化类。 high
value.serializer 实现了接⼝ org.apache.kafka.common.serialization.Serializer 的value序列化类。 high
acks 该选项控制着已发送消息的持久性。
acks=0 :⽣产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该消息, retries 设置也不起作⽤,因为客户端不关⼼消息是否发送失败。客户端收到的消息偏移量永远是-1。
acks=1 :leader将记录写到它本地⽇志,就响应客户端确认消息,⽽不等待follower副本的确认。如果leader确认了消息就宕机,则可能会丢失消息,因为follower副本可能还没来得及同步该消息。
acks=all :leader等待所有同步的副本确认该消息。保证了只要有⼀个同步副本存在,消息就不会丢失。这是最强的可⽤性保证。等价于 acks=-1 。
默认值为1,字符串。可选值:[all, -1, 0, 1]
high
compression.type ⽣产者⽣成数据的压缩格式。默认是none(没有压缩)。允许的值: none , gzip , snappy 和 lz4 。压缩是对整个消息批次来讲的。消息批的效率也影响压缩的⽐例。消息批越⼤,压缩效率越好。字符串类型的值。默认是none。 high
retries 设置该属性为⼀个⼤于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常重新发送并⽆⼆⾄。允许重试但是不设置 max.in.flight.requests.per.connection 为1,存在消息乱序的可能,因为如果两个批次发送到同⼀个分区,第⼀个失败了重试,第⼆个成功了,则第⼀个消息批在第⼆个消息批后。int类型的值,默认:0,可选值:[0,…,2147483647] high

序列化器


由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。

序列化器的作⽤就是⽤于序列化要发送的消息的。

Kafka使⽤ org.apache.kafka.common.serialization.Serializer 接⼝⽤于定义序列化器,将泛型指定类型的数据转换为字节数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package org.apache.kafka.common.serialization;

import java.io.Closeable;
import java.util.Map;

/**
* 将对象转换为byte数组的接⼝
*
* 该接⼝的实现类需要提供⽆参构造器
* @param <T> 从哪个类型转换
*/
public interface Serializer<T> extends Closeable {
/**
* 类的配置信息
* @param configs key/value pairs
* @param isKey key的序列化还是value的序列化
*/
void configure(Map<String, ?> configs, boolean isKey);

/**
* 将对象转换为字节数组
*
* @param topic 主题名称
* @param data 需要转换的对象
* @return 序列化的字节数组
*/
byte[] serialize(String topic, T data);

/**
* 关闭序列化器
* 该⽅法需要提供幂等性,因为可能调⽤多次。
*/
@Override
void close();
}

系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.ByteArraySerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package org.apache.kafka.common.serialization;

import java.util.Map;

public class ByteArraySerializer implements Serializer<byte[]> {
public ByteArraySerializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public byte[] serialize(String topic, byte[] data) {
return data;
}

public void close() {
}
}

系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.ByteBufferSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package org.apache.kafka.common.serialization;

import java.nio.ByteBuffer;
import java.util.Map;

public class ByteBufferSerializer implements Serializer<ByteBuffer> {
public ByteBufferSerializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public byte[] serialize(String topic, ByteBuffer data) {
if (data == null) {
return null;
} else {
data.rewind();
byte[] arr;
if (data.hasArray()) {
arr = data.array();
if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
return arr;
}
}

arr = new byte[data.remaining()];
data.get(arr, 0, arr.length);
data.rewind();
return arr;
}
}

public void close() {
}
}

系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.BytesSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package org.apache.kafka.common.serialization;

import java.util.Map;
import org.apache.kafka.common.utils.Bytes;

public class BytesSerializer implements Serializer<Bytes> {
public BytesSerializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public byte[] serialize(String topic, Bytes data) {
return data == null ? null : data.get();
}

public void close() {
}
}

系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.DoubleSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package org.apache.kafka.common.serialization;

import java.util.Map;

public class DoubleSerializer implements Serializer<Double> {
public DoubleSerializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public byte[] serialize(String topic, Double data) {
if (data == null) {
return null;
} else {
long bits = Double.doubleToLongBits(data);
return new byte[]{(byte)((int)(bits >>> 56)), (byte)((int)(bits >>> 48)), (byte)((int)(bits >>> 40)), (byte)((int)(bits >>> 32)), (byte)((int)(bits >>> 24)), (byte)((int)(bits >>> 16)), (byte)((int)(bits >>> 8)), (byte)((int)bits)};
}
}

public void close() {
}
}

系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.FloatSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package org.apache.kafka.common.serialization;

import java.util.Map;

public class FloatSerializer implements Serializer<Float> {
public FloatSerializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public byte[] serialize(String topic, Float data) {
if (data == null) {
return null;
} else {
long bits = (long)Float.floatToRawIntBits(data);
return new byte[]{(byte)((int)(bits >>> 24)), (byte)((int)(bits >>> 16)), (byte)((int)(bits >>> 8)), (byte)((int)bits)};
}
}

public void close() {
}
}

系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.IntegerSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package org.apache.kafka.common.serialization;

import java.util.Map;

public class IntegerSerializer implements Serializer<Integer> {
public IntegerSerializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public byte[] serialize(String topic, Integer data) {
return data == null ? null : new byte[]{(byte)(data >>> 24), (byte)(data >>> 16), (byte)(data >>> 8), data.byteValue()};
}

public void close() {
}
}

系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.StringSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package org.apache.kafka.common.serialization;

import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;

public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";

public StringSerializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("serializer.encoding");
}

if (encodingValue != null && encodingValue instanceof String) {
this.encoding = (String)encodingValue;
}

}

public byte[] serialize(String topic, String data) {
try {
return data == null ? null : data.getBytes(this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
}
}

public void close() {
}
}

系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.LongSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package org.apache.kafka.common.serialization;

import java.util.Map;

public class LongSerializer implements Serializer<Long> {
public LongSerializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public byte[] serialize(String topic, Long data) {
return data == null ? null : new byte[]{(byte)((int)(data >>> 56)), (byte)((int)(data >>> 48)), (byte)((int)(data >>> 40)), (byte)((int)(data >>> 32)), (byte)((int)(data >>> 24)), (byte)((int)(data >>> 16)), (byte)((int)(data >>> 8)), data.byteValue()};
}

public void close() {
}
}

系统提供了该接⼝的⼦接⼝以及实现类:org.apache.kafka.common.serialization.ShortSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package org.apache.kafka.common.serialization;

import java.util.Map;

public class ShortSerializer implements Serializer<Short> {
public ShortSerializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
}

public byte[] serialize(String topic, Short data) {
return data == null ? null : new byte[]{(byte)(data >>> 8), data.byteValue()};
}

public void close() {
}
}

⾃定义序列化器

数据的序列化⼀般⽣产中使⽤avro。

⾃定义序列化器需要实现org.apache.kafka.common.serialization.Serializer接⼝,并实现其中的serialize ⽅法。

    1. 创建实体类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    package com.lagou.kafka.demo.entity;

    /**
    * 用户自定义的封装消息的实体类
    */
    public class User {
    private Integer userId;
    private String username;

    public Integer getUserId() {
    return userId;
    }

    public void setUserId(Integer userId) {
    this.userId = userId;
    }

    public String getUsername() {
    return username;
    }

    public void setUsername(String username) {
    this.username = username;
    }
    }
    1. 创建序列化类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    package com.lagou.kafka.demo.serialization;

    import com.lagou.kafka.demo.entity.User;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;

    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.util.Map;

    public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    // do nothing
    // 用于接收对序列化器的配置参数,并对当前序列化器进行配置和初始化的
    }

    @Override
    public byte[] serialize(String topic, User data) {
    try {
    if (data == null) {
    return null;
    } else {
    final Integer userId = data.getUserId();
    final String username = data.getUsername();

    if (userId != null) {
    if (username != null) {
    final byte[] bytes = username.getBytes("UTF-8");
    int length = bytes.length;
    // 第一个4个字节用于存储userId的值
    // 第二个4个字节用于存储username字节数组的长度int值
    // 第三个长度,用于存放username序列化之后的字节数组
    ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
    // 设置userId
    buffer.putInt(userId);
    // 设置username字节数组长度
    buffer.putInt(length);
    // 设置username字节数组
    buffer.put(bytes);
    // 以字节数组形式返回user对象的值
    return buffer.array();
    }
    }
    }
    } catch (Exception e) {
    throw new SerializationException("数据序列化失败");
    }
    return null;
    }

    @Override
    public void close() {
    // do nothing
    // 用于关闭资源等操作。需要幂等,即多次调用,效果是一样的。
    }
    }
    1. 创建⽣产者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    package com.lagou.kafka.demo.producer;

    import com.lagou.kafka.demo.entity.User;
    import com.lagou.kafka.demo.serialization.UserSerializer;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.HashMap;
    import java.util.Map;

    public class MyProducer {
    public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 设置自定义的序列化器
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);

    KafkaProducer<String, User> producer = new KafkaProducer<String, User>(configs);

    User user = new User();
    // user.setUserId(1001);
    // user.setUsername("张三");
    // user.setUsername("李四");
    // user.setUsername("王五");
    user.setUserId(400);
    user.setUsername("赵四");

    ProducerRecord<String, User> record = new ProducerRecord<String, User>(
    "tp_user_01", // topic
    user.getUsername(), // key
    user // value
    );


    producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
    System.out.println("消息发送异常");
    } else {
    System.out.println("主题:" + metadata.topic() + "\t"
    + "分区:" + metadata.partition() + "\t"
    + "生产者偏移量:" + metadata.offset());
    }
    }
    });

    // 关闭生产者
    producer.close();

    }
    }

分区器


默认(DefaultPartitioner)分区计算:

  1. 如果record提供了分区号,则使⽤record提供的分区号

  2. 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模

  3. 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。



kafka 使⽤ org.apache.kafka.clients.producer.Partitioner 接⼝⽤于定义分区器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Cluster;
import java.io.Closeable;

/**
* 分区器接⼝
*/
public interface Partitioner extends Configurable, Closeable {
/**
* 为指定的消息记录计算分区值
*
* @param topic 主题名称
* @param key 根据该key的值进⾏分区计算,如果没有则为null。
* @param keyBytes key的序列化字节数组,根据该数组进⾏分区计算。如果没有key,则为null
* @param value 根据value值进⾏分区计算,如果没有,则为null
* @param valueBytes value的序列化字节数组,根据此值进⾏分区计算。如果没有,则为null
* @param cluster 当前集群的元数据
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster);

/**
* 关闭分区器的时候调⽤该⽅法
*/
public void close();
}

系统提供了该接⼝的默认⼦接⼝以及实现类:org.apache.kafka.clients.producer.internals.DefaultPartitioner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

/**
* 默认的分区策略:
*
* 如果在记录中指定了分区,则使⽤指定的分区
* 如果没有指定分区,但是有key的值,则使⽤key值的散列值计算分区
* 如果没有指定分区也没有key的值,则使⽤轮询的⽅式选择⼀个分区
*/
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

public void configure(Map<String, ?> configs) {}

/**
* 为指定的消息记录计算分区值
*
* @param topic 主题名称
* @param key 根据该key的值进⾏分区计算,如果没有则为null。
* @param keyBytes key的序列化字节数组,根据该数组进⾏分区计算。如果没有key,则为null
* @param value 根据value值进⾏分区计算,如果没有,则为null
* @param valueBytes value的序列化字节数组,根据此值进⾏分区计算。如果没有,则为null
* @param cluster 当前集群的元数据
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster) {
// 获取指定主题的所有分区信息
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 分区的数量
int numPartitions = partitions.size();
// 如果没有提供key
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions =
cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
// 如果有,就计算keyBytes的哈希值,然后对当前主题的个数取模
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic,counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}

public void close() {}
}

⾃定义分区器

    1. 实现Partitioner接⼝⾃定义分区器:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    package com.lagou.kafka.demo.partitioner;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    import java.util.Map;

    /**
    * 自定义分区器
    */
    public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // 此处可以计算分区的数字。
    // 我们直接指定为2
    return 2;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
    }
    1. 在⽣产者中配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package com.lagou.kafka.demo.producer;

    import com.lagou.kafka.demo.partitioner.MyPartitioner;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.HashMap;
    import java.util.Map;

    public class MyProducer {
    public static void main(String[] args) {

    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // 指定自定义的分区器
    configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

    // 此处不要设置partition的值
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
    "tp_part_01",
    "mykey",
    "myvalue"
    );

    producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
    System.out.println("消息发送失败");
    } else {
    System.out.println(metadata.topic());
    System.out.println(metadata.partition());
    System.out.println(metadata.offset());
    }
    }
    });

    // 关闭生产者
    producer.close();

    }
    }

拦截器


Producer端interceptor和Consumer端Interceptor是在Kafka 0.10版本被引⼊的,主要⽤于实现Client端的定制化控制逻辑。

对于Producer⽽⾔,Interceptor使得⽤户在消息发送前以及Producer回调逻辑前有机会对消息做⼀些定制化需求,⽐如修改消息等。同时,Producer允许⽤户指定多个Interceptor按序作⽤于同⼀条消息从⽽形成⼀个拦截链(interceptor chain)。

Intercetpor的实现接⼝是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的⽅法包括:

onSend(ProducerRecord):该⽅法封装进KafkaProducer.send⽅法中,即运⾏在⽤户主线程中。Producer确保在消息被序列化以计算分区前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响⽬标分区的计算。

onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运⾏在Producer的IO线程中,因此不要在该⽅法中放⼊很重的逻辑,否则会拖慢Producer的消息发送效率。

close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。

如前所述,Interceptor可能被运⾏在多个线程中,因此在具体实现时⽤户需要⾃⾏确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调⽤它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误⽇志中⽽⾮在向上传递。这在使⽤过程中要特别留意。

⾃定义拦截器

    1. 实现ProducerInterceptor接⼝
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    package com.lagou.kafka.demo.interceptor;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.header.Headers;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Map;

    public class InterceptorOne implements ProducerInterceptor<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
    System.out.println("拦截器1 -- go");
    // 消息发送的时候,经过拦截器,调用该方法

    // 要发送的消息内容
    final String topic = record.topic();
    final Integer partition = record.partition();
    final Integer key = record.key();
    final String value = record.value();
    final Long timestamp = record.timestamp();
    final Headers headers = record.headers();

    // 拦截器拦下来之后根据原来消息创建的新的消息
    // 此处对原消息没有做任何改动
    ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
    topic,
    partition,
    timestamp,
    key,
    value,
    headers
    );
    // 传递新的消息
    return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    System.out.println("拦截器1 -- back");
    // 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
    // 会影响kafka生产者的性能。
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
    final Object classContent = configs.get("classContent");
    System.out.println(classContent);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    package com.lagou.kafka.demo.interceptor;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.header.Headers;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Map;

    public class InterceptorTwo implements ProducerInterceptor<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
    System.out.println("拦截器2 -- go");
    // 消息发送的时候,经过拦截器,调用该方法

    // 要发送的消息内容
    final String topic = record.topic();
    final Integer partition = record.partition();
    final Integer key = record.key();
    final String value = record.value();
    final Long timestamp = record.timestamp();
    final Headers headers = record.headers();

    // 拦截器拦下来之后根据原来消息创建的新的消息
    // 此处对原消息没有做任何改动
    ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
    topic,
    partition,
    timestamp,
    key,
    value,
    headers
    );
    // 传递新的消息
    return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    System.out.println("拦截器2 -- back");
    // 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
    // 会影响kafka生产者的性能。
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
    final Object classContent = configs.get("classContent");
    System.out.println(classContent);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    package com.lagou.kafka.demo.interceptor;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.header.Headers;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Map;

    public class InterceptorThree implements ProducerInterceptor<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorThree.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
    System.out.println("拦截器3 -- go");
    // 消息发送的时候,经过拦截器,调用该方法

    // 要发送的消息内容
    final String topic = record.topic();
    final Integer partition = record.partition();
    final Integer key = record.key();
    final String value = record.value();
    final Long timestamp = record.timestamp();
    final Headers headers = record.headers();

    // 拦截器拦下来之后根据原来消息创建的新的消息
    // 此处对原消息没有做任何改动
    ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
    topic,
    partition,
    timestamp,
    key,
    value,
    headers
    );
    // 传递新的消息
    return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    System.out.println("拦截器3 -- back");
    // 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
    // 会影响kafka生产者的性能。
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
    final Object classContent = configs.get("classContent");
    System.out.println(classContent);
    }
    }
    1. 在Producer的设置中设置⾃定义的拦截器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    package com.lagou.kafka.demo.producer;

    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.HashMap;
    import java.util.Map;

    public class MyProducer {
    public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // 保证等待确认的消息只有设置的这几个。如果设置为1,则只有一个请求在等待响应
    // 此时可以保证发送消息即使在重试的情况下也是有序的。
    configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    // configs.put("max.in.flight.requests.per.connection", 1);

    // interceptor.classes
    // 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开
    configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.lagou.kafka.demo.interceptor.InterceptorOne," +
    "com.lagou.kafka.demo.interceptor.InterceptorTwo," +
    "com.lagou.kafka.demo.interceptor.InterceptorThree");

    configs.put("classContent", "this is lagou's kafka class");

    KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);

    ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
    "tp_inter_01",
    0,
    1001,
    "this is lagou's 1001 message"
    );

    producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
    System.out.println(metadata.offset());
    }
    }
    });

    // 关闭生产者
    producer.close();

    }
    }

原理剖析


由上图可以看出:KafkaProducer有两个基本线程:

  • 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;

    • 消息收集器RecoderAccumulator为每个分区都维护了⼀个 Deque 类型的双端队列。

    • ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低⽹络影响;

    • 由于⽣产者客户端使⽤ java.io.ByteBuffer 在发送消息之前进⾏消息保存,并维护了⼀个BufferPool 实现 ByteBuffer 的复⽤;该缓存池只针对特定⼤⼩( batch.size 指定)的 ByteBuffer进⾏管理,对于消息过⼤的缓存,不能做到重复利⽤。

    • 每次追加⼀条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取⼀个ProducerBatch,判断当前消息的⼤⼩是否可以写⼊该批次中。若可以写⼊则写⼊;若不可以写⼊,则新建⼀个ProducerBatch,判断该消息⼤⼩是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建⽴新的ProducerBatch,这样⽅便进⾏缓存重复利⽤;若超过,则以计算的消息⼤⼩建⽴对应的 ProducerBatch ,缺点就是该内存不能被复⽤了。

  • Sender线程:

    • 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List 的形式, Node表示集群的broker节点。

    • 进⼀步将<Node, List转化为<Node, Request>形式,此时才可以向服务端发送数据。

    • 在发送之前,Sender线程将消息以 Map<NodeId, Deque> 的形式保存到InFlightRequests 中进⾏缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压⼒最⼩的⼀个,以实现消息的尽快发出。

⽣产者参数配置补充

参数名称 描述
retry.backoff.ms 在向⼀个指定的主题分区重发消息的时候,重试之间的等待时间。⽐如3次重试,每次重试之后等待该时间⻓度,再接着重试。在⼀些失败的场景,避免了密集循环的重新发送请求。long型值,默认100。可选值:[0,…]
request.timeout.ms 客户端等待请求响应的最⼤时⻓。如果服务端响应超时,则会重发请求,除⾮达到重试次数。该设置应该⽐replica.lag.time.max.ms (a broker configuration)要⼤,以免在服务器延迟时间内重发消息。int类型值,默认:30000,可选值:[0,…]
interceptor.classes 在⽣产者接收到该消息,向Kafka集群传输之前,由序列化器处理之前,可以通过拦截器对消息进⾏处理。要求拦截器类必须实现org.apache.kafka.clients.producer.ProducerInterceptor 接⼝。默认没有拦截器。
Map<String, Object> configs中通过List集合配置多个拦截器类名。
batch.size 当多个消息发送到同⼀个分区的时候,⽣产者尝试将多个记录作为⼀个批来处理。
批处理提⾼了客户端和服务器的处理效率。该配置项以字节为单位控制默认批的⼤⼩。
所有的批⼩于等于该值。
发送给broker的请求将包含多个批次,每个分区⼀个,并包含可发送的数据。
如果该值设置的⽐较⼩,会限制吞吐量(设置为0会完全禁⽤批处理)。
如果设置的很⼤,⼜有⼀点浪费内存,因为Kafka会永远分配这么⼤的内存来参与到消息的批整合中。
client.id ⽣产者发送请求的时候传递给broker的id字符串。
⽤于在broker的请求⽇志中追踪什么应⽤发送了什么消息。
⼀般该id是跟业务有关的字符串。
send.buffer.bytes TCP发送数据的时候使⽤的缓冲区(SO_SNDBUF)⼤⼩。如果设置为0,则使⽤操作系统默认的。
buffer.memory ⽣产者可以⽤来缓存等待发送到服务器的记录的总内存字节。如果记录的发送速度超过了将记录发送到服务器的速度,则⽣产者将阻塞 max.block.ms 的时间,此后它将引发异常。
此设置应⼤致对应于⽣产者将使⽤的总内存,但并⾮⽣产者使⽤的所有内存都⽤于缓冲。
⼀些额外的内存将⽤于压缩(如果启⽤了压缩)以及维护运⾏中的请求。
long型数据。默认值:33554432,可选值:[0,…]
connections.max.idle.ms 当连接空闲时间达到这个值,就关闭连接。long型数据,默认:540000
linger.ms ⽣产者在发送请求传输间隔会对需要发送的消息进⾏累积,然后作为⼀个批次发送。
⼀般情况是消息的发送的速度⽐消息累积的速度慢。
有时客户端需要减少请求的次数,即使是在发送负载不⼤的情况下。
该配置设置了⼀个延迟,⽣产者不会⽴即将消息发送到broker,⽽是等待这么⼀段时间以累积消息,然后将这段时间之内的消息作为⼀个批次发送。
该设置是批处理的另⼀个上限:⼀旦批消息达到了 batch.size 指定的值,消息批会⽴即发送,如果积累的消息字节数达不到 batch.size 的值,可以设置该毫秒值,等待这么⻓时间之后,也会发送消息批。
该属性默认值是0(没有延迟)。
如果设置 linger.ms=5 ,则在⼀个请求发送之前先等待5ms。long型值,默认:0,可选值:[0,…]
max.block.ms 控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 阻塞的时⻓。
当缓存满了或元数据不可⽤的时候,这些⽅法阻塞。在⽤户提供的序列化器和分区器的阻塞时间不计⼊。long型值,默认:60000,可选值:[0,…]
max.request.size 单个请求的最⼤字节数。该设置会限制单个请求中消息批的消息个数,以免单个请求发送太多的数据。
服务器有⾃⼰的限制批⼤⼩的设置,与该配置可能不⼀样。
int类型值,默认1048576,可选值:[0,…]
partitioner.class 实现了接⼝ org.apache.kafka.clients.producer.Partitioner 的分区器实现类。默认值为:org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes TCP接收缓存(SO_RCVBUF),如果设置为-1,则使⽤操作系统默认的值。int类型值,默认32768,可选值:[-1,…]
security.protocol 跟broker通信的协议:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
string类型值,默认:PLAINTEXT
max.in.flight.requests.per.connection 单个连接上未确认请求的最⼤数量。达到这个数量,客户端阻塞。如果该值⼤于1,且存在失败的请求,在重试的时候消息顺序不能保证。
int类型值,默认5。可选值:[1,…]
reconnect.backoff.max.ms 对于每个连续的连接失败,每台主机的退避将成倍增加,直⾄达到此最⼤值。在计算退避增量之后,添加20%的随机抖动以避免连接⻛暴。
long型值,默认1000,可选值:[0,…]
reconnect.backoff.ms 尝试重连指定主机的基础等待时间。避免了到该主机的密集重连。该退避时间应⽤于该客户端到broker的所有连接。
long型值,默认50。可选值:[0,…]