src-openEuler/kafka (https://gitee.com/src-openeuler/kafka.git)

0017-fix-log-clean.patch
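This patch adjusts the log cleaner's segment-grouping check in `LogCleaner.scala`: when the first candidate segment is empty it contributes no offset-index entries, so the 4-byte relative-offset range check can be skipped, and the cleaner no longer leaves an empty segment behind every 2^31 messages. A regression test, `testSegmentGroupingWithSparseOffsetsAndEmptySegments`, is added to `LogCleanerTest.scala`. A simplified sketch of the changed condition follows the diff.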
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7a8a13c6e7..177b460d38 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -840,7 +840,10 @@ private[log] class Cleaner(val id: Int,
            logSize + segs.head.size <= maxSize &&
            indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
            timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
-           lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
+           // if the first segment's size is 0, we don't need to do the index offset range check;
+           // this avoids an empty log segment being left behind every 2^31 messages
+           (segs.head.size == 0 ||
+             lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) {
        group = segs.head :: group
        logSize += segs.head.size
        indexSize += segs.head.offsetIndex.sizeInBytes
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 43bc3b9f28..e5984c4f31 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1258,6 +1258,53 @@ class LogCleanerTest {
"All but the last group should be the target size.")
}
+ @Test
+ def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val k="key".getBytes()
+ val v="val".getBytes()
+
+ //create 3 segments
+ for(i <- 0 until 3){
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
+ //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment
+ val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
+ log.appendAsFollower(records)
+ assertEquals(i + 1, log.numberOfSegments)
+ }
+
+ //4th active segment, not clean
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
+
+ val totalSegments = 4
+ //last segment not cleanable
+ val firstUncleanableOffset = log.logEndOffset - 1
+ val notCleanableSegments = 1
+
+ assertEquals(totalSegments, log.numberOfSegments)
+ var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+ //because index file uses 4 byte relative index offset and current segments all none empty,
+ //segments will not group even their size is very small.
+ assertEquals(totalSegments - notCleanableSegments, groups.size)
+ //do clean to clean first 2 segments to empty
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
+ assertEquals(totalSegments, log.numberOfSegments)
+ assertEquals(0, log.logSegments.head.size)
+
+ //after clean we got 2 empty segment, they will group together this time
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+ val noneEmptySegment = 1
+ assertEquals(noneEmptySegment + 1, groups.size)
+
+ //trigger a clean and 2 empty segments should cleaned to 1
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
+ assertEquals(totalSegments - 1, log.numberOfSegments)
+ }
+
+
/**
* Validate the logic for grouping log segments together for cleaning when only a small number of
* messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
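For context, here is a minimal sketch of the grouping predicate this patch changes, using a simplified stand-in type (`Seg` is hypothetical; the real check operates on `kafka.log.LogSegment` inside `Cleaner.groupSegmentsBySize`). Offset-index entries store each offset as a 4-byte delta from the segment's base offset, so a group must not span more than Int.MaxValue offsets; the patched condition short-circuits that check when the candidate segment is empty, since an empty segment contributes no index entries:

// Hypothetical, simplified model of the patched grouping check.
object GroupingSketch {
  // Stand-in for LogSegment: base offset, byte size, and last record offset.
  final case class Seg(baseOffset: Long, size: Int, lastOffset: Long)

  // May `candidate` join a group whose earliest member is `groupFirst`?
  // (In the real code `groupFirst` is `group.last`, because the group
  // list is built by prepending.)
  def offsetRangeOk(candidate: Seg, groupFirst: Seg): Boolean =
    candidate.size == 0 ||                                   // the patched short-circuit
      candidate.lastOffset - groupFirst.baseOffset <= Int.MaxValue

  def main(args: Array[String]): Unit = {
    val first    = Seg(baseOffset = 0L, size = 10, lastOffset = 100L)
    val emptyFar = Seg(baseOffset = Int.MaxValue + 1L, size = 0, lastOffset = Int.MaxValue + 1L)

    println(offsetRangeOk(emptyFar, first))                 // true: empty segments can always merge
    println(offsetRangeOk(emptyFar.copy(size = 1), first))  // false: delta would overflow 4 bytes
  }
}

Without the `size == 0` escape, two empty segments whose base offsets lie more than Int.MaxValue apart could never be merged, which is exactly the leftover-empty-segment symptom the regression test above reproduces.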