Producer示例

⾸先我们先通过⼀段代码来展示 KafkaProducer 的使⽤⽅法。

在下⾯的示例中,我们使⽤ KafkaProducer 实现 向kafka发送消息的功能。

在示例程序中,⾸先将 KafkaProduce 使⽤的配置写⼊到 Properties 中,每项配置的具体含义在注释中进⾏解释。之后以此 Properties 对象为参数构造 KafkaProducer 对象,最后通过 send ⽅法完成发送,代码中包含同步发送、异步发送两种情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
// 客户端id
props.put("client.id", "KafkaProducerDemo");
// kafka地址,列表格式为host1:port1,host2:port2,…,⽆需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(建议多提供⼏个,以防提供的服务器关闭)
props.put("bootstrap.servers", "localhost:9092");
// 发送返回应答⽅式
// 0:Producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最⾼。
// 1:Producer 往集群发送数据只要 Leader 应答就可以发送下⼀条,只确保Leader接收成功。
// -1或者all:Producer 往集群发送数据需要所有的ISR Follower都完成从Leader的同步才会发送下⼀条,确保Leader发送成功和所有的副本都成功接收。安全性最⾼,但是效率最低。
props.put("acks", "all");
// 重试次数
props.put("retries", 0);
// 重试间隔时间
props.put("retries.backoff.ms", 100);
// 批量发送的⼤⼩
props.put("batch.size", 16384);
// ⼀个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去
props.put("linger.ms", 10);
// 缓冲区⼤⼩
props.put("buffer.memory", 33554432);
// key序列化⽅式
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化⽅式
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// topic
String topic = "lagou_edu";
Producer<String, String> producer = new KafkaProducer<>(props);
AtomicInteger count = new AtomicInteger();
while (true) {
int num = count.get();
String key = Integer.toString(num);
String value = Integer.toString(num);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
if (num % 2 == 0) {
// 偶数异步发送
// 第⼀个参数record封装了topic、key、value
// 第⼆个参数是⼀个callback对象,当⽣产者接收到kafka发来的ACK确认消息时,会调⽤此CallBack对象的onComplete⽅法
producer.send(record, (recordMetadata, e) -> {
System.out.println("num:" + num + " topic:" + recordMetadata.topic() + " offset:" + recordMetadata.offset());
});
} else {
// 同步发送
// KafkaProducer.send⽅法返回的类型是Future<RecordMetadata>,通过get⽅法阻塞当前线程,等待kafka服务端ACK响应
producer.send(record).get();
}
count.incrementAndGet();
TimeUnit.MILLISECONDS.sleep(100);
}
}

同步发送

  1. KafkaProducer.send⽅法返回的类型是Future,通过get⽅法阻塞当前线程,等待kafka服务端ACK响应
1
producer.send(record).get()

异步发送

  1. 第⼀个参数record封装了topic、key、value

  2. 第⼆个参数是⼀个callback对象,当⽣产者接收到kafka发来的ACK确认消息时,会调⽤此CallBack对象的onComplete⽅法

1
2
3
producer.send(record, (recordMetadata, e) -> {
System.out.println("num:" + num + " topic:" + recordMetadata.topic() + " offset:" + recordMetadata.offset());
});

KafkaProducer实例化

了解了 KafkaProducer 的基本使⽤,开始深⼊了解的KafkaProducer原理和实现,先看⼀下构造⽅法核⼼逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
// 获取⽤户的配置
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
// 系统时间
this.time = Time.SYSTEM;
// 获取client.id配置
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
// 如果client.id为空,设置默认值:producer-1
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
// 获取事务id,如果没有配置则为null
String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ? (String)userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
LogContext logContext;
if (transactionalId == null)
logContext = new LogContext(String.format("[Producer clientId=%s] ",clientId));
else
logContext = new LogContext(String.format("[Producer clientId=%s,transactionalId=%s] ", clientId, transactionalId));
log = logContext.logger(KafkaProducer.class);
log.trace("Starting the Kafka producer");
// 创建client-id的监控map
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
// 设置监控配置,包含样本量、取样时间窗⼝、记录级别
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)).timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))).tags(metricTags);
// 监控数据上报类
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
// ⽣成⽣产者监控
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
// 分区类
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
// 重试时间 retry.backoff.ms 默认100ms
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) {
// 反射⽣成key序列化⽅式
this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class));
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = ensureExtended(keySerializer);
}
if (valueSerializer == null) {
// 反射⽣成key序列化⽅式
this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class));
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = ensureExtended(valueSerializer);
}
// load interceptors and make sure they get clientId
// 确认client.id添加到⽤户的配置⾥⾯
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
// 获取多个拦截器,为空则不处理
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

// 集群资源监听器,在元数据变更时会有通知
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);

// ⽣产者每隔⼀段时间都要去更新⼀下集群的元数据,默认5分钟
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, true, clusterResourceListeners);

// ⽣产者往服务端发送消息的时候,规定⼀条消息最⼤多⼤?
// 如果你超过了这个规定消息的⼤⼩,你的消息就不能发送过去。
// 默认是1M,这个值偏⼩,在⽣产环境中,我们需要修改这个值。
// 经验值是10M。但是⼤家也可以根据⾃⼰公司的情况来。
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
//指的是缓存⼤⼩
//默认值是32M,这个值⼀般是够⽤,如果有特殊情况的时候,我们可以去修改这个值。
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
// kafka是⽀持压缩数据的,可以设置压缩格式,默认是不压缩,⽀持gzip、snappy、lz4
// ⼀次发送出去的消息就更多。⽣产者这⼉会消耗更多的cpu.
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
// 配置控制了KafkaProducer.send()并将KafkaProducer.partitionsFor()被阻塞多⻓时间,由于缓冲区已满或元数据不可⽤,这些⽅法可能会被阻塞⽌
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
// 控制客户端等待请求响应的最⻓时间。如果在超时过去之前未收到响应,客户端将在必要时重新发送请求,或者如果重试耗尽,请求失败
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
// 事务管理器
this.transactionManager = configureTransactionState(config, logContext, log);
// 重试次数
int retries = configureRetries(config, transactionManager != null, log);
// 使⽤幂等性,需要将 enable.idempotence 配置项设置为true。并且它对单个分区的发送,⼀次性最多发送5条
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
// 如果开启了幂等性,但是⽤户指定的ack不为 -1,则会抛出异常
short acks = configureAcks(config, transactionManager != null, log);
this.apiVersions = new ApiVersions();
// 创建核⼼组件:记录累加器
this.accumulator = new RecordAccumulator(logContext, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.totalMemorySize, this.compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, metrics, time, apiVersions, transactionManager);
// 获取broker地址列表
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
// 更新元数据
this.metadata.update(Cluster.bootstrap(addresses), Collections. <String>emptySet(), time.milliseconds());
// 创建通道,是否需要加密
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
// 初始化了⼀个重要的管理⽹路的组件
// connections.max.idle.ms: 默认值是9分钟, ⼀个⽹络连接最多空闲多久,超过这个空闲时间,就关闭这个⽹络连接。
// max.in.flight.requests.per.connection:默认是5, producer向broker发送数据的时候,其实是有多个⽹络连接。每个⽹络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个数
NetworkClient client = new NetworkClient(new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder, logContext), this.metadata, clientId, maxInflightRequests, config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), this.requestTimeoutMs, time, true, apiVersions, throttleTimeSensor, logContext);
// 发送线程
this.sender = new Sender(logContext, client, this.metadata, this.accumulator, maxInflightRequests == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), acks, retries, metricsRegistry.senderMetrics, Time.SYSTEM, this.requestTimeoutMs, config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, apiVersions);
// 线程名称
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// 启动守护线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
// 把⽤户配置的参数,但是没有⽤到的打印出来
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
}

消息发送过程

Kafka消息实际发送以 send ⽅法为⼊⼝:

1
2
3
4
5
6
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}

拦截器

⾸先⽅法会先进⼊拦截器集合 ProducerInterceptors , onSend ⽅法是遍历拦截器 onSend ⽅法,拦截器的⽬的是将数据处理加⼯, kafka 本身并没有给出默认的拦截器的实现。如果需要使⽤拦截器功能,必须⾃⼰实现ProducerInterceptor 接⼝。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
// 遍历所有拦截器,顺序执⾏,如果有异常只打印⽇志,不会向上抛出
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
interceptRecord = interceptor.onSend(interceptRecord);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue calling other interceptors
// be careful not to throw exception from here
if (record != null)
log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
else
log.warn("Error executing interceptor onSend callback", e);
}
}
return interceptRecord;
}
}

拦截器核⼼逻辑

ProducerInterceptor 接⼝包括三个⽅法:

    1. onSend(ProducerRecord):该⽅法封装进KafkaProducer.send⽅法中,即它运⾏在⽤户主线程中的。

    Producer确保在消息被序列化以计算分区前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响⽬标分区的计算

    1. onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运⾏在producer的IO线程中,因此不要在该⽅法中放⼊很重的逻辑,否则会拖慢producer的消息发送效率
    1. close:关闭interceptor,主要⽤于执⾏⼀些资源清理⼯作
    1. 拦截器可能被运⾏在多个线程中,因此在具体实现时⽤户需要⾃⾏确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调⽤它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误⽇志中⽽⾮在向上传递。

发送五步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
// ⾸先创建⼀个主题分区类
TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
// ⾸先确保该topic的元数据可⽤
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(),record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
// 序列化 record 的 key 和 value
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(),
record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " +
record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(),
record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce);
}
// 获取该 record 要发送到的 partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// 给header设置只读
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
// 向 accumulator 中追加 record 数据,数据会先进⾏缓存
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
// 如果追加完数据后,对应的 RecordBatch 已经达到了 batch.size 的⼤⼩(或者batch的剩余空间不⾜以添加下⼀条 Record),则唤醒 sender 线程发送数据。
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
    1. Producer 通过 waitOnMetadata() ⽅法来获取对应 topic 的 metadata 信息,需要先该 topic 是可⽤的
    1. Producer 端对 record 的 key 和 value 值进⾏序列化操作,在 Consumer 端再进⾏相应的反序列化
    1. 获取partition值,具体分为下⾯三种情况:
      1. 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
      1. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进⾏取余得到partition 值
      1. 既没有 partition 值⼜没有 key 值的情况下,第⼀次调⽤时随机⽣成⼀个整数(后⾯每次调⽤在这个整数上⾃增),将这个值与 topic 可⽤的 partition 总数取余得到 partition 值,也就是常说的round-robin 算法
      1. Producer 默认使⽤的 partitioner 是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
    1. 向 accumulator 写数据,先将 record 写⼊到 buffer 中,当达到⼀个 batch.size 的⼤⼩时,再唤起 sender 线程去发送 RecordBatch,这⾥仔细分析⼀下Producer是如何向buffer写⼊数据的
      1. 获取该 topic-partition 对应的 queue,没有的话会创建⼀个空的 queue
      1. 向 queue 中追加数据,先获取 queue 中最新加⼊的那个 RecordBatch,如果不存在或者存在但剩余空余不⾜以添加本条 record 则返回 null,成功写⼊的话直接返回结果,写⼊成功
      1. 创建⼀个新的 RecordBatch,初始化内存⼤⼩根据 max(batch.size, Records.LOG_OVERHEAD + Record.recordSize(key, value)) 来确定(防⽌单条 record 过⼤的情况)
      1. 向新建的 RecordBatch 写⼊ record,并将 RecordBatch 添加到 queue 中,返回结果,写⼊成功
    1. 发送 RecordBatch,当 record 写⼊成功后,如果发现 RecordBatch 已满⾜发送的条件(通常是 queue 中有多个 batch,那么最先添加的那些 batch 肯定是可以发送了),那么就会唤醒 sender 线程,发送RecordBatch 。sender 线程对 RecordBatch 的处理是在 run() ⽅法中进⾏的,该⽅法具体实现如下:
      1. 获取那些已经可以发送的 RecordBatch 对应的 nodes
      1. 如果与node 没有连接(如果可以连接,同时初始化该连接),就证明该 node 暂时不能发送数据,暂时移除该 node
      1. 返回该 node 对应的所有可以发送的 RecordBatch 组成的 batches(key 是 node.id),并将 RecordBatch 从对应的 queue 中移除
      1. 将由于元数据不可⽤⽽导致发送超时的 RecordBatch 移除
      1. 发送 RecordBatch

MetaData更新机制

    1. metadata.requestUpdate() 将 metadata 的 needUpdate 变量设置为 true(强制更新),并返回当前的版本号(version),通过版本号来判断 metadata 是否完成更新
    1. sender.wakeup() 唤醒 sender 线程,sender 线程⼜会去唤醒NetworkClient线程去更新
    1. metadata.awaitUpdate(version, remainingWaitMs) 等待 metadata 的更新
    1. 所以,每次 Producer 请求更新 metadata 时,会有以下⼏种情况:
      1. 如果 node 可以发送请求,则直接发送请求
      1. 如果该 node 正在建⽴连接,则直接返回
      1. 如果该 node 还没建⽴连接,则向 broker 初始化链接
    1. NetworkClient的poll⽅法中判断是否需要更新meta数据, handleCompletedReceives 处理 metadata 的更新,最终是调⽤的 DefaultMetadataUpdater 中的 handleCompletedMetadataResponse ⽅法处理