代码拉取完成,页面将自动刷新
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7a8a13c6e7..177b460d38 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -840,7 +840,10 @@ private[log] class Cleaner(val id: Int,
logSize + segs.head.size <= maxSize &&
indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
- lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
+ //if first segment size is 0, we don't need to do the index offset range check.
+ //this will avoid empty log left every 2^31 message.
+ (segs.head.size == 0 ||
+ lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) {
group = segs.head :: group
logSize += segs.head.size
indexSize += segs.head.offsetIndex.sizeInBytes
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 43bc3b9f28..e5984c4f31 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1258,6 +1258,53 @@ class LogCleanerTest {
"All but the last group should be the target size.")
}
+ @Test
+ def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val k="key".getBytes()
+ val v="val".getBytes()
+
+ //create 3 segments
+ for(i <- 0 until 3){
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
+ //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment
+ val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
+ log.appendAsFollower(records)
+ assertEquals(i + 1, log.numberOfSegments)
+ }
+
+ //4th active segment, not clean
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
+
+ val totalSegments = 4
+ //last segment not cleanable
+ val firstUncleanableOffset = log.logEndOffset - 1
+ val notCleanableSegments = 1
+
+ assertEquals(totalSegments, log.numberOfSegments)
+ var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+ //because index file uses 4 byte relative index offset and current segments all none empty,
+ //segments will not group even their size is very small.
+ assertEquals(totalSegments - notCleanableSegments, groups.size)
+ //do clean to clean first 2 segments to empty
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
+ assertEquals(totalSegments, log.numberOfSegments)
+ assertEquals(0, log.logSegments.head.size)
+
+ //after clean we got 2 empty segment, they will group together this time
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+ val noneEmptySegment = 1
+ assertEquals(noneEmptySegment + 1, groups.size)
+
+ //trigger a clean and 2 empty segments should cleaned to 1
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
+ assertEquals(totalSegments - 1, log.numberOfSegments)
+ }
+
+
/**
* Validate the logic for grouping log segments together for cleaning when only a small number of
* messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。