From edd2755081d6387608c276dda2730e935747e90a Mon Sep 17 00:00:00 2001 From: shizhili Date: Sun, 1 Oct 2023 10:27:45 +0800 Subject: [PATCH] backport add some fixes --- patch010-backport-add-some-fixes.patch | 1286 ++++++++++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 1291 insertions(+), 1 deletion(-) create mode 100644 patch010-backport-add-some-fixes.patch diff --git a/patch010-backport-add-some-fixes.patch b/patch010-backport-add-some-fixes.patch new file mode 100644 index 0000000..36d5e4a --- /dev/null +++ b/patch010-backport-add-some-fixes.patch @@ -0,0 +1,1286 @@ +From b2deef179dbc6a9eb1a2b6dd7b652d95cb768295 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Thu, 10 Aug 2023 10:38:47 +0800 +Subject: [PATCH 01/12] [ISSUE #7144] Accelerate the recovery speed of the + tiered storage module (#7145) + +--- + .../tieredstore/TieredDispatcher.java | 3 + + .../tieredstore/TieredMessageStore.java | 2 +- + .../common/TieredStoreExecutor.java | 25 ++-- + .../tieredstore/file/CompositeFlatFile.java | 15 +- + .../file/CompositeQueueFlatFile.java | 20 ++- + .../tieredstore/file/TieredCommitLog.java | 24 +++- + .../tieredstore/file/TieredFlatFile.java | 42 +++--- + .../file/TieredFlatFileManager.java | 135 ++++++++++-------- + .../metadata/FileSegmentMetadata.java | 26 +++- + .../tieredstore/TieredDispatcherTest.java | 15 +- + .../tieredstore/TieredMessageFetcherTest.java | 2 +- + .../file/CompositeQueueFlatFileTest.java | 2 +- + .../file/TieredFlatFileManagerTest.java | 7 +- + 13 files changed, 194 insertions(+), 124 deletions(-) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +index bb58ea7dd..1746190cd 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +@@ -279,6 +279,9 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + long upperBound = Math.min(dispatchOffset + maxCount, maxOffsetInQueue); + ConsumeQueue consumeQueue = (ConsumeQueue) defaultStore.getConsumeQueue(topic, queueId); + ++ logger.debug("DispatchFlatFile race, topic={}, queueId={}, cq range={}-{}, dispatch offset={}-{}", ++ topic, queueId, minOffsetInQueue, maxOffsetInQueue, dispatchOffset, upperBound - 1); ++ + for (; dispatchOffset < upperBound; dispatchOffset++) { + // get consume queue + SelectMappedBufferResult cqItem = consumeQueue.getIndexBuffer(dispatchOffset); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +index 1f12410f2..ced1fb818 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +@@ -147,7 +147,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) { + + if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) { +- logger.debug("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset); ++ logger.trace("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset); + return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter); + } + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +index 6eb3478b3..6dd0e8846 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +@@ -43,18 +43,9 @@ public class TieredStoreExecutor { + public static ExecutorService compactIndexFileExecutor; + + public static void init() { +- dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); +- dispatchExecutor = new ThreadPoolExecutor( +- Math.max(2, Runtime.getRuntime().availableProcessors()), +- Math.max(16, Runtime.getRuntime().availableProcessors() * 4), +- 1000 * 60, +- TimeUnit.MILLISECONDS, +- dispatchThreadPoolQueue, +- new ThreadFactoryImpl("TieredCommonExecutor_")); +- + commonScheduledExecutor = new ScheduledThreadPoolExecutor( + Math.max(4, Runtime.getRuntime().availableProcessors()), +- new ThreadFactoryImpl("TieredCommonScheduledExecutor_")); ++ new ThreadFactoryImpl("TieredCommonExecutor_")); + + commitExecutor = new ScheduledThreadPoolExecutor( + Math.max(16, Runtime.getRuntime().availableProcessors() * 4), +@@ -62,7 +53,17 @@ public class TieredStoreExecutor { + + cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor( + Math.max(4, Runtime.getRuntime().availableProcessors()), +- new ThreadFactoryImpl("TieredCleanExpiredFileExecutor_")); ++ new ThreadFactoryImpl("TieredCleanFileExecutor_")); ++ ++ dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); ++ dispatchExecutor = new ThreadPoolExecutor( ++ Math.max(2, Runtime.getRuntime().availableProcessors()), ++ Math.max(16, Runtime.getRuntime().availableProcessors() * 4), ++ 1000 * 60, ++ TimeUnit.MILLISECONDS, ++ dispatchThreadPoolQueue, ++ new ThreadFactoryImpl("TieredDispatchExecutor_"), ++ new ThreadPoolExecutor.DiscardOldestPolicy()); + + fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); + fetchDataExecutor = new ThreadPoolExecutor( +@@ -71,7 +72,7 @@ public class TieredStoreExecutor { + 1000 * 60, + TimeUnit.MILLISECONDS, + fetchDataThreadPoolQueue, +- new ThreadFactoryImpl("TieredFetchDataExecutor_")); ++ new ThreadFactoryImpl("TieredFetchExecutor_")); + + compactIndexFileThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); + compactIndexFileExecutor = new ThreadPoolExecutor( +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +index df4baf33f..5ad3a6ff3 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +@@ -76,20 +76,15 @@ public class CompositeFlatFile implements CompositeAccess { + this.storeConfig = fileQueueFactory.getStoreConfig(); + this.readAheadFactor = this.storeConfig.getReadAheadMinFactor(); + this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig); +- this.dispatchOffset = new AtomicLong(); + this.compositeFlatFileLock = new ReentrantLock(); + this.inFlightRequestMap = new ConcurrentHashMap<>(); + this.commitLog = new TieredCommitLog(fileQueueFactory, filePath); + this.consumeQueue = new TieredConsumeQueue(fileQueueFactory, filePath); ++ this.dispatchOffset = new AtomicLong( ++ this.consumeQueue.isInitialized() ? this.getConsumeQueueCommitOffset() : -1L); + this.groupOffsetCache = this.initOffsetCache(); + } + +- protected void recoverMetadata() { +- if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) { +- consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); +- } +- } +- + private Cache initOffsetCache() { + return Caffeine.newBuilder() + .expireAfterWrite(2, TimeUnit.MINUTES) +@@ -310,10 +305,12 @@ public class CompositeFlatFile implements CompositeAccess { + + @Override + public void initOffset(long offset) { +- if (!consumeQueue.isInitialized()) { ++ if (consumeQueue.isInitialized()) { ++ dispatchOffset.set(this.getConsumeQueueCommitOffset()); ++ } else { + consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); ++ dispatchOffset.set(offset); + } +- dispatchOffset.set(offset); + } + + @Override +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java +index f6c0afed0..0a797f465 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java +@@ -36,8 +36,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { + public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) { + super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue)); + this.messageQueue = messageQueue; +- this.recoverTopicMetadata(); +- super.recoverMetadata(); ++ this.recoverQueueMetadata(); + this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig); + } + +@@ -46,11 +45,12 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { + if (!consumeQueue.isInitialized()) { + queueMetadata.setMinOffset(offset); + queueMetadata.setMaxOffset(offset); ++ metadataStore.updateQueue(queueMetadata); + } + super.initOffset(offset); + } + +- public void recoverTopicMetadata() { ++ public void recoverQueueMetadata() { + TopicMetadata topicMetadata = this.metadataStore.getTopic(messageQueue.getTopic()); + if (topicMetadata == null) { + topicMetadata = this.metadataStore.addTopic(messageQueue.getTopic(), -1L); +@@ -64,18 +64,16 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { + if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) { + queueMetadata.setMaxOffset(queueMetadata.getMinOffset()); + } +- this.dispatchOffset.set(queueMetadata.getMaxOffset()); + } + +- public void persistMetadata() { ++ public void flushMetadata() { + try { +- if (consumeQueue.getCommitOffset() < queueMetadata.getMinOffset()) { +- return; +- } +- queueMetadata.setMaxOffset(consumeQueue.getCommitOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); ++ queueMetadata.setMinOffset(super.getConsumeQueueMinOffset()); ++ queueMetadata.setMaxOffset(super.getConsumeQueueMaxOffset()); + metadataStore.updateQueue(queueMetadata); + } catch (Exception e) { +- LOGGER.error("CompositeFlatFile#flushMetadata: update queue metadata failed: topic: {}, queue: {}", messageQueue.getTopic(), messageQueue.getQueueId(), e); ++ LOGGER.error("CompositeFlatFile#flushMetadata error, topic: {}, queue: {}", ++ messageQueue.getTopic(), messageQueue.getQueueId(), e); + } + } + +@@ -114,7 +112,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { + @Override + public void shutdown() { + super.shutdown(); +- metadataStore.updateQueue(queueMetadata); ++ this.flushMetadata(); + } + + @Override +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java +index 80e1bce50..0e5f79132 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java +@@ -50,7 +50,7 @@ public class TieredCommitLog { + this.storeConfig = fileQueueFactory.getStoreConfig(); + this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath); + this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET); +- this.correctMinOffset(); ++ this.correctMinOffsetAsync(); + } + + @VisibleForTesting +@@ -91,17 +91,26 @@ public class TieredCommitLog { + return flatFile.getFileToWrite().getMaxTimestamp(); + } + +- public synchronized long correctMinOffset() { ++ public long correctMinOffset() { ++ try { ++ return correctMinOffsetAsync().get(); ++ } catch (Exception e) { ++ log.error("Correct min offset failed in clean expired file", e); ++ } ++ return NOT_EXIST_MIN_OFFSET; ++ } ++ ++ public synchronized CompletableFuture correctMinOffsetAsync() { + if (flatFile.getFileSegmentCount() == 0) { + this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET); +- return NOT_EXIST_MIN_OFFSET; ++ return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET); + } + + // queue offset field length is 8 + int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8; + if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) { + this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET); +- return NOT_EXIST_MIN_OFFSET; ++ return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET); + } + + try { +@@ -109,7 +118,8 @@ public class TieredCommitLog { + .thenApply(buffer -> { + long offset = MessageBufferUtil.getQueueOffset(buffer); + minConsumeQueueOffset.set(offset); +- log.info("Correct commitlog min cq offset success, filePath={}, min cq offset={}, range={}-{}", ++ log.debug("Correct commitlog min cq offset success, " + ++ "filePath={}, min cq offset={}, commitlog range={}-{}", + flatFile.getFilePath(), offset, flatFile.getMinOffset(), flatFile.getCommitOffset()); + return offset; + }) +@@ -117,11 +127,11 @@ public class TieredCommitLog { + log.warn("Correct commitlog min cq offset error, filePath={}, range={}-{}", + flatFile.getFilePath(), flatFile.getMinOffset(), flatFile.getCommitOffset(), throwable); + return minConsumeQueueOffset.get(); +- }).get(); ++ }); + } catch (Exception e) { + log.error("Correct commitlog min cq offset error, filePath={}", flatFile.getFilePath(), e); + } +- return minConsumeQueueOffset.get(); ++ return CompletableFuture.completedFuture(minConsumeQueueOffset.get()); + } + + public AppendResult append(ByteBuffer byteBuf) { +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +index 75ce8d89f..426c4e09d 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +@@ -16,6 +16,7 @@ + */ + package org.apache.rocketmq.tieredstore.file; + ++import com.alibaba.fastjson.JSON; + import com.google.common.annotations.VisibleForTesting; + import java.nio.ByteBuffer; + import java.util.ArrayList; +@@ -24,6 +25,7 @@ import java.util.Comparator; + import java.util.HashSet; + import java.util.LinkedList; + import java.util.List; ++import java.util.Objects; + import java.util.Set; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.CopyOnWriteArrayList; +@@ -178,32 +180,26 @@ public class TieredFlatFile { + private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) { + + FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( +- fileSegment.getPath(), fileSegment.getFileType(), fileSegment.getBaseOffset()); +- +- if (metadata != null) { +- return metadata; +- } ++ this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); + + // Note: file segment path may not the same as file base path, use base path here. +- metadata = new FileSegmentMetadata( +- this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType()); +- +- if (fileSegment.isClosed()) { +- metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); ++ if (metadata == null) { ++ metadata = new FileSegmentMetadata( ++ this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType()); ++ metadata.setCreateTimestamp(fileSegment.getMinTimestamp()); ++ metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); ++ metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); ++ if (fileSegment.isClosed()) { ++ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); ++ } ++ this.tieredMetadataStore.updateFileSegment(metadata); + } +- +- metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); +- metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); +- +- // Submit to persist +- this.tieredMetadataStore.updateFileSegment(metadata); + return metadata; + } + + /** + * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full + */ +- @VisibleForTesting + public void updateFileSegment(TieredFileSegment fileSegment) { + FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment); + +@@ -219,9 +215,16 @@ public class TieredFlatFile { + } + + segmentMetadata.setSize(fileSegment.getCommitPosition()); +- segmentMetadata.setBeginTimestamp(fileSegment.getMinTimestamp()); + segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp()); +- this.tieredMetadataStore.updateFileSegment(segmentMetadata); ++ ++ FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( ++ this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); ++ ++ if (!Objects.equals(metadata, segmentMetadata)) { ++ this.tieredMetadataStore.updateFileSegment(segmentMetadata); ++ logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}", ++ segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata)); ++ } + } + + private void checkAndFixFileSize() { +@@ -257,6 +260,7 @@ public class TieredFlatFile { + logger.warn("TieredFlatFile#checkAndFixFileSize: fix last file {} size: origin: {}, actual: {}", + lastFile.getPath(), lastFile.getCommitOffset() - lastFile.getBaseOffset(), lastFileSize); + lastFile.initPosition(lastFileSize); ++ this.updateFileSegment(lastFile); + } + } + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +index aeca44b8c..e9ae4a5a5 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +@@ -16,16 +16,19 @@ + */ + package org.apache.rocketmq.tieredstore.file; + ++import com.google.common.base.Stopwatch; + import com.google.common.collect.ImmutableList; + import java.util.ArrayList; + import java.util.List; + import java.util.Random; ++import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; +-import java.util.concurrent.Future; ++import java.util.concurrent.Semaphore; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + import javax.annotation.Nullable; ++import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +@@ -36,6 +39,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + + public class TieredFlatFileManager { + ++ private static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); + + private static volatile TieredFlatFileManager instance; +@@ -44,7 +48,7 @@ public class TieredFlatFileManager { + private final TieredMetadataStore metadataStore; + private final TieredMessageStoreConfig storeConfig; + private final TieredFileAllocator tieredFileAllocator; +- private final ConcurrentMap queueFlatFileMap; ++ private final ConcurrentMap flatFileConcurrentMap; + + public TieredFlatFileManager(TieredMessageStoreConfig storeConfig) + throws ClassNotFoundException, NoSuchMethodException { +@@ -52,23 +56,20 @@ public class TieredFlatFileManager { + this.storeConfig = storeConfig; + this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig); + this.tieredFileAllocator = new TieredFileAllocator(storeConfig); +- this.queueFlatFileMap = new ConcurrentHashMap<>(); ++ this.flatFileConcurrentMap = new ConcurrentHashMap<>(); + this.doScheduleTask(); + } + + public static TieredFlatFileManager getInstance(TieredMessageStoreConfig storeConfig) { +- if (storeConfig == null) { ++ if (storeConfig == null || instance != null) { + return instance; + } +- +- if (instance == null) { +- synchronized (TieredFlatFileManager.class) { +- if (instance == null) { +- try { +- instance = new TieredFlatFileManager(storeConfig); +- } catch (Exception e) { +- logger.error("TieredFlatFileManager#getInstance: create flat file manager failed", e); +- } ++ synchronized (TieredFlatFileManager.class) { ++ if (instance == null) { ++ try { ++ instance = new TieredFlatFileManager(storeConfig); ++ } catch (Exception e) { ++ logger.error("Construct FlatFileManager instance error", e); + } + } + } +@@ -88,7 +89,7 @@ public class TieredFlatFileManager { + TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0)); + indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath); + } catch (Exception e) { +- logger.error("TieredFlatFileManager#getIndexFile: create index file failed", e); ++ logger.error("Construct FlatFileManager indexFile error", e); + } + } + } +@@ -105,7 +106,7 @@ public class TieredFlatFileManager { + flatFile.commitCommitLog(); + } catch (Throwable e) { + MessageQueue mq = flatFile.getMessageQueue(); +- logger.error("commit commitLog periodically failed: topic: {}, queue: {}", ++ logger.error("Commit commitLog periodically failed: topic: {}, queue: {}", + mq.getTopic(), mq.getQueueId(), e); + } + }, delay, TimeUnit.MILLISECONDS); +@@ -114,7 +115,7 @@ public class TieredFlatFileManager { + flatFile.commitConsumeQueue(); + } catch (Throwable e) { + MessageQueue mq = flatFile.getMessageQueue(); +- logger.error("commit consumeQueue periodically failed: topic: {}, queue: {}", ++ logger.error("Commit consumeQueue periodically failed: topic: {}, queue: {}", + mq.getTopic(), mq.getQueueId(), e); + } + }, delay, TimeUnit.MILLISECONDS); +@@ -125,7 +126,7 @@ public class TieredFlatFileManager { + indexFile.commit(true); + } + } catch (Throwable e) { +- logger.error("commit indexFile periodically failed", e); ++ logger.error("Commit indexFile periodically failed", e); + } + }, 0, TimeUnit.MILLISECONDS); + } +@@ -160,7 +161,7 @@ public class TieredFlatFileManager { + try { + doCommit(); + } catch (Throwable e) { +- logger.error("commit flat file periodically failed: ", e); ++ logger.error("Commit flat file periodically failed: ", e); + } + }, 60, 60, TimeUnit.SECONDS); + +@@ -168,49 +169,73 @@ public class TieredFlatFileManager { + try { + doCleanExpiredFile(); + } catch (Throwable e) { +- logger.error("clean expired flat file failed: ", e); ++ logger.error("Clean expired flat file failed: ", e); + } + }, 30, 30, TimeUnit.SECONDS); + } + + public boolean load() { ++ Stopwatch stopwatch = Stopwatch.createStarted(); + try { +- AtomicLong topicSequenceNumber = new AtomicLong(); +- List> futureList = new ArrayList<>(); +- queueFlatFileMap.clear(); +- metadataStore.iterateTopic(topicMetadata -> { ++ flatFileConcurrentMap.clear(); ++ this.recoverSequenceNumber(); ++ this.recoverTieredFlatFile(); ++ logger.info("Message store recover end, total cost={}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); ++ } catch (Exception e) { ++ long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); ++ logger.info("Message store recover error, total cost={}ms", costTime); ++ BROKER_LOG.error("Message store recover error, total cost={}ms", costTime, e); ++ return false; ++ } ++ return true; ++ } ++ ++ public void recoverSequenceNumber() { ++ AtomicLong topicSequenceNumber = new AtomicLong(); ++ metadataStore.iterateTopic(topicMetadata -> { ++ if (topicMetadata != null && topicMetadata.getTopicId() > 0) { + topicSequenceNumber.set(Math.max(topicSequenceNumber.get(), topicMetadata.getTopicId())); +- Future future = TieredStoreExecutor.dispatchExecutor.submit(() -> { +- if (topicMetadata.getStatus() != 0) { +- return; +- } ++ } ++ }); ++ metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet()); ++ } ++ ++ public void recoverTieredFlatFile() { ++ Semaphore semaphore = new Semaphore((int) (TieredStoreExecutor.QUEUE_CAPACITY * 0.75)); ++ List> futures = new ArrayList<>(); ++ metadataStore.iterateTopic(topicMetadata -> { ++ try { ++ semaphore.acquire(); ++ CompletableFuture future = CompletableFuture.runAsync(() -> { + try { +- metadataStore.iterateQueue(topicMetadata.getTopic(), +- queueMetadata -> getOrCreateFlatFileIfAbsent( +- new MessageQueue(topicMetadata.getTopic(), +- storeConfig.getBrokerName(), +- queueMetadata.getQueue().getQueueId()))); ++ Stopwatch subWatch = Stopwatch.createStarted(); ++ if (topicMetadata.getStatus() != 0) { ++ return; ++ } ++ AtomicLong queueCount = new AtomicLong(); ++ metadataStore.iterateQueue(topicMetadata.getTopic(), queueMetadata -> { ++ this.getOrCreateFlatFileIfAbsent(new MessageQueue(topicMetadata.getTopic(), ++ storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId())); ++ queueCount.incrementAndGet(); ++ }); ++ logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms", ++ topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS)); + } catch (Exception e) { +- logger.error("load mq composite flat file from metadata failed", e); ++ logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e); ++ } finally { ++ semaphore.release(); + } +- }); +- futureList.add(future); +- }); +- +- // Wait for load all metadata done +- for (Future future : futureList) { +- future.get(); ++ }, TieredStoreExecutor.commitExecutor); ++ futures.add(future); ++ } catch (Exception e) { ++ throw new RuntimeException(e); + } +- metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet()); +- } catch (Exception e) { +- logger.error("load mq composite flat file from metadata failed", e); +- return false; +- } +- return true; ++ }); ++ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } + + public void cleanup() { +- queueFlatFileMap.clear(); ++ flatFileConcurrentMap.clear(); + cleanStaticReference(); + } + +@@ -221,27 +246,25 @@ public class TieredFlatFileManager { + + @Nullable + public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue messageQueue) { +- return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> { ++ return flatFileConcurrentMap.computeIfAbsent(messageQueue, mq -> { + try { +- logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " + +- "try to create new flat file: topic: {}, queueId: {}", ++ logger.debug("Create new TopicFlatFile, topic: {}, queueId: {}", + messageQueue.getTopic(), messageQueue.getQueueId()); + return new CompositeQueueFlatFile(tieredFileAllocator, mq); + } catch (Exception e) { +- logger.error("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " + +- "create new flat file: topic: {}, queueId: {}", ++ logger.debug("Create new TopicFlatFile failed, topic: {}, queueId: {}", + messageQueue.getTopic(), messageQueue.getQueueId(), e); +- return null; + } ++ return null; + }); + } + + public CompositeQueueFlatFile getFlatFile(MessageQueue messageQueue) { +- return queueFlatFileMap.get(messageQueue); ++ return flatFileConcurrentMap.get(messageQueue); + } + + public ImmutableList deepCopyFlatFileToList() { +- return ImmutableList.copyOf(queueFlatFileMap.values()); ++ return ImmutableList.copyOf(flatFileConcurrentMap.values()); + } + + public void shutdown() { +@@ -270,7 +293,7 @@ public class TieredFlatFileManager { + } + + // delete memory reference +- CompositeQueueFlatFile flatFile = queueFlatFileMap.remove(mq); ++ CompositeQueueFlatFile flatFile = flatFileConcurrentMap.remove(mq); + if (flatFile != null) { + MessageQueue messageQueue = flatFile.getMessageQueue(); + logger.info("TieredFlatFileManager#destroyCompositeFile: " + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java +index 1b232fc75..2f0fd71de 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java +@@ -16,6 +16,8 @@ + */ + package org.apache.rocketmq.tieredstore.metadata; + ++import java.util.Objects; ++ + public class FileSegmentMetadata { + + public static final int STATUS_NEW = 0; +@@ -43,7 +45,6 @@ public class FileSegmentMetadata { + this.baseOffset = baseOffset; + this.type = type; + this.status = STATUS_NEW; +- this.createTimestamp = System.currentTimeMillis(); + } + + public void markSealed() { +@@ -122,4 +123,27 @@ public class FileSegmentMetadata { + public void setSealTimestamp(long sealTimestamp) { + this.sealTimestamp = sealTimestamp; + } ++ ++ @Override ++ public boolean equals(Object o) { ++ if (this == o) ++ return true; ++ if (o == null || getClass() != o.getClass()) ++ return false; ++ FileSegmentMetadata metadata = (FileSegmentMetadata) o; ++ return size == metadata.size ++ && baseOffset == metadata.baseOffset ++ && status == metadata.status ++ && path.equals(metadata.path) ++ && type == metadata.type ++ && createTimestamp == metadata.createTimestamp ++ && beginTimestamp == metadata.beginTimestamp ++ && endTimestamp == metadata.endTimestamp ++ && sealTimestamp == metadata.sealTimestamp; ++ } ++ ++ @Override ++ public int hashCode() { ++ return Objects.hash(type, path, baseOffset, status, size, createTimestamp, beginTimestamp, endTimestamp, sealTimestamp); ++ } + } +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java +index e6adef1d1..5791dc9a4 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java +@@ -116,19 +116,20 @@ public class TieredDispatcherTest { + buffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 9); + flatFile.appendCommitLog(buffer3); + flatFile.commitCommitLog(); +- Assert.assertEquals(10, flatFile.getDispatchOffset()); ++ Assert.assertEquals(9 + 1, flatFile.getDispatchOffset()); ++ Assert.assertEquals(9, flatFile.getCommitLogDispatchCommitOffset()); + + dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer1); + dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer2); + dispatcher.buildConsumeQueueAndIndexFile(); + Assert.assertEquals(7, flatFile.getConsumeQueueMaxOffset()); +- Assert.assertEquals(7, flatFile.getDispatchOffset()); + + dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 7, 7, 0, 0, buffer1); + dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer2); + dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer3); + dispatcher.buildConsumeQueueAndIndexFile(); +- Assert.assertEquals(10, flatFile.getConsumeQueueMaxOffset()); ++ Assert.assertEquals(6, flatFile.getConsumeQueueMinOffset()); ++ Assert.assertEquals(9 + 1, flatFile.getConsumeQueueMaxOffset()); + } + + @Test +@@ -142,6 +143,7 @@ public class TieredDispatcherTest { + Mockito.when(defaultStore.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId())).thenReturn(0L); + Mockito.when(defaultStore.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId())).thenReturn(9L); + ++ // mock cq item, position = 7 + ByteBuffer cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE); + cqItem.putLong(7); + cqItem.putInt(MessageBufferUtilTest.MSG_LEN); +@@ -150,13 +152,13 @@ public class TieredDispatcherTest { + SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null); + Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(6)).thenReturn(mockResult); + ++ // mock cq item, position = 8 + cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE); + cqItem.putLong(8); + cqItem.putInt(MessageBufferUtilTest.MSG_LEN); + cqItem.putLong(1); + cqItem.flip(); + mockResult = new SelectMappedBufferResult(0, cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null); +- + Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(7)).thenReturn(mockResult); + + mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMockedMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null); +@@ -167,7 +169,10 @@ public class TieredDispatcherTest { + mockResult = new SelectMappedBufferResult(0, msg, MessageBufferUtilTest.MSG_LEN, null); + Mockito.when(defaultStore.selectOneMessageByOffset(8, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult); + +- dispatcher.dispatchFlatFile(flatFileManager.getOrCreateFlatFileIfAbsent(mq)); ++ CompositeQueueFlatFile flatFile = flatFileManager.getOrCreateFlatFileIfAbsent(mq); ++ Assert.assertNotNull(flatFile); ++ flatFile.initOffset(7); ++ dispatcher.dispatchFlatFile(flatFile); + Assert.assertEquals(8, flatFileManager.getFlatFile(mq).getDispatchOffset()); + } + } +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +index d75b2f916..774c6cf64 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +@@ -23,6 +23,7 @@ import java.util.Objects; + import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.SystemUtils; + import org.apache.commons.lang3.tuple.Triple; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.store.DispatchRequest; + import org.apache.rocketmq.store.GetMessageResult; +@@ -40,7 +41,6 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +-import org.apache.rocketmq.common.BoundaryType; + import org.awaitility.Awaitility; + import org.junit.After; + import org.junit.Assert; +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +index 27efe111e..2e028ada3 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +@@ -119,7 +119,7 @@ public class CompositeQueueFlatFileTest { + Assert.assertEquals(AppendResult.SUCCESS, result); + + file.commit(true); +- file.persistMetadata(); ++ file.flushMetadata(); + + QueueMetadata queueMetadata = metadataStore.getQueue(mq); + Assert.assertEquals(53, queueMetadata.getMaxOffset()); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java +index b7528c5e4..20fe4dd70 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java +@@ -72,10 +72,15 @@ public class TieredFlatFileManagerTest { + + CompositeFlatFile flatFile = flatFileManager.getFlatFile(mq); + Assert.assertNotNull(flatFile); +- Assert.assertEquals(100, flatFile.getDispatchOffset()); ++ Assert.assertEquals(-1L, flatFile.getDispatchOffset()); ++ flatFile.initOffset(100L); ++ Assert.assertEquals(100L, flatFile.getDispatchOffset()); ++ flatFile.initOffset(200L); ++ Assert.assertEquals(100L, flatFile.getDispatchOffset()); + + CompositeFlatFile flatFile1 = flatFileManager.getFlatFile(mq1); + Assert.assertNotNull(flatFile1); ++ flatFile1.initOffset(200L); + Assert.assertEquals(200, flatFile1.getDispatchOffset()); + + flatFileManager.destroyCompositeFile(mq); +-- +2.32.0.windows.2 + + +From 99b39a35f29e491862296d56b7938a995d153974 Mon Sep 17 00:00:00 2001 +From: ShuangxiDing +Date: Thu, 10 Aug 2023 11:28:39 +0800 +Subject: [PATCH 02/12] [ISSUE #7115] Fix grpc response message NPE (#7116) + +--- + .../apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java +index 0b3c85ea6..efa879a9c 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java +@@ -92,7 +92,7 @@ public class ResponseBuilder { + public Status buildStatus(Code code, String message) { + return Status.newBuilder() + .setCode(code) +- .setMessage(message) ++ .setMessage(message != null ? message : code.name()) + .build(); + } + +-- +2.32.0.windows.2 + + +From c0ba453f38183266cf9a69be754e620311e1923b Mon Sep 17 00:00:00 2001 +From: caigy +Date: Thu, 10 Aug 2023 14:08:17 +0800 +Subject: [PATCH 03/12] [ISSUE #7129] Fix resource collisions in acl tests + (#7130) + +* run acl tests sequencially to avoid collision + +* disable reuseForks for acl like broker + +* Revert "[ISSUE #7135] Temporarily ignoring plainAccessValidator test (#7135)" + +This reverts commit 6bc2c8474a0ce1e2833c82dffea7b1d8f718fcd7. +--- + acl/pom.xml | 13 +++++++++++++ + .../acl/plain/PlainAccessControlFlowTest.java | 5 ----- + .../acl/plain/PlainAccessValidatorTest.java | 3 --- + .../acl/plain/PlainPermissionManagerTest.java | 3 --- + 4 files changed, 13 insertions(+), 11 deletions(-) + +diff --git a/acl/pom.xml b/acl/pom.xml +index 67bfcb8d2..989c0cf77 100644 +--- a/acl/pom.xml ++++ b/acl/pom.xml +@@ -74,4 +74,17 @@ + test + + ++ ++ ++ ++ ++ maven-surefire-plugin ++ ${maven-surefire-plugin.version} ++ ++ 1 ++ false ++ ++ ++ ++ + +diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java +index e7fd0932f..519345714 100644 +--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java ++++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java +@@ -31,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; + import org.junit.Assert; +-import org.junit.Ignore; + import org.junit.Test; + + import java.io.File; +@@ -44,7 +43,6 @@ import java.util.Collections; + import java.util.LinkedList; + import java.util.List; + +- + /** + *

In this class, we'll test the following scenarios, each containing several consecutive operations on ACL, + *

like updating and deleting ACL, changing config files and checking validations. +@@ -52,9 +50,6 @@ import java.util.List; + *

Case 2: Only conf/acl/plain_acl.yml exists; + *

Case 3: Both conf/plain_acl.yml and conf/acl/plain_acl.yml exists. + */ +- +-// Ignore this test case as it is currently unable to pass on ubuntu workflow +-@Ignore + public class PlainAccessControlFlowTest { + public static final String DEFAULT_TOPIC = "topic-acl"; + +diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java +index a3a925758..ef0cffbdc 100644 +--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java ++++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java +@@ -56,11 +56,8 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; + import org.junit.After; + import org.junit.Assert; + import org.junit.Before; +-import org.junit.Ignore; + import org.junit.Test; + +-// Ignore this test case as it is currently unable to pass on ubuntu workflow +-@Ignore + public class PlainAccessValidatorTest { + + private PlainAccessValidator plainAccessValidator; +diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java +index aa7539f3a..941d8c779 100644 +--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java ++++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java +@@ -29,7 +29,6 @@ import org.assertj.core.api.Assertions; + import org.assertj.core.util.Lists; + import org.junit.Assert; + import org.junit.Before; +-import org.junit.Ignore; + import org.junit.Test; + + import java.io.File; +@@ -42,8 +41,6 @@ import java.util.List; + import java.util.Map; + import java.util.Set; + +-// Ignore this test case as it is currently unable to pass on ubuntu workflow +-@Ignore + public class PlainPermissionManagerTest { + + PlainPermissionManager plainPermissionManager; +-- +2.32.0.windows.2 + + +From 8741ff8c9b3bdbfc97976285affa7ea35c81243c Mon Sep 17 00:00:00 2001 +From: ShuangxiDing +Date: Thu, 10 Aug 2023 17:41:15 +0800 +Subject: [PATCH 04/12] [ISSUE #7153] Add switch for MIXED message type (#7154) + +Add a switch for MIXED message type when creating a Topic in the Broker. +--- + .../broker/processor/AdminBrokerProcessor.java | 8 ++++++++ + .../java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++ + 2 files changed, 18 insertions(+) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +index a6ce03dc2..bbddcec2d 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.TopicConfig; + import org.apache.rocketmq.common.UnlockCallback; + import org.apache.rocketmq.common.UtilAll; + import org.apache.rocketmq.common.attribute.AttributeParser; ++import org.apache.rocketmq.common.attribute.TopicMessageType; + import org.apache.rocketmq.common.constant.ConsumeInitMode; + import org.apache.rocketmq.common.constant.FIleReadaheadMode; + import org.apache.rocketmq.common.constant.LoggerName; +@@ -439,6 +440,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + String attributesModification = requestHeader.getAttributes(); + topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification)); + ++ if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED ++ && !brokerController.getBrokerConfig().isEnableMixedMessageType()) { ++ response.setCode(ResponseCode.SYSTEM_ERROR); ++ response.setRemark("MIXED message type is not supported."); ++ return response; ++ } ++ + try { + this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { +diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +index a815636b1..99a5db5ad 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java ++++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +@@ -393,6 +393,8 @@ public class BrokerConfig extends BrokerIdentity { + */ + private boolean enableSingleTopicRegister = false; + ++ private boolean enableMixedMessageType = false; ++ + public long getMaxPopPollingSize() { + return maxPopPollingSize; + } +@@ -1712,4 +1714,12 @@ public class BrokerConfig extends BrokerIdentity { + public void setEnableSingleTopicRegister(boolean enableSingleTopicRegister) { + this.enableSingleTopicRegister = enableSingleTopicRegister; + } ++ ++ public boolean isEnableMixedMessageType() { ++ return enableMixedMessageType; ++ } ++ ++ public void setEnableMixedMessageType(boolean enableMixedMessageType) { ++ this.enableMixedMessageType = enableMixedMessageType; ++ } + } +-- +2.32.0.windows.2 + + +From f534501855f8edbcb58f5b856973bf1027b5cf3a Mon Sep 17 00:00:00 2001 +From: Steven +Date: Fri, 11 Aug 2023 10:25:48 +0800 +Subject: [PATCH 05/12] [Feature 7155] add errlog when cmd err (#7157) +MIME-Version: 1.0 +Content-Type: text/plain; charset=UTF-8 +Content-Transfer-Encoding: 8bit + +Co-authored-by: 十真 +--- + .../src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java | 1 + + 1 file changed, 1 insertion(+) + +diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java +index b00bad3c5..5a8a7cd54 100644 +--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java ++++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java +@@ -52,6 +52,7 @@ public class ServerUtil { + System.exit(0); + } + } catch (ParseException e) { ++ System.err.println(e.getMessage()); + hf.printHelp(appName, options, true); + System.exit(1); + } +-- +2.32.0.windows.2 + + +From db58f00c0fe0f129611d654291f2177de55dc9ff Mon Sep 17 00:00:00 2001 +From: Zhouxiang Zhan +Date: Fri, 11 Aug 2023 19:18:30 +0800 +Subject: [PATCH 06/12] [ISSUE #7169] Change metadataThreadPoolQueueCapacity to + 100000 (#7170) + +--- + .../main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +index 4f57a7052..39caaa0d9 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +@@ -165,7 +165,7 @@ public class ProxyConfig implements ConfigFile { + private int subscriptionGroupConfigCacheExpiredInSeconds = 20; + private int subscriptionGroupConfigCacheMaxNum = 20000; + private int metadataThreadPoolNums = 3; +- private int metadataThreadPoolQueueCapacity = 1000; ++ private int metadataThreadPoolQueueCapacity = 100000; + + private int transactionHeartbeatThreadPoolNums = 20; + private int transactionHeartbeatThreadPoolQueueCapacity = 200; +-- +2.32.0.windows.2 + + +From 1f04e68a2e331ab035b791280c5a91b60fe0c85f Mon Sep 17 00:00:00 2001 +From: yx9o +Date: Sat, 12 Aug 2023 21:12:22 +0800 +Subject: [PATCH 07/12] [ISSUE #7172] Unified Chinese for Name Server (#7173) + +--- + docs/cn/concept.md | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/docs/cn/concept.md b/docs/cn/concept.md +index cb2c863bd..3d67e9371 100644 +--- a/docs/cn/concept.md ++++ b/docs/cn/concept.md +@@ -17,7 +17,7 @@ RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer + 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 + + ## 6 名字服务(Name Server) +- 名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。 ++名字服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。 + + ## 7 拉取式消费(Pull Consumer) + Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。 +-- +2.32.0.windows.2 + + +From 25005060bbace477eeaaf4c0142cece5213efbbf Mon Sep 17 00:00:00 2001 +From: yx9o +Date: Sun, 13 Aug 2023 20:52:17 +0800 +Subject: [PATCH 08/12] [ISSUE #7176] Correct mismatched logs (#7177) + +--- + .../org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +index 0055a1cc8..f7a95f0a6 100644 +--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java ++++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +@@ -522,7 +522,7 @@ public class RouteInfoManager { + this.lock.writeLock().unlock(); + } + } catch (Exception e) { +- log.error("wipeWritePermOfBrokerByLock Exception", e); ++ log.error("addWritePermOfBrokerByLock Exception", e); + } + return 0; + } +-- +2.32.0.windows.2 + + +From ac411daa27117e9115a8fc5e2d5753085f009ed9 Mon Sep 17 00:00:00 2001 +From: yx9o +Date: Tue, 15 Aug 2023 08:31:00 +0800 +Subject: [PATCH 09/12] [ISSUE #7183] Correct mismatched commandDesc (#7184) + +--- + .../tools/command/topic/RemappingStaticTopicSubCommand.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +index 849f680d0..2a08fdb5b 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +@@ -47,7 +47,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Update or create static topic, which has fixed number of queues"; ++ return "Remapping static topic."; + } + + @Override +-- +2.32.0.windows.2 + + +From 55e0cdb2af3ab75a6d892f919d60797f17a99fda Mon Sep 17 00:00:00 2001 +From: redlsz +Date: Tue, 15 Aug 2023 19:19:45 +0800 +Subject: [PATCH 10/12] fix: IndexOutOfBoundsException when process pop + response (#7003) + +--- + .../org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 5 ++++- + .../rocketmq/proxy/service/message/LocalMessageService.java | 5 ++++- + .../rocketmq/remoting/protocol/header/ExtraInfoUtil.java | 4 ++++ + 3 files changed, 12 insertions(+), 2 deletions(-) + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +index 708a6acd1..5101ffc8e 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +@@ -1174,7 +1174,10 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { + Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))); + continue; + } +- key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId()); ++ // Value of POP_CK is used to determine whether it is a pop retry, ++ // cause topic could be rewritten by broker. ++ key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), ++ messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId()); + if (!sortMap.containsKey(key)) { + sortMap.put(key, new ArrayList<>(4)); + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +index 115c140ff..eb2c4d9ee 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +@@ -249,7 +249,10 @@ public class LocalMessageService implements MessageService { + // + Map> sortMap = new HashMap<>(16); + for (MessageExt messageExt : messageExtList) { +- String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId()); ++ // Value of POP_CK is used to determine whether it is a pop retry, ++ // cause topic could be rewritten by broker. ++ String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), ++ messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId()); + if (!sortMap.containsKey(key)) { + sortMap.put(key, new ArrayList<>(4)); + } +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java +index 9a5fa89ab..13094331e 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java +@@ -282,6 +282,10 @@ public class ExtraInfoUtil { + return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key; + } + ++ public static String getStartOffsetInfoMapKey(String topic, String popCk, long key) { ++ return ((topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || popCk != null) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key; ++ } ++ + public static String getQueueOffsetKeyValueKey(long queueId, long queueOffset) { + return QUEUE_OFFSET + queueId + "%" + queueOffset; + } +-- +2.32.0.windows.2 + + +From a9c0b43f7f6ce5acfc4f2f3069553071fa93dfee Mon Sep 17 00:00:00 2001 +From: yx9o +Date: Wed, 16 Aug 2023 18:45:00 +0800 +Subject: [PATCH 11/12] [ISSUE #7192] Correct typos (#7193) + +--- + .../tools/command/consumer/ConsumerProgressSubCommand.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +index f51a24673..97125b854 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +@@ -54,7 +54,7 @@ public class ConsumerProgressSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Query consumers's progress, speed"; ++ return "Query consumer's progress, speed."; + } + + @Override +-- +2.32.0.windows.2 + + +From 5a3de926b816db5a121c1d788430072a3bc942ae Mon Sep 17 00:00:00 2001 +From: Zhouxiang Zhan +Date: Wed, 16 Aug 2023 20:52:53 +0800 +Subject: [PATCH 12/12] Optimize updateSubscription check exist loop (#7190) + +--- + .../broker/client/ConsumerGroupInfo.java | 17 ++++++----------- + 1 file changed, 6 insertions(+), 11 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java +index 867b9c720..1ea58c125 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java +@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.client; + + import io.netty.channel.Channel; + import java.util.ArrayList; ++import java.util.HashSet; + import java.util.Iterator; + import java.util.List; + import java.util.Map.Entry; +@@ -172,7 +173,7 @@ public class ConsumerGroupInfo { + */ + public boolean updateSubscription(final Set subList) { + boolean updated = false; +- ++ Set topicSet = new HashSet<>(); + for (SubscriptionData sub : subList) { + SubscriptionData old = this.subscriptionTable.get(sub.getTopic()); + if (old == null) { +@@ -194,22 +195,16 @@ public class ConsumerGroupInfo { + + this.subscriptionTable.put(sub.getTopic(), sub); + } ++ // Add all new topics to the HashSet ++ topicSet.add(sub.getTopic()); + } + + Iterator> it = this.subscriptionTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + String oldTopic = next.getKey(); +- +- boolean exist = false; +- for (SubscriptionData sub : subList) { +- if (sub.getTopic().equals(oldTopic)) { +- exist = true; +- break; +- } +- } +- +- if (!exist) { ++ // Check HashSet with O(1) time complexity ++ if (!topicSet.contains(oldTopic)) { + log.warn("subscription changed, group: {} remove topic {} {}", + this.groupName, + oldTopic, +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index b5cfd83..55f859f 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.3 -Release: 10 +Release: 11 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -19,6 +19,7 @@ Patch0006: patch006-backport-auto-batch-producer.patch Patch0007: patch007-backport-fix-some-bugs.patch Patch0008: patch008-backport-Allow-BoundaryType.patch Patch0009: patch009-backport-Support-KV-Storage.patch +Patch0010: patch010-backport-add-some-fixes.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -53,6 +54,9 @@ exit 0 %changelog +* Wed Oct 1 2023 ShiZhili - 5.1.3-11 +- backport add some fixes + * Wed Sep 20 2023 ShiZhili - 5.1.3-10 - backport support kv storage -- Gitee