From d79c72c4c56e16ae9e4df0e689852af75965eb42 Mon Sep 17 00:00:00 2001 From: shizhili Date: Mon, 20 Nov 2023 11:06:19 +0800 Subject: [PATCH] backport some enhancement of tiered storage --- ...ckport-enhancement-of-tiered-storage.patch | 601 ++++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 606 insertions(+), 1 deletion(-) create mode 100644 patch018-backport-enhancement-of-tiered-storage.patch diff --git a/patch018-backport-enhancement-of-tiered-storage.patch b/patch018-backport-enhancement-of-tiered-storage.patch new file mode 100644 index 0000000..ed79df9 --- /dev/null +++ b/patch018-backport-enhancement-of-tiered-storage.patch @@ -0,0 +1,601 @@ +From 1a8e7cb17cb29ed33b0196b52e452a6e76ade781 Mon Sep 17 00:00:00 2001 +From: yuz10 <845238369@qq.com> +Date: Tue, 12 Sep 2023 19:33:41 +0800 +Subject: [PATCH 1/5] [ISSUE #7345] Fix wrong result of searchOffset in tiered + storage + +--- + .../tieredstore/file/TieredFlatFile.java | 5 +- + .../tieredstore/file/TieredFlatFileTest.java | 46 +++++++++++++++++-- + 2 files changed, 46 insertions(+), 5 deletions(-) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +index 426c4e09d..d973179ee 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +@@ -365,7 +365,10 @@ public class TieredFlatFile { + if (!segmentList.isEmpty()) { + return boundaryType == BoundaryType.UPPER ? segmentList.get(0) : segmentList.get(segmentList.size() - 1); + } +- return fileSegmentList.isEmpty() ? null : fileSegmentList.get(fileSegmentList.size() - 1); ++ if (fileSegmentList.isEmpty()) { ++ return null; ++ } ++ return boundaryType == BoundaryType.UPPER ? fileSegmentList.get(fileSegmentList.size() - 1) : fileSegmentList.get(0); + } finally { + fileSegmentLock.readLock().unlock(); + } +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java +index 7a4d05969..7e2fbf201 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java +@@ -16,10 +16,7 @@ + */ + package org.apache.rocketmq.tieredstore.file; + +-import java.io.IOException; +-import java.nio.ByteBuffer; +-import java.util.ArrayList; +-import java.util.List; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; +@@ -35,6 +32,11 @@ import org.junit.Assert; + import org.junit.Before; + import org.junit.Test; + ++import java.io.IOException; ++import java.nio.ByteBuffer; ++import java.util.ArrayList; ++import java.util.List; ++ + public class TieredFlatFileTest { + + private final String storePath = TieredStoreTestUtil.getRandomStorePath(); +@@ -301,4 +303,40 @@ public class TieredFlatFileTest { + fileQueue.rollingNewFile(); + Assert.assertEquals(2, fileQueue.getFileSegmentCount()); + } ++ ++ @Test ++ public void testGetFileByTime() { ++ String filePath = TieredStoreUtil.toPath(queue); ++ TieredFlatFile tieredFlatFile = fileQueueFactory.createFlatFileForCommitLog(filePath); ++ TieredFileSegment fileSegment1 = new MemoryFileSegment(FileSegmentType.CONSUME_QUEUE, queue, 1100, storeConfig); ++ fileSegment1.setMinTimestamp(100); ++ fileSegment1.setMaxTimestamp(200); ++ ++ TieredFileSegment fileSegment2 = new MemoryFileSegment(FileSegmentType.CONSUME_QUEUE, queue, 1100, storeConfig); ++ fileSegment2.setMinTimestamp(200); ++ fileSegment2.setMaxTimestamp(300); ++ ++ tieredFlatFile.getFileSegmentList().add(fileSegment1); ++ tieredFlatFile.getFileSegmentList().add(fileSegment2); ++ ++ TieredFileSegment segmentUpper = tieredFlatFile.getFileByTime(400, BoundaryType.UPPER); ++ Assert.assertEquals(fileSegment2, segmentUpper); ++ ++ TieredFileSegment segmentLower = tieredFlatFile.getFileByTime(400, BoundaryType.LOWER); ++ Assert.assertEquals(fileSegment2, segmentLower); ++ ++ ++ TieredFileSegment segmentUpper2 = tieredFlatFile.getFileByTime(0, BoundaryType.UPPER); ++ Assert.assertEquals(fileSegment1, segmentUpper2); ++ ++ TieredFileSegment segmentLower2 = tieredFlatFile.getFileByTime(0, BoundaryType.LOWER); ++ Assert.assertEquals(fileSegment1, segmentLower2); ++ ++ ++ TieredFileSegment segmentUpper3 = tieredFlatFile.getFileByTime(200, BoundaryType.UPPER); ++ Assert.assertEquals(fileSegment1, segmentUpper3); ++ ++ TieredFileSegment segmentLower3 = tieredFlatFile.getFileByTime(200, BoundaryType.LOWER); ++ Assert.assertEquals(fileSegment2, segmentLower3); ++ } + } +-- +2.32.0.windows.2 + + +From fd32dae2ab59f86dd215eeec405bf4fa6212bcb3 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Tue, 12 Sep 2023 19:58:08 +0800 +Subject: [PATCH 2/5] [ISSUE #6633] Not clear uninitialized files and fix + metadata recover (#7342) + +--- + .../tieredstore/file/TieredFlatFile.java | 53 +++++++------------ + .../file/TieredFlatFileManager.java | 10 ++-- + 2 files changed, 22 insertions(+), 41 deletions(-) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +index d973179ee..d96eb6e8f 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +@@ -16,7 +16,6 @@ + */ + package org.apache.rocketmq.tieredstore.file; + +-import com.alibaba.fastjson.JSON; + import com.google.common.annotations.VisibleForTesting; + import java.nio.ByteBuffer; + import java.util.ArrayList; +@@ -25,13 +24,13 @@ import java.util.Comparator; + import java.util.HashSet; + import java.util.LinkedList; + import java.util.List; +-import java.util.Objects; + import java.util.Set; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.CopyOnWriteArrayList; + import java.util.concurrent.locks.ReentrantReadWriteLock; + import java.util.stream.Collectors; + import javax.annotation.Nullable; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.tieredstore.common.AppendResult; +@@ -43,7 +42,6 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; + import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator; + import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +-import org.apache.rocketmq.common.BoundaryType; + + public class TieredFlatFile { + +@@ -177,7 +175,10 @@ public class TieredFlatFile { + } + } + +- private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) { ++ /** ++ * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full ++ */ ++ public void updateFileSegment(TieredFileSegment fileSegment) { + + FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( + this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); +@@ -186,45 +187,24 @@ public class TieredFlatFile { + if (metadata == null) { + metadata = new FileSegmentMetadata( + this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType()); +- metadata.setCreateTimestamp(fileSegment.getMinTimestamp()); +- metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); +- metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); +- if (fileSegment.isClosed()) { +- metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); +- } +- this.tieredMetadataStore.updateFileSegment(metadata); ++ metadata.setCreateTimestamp(System.currentTimeMillis()); + } +- return metadata; +- } +- +- /** +- * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full +- */ +- public void updateFileSegment(TieredFileSegment fileSegment) { +- FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment); + +- if (segmentMetadata.getStatus() == FileSegmentMetadata.STATUS_NEW +- && fileSegment.isFull() +- && !fileSegment.needCommit()) { ++ metadata.setSize(fileSegment.getCommitPosition()); ++ metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); ++ metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); + +- segmentMetadata.markSealed(); ++ if (fileSegment.isFull() && !fileSegment.needCommit()) { ++ if (metadata.getStatus() == FileSegmentMetadata.STATUS_NEW) { ++ metadata.markSealed(); ++ } + } + + if (fileSegment.isClosed()) { +- segmentMetadata.setStatus(FileSegmentMetadata.STATUS_DELETED); ++ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); + } + +- segmentMetadata.setSize(fileSegment.getCommitPosition()); +- segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp()); +- +- FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( +- this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); +- +- if (!Objects.equals(metadata, segmentMetadata)) { +- this.tieredMetadataStore.updateFileSegment(segmentMetadata); +- logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}", +- segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata)); +- } ++ this.tieredMetadataStore.updateFileSegment(metadata); + } + + private void checkAndFixFileSize() { +@@ -598,6 +578,9 @@ public class TieredFlatFile { + logger.error("TieredFlatFile#destroy: mark file segment: {} is deleted failed", fileSegment.getPath(), e); + } + fileSegment.destroyFile(); ++ if (!fileSegment.exists()) { ++ tieredMetadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset()); ++ } + } + fileSegmentList.clear(); + needCommitFileSegmentList.clear(); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +index 7c744af3b..087ea8c9c 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +@@ -136,15 +136,13 @@ public class TieredFlatFileManager { + TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); + for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) { + TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> { +- flatFile.getCompositeFlatFileLock().lock(); + try { ++ flatFile.getCompositeFlatFileLock().lock(); + flatFile.cleanExpiredFile(expiredTimeStamp); + flatFile.destroyExpiredFile(); +- if (flatFile.getConsumeQueueBaseOffset() == -1) { +- logger.info("Clean flatFile because file not initialized, topic={}, queueId={}", +- flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId()); +- destroyCompositeFile(flatFile.getMessageQueue()); +- } ++ } catch (Throwable t) { ++ logger.error("Do Clean expired file error, topic={}, queueId={}", ++ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t); + } finally { + flatFile.getCompositeFlatFileLock().unlock(); + } +-- +2.32.0.windows.2 + + +From 4a8e0d5b851d1f9573cda79b7d2e42ee498809da Mon Sep 17 00:00:00 2001 +From: guyinyou <36399867+guyinyou@users.noreply.github.com> +Date: Wed, 13 Sep 2023 16:08:03 +0800 +Subject: [PATCH 3/5] [ISSUE #7351] Allow mqadmin to operate slave nodes + +Co-authored-by: guyinyou +--- + .../processor/AdminBrokerProcessor.java | 12 -- + .../processor/AdminBrokerProcessorTest.java | 106 ------------------ + 2 files changed, 118 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +index 8fbcd3c94..9e48431be 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +@@ -406,9 +406,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); +- if (validateSlave(response)) { +- return response; +- } + final CreateTopicRequestHeader requestHeader = + (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); + +@@ -519,9 +516,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); +- if (validateSlave(response)) { +- return response; +- } + DeleteTopicRequestHeader requestHeader = + (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class); + +@@ -1413,9 +1407,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); +- if (validateSlave(response)) { +- return response; +- } + + LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroup called by {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); +@@ -1480,9 +1471,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); +- if (validateSlave(response)) { +- return response; +- } + DeleteSubscriptionGroupRequestHeader requestHeader = + (DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class); + +diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +index 9d17011b6..ec252cece 100644 +--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java ++++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +@@ -76,7 +76,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi + import org.apache.rocketmq.store.DefaultMessageStore; + import org.apache.rocketmq.store.MessageStore; + import org.apache.rocketmq.store.SelectMappedBufferResult; +-import org.apache.rocketmq.store.config.BrokerRole; + import org.apache.rocketmq.store.config.MessageStoreConfig; + import org.apache.rocketmq.store.logfile.DefaultMappedFile; + import org.apache.rocketmq.store.stats.BrokerStats; +@@ -250,32 +249,6 @@ public class AdminBrokerProcessorTest { + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + +- @Test +- public void testUpdateAndCreateTopicOnSlaveInRocksdb() throws Exception { +- if (notToBeExecuted()) { +- return; +- } +- initRocksdbTopicManager(); +- testUpdateAndCreateTopicOnSlave(); +- } +- +- @Test +- public void testUpdateAndCreateTopicOnSlave() throws Exception { +- // setup +- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); +- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE); +- defaultMessageStore = mock(DefaultMessageStore.class); +- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); +- +- // test on slave +- String topic = "TEST_CREATE_TOPIC"; +- RemotingCommand request = buildCreateTopicRequest(topic); +- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); +- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); +- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " + +- "please execute it from master broker."); +- } +- + @Test + public void testDeleteTopicInRocksdb() throws Exception { + if (notToBeExecuted()) { +@@ -301,31 +274,6 @@ public class AdminBrokerProcessorTest { + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + +- @Test +- public void testDeleteTopicOnSlaveInRocksdb() throws Exception { +- if (notToBeExecuted()) { +- return; +- } +- initRocksdbTopicManager(); +- testDeleteTopicOnSlave(); +- } +- +- @Test +- public void testDeleteTopicOnSlave() throws Exception { +- // setup +- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); +- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE); +- defaultMessageStore = mock(DefaultMessageStore.class); +- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); +- +- String topic = "TEST_DELETE_TOPIC"; +- RemotingCommand request = buildDeleteTopicRequest(topic); +- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); +- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); +- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " + +- "please execute it from master broker."); +- } +- + @Test + public void testDeleteWithPopRetryTopic() throws Exception { + String topic = "topicA"; +@@ -538,36 +486,6 @@ public class AdminBrokerProcessorTest { + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + +- @Test +- public void testUpdateAndCreateSubscriptionGroupOnSlaveInRocksdb() throws Exception { +- initRocksdbSubscriptionManager(); +- testUpdateAndCreateSubscriptionGroupOnSlave(); +- } +- +- @Test +- public void testUpdateAndCreateSubscriptionGroupOnSlave() throws RemotingCommandException { +- // Setup +- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); +- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE); +- defaultMessageStore = mock(DefaultMessageStore.class); +- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); +- +- // Test +- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); +- SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); +- subscriptionGroupConfig.setBrokerId(1); +- subscriptionGroupConfig.setGroupName("groupId"); +- subscriptionGroupConfig.setConsumeEnable(Boolean.TRUE); +- subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.TRUE); +- subscriptionGroupConfig.setRetryMaxTimes(111); +- subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.TRUE); +- request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes()); +- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); +- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); +- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " + +- "please execute it from master broker."); +- } +- + @Test + public void testGetAllSubscriptionGroupInRocksdb() throws Exception { + initRocksdbSubscriptionManager(); +@@ -596,30 +514,6 @@ public class AdminBrokerProcessorTest { + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + +- @Test +- public void testDeleteSubscriptionGroupOnSlaveInRocksdb() throws Exception { +- initRocksdbSubscriptionManager(); +- testDeleteSubscriptionGroupOnSlave(); +- } +- +- @Test +- public void testDeleteSubscriptionGroupOnSlave() throws RemotingCommandException { +- // Setup +- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); +- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE); +- defaultMessageStore = mock(DefaultMessageStore.class); +- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); +- +- // Test +- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, null); +- request.addExtField("groupName", "GID-Group-Name"); +- request.addExtField("removeOffset", "true"); +- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); +- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); +- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " + +- "please execute it from master broker."); +- } +- + @Test + public void testGetTopicStatsInfo() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, null); +-- +2.32.0.windows.2 + + +From 831fcc76cd7cd362bb6c136c287c624bb7eaf40a Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Tue, 19 Sep 2023 10:04:04 +0800 +Subject: [PATCH 4/5] [ISSUE #7363] Fix get message from tiered storage return + incorrect next pull offset (#7365) + +--- + .../tieredstore/TieredMessageFetcher.java | 2 +- + .../tieredstore/TieredMessageStore.java | 29 ++++++++++--------- + .../tieredstore/TieredMessageStoreTest.java | 5 ++-- + 3 files changed, 20 insertions(+), 16 deletions(-) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +index 766ff64f6..c948fa3fa 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +@@ -319,7 +319,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + } + + // if cache is miss, immediately pull messages +- LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + ++ LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + + "topic: {}, queue: {}, queue offset: {}, max message num: {}", + mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +index 9fb1b2f01..d7d13d61e 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +@@ -147,6 +147,11 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + public CompletableFuture getMessageAsync(String group, String topic, + int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) { + ++ // For system topic, force reading from local store ++ if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) { ++ return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter); ++ } ++ + if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) { + logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset); + } else { +@@ -158,6 +163,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + return fetcher + .getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter) + .thenApply(result -> { ++ + Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder() + .put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_MESSAGE) + .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic) +@@ -166,8 +172,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); + + if (result.getStatus() == GetMessageStatus.OFFSET_FOUND_NULL || +- result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE || +- result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_BADLY) { ++ result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) { + + if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) { + TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes); +@@ -178,14 +183,8 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + } + } + +- // Fetch system topic data from the broker when using the force level. +- if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) { +- if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) { +- return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); +- } +- } +- + if (result.getStatus() != GetMessageStatus.FOUND && ++ result.getStatus() != GetMessageStatus.NO_MATCHED_LOGIC_QUEUE && + result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_ONE && + result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_BADLY) { + logger.warn("GetMessageAsync not found and message is not in next store, result: {}, " + +@@ -206,10 +205,14 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) { + result.setMinOffset(minOffsetInQueue); + } +- long maxOffsetInQueue = next.getMaxOffsetInQueue(topic, queueId); +- if (maxOffsetInQueue >= 0 && maxOffsetInQueue > result.getMaxOffset()) { +- result.setMaxOffset(maxOffsetInQueue); +- } ++ ++ // In general, the local cq offset is slightly greater than the commit offset in read message, ++ // so there is no need to update the maximum offset to the local cq offset here, ++ // otherwise it will cause repeated consumption after next begin offset over commit offset. ++ ++ logger.trace("GetMessageAsync result, group: {}, topic: {}, queueId: {}, offset: {}, count:{}, {}", ++ group, topic, queueId, offset, maxMsgNums, result); ++ + return result; + }).exceptionally(e -> { + logger.error("GetMessageAsync from tiered store failed", e); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +index 2451199c2..07af1fc8b 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +@@ -168,7 +168,7 @@ public class TieredMessageStoreTest { + GetMessageResult result1 = new GetMessageResult(); + result1.setStatus(GetMessageStatus.FOUND); + GetMessageResult result2 = new GetMessageResult(); +- result2.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING); ++ result2.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY); + + when(fetcher.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(result1)); + when(nextStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(result2); +@@ -188,7 +188,8 @@ public class TieredMessageStoreTest { + properties.setProperty("tieredStorageLevel", "3"); + configuration.update(properties); + when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); +- Assert.assertSame(result2, store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, null)); ++ Assert.assertEquals(result2.getStatus(), ++ store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, null).getStatus()); + } + + @Test +-- +2.32.0.windows.2 + + +From f05a8da760dfade411ad56ef874f477988479cf9 Mon Sep 17 00:00:00 2001 +From: rongtong +Date: Wed, 20 Sep 2023 15:06:21 +0800 +Subject: [PATCH 5/5] Print admin queue watermark in log (#7372) + +--- + .../main/java/org/apache/rocketmq/broker/BrokerController.java | 1 + + 1 file changed, 1 insertion(+) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +index 13a3feb4e..53e2e1b62 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +@@ -1182,6 +1182,7 @@ public class BrokerController { + LOG_WATER_MARK.info("[WATERMARK] ClientManager Queue Size: {} SlowTimeMills: {}", this.clientManagerThreadPoolQueue.size(), this.headSlowTimeMills(this.clientManagerThreadPoolQueue)); + LOG_WATER_MARK.info("[WATERMARK] Heartbeat Queue Size: {} SlowTimeMills: {}", this.heartbeatThreadPoolQueue.size(), this.headSlowTimeMills(this.heartbeatThreadPoolQueue)); + LOG_WATER_MARK.info("[WATERMARK] Ack Queue Size: {} SlowTimeMills: {}", this.ackThreadPoolQueue.size(), headSlowTimeMills(this.ackThreadPoolQueue)); ++ LOG_WATER_MARK.info("[WATERMARK] Admin Queue Size: {} SlowTimeMills: {}", this.adminBrokerThreadPoolQueue.size(), headSlowTimeMills(this.adminBrokerThreadPoolQueue)); + } + + public MessageStore getMessageStore() { +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 2570f84..5b3e317 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.3 -Release: 18 +Release: 19 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -27,6 +27,7 @@ Patch0014: patch014-backport-Queue-Selection-Strategy-Optimization.patch Patch0015: patch015-backport-fix-some-bugs.patch Patch0016: patch016-backport-Optimize-fault-tolerant-mechanism.patch Patch0017: patch017-backport-Convergent-thread-pool-creation.patch +Patch0018: patch018-backport-enhancement-of-tiered-storage.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -61,6 +62,9 @@ exit 0 %changelog +* Mon Nov 20 2023 ShiZhili - 5.1.3-19 +- backport some enhancement of tiered storage + * Mon Nov 20 2023 ShiZhili - 5.1.3-18 - backport-Convergent-thread-pool-creation -- Gitee