Kafka源码之同步发送模式
消息同步发送的代码:
所谓同步,就是调⽤Future的get⽅法同步等待。
data:image/s3,"s3://crabby-images/42dfb/42dfbe036bb863ab31a5be39e025d7c76cf5d0b3" alt=""
send⽅法是异步的:
data:image/s3,"s3://crabby-images/77b20/77b2049ca6b8ffe388b229c5f601d695e04c9c6b" alt=""
send⽅法将消息发送给broker,当前线程同步等待broker返回的消息。
send发的实现:
data:image/s3,"s3://crabby-images/eccf7/eccf7c8b129019340f69f521fdf277fe24ac7438" alt=""
看doSend:
data:image/s3,"s3://crabby-images/91a4e/91a4ec92ce718e0955f63f783ebbacc11d762860" alt=""
该⽅法⾸先将消息放到累加器中,判断是否需要发起请求,如果需要,则唤醒sender线程发送消息。
该⽅法的返回值:RecordApendResult.future:
data:image/s3,"s3://crabby-images/aa8a4/aa8a40237185714637a20cf447ef6fd6ce972f83" alt=""
RecordApendResult类:
data:image/s3,"s3://crabby-images/e7744/e77442d7255f297310d8dbe1b5ed7a10bba6dc33" alt=""
累加器的append⽅法将消息追加到累加器,并返回追加到累加器的结果:
data:image/s3,"s3://crabby-images/3f947/3f94775fb7b51033be097cb1a9a693036e8171f5" alt=""
其中主要实现:
data:image/s3,"s3://crabby-images/20a29/20a2929430d5c4d902260f3156062ee03c880ec9" alt=""
tryAppend的实现:
data:image/s3,"s3://crabby-images/c2440/c2440ae6a7646c4621d8331f4b8fb30462506af5" alt=""
上述⽅法的返回值是FutureRecordMetadata,⽽该类的实现:
data:image/s3,"s3://crabby-images/49521/495210293eda00b636481d7b2a4e1d90327ea06e" alt=""
上述⽅法中,await⽅法等待broker端返回结果。
result实际上是tryAppend⽅法赋值的produceFuture对象:
data:image/s3,"s3://crabby-images/5385e/5385e7acc274b5a3694af73de0acd73718fc5738" alt=""
produceFuture对象是:
data:image/s3,"s3://crabby-images/914e6/914e6eb3a5338b087c4373cb1c8e28d33e92d44b" alt=""
该类中有⼀个CountDownLatch,future的get⽅法中的等待实际上就是该CountDownLatch的等待。
最终我们的producer.send⽅法的返回值就是FutureRecordMetadata对象。
future.get就是在等待该CountDownLatch的countDown的触发:
data:image/s3,"s3://crabby-images/758f4/758f4c32e28cdf4d6c0c48832d3e90e3479eea3a" alt=""
该⽅法何时调⽤?
completeFutureAndFireCallbacks⽅法调⽤
data:image/s3,"s3://crabby-images/9f1ed/9f1ede6bef1e66f264b37ffd38ca9a45afbf89f0" alt=""
(Alt+F7 查看元素的使⽤位置)
completeFutureAndFireCallbacks⽅法何时调⽤?
data:image/s3,"s3://crabby-images/1231d/1231dbf4119b68a1e87b663e6fcd38a51b488c15" alt=""
done⽅法何时调⽤?
data:image/s3,"s3://crabby-images/a03e5/a03e5bbd76140a35448fc59453cd8d954e122b6d" alt=""
在completeBatch⽅法的最后,如果batch.done,则释放累加器的空间。
completeBatch⽅法何时调⽤?
data:image/s3,"s3://crabby-images/87602/876025ef3cfbda0f0f6b8d08460f6691f6cca4af" alt=""
在该⽅法中:
data:image/s3,"s3://crabby-images/57c71/57c7185680a1598c6fed29f016dd22f75502278b" alt=""
completeBatch何时调⽤?
data:image/s3,"s3://crabby-images/7cd7a/7cd7a4cf7927159fe8a2524d020e567086cc1f61" alt=""
在handleProduceResponse中如果有响应,则解析,并调⽤completeBatch⽅法
如果没有响应,表示是acks=0的情形,不需要解析响应,直接调⽤completeBatch⽅法即可。
data:image/s3,"s3://crabby-images/84265/8426559b6cc6bb4fc07da062fb6ec4ec84eb88f8" alt=""
handleProduceResponse何时调⽤?
Sender线程创建回调,回调中调⽤了handleProduceResponse⽅法,创建⽣产请求对象,该对象中封装了回调对象
发送请求,等待回调的触发。
data:image/s3,"s3://crabby-images/babf6/babf662082b61128934bbda0e0171051d8a46e98" alt=""
sendProduceRequest的调⽤:
data:image/s3,"s3://crabby-images/21f45/21f45c35bdc40ce57a98176861504416fddf8017" alt=""
data:image/s3,"s3://crabby-images/7e203/7e2039f7b7cdf193b4e625839a3a4bfb0c32be60" alt=""
sendProducerData的调⽤:
data:image/s3,"s3://crabby-images/93301/933016b2f0cc538bd99f4d5fa976ff64a65639a1" alt=""
总结:
所谓同步调⽤,指的是⽣产者调⽤ producer.send(record).get() ⽅法。
该⽅法⾸先将要发送的消息发送到消息累加器
判断累加器中的消息批次是否达到,或者当前批次没写满,但是加⼊当前消息会让消息批⼤于消息批最⼤值,则创建新的批次。
如果需要发送消息批次,则唤醒sender线程,让sender线程发送消息。
sender线程会返回⼀个future对象给⽣产者客户端线程。
若⽣产者调⽤该future的get⽅法,则该⽅法使⽤CountDownLatch阻塞,直到收到broker响应,触发CountDownLatch的countDown⽅法
此时⽣产者线程的get⽅法返回,得到发送的结果。