Kafka源码之KafkaApis
当启动KafkaServer的时候,在其startup⽅法中实例化了KafkaApi,并赋值给KafkaRequestHandlerPool⽤于执⾏具体的请求处理逻辑:
KafkaApi 主构造器参数:
各种请求的处理逻辑⼊⼝:
使⽤模式匹配:
1234567891011121314151617181920212223242526272829303132333435363738case ApiKeys.PRODUCE => handleProduceRequest(request)case ApiKeys.FETCH => handleFetchRequest(requ ...
Kafka源码之OffsetManager
消费者如何提交偏移量?
⾃动提交
⼿动提交
同步提交
异步提交
客户端提交偏移量,交给KafkaApis的handle⽅法,handle⽅法使⽤模式匹配,调⽤handleOffsetCommitRequest⽅法进⾏处理:
handleOffsetCommitRequest的实现。
如果apiVersion的值是0,则交给zookeeper保存偏移量信息。
否则调⽤组协调器负责处理偏移量提交请求。
handleCommitOffsets的实现:⾸先根据groupId查找消费组元数据,如果没有找到消费组元数据,则要么该消费组不依赖Kafka进⾏消费组管理, ...
Kafka源码之ReplicaManager
副本管理器的启动和ISR的收缩和扩展
在启动KafkaServer的时候,运⾏KafkaServer的startup⽅法。在该⽅法中实例化ReplicaManager,并调⽤ReplicaManager的startup⽅法:
ReplicaManager的startup⽅法:
处理ISR收缩的情况:
123456789101112131415161718192021222324252627282930def maybeShrinkIsr(replicaMaxLagTimeMs: Long) { val leaderHWIncremented = inWriteLock( ...
Kafka源码之LogManager
kafka⽇志管理⼦系统的⼊⼝。⽇志管理器负责⽇志的创建、抽取、和清理。
所有的读写操作都代理给具体的Log实例。
⽇志管理器在⼀个或多个⽬录维护⽇志。新的⽇志创建到拥有最少log的⽬录中。
分区不移动。
通过⼀个后台线程通过定期截断多余的⽇志段来处理⽇志保留。
启动Kafka服务器的脚本。
main⽅法中创建KafkaServerStartable对象。
该类中包含KakfaServer对象,startup⽅法调⽤的是KafkaServer的startup⽅法。
KafkaServer的startup⽅法中,启动了LogManager。
12345 ...
Kafka源码之KafkaRequestHandlerPool
KafkaRequestHandlerPool的作⽤是创建numThreads个KafkaRequestHandler实例,使⽤numThreads个线程启动KafkaRequestHandler。
每个KafkaRequestHandler包含了id,brokerId,线程数,请求的channel,处理请求的api等信息。
只要该类进⾏实例化,就执⾏创建KafkaRequestHandler实例并启动的逻辑。
12345678910111213141516171819202122/*** @param brokerId* @param requestChannel* @param apis ...
Kafka源码之SocketServer
线程模型:
当前broker上配置了多少个listener,就有多少个Acceptor,⽤于新建连接。
每个Acceptor对应N个线程的处理器(Processor),⽤于接收客户端请求。
处理器们对应M个线程的处理程序(Handler),处理⽤户请求,并将响应发送给等待给客户写响应的处理器线程。
在启动KakfaServer的startup⽅法中启动SocketServer。
每个listener就是⼀个端点,每个端点创建多个处理程序。
究竟启动多少个处理程序?
processor个数为numProcessorThreads个。上图中for循环为从processo ...
Kafka源码之消息存储机制
⾸先查看Kafka如何处理⽣产的消息。
调⽤副本管理器,将记录追加到分区的副本中。
将数据追加到本地的Log⽇志中。
追加消息的实现。
遍历需要追加的每个主题分区的消息。
调⽤partition的⽅法将记录追加到该分区的leader分区中。
如果在本地找到了该分区的leader。
执⾏下述逻辑将消息追加到leader分区:
12345678910111213141516// 获取该分区的logval log = leaderReplica.log.get// 获取最⼩ISR副本数val minIsr = log.config.minInS ...
Kafka源码之Consumer消费者流程
Consumer示例
KafkaConsumer,消费者的根本⽬的是从Kafka服务端拉取消息,并交给业务逻辑进⾏处理。
开发⼈员不必关⼼与Kafka服务端之间⽹络连接的管理、⼼跳检测、请求超时重试等底层操作也不必关⼼订阅Topic的分区数量、分区Leader副本的⽹络拓扑以及消费组的Rebalance等细节,另外还提供了⾃动提交offset的功能。
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152public static void main(St ...
Kafka源码之Producer⽣产者流程
Producer示例
⾸先我们先通过⼀段代码来展示 KafkaProducer 的使⽤⽅法。
在下⾯的示例中,我们使⽤ KafkaProducer 实现 向kafka发送消息的功能。
在示例程序中,⾸先将 KafkaProduce 使⽤的配置写⼊到 Properties 中,每项配置的具体含义在注释中进⾏解释。之后以此 Properties 对象为参数构造 KafkaProducer 对象,最后通过 send ⽅法完成发送,代码中包含同步发送、异步发送两种情况。
1234567891011121314151617181920212223242526272829303132333435363 ...
Kafka源码之Topic创建流程
Topic创建
有两种创建⽅式:⾃动创建、⼿动创建。
在server.properties中配置 auto.create.topics.enable=true 时,kafka在发现该topic不存在的时候会按照默认配置⾃动创建topic,触发⾃动创建topic有以下两种情况:
Producer向某个不存在的Topic写⼊消息
Consumer从某个不存在的Topic读取消息
⼿动创建
当 auto.create.topics.enable=false 时,需要⼿动创建topic,否则消息会发送失败。⼿动创建topic的⽅式如下:
1bin/kafka-topics.sh --c ...