Kafka源码之同步发送模式
消息同步发送的代码:
所谓同步,就是调⽤Future的get⽅法同步等待。
send⽅法是异步的:
send⽅法将消息发送给broker,当前线程同步等待broker返回的消息。
send发的实现:
看doSend:
该⽅法⾸先将消息放到累加器中,判断是否需要发起请求,如果需要,则唤醒sender线程发送消息。
该⽅法的返回值:RecordApendResult.future:
RecordApendResult类:
累加器的append⽅法将消息追加到累加器,并返回追加到累加器的结果:
其中主要实现:
tryAppend的实现:
上述⽅法的返回值是FutureRecordMetadata,⽽该类的实现:
上述⽅法中,await⽅法等待broker端返回结果。
result实际上是tryAppend⽅法赋值的produceFuture对象:
produceFuture对象是:
该类中有⼀个CountDownLatch,future的get⽅法中的等待实际上就是该CountDownLatch的等待。
最终我们的producer.send⽅法的返回值就是FutureRecordMetadata对象。
future.get就是在等待该CountDownLatch的countDown的触发:
该⽅法何时调⽤?
completeFutureAndFireCallbacks⽅法调⽤
(Alt+F7 查看元素的使⽤位置)
completeFutureAndFireCallbacks⽅法何时调⽤?
done⽅法何时调⽤?
在completeBatch⽅法的最后,如果batch.done,则释放累加器的空间。
completeBatch⽅法何时调⽤?
在该⽅法中:
completeBatch何时调⽤?
在handleProduceResponse中如果有响应,则解析,并调⽤completeBatch⽅法
如果没有响应,表示是acks=0的情形,不需要解析响应,直接调⽤completeBatch⽅法即可。
handleProduceResponse何时调⽤?
Sender线程创建回调,回调中调⽤了handleProduceResponse⽅法,创建⽣产请求对象,该对象中封装了回调对象
发送请求,等待回调的触发。
sendProduceRequest的调⽤:
sendProducerData的调⽤:
总结:
所谓同步调⽤,指的是⽣产者调⽤ producer.send(record).get() ⽅法。
该⽅法⾸先将要发送的消息发送到消息累加器
判断累加器中的消息批次是否达到,或者当前批次没写满,但是加⼊当前消息会让消息批⼤于消息批最⼤值,则创建新的批次。
如果需要发送消息批次,则唤醒sender线程,让sender线程发送消息。
sender线程会返回⼀个future对象给⽣产者客户端线程。
若⽣产者调⽤该future的get⽅法,则该⽅法使⽤CountDownLatch阻塞,直到收到broker响应,触发CountDownLatch的countDown⽅法
此时⽣产者线程的get⽅法返回,得到发送的结果。