From a3553ea8c53e94fb40149eb64aef1f44afae327f Mon Sep 17 00:00:00 2001 From: shizhili Date: Mon, 30 Oct 2023 21:00:21 +0800 Subject: [PATCH] backport Optimize fault tolerant-mechanism --- ...rt-Optimize-fault-tolerant-mechanism.patch | 520 ++++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 525 insertions(+), 1 deletion(-) create mode 100644 patch016-backport-Optimize-fault-tolerant-mechanism.patch diff --git a/patch016-backport-Optimize-fault-tolerant-mechanism.patch b/patch016-backport-Optimize-fault-tolerant-mechanism.patch new file mode 100644 index 0000000..9d8ef20 --- /dev/null +++ b/patch016-backport-Optimize-fault-tolerant-mechanism.patch @@ -0,0 +1,520 @@ +From e11e29419f6e2d1d9673d0329e57b824ebf3da47 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Wed, 6 Sep 2023 20:42:24 +0800 +Subject: [PATCH 1/3] [ISSUE #7308] Adding topic blacklist and filter in tiered + storage module (#7310) + +--- + .../tieredstore/TieredDispatcher.java | 21 +++++++-- + .../tieredstore/TieredMessageStore.java | 1 + + .../file/TieredFlatFileManager.java | 17 ++++--- + .../TieredStoreTopicBlackListFilter.java | 45 +++++++++++++++++++ + .../provider/TieredStoreTopicFilter.java | 25 +++++++++++ + .../TieredStoreTopicBlackListFilterTest.java | 36 +++++++++++++++ + 6 files changed, 136 insertions(+), 9 deletions(-) + create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java + create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java + create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +index 430c2b62e..766c559e9 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +@@ -48,6 +48,8 @@ import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; + import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager; + import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant; + import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; ++import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicBlackListFilter; ++import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicFilter; + import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +@@ -56,6 +58,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + + private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); + ++ private TieredStoreTopicFilter topicFilter; + private final String brokerName; + private final MessageStore defaultStore; + private final TieredMessageStoreConfig storeConfig; +@@ -70,15 +73,15 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + this.defaultStore = defaultStore; + this.storeConfig = storeConfig; + this.brokerName = storeConfig.getBrokerName(); ++ this.topicFilter = new TieredStoreTopicBlackListFilter(); + this.tieredFlatFileManager = TieredFlatFileManager.getInstance(storeConfig); + this.dispatchRequestReadMap = new ConcurrentHashMap<>(); + this.dispatchRequestWriteMap = new ConcurrentHashMap<>(); + this.dispatchTaskLock = new ReentrantLock(); + this.dispatchWriteLock = new ReentrantLock(); +- this.initScheduleTask(); + } + +- private void initScheduleTask() { ++ protected void initScheduleTask() { + TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() -> + tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> { + if (!flatFile.getCompositeFlatFileLock().isLocked()) { +@@ -87,6 +90,14 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + }), 30, 10, TimeUnit.SECONDS); + } + ++ public TieredStoreTopicFilter getTopicFilter() { ++ return topicFilter; ++ } ++ ++ public void setTopicFilter(TieredStoreTopicFilter topicFilter) { ++ this.topicFilter = topicFilter; ++ } ++ + @Override + public void dispatch(DispatchRequest request) { + if (stopped) { +@@ -94,7 +105,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + } + + String topic = request.getTopic(); +- if (TieredStoreUtil.isSystemTopic(topic)) { ++ if (topicFilter != null && topicFilter.filterTopic(topic)) { + return; + } + +@@ -219,6 +230,10 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + return; + } + ++ if (topicFilter != null && topicFilter.filterTopic(flatFile.getMessageQueue().getTopic())) { ++ return; ++ } ++ + if (flatFile.getDispatchOffset() == -1L) { + return; + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +index 78e855f36..9fb1b2f01 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +@@ -90,6 +90,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + boolean loadNextStore = next.load(); + boolean result = loadFlatFile && loadNextStore; + if (result) { ++ dispatcher.initScheduleTask(); + dispatcher.start(); + } + return result; +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +index e9ae4a5a5..7c744af3b 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +@@ -134,21 +134,21 @@ public class TieredFlatFileManager { + public void doCleanExpiredFile() { + long expiredTimeStamp = System.currentTimeMillis() - + TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); +- Random random = new Random(); + for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) { +- int delay = random.nextInt(storeConfig.getMaxCommitJitter()); +- TieredStoreExecutor.cleanExpiredFileExecutor.schedule(() -> { ++ TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> { + flatFile.getCompositeFlatFileLock().lock(); + try { + flatFile.cleanExpiredFile(expiredTimeStamp); + flatFile.destroyExpiredFile(); + if (flatFile.getConsumeQueueBaseOffset() == -1) { ++ logger.info("Clean flatFile because file not initialized, topic={}, queueId={}", ++ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId()); + destroyCompositeFile(flatFile.getMessageQueue()); + } + } finally { + flatFile.getCompositeFlatFileLock().unlock(); + } +- }, delay, TimeUnit.MILLISECONDS); ++ }); + } + if (indexFile != null) { + indexFile.cleanExpiredFile(expiredTimeStamp); +@@ -218,8 +218,13 @@ public class TieredFlatFileManager { + storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId())); + queueCount.incrementAndGet(); + }); +- logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms", +- topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS)); ++ ++ if (queueCount.get() == 0L) { ++ metadataStore.deleteTopic(topicMetadata.getTopic()); ++ } else { ++ logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms", ++ topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS)); ++ } + } catch (Exception e) { + logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e); + } finally { +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java +new file mode 100644 +index 000000000..50adbb713 +--- /dev/null ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java +@@ -0,0 +1,45 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.tieredstore.provider; ++ ++import java.util.HashSet; ++import java.util.Set; ++import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++ ++public class TieredStoreTopicBlackListFilter implements TieredStoreTopicFilter { ++ ++ private final Set topicBlackSet; ++ ++ public TieredStoreTopicBlackListFilter() { ++ this.topicBlackSet = new HashSet<>(); ++ } ++ ++ @Override ++ public boolean filterTopic(String topicName) { ++ if (StringUtils.isBlank(topicName)) { ++ return true; ++ } ++ return TieredStoreUtil.isSystemTopic(topicName) || topicBlackSet.contains(topicName); ++ } ++ ++ @Override ++ public void addTopicToWhiteList(String topicName) { ++ this.topicBlackSet.add(topicName); ++ } ++} +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java +new file mode 100644 +index 000000000..3f26b8b02 +--- /dev/null ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java +@@ -0,0 +1,25 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.tieredstore.provider; ++ ++public interface TieredStoreTopicFilter { ++ ++ boolean filterTopic(String topicName); ++ ++ void addTopicToWhiteList(String topicName); ++} +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java +new file mode 100644 +index 000000000..2bf48173c +--- /dev/null ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java +@@ -0,0 +1,36 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tieredstore.provider; ++ ++import org.apache.rocketmq.common.topic.TopicValidator; ++import org.junit.Assert; ++import org.junit.Test; ++ ++public class TieredStoreTopicBlackListFilterTest { ++ ++ @Test ++ public void filterTopicTest() { ++ TieredStoreTopicFilter topicFilter = new TieredStoreTopicBlackListFilter(); ++ Assert.assertTrue(topicFilter.filterTopic("")); ++ Assert.assertTrue(topicFilter.filterTopic(TopicValidator.SYSTEM_TOPIC_PREFIX + "_Topic")); ++ ++ String topicName = "WhiteTopic"; ++ Assert.assertFalse(topicFilter.filterTopic(topicName)); ++ topicFilter.addTopicToWhiteList(topicName); ++ Assert.assertTrue(topicFilter.filterTopic(topicName)); ++ } ++} +\ No newline at end of file +-- +2.32.0.windows.2 + + +From 628020537fa7035226bc8dcde9fa33d9d5df30ff Mon Sep 17 00:00:00 2001 +From: rongtong +Date: Thu, 7 Sep 2023 16:17:47 +0800 +Subject: [PATCH 2/3] [ISSUE #7293] Fix NPE when alter sync state set + +--- + .../rocketmq/controller/impl/manager/ReplicasInfoManager.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +index b0a67531d..d83a690f9 100644 +--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java ++++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +@@ -104,7 +104,7 @@ public class ReplicasInfoManager { + } + + // Check master +- if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { ++ if (syncStateInfo.getMasterBrokerId() == null || !syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { + String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}", + syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId()); + LOGGER.error("{}", err); +-- +2.32.0.windows.2 + + +From 6fd0073d6475c539e8f4c30dc4f104a56a21d724 Mon Sep 17 00:00:00 2001 +From: Ji Juntao +Date: Thu, 7 Sep 2023 20:21:16 +0800 +Subject: [PATCH 3/3] [ISSUE #7319] Optimize fault-tolerant mechanism for + sending messages and hot update switch (#7320) + +--- + .../impl/producer/DefaultMQProducerImpl.java | 8 ++------ + .../client/latency/LatencyFaultTolerance.java | 14 +++++++++++++ + .../latency/LatencyFaultToleranceImpl.java | 13 +++++++++++- + .../client/latency/MQFaultStrategy.java | 20 +++++++------------ + .../proxy/service/route/MessageQueueView.java | 9 --------- + .../service/route/TopicRouteService.java | 10 +++++++++- + 6 files changed, 44 insertions(+), 30 deletions(-) + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +index 2d6b83ac2..b0c212e46 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +@@ -263,9 +263,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + mQClientFactory.start(); + } + +- if (this.mqFaultStrategy.isStartDetectorEnable()) { +- this.mqFaultStrategy.startDetector(); +- } ++ this.mqFaultStrategy.startDetector(); + + log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), + this.defaultMQProducer.isSendMessageWithVIPChannel()); +@@ -311,9 +309,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + if (shutdownFactory) { + this.mQClientFactory.shutdown(); + } +- if (this.mqFaultStrategy.isStartDetectorEnable()) { +- this.mqFaultStrategy.shutdown(); +- } ++ this.mqFaultStrategy.shutdown(); + RequestFutureHolder.getInstance().shutdown(this); + log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); + this.serviceState = ServiceState.SHUTDOWN_ALREADY; +diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java +index 72d2f3450..17aaa266a 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java ++++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java +@@ -89,4 +89,18 @@ public interface LatencyFaultTolerance { + * @param detectInterval each broker's detecting interval + */ + void setDetectInterval(final int detectInterval); ++ ++ /** ++ * Use it to set the detector work or not. ++ * ++ * @param startDetectorEnable set the detector's work status ++ */ ++ void setStartDetectorEnable(final boolean startDetectorEnable); ++ ++ /** ++ * Use it to judge if the detector enabled. ++ * ++ * @return is the detector should be started. ++ */ ++ boolean isStartDetectorEnable(); + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +index 8af629574..d3ff7eb45 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +@@ -37,6 +37,8 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance + private int detectTimeout = 200; + private int detectInterval = 2000; + private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); ++ ++ private volatile boolean startDetectorEnable = false; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { +@@ -80,7 +82,9 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance + @Override + public void run() { + try { +- detectByOneRound(); ++ if (startDetectorEnable) { ++ detectByOneRound(); ++ } + } catch (Exception e) { + log.warn("Unexpected exception raised while detecting service reachability", e); + } +@@ -137,6 +141,13 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance + this.faultItemTable.remove(name); + } + ++ public boolean isStartDetectorEnable() { ++ return startDetectorEnable; ++ } ++ ++ public void setStartDetectorEnable(boolean startDetectorEnable) { ++ this.startDetectorEnable = startDetectorEnable; ++ } + @Override + public String pickOneAtLeast() { + final Enumeration elements = this.faultItemTable.elements(); +diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +index c01490784..69fb533e5 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java ++++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +@@ -24,8 +24,8 @@ import org.apache.rocketmq.common.message.MessageQueue; + + public class MQFaultStrategy { + private LatencyFaultTolerance latencyFaultTolerance; +- private boolean sendLatencyFaultEnable; +- private boolean startDetectorEnable; ++ private volatile boolean sendLatencyFaultEnable; ++ private volatile boolean startDetectorEnable; + private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L}; + private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L}; + +@@ -64,11 +64,11 @@ public class MQFaultStrategy { + + + public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) { +- this.setStartDetectorEnable(cc.isStartDetectorEnable()); +- this.setSendLatencyFaultEnable(cc.isSendLatencyEnable()); + this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector); + this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval()); + this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout()); ++ this.setStartDetectorEnable(cc.isStartDetectorEnable()); ++ this.setSendLatencyFaultEnable(cc.isSendLatencyEnable()); + } + + // For unit test. +@@ -123,21 +123,15 @@ public class MQFaultStrategy { + + public void setStartDetectorEnable(boolean startDetectorEnable) { + this.startDetectorEnable = startDetectorEnable; ++ this.latencyFaultTolerance.setStartDetectorEnable(startDetectorEnable); + } + + public void startDetector() { +- // user should start the detector +- // and the thread should not be in running state. +- if (this.sendLatencyFaultEnable && this.startDetectorEnable) { +- // start the detector. +- this.latencyFaultTolerance.startDetector(); +- } ++ this.latencyFaultTolerance.startDetector(); + } + + public void shutdown() { +- if (this.sendLatencyFaultEnable && this.startDetectorEnable) { +- this.latencyFaultTolerance.shutdown(); +- } ++ this.latencyFaultTolerance.shutdown(); + } + + public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +index 8b3c2f7c8..898e529f8 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +@@ -26,7 +26,6 @@ public class MessageQueueView { + private final MessageQueueSelector readSelector; + private final MessageQueueSelector writeSelector; + private final TopicRouteWrapper topicRouteWrapper; +- private MQFaultStrategy mqFaultStrategy; + + public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) { + this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic); +@@ -67,12 +66,4 @@ public class MessageQueueView { + .add("topicRouteWrapper", topicRouteWrapper) + .toString(); + } +- +- public MQFaultStrategy getMQFaultStrategy() { +- return mqFaultStrategy; +- } +- +- public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { +- this.mqFaultStrategy = mqFaultStrategy; +- } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +index 74769a423..caf62a1e0 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +@@ -127,7 +127,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { + @Override + public String resolve(String name) { + try { +- String brokerAddr = getBrokerAddr(null, name); ++ String brokerAddr = getBrokerAddr(ProxyContext.createForInner("MQFaultStrategy"), name); + return brokerAddr; + } catch (Exception e) { + return null; +@@ -175,9 +175,17 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { + + public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, + boolean reachable) { ++ checkSendFaultToleranceEnable(); + this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable); + } + ++ public void checkSendFaultToleranceEnable() { ++ boolean hotLatencySwitch = ConfigurationManager.getProxyConfig().isSendLatencyEnable(); ++ boolean hotDetectorSwitch = ConfigurationManager.getProxyConfig().isStartDetectorEnable(); ++ this.mqFaultStrategy.setSendLatencyFaultEnable(hotLatencySwitch); ++ this.mqFaultStrategy.setStartDetectorEnable(hotDetectorSwitch); ++ } ++ + public MQFaultStrategy getMqFaultStrategy() { + return this.mqFaultStrategy; + } +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 6c8f925..f4c9fc6 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.3 -Release: 16 +Release: 17 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -25,6 +25,7 @@ Patch0012: patch012-backport-enhance-rockdbconfigtojson.patch Patch0013: patch013-backport-enhance-admin-output.patch Patch0014: patch014-backport-Queue-Selection-Strategy-Optimization.patch Patch0015: patch015-backport-fix-some-bugs.patch +Patch0016: patch016-backport-Optimize-fault-tolerant-mechanism.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -59,6 +60,9 @@ exit 0 %changelog +* Mon Oct 30 2023 ShiZhili - 5.1.3-17 +- backport Optimize fault tolerant-mechanism + * Mon Oct 30 2023 ShiZhili - 5.1.3-16 - backport some bug fixes -- Gitee