在分区消费模式,需要⼿动指定消费者要消费的主题和主题的分区信息。
可以设置从分区的哪个偏移量开始消费。
典型的分区消费:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| Map<String, Object> configs = new HashMap<>(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "mycsmr" + System.currentTimeMillis()); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs); TopicPartition tp0 = new TopicPartition("tp_demo_01", 0); TopicPartition tp1 = new TopicPartition("tp_demo_01", 1); TopicPartition tp2 = new TopicPartition("tp_demo_01", 2);
consumer.assign(Arrays.asList(tp0, tp1, tp2)); consumer.seek(tp0, 0); consumer.seek(tp1, 0); consumer.seek(tp2, 0); ConsumerRecords<String, String> records = consumer.poll(1000); records.forEach(record -> { System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value()); });
consumer.close();
|
上⾯代码中的assign⽅法的实现:
assignFromUser的实现:
调⽤seek⽅法指定各个主题分区从哪个偏移量开始消费:
subscriptions的seek⽅法实现:
上图中seek的实现:
此时poll⽅法的调⽤为:
pollOnce⽅法的实现:
发起请求:
该⽅法的实现:
创建需要发送的请求对象并发起请求:
client.send⽅法添加监听器,等待broker端的响应:
监听的逻辑:
上⾯⽅法中createFetchRequests⽤于创建需要发起的请求:
fetchablePartitions⽅法的实现:
subscriptions.fetchablePartitions()⽅法的实现:
最终,pollOnce⽅法返回拉取的结果: