Kafka特性之消息重复解决和consumer-offsets
消息重复解决方案
消息重复和丢失是kafka中很常⻅的问题,主要发⽣在以下三个阶段:
-
⽣产者阶段
-
broke阶段
-
消费者阶段
⽣产者阶段重复场景
-
根本原因
⽣产发送的消息没有收到正确的broke响应,导致⽣产者重试。
⽣产者发出⼀条消息,broke落盘以后因为⽹络等种种原因发送端得到⼀个发送失败的响应或者⽹络中断,然后⽣产者收到⼀个可恢复的Exception重试消息导致消息重复。
-
重试过程
-
new KafkaProducer()后创建⼀个后台线程KafkaThread扫描RecordAccumulator中是否有消息;
-
调⽤KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;
-
后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
-
如果发送成功,那么返回成功;
-
如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;
-
-
可恢复异常说明
异常是RetriableException类型或者TransactionManager允许重试;RetriableException类继承关系如下:
-
记录顺序问题
如果设置 max.in.flight.requests.per.connection ⼤于1(默认5,单个连接上发送的未确认请求的最⼤数量,表示上⼀个发出的请求没有确认下⼀个请求⼜发出了)。⼤于1可能会改变记录的顺序,因为如果将两个batch发送到单个分区,第⼀个batch处理失败并重试,但是第⼆个batch处理成功,那么第⼆个batch处理中的记录可能先出现被消费。
设置 max.in.flight.requests.per.connection 为1,可能会影响吞吐量,可以解决单个⽣产者发送顺序问题。如果多个⽣产者,⽣产者1先发送⼀个请求,⽣产者2后发送请求,此时⽣产者1返回可恢复异常,重试⼀定次数成功了。虽然⽣产者1先发送消息,但⽣产者2发送的消息会被先消费。
⽣产者发送重复解决⽅案
-
启动kafka的幂等性
要启动kafka的幂等性,设置: enable.idempotence=true ,以及 ack=all 以及 retries > 1 。
-
ack=0,不重试。
可能会丢消息,适⽤于吞吐量指标重要性⾼于数据丢失,例如:⽇志收集。
⽣产者和broker阶段消息丢失场景
-
ack=0,不重试
⽣产者发送消息完,不管结果了,如果发送失败也就丢失了。
-
ack=1,leader crash
⽣产者发送消息完,只等待Leader写⼊成功就返回了,Leader分区丢失了,此时Follower没来及同步,消息丢失。
-
unclean.leader.election.enable 配置true
允许选举ISR以外的副本作为leader,会导致数据丢失,默认为false。⽣产者发送异步消息,只等待Lead写⼊成功就返回,Leader分区丢失,此时ISR中没有Follower,Leader从OSR中选举,因为OSR中本来落后于Leader造成消息丢失。
解决⽣产者和broker阶段消息丢失
-
禁⽤unclean选举,ack=all
ack=all / -1,tries > 1,unclean.leader.election.enable : false
⽣产者发完消息,等待Follower同步完再返回,如果异常则重试。副本的数量可能影响吞吐量,不超过5个,⼀般三个。
不允许unclean Leader选举。
-
配置:min.insync.replicas > 1
当⽣产者将 acks 设置为 all (或 -1 )时, min.insync.replicas>1 。指定确认消息写成功需要的最⼩副本数量。达不到这个最⼩值,⽣产者将引发⼀个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。
当⼀起使⽤时, min.insync.replicas 和 ack 允许执⾏更⼤的持久性保证。⼀个典型的场景是创建⼀个复制因⼦为3的主题,设置min.insync复制到2个,⽤ all 配置发送。将确保如果⼤多数副本没有收到写操作,则⽣产者将引发异常。
-
失败的offset单独记录
⽣产者发送消息,会⾃动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进⾏单独处理。
消费者数据重复场景及解决⽅案
-
根本原因
数据消费完没有及时提交offset到broker。
-
场景
消息消费端在消费过程中挂掉没有及时提交offset到broke,另⼀个消费端启动拿之前记录的offset开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。
-
解决⽅案
-
取消⾃动提交
每次消费完或者程序退出时⼿动提交。这可能也没法保证⼀条重复。
-
下游做幂等
⼀般是让下游做幂等或者尽量每消费⼀条消息都记录offset,对于少数严格的场景可能需要把offset或唯⼀ID(例如订单ID)和下游状态更新放在同⼀个数据库⾥⾯做事务来保证精确的⼀次更新或者在下游数据表⾥⾯同时记录消费offset,然后更新下游数据的时候⽤消费位移做乐观锁拒绝旧位移的数据更新。
-
consumer_offsets
Zookeeper不适合⼤批量的频繁写⼊操作。
Kafka 1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka_consumer_groups.sh脚本供⽤户查看consumer信息。
-
- 创建topic “tp_test_01”
1
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_test_01 --partitions 5 --replication-factor 1
-
- 使⽤kafka-console-producer.sh脚本⽣产消息
1
2
3[root@node1 ~]# for i in `seq 100`; do echo "hello lagou $i" >> messages.txt; done
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt由于默认没有指定key,所以根据round-robin⽅式,消息分布到不同的分区上。 (本例中⽣产了100条消息)
-
- 验证消息⽣产成功
1
2
3
4
5
6
7
8[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
[root@node1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 --topic tp_test_01 --time -1
tp_test_01:2:20
tp_test_01:4:20
tp_test_01:1:20
tp_test_01:3:20
tp_test_01:0:20结果输出表明100条消息全部⽣产成功!
-
- 创建⼀个console consumer group
1
[root@node1 ~]#kafka-console-consumer.sh --bootstrap-server node1:9092 --topic tp_test_01 --from-beginning
-
- 获取该consumer group的group id(后⾯需要根据该id查询它的位移信息)
1
[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
输出: console-consumer-49366 (记住这个id!)
-
- 查询__consumer_offsets topic所有内容
注意:运⾏下⾯命令前先要在consumer.properties中设置exclude.internal.topics=false
1
[root@node1 ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。
-
- 计算指定consumer group在__consumer_offsets topic中分区信息
这时候就⽤到了第5步获取的group.id (本例中是console-consumer-49366) 。Kafka会使⽤下⾯公式计算该group位移保存在__consumer_offsets的哪个分区上
1
Math.abs(groupID.hashCode()) % numPartitions
对应的分区=Math.abs(“console-consumer-49366”.hashCode()) % 50 = 19,即__consumer_offsets的分区19保 存了这个consumer group的位移信息。
-
- 获取指定consumer group的位移信息
1
[root@node1 ~]# kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 19 --broker-list node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
下⾯是输出结果:
1
2
3
4
5
6
7
8
9
10...
[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,1]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,2]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
[console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
[console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
...上图可⻅,该consumer group果然保存在分区11上,且位移信息都是对的(这⾥的位移信息是已消费的位移,严格来说不是第3步中的位移。由于我的consumer已经消费完了所有的消息,所以这⾥的位移与第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每⼀⽇志项的格式都是:[Group,Topic,Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]。