From b3b0264e00d48f52ae1407fdddca6ecdde08b949 Mon Sep 17 00:00:00 2001 From: shizhili Date: Mon, 11 Dec 2023 16:08:37 +0800 Subject: [PATCH] backport support message filter --- ...2-backport-Support-message-filtering.patch | 1196 +++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 1201 insertions(+), 1 deletion(-) create mode 100644 patch042-backport-Support-message-filtering.patch diff --git a/patch042-backport-Support-message-filtering.patch b/patch042-backport-Support-message-filtering.patch new file mode 100644 index 0000000..0143b52 --- /dev/null +++ b/patch042-backport-Support-message-filtering.patch @@ -0,0 +1,1196 @@ +From aec1055830e78f7e710e32ebd467f9f7d208855d Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Mon, 4 Dec 2023 16:12:42 +0800 +Subject: [PATCH] [ISSUE #7585] Support message filtering in rocketmq tiered + storage (#7594) + +--- + .../tieredstore/TieredMessageFetcher.java | 325 ++++++++---------- + .../tieredstore/TieredMessageStore.java | 6 +- + .../common/GetMessageResultExt.java | 76 ++++ + .../common/SelectBufferResult.java | 51 +++ + ...er.java => SelectBufferResultWrapper.java} | 53 +-- + .../common/TieredMessageStoreConfig.java | 9 + + .../metrics/TieredStoreMetricsManager.java | 4 +- + .../provider/TieredFileSegment.java | 2 +- + .../tieredstore/util/MessageBufferUtil.java | 71 ++-- + .../tieredstore/TieredMessageFetcherTest.java | 9 +- + .../common/GetMessageResultExtTest.java | 65 ++++ + .../common/SelectBufferResultTest.java | 37 ++ + .../util/MessageBufferUtilTest.java | 19 +- + 13 files changed, 478 insertions(+), 249 deletions(-) + create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java + create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java + rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/{SelectMappedBufferResultWrapper.java => SelectBufferResultWrapper.java} (55%) + create 
mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java + create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +index f739773eb..7b0c47c59 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +@@ -19,17 +19,14 @@ package org.apache.rocketmq.tieredstore; + import com.github.benmanes.caffeine.cache.Cache; + import com.github.benmanes.caffeine.cache.Caffeine; + import com.github.benmanes.caffeine.cache.Scheduler; ++import com.google.common.annotations.VisibleForTesting; + import com.google.common.base.Stopwatch; +-import com.google.common.collect.Sets; + import io.opentelemetry.api.common.Attributes; + import java.nio.ByteBuffer; + import java.util.ArrayList; +-import java.util.HashSet; + import java.util.List; +-import java.util.Set; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.TimeUnit; +-import javax.annotation.Nullable; + import org.apache.commons.lang3.tuple.Pair; + import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.message.MessageQueue; +@@ -40,12 +37,13 @@ import org.apache.rocketmq.store.GetMessageStatus; + import org.apache.rocketmq.store.MessageFilter; + import org.apache.rocketmq.store.QueryMessageResult; + import org.apache.rocketmq.store.SelectMappedBufferResult; ++import org.apache.rocketmq.tieredstore.common.GetMessageResultExt; + import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture; + import org.apache.rocketmq.tieredstore.common.MessageCacheKey; +-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper; ++import 
org.apache.rocketmq.tieredstore.common.SelectBufferResult; ++import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; + import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; +-import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode; + import org.apache.rocketmq.tieredstore.exception.TieredStoreException; + import org.apache.rocketmq.tieredstore.file.CompositeFlatFile; + import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; +@@ -66,10 +64,10 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + private static final Logger LOGGER = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); + + private final String brokerName; +- private final TieredMessageStoreConfig storeConfig; + private final TieredMetadataStore metadataStore; ++ private final TieredMessageStoreConfig storeConfig; + private final TieredFlatFileManager flatFileManager; +- private final Cache readAheadCache; ++ private final Cache readAheadCache; + + public TieredMessageFetcher(TieredMessageStoreConfig storeConfig) { + this.storeConfig = storeConfig; +@@ -79,7 +77,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + this.readAheadCache = this.initCache(storeConfig); + } + +- private Cache initCache(TieredMessageStoreConfig storeConfig) { ++ private Cache initCache(TieredMessageStoreConfig storeConfig) { + long memoryMaxSize = + (long) (Runtime.getRuntime().maxMemory() * storeConfig.getReadAheadCacheSizeThresholdRate()); + +@@ -88,60 +86,35 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + .expireAfterWrite(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS) + .maximumWeight(memoryMaxSize) + // Using the buffer size of messages to calculate memory usage +- .weigher((MessageCacheKey key, SelectMappedBufferResultWrapper msg) -> msg.getDuplicateResult().getSize()) ++ 
.weigher((MessageCacheKey key, SelectBufferResultWrapper msg) -> msg.getBufferSize()) + .recordStats() + .build(); + } + +- protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile, +- long queueOffset, SelectMappedBufferResult result, long minOffset, long maxOffset, int size) { +- +- return putMessageToCache(flatFile, queueOffset, result, minOffset, maxOffset, size, false); +- } +- +- protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile, +- long queueOffset, SelectMappedBufferResult result, long minOffset, long maxOffset, int size, boolean used) { +- +- SelectMappedBufferResultWrapper wrapper = +- new SelectMappedBufferResultWrapper(result, queueOffset, minOffset, maxOffset, size); +- if (used) { +- wrapper.addAccessCount(); +- } +- readAheadCache.put(new MessageCacheKey(flatFile, queueOffset), wrapper); +- return wrapper; +- } +- +- // Visible for metrics monitor +- public Cache getMessageCache() { ++ @VisibleForTesting ++ public Cache getMessageCache() { + return readAheadCache; + } + +- // Waiting for the request in transit to complete +- protected CompletableFuture getMessageFromCacheAsync( +- CompositeQueueFlatFile flatFile, String group, long queueOffset, int maxCount) { +- +- return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, true); ++ protected void putMessageToCache(CompositeFlatFile flatFile, SelectBufferResultWrapper result) { ++ readAheadCache.put(new MessageCacheKey(flatFile, result.getOffset()), result); + } + +- @Nullable +- protected SelectMappedBufferResultWrapper getMessageFromCache(CompositeFlatFile flatFile, long queueOffset) { +- MessageCacheKey cacheKey = new MessageCacheKey(flatFile, queueOffset); +- return readAheadCache.getIfPresent(cacheKey); ++ protected SelectBufferResultWrapper getMessageFromCache(CompositeFlatFile flatFile, long offset) { ++ return readAheadCache.getIfPresent(new MessageCacheKey(flatFile, offset)); + } + +- protected void 
recordCacheAccess(CompositeFlatFile flatFile, String group, long queueOffset, +- List resultWrapperList) { +- if (resultWrapperList.size() > 0) { +- queueOffset = resultWrapperList.get(resultWrapperList.size() - 1).getCurOffset(); ++ protected void recordCacheAccess(CompositeFlatFile flatFile, ++ String group, long offset, List resultWrapperList) { ++ if (!resultWrapperList.isEmpty()) { ++ offset = resultWrapperList.get(resultWrapperList.size() - 1).getOffset(); + } +- flatFile.recordGroupAccess(group, queueOffset); +- for (SelectMappedBufferResultWrapper wrapper : resultWrapperList) { +- wrapper.addAccessCount(); +- if (wrapper.getAccessCount() >= flatFile.getActiveGroupCount()) { +- MessageCacheKey cacheKey = new MessageCacheKey(flatFile, wrapper.getCurOffset()); +- readAheadCache.invalidate(cacheKey); ++ flatFile.recordGroupAccess(group, offset); ++ resultWrapperList.forEach(wrapper -> { ++ if (wrapper.incrementAndGet() >= flatFile.getActiveGroupCount()) { ++ readAheadCache.invalidate(new MessageCacheKey(flatFile, wrapper.getOffset())); + } +- } ++ }); + } + + private void prefetchMessage(CompositeQueueFlatFile flatFile, String group, int maxCount, long nextBeginOffset) { +@@ -149,7 +122,6 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + return; + } + +- MessageQueue mq = flatFile.getMessageQueue(); + // make sure there is only one request per group and request range + int prefetchBatchSize = Math.min(maxCount * flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold()); + InFlightRequestFuture inflightRequest = flatFile.getInflightRequest(group, nextBeginOffset, prefetchBatchSize); +@@ -166,13 +138,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + long maxOffsetOfLastRequest = inflightRequest.getLastFuture().join(); + boolean lastRequestIsExpired = getMessageFromCache(flatFile, nextBeginOffset) == null; + +- // if message fetch by last request is expired, we need to prefetch the message from 
tiered store +- int cacheRemainCount = (int) (maxOffsetOfLastRequest - nextBeginOffset); +- LOGGER.debug("TieredMessageFetcher#preFetchMessage: group={}, nextBeginOffset={}, maxOffsetOfLastRequest={}, lastRequestIsExpired={}, cacheRemainCount={}", +- group, nextBeginOffset, maxOffsetOfLastRequest, lastRequestIsExpired, cacheRemainCount); +- +- if (lastRequestIsExpired +- || maxOffsetOfLastRequest != -1L && nextBeginOffset >= inflightRequest.getStartOffset()) { ++ if (lastRequestIsExpired || ++ maxOffsetOfLastRequest != -1L && nextBeginOffset >= inflightRequest.getStartOffset()) { + + long queueOffset; + if (lastRequestIsExpired) { +@@ -196,12 +163,12 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + long nextQueueOffset = queueOffset; + if (flag == 1) { + int firstBatchSize = factor % storeConfig.getReadAheadBatchSizeFactorThreshold() * maxCount; +- CompletableFuture future = prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset, firstBatchSize); ++ CompletableFuture future = prefetchMessageThenPutToCache(flatFile, nextQueueOffset, firstBatchSize); + futureList.add(Pair.of(firstBatchSize, future)); + nextQueueOffset += firstBatchSize; + } + for (long i = 0; i < concurrency - flag; i++) { +- CompletableFuture future = prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset + i * requestBatchSize, requestBatchSize); ++ CompletableFuture future = prefetchMessageThenPutToCache(flatFile, nextQueueOffset + i * requestBatchSize, requestBatchSize); + futureList.add(Pair.of(requestBatchSize, future)); + } + flatFile.putInflightRequest(group, queueOffset, maxCount * factor, futureList); +@@ -211,52 +178,52 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + } + } + +- private CompletableFuture prefetchMessageThenPutToCache(CompositeQueueFlatFile flatFile, MessageQueue mq, +- long queueOffset, int batchSize) { ++ private CompletableFuture prefetchMessageThenPutToCache( ++ CompositeQueueFlatFile flatFile, long queueOffset, 
int batchSize) { ++ ++ MessageQueue mq = flatFile.getMessageQueue(); + return getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize) +- .thenApplyAsync(result -> { +- if (result.getStatus() != GetMessageStatus.FOUND) { +- LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed: topic: {}, queue: {}, queue offset: {}, batch size: {}, result: {}", +- mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, result.getStatus()); +- return -1L; +- } +- // put message into cache +- List offsetList = result.getMessageQueueOffset(); +- List msgList = result.getMessageMapedList(); +- if (offsetList.size() != msgList.size()) { +- LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, result is illegal: topic: {}, queue: {}, queue offset: {}, batch size: {}, offsetList size: {}, msgList size: {}", +- mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, offsetList.size(), msgList.size()); ++ .thenApply(result -> { ++ if (result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE || ++ result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_BADLY) { + return -1L; + } +- if (offsetList.isEmpty()) { +- LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, result is FOUND but msgList is empty: topic: {}, queue: {}, queue offset: {}, batch size: {}", +- mq.getTopic(), mq.getQueueId(), queueOffset, batchSize); ++ if (result.getStatus() != GetMessageStatus.FOUND) { ++ LOGGER.warn("MessageFetcher prefetch message then put to cache failed, result: {}, " + ++ "topic: {}, queue: {}, queue offset: {}, batch size: {}", ++ result.getStatus(), mq.getTopic(), mq.getQueueId(), queueOffset, batchSize); + return -1L; + } +- Long minOffset = offsetList.get(0); +- Long maxOffset = offsetList.get(offsetList.size() - 1); +- int size = offsetList.size(); +- for (int n = 0; n < offsetList.size(); n++) { +- putMessageToCache(flatFile, offsetList.get(n), msgList.get(n), minOffset, maxOffset, size); +- } +- if (size != 
batchSize || maxOffset != queueOffset + batchSize - 1) { +- LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: size not match: except: {}, actual: {}, queue offset: {}, min offset: {}, except offset: {}, max offset: {}", +- batchSize, size, queueOffset, minOffset, queueOffset + batchSize - 1, maxOffset); ++ try { ++ List offsetList = result.getMessageQueueOffset(); ++ List tagCodeList = result.getTagCodeList(); ++ List msgList = result.getMessageMapedList(); ++ for (int i = 0; i < offsetList.size(); i++) { ++ SelectMappedBufferResult msg = msgList.get(i); ++ SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper( ++ msg, offsetList.get(i), tagCodeList.get(i), false); ++ this.putMessageToCache(flatFile, bufferResult); ++ } ++ return offsetList.get(offsetList.size() - 1); ++ } catch (Exception e) { ++ LOGGER.error("MessageFetcher prefetch message then put to cache failed, " + ++ "topic: {}, queue: {}, queue offset: {}, batch size: {}", ++ mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, e); + } +- return maxOffset; +- }, TieredStoreExecutor.fetchDataExecutor); ++ return -1L; ++ }); + } + +- public CompletableFuture getMessageFromCacheAsync(CompositeQueueFlatFile flatFile, ++ public CompletableFuture getMessageFromCacheAsync(CompositeQueueFlatFile flatFile, + String group, long queueOffset, int maxCount, boolean waitInflightRequest) { + + MessageQueue mq = flatFile.getMessageQueue(); + + long lastGetOffset = queueOffset - 1; +- List resultWrapperList = new ArrayList<>(maxCount); ++ List resultWrapperList = new ArrayList<>(maxCount); + for (int i = 0; i < maxCount; i++) { + lastGetOffset++; +- SelectMappedBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset); ++ SelectBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset); + if (wrapper == null) { + lastGetOffset--; + break; +@@ -281,19 +248,19 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + 
flatFile.getInflightRequest(group, queueOffset, maxCount).getFuture(queueOffset); + if (!future.isDone()) { + Stopwatch stopwatch = Stopwatch.createStarted(); +- // to prevent starvation issues, only allow waiting for inflight request once +- return future.thenCompose(v -> { ++ // to prevent starvation issues, only allow waiting for processing request once ++ return future.thenComposeAsync(v -> { + LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: wait for response cost: {}ms", + stopwatch.elapsed(TimeUnit.MILLISECONDS)); + return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, false); +- }); ++ }, TieredStoreExecutor.fetchDataExecutor); + } + } + + // try to get message from cache again when prefetch request is done + for (int i = 0; i < maxCount - resultWrapperList.size(); i++) { + lastGetOffset++; +- SelectMappedBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset); ++ SelectBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset); + if (wrapper == null) { + lastGetOffset--; + break; +@@ -303,74 +270,94 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + + recordCacheAccess(flatFile, group, queueOffset, resultWrapperList); + +- // if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests +- if (!resultWrapperList.isEmpty()) { +- LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " + +- "topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", +- mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); +- prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); ++ if (resultWrapperList.isEmpty()) { ++ // If cache miss, pull messages immediately ++ LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}", ++ group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); ++ } else { ++ // If cache hit, return buffer result 
immediately and asynchronously prefetch messages ++ LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}, resultSize: {}", ++ group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); + +- GetMessageResult result = new GetMessageResult(); ++ GetMessageResultExt result = new GetMessageResultExt(); + result.setStatus(GetMessageStatus.FOUND); + result.setMinOffset(flatFile.getConsumeQueueMinOffset()); + result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); + result.setNextBeginOffset(queueOffset + resultWrapperList.size()); +- resultWrapperList.forEach(wrapper -> result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset())); ++ resultWrapperList.forEach(wrapper -> result.addMessageExt( ++ wrapper.getDuplicateResult(), wrapper.getOffset(), wrapper.getTagCode())); ++ ++ if (lastGetOffset < result.getMaxOffset()) { ++ this.prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); ++ } + return CompletableFuture.completedFuture(result); + } + +- // if cache is miss, immediately pull messages +- LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + +- "topic: {}, queue: {}, queue offset: {}, max message num: {}", +- mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); +- +- CompletableFuture resultFuture; ++ CompletableFuture resultFuture; + synchronized (flatFile) { + int batchSize = maxCount * storeConfig.getReadAheadMinFactor(); + resultFuture = getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize) +- .thenApplyAsync(result -> { ++ .thenApply(result -> { + if (result.getStatus() != GetMessageStatus.FOUND) { + return result; + } +- GetMessageResult newResult = new GetMessageResult(); +- newResult.setStatus(GetMessageStatus.FOUND); +- newResult.setMinOffset(flatFile.getConsumeQueueMinOffset()); +- newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); + ++ GetMessageResultExt newResult = new GetMessageResultExt(); + List 
offsetList = result.getMessageQueueOffset(); ++ List tagCodeList = result.getTagCodeList(); + List msgList = result.getMessageMapedList(); +- Long minOffset = offsetList.get(0); +- Long maxOffset = offsetList.get(offsetList.size() - 1); +- int size = offsetList.size(); ++ + for (int i = 0; i < offsetList.size(); i++) { +- Long offset = offsetList.get(i); + SelectMappedBufferResult msg = msgList.get(i); +- // put message into cache +- SelectMappedBufferResultWrapper resultWrapper = putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true); +- // try to meet maxCount ++ SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper( ++ msg, offsetList.get(i), tagCodeList.get(i), true); ++ this.putMessageToCache(flatFile, bufferResult); + if (newResult.getMessageMapedList().size() < maxCount) { +- newResult.addMessage(resultWrapper.getDuplicateResult(), offset); ++ newResult.addMessageExt(msg, offsetList.get(i), tagCodeList.get(i)); + } + } ++ ++ newResult.setStatus(GetMessageStatus.FOUND); ++ newResult.setMinOffset(flatFile.getConsumeQueueMinOffset()); ++ newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); + newResult.setNextBeginOffset(queueOffset + newResult.getMessageMapedList().size()); + return newResult; +- }, TieredStoreExecutor.fetchDataExecutor); ++ }); + + List>> futureList = new ArrayList<>(); + CompletableFuture inflightRequestFuture = resultFuture.thenApply(result -> +- result.getStatus() == GetMessageStatus.FOUND ? result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : -1L); ++ result.getStatus() == GetMessageStatus.FOUND ? 
++ result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : -1L); + futureList.add(Pair.of(batchSize, inflightRequestFuture)); + flatFile.putInflightRequest(group, queueOffset, batchSize, futureList); + } + return resultFuture; + } + +- public CompletableFuture getMessageFromTieredStoreAsync(CompositeQueueFlatFile flatFile, +- long queueOffset, int batchSize) { ++ public CompletableFuture getMessageFromTieredStoreAsync( ++ CompositeQueueFlatFile flatFile, long queueOffset, int batchSize) { + +- GetMessageResult result = new GetMessageResult(); ++ GetMessageResultExt result = new GetMessageResultExt(); + result.setMinOffset(flatFile.getConsumeQueueMinOffset()); + result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); ++ ++ if (queueOffset < result.getMaxOffset()) { ++ batchSize = Math.min(batchSize, (int) Math.min(result.getMaxOffset() - queueOffset, Integer.MAX_VALUE)); ++ } else if (queueOffset == result.getMaxOffset()) { ++ result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE); ++ result.setNextBeginOffset(queueOffset); ++ return CompletableFuture.completedFuture(result); ++ } else if (queueOffset > result.getMaxOffset()) { ++ result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY); ++ result.setNextBeginOffset(result.getMaxOffset()); ++ return CompletableFuture.completedFuture(result); ++ } ++ ++ LOGGER.info("MessageFetcher#getMessageFromTieredStoreAsync, " + ++ "topic: {}, queueId: {}, broker offset: {}-{}, offset: {}, expect: {}", ++ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), ++ result.getMinOffset(), result.getMaxOffset(), queueOffset, batchSize); ++ + CompletableFuture readConsumeQueueFuture; + try { + readConsumeQueueFuture = flatFile.getConsumeQueueAsync(queueOffset, batchSize); +@@ -389,66 +376,56 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + } + } + +- CompletableFuture readCommitLogFuture = readConsumeQueueFuture.thenComposeAsync(cqBuffer -> { ++ 
CompletableFuture readCommitLogFuture = readConsumeQueueFuture.thenCompose(cqBuffer -> { + long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer); + cqBuffer.position(cqBuffer.remaining() - TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); + long lastCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer); + if (lastCommitLogOffset < firstCommitLogOffset) { +- MessageQueue mq = flatFile.getMessageQueue(); +- LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: message is not in order, try to fetch data in next store, topic: {}, queueId: {}, batch size: {}, queue offset {}", +- mq.getTopic(), mq.getQueueId(), batchSize, queueOffset); +- throw new TieredStoreException(TieredStoreErrorCode.ILLEGAL_OFFSET, "message is not in order"); ++ LOGGER.error("MessageFetcher#getMessageFromTieredStoreAsync, " + ++ "last offset is smaller than first offset, " + ++ "topic: {} queueId: {}, offset: {}, firstOffset: {}, lastOffset: {}", ++ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), queueOffset, ++ firstCommitLogOffset, lastCommitLogOffset); ++ return CompletableFuture.completedFuture(ByteBuffer.allocate(0)); + } +- long length = lastCommitLogOffset - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer); + +- // prevent OOM +- long originLength = length; +- while (cqBuffer.limit() > TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE && length > storeConfig.getReadAheadMessageSizeThreshold()) { ++ // Get the total size of the data by reducing the length limit of cq to prevent OOM ++ long length = lastCommitLogOffset - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer); ++ while (cqBuffer.limit() > TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE && ++ length > storeConfig.getReadAheadMessageSizeThreshold()) { + cqBuffer.limit(cqBuffer.position()); + cqBuffer.position(cqBuffer.limit() - TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); +- length = CQItemBufferUtil.getCommitLogOffset(cqBuffer) - 
firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer); +- } +- +- if (originLength != length) { +- MessageQueue mq = flatFile.getMessageQueue(); +- LOGGER.info("TieredMessageFetcher#getMessageFromTieredStoreAsync: msg data is too large, topic: {}, queueId: {}, batch size: {}, fix it from {} to {}", +- mq.getTopic(), mq.getQueueId(), batchSize, originLength, length); ++ length = CQItemBufferUtil.getCommitLogOffset(cqBuffer) ++ - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer); + } + + return flatFile.getCommitLogAsync(firstCommitLogOffset, (int) length); +- }, TieredStoreExecutor.fetchDataExecutor); ++ }); + +- return readConsumeQueueFuture.thenCombineAsync(readCommitLogFuture, (cqBuffer, msgBuffer) -> { +- List> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); +- if (!msgList.isEmpty()) { +- int requestSize = cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE; ++ int finalBatchSize = batchSize; ++ return readConsumeQueueFuture.thenCombine(readCommitLogFuture, (cqBuffer, msgBuffer) -> { ++ List bufferList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); ++ int requestSize = cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE; ++ if (bufferList.isEmpty()) { ++ result.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE); ++ result.setNextBeginOffset(queueOffset + requestSize); ++ } else { + result.setStatus(GetMessageStatus.FOUND); +- result.setNextBeginOffset(queueOffset + msgList.size()); +- msgList.forEach(pair -> { +- msgBuffer.position(pair.getLeft()); +- ByteBuffer slice = msgBuffer.slice(); +- slice.limit(pair.getRight()); +- result.addMessage(new SelectMappedBufferResult(pair.getLeft(), slice, pair.getRight(), null), MessageBufferUtil.getQueueOffset(slice)); +- }); +- if (requestSize != msgList.size()) { +- Set requestOffsetSet = new HashSet<>(); +- for (int i = 0; i < requestSize; i++) { +- requestOffsetSet.add(queueOffset + i); +- } +- 
LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split message buffer failed, batch size: {}, request message count: {}, actual message count: {}, these messages may lost: {}", batchSize, requestSize, msgList.size(), Sets.difference(requestOffsetSet, Sets.newHashSet(result.getMessageQueueOffset()))); +- } else if (requestSize != batchSize) { +- LOGGER.debug("TieredMessageFetcher#getMessageFromTieredStoreAsync: message count does not meet batch size, maybe dispatch delay: batch size: {}, request message count: {}", batchSize, requestSize); ++ result.setNextBeginOffset(queueOffset + requestSize); ++ ++ for (SelectBufferResult bufferResult : bufferList) { ++ ByteBuffer slice = bufferResult.getByteBuffer().slice(); ++ slice.limit(bufferResult.getSize()); ++ SelectMappedBufferResult msg = new SelectMappedBufferResult(bufferResult.getStartOffset(), ++ bufferResult.getByteBuffer(), bufferResult.getSize(), null); ++ result.addMessageExt(msg, MessageBufferUtil.getQueueOffset(slice), bufferResult.getTagCode()); + } +- return result; + } +- long nextBeginOffset = queueOffset + cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE; +- LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split message buffer failed, consume queue buffer size: {}, message buffer size: {}, change offset from {} to {}", cqBuffer.remaining(), msgBuffer.remaining(), queueOffset, nextBeginOffset); +- result.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING); +- result.setNextBeginOffset(nextBeginOffset); + return result; +- }, TieredStoreExecutor.fetchDataExecutor).exceptionally(e -> { ++ }).exceptionally(e -> { + MessageQueue mq = flatFile.getMessageQueue(); +- LOGGER.warn("TieredMessageFetcher#getMessageFromTieredStoreAsync: get message failed: topic: {} queueId: {}", mq.getTopic(), mq.getQueueId(), e); ++ LOGGER.warn("MessageFetcher#getMessageFromTieredStoreAsync failed, " + ++ "topic: {} queueId: {}, offset: {}, batchSize: {}", mq.getTopic(), 
mq.getQueueId(), queueOffset, finalBatchSize, e); + result.setStatus(GetMessageStatus.OFFSET_FOUND_NULL); + result.setNextBeginOffset(queueOffset); + return result; +@@ -498,7 +475,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + return CompletableFuture.completedFuture(result); + } + +- return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount); ++ return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, true) ++ .thenApply(messageResultExt -> messageResultExt.doFilterMessage(messageFilter)); + } + + @Override +@@ -546,7 +524,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + return flatFile.getOffsetInConsumeQueueByTime(timestamp, type); + } catch (Exception e) { + LOGGER.error("TieredMessageFetcher#getOffsetInQueueByTime: " + +- "get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}", ++ "get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}", + topic, queueId, timestamp, type, e); + } + return -1L; +@@ -598,7 +576,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result); + }).whenComplete((result, throwable) -> { + if (result != null) { +- LOGGER.info("MessageFetcher#queryMessageAsync, query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}", ++ LOGGER.info("MessageFetcher#queryMessageAsync, " + ++ "query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}", + result.getMessageBufferList().size(), topic, topicId, key, maxCount, begin, end); + } + }); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +index edaa5d19f..015c27efa 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java ++++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +@@ -213,8 +213,10 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + // so there is no need to update the maximum offset to the local cq offset here, + // otherwise it will cause repeated consumption after next begin offset over commit offset. + +- logger.trace("GetMessageAsync result, group: {}, topic: {}, queueId: {}, offset: {}, count:{}, {}", +- group, topic, queueId, offset, maxMsgNums, result); ++ if (storeConfig.isRecordGetMessageResult()) { ++ logger.info("GetMessageAsync result, {}, group: {}, topic: {}, queueId: {}, offset: {}, count:{}", ++ result, group, topic, queueId, offset, maxMsgNums); ++ } + + return result; + }).exceptionally(e -> { +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java +new file mode 100644 +index 000000000..52462b5dc +--- /dev/null ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java +@@ -0,0 +1,76 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.rocketmq.tieredstore.common; ++ ++import java.util.ArrayList; ++import java.util.List; ++import org.apache.rocketmq.store.GetMessageResult; ++import org.apache.rocketmq.store.GetMessageStatus; ++import org.apache.rocketmq.store.MessageFilter; ++import org.apache.rocketmq.store.SelectMappedBufferResult; ++import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; ++ ++public class GetMessageResultExt extends GetMessageResult { ++ ++ private final List tagCodeList; ++ ++ public GetMessageResultExt() { ++ this.tagCodeList = new ArrayList<>(); ++ } ++ ++ public void addMessageExt(SelectMappedBufferResult bufferResult, long queueOffset, long tagCode) { ++ super.addMessage(bufferResult, queueOffset); ++ this.tagCodeList.add(tagCode); ++ } ++ ++ public List getTagCodeList() { ++ return tagCodeList; ++ } ++ ++ public GetMessageResult doFilterMessage(MessageFilter messageFilter) { ++ if (GetMessageStatus.FOUND != super.getStatus() || messageFilter == null) { ++ return this; ++ } ++ ++ GetMessageResult result = new GetMessageResult(); ++ result.setStatus(GetMessageStatus.FOUND); ++ result.setMinOffset(this.getMinOffset()); ++ result.setMaxOffset(this.getMaxOffset()); ++ result.setNextBeginOffset(this.getNextBeginOffset()); ++ ++ for (int i = 0; i < this.getMessageMapedList().size(); i++) { ++ if (!messageFilter.isMatchedByConsumeQueue(this.tagCodeList.get(i), null)) { ++ continue; ++ } ++ ++ SelectMappedBufferResult bufferResult = this.getMessageMapedList().get(i); ++ if (!messageFilter.isMatchedByCommitLog(bufferResult.getByteBuffer().slice(), null)) { ++ continue; ++ } ++ ++ result.addMessage(new SelectMappedBufferResult(bufferResult.getStartOffset(), ++ bufferResult.getByteBuffer(), bufferResult.getSize(), null), ++ MessageBufferUtil.getQueueOffset(bufferResult.getByteBuffer())); ++ } ++ ++ if (result.getBufferTotalSize() == 0) { ++ result.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE); ++ } ++ return result; ++ } ++} +diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java +new file mode 100644 +index 000000000..d265ed0fc +--- /dev/null ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java +@@ -0,0 +1,51 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.rocketmq.tieredstore.common; ++ ++import java.nio.ByteBuffer; ++ ++public class SelectBufferResult { ++ ++ private final ByteBuffer byteBuffer; ++ private final long startOffset; ++ private final int size; ++ private final long tagCode; ++ ++ public SelectBufferResult(ByteBuffer byteBuffer, long startOffset, int size, long tagCode) { ++ this.startOffset = startOffset; ++ this.byteBuffer = byteBuffer; ++ this.size = size; ++ this.tagCode = tagCode; ++ } ++ ++ public ByteBuffer getByteBuffer() { ++ return byteBuffer; ++ } ++ ++ public long getStartOffset() { ++ return startOffset; ++ } ++ ++ public int getSize() { ++ return size; ++ } ++ ++ public long getTagCode() { ++ return tagCode; ++ } ++} +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java +similarity index 55% +rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java +rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java +index af0785f71..4f9f00a07 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java +@@ -16,32 +16,21 @@ + */ + package org.apache.rocketmq.tieredstore.common; + +-import java.util.concurrent.atomic.LongAdder; ++import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.store.SelectMappedBufferResult; + +-public class SelectMappedBufferResultWrapper { ++public class SelectBufferResultWrapper { + + private final SelectMappedBufferResult result; +- private final LongAdder accessCount; +- +- private final long curOffset; +- private final long minOffset; +- private final long maxOffset; +- private final long size; +- +- 
public SelectMappedBufferResultWrapper( +- SelectMappedBufferResult result, long curOffset, long minOffset, long maxOffset, long size) { ++ private final long offset; ++ private final long tagCode; ++ private final AtomicInteger accessCount; + ++ public SelectBufferResultWrapper(SelectMappedBufferResult result, long offset, long tagCode, boolean used) { + this.result = result; +- this.accessCount = new LongAdder(); +- this.curOffset = curOffset; +- this.minOffset = minOffset; +- this.maxOffset = maxOffset; +- this.size = size; +- } +- +- public SelectMappedBufferResult getResult() { +- return result; ++ this.offset = offset; ++ this.tagCode = tagCode; ++ this.accessCount = new AtomicInteger(used ? 1 : 0); + } + + public SelectMappedBufferResult getDuplicateResult() { +@@ -53,27 +42,23 @@ public class SelectMappedBufferResultWrapper { + result.getMappedFile()); + } + +- public long getCurOffset() { +- return curOffset; +- } +- +- public long getMinOffset() { +- return minOffset; ++ public long getOffset() { ++ return offset; + } + +- public long getMaxOffset() { +- return maxOffset; ++ public int getBufferSize() { ++ return this.result.getSize(); + } + +- public long getSize() { +- return size; ++ public long getTagCode() { ++ return tagCode; + } + +- public void addAccessCount() { +- accessCount.increment(); ++ public int incrementAndGet() { ++ return accessCount.incrementAndGet(); + } + +- public long getAccessCount() { +- return accessCount.sum(); ++ public int getAccessCount() { ++ return accessCount.get(); + } + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java +index 595db6b86..b0750e550 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java +@@ -82,6 
+82,7 @@ public class TieredMessageStoreConfig { + + private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; + private boolean messageIndexEnable = true; ++ private boolean recordGetMessageResult = false; + + // CommitLog file size, default is 1G + private long tieredStoreCommitLogMaxSize = 1024 * 1024 * 1024; +@@ -182,6 +183,14 @@ public class TieredMessageStoreConfig { + this.messageIndexEnable = messageIndexEnable; + } + ++ public boolean isRecordGetMessageResult() { ++ return recordGetMessageResult; ++ } ++ ++ public void setRecordGetMessageResult(boolean recordGetMessageResult) { ++ this.recordGetMessageResult = recordGetMessageResult; ++ } ++ + public long getTieredStoreCommitLogMaxSize() { + return tieredStoreCommitLogMaxSize; + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java +index d8a07f0a7..2b9fc59d8 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java +@@ -46,7 +46,7 @@ import org.apache.rocketmq.store.MessageStore; + import org.apache.rocketmq.tieredstore.TieredMessageFetcher; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + import org.apache.rocketmq.tieredstore.common.MessageCacheKey; +-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper; ++import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; + import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; + import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager; +@@ -265,7 +265,7 @@ public class TieredStoreMetricsManager { + .setUnit("bytes") + .ofLongs() + .buildWithCallback(measurement -> { 
+- Optional> eviction = fetcher.getMessageCache().policy().eviction(); ++ Optional> eviction = fetcher.getMessageCache().policy().eviction(); + eviction.ifPresent(resultEviction -> measurement.record(resultEviction.weightedSize().orElse(0), newAttributesBuilder().build())); + }); + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java +index aad42de98..5e3d8c562 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java +@@ -295,7 +295,7 @@ public abstract class TieredFileSegment implements Comparable + return future; + } + if (position + length > commitPosition) { +- logger.warn("TieredFileSegment#readAsync request position + length is greater than commit position," + ++ logger.debug("TieredFileSegment#readAsync request position + length is greater than commit position," + + " correct length using commit position, file: {}, request position: {}, commit position:{}, change length from {} to {}", + getPath(), position, commitPosition, length, commitPosition - position); + length = (int) (commitPosition - position); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java +index 6db45a747..2c4a6e578 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java +@@ -20,11 +20,11 @@ import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.List; + import java.util.Map; +-import org.apache.commons.lang3.tuple.Pair; + import org.apache.rocketmq.common.UtilAll; + import 
org.apache.rocketmq.common.message.MessageDecoder; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.tieredstore.common.SelectBufferResult; + import org.apache.rocketmq.tieredstore.file.TieredCommitLog; + import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; + +@@ -113,53 +113,72 @@ public class MessageBufferUtil { + return MessageDecoder.decodeProperties(slice); + } + +- public static List> splitMessageBuffer( +- ByteBuffer cqBuffer, ByteBuffer msgBuffer) { ++ public static List splitMessageBuffer(ByteBuffer cqBuffer, ByteBuffer msgBuffer) { ++ + cqBuffer.rewind(); + msgBuffer.rewind(); +- List> messageList = new ArrayList<>(cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); ++ ++ List bufferResultList = new ArrayList<>( ++ cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); ++ ++ if (msgBuffer.remaining() == 0) { ++ logger.error("MessageBufferUtil#splitMessage, msg buffer length is zero"); ++ return bufferResultList; ++ } ++ + if (cqBuffer.remaining() % TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE != 0) { +- logger.warn("MessageBufferUtil#splitMessage: consume queue buffer size {} is not an integer multiple of CONSUME_QUEUE_STORE_UNIT_SIZE {}", +- cqBuffer.remaining(), TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); +- return messageList; ++ logger.error("MessageBufferUtil#splitMessage, consume queue buffer size incorrect, {}", cqBuffer.remaining()); ++ return bufferResultList; + } ++ + try { +- long startCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer); +- for (int pos = cqBuffer.position(); pos < cqBuffer.limit(); pos += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) { +- cqBuffer.position(pos); +- int diff = (int) (CQItemBufferUtil.getCommitLogOffset(cqBuffer) - startCommitLogOffset); +- int size = CQItemBufferUtil.getSize(cqBuffer); +- if (diff + size > msgBuffer.limit()) { +- 
logger.error("MessageBufferUtil#splitMessage: message buffer size is incorrect: record in consume queue: {}, actual: {}", diff + size, msgBuffer.remaining()); +- return messageList; ++ long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer); ++ ++ for (int position = cqBuffer.position(); position < cqBuffer.limit(); ++ position += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) { ++ ++ cqBuffer.position(position); ++ long logOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer); ++ int bufferSize = CQItemBufferUtil.getSize(cqBuffer); ++ long tagCode = CQItemBufferUtil.getTagCode(cqBuffer); ++ ++ int offset = (int) (logOffset - firstCommitLogOffset); ++ if (offset + bufferSize > msgBuffer.limit()) { ++ logger.error("MessageBufferUtil#splitMessage, message buffer size incorrect. " + ++ "Expect length in consume queue: {}, actual length: {}", offset + bufferSize, msgBuffer.limit()); ++ break; + } +- msgBuffer.position(diff); + ++ msgBuffer.position(offset); + int magicCode = getMagicCode(msgBuffer); + if (magicCode == TieredCommitLog.BLANK_MAGIC_CODE) { +- logger.warn("MessageBufferUtil#splitMessage: message decode error: blank magic code, this message may be coda, try to fix offset"); +- diff = diff + TieredCommitLog.CODA_SIZE; +- msgBuffer.position(diff); ++ offset += TieredCommitLog.CODA_SIZE; ++ msgBuffer.position(offset); + magicCode = getMagicCode(msgBuffer); + } +- if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) { +- logger.warn("MessageBufferUtil#splitMessage: message decode error: unknown magic code"); ++ if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && ++ magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) { ++ logger.warn("MessageBufferUtil#splitMessage, found unknown magic code. 
" + ++ "Message offset: {}, wrong magic code: {}", offset, magicCode); + continue; + } + +- if (getTotalSize(msgBuffer) != size) { +- logger.warn("MessageBufferUtil#splitMessage: message size is not right: except: {}, actual: {}", size, getTotalSize(msgBuffer)); ++ if (bufferSize != getTotalSize(msgBuffer)) { ++ logger.warn("MessageBufferUtil#splitMessage, message length in commitlog incorrect. " + ++ "Except length in commitlog: {}, actual: {}", getTotalSize(msgBuffer), bufferSize); + continue; + } + +- messageList.add(Pair.of(diff, size)); ++ ByteBuffer sliceBuffer = msgBuffer.slice(); ++ sliceBuffer.limit(bufferSize); ++ bufferResultList.add(new SelectBufferResult(sliceBuffer, offset, bufferSize, tagCode)); + } + } catch (Exception e) { +- logger.error("MessageBufferUtil#splitMessage: split message failed, maybe decode consume queue item failed", e); ++ logger.error("MessageBufferUtil#splitMessage, split message buffer error", e); + } finally { + cqBuffer.rewind(); + msgBuffer.rewind(); + } +- return messageList; ++ return bufferResultList; + } + } +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +index 4e0d7e697..4e8287533 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +@@ -31,7 +31,7 @@ import org.apache.rocketmq.store.GetMessageStatus; + import org.apache.rocketmq.store.QueryMessageResult; + import org.apache.rocketmq.store.SelectMappedBufferResult; + import org.apache.rocketmq.tieredstore.common.AppendResult; +-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper; ++import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; + import 
org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; + import org.apache.rocketmq.tieredstore.file.CompositeFlatFile; +@@ -143,17 +143,18 @@ public class TieredMessageFetcherTest { + + fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, new ArrayList<>()); + Assert.assertEquals(0, fetcher.getMessageCache().estimatedSize()); +- fetcher.putMessageToCache(flatFile, 0, new SelectMappedBufferResult(0, msg1, msg1.remaining(), null), 0, 0, 1); ++ SelectMappedBufferResult bufferResult = new SelectMappedBufferResult(0, msg1, msg1.remaining(), null); ++ fetcher.putMessageToCache(flatFile, new SelectBufferResultWrapper(bufferResult, 0, 0, false)); + Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize()); + +- GetMessageResult getMessageResult = fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32).join(); ++ GetMessageResult getMessageResult = fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32, true).join(); + Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); + Assert.assertEquals(1, getMessageResult.getMessageBufferList().size()); + Assert.assertEquals(msg1, getMessageResult.getMessageBufferList().get(0)); + + Awaitility.waitAtMost(3, TimeUnit.SECONDS) + .until(() -> fetcher.getMessageCache().estimatedSize() == 2); +- ArrayList wrapperList = new ArrayList<>(); ++ ArrayList wrapperList = new ArrayList<>(); + wrapperList.add(fetcher.getMessageFromCache(flatFile, 0)); + fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, wrapperList); + Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize()); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java +new file mode 100644 +index 000000000..deb8770d2 +--- /dev/null ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java +@@ -0,0 +1,65 @@ ++/* ++ * 
Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tieredstore.common; ++ ++import java.nio.ByteBuffer; ++import java.util.Map; ++import org.apache.rocketmq.store.ConsumeQueueExt; ++import org.apache.rocketmq.store.GetMessageResult; ++import org.apache.rocketmq.store.GetMessageStatus; ++import org.apache.rocketmq.store.MessageFilter; ++import org.apache.rocketmq.store.SelectMappedBufferResult; ++import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; ++import org.junit.Assert; ++import org.junit.Test; ++ ++import static org.junit.Assert.assertEquals; ++ ++public class GetMessageResultExtTest { ++ ++ @Test ++ public void doFilterTest() { ++ GetMessageResultExt resultExt = new GetMessageResultExt(); ++ Assert.assertEquals(0, resultExt.doFilterMessage(null).getMessageCount()); ++ resultExt.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE); ++ Assert.assertEquals(0, resultExt.doFilterMessage(null).getMessageCount()); ++ resultExt.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY); ++ Assert.assertEquals(0, resultExt.doFilterMessage(null).getMessageCount()); ++ ++ resultExt.addMessageExt(new SelectMappedBufferResult( ++ 1000L, MessageBufferUtilTest.buildMockedMessageBuffer(), 100, 
null), ++ 0, "TagA".hashCode()); ++ resultExt.addMessageExt(new SelectMappedBufferResult( ++ 2000L, MessageBufferUtilTest.buildMockedMessageBuffer(), 100, null), ++ 0, "TagB".hashCode()); ++ assertEquals(2, resultExt.getMessageCount()); ++ ++ resultExt.setStatus(GetMessageStatus.FOUND); ++ GetMessageResult getMessageResult = resultExt.doFilterMessage(new MessageFilter() { ++ @Override ++ public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { ++ return false; ++ } ++ ++ @Override ++ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map properties) { ++ return false; ++ } ++ }); ++ Assert.assertEquals(0, getMessageResult.getMessageCount()); ++ } ++} +\ No newline at end of file +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java +new file mode 100644 +index 000000000..b7e6e639f +--- /dev/null ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java +@@ -0,0 +1,37 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++package org.apache.rocketmq.tieredstore.common; ++ ++import java.nio.ByteBuffer; ++import org.junit.Assert; ++import org.junit.Test; ++ ++public class SelectBufferResultTest { ++ @Test ++ public void testSelectBufferResult() { ++ ByteBuffer buffer = ByteBuffer.allocate(10); ++ long startOffset = 5L; ++ int size = 10; ++ long tagCode = 1L; ++ ++ SelectBufferResult result = new SelectBufferResult(buffer, startOffset, size, tagCode); ++ Assert.assertEquals(buffer, result.getByteBuffer()); ++ Assert.assertEquals(startOffset, result.getStartOffset()); ++ Assert.assertEquals(size, result.getSize()); ++ Assert.assertEquals(tagCode, result.getTagCode()); ++ } ++} +\ No newline at end of file +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java +index 68277cacc..a0b438948 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java +@@ -22,9 +22,9 @@ import java.nio.charset.StandardCharsets; + import java.util.HashMap; + import java.util.List; + import java.util.Map; +-import org.apache.commons.lang3.tuple.Pair; + import org.apache.rocketmq.common.message.MessageConst; + import org.apache.rocketmq.common.message.MessageDecoder; ++import org.apache.rocketmq.tieredstore.common.SelectBufferResult; + import org.apache.rocketmq.tieredstore.file.TieredCommitLog; + import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; + import org.junit.Assert; +@@ -206,10 +206,12 @@ public class MessageBufferUtilTest { + cqBuffer.flip(); + cqBuffer1.rewind(); + cqBuffer2.rewind(); +- List> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); ++ List msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); + Assert.assertEquals(2, msgList.size()); +- 
Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0)); +- Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1)); ++ Assert.assertEquals(0, msgList.get(0).getStartOffset()); ++ Assert.assertEquals(MSG_LEN, msgList.get(0).getSize()); ++ Assert.assertEquals(MSG_LEN + TieredCommitLog.CODA_SIZE, msgList.get(1).getStartOffset()); ++ Assert.assertEquals(MSG_LEN, msgList.get(1).getSize()); + + cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2); + cqBuffer.put(cqBuffer1); +@@ -219,7 +221,8 @@ public class MessageBufferUtilTest { + cqBuffer4.rewind(); + msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); + Assert.assertEquals(1, msgList.size()); +- Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0)); ++ Assert.assertEquals(0, msgList.get(0).getStartOffset()); ++ Assert.assertEquals(MSG_LEN, msgList.get(0).getSize()); + + cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 3); + cqBuffer.put(cqBuffer1); +@@ -227,8 +230,10 @@ public class MessageBufferUtilTest { + cqBuffer.flip(); + msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); + Assert.assertEquals(2, msgList.size()); +- Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0)); +- Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1)); ++ Assert.assertEquals(0, msgList.get(0).getStartOffset()); ++ Assert.assertEquals(MSG_LEN, msgList.get(0).getSize()); ++ Assert.assertEquals(MSG_LEN + TieredCommitLog.CODA_SIZE, msgList.get(1).getStartOffset()); ++ Assert.assertEquals(MSG_LEN, msgList.get(1).getSize()); + + cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); + cqBuffer.put(cqBuffer5); +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index a5091dd..2a7a057 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 
5.1.5 -Release: 42 +Release: 43 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -51,6 +51,7 @@ Patch0038: patch038-backport-SlaveActingMaster-Timer-Message-retry-without-escap Patch0039: patch039-backport-add-some-validations.patch Patch0040: patch040-backport-add-some-test-cases.patch Patch0041: patch041-backport-improve-performance.patch +Patch0042: patch042-backport-Support-message-filtering.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -91,6 +92,9 @@ exit 0 %changelog +* Mon Dec 11 2023 ShiZhili - 5.1.5-43 +- backport Support message filtering + * Mon Dec 11 2023 ShiZhili - 5.1.3-42 - backport add improve performance -- Gitee