Kafka源码之Producer⽣产者流程
Producer示例
⾸先我们先通过⼀段代码来展示 KafkaProducer 的使⽤⽅法。
在下⾯的示例中,我们使⽤ KafkaProducer 实现 向kafka发送消息的功能。
在示例程序中,⾸先将 KafkaProduce 使⽤的配置写⼊到 Properties 中,每项配置的具体含义在注释中进⾏解释。之后以此 Properties 对象为参数构造 KafkaProducer 对象,最后通过 send ⽅法完成发送,代码中包含同步发送、异步发送两种情况。
1 | public static void main(String[] args) throws ExecutionException, InterruptedException { |
同步发送
- KafkaProducer.send⽅法返回的类型是Future
,通过get⽅法阻塞当前线程,等待kafka服务端ACK响应
1 | producer.send(record).get() |
异步发送
-
第⼀个参数record封装了topic、key、value
-
第⼆个参数是⼀个callback对象,当⽣产者接收到kafka发来的ACK确认消息时,会调⽤此CallBack对象的onComplete⽅法
1 | producer.send(record, (recordMetadata, e) -> { |
KafkaProducer实例化
了解了 KafkaProducer 的基本使⽤,开始深⼊了解的KafkaProducer原理和实现,先看⼀下构造⽅法核⼼逻辑。
1 | private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { |
消息发送过程
Kafka消息实际发送以 send ⽅法为⼊⼝:
1 |
|
拦截器
⾸先⽅法会先进⼊拦截器集合 ProducerInterceptors , onSend ⽅法是遍历拦截器 onSend ⽅法,拦截器的⽬的是将数据处理加⼯, kafka 本身并没有给出默认的拦截器的实现。如果需要使⽤拦截器功能,必须⾃⼰实现ProducerInterceptor 接⼝。
1 | public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { |
拦截器核⼼逻辑
ProducerInterceptor 接⼝包括三个⽅法:
-
- onSend(ProducerRecord):该⽅法封装进KafkaProducer.send⽅法中,即它运⾏在⽤户主线程中的。
Producer确保在消息被序列化以计算分区前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响⽬标分区的计算
-
- onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运⾏在producer的IO线程中,因此不要在该⽅法中放⼊很重的逻辑,否则会拖慢producer的消息发送效率
-
- close:关闭interceptor,主要⽤于执⾏⼀些资源清理⼯作
-
- 拦截器可能被运⾏在多个线程中,因此在具体实现时⽤户需要⾃⾏确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调⽤它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误⽇志中⽽⾮在向上传递。
发送五步骤
1 | private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { |
-
- Producer 通过 waitOnMetadata() ⽅法来获取对应 topic 的 metadata 信息,需要先该 topic 是可⽤的
-
- Producer 端对 record 的 key 和 value 值进⾏序列化操作,在 Consumer 端再进⾏相应的反序列化
-
- 获取partition值,具体分为下⾯三种情况:
-
- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
-
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进⾏取余得到partition 值
-
- 既没有 partition 值⼜没有 key 值的情况下,第⼀次调⽤时随机⽣成⼀个整数(后⾯每次调⽤在这个整数上⾃增),将这个值与 topic 可⽤的 partition 总数取余得到 partition 值,也就是常说的round-robin 算法
-
- Producer 默认使⽤的 partitioner 是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
-
- 向 accumulator 写数据,先将 record 写⼊到 buffer 中,当达到⼀个 batch.size 的⼤⼩时,再唤起 sender 线程去发送 RecordBatch,这⾥仔细分析⼀下Producer是如何向buffer写⼊数据的
-
- 获取该 topic-partition 对应的 queue,没有的话会创建⼀个空的 queue
-
- 向 queue 中追加数据,先获取 queue 中最新加⼊的那个 RecordBatch,如果不存在或者存在但剩余空余不⾜以添加本条 record 则返回 null,成功写⼊的话直接返回结果,写⼊成功
-
- 创建⼀个新的 RecordBatch,初始化内存⼤⼩根据 max(batch.size, Records.LOG_OVERHEAD + Record.recordSize(key, value)) 来确定(防⽌单条 record 过⼤的情况)
-
- 向新建的 RecordBatch 写⼊ record,并将 RecordBatch 添加到 queue 中,返回结果,写⼊成功
-
- 发送 RecordBatch,当 record 写⼊成功后,如果发现 RecordBatch 已满⾜发送的条件(通常是 queue 中有多个 batch,那么最先添加的那些 batch 肯定是可以发送了),那么就会唤醒 sender 线程,发送RecordBatch 。sender 线程对 RecordBatch 的处理是在 run() ⽅法中进⾏的,该⽅法具体实现如下:
-
- 获取那些已经可以发送的 RecordBatch 对应的 nodes
-
- 如果与node 没有连接(如果可以连接,同时初始化该连接),就证明该 node 暂时不能发送数据,暂时移除该 node
-
- 返回该 node 对应的所有可以发送的 RecordBatch 组成的 batches(key 是 node.id),并将 RecordBatch 从对应的 queue 中移除
-
- 将由于元数据不可⽤⽽导致发送超时的 RecordBatch 移除
-
- 发送 RecordBatch
MetaData更新机制
-
- metadata.requestUpdate() 将 metadata 的 needUpdate 变量设置为 true(强制更新),并返回当前的版本号(version),通过版本号来判断 metadata 是否完成更新
-
- sender.wakeup() 唤醒 sender 线程,sender 线程⼜会去唤醒NetworkClient线程去更新
-
- metadata.awaitUpdate(version, remainingWaitMs) 等待 metadata 的更新
-
- 所以,每次 Producer 请求更新 metadata 时,会有以下⼏种情况:
-
- 如果 node 可以发送请求,则直接发送请求
-
- 如果该 node 正在建⽴连接,则直接返回
-
- 如果该 node 还没建⽴连接,则向 broker 初始化链接
-
- NetworkClient的poll⽅法中判断是否需要更新meta数据, handleCompletedReceives 处理 metadata 的更新,最终是调⽤的 DefaultMetadataUpdater 中的 handleCompletedMetadataResponse ⽅法处理
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 WeiJia_Rao!