7 Star 3 Fork 3

src-openEuler/rocketmq

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch 48.10 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986
From ead3d905016d9db4785a46beaa555c7fafd4f9bb Mon Sep 17 00:00:00 2001
From: Dongyuan Pan <dongyuanpan0@gmail.com>
Date: Wed, 8 Nov 2023 10:40:52 +0800
Subject: [PATCH 1/2] [ISSUE #7511] Lock granularity issue causing LMQ message
loss (#7525)
* bug fix: assignOffset and increaseOffset in LMQ has concurrency issues in topicQueueLock, should be in putMessageLock
* fix MultiDispatchTest
* fix MultiDispatchTest
* fix unit test
---
.../common/message/MessageExtBrokerInner.java | 10 ++
.../org/apache/rocketmq/store/CommitLog.java | 94 ++++++++++++--
.../apache/rocketmq/store/ConsumeQueue.java | 44 +------
.../rocketmq/store/DefaultMessageStore.java | 1 -
.../rocketmq/store/MessageExtEncoder.java | 118 ++++++++++++++++--
.../apache/rocketmq/store/MultiDispatch.java | 77 ++++++++++++
.../queue/AbstractConsumeQueueStore.java | 10 ++
.../store/queue/ConsumeQueueInterface.java | 1 -
.../queue/ConsumeQueueStoreInterface.java | 14 +++
...iDispatch.java => MultiDispatchUtils.java} | 17 +--
.../store/queue/QueueOffsetOperator.java | 6 +-
.../store/queue/RocksDBConsumeQueue.java | 42 -------
.../rocketmq/store/AppendCallbackTest.java | 6 +-
.../rocketmq/store/AppendPropCRCTest.java | 5 +-
.../rocketmq/store/MultiDispatchTest.java | 12 +-
.../rocketmq/store/kv/CompactionLogTest.java | 2 +-
16 files changed, 322 insertions(+), 137 deletions(-)
create mode 100644 store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
rename store/src/main/java/org/apache/rocketmq/store/queue/{MultiDispatch.java => MultiDispatchUtils.java} (78%)
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
index 52501dbca..147f23f12 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
@@ -28,6 +28,8 @@ public class MessageExtBrokerInner extends MessageExt {
private ByteBuffer encodedBuff;
+ private volatile boolean encodeCompleted;
+
private MessageVersion version = MessageVersion.MESSAGE_VERSION_V1;
public ByteBuffer getEncodedBuff() {
@@ -92,4 +94,12 @@ public class MessageExtBrokerInner extends MessageExt {
this.setPropertiesString(MessageDecoder.messageProperties2String(this.getProperties()));
}
}
+
+ public boolean isEncodeCompleted() {
+ return encodeCompleted;
+ }
+
+ public void setEncodeCompleted(boolean encodeCompleted) {
+ this.encodeCompleted = encodeCompleted;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 6c3afde70..35c1d0e2d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
@@ -56,6 +57,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.logfile.MappedFile;
@@ -101,6 +103,7 @@ public class CommitLog implements Swappable {
protected int commitLogSize;
private final boolean enabledAppendPropCRC;
+ protected final MultiDispatch multiDispatch;
public CommitLog(final DefaultMessageStore messageStore) {
String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog();
@@ -119,13 +122,11 @@ public class CommitLog implements Swappable {
this.flushManager = new DefaultFlushManager();
this.coldDataCheckService = new ColdDataCheckService();
- this.appendMessageCallback = new DefaultAppendMessageCallback();
+ this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig());
putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
@Override
protected PutMessageThreadLocal initialValue() {
- return new PutMessageThreadLocal(
- defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(),
- defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC());
+ return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig());
}
};
this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
@@ -137,6 +138,8 @@ public class CommitLog implements Swappable {
this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
this.enabledAppendPropCRC = messageStore.getMessageStoreConfig().isEnabledAppendPropCRC();
+
+ this.multiDispatch = new MultiDispatch(defaultMessageStore);
}
public void setFullStorePaths(Set<String> fullStorePaths) {
@@ -1830,15 +1833,84 @@ public class CommitLog implements Swappable {
// Store the message content
private final ByteBuffer msgStoreItemMemory;
private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN;
+ private final MessageStoreConfig messageStoreConfig;
- DefaultAppendMessageCallback() {
+ DefaultAppendMessageCallback(MessageStoreConfig messageStoreConfig) {
this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
+ this.messageStoreConfig = messageStoreConfig;
+ }
+
+ public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner) {
+ if (msgInner.isEncodeCompleted()) {
+ return null;
+ }
+
+ multiDispatch.wrapMultiDispatch(msgInner);
+
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+
+ final byte[] propertiesData =
+ msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+
+ boolean needAppendLastPropertySeparator = enabledAppendPropCRC && propertiesData != null && propertiesData.length > 0
+ && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR;
+
+ final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength;
+
+ if (propertiesLength > Short.MAX_VALUE) {
+ log.warn("putMessage message properties length too long. length={}", propertiesData.length);
+ return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+ }
+
+ int msgLenWithoutProperties = preEncodeBuffer.getInt(0);
+
+ int msgLen = msgLenWithoutProperties + 2 + propertiesLength;
+
+ // Exceeds the maximum message
+ if (msgLen > this.messageStoreConfig.getMaxMessageSize()) {
+ log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize());
+ return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
+ }
+
+ // Back filling total message length
+ preEncodeBuffer.putInt(0, msgLen);
+ // Modify position to msgLenWithoutProperties
+ preEncodeBuffer.position(msgLenWithoutProperties);
+
+ preEncodeBuffer.putShort((short) propertiesLength);
+
+ if (propertiesLength > crc32ReservedLength) {
+ preEncodeBuffer.put(propertiesData);
+ }
+
+ if (needAppendLastPropertySeparator) {
+ preEncodeBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
+ }
+ // 18 CRC32
+ preEncodeBuffer.position(preEncodeBuffer.position() + crc32ReservedLength);
+
+ msgInner.setEncodeCompleted(true);
+
+ return null;
}
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
+ ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
+ boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner);
+ if (isMultiDispatchMsg) {
+ AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
+ if (appendMessageResult != null) {
+ return appendMessageResult;
+ }
+ }
+
+ final int msgLen = preEncodeBuffer.getInt(0);
+ preEncodeBuffer.position(0);
+ preEncodeBuffer.limit(msgLen);
+
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
@@ -1872,9 +1944,6 @@ public class CommitLog implements Swappable {
break;
}
- ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
- final int msgLen = preEncodeBuffer.getInt(0);
-
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.msgStoreItemMemory.clear();
@@ -1919,6 +1988,11 @@ public class CommitLog implements Swappable {
byteBuffer.put(preEncodeBuffer);
CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
msgInner.setEncodedBuff(null);
+
+ if (isMultiDispatchMsg) {
+ CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
+ }
+
return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
}
@@ -2159,6 +2233,10 @@ public class CommitLog implements Swappable {
return flushManager;
}
+ public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
+ return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+ }
+
private boolean isCloseReadAhead() {
return !MixAll.isWindows() && !defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 623509c8b..453c9d1dc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -27,7 +27,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -38,7 +37,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
-import org.apache.rocketmq.store.queue.MultiDispatch;
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.apache.rocketmq.store.queue.QueueOffsetOperator;
import org.apache.rocketmq.store.queue.ReferredIterator;
@@ -702,7 +701,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
- if (MultiDispatch.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
+ if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
multiDispatchLmqQueue(request, maxRetries);
}
return;
@@ -776,28 +775,6 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
String topicQueueKey = getTopic() + "-" + getQueueId();
long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey);
msg.setQueueOffset(queueOffset);
-
-
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
- return;
- }
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
- if (StringUtils.isBlank(multiDispatchQueue)) {
- return;
- }
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
- Long[] queueOffsets = new Long[queues.length];
- for (int i = 0; i < queues.length; i++) {
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
- String key = MultiDispatch.lmqQueueKey(queues[i]);
- queueOffsets[i] = queueOffsetOperator.getLmqOffset(key);
- }
- }
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
- StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
- msg.removeWaitStorePropertyString();
}
@Override
@@ -805,23 +782,6 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
short messageNum) {
String topicQueueKey = getTopic() + "-" + getQueueId();
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
-
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
- return;
- }
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
- if (StringUtils.isBlank(multiDispatchQueue)) {
- return;
- }
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
- for (int i = 0; i < queues.length; i++) {
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
- String key = MultiDispatch.lmqQueueKey(queues[i]);
- queueOffsetOperator.increaseLmqOffset(key, (short) 1);
- }
- }
}
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 99a54e2d7..dc5f312e5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2112,7 +2112,6 @@ public class DefaultMessageStore implements MessageStore {
}
}
-
@Override
public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
index c1d808728..20e9a652b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageVersion;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
public class MessageExtEncoder {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -38,20 +39,22 @@ public class MessageExtEncoder {
// The maximum length of the full message.
private int maxMessageSize;
private final int crc32ReservedLength;
+ private MessageStoreConfig messageStoreConfig;
- public MessageExtEncoder(final int maxMessageBodySize) {
- this(maxMessageBodySize, false);
+ public MessageExtEncoder(final int maxMessageBodySize, final MessageStoreConfig messageStoreConfig) {
+ this(messageStoreConfig);
}
- public MessageExtEncoder(final int maxMessageBodySize, boolean enabledAppendPropCRC) {
+ public MessageExtEncoder(final MessageStoreConfig messageStoreConfig) {
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+ this.messageStoreConfig = messageStoreConfig;
+ this.maxMessageBodySize = messageStoreConfig.getMaxMessageSize();
//Reserve 64kb for encoding buffer outside body
int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ?
maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
byteBuf = alloc.directBuffer(maxMessageSize);
- this.maxMessageBodySize = maxMessageBodySize;
this.maxMessageSize = maxMessageSize;
- this.crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0;
+ this.crc32ReservedLength = messageStoreConfig.isEnabledAppendPropCRC() ? CommitLog.CRC32_RESERVED_LEN : 0;
}
public static int calMsgLength(MessageVersion messageVersion,
@@ -79,8 +82,103 @@ public class MessageExtEncoder {
+ 2 + (Math.max(propertiesLength, 0)); //propertiesLength
}
+ public static int calMsgLengthNoProperties(MessageVersion messageVersion,
+ int sysFlag, int bodyLength, int topicLength) {
+
+ int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
+ int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
+
+ return 4 //TOTALSIZE
+ + 4 //MAGICCODE
+ + 4 //BODYCRC
+ + 4 //QUEUEID
+ + 4 //FLAG
+ + 8 //QUEUEOFFSET
+ + 8 //PHYSICALOFFSET
+ + 4 //SYSFLAG
+ + 8 //BORNTIMESTAMP
+ + bornhostLength //BORNHOST
+ + 8 //STORETIMESTAMP
+ + storehostAddressLength //STOREHOSTADDRESS
+ + 4 //RECONSUMETIMES
+ + 8 //Prepared Transaction Offset
+ + 4 + (Math.max(bodyLength, 0)) //BODY
+ + messageVersion.getTopicLengthSize() + topicLength; //TOPIC
+ }
+
+ public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner) {
+
+ final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+ final int topicLength = topicData.length;
+
+ final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
+
+ // Exceeds the maximum message body
+ if (bodyLength > this.maxMessageBodySize) {
+ CommitLog.log.warn("message body size exceeded, msg body size: " + bodyLength
+ + ", maxMessageSize: " + this.maxMessageBodySize);
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+ }
+
+ final int msgLenNoProperties = calMsgLengthNoProperties(msgInner.getVersion(), msgInner.getSysFlag(), bodyLength, topicLength);
+
+ // 1 TOTALSIZE
+ this.byteBuf.writeInt(msgLenNoProperties);
+ // 2 MAGICCODE
+ this.byteBuf.writeInt(msgInner.getVersion().getMagicCode());
+ // 3 BODYCRC
+ this.byteBuf.writeInt(msgInner.getBodyCRC());
+ // 4 QUEUEID
+ this.byteBuf.writeInt(msgInner.getQueueId());
+ // 5 FLAG
+ this.byteBuf.writeInt(msgInner.getFlag());
+ // 6 QUEUEOFFSET, need update later
+ this.byteBuf.writeLong(0);
+ // 7 PHYSICALOFFSET, need update later
+ this.byteBuf.writeLong(0);
+ // 8 SYSFLAG
+ this.byteBuf.writeInt(msgInner.getSysFlag());
+ // 9 BORNTIMESTAMP
+ this.byteBuf.writeLong(msgInner.getBornTimestamp());
+
+ // 10 BORNHOST
+ ByteBuffer bornHostBytes = msgInner.getBornHostBytes();
+ this.byteBuf.writeBytes(bornHostBytes.array());
+
+ // 11 STORETIMESTAMP
+ this.byteBuf.writeLong(msgInner.getStoreTimestamp());
+
+ // 12 STOREHOSTADDRESS
+ ByteBuffer storeHostBytes = msgInner.getStoreHostBytes();
+ this.byteBuf.writeBytes(storeHostBytes.array());
+
+ // 13 RECONSUMETIMES
+ this.byteBuf.writeInt(msgInner.getReconsumeTimes());
+ // 14 Prepared Transaction Offset
+ this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset());
+ // 15 BODY
+ this.byteBuf.writeInt(bodyLength);
+ if (bodyLength > 0)
+ this.byteBuf.writeBytes(msgInner.getBody());
+
+ // 16 TOPIC
+ if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) {
+ this.byteBuf.writeShort((short) topicLength);
+ } else {
+ this.byteBuf.writeByte((byte) topicLength);
+ }
+ this.byteBuf.writeBytes(topicData);
+
+ return null;
+ }
+
public PutMessageResult encode(MessageExtBrokerInner msgInner) {
this.byteBuf.clear();
+
+ if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) {
+ return encodeWithoutProperties(msgInner);
+ }
+
/**
* Serialize message
*/
@@ -303,7 +401,7 @@ public class MessageExtEncoder {
}
public ByteBuffer getEncoderBuffer() {
- return this.byteBuf.nioBuffer();
+ return this.byteBuf.nioBuffer(0, this.byteBuf.capacity());
}
public int getMaxMessageBodySize() {
@@ -322,12 +420,8 @@ public class MessageExtEncoder {
private final MessageExtEncoder encoder;
private final StringBuilder keyBuilder;
- PutMessageThreadLocal(int size) {
- this(size, false);
- }
-
- PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) {
- encoder = new MessageExtEncoder(size, enabledAppendPropCRC);
+ PutMessageThreadLocal(MessageStoreConfig messageStoreConfig) {
+ encoder = new MessageExtEncoder(messageStoreConfig);
keyBuilder = new StringBuilder();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
new file mode 100644
index 000000000..5bc587a8e
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+
+/**
+ * MultiDispatch for lmq, not-thread-safe
+ */
+public class MultiDispatch {
+ private final StringBuilder keyBuilder = new StringBuilder();
+ private final DefaultMessageStore messageStore;
+ private static final short VALUE_OF_EACH_INCREMENT = 1;
+
+ public MultiDispatch(DefaultMessageStore messageStore) {
+ this.messageStore = messageStore;
+ }
+
+ public String queueKey(String queueName, MessageExtBrokerInner msgInner) {
+ keyBuilder.delete(0, keyBuilder.length());
+ keyBuilder.append(queueName);
+ keyBuilder.append('-');
+ int queueId = msgInner.getQueueId();
+ if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
+ queueId = 0;
+ }
+ keyBuilder.append(queueId);
+ return keyBuilder.toString();
+ }
+
+ public void wrapMultiDispatch(final MessageExtBrokerInner msg) {
+
+ String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ Long[] queueOffsets = new Long[queues.length];
+ if (messageStore.getMessageStoreConfig().isEnableLmq()) {
+ for (int i = 0; i < queues.length; i++) {
+ String key = queueKey(queues[i], msg);
+ if (MixAll.isLmq(key)) {
+ queueOffsets[i] = messageStore.getQueueStore().getLmqQueueOffset(key);
+ }
+ }
+ }
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
+ StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
+ msg.removeWaitStorePropertyString();
+ }
+
+ public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) {
+ String multiDispatchQueue = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ for (String queue : queues) {
+ String key = queueKey(queue, msgInner);
+ if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) {
+ messageStore.getQueueStore().increaseLmqOffset(key, VALUE_OF_EACH_INCREMENT);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
index 30054fa50..d76b05577 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
@@ -74,6 +74,16 @@ public abstract class AbstractConsumeQueueStore implements ConsumeQueueStoreInte
consumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg, messageNum);
}
+ @Override
+ public void increaseLmqOffset(String queueKey, short messageNum) {
+ queueOffsetOperator.increaseLmqOffset(queueKey, messageNum);
+ }
+
+ @Override
+ public long getLmqQueueOffset(String queueKey) {
+ return queueOffsetOperator.getLmqOffset(queueKey);
+ }
+
@Override
public void removeTopicQueueTable(String topic, Integer queueId) {
this.queueOffsetOperator.remove(topic, queueId);
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index c65f2a68b..768c782b1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -181,7 +181,6 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle {
*/
void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg) throws RocksDBException;
-
/**
* Increase queue offset.
* @param queueOffsetAssigner the delegated queue offset assigner
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
index 268803dcc..e68880a82 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
@@ -183,6 +183,20 @@ public interface ConsumeQueueStoreInterface {
*/
void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum);
+ /**
+ * Increase lmq offset
+ * @param queueKey
+ * @param messageNum
+ */
+ void increaseLmqOffset(String queueKey, short messageNum);
+
+ /**
+ * get lmq queue offset
+ * @param queueKey
+ * @return
+ */
+ long getLmqQueueOffset(String queueKey);
+
/**
* recover topicQueue table by minPhyOffset
* @param minPhyOffset
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
similarity index 78%
rename from store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
rename to store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
index d6291d908..44397a2fc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.store.queue;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
@@ -27,7 +25,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-public class MultiDispatch {
+public class MultiDispatchUtils {
public static String lmqQueueKey(String queueName) {
StringBuilder keyBuilder = new StringBuilder();
@@ -60,17 +58,4 @@ public class MultiDispatch {
}
return true;
}
-
- public static List<DispatchRequest> checkMultiDispatchQueue(MessageStoreConfig messageStoreConfig, List<DispatchRequest> dispatchRequests) {
- if (!messageStoreConfig.isEnableMultiDispatch() || dispatchRequests == null || dispatchRequests.size() == 0) {
- return null;
- }
- List<DispatchRequest> result = new ArrayList<>();
- for (DispatchRequest dispatchRequest : dispatchRequests) {
- if (checkMultiDispatchQueue(messageStoreConfig, dispatchRequest)) {
- result.add(dispatchRequest);
- }
- }
- return dispatchRequests;
- }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
index 8da374828..5b4bf994e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
@@ -71,9 +71,9 @@ public class QueueOffsetOperator {
return this.lmqTopicQueueTable.get(topicQueueKey);
}
- public void increaseLmqOffset(String topicQueueKey, short messageNum) {
- Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, k -> 0L);
- this.lmqTopicQueueTable.put(topicQueueKey, lmqOffset + messageNum);
+ public void increaseLmqOffset(String queueKey, short messageNum) {
+ Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, queueKey, k -> 0L);
+ this.lmqTopicQueueTable.put(queueKey, lmqOffset + messageNum);
}
public long currentQueueOffset(String topicQueueKey) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 759be395d..5a981bb4d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -19,14 +19,10 @@ package org.apache.rocketmq.store.queue;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BoundaryType;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -217,50 +213,12 @@ public class RocksDBConsumeQueue implements ConsumeQueueInterface {
queueOffsetOperator.updateQueueOffset(topicQueueKey, queueOffset);
}
msg.setQueueOffset(queueOffset);
-
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
- return;
- }
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
- if (StringUtils.isBlank(multiDispatchQueue)) {
- return;
- }
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
- Long[] queueOffsets = new Long[queues.length];
- for (int i = 0; i < queues.length; i++) {
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
- String key = MultiDispatch.lmqQueueKey(queues[i]);
- queueOffsets[i] = queueOffsetOperator.getLmqTopicQueueNextOffset(key);
- }
- }
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
- StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
- msg.removeWaitStorePropertyString();
}
@Override
public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, short messageNum) {
String topicQueueKey = getTopic() + "-" + getQueueId();
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
-
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
- return;
- }
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
- if (StringUtils.isBlank(multiDispatchQueue)) {
- return;
- }
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
- for (int i = 0; i < queues.length; i++) {
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
- String key = MultiDispatch.lmqQueueKey(queues[i]);
- queueOffsetOperator.increaseLmqOffset(key, (short) 1);
- }
- }
}
@Override
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
index 87bfe85da..374857149 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -44,7 +44,7 @@ public class AppendCallbackTest {
AppendMessageCallback callback;
- MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024);
+ MessageExtEncoder batchEncoder;
@Before
public void init() throws Exception {
@@ -53,12 +53,14 @@ public class AppendCallbackTest {
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(100 * 10);
+ messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024);
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore");
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog");
//too much reference
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
CommitLog commitLog = new CommitLog(messageStore);
- callback = commitLog.new DefaultAppendMessageCallback();
+ callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig);
+ batchEncoder = new MessageExtEncoder(messageStoreConfig);
}
@After
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
index c8ed4d74d..d882fc9d9 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
@@ -56,6 +56,7 @@ public class AppendPropCRCTest {
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(100 * 10);
+ messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024);
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore");
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog");
messageStoreConfig.setForceVerifyPropCRC(true);
@@ -63,8 +64,8 @@ public class AppendPropCRCTest {
//too much reference
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
commitLog = new CommitLog(messageStore);
- encoder = new MessageExtEncoder(10 * 1024 * 1024, true);
- callback = commitLog.new DefaultAppendMessageCallback();
+ encoder = new MessageExtEncoder(messageStoreConfig);
+ callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig);
}
@After
diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
index 2447bbf68..eae5eaa07 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
@@ -28,20 +28,19 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.queue.MultiDispatch;
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.rocksdb.RocksDBException;
-import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MultiDispatchTest {
- private ConsumeQueue consumeQueue;
+ private MultiDispatch multiDispatch;
private DefaultMessageStore messageStore;
@@ -61,8 +60,7 @@ public class MultiDispatchTest {
BrokerConfig brokerConfig = new BrokerConfig();
//too much reference
messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig, new ConcurrentHashMap<>());
- consumeQueue = new ConsumeQueue("xxx", 0,
- getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
+ multiDispatch = new MultiDispatch(messageStore);
}
@After
@@ -74,14 +72,14 @@ public class MultiDispatchTest {
public void lmqQueueKey() {
MessageExtBrokerInner messageExtBrokerInner = mock(MessageExtBrokerInner.class);
when(messageExtBrokerInner.getQueueId()).thenReturn(2);
- String ret = MultiDispatch.lmqQueueKey("%LMQ%lmq123");
+ String ret = MultiDispatchUtils.lmqQueueKey("%LMQ%lmq123");
assertEquals(ret, "%LMQ%lmq123-0");
}
@Test
public void wrapMultiDispatch() throws RocksDBException {
MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue();
- messageStore.assignOffset(messageExtBrokerInner);
+ multiDispatch.wrapMultiDispatch(messageExtBrokerInner);
assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), "0,0");
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
index df3c31c6e..e113b18f1 100644
--- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
@@ -86,7 +86,7 @@ public class CompactionLogTest {
int compactionCqFileSize = 1024;
- private static MessageExtEncoder encoder = new MessageExtEncoder(1024);
+ private static MessageExtEncoder encoder = new MessageExtEncoder(1024, new MessageStoreConfig());
private static SocketAddress storeHost;
private static SocketAddress bornHost;
--
2.32.0.windows.2
From 70dc93abbcb9bf161378d66fcaca55bedc78b905 Mon Sep 17 00:00:00 2001
From: yangguodong <1174533476@qq.com>
Date: Wed, 8 Nov 2023 21:14:54 -0600
Subject: [PATCH 2/2] Fix tiered store README.md error about Configuration
(#7436)
* Fix tiered store README.md error about Configuration
* Fix change tieredStoreFilePath to tieredStoreFilepath
* revert README.md change
---------
Co-authored-by: yangguodong.cn <yangguodong.cn@bytedance.com>
---
.../tieredstore/common/TieredMessageStoreConfig.java | 10 +++++-----
.../tieredstore/provider/posix/PosixFileSegment.java | 4 ++--
.../rocketmq/tieredstore/file/TieredCommitLogTest.java | 2 +-
.../provider/posix/PosixFileSegmentTest.java | 2 +-
4 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
index 595db6b86..a112ea6b1 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
@@ -115,7 +115,7 @@ public class TieredMessageStoreConfig {
private long readAheadCacheExpireDuration = 10 * 1000;
private double readAheadCacheSizeThresholdRate = 0.3;
- private String tieredStoreFilePath = "";
+ private String tieredStoreFilepath = "";
private String objectStoreEndpoint = "";
@@ -350,12 +350,12 @@ public class TieredMessageStoreConfig {
this.readAheadCacheSizeThresholdRate = rate;
}
- public String getTieredStoreFilePath() {
- return tieredStoreFilePath;
+ public String getTieredStoreFilepath() {
+ return tieredStoreFilepath;
}
- public void setTieredStoreFilePath(String tieredStoreFilePath) {
- this.tieredStoreFilePath = tieredStoreFilePath;
+ public void setTieredStoreFilepath(String tieredStoreFilepath) {
+ this.tieredStoreFilepath = tieredStoreFilepath;
}
public void setObjectStoreEndpoint(String objectStoreEndpoint) {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 7e949cb28..708ce33f9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -66,8 +66,8 @@ public class PosixFileSegment extends TieredFileSegment {
super(storeConfig, fileType, filePath, baseOffset);
// basePath
- String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(),
- StringUtils.appendIfMissing(storeConfig.getTieredStoreFilePath(), File.separator));
+ String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilepath(),
+ StringUtils.appendIfMissing(storeConfig.getTieredStoreFilepath(), File.separator));
// fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset
String brokerClusterName = storeConfig.getBrokerClusterName();
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
index 6693d3cb7..80cdba977 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
@@ -49,7 +49,7 @@ public class TieredCommitLogTest {
TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
storeConfig.setBrokerName("brokerName");
storeConfig.setStorePathRootDir(storePath);
- storeConfig.setTieredStoreFilePath(storePath + File.separator);
+ storeConfig.setTieredStoreFilepath(storePath + File.separator);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
storeConfig.setCommitLogRollingInterval(0);
storeConfig.setTieredStoreCommitLogMaxSize(1000);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
index db33ae847..ede62b8ce 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
@@ -42,7 +42,7 @@ public class PosixFileSegmentTest {
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setTieredStoreFilePath(storePath);
+ storeConfig.setTieredStoreFilepath(storePath);
mq = new MessageQueue("OSSFileSegmentTest", "broker", 0);
TieredStoreExecutor.init();
}
--
2.32.0.windows.2
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/src-openeuler/rocketmq.git
git@gitee.com:src-openeuler/rocketmq.git
src-openeuler
rocketmq
rocketmq
master

搜索帮助