diff --git a/patch015-backport-fix-some-bugs.patch b/patch015-backport-fix-some-bugs.patch new file mode 100644 index 0000000000000000000000000000000000000000..11c10a6bdf3a3465e8adbebf182d74bf47919ef5 --- /dev/null +++ b/patch015-backport-fix-some-bugs.patch @@ -0,0 +1,1894 @@ +From bd0e9c09db9748f7f74a0c707579142dccf30afc Mon Sep 17 00:00:00 2001 +From: PiteXChen <44110731+RapperCL@users.noreply.github.com> +Date: Tue, 29 Aug 2023 19:39:27 +0800 +Subject: [PATCH 1/7] [ISSUE #7111] Remove responseFuture from the + responseTable when exception occurs (#7112) + +* remove responseFuture when exception +* Empty-Commit + +--------- +Co-authored-by: chenyong152 +--- + .../apache/rocketmq/remoting/netty/NettyRemotingAbstract.java | 1 + + 1 file changed, 1 insertion(+) + +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +index 44d6a3df4..fce2de267 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +@@ -529,6 +529,7 @@ public abstract class NettyRemotingAbstract { + log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); + }); + } catch (Exception e) { ++ responseTable.remove(opaque); + responseFuture.release(); + log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); + throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); +-- +2.32.0.windows.2 + + +From c78061bf6ca5f35452510ec4107c46735c51c316 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Wed, 30 Aug 2023 22:29:51 +0800 +Subject: [PATCH 2/7] [ISSUE#7280] Fix and refactor handle commit exception in + tiered storage (#7281) + +* refactor handle commit exception + +* refactor handle commit exception + +* fix handle commit exception +--- + .../tieredstore/TieredDispatcher.java | 3 +- + .../tieredstore/TieredMessageFetcher.java | 57 ++-- + .../tieredstore/TieredMessageStore.java | 26 +- + .../provider/TieredFileSegment.java | 291 ++++++++++-------- + .../provider/TieredStoreProvider.java | 8 +- + .../provider/posix/PosixFileSegment.java | 4 +- + .../CommitLogInputStream.java} | 30 +- + .../FileSegmentInputStream.java} | 49 ++- + .../FileSegmentInputStreamFactory.java} | 26 +- + .../tieredstore/TieredMessageStoreTest.java | 14 +- + .../tieredstore/file/TieredFlatFileTest.java | 2 + + .../tieredstore/file/TieredIndexFileTest.java | 2 + + ...m.java => MockFileSegmentInputStream.java} | 8 +- + .../TieredFileSegmentInputStreamTest.java | 24 +- + .../provider/TieredFileSegmentTest.java | 89 +++++- + .../provider/memory/MemoryFileSegment.java | 27 +- + .../memory/MemoryFileSegmentWithoutCheck.java | 4 +- + 17 files changed, 427 insertions(+), 237 deletions(-) + rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredCommitLogInputStream.java => stream/CommitLogInputStream.java} (88%) + rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredFileSegmentInputStream.java => stream/FileSegmentInputStream.java} (77%) + rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredFileSegmentInputStreamFactory.java => stream/FileSegmentInputStreamFactory.java} (54%) + rename tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/{MockTieredFileSegmentInputStream.java => MockFileSegmentInputStream.java} (82%) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +index 1746190cd..430c2b62e 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +@@ -318,8 +318,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + continue; + case FILE_CLOSED: + tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue()); +- logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " + +- "topic: {}, queueId: {}", topic, queueId); ++ logger.info("File has been closed and destroy, topic: {}, queueId: {}", topic, queueId); + return; + default: + dispatchOffset--; +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +index 9a9a3e5a5..766ff64f6 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +@@ -273,15 +273,17 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + TieredStoreMetricsManager.cacheHit.add(resultWrapperList.size(), attributes); + } + +- // if no cached message found and there is currently an inflight request, wait for the request to end before continuing ++ // If there are no messages in the cache and there are currently requests being pulled. ++ // We need to wait for the request to return before continuing. + if (resultWrapperList.isEmpty() && waitInflightRequest) { +- CompletableFuture future = flatFile.getInflightRequest(group, queueOffset, maxCount) +- .getFuture(queueOffset); ++ CompletableFuture future = ++ flatFile.getInflightRequest(group, queueOffset, maxCount).getFuture(queueOffset); + if (!future.isDone()) { + Stopwatch stopwatch = Stopwatch.createStarted(); + // to prevent starvation issues, only allow waiting for inflight request once + return future.thenCompose(v -> { +- LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: wait for inflight request cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); ++ LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: wait for response cost: {}ms", ++ stopwatch.elapsed(TimeUnit.MILLISECONDS)); + return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, false); + }); + } +@@ -302,7 +304,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + + // if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests + if (!resultWrapperList.isEmpty()) { +- LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: cache hit: topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", ++ LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " + ++ "topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", + mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); + prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); + +@@ -316,8 +319,10 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + } + + // if cache is miss, immediately pull messages +- LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: topic: {}, queue: {}, queue offset: {}, max message num: {}", ++ LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + ++ "topic: {}, queue: {}, queue offset: {}, max message num: {}", + mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); ++ + CompletableFuture resultFuture; + synchronized (flatFile) { + int batchSize = maxCount * storeConfig.getReadAheadMinFactor(); +@@ -453,42 +458,42 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + public CompletableFuture getMessageAsync( + String group, String topic, int queueId, long queueOffset, int maxCount, final MessageFilter messageFilter) { + ++ GetMessageResult result = new GetMessageResult(); + CompositeQueueFlatFile flatFile = flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId)); ++ + if (flatFile == null) { +- GetMessageResult result = new GetMessageResult(); + result.setNextBeginOffset(queueOffset); + result.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE); + return CompletableFuture.completedFuture(result); + } + +- GetMessageResult result = new GetMessageResult(); +- long minQueueOffset = flatFile.getConsumeQueueMinOffset(); +- long maxQueueOffset = flatFile.getConsumeQueueCommitOffset(); +- result.setMinOffset(minQueueOffset); +- result.setMaxOffset(maxQueueOffset); ++ // Max queue offset means next message put position ++ result.setMinOffset(flatFile.getConsumeQueueMinOffset()); ++ result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); ++ ++ // Fill result according file offset. ++ // Offset range | Result | Fix to ++ // (-oo, 0] | no message | current offset ++ // (0, min) | too small | min offset ++ // [min, max) | correct | ++ // [max, max] | overflow one | max offset ++ // (max, +oo) | overflow badly | max offset + +- if (flatFile.getConsumeQueueCommitOffset() <= 0) { ++ if (result.getMaxOffset() <= 0) { + result.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE); + result.setNextBeginOffset(queueOffset); + return CompletableFuture.completedFuture(result); +- } +- +- // request range | result +- // (0, min) | too small +- // [min, max) | correct +- // [max, max] | overflow one +- // (max, +oo) | overflow badly +- if (queueOffset < minQueueOffset) { ++ } else if (queueOffset < result.getMinOffset()) { + result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL); +- result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset()); ++ result.setNextBeginOffset(result.getMinOffset()); + return CompletableFuture.completedFuture(result); +- } else if (queueOffset == maxQueueOffset) { ++ } else if (queueOffset == result.getMaxOffset()) { + result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE); +- result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset()); ++ result.setNextBeginOffset(result.getMaxOffset()); + return CompletableFuture.completedFuture(result); +- } else if (queueOffset > maxQueueOffset) { ++ } else if (queueOffset > result.getMaxOffset()) { + result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY); +- result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset()); ++ result.setNextBeginOffset(result.getMaxOffset()); + return CompletableFuture.completedFuture(result); + } + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +index 5240ac8e9..78e855f36 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +@@ -99,11 +99,11 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + return storeConfig; + } + +- public boolean viaTieredStorage(String topic, int queueId, long offset) { +- return viaTieredStorage(topic, queueId, offset, 1); ++ public boolean fetchFromCurrentStore(String topic, int queueId, long offset) { ++ return fetchFromCurrentStore(topic, queueId, offset, 1); + } + +- public boolean viaTieredStorage(String topic, int queueId, long offset, int batchSize) { ++ public boolean fetchFromCurrentStore(String topic, int queueId, long offset, int batchSize) { + TieredMessageStoreConfig.TieredStorageLevel deepStorageLevel = storeConfig.getTieredStorageLevel(); + + if (deepStorageLevel.check(TieredMessageStoreConfig.TieredStorageLevel.FORCE)) { +@@ -146,8 +146,10 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + public CompletableFuture getMessageAsync(String group, String topic, + int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) { + +- if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) { +- logger.trace("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset); ++ if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) { ++ logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset); ++ } else { ++ logger.trace("GetMessageAsync from next store, topic: {}, queue: {}, offset: {}", topic, queueId, offset); + return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter); + } + +@@ -168,14 +170,14 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + + if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) { + TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes); +- logger.debug("GetMessageAsync not found then try back to next store, result: {}, " + ++ logger.debug("GetMessageAsync not found, then back to next store, result: {}, " + + "topic: {}, queue: {}, queue offset: {}, offset range: {}-{}", + result.getStatus(), topic, queueId, offset, result.getMinOffset(), result.getMaxOffset()); + return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); + } + } + +- // system topic ++ // Fetch system topic data from the broker when using the force level. + if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) { + if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) { + return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); +@@ -198,7 +200,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + TieredStoreMetricsManager.messagesOutTotal.add(result.getMessageCount(), messagesOutAttributes); + } + +- // fix min or max offset according next store ++ // Fix min or max offset according next store at last + long minOffsetInQueue = next.getMinOffsetInQueue(topic, queueId); + if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) { + result.setMinOffset(minOffsetInQueue); +@@ -209,7 +211,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + } + return result; + }).exceptionally(e -> { +- logger.error("GetMessageAsync from tiered store failed: ", e); ++ logger.error("GetMessageAsync from tiered store failed", e); + return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); + }); + } +@@ -251,7 +253,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + .build(); + TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); + if (time < 0) { +- logger.debug("TieredMessageStore#getEarliestMessageTimeAsync: get earliest message time failed, try to get earliest message time from next store: topic: {}, queue: {}", ++ logger.debug("GetEarliestMessageTimeAsync failed, try to get earliest message time from next store: topic: {}, queue: {}", + topic, queueId); + return finalNextEarliestMessageTime != Long.MAX_VALUE ? finalNextEarliestMessageTime : -1; + } +@@ -262,7 +264,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + @Override + public CompletableFuture getMessageStoreTimeStampAsync(String topic, int queueId, + long consumeQueueOffset) { +- if (viaTieredStorage(topic, queueId, consumeQueueOffset)) { ++ if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) { + Stopwatch stopwatch = Stopwatch.createStarted(); + return fetcher.getMessageStoreTimeStampAsync(topic, queueId, consumeQueueOffset) + .thenApply(time -> { +@@ -272,7 +274,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + .build(); + TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); + if (time == -1) { +- logger.debug("TieredMessageStore#getMessageStoreTimeStampAsync: get message time failed, try to get message time from next store: topic: {}, queue: {}, queue offset: {}", ++ logger.debug("GetEarliestMessageTimeAsync failed, try to get message time from next store, topic: {}, queue: {}, queue offset: {}", + topic, queueId, consumeQueueOffset); + return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset); + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java +index 5062c7d9e..32911a6e8 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java +@@ -16,14 +16,11 @@ + */ + package org.apache.rocketmq.tieredstore.provider; + +-import com.google.common.annotations.VisibleForTesting; +-import com.google.common.base.Stopwatch; + import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.List; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.Semaphore; +-import java.util.concurrent.TimeUnit; + import java.util.concurrent.locks.ReentrantLock; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +@@ -35,8 +32,8 @@ import org.apache.rocketmq.tieredstore.exception.TieredStoreException; + import org.apache.rocketmq.tieredstore.file.TieredCommitLog; + import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; + import org.apache.rocketmq.tieredstore.file.TieredIndexFile; +-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory; ++import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; ++import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + +@@ -50,22 +47,23 @@ public abstract class TieredFileSegment implements Comparable + protected final TieredMessageStoreConfig storeConfig; + + private final long maxSize; +- private final ReentrantLock bufferLock; +- private final Semaphore commitLock; ++ private final ReentrantLock bufferLock = new ReentrantLock(); ++ private final Semaphore commitLock = new Semaphore(1); + +- private volatile boolean full; +- private volatile boolean closed; ++ private volatile boolean full = false; ++ private volatile boolean closed = false; + +- private volatile long minTimestamp; +- private volatile long maxTimestamp; +- private volatile long commitPosition; +- private volatile long appendPosition; ++ private volatile long minTimestamp = Long.MAX_VALUE; ++ private volatile long maxTimestamp = Long.MAX_VALUE; ++ private volatile long commitPosition = 0L; ++ private volatile long appendPosition = 0L; + + // only used in commitLog +- private volatile long dispatchCommitOffset = 0; ++ private volatile long dispatchCommitOffset = 0L; + + private ByteBuffer codaBuffer; +- private List uploadBufferList = new ArrayList<>(); ++ private List bufferList = new ArrayList<>(); ++ private FileSegmentInputStream fileSegmentInputStream; + private CompletableFuture flightCommitRequest = CompletableFuture.completedFuture(false); + + public TieredFileSegment(TieredMessageStoreConfig storeConfig, +@@ -75,21 +73,13 @@ public abstract class TieredFileSegment implements Comparable + this.fileType = fileType; + this.filePath = filePath; + this.baseOffset = baseOffset; +- +- this.closed = false; +- this.bufferLock = new ReentrantLock(); +- this.commitLock = new Semaphore(1); +- +- this.commitPosition = 0L; +- this.appendPosition = 0L; +- this.minTimestamp = Long.MAX_VALUE; +- this.maxTimestamp = Long.MAX_VALUE; +- +- // The max segment size of a file is determined by the file type +- this.maxSize = getMaxSizeAccordingFileType(storeConfig); ++ this.maxSize = getMaxSizeByFileType(); + } + +- private long getMaxSizeAccordingFileType(TieredMessageStoreConfig storeConfig) { ++ /** ++ * The max segment size of a file is determined by the file type ++ */ ++ protected long getMaxSizeByFileType() { + switch (fileType) { + case COMMIT_LOG: + return storeConfig.getTieredStoreCommitLogMaxSize(); +@@ -184,39 +174,23 @@ public abstract class TieredFileSegment implements Comparable + this.appendPosition = pos; + } + +- private List rollingUploadBuffer() { ++ private List borrowBuffer() { + bufferLock.lock(); + try { +- List tmp = uploadBufferList; +- uploadBufferList = new ArrayList<>(); ++ List tmp = bufferList; ++ bufferList = new ArrayList<>(); + return tmp; + } finally { + bufferLock.unlock(); + } + } + +- private void sendBackBuffer(TieredFileSegmentInputStream inputStream) { +- bufferLock.lock(); +- try { +- List tmpBufferList = inputStream.getUploadBufferList(); +- for (ByteBuffer buffer : tmpBufferList) { +- buffer.rewind(); +- } +- tmpBufferList.addAll(uploadBufferList); +- uploadBufferList = tmpBufferList; +- if (inputStream.getCodaBuffer() != null) { +- codaBuffer.rewind(); +- } +- } finally { +- bufferLock.unlock(); +- } +- } +- + @SuppressWarnings("NonAtomicOperationOnVolatileField") +- public AppendResult append(ByteBuffer byteBuf, long timeStamp) { ++ public AppendResult append(ByteBuffer byteBuf, long timestamp) { + if (closed) { + return AppendResult.FILE_CLOSED; + } ++ + bufferLock.lock(); + try { + if (full || codaBuffer != null) { +@@ -227,7 +201,8 @@ public abstract class TieredFileSegment implements Comparable + minTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION); + maxTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION); + appendPosition += byteBuf.remaining(); +- uploadBufferList.add(byteBuf); ++ // IndexFile is large and not change after compaction, no need deep copy ++ bufferList.add(byteBuf); + setFull(); + return AppendResult.SUCCESS; + } +@@ -236,23 +211,34 @@ public abstract class TieredFileSegment implements Comparable + setFull(); + return AppendResult.FILE_FULL; + } +- if (uploadBufferList.size() > storeConfig.getTieredStoreGroupCommitCount() ++ ++ if (bufferList.size() > storeConfig.getTieredStoreGroupCommitCount() + || appendPosition - commitPosition > storeConfig.getTieredStoreGroupCommitSize()) { + commitAsync(); + } +- if (uploadBufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) { +- logger.debug("TieredFileSegment#append: buffer full: file: {}, upload buffer size: {}", +- getPath(), uploadBufferList.size()); ++ ++ if (bufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) { ++ logger.debug("File segment append buffer full, file: {}, buffer size: {}, pending bytes: {}", ++ getPath(), bufferList.size(), appendPosition - commitPosition); + return AppendResult.BUFFER_FULL; + } +- if (timeStamp != Long.MAX_VALUE) { +- maxTimestamp = timeStamp; ++ ++ if (timestamp != Long.MAX_VALUE) { ++ maxTimestamp = timestamp; + if (minTimestamp == Long.MAX_VALUE) { +- minTimestamp = timeStamp; ++ minTimestamp = timestamp; + } + } ++ + appendPosition += byteBuf.remaining(); +- uploadBufferList.add(byteBuf); ++ ++ // deep copy buffer ++ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(byteBuf.remaining()); ++ byteBuffer.put(byteBuf); ++ byteBuffer.flip(); ++ byteBuf.rewind(); ++ ++ bufferList.add(byteBuffer); + return AppendResult.SUCCESS; + } finally { + bufferLock.unlock(); +@@ -267,7 +253,6 @@ public abstract class TieredFileSegment implements Comparable + return appendPosition; + } + +- @VisibleForTesting + public void setAppendPosition(long appendPosition) { + this.appendPosition = appendPosition; + } +@@ -333,6 +318,8 @@ public abstract class TieredFileSegment implements Comparable + if (closed) { + return false; + } ++ // result is false when we send real commit request ++ // use join for wait flight request done + Boolean result = commitAsync().join(); + if (!result) { + result = flightCommitRequest.join(); +@@ -340,92 +327,156 @@ public abstract class TieredFileSegment implements Comparable + return result; + } + ++ private void releaseCommitLock() { ++ if (commitLock.availablePermits() == 0) { ++ commitLock.release(); ++ } else { ++ logger.error("[Bug] FileSegmentCommitAsync, lock is already released: available permits: {}", ++ commitLock.availablePermits()); ++ } ++ } ++ ++ private void updateDispatchCommitOffset(List bufferList) { ++ if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) { ++ dispatchCommitOffset = ++ MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1)); ++ } ++ } ++ ++ /** ++ * @return false: commit, true: no commit operation ++ */ + @SuppressWarnings("NonAtomicOperationOnVolatileField") + public CompletableFuture commitAsync() { + if (closed) { + return CompletableFuture.completedFuture(false); + } +- Stopwatch stopwatch = Stopwatch.createStarted(); ++ + if (!needCommit()) { + return CompletableFuture.completedFuture(true); + } +- try { +- int permits = commitLock.drainPermits(); +- if (permits <= 0) { +- return CompletableFuture.completedFuture(false); +- } +- } catch (Exception e) { ++ ++ if (commitLock.drainPermits() <= 0) { + return CompletableFuture.completedFuture(false); + } +- List bufferList = rollingUploadBuffer(); +- int bufferSize = 0; +- for (ByteBuffer buffer : bufferList) { +- bufferSize += buffer.remaining(); +- } +- if (codaBuffer != null) { +- bufferSize += codaBuffer.remaining(); +- } +- if (bufferSize == 0) { +- return CompletableFuture.completedFuture(true); +- } +- TieredFileSegmentInputStream inputStream = TieredFileSegmentInputStreamFactory.build( +- fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize); +- int finalBufferSize = bufferSize; ++ + try { +- flightCommitRequest = commit0(inputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX) ++ if (fileSegmentInputStream != null) { ++ long fileSize = this.getSize(); ++ if (fileSize == -1L) { ++ logger.error("Get commit position error before commit, Commit: %d, Expect: %d, Current Max: %d, FileName: %s", ++ commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath()); ++ releaseCommitLock(); ++ return CompletableFuture.completedFuture(false); ++ } else { ++ if (correctPosition(fileSize, null)) { ++ updateDispatchCommitOffset(fileSegmentInputStream.getBufferList()); ++ fileSegmentInputStream = null; ++ } ++ } ++ } ++ ++ int bufferSize; ++ if (fileSegmentInputStream != null) { ++ bufferSize = fileSegmentInputStream.available(); ++ } else { ++ List bufferList = borrowBuffer(); ++ bufferSize = bufferList.stream().mapToInt(ByteBuffer::remaining).sum() ++ + (codaBuffer != null ? codaBuffer.remaining() : 0); ++ if (bufferSize == 0) { ++ releaseCommitLock(); ++ return CompletableFuture.completedFuture(true); ++ } ++ fileSegmentInputStream = FileSegmentInputStreamFactory.build( ++ fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize); ++ } ++ ++ return flightCommitRequest = this ++ .commit0(fileSegmentInputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX) + .thenApply(result -> { + if (result) { +- if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) { +- dispatchCommitOffset = MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1)); +- } +- commitPosition += finalBufferSize; ++ updateDispatchCommitOffset(fileSegmentInputStream.getBufferList()); ++ commitPosition += bufferSize; ++ fileSegmentInputStream = null; + return true; +- } +- sendBackBuffer(inputStream); +- return false; +- }) +- .exceptionally(e -> handleCommitException(inputStream, e)) +- .whenComplete((result, e) -> { +- if (commitLock.availablePermits() == 0) { +- logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize); +- commitLock.release(); + } else { +- logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits()); ++ fileSegmentInputStream.rewind(); ++ return false; + } +- }); +- return flightCommitRequest; ++ }) ++ .exceptionally(this::handleCommitException) ++ .whenComplete((result, e) -> releaseCommitLock()); ++ + } catch (Exception e) { +- handleCommitException(inputStream, e); +- if (commitLock.availablePermits() == 0) { +- logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize); +- commitLock.release(); +- } else { +- logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits()); +- } ++ handleCommitException(e); ++ releaseCommitLock(); + } + return CompletableFuture.completedFuture(false); + } + +- private boolean handleCommitException(TieredFileSegmentInputStream inputStream, Throwable e) { ++ private long getCorrectFileSize(Throwable throwable) { ++ if (throwable instanceof TieredStoreException) { ++ long fileSize = ((TieredStoreException) throwable).getPosition(); ++ if (fileSize > 0) { ++ return fileSize; ++ } ++ } ++ return getSize(); ++ } ++ ++ private boolean handleCommitException(Throwable e) { ++ // Get root cause here + Throwable cause = e.getCause() != null ? e.getCause() : e; +- sendBackBuffer(inputStream); +- long realSize = 0; +- if (cause instanceof TieredStoreException && ((TieredStoreException) cause).getPosition() > 0) { +- realSize = ((TieredStoreException) cause).getPosition(); ++ long fileSize = this.getCorrectFileSize(cause); ++ ++ if (fileSize == -1L) { ++ logger.error("Get commit position error, Commit: %d, Expect: %d, Current Max: %d, FileName: %s", ++ commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath()); ++ fileSegmentInputStream.rewind(); ++ return false; ++ } ++ ++ if (correctPosition(fileSize, cause)) { ++ updateDispatchCommitOffset(fileSegmentInputStream.getBufferList()); ++ fileSegmentInputStream = null; ++ return true; ++ } else { ++ fileSegmentInputStream.rewind(); ++ return false; + } +- if (realSize <= 0) { +- realSize = getSize(); ++ } ++ ++ /** ++ * return true to clear buffer ++ */ ++ private boolean correctPosition(long fileSize, Throwable throwable) { ++ ++ // Current we have three offsets here: commit offset, expect offset, file size. ++ // We guarantee that the commit offset is less than or equal to the expect offset. ++ // Max offset will increase because we can continuously put in new buffers ++ String handleInfo = throwable == null ? "before commit" : "after commit"; ++ long expectPosition = commitPosition + fileSegmentInputStream.getContentLength(); ++ ++ String offsetInfo = String.format("Correct Commit Position, %s, result=[{}], " + ++ "Commit: %d, Expect: %d, Current Max: %d, FileSize: %d, FileName: %s", ++ handleInfo, commitPosition, expectPosition, appendPosition, fileSize, this.getPath()); ++ ++ // We are believing that the file size returned by the server is correct, ++ // can reset the commit offset to the file size reported by the storage system. ++ if (fileSize == expectPosition) { ++ logger.info(offsetInfo, "Success", throwable); ++ commitPosition = fileSize; ++ return true; + } +- if (realSize > 0 && realSize > commitPosition) { +- logger.error("TieredFileSegment#handleCommitException: commit failed: file: {}, try to fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause); +- // TODO check if this diff part is uploaded to backend storage +- long diff = appendPosition - commitPosition; +- commitPosition = realSize; +- appendPosition = realSize + diff; +- // TODO check if appendPosition is large than maxOffset +- } else if (realSize < commitPosition) { +- logger.error("[Bug]TieredFileSegment#handleCommitException: commit failed: file: {}, can not fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause); ++ ++ if (fileSize < commitPosition) { ++ logger.error(offsetInfo, "FileSizeIncorrect", throwable); ++ } else if (fileSize == commitPosition) { ++ logger.warn(offsetInfo, "CommitFailed", throwable); ++ } else if (fileSize > commitPosition) { ++ logger.warn(offsetInfo, "PartialSuccess", throwable); + } ++ commitPosition = fileSize; + return false; + } + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java +index 5a0ca25f5..0db3eaf8f 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java +@@ -18,7 +18,7 @@ package org.apache.rocketmq.tieredstore.provider; + + import java.nio.ByteBuffer; + import java.util.concurrent.CompletableFuture; +-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; ++import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; + + public interface TieredStoreProvider { + +@@ -30,7 +30,9 @@ public interface TieredStoreProvider { + String getPath(); + + /** +- * Get file size in backend file system ++ * Get the real length of the file. ++ * Return 0 if the file does not exist, ++ * Return -1 if system get size failed. + * + * @return file real size + */ +@@ -71,5 +73,5 @@ public interface TieredStoreProvider { + * @param append try to append or create a new file + * @return put result, true if data successfully write; false otherwise + */ +- CompletableFuture commit0(TieredFileSegmentInputStream inputStream,long position, int length, boolean append); ++ CompletableFuture commit0(FileSegmentInputStream inputStream,long position, int length, boolean append); + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +index 52be90b1d..7e949cb28 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +@@ -36,7 +36,7 @@ import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; + import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; + import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; + import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; +-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; ++import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + + import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE; +@@ -184,7 +184,7 @@ public class PosixFileSegment extends TieredFileSegment { + + @Override + public CompletableFuture commit0( +- TieredFileSegmentInputStream inputStream, long position, int length, boolean append) { ++ FileSegmentInputStream inputStream, long position, int length, boolean append) { + + Stopwatch stopwatch = Stopwatch.createStarted(); + AttributesBuilder attributesBuilder = newAttributesBuilder() +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java +similarity index 88% +rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java +rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java +index c70bb7656..13b6e0ef9 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java +@@ -15,7 +15,7 @@ + * limitations under the License. + */ + +-package org.apache.rocketmq.tieredstore.provider.inputstream; ++package org.apache.rocketmq.tieredstore.provider.stream; + + import java.io.IOException; + import java.nio.ByteBuffer; +@@ -23,20 +23,23 @@ import java.util.List; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + +-public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { ++public class CommitLogInputStream extends FileSegmentInputStream { + + /** + * commitLogOffset is the real physical offset of the commitLog buffer which is being read + */ ++ private final long startCommitLogOffset; ++ + private long commitLogOffset; + + private final ByteBuffer codaBuffer; + + private long markCommitLogOffset = -1; + +- public TieredCommitLogInputStream(FileSegmentType fileType, long startOffset, ++ public CommitLogInputStream(FileSegmentType fileType, long startOffset, + List uploadBufferList, ByteBuffer codaBuffer, int contentLength) { + super(fileType, uploadBufferList, contentLength); ++ this.startCommitLogOffset = startOffset; + this.commitLogOffset = startOffset; + this.codaBuffer = codaBuffer; + } +@@ -53,6 +56,15 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { + this.commitLogOffset = markCommitLogOffset; + } + ++ @Override ++ public synchronized void rewind() { ++ super.rewind(); ++ this.commitLogOffset = this.startCommitLogOffset; ++ if (this.codaBuffer != null) { ++ this.codaBuffer.rewind(); ++ } ++ } ++ + @Override + public ByteBuffer getCodaBuffer() { + return this.codaBuffer; +@@ -64,17 +76,17 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { + return -1; + } + readPosition++; +- if (curReadBufferIndex >= uploadBufferList.size()) { ++ if (curReadBufferIndex >= bufferList.size()) { + return readCoda(); + } + int res; + if (readPosInCurBuffer >= curBuffer.remaining()) { + curReadBufferIndex++; +- if (curReadBufferIndex >= uploadBufferList.size()) { ++ if (curReadBufferIndex >= bufferList.size()) { + readPosInCurBuffer = 0; + return readCoda(); + } +- curBuffer = uploadBufferList.get(curReadBufferIndex); ++ curBuffer = bufferList.get(curReadBufferIndex); + commitLogOffset += readPosInCurBuffer; + readPosInCurBuffer = 0; + } +@@ -119,9 +131,9 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { + int posInCurBuffer = readPosInCurBuffer; + long curCommitLogOffset = commitLogOffset; + ByteBuffer curBuf = curBuffer; +- while (needRead > 0 && bufIndex <= uploadBufferList.size()) { ++ while (needRead > 0 && bufIndex <= bufferList.size()) { + int readLen, remaining, realReadLen = 0; +- if (bufIndex == uploadBufferList.size()) { ++ if (bufIndex == bufferList.size()) { + // read from coda buffer + remaining = codaBuffer.remaining() - posInCurBuffer; + readLen = Math.min(remaining, needRead); +@@ -137,7 +149,7 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { + } + remaining = curBuf.remaining() - posInCurBuffer; + readLen = Math.min(remaining, needRead); +- curBuf = uploadBufferList.get(bufIndex); ++ curBuf = bufferList.get(bufIndex); + if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) { + realReadLen = Math.min(MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer, readLen); + // read from commitLog buffer +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java +similarity index 77% +rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java +rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java +index e1758ca93..9e9d5135c 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java +@@ -15,15 +15,16 @@ + * limitations under the License. + */ + +-package org.apache.rocketmq.tieredstore.provider.inputstream; ++package org.apache.rocketmq.tieredstore.provider.stream; + + import java.io.IOException; + import java.io.InputStream; + import java.nio.ByteBuffer; + import java.util.List; ++import org.apache.commons.collections.CollectionUtils; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + +-public class TieredFileSegmentInputStream extends InputStream { ++public class FileSegmentInputStream extends InputStream { + + /** + * file type, can be commitlog, consume queue or indexfile now +@@ -33,7 +34,7 @@ public class TieredFileSegmentInputStream extends InputStream { + /** + * hold bytebuffer + */ +- protected final List uploadBufferList; ++ protected final List bufferList; + + /** + * total remaining of bytebuffer list +@@ -65,13 +66,13 @@ public class TieredFileSegmentInputStream extends InputStream { + + private int markReadPosInCurBuffer = -1; + +- public TieredFileSegmentInputStream(FileSegmentType fileType, List uploadBufferList, +- int contentLength) { ++ public FileSegmentInputStream( ++ FileSegmentType fileType, List bufferList, int contentLength) { + this.fileType = fileType; + this.contentLength = contentLength; +- this.uploadBufferList = uploadBufferList; +- if (uploadBufferList != null && uploadBufferList.size() > 0) { +- this.curBuffer = uploadBufferList.get(curReadBufferIndex); ++ this.bufferList = bufferList; ++ if (bufferList != null && bufferList.size() > 0) { ++ this.curBuffer = bufferList.get(curReadBufferIndex); + } + } + +@@ -95,18 +96,34 @@ public class TieredFileSegmentInputStream extends InputStream { + this.readPosition = markReadPosition; + this.curReadBufferIndex = markCurReadBufferIndex; + this.readPosInCurBuffer = markReadPosInCurBuffer; +- if (this.curReadBufferIndex < uploadBufferList.size()) { +- this.curBuffer = uploadBufferList.get(curReadBufferIndex); ++ if (this.curReadBufferIndex < bufferList.size()) { ++ this.curBuffer = bufferList.get(curReadBufferIndex); + } + } + ++ public synchronized void rewind() { ++ this.readPosition = 0; ++ this.curReadBufferIndex = 0; ++ this.readPosInCurBuffer = 0; ++ if (CollectionUtils.isNotEmpty(bufferList)) { ++ this.curBuffer = bufferList.get(0); ++ for (ByteBuffer buffer : bufferList) { ++ buffer.rewind(); ++ } ++ } ++ } ++ ++ public int getContentLength() { ++ return contentLength; ++ } ++ + @Override + public int available() { + return contentLength - readPosition; + } + +- public List getUploadBufferList() { +- return uploadBufferList; ++ public List getBufferList() { ++ return bufferList; + } + + public ByteBuffer getCodaBuffer() { +@@ -121,10 +138,10 @@ public class TieredFileSegmentInputStream extends InputStream { + readPosition++; + if (readPosInCurBuffer >= curBuffer.remaining()) { + curReadBufferIndex++; +- if (curReadBufferIndex >= uploadBufferList.size()) { ++ if (curReadBufferIndex >= bufferList.size()) { + return -1; + } +- curBuffer = uploadBufferList.get(curReadBufferIndex); ++ curBuffer = bufferList.get(curReadBufferIndex); + readPosInCurBuffer = 0; + } + return curBuffer.get(readPosInCurBuffer++) & 0xff; +@@ -153,8 +170,8 @@ public class TieredFileSegmentInputStream extends InputStream { + int bufIndex = curReadBufferIndex; + int posInCurBuffer = readPosInCurBuffer; + ByteBuffer curBuf = curBuffer; +- while (needRead > 0 && bufIndex < uploadBufferList.size()) { +- curBuf = uploadBufferList.get(bufIndex); ++ while (needRead > 0 && bufIndex < bufferList.size()) { ++ curBuf = bufferList.get(bufIndex); + int remaining = curBuf.remaining() - posInCurBuffer; + int readLen = Math.min(remaining, needRead); + // read from curBuf +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java +similarity index 54% +rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java +rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java +index d0c983fd4..a90baff3a 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java +@@ -15,30 +15,34 @@ + * limitations under the License. + */ + +-package org.apache.rocketmq.tieredstore.provider.inputstream; ++package org.apache.rocketmq.tieredstore.provider.stream; + + import java.nio.ByteBuffer; + import java.util.List; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + +-public class TieredFileSegmentInputStreamFactory { ++public class FileSegmentInputStreamFactory { + +- public static TieredFileSegmentInputStream build(FileSegmentType fileType, +- long startOffset, List uploadBufferList, ByteBuffer codaBuffer, int contentLength) { ++ public static FileSegmentInputStream build( ++ FileSegmentType fileType, long offset, List bufferList, ByteBuffer byteBuffer, int length) { ++ ++ if (bufferList == null) { ++ throw new IllegalArgumentException("bufferList is null"); ++ } + + switch (fileType) { + case COMMIT_LOG: +- return new TieredCommitLogInputStream( +- fileType, startOffset, uploadBufferList, codaBuffer, contentLength); ++ return new CommitLogInputStream( ++ fileType, offset, bufferList, byteBuffer, length); + case CONSUME_QUEUE: +- return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength); ++ return new FileSegmentInputStream(fileType, bufferList, length); + case INDEX: +- if (uploadBufferList.size() != 1) { +- throw new IllegalArgumentException("uploadBufferList size in INDEX type input stream must be 1"); ++ if (bufferList.size() != 1) { ++ throw new IllegalArgumentException("buffer block size must be 1 when file type is IndexFile"); + } +- return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength); ++ return new FileSegmentInputStream(fileType, bufferList, length); + default: +- throw new IllegalArgumentException("fileType is not supported"); ++ throw new IllegalArgumentException("file type is not supported"); + } + } + } +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +index 8601392e7..2451199c2 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +@@ -130,36 +130,36 @@ public class TieredMessageStoreTest { + // TieredStorageLevel.DISABLE + properties.setProperty("tieredStorageLevel", "0"); + configuration.update(properties); +- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); ++ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); + + // TieredStorageLevel.NOT_IN_DISK + properties.setProperty("tieredStorageLevel", "1"); + configuration.update(properties); + when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(false); +- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); ++ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); + + when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); +- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); ++ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); + + // TieredStorageLevel.NOT_IN_MEM + properties.setProperty("tieredStorageLevel", "2"); + configuration.update(properties); + Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(false); + Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(true); +- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); ++ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); + + Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); + Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(false); +- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); ++ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); + + Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); + Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(true); +- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); ++ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); + + // TieredStorageLevel.FORCE + properties.setProperty("tieredStorageLevel", "3"); + configuration.update(properties); +- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); ++ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); + } + + @Test +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java +index cc39cfbfc..7a4d05969 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java +@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; ++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; + import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata; + import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; + import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; +@@ -55,6 +56,7 @@ public class TieredFlatFileTest { + public void tearDown() throws IOException { + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); ++ TieredStoreExecutor.shutdown(); + } + + private List getSegmentMetadataList(TieredMetadataStore metadataStore) { +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java +index 262d6645b..2da72bc7a 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java +@@ -87,5 +87,7 @@ public class TieredIndexFileTest { + + indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); + Assert.assertEquals(1, indexList.size()); ++ ++ indexFile.destroy(); + } + } +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java +similarity index 82% +rename from tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java +rename to tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java +index a6566b7de..3bbe41dd4 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java +@@ -20,13 +20,13 @@ package org.apache.rocketmq.tieredstore.provider; + import java.io.InputStream; + import java.nio.ByteBuffer; + import java.util.List; +-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; ++import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; + +-public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStream { ++public class MockFileSegmentInputStream extends FileSegmentInputStream { + + private final InputStream inputStream; + +- public MockTieredFileSegmentInputStream(InputStream inputStream) { ++ public MockFileSegmentInputStream(InputStream inputStream) { + super(null, null, Integer.MAX_VALUE); + this.inputStream = inputStream; + } +@@ -43,7 +43,7 @@ public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStre + } + + @Override +- public List getUploadBufferList() { ++ public List getBufferList() { + return null; + } + +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java +index a2554ba3d..743d9182c 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java +@@ -28,8 +28,8 @@ import java.util.Random; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + import org.apache.rocketmq.tieredstore.file.TieredCommitLog; + import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; +-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory; ++import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; ++import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; + import org.junit.Assert; +@@ -57,7 +57,7 @@ public class TieredFileSegmentInputStreamTest { + bufferSize += byteBuffer.remaining(); + } + +- // build expected byte buffer for verifying the TieredFileSegmentInputStream ++ // build expected byte buffer for verifying the FileSegmentInputStream + ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize); + for (ByteBuffer byteBuffer : uploadBufferList) { + expectedByteBuffer.put(byteBuffer); +@@ -74,7 +74,7 @@ public class TieredFileSegmentInputStreamTest { + int[] batchReadSizeTestSet = { + MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1, MSG_LEN - 1, MSG_LEN, MSG_LEN + 1 + }; +- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( ++ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( + FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), finalBufferSize, batchReadSizeTestSet); + + } +@@ -98,7 +98,7 @@ public class TieredFileSegmentInputStreamTest { + int codaBufferSize = codaBuffer.remaining(); + bufferSize += codaBufferSize; + +- // build expected byte buffer for verifying the TieredFileSegmentInputStream ++ // build expected byte buffer for verifying the FileSegmentInputStream + ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize); + for (ByteBuffer byteBuffer : uploadBufferList) { + expectedByteBuffer.put(byteBuffer); +@@ -119,7 +119,7 @@ public class TieredFileSegmentInputStreamTest { + MSG_LEN - 1, MSG_LEN, MSG_LEN + 1, + bufferSize - 1, bufferSize, bufferSize + 1 + }; +- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( ++ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( + FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, codaBuffer, finalBufferSize), finalBufferSize, batchReadSizeTestSet); + + } +@@ -134,7 +134,7 @@ public class TieredFileSegmentInputStreamTest { + bufferSize += byteBuffer.remaining(); + } + +- // build expected byte buffer for verifying the TieredFileSegmentInputStream ++ // build expected byte buffer for verifying the FileSegmentInputStream + ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize); + for (ByteBuffer byteBuffer : uploadBufferList) { + expectedByteBuffer.put(byteBuffer); +@@ -143,7 +143,7 @@ public class TieredFileSegmentInputStreamTest { + + int finalBufferSize = bufferSize; + int[] batchReadSizeTestSet = {TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE + 1}; +- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( ++ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( + FileSegmentType.CONSUME_QUEUE, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), bufferSize, batchReadSizeTestSet); + } + +@@ -156,16 +156,16 @@ public class TieredFileSegmentInputStreamTest { + byteBuffer.flip(); + List uploadBufferList = Arrays.asList(byteBuffer); + +- // build expected byte buffer for verifying the TieredFileSegmentInputStream ++ // build expected byte buffer for verifying the FileSegmentInputStream + ByteBuffer expectedByteBuffer = byteBuffer.slice(); + +- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( ++ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( + FileSegmentType.INDEX, COMMIT_LOG_START_OFFSET, uploadBufferList, null, byteBuffer.limit()), byteBuffer.limit(), new int[] {23, 24, 25}); + } + +- private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier constructor, ++ private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier constructor, + int bufferSize, int[] readBatchSizeTestSet) { +- TieredFileSegmentInputStream inputStream = constructor.get(); ++ FileSegmentInputStream inputStream = constructor.get(); + + // verify + verifyInputStream(inputStream, expectedByteBuffer); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java +index 4cd83e0d2..a655710a5 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java +@@ -116,13 +116,22 @@ public class TieredFileSegmentTest { + } + + @Test +- public void testCommitFailed() { ++ public void testCommitFailedThenSuccess() { + long startTime = System.currentTimeMillis(); + MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG); + long lastSize = segment.getSize(); +- segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0); +- segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0); +- ++ segment.setCheckSize(false); ++ segment.initPosition(lastSize); ++ segment.setSize((int) lastSize); ++ ++ ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( ++ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize); ++ ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( ++ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN); ++ segment.append(buffer1, 0); ++ segment.append(buffer2, 0); ++ ++ // Mock new message arrive + segment.blocker = new CompletableFuture<>(); + new Thread(() -> { + try { +@@ -131,20 +140,88 @@ public class TieredFileSegmentTest { + Assert.fail(e.getMessage()); + } + ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer(); ++ buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2); + buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime); + segment.append(buffer, 0); + segment.blocker.complete(false); + }).start(); + ++ // Commit failed + segment.commit(); + segment.blocker.join(); ++ segment.blocker = null; ++ ++ // Copy data and assume commit success ++ segment.getMemStore().put(buffer1); ++ segment.getMemStore().put(buffer2); ++ segment.setSize((int) (lastSize + MessageBufferUtilTest.MSG_LEN * 2)); + +- segment.blocker = new CompletableFuture<>(); +- segment.blocker.complete(true); + segment.commit(); ++ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition()); ++ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset()); ++ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); ++ ++ ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN); ++ Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1)); ++ ++ ByteBuffer msg2 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN); ++ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2)); ++ ++ ByteBuffer msg3 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN); ++ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3)); ++ } ++ ++ @Test ++ public void testCommitFailed3Times() { ++ long startTime = System.currentTimeMillis(); ++ MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG); ++ long lastSize = segment.getSize(); ++ segment.setCheckSize(false); ++ segment.initPosition(lastSize); ++ segment.setSize((int) lastSize); ++ ++ ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( ++ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize); ++ ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( ++ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN); ++ segment.append(buffer1, 0); ++ segment.append(buffer2, 0); ++ ++ // Mock new message arrive ++ segment.blocker = new CompletableFuture<>(); ++ new Thread(() -> { ++ try { ++ Thread.sleep(3000); ++ } catch (InterruptedException e) { ++ Assert.fail(e.getMessage()); ++ } ++ ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer(); ++ buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2); ++ buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime); ++ segment.append(buffer, 0); ++ segment.blocker.complete(false); ++ }).start(); ++ ++ for (int i = 0; i < 3; i++) { ++ segment.commit(); ++ } ++ ++ Assert.assertEquals(lastSize, segment.getCommitPosition()); ++ Assert.assertEquals(baseOffset + lastSize, segment.getCommitOffset()); ++ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); ++ ++ segment.blocker.join(); ++ segment.blocker = null; + ++ segment.commit(); ++ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitPosition()); ++ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitOffset()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); ++ ++ segment.commit(); ++ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset()); ++ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); + + ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN); + Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1)); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java +index cb155cf8f..80ad41f68 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java +@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; + import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; +-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; ++import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + import org.junit.Assert; + +@@ -33,6 +33,8 @@ public class MemoryFileSegment extends TieredFileSegment { + + public CompletableFuture blocker; + ++ protected int size = 0; ++ + protected boolean checkSize = true; + + public MemoryFileSegment(FileSegmentType fileType, MessageQueue messageQueue, long baseOffset, +@@ -56,6 +58,18 @@ public class MemoryFileSegment extends TieredFileSegment { + memStore.position((int) getSize()); + } + ++ public boolean isCheckSize() { ++ return checkSize; ++ } ++ ++ public void setCheckSize(boolean checkSize) { ++ this.checkSize = checkSize; ++ } ++ ++ public ByteBuffer getMemStore() { ++ return memStore; ++ } ++ + @Override + public String getPath() { + return filePath; +@@ -66,7 +80,11 @@ public class MemoryFileSegment extends TieredFileSegment { + if (checkSize) { + return 1000; + } +- return 0; ++ return size; ++ } ++ ++ public void setSize(int size) { ++ this.size = size; + } + + @Override +@@ -85,11 +103,11 @@ public class MemoryFileSegment extends TieredFileSegment { + + @Override + public CompletableFuture commit0( +- TieredFileSegmentInputStream inputStream, long position, int length, boolean append) { ++ FileSegmentInputStream inputStream, long position, int length, boolean append) { + + try { + if (blocker != null && !blocker.get()) { +- throw new IllegalStateException(); ++ throw new IllegalStateException("Commit Exception for Memory Test"); + } + } catch (InterruptedException | ExecutionException e) { + Assert.fail(e.getMessage()); +@@ -98,7 +116,6 @@ public class MemoryFileSegment extends TieredFileSegment { + Assert.assertTrue(!checkSize || position >= getSize()); + + byte[] buffer = new byte[1024]; +- + int startPos = memStore.position(); + try { + int len; +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java +index 8ac330b37..630fd2223 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java +@@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; +-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; ++import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + import org.junit.Assert; + +@@ -46,7 +46,7 @@ public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment { + } + + @Override +- public CompletableFuture commit0(TieredFileSegmentInputStream inputStream, long position, int length, ++ public CompletableFuture commit0(FileSegmentInputStream inputStream, long position, int length, + boolean append) { + try { + if (blocker != null && !blocker.get()) { +-- +2.32.0.windows.2 + + +From d000ef947d7c99918ceba0fa451c1e29fd84ba07 Mon Sep 17 00:00:00 2001 +From: yuz10 <845238369@qq.com> +Date: Thu, 31 Aug 2023 09:41:33 +0800 +Subject: [PATCH 3/7] [ISSUE #7283] Incorrect dledger commitlog min offset + after mappedFile re delete failed (#7284) + +--- + .../apache/rocketmq/store/dledger/DLedgerCommitLog.java | 7 ++++++- + 1 file changed, 6 insertions(+), 1 deletion(-) + +diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +index ec5e86d70..d5f6acdc0 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java ++++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +@@ -162,7 +162,12 @@ public class DLedgerCommitLog extends CommitLog { + if (!mappedFileQueue.getMappedFiles().isEmpty()) { + return mappedFileQueue.getMinOffset(); + } +- return dLedgerFileList.getMinOffset(); ++ for (MmapFile file : dLedgerFileList.getMappedFiles()) { ++ if (file.isAvailable()) { ++ return file.getFileFromOffset() + file.getStartPosition(); ++ } ++ } ++ return 0; + } + + @Override +-- +2.32.0.windows.2 + + +From f82718ae3b77a16b553c03f672dc971a2d5d48fa Mon Sep 17 00:00:00 2001 +From: cnScarb +Date: Thu, 31 Aug 2023 15:50:10 +0800 +Subject: [PATCH 4/7] [ISSUE #7208] fix: when deleting topic also delete its + pop retry topic (#7209) + +--- + .../processor/AdminBrokerProcessor.java | 24 ++++++++++--- + .../processor/AdminBrokerProcessorTest.java | 36 +++++++++++++++++++ + 2 files changed, 55 insertions(+), 5 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +index bbddcec2d..8fbcd3c94 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +@@ -51,6 +51,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; + import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; + import org.apache.rocketmq.common.AclConfig; + import org.apache.rocketmq.common.BrokerConfig; ++import org.apache.rocketmq.common.KeyBuilder; + import org.apache.rocketmq.common.LockCallback; + import org.apache.rocketmq.common.MQVersion; + import org.apache.rocketmq.common.MixAll; +@@ -542,16 +543,29 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + } + } + +- this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); +- this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic()); +- this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic()); +- this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic()); +- this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic())); ++ final Set groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic); ++ // delete pop retry topics first ++ for (String group : groups) { ++ final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); ++ if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) { ++ deleteTopicInBroker(popRetryTopic); ++ } ++ } ++ // delete topic ++ deleteTopicInBroker(topic); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + ++ private void deleteTopicInBroker(String topic) { ++ this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); ++ this.brokerController.getTopicQueueMappingManager().delete(topic); ++ this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic); ++ this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic); ++ this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic)); ++ } ++ + private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); +diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +index d33a217f7..9d17011b6 100644 +--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java ++++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +@@ -29,6 +29,7 @@ import java.util.HashMap; + import java.util.Map; + import java.util.Properties; + import java.util.Set; ++import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.atomic.LongAdder; + import org.apache.rocketmq.broker.BrokerController; +@@ -41,6 +42,7 @@ import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager; + import org.apache.rocketmq.broker.topic.TopicConfigManager; + import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.BrokerConfig; ++import org.apache.rocketmq.common.KeyBuilder; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.TopicConfig; + import org.apache.rocketmq.common.TopicFilterType; +@@ -90,8 +92,11 @@ import static org.assertj.core.api.Assertions.assertThat; + import static org.mockito.ArgumentMatchers.any; + import static org.mockito.ArgumentMatchers.anyInt; + import static org.mockito.ArgumentMatchers.anyLong; ++import static org.mockito.ArgumentMatchers.anySet; + import static org.mockito.ArgumentMatchers.anyString; + import static org.mockito.Mockito.mock; ++import static org.mockito.Mockito.times; ++import static org.mockito.Mockito.verify; + import static org.mockito.Mockito.when; + + @RunWith(MockitoJUnitRunner.class) +@@ -321,6 +326,37 @@ public class AdminBrokerProcessorTest { + "please execute it from master broker."); + } + ++ @Test ++ public void testDeleteWithPopRetryTopic() throws Exception { ++ String topic = "topicA"; ++ String anotherTopic = "another_topicA"; ++ ++ topicConfigManager = mock(TopicConfigManager.class); ++ when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); ++ final ConcurrentHashMap topicConfigTable = new ConcurrentHashMap<>(); ++ topicConfigTable.put(topic, new TopicConfig()); ++ topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig()); ++ ++ topicConfigTable.put(anotherTopic, new TopicConfig()); ++ topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig()); ++ when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable); ++ when(topicConfigManager.selectTopicConfig(anyString())).thenAnswer(invocation -> { ++ final String selectTopic = invocation.getArgument(0); ++ return topicConfigManager.getTopicConfigTable().get(selectTopic); ++ }); ++ ++ when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); ++ when(consumerOffsetManager.whichGroupByTopic(topic)).thenReturn(Sets.newHashSet("cid1")); ++ ++ RemotingCommand request = buildDeleteTopicRequest(topic); ++ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); ++ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); ++ ++ verify(topicConfigManager).deleteTopicConfig(topic); ++ verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1")); ++ verify(messageStore, times(2)).deleteTopics(anySet()); ++ } ++ + @Test + public void testGetAllTopicConfigInRocksdb() throws Exception { + if (notToBeExecuted()) { +-- +2.32.0.windows.2 + + +From 31d10385d1616445478104ce9ef463a8c4852ba2 Mon Sep 17 00:00:00 2001 +From: guyinyou <36399867+guyinyou@users.noreply.github.com> +Date: Mon, 4 Sep 2023 14:09:32 +0800 +Subject: [PATCH 5/7] [ISSUE #7289] Fixed asynchronous send backpressure + capability + +Co-authored-by: guyinyou +--- + .../impl/producer/DefaultMQProducerImpl.java | 77 +++++++++++++------ + 1 file changed, 53 insertions(+), 24 deletions(-) + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +index bbbb17b07..2d6b83ac2 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +@@ -547,6 +547,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { + @Deprecated + public void send(final Message msg, final SendCallback sendCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException { ++ BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback); ++ + final long beginStartTime = System.currentTimeMillis(); + Runnable runnable = new Runnable() { + @Override +@@ -554,20 +556,53 @@ public class DefaultMQProducerImpl implements MQProducerInner { + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeout > costTime) { + try { +- sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); ++ sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime); + } catch (Exception e) { +- sendCallback.onException(e); ++ newCallBack.onException(e); + } + } else { +- sendCallback.onException( ++ newCallBack.onException( + new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout")); + } + } + }; +- executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime); ++ executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime); + } + +- public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback, ++ class BackpressureSendCallBack implements SendCallback { ++ public boolean isSemaphoreAsyncSizeAquired = false; ++ public boolean isSemaphoreAsyncNumAquired = false; ++ public int msgLen; ++ private final SendCallback sendCallback; ++ ++ public BackpressureSendCallBack(final SendCallback sendCallback) { ++ this.sendCallback = sendCallback; ++ } ++ ++ @Override ++ public void onSuccess(SendResult sendResult) { ++ if (isSemaphoreAsyncSizeAquired) { ++ semaphoreAsyncSendSize.release(msgLen); ++ } ++ if (isSemaphoreAsyncNumAquired) { ++ semaphoreAsyncSendNum.release(); ++ } ++ sendCallback.onSuccess(sendResult); ++ } ++ ++ @Override ++ public void onException(Throwable e) { ++ if (isSemaphoreAsyncSizeAquired) { ++ semaphoreAsyncSendSize.release(msgLen); ++ } ++ if (isSemaphoreAsyncNumAquired) { ++ semaphoreAsyncSendNum.release(); ++ } ++ sendCallback.onException(e); ++ } ++ } ++ ++ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final BackpressureSendCallBack sendCallback, + final long timeout, final long beginStartTime) + throws MQClientException, InterruptedException { + ExecutorService executor = this.getAsyncSenderExecutor(); +@@ -595,7 +630,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { + return; + } + } +- ++ sendCallback.isSemaphoreAsyncSizeAquired = isSemaphoreAsyncSizeAquired; ++ sendCallback.isSemaphoreAsyncNumAquired = isSemaphoreAsyncNumAquired; ++ sendCallback.msgLen = msgLen; + executor.submit(runnable); + } catch (RejectedExecutionException e) { + if (isEnableBackpressureForAsyncMode) { +@@ -603,15 +640,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + } else { + throw new MQClientException("executor rejected ", e); + } +- } finally { +- if (isSemaphoreAsyncSizeAquired) { +- semaphoreAsyncSendSize.release(msgLen); +- } +- if (isSemaphoreAsyncNumAquired) { +- semaphoreAsyncSendNum.release(); +- } + } +- + } + + public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg, +@@ -1188,7 +1217,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + @Deprecated + public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException { +- ++ BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback); + final long beginStartTime = System.currentTimeMillis(); + Runnable runnable = new Runnable() { + @Override +@@ -1203,22 +1232,22 @@ public class DefaultMQProducerImpl implements MQProducerInner { + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeout > costTime) { + try { +- sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, ++ sendKernelImpl(msg, mq, CommunicationMode.ASYNC, newCallBack, null, + timeout - costTime); + } catch (MQBrokerException e) { + throw new MQClientException("unknown exception", e); + } + } else { +- sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); ++ newCallBack.onException(new RemotingTooMuchRequestException("call timeout")); + } + } catch (Exception e) { +- sendCallback.onException(e); ++ newCallBack.onException(e); + } + } + + }; + +- executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime); ++ executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime); + } + + /** +@@ -1315,7 +1344,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + public void send(final Message msg, final MessageQueueSelector selector, final Object arg, + final SendCallback sendCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException { +- ++ BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback); + final long beginStartTime = System.currentTimeMillis(); + Runnable runnable = new Runnable() { + @Override +@@ -1324,21 +1353,21 @@ public class DefaultMQProducerImpl implements MQProducerInner { + if (timeout > costTime) { + try { + try { +- sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, ++ sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, newCallBack, + timeout - costTime); + } catch (MQBrokerException e) { + throw new MQClientException("unknown exception", e); + } + } catch (Exception e) { +- sendCallback.onException(e); ++ newCallBack.onException(e); + } + } else { +- sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); ++ newCallBack.onException(new RemotingTooMuchRequestException("call timeout")); + } + } + + }; +- executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime); ++ executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime); + } + + /** +-- +2.32.0.windows.2 + + +From d67b9d64cbd53824798af57ba18770e0fcefa37a Mon Sep 17 00:00:00 2001 +From: yuz10 <845238369@qq.com> +Date: Wed, 6 Sep 2023 14:07:23 +0800 +Subject: [PATCH 6/7] [ISSUE #7302] Fix singleTopicRegister code deleted in + merge + +--- + .../apache/rocketmq/broker/topic/TopicConfigManager.java | 6 +++++- + 1 file changed, 5 insertions(+), 1 deletion(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +index 1c3b9711f..4e3c1736c 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +@@ -330,7 +330,11 @@ public class TopicConfigManager extends ConfigManager { + log.error("createTopicIfAbsent ", e); + } + if (createNew && register) { +- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); ++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { ++ this.brokerController.registerSingleTopicAll(topicConfig); ++ } else { ++ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); ++ } + } + return getTopicConfig(topicConfig.getTopicName()); + } +-- +2.32.0.windows.2 + + +From 37017dbaec5c521fd529ef4aecf3658092884f84 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Wed, 6 Sep 2023 15:23:15 +0800 +Subject: [PATCH 7/7] [ISSUE #7305] Fix metrics and transactional module not + shutdown while broker offline cause coredump(#7307) + +--- + .../java/org/apache/rocketmq/broker/BrokerController.java | 8 ++++++++ + .../queue/TransactionalMessageServiceImpl.java | 4 +++- + 2 files changed, 11 insertions(+), 1 deletion(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +index e8f943702..6aba70cb2 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +@@ -1302,6 +1302,10 @@ public class BrokerController { + this.fastRemotingServer.shutdown(); + } + ++ if (this.brokerMetricsManager != null) { ++ this.brokerMetricsManager.shutdown(); ++ } ++ + if (this.brokerStatsManager != null) { + this.brokerStatsManager.shutdown(); + } +@@ -1324,6 +1328,10 @@ public class BrokerController { + this.ackMessageProcessor.shutdownPopReviveService(); + } + ++ if (this.transactionalMessageService != null) { ++ this.transactionalMessageService.close(); ++ } ++ + if (this.notificationProcessor != null) { + this.notificationProcessor.getPopLongPollingService().shutdown(); + } +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +index 93fa725a9..48db828e0 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +@@ -629,7 +629,9 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ + + @Override + public void close() { +- ++ if (this.transactionalOpBatchService != null) { ++ this.transactionalOpBatchService.shutdown(); ++ } + } + + public Message getOpMessage(int queueId, String moreData) { +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 169d0d0c053d95c1116a7c60105fb2024115b3de..6c8f925016eb47bef7aa0a7c5103f205fb88ad41 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.3 -Release: 15 +Release: 16 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -24,6 +24,7 @@ Patch0011: patch011-backport-optimize-config.patch Patch0012: patch012-backport-enhance-rockdbconfigtojson.patch Patch0013: patch013-backport-enhance-admin-output.patch Patch0014: patch014-backport-Queue-Selection-Strategy-Optimization.patch +Patch0015: patch015-backport-fix-some-bugs.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -58,6 +59,9 @@ exit 0 %changelog +* Mon Oct 30 2023 ShiZhili - 5.1.3-16 +- backport some bug fixes + * Mon Oct 30 2023 ShiZhili - 5.1.3-15 - backport queue selection strategy