// Validate the incoming records and summarize them.
// NOTE(review): analyzeAndValidateRecords is defined outside this excerpt; it is described as
// checking that
//   - each message matches its CRC,
//   - each message's byte size is valid,
//   - the sequence numbers of the incoming batches are consistent with the existing producer
//     state and with each other,
// and as computing
//   - the first and last offset of the batches,
//   - the number of messages and the number of valid bytes,
//   - whether the offsets are monotonically increasing,
//   - whether a compression codec is used (the last one seen, if any).
// Confirm against its implementation.
val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

// Nothing to append (no batches): return early. Duplicate batches are NOT detected here; that
// happens later, under the lock, via analyzeAndValidateProducerState.
if (appendInfo.shallowCount == 0)
  return appendInfo

// Trim invalid bytes / incomplete messages before appending to the on-disk log.
var validRecords = trimInvalidBytes(records, appendInfo)

// Insert the remaining valid records into the log while holding the log lock.
lock synchronized {
  // Throws if the log's memory-mapped buffers were closed, since no write is possible then.
  checkIfMemoryMappedBufferClosed()
  if (assignOffsets) {
    // Assign offsets ourselves, starting from the current next offset of the log.
    val offset = new LongRef(nextOffsetMetadata.messageOffset)
    appendInfo.firstOffset = offset.value
    val now = time.milliseconds
    val validateAndOffsetAssignResult = try {
      // Validate the records and assign their offsets. `offset` is a mutable reference: the
      // last offset is derived from it below, so the validator is expected to advance it past
      // the last assigned offset.
      LogValidator.validateMessagesAndAssignOffsets(validRecords,
        offset,
        time,
        now,
        appendInfo.sourceCodec,
        appendInfo.targetCodec,
        config.compact,
        config.messageFormatVersion.messageFormatVersion.value,
        config.messageTimestampType,
        config.messageTimestampDifferenceMaxMs,
        leaderEpoch,
        isFromClient)
    } catch {
      case e: IOException =>
        throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
    }
    // The validated records (possibly re-compressed or format-converted), still in memory.
    validRecords = validateAndOffsetAssignResult.validatedRecords
    // Largest timestamp among the records being appended, and the offset it occurs at.
    appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
    appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
    // `offset` now points one past the last assigned offset.
    appendInfo.lastOffset = offset.value - 1
    appendInfo.recordsProcessingStats = validateAndOffsetAssignResult.recordsProcessingStats
    // With LOG_APPEND_TIME timestamps, the current broker time is the log append time.
    if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
      appendInfo.logAppendTime = now

    // Re-validate batch sizes in case they changed (e.g. re-compression or message format
    // conversion); reject any batch larger than the configured maximum message size.
    if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
      for (batch <- validRecords.batches.asScala) {
        if (batch.sizeInBytes > config.maxMessageSize) {
          // we record the original message set size instead of the trimmed size
          // to be consistent with pre-compression bytesRejectedRate recording
          brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
          brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
          throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
            .format(batch.sizeInBytes, config.maxMessageSize))
        }
      }
    } // closes `if (messageSizeMaybeChanged)`; this brace was missing, which bound the `else` below to the wrong `if`
  } else {
    // Offsets are not assigned here: use the offsets the records already carry.
    // They must be monotonically increasing.
    if (!appendInfo.offsetsMonotonic)
      throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
        records.records.asScala.map(_.offset))

    // The first offset must not be below the log's current next offset.
    if (appendInfo.firstOffset < nextOffsetMetadata.messageOffset) {
      // we may still be able to recover if the log is empty
      // one example: fetching from log start offset on the leader which is not batch aligned,
      // which may happen as a result of AdminClient#deleteRecords()
      // appendInfo.firstOffset may be either first offset or last offset of the first batch.
      // get the actual first offset, which may require decompressing the data
      val firstOffset = records.batches.asScala.head.baseOffset()
      throw new UnexpectedAppendOffsetException(
        s"Unexpected offset in append to $topicPartition. First offset or last offset of the first batch " +
        s"${appendInfo.firstOffset} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
        s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
        s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
        firstOffset, appendInfo.lastOffset)
    }
  }

  // For v2+ batches, record each batch's partition leader epoch with its base offset
  // in the leader epoch cache.
  validRecords.batches.asScala.foreach { batch =>
    if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
      leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
  }

  // Reject the append if the records are larger than the configured segment size.
  if (validRecords.sizeInBytes > config.segmentSize) {
    throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
      .format(validRecords.sizeInBytes, config.segmentSize))
  }

  // Records are validated and have offsets (and timestamps) set. Now validate the producers'
  // idempotent/transactional state and collect the resulting metadata.
  val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)

  // Duplicate of an already-appended batch: report that batch's info instead of appending again.
  // The `return` sits inside a lambda, so it is a non-local return from the enclosing method.
  maybeDuplicate.foreach { duplicate =>
    appendInfo.firstOffset = duplicate.firstOffset
    appendInfo.lastOffset = duplicate.lastOffset
    appendInfo.logAppendTime = duplicate.timestamp
    appendInfo.logStartOffset = logStartOffset
    return appendInfo
  }

  // Roll to a new log segment if needed (e.g. the active segment is full).
  val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
    maxTimestampInMessages = appendInfo.maxTimestamp,
    maxOffsetInMessages = appendInfo.lastOffset)

  val logOffsetMetadata = LogOffsetMetadata(
    messageOffset = appendInfo.firstOffset,
    segmentBaseOffset = segment.baseOffset,
    relativePositionInSegment = segment.size)

  // Append the records to the segment.
  segment.append(firstOffset = appendInfo.firstOffset,
    largestOffset = appendInfo.lastOffset,
    largestTimestamp = appendInfo.maxTimestamp,
    shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
    records = validRecords)

  // Update the producer state; the producer id key itself is not needed here.
  for ((_, producerAppendInfo) <- updatedProducers) {
    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
    producerStateManager.update(producerAppendInfo)
  }

  // update the transaction index with the true last stable offset. The last offset visible
  // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
  for (completedTxn <- completedTxns) {
    val lastStableOffset = producerStateManager.completeTxn(completedTxn)
    segment.updateTxnIndex(completedTxn, lastStableOffset)
  }

  // always update the last producer id map offset so that the snapshot reflects the current offset
  // even if there isn't any idempotent data being written
  producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

  // Advance the log end offset to one past the last appended offset.
  updateLogEndOffset(appendInfo.lastOffset + 1)

  // update the first unstable offset (which is used to compute LSO)
  updateFirstUnstableOffset()

  trace(s"Appended message set to log with last offset ${appendInfo.lastOffset} " +
    s"first offset: ${appendInfo.firstOffset}, " +
    s"next offset: ${nextOffsetMetadata.messageOffset}, " +
    s"and messages: $validRecords")

  // Flush once the number of unflushed messages reaches (or exceeds) the configured interval.
  if (unflushedMessages >= config.flushInterval)
    flush()

  appendInfo
}
// NOTE(review): the two braces below close scopes opened before this excerpt (in upstream Kafka:
// the `maybeHandleIOException(...) { ... }` wrapper and the method body) — confirm against the header.
}
}