diff --git a/patch014-backport-Queue-Selection-Strategy-Optimization.patch b/patch014-backport-Queue-Selection-Strategy-Optimization.patch new file mode 100644 index 0000000000000000000000000000000000000000..90d3553aaa7a7ff198dda1cd73050cc111c9e091 --- /dev/null +++ b/patch014-backport-Queue-Selection-Strategy-Optimization.patch @@ -0,0 +1,2023 @@ +From b028277018946868838a82a08211071bc231a175 Mon Sep 17 00:00:00 2001 +From: Ji Juntao +Date: Tue, 29 Aug 2023 16:13:38 +0800 +Subject: [PATCH] [ISSUE #6567] [RIP-63] Queue Selection Strategy Optimization + (#6568) + +Optimize the proxy's and client's selection strategy for brokers when sending messages, and use multiple selection strategies as a pipeline to filter suitable queues. +--- + .../apache/rocketmq/client/ClientConfig.java | 54 +++++ + .../client/common/ThreadLocalIndex.java | 8 + + .../rocketmq/client/impl/MQClientAPIImpl.java | 12 +- + .../client/impl/factory/MQClientInstance.java | 7 + + .../impl/producer/DefaultMQProducerImpl.java | 87 ++++++-- + .../impl/producer/TopicPublishInfo.java | 40 ++++ + .../client/latency/LatencyFaultTolerance.java | 66 +++++- + .../latency/LatencyFaultToleranceImpl.java | 189 ++++++++++++++---- + .../client/latency/MQFaultStrategy.java | 155 ++++++++++---- + .../rocketmq/client/latency/Resolver.java | 17 +- + .../client/latency/ServiceDetector.java | 30 +++ + .../LatencyFaultToleranceImplTest.java | 36 +++- + .../processor/DefaultRequestProcessor.java | 24 --- + .../rocketmq/proxy/config/ProxyConfig.java | 46 +++++ + .../grpc/v2/producer/SendMessageActivity.java | 2 +- + .../proxy/processor/ProducerProcessor.java | 18 +- + .../service/route/LocalTopicRouteService.java | 2 +- + .../service/route/MessageQueueSelector.java | 95 ++++++++- + .../proxy/service/route/MessageQueueView.java | 18 +- + .../service/route/TopicRouteService.java | 80 +++++++- + .../consumer/ReceiveMessageActivityTest.java | 5 +- + .../v2/producer/SendMessageActivityTest.java | 82 +++++++- + .../proxy/service/BaseServiceTest.java | 4 +- + .../route/MessageQueueSelectorTest.java | 8 +- + .../sysmessage/HeartbeatSyncerTest.java | 2 +- + .../ClusterTransactionServiceTest.java | 8 +- + 26 files changed, 919 insertions(+), 176 deletions(-) + rename remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java => client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java (65%) + create mode 100644 client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java + +diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +index f87450f66..bb0fe3522 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java ++++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +@@ -38,6 +38,8 @@ public class ClientConfig { + public static final String SOCKS_PROXY_CONFIG = "com.rocketmq.socks.proxy.config"; + public static final String DECODE_READ_BODY = "com.rocketmq.read.body"; + public static final String DECODE_DECOMPRESS_BODY = "com.rocketmq.decompress.body"; ++ public static final String SEND_LATENCY_ENABLE = "com.rocketmq.sendLatencyEnable"; ++ public static final String START_DETECTOR_ENABLE = "com.rocketmq.startDetectorEnable"; + public static final String HEART_BEAT_V2 = "com.rocketmq.heartbeat.v2"; + private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses(); + private String clientIP = NetworkUtil.getLocalAddress(); +@@ -72,6 +74,8 @@ public class ClientConfig { + private String socksProxyConfig = System.getProperty(SOCKS_PROXY_CONFIG, "{}"); + + private int mqClientApiTimeout = 3 * 1000; ++ private int detectTimeout = 200; ++ private int detectInterval = 2 * 1000; + + private LanguageCode language = LanguageCode.JAVA; + +@@ -81,6 +85,15 @@ public class ClientConfig { + */ + protected boolean enableStreamRequestType = false; + ++ /** ++ * Enable the fault tolerance mechanism of the client sending process. ++ * DO NOT OPEN when ORDER messages are required. ++ * Turning on will interfere with the queue selection functionality, ++ * possibly conflicting with the order message. ++ */ ++ private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false")); ++ private boolean startDetectorEnable = Boolean.parseBoolean(System.getProperty(START_DETECTOR_ENABLE, "false")); ++ + public String buildMQClientId() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getClientIP()); +@@ -186,6 +199,10 @@ public class ClientConfig { + this.decodeDecompressBody = cc.decodeDecompressBody; + this.enableStreamRequestType = cc.enableStreamRequestType; + this.useHeartbeatV2 = cc.useHeartbeatV2; ++ this.startDetectorEnable = cc.startDetectorEnable; ++ this.sendLatencyEnable = cc.sendLatencyEnable; ++ this.detectInterval = cc.detectInterval; ++ this.detectTimeout = cc.detectTimeout; + } + + public ClientConfig cloneClientConfig() { +@@ -210,6 +227,10 @@ public class ClientConfig { + cc.decodeDecompressBody = decodeDecompressBody; + cc.enableStreamRequestType = enableStreamRequestType; + cc.useHeartbeatV2 = useHeartbeatV2; ++ cc.startDetectorEnable = startDetectorEnable; ++ cc.sendLatencyEnable = sendLatencyEnable; ++ cc.detectInterval = detectInterval; ++ cc.detectTimeout = detectTimeout; + return cc; + } + +@@ -381,6 +402,38 @@ public class ClientConfig { + this.enableStreamRequestType = enableStreamRequestType; + } + ++ public boolean isSendLatencyEnable() { ++ return sendLatencyEnable; ++ } ++ ++ public void setSendLatencyEnable(boolean sendLatencyEnable) { ++ this.sendLatencyEnable = sendLatencyEnable; ++ } ++ ++ public boolean isStartDetectorEnable() { ++ return startDetectorEnable; ++ } ++ ++ public void setStartDetectorEnable(boolean startDetectorEnable) { ++ this.startDetectorEnable = startDetectorEnable; ++ } ++ ++ public int getDetectTimeout() { ++ return this.detectTimeout; ++ } ++ ++ public void setDetectTimeout(int detectTimeout) { ++ this.detectTimeout = detectTimeout; ++ } ++ ++ public int getDetectInterval() { ++ return this.detectInterval; ++ } ++ ++ public void setDetectInterval(int detectInterval) { ++ this.detectInterval = detectInterval; ++ } ++ + public boolean isUseHeartbeatV2() { + return useHeartbeatV2; + } +@@ -403,6 +456,7 @@ public class ClientConfig { + + ", socksProxyConfig=" + socksProxyConfig + ", language=" + language.name() + + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + + ", decodeReadBody=" + decodeReadBody + ", decodeDecompressBody=" + decodeDecompressBody ++ + ", sendLatencyEnable=" + sendLatencyEnable + ", startDetectorEnable=" + startDetectorEnable + + ", enableStreamRequestType=" + enableStreamRequestType + ", useHeartbeatV2=" + useHeartbeatV2 + "]"; + } + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java +index 4a3d90135..3a086c13d 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java ++++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java +@@ -33,6 +33,14 @@ public class ThreadLocalIndex { + return index & POSITIVE_MASK; + } + ++ public void reset() { ++ int index = Math.abs(random.nextInt(Integer.MAX_VALUE)); ++ if (index < 0) { ++ index = 0; ++ } ++ this.threadLocalIndex.set(index); ++ } ++ + @Override + public String toString() { + return "ThreadLocalIndex{" + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +index 213c26fd6..3201a493f 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +@@ -666,7 +666,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { + } catch (Throwable e) { + } + +- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); ++ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true); + return; + } + +@@ -684,14 +684,14 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { + } catch (Throwable e) { + } + +- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); ++ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true); + } catch (Exception e) { +- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); ++ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, e, context, false, producer); + } + } else { +- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); ++ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true); + if (!responseFuture.isSendRequestOK()) { + MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, +@@ -711,7 +711,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { + }); + } catch (Exception ex) { + long cost = System.currentTimeMillis() - beginStartTime; +- producer.updateFaultItem(brokerName, cost, true); ++ producer.updateFaultItem(brokerName, cost, true, false); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); + } +@@ -735,7 +735,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { + if (needRetry && tmp <= timesTotal) { + String retryBrokerName = brokerName;//by default, it will send to the same broker + if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send +- MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName); ++ MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName, false); + retryBrokerName = instance.getBrokerNameFromMessageQueue(mqChosen); + } + String addr = instance.findBrokerAddressInPublish(retryBrokerName); +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +index 8851bc815..9484b26f8 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; ++import java.util.concurrent.ThreadFactory; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.locks.Lock; +@@ -125,6 +126,12 @@ public class MQClientInstance { + private final Set brokerSupportV2HeartbeatSet = new HashSet(); + private final ConcurrentMap brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap(); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread")); ++ private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { ++ @Override ++ public Thread newThread(Runnable r) { ++ return new Thread(r, "MQClientFactoryFetchRemoteConfigScheduledThread"); ++ } ++ }); + private final PullMessageService pullMessageService; + private final RebalanceService rebalanceService; + private final DefaultMQProducer defaultMQProducer; +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +index 3f4c6e5f7..bbbb17b07 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +@@ -33,6 +33,8 @@ import java.util.concurrent.RejectedExecutionException; + import java.util.concurrent.Semaphore; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; ++ ++import com.google.common.base.Optional; + import org.apache.rocketmq.client.QueryResult; + import org.apache.rocketmq.client.Validators; + import org.apache.rocketmq.client.common.ClientErrorCode; +@@ -49,6 +51,8 @@ import org.apache.rocketmq.client.impl.CommunicationMode; + import org.apache.rocketmq.client.impl.MQClientManager; + import org.apache.rocketmq.client.impl.factory.MQClientInstance; + import org.apache.rocketmq.client.latency.MQFaultStrategy; ++import org.apache.rocketmq.client.latency.Resolver; ++import org.apache.rocketmq.client.latency.ServiceDetector; + import org.apache.rocketmq.client.producer.DefaultMQProducer; + import org.apache.rocketmq.client.producer.LocalTransactionExecuter; + import org.apache.rocketmq.client.producer.LocalTransactionState; +@@ -112,7 +116,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + private ServiceState serviceState = ServiceState.CREATE_JUST; + private MQClientInstance mQClientFactory; + private ArrayList checkForbiddenHookList = new ArrayList<>(); +- private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); ++ private MQFaultStrategy mqFaultStrategy; + private ExecutorService asyncSenderExecutor; + + // compression related +@@ -153,8 +157,38 @@ public class DefaultMQProducerImpl implements MQProducerInner { + semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true); + log.info("semaphoreAsyncSendSize can not be smaller than 1M."); + } +- } + ++ ServiceDetector serviceDetector = new ServiceDetector() { ++ @Override ++ public boolean detect(String endpoint, long timeoutMillis) { ++ Optional candidateTopic = pickTopic(); ++ if (!candidateTopic.isPresent()) { ++ return false; ++ } ++ try { ++ MessageQueue mq = new MessageQueue(candidateTopic.get(), null, 0); ++ mQClientFactory.getMQClientAPIImpl() ++ .getMaxOffset(endpoint, mq, timeoutMillis); ++ return true; ++ } catch (Exception e) { ++ return false; ++ } ++ } ++ }; ++ ++ this.mqFaultStrategy = new MQFaultStrategy(defaultMQProducer.cloneClientConfig(), new Resolver() { ++ @Override ++ public String resolve(String name) { ++ return DefaultMQProducerImpl.this.mQClientFactory.findBrokerAddressInPublish(name); ++ } ++ }, serviceDetector); ++ } ++ private Optional pickTopic() { ++ if (topicPublishInfoTable.isEmpty()) { ++ return Optional.absent(); ++ } ++ return Optional.of(topicPublishInfoTable.keySet().iterator().next()); ++ } + public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { + this.checkForbiddenHookList.add(checkForbiddenHook); + log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(), +@@ -229,6 +263,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { + mQClientFactory.start(); + } + ++ if (this.mqFaultStrategy.isStartDetectorEnable()) { ++ this.mqFaultStrategy.startDetector(); ++ } ++ + log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), + this.defaultMQProducer.isSendMessageWithVIPChannel()); + this.serviceState = ServiceState.RUNNING; +@@ -273,6 +311,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { + if (shutdownFactory) { + this.mQClientFactory.shutdown(); + } ++ if (this.mqFaultStrategy.isStartDetectorEnable()) { ++ this.mqFaultStrategy.shutdown(); ++ } + RequestFutureHolder.getInstance().shutdown(this); + log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); + this.serviceState = ServiceState.SHUTDOWN_ALREADY; +@@ -574,7 +615,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + } + + public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg, +- final long timeout) throws MQClientException, RemotingTooMuchRequestException { ++ final long timeout) throws MQClientException, RemotingTooMuchRequestException { + long beginStartTime = System.currentTimeMillis(); + this.makeSureStateOK(); + Validators.checkMessage(msg, this.defaultMQProducer); +@@ -584,7 +625,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + MessageQueue mq = null; + try { + List messageQueueList = +- mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); ++ mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); + Message userMessage = MessageAccessor.cloneMessage(msg); + String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); + userMessage.setTopic(userTopic); +@@ -609,12 +650,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { + throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); + } + +- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { +- return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); ++ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) { ++ return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName, resetIndex); + } + +- public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { +- this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); ++ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, ++ boolean reachable) { ++ this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable); + } + + private void validateNameServerSetting() throws MQClientException { +@@ -647,9 +689,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { + int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; + int times = 0; + String[] brokersSent = new String[timesTotal]; ++ boolean resetIndex = false; + for (; times < timesTotal; times++) { + String lastBrokerName = null == mq ? null : mq.getBrokerName(); +- MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); ++ if (times > 0) { ++ resetIndex = true; ++ } ++ MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex); + if (mqSelected != null) { + mq = mqSelected; + brokersSent[times] = mq.getBrokerName(); +@@ -667,7 +713,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + + sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); + endTimestamp = System.currentTimeMillis(); +- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); ++ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); + switch (communicationMode) { + case ASYNC: + return null; +@@ -684,9 +730,22 @@ public class DefaultMQProducerImpl implements MQProducerInner { + default: + break; + } +- } catch (RemotingException | MQClientException e) { ++ } catch (MQClientException e) { + endTimestamp = System.currentTimeMillis(); +- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); ++ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); ++ log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); ++ log.warn(msg.toString()); ++ exception = e; ++ continue; ++ } catch (RemotingException e) { ++ endTimestamp = System.currentTimeMillis(); ++ if (this.mqFaultStrategy.isStartDetectorEnable()) { ++ // Set this broker unreachable when detecting schedule task is running for RemotingException. ++ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false); ++ } else { ++ // Otherwise, isolate this broker. ++ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, true); ++ } + log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + if (log.isDebugEnabled()) { + log.debug(msg.toString()); +@@ -695,7 +754,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + continue; + } catch (MQBrokerException e) { + endTimestamp = System.currentTimeMillis(); +- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); ++ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false); + log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + if (log.isDebugEnabled()) { + log.debug(msg.toString()); +@@ -712,7 +771,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { + } + } catch (InterruptedException e) { + endTimestamp = System.currentTimeMillis(); +- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); ++ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); + log.warn("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + if (log.isDebugEnabled()) { + log.debug(msg.toString()); +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +index 275ada7ac..37b1f3252 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +@@ -18,6 +18,8 @@ package org.apache.rocketmq.client.impl.producer; + + import java.util.ArrayList; + import java.util.List; ++ ++import com.google.common.base.Preconditions; + import org.apache.rocketmq.client.common.ThreadLocalIndex; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.remoting.protocol.route.QueueData; +@@ -30,6 +32,10 @@ public class TopicPublishInfo { + private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); + private TopicRouteData topicRouteData; + ++ public interface QueueFilter { ++ boolean filter(MessageQueue mq); ++ } ++ + public boolean isOrderTopic() { + return orderTopic; + } +@@ -66,6 +72,40 @@ public class TopicPublishInfo { + this.haveTopicRouterInfo = haveTopicRouterInfo; + } + ++ public MessageQueue selectOneMessageQueue(QueueFilter ...filter) { ++ return selectOneMessageQueue(this.messageQueueList, this.sendWhichQueue, filter); ++ } ++ ++ private MessageQueue selectOneMessageQueue(List messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) { ++ if (messageQueueList == null || messageQueueList.isEmpty()) { ++ return null; ++ } ++ ++ if (filter != null && filter.length != 0) { ++ for (int i = 0; i < messageQueueList.size(); i++) { ++ int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); ++ MessageQueue mq = messageQueueList.get(index); ++ boolean filterResult = true; ++ for (QueueFilter f: filter) { ++ Preconditions.checkNotNull(f); ++ filterResult &= f.filter(mq); ++ } ++ if (filterResult) { ++ return mq; ++ } ++ } ++ ++ return null; ++ } ++ ++ int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); ++ return messageQueueList.get(index); ++ } ++ ++ public void resetIndex() { ++ this.sendWhichQueue.reset(); ++ } ++ + public MessageQueue selectOneMessageQueue(final String lastBrokerName) { + if (lastBrokerName == null) { + return selectOneMessageQueue(); +diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java +index 09a8aa461..72d2f3450 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java ++++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java +@@ -18,11 +18,75 @@ + package org.apache.rocketmq.client.latency; + + public interface LatencyFaultTolerance { +- void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration); ++ /** ++ * Update brokers' states, to decide if they are good or not. ++ * ++ * @param name Broker's name. ++ * @param currentLatency Current message sending process's latency. ++ * @param notAvailableDuration Corresponding not available time, ms. The broker will be not available until it ++ * spends such time. ++ * @param reachable To decide if this broker is reachable or not. ++ */ ++ void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration, ++ final boolean reachable); + ++ /** ++ * To check if this broker is available. ++ * ++ * @param name Broker's name. ++ * @return boolean variable, if this is true, then the broker is available. ++ */ + boolean isAvailable(final T name); + ++ /** ++ * To check if this broker is reachable. ++ * ++ * @param name Broker's name. ++ * @return boolean variable, if this is true, then the broker is reachable. ++ */ ++ boolean isReachable(final T name); ++ ++ /** ++ * Remove the broker in this fault item table. ++ * ++ * @param name broker's name. ++ */ + void remove(final T name); + ++ /** ++ * The worst situation, no broker can be available. Then choose random one. ++ * ++ * @return A random mq will be returned. ++ */ + T pickOneAtLeast(); ++ ++ /** ++ * Start a new thread, to detect the broker's reachable tag. ++ */ ++ void startDetector(); ++ ++ /** ++ * Shutdown threads that started by LatencyFaultTolerance. ++ */ ++ void shutdown(); ++ ++ /** ++ * A function reserved, just detect by once, won't create a new thread. ++ */ ++ void detectByOneRound(); ++ ++ /** ++ * Use it to set the detect timeout bound. ++ * ++ * @param detectTimeout timeout bound ++ */ ++ void setDetectTimeout(final int detectTimeout); ++ ++ /** ++ * Use it to set the detector's detector interval for each broker (each broker will be detected once during this ++ * time) ++ * ++ * @param detectInterval each broker's detecting interval ++ */ ++ void setDetectInterval(final int detectInterval); + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +index 93795d957..8af629574 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +@@ -21,30 +21,97 @@ import java.util.Collections; + import java.util.Enumeration; + import java.util.LinkedList; + import java.util.List; ++import java.util.Map; + import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.Executors; ++import java.util.concurrent.ScheduledExecutorService; ++import java.util.concurrent.ThreadFactory; ++import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.client.common.ThreadLocalIndex; ++import org.apache.rocketmq.logging.org.slf4j.Logger; ++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + + public class LatencyFaultToleranceImpl implements LatencyFaultTolerance { +- private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap<>(16); ++ private final static Logger log = LoggerFactory.getLogger(MQFaultStrategy.class); ++ private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap(16); ++ private int detectTimeout = 200; ++ private int detectInterval = 2000; ++ private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); ++ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { ++ @Override ++ public Thread newThread(Runnable r) { ++ return new Thread(r, "LatencyFaultToleranceScheduledThread"); ++ } ++ }); + +- private final ThreadLocalIndex randomItem = new ThreadLocalIndex(); ++ private final Resolver resolver; ++ ++ private final ServiceDetector serviceDetector; ++ ++ public LatencyFaultToleranceImpl(Resolver resolver, ServiceDetector serviceDetector) { ++ this.resolver = resolver; ++ this.serviceDetector = serviceDetector; ++ } ++ ++ public void detectByOneRound() { ++ for (Map.Entry item : this.faultItemTable.entrySet()) { ++ FaultItem brokerItem = item.getValue(); ++ if (System.currentTimeMillis() - brokerItem.checkStamp >= 0) { ++ brokerItem.checkStamp = System.currentTimeMillis() + this.detectInterval; ++ String brokerAddr = resolver.resolve(brokerItem.getName()); ++ if (brokerAddr == null) { ++ faultItemTable.remove(item.getKey()); ++ continue; ++ } ++ if (null == serviceDetector) { ++ continue; ++ } ++ boolean serviceOK = serviceDetector.detect(brokerAddr, detectTimeout); ++ if (serviceOK && !brokerItem.reachableFlag) { ++ log.info(brokerItem.name + " is reachable now, then it can be used."); ++ brokerItem.reachableFlag = true; ++ } ++ } ++ } ++ } ++ ++ public void startDetector() { ++ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { ++ @Override ++ public void run() { ++ try { ++ detectByOneRound(); ++ } catch (Exception e) { ++ log.warn("Unexpected exception raised while detecting service reachability", e); ++ } ++ } ++ }, 3, 3, TimeUnit.SECONDS); ++ } ++ ++ public void shutdown() { ++ this.scheduledExecutorService.shutdown(); ++ } + + @Override +- public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { ++ public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration, ++ final boolean reachable) { + FaultItem old = this.faultItemTable.get(name); + if (null == old) { + final FaultItem faultItem = new FaultItem(name); + faultItem.setCurrentLatency(currentLatency); +- faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); +- ++ faultItem.updateNotAvailableDuration(notAvailableDuration); ++ faultItem.setReachable(reachable); + old = this.faultItemTable.putIfAbsent(name, faultItem); +- if (old != null) { +- old.setCurrentLatency(currentLatency); +- old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); +- } +- } else { ++ } ++ ++ if (null != old) { + old.setCurrentLatency(currentLatency); +- old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); ++ old.updateNotAvailableDuration(notAvailableDuration); ++ old.setReachable(reachable); ++ } ++ ++ if (!reachable) { ++ log.info(name + " is unreachable, it will not be used until it's reachable"); + } + } + +@@ -57,6 +124,14 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance + return true; + } + ++ public boolean isReachable(final String name) { ++ final FaultItem faultItem = this.faultItemTable.get(name); ++ if (faultItem != null) { ++ return faultItem.isReachable(); ++ } ++ return true; ++ } ++ + @Override + public void remove(final String name) { + this.faultItemTable.remove(name); +@@ -65,68 +140,98 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance + @Override + public String pickOneAtLeast() { + final Enumeration elements = this.faultItemTable.elements(); +- List tmpList = new LinkedList<>(); ++ List tmpList = new LinkedList(); + while (elements.hasMoreElements()) { + final FaultItem faultItem = elements.nextElement(); + tmpList.add(faultItem); + } ++ + if (!tmpList.isEmpty()) { +- Collections.sort(tmpList); +- final int half = tmpList.size() / 2; +- if (half <= 0) { +- return tmpList.get(0).getName(); +- } else { +- final int i = this.randomItem.incrementAndGet() % half; +- return tmpList.get(i).getName(); ++ Collections.shuffle(tmpList); ++ for (FaultItem faultItem : tmpList) { ++ if (faultItem.reachableFlag) { ++ return faultItem.name; ++ } + } + } ++ + return null; + } + + @Override + public String toString() { + return "LatencyFaultToleranceImpl{" + +- "faultItemTable=" + faultItemTable + +- ", whichItemWorst=" + randomItem + +- '}'; ++ "faultItemTable=" + faultItemTable + ++ ", whichItemWorst=" + whichItemWorst + ++ '}'; ++ } ++ ++ public void setDetectTimeout(final int detectTimeout) { ++ this.detectTimeout = detectTimeout; + } + +- class FaultItem implements Comparable { ++ public void setDetectInterval(final int detectInterval) { ++ this.detectInterval = detectInterval; ++ } ++ ++ public class FaultItem implements Comparable { + private final String name; + private volatile long currentLatency; + private volatile long startTimestamp; ++ private volatile long checkStamp; ++ private volatile boolean reachableFlag; + + public FaultItem(final String name) { + this.name = name; + } + ++ public void updateNotAvailableDuration(long notAvailableDuration) { ++ if (notAvailableDuration > 0 && System.currentTimeMillis() + notAvailableDuration > this.startTimestamp) { ++ this.startTimestamp = System.currentTimeMillis() + notAvailableDuration; ++ log.info(name + " will be isolated for " + notAvailableDuration + " ms."); ++ } ++ } ++ + @Override + public int compareTo(final FaultItem other) { + if (this.isAvailable() != other.isAvailable()) { +- if (this.isAvailable()) ++ if (this.isAvailable()) { + return -1; ++ } + +- if (other.isAvailable()) ++ if (other.isAvailable()) { + return 1; ++ } + } + +- if (this.currentLatency < other.currentLatency) ++ if (this.currentLatency < other.currentLatency) { + return -1; +- else if (this.currentLatency > other.currentLatency) { ++ } else if (this.currentLatency > other.currentLatency) { + return 1; + } + +- if (this.startTimestamp < other.startTimestamp) ++ if (this.startTimestamp < other.startTimestamp) { + return -1; +- else if (this.startTimestamp > other.startTimestamp) { ++ } else if (this.startTimestamp > other.startTimestamp) { + return 1; + } +- + return 0; + } + ++ public void setReachable(boolean reachableFlag) { ++ this.reachableFlag = reachableFlag; ++ } ++ ++ public void setCheckStamp(long checkStamp) { ++ this.checkStamp = checkStamp; ++ } ++ + public boolean isAvailable() { +- return (System.currentTimeMillis() - startTimestamp) >= 0; ++ return reachableFlag && System.currentTimeMillis() >= startTimestamp; ++ } ++ ++ public boolean isReachable() { ++ return reachableFlag; + } + + @Override +@@ -139,28 +244,32 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance + + @Override + public boolean equals(final Object o) { +- if (this == o) ++ if (this == o) { + return true; +- if (!(o instanceof FaultItem)) ++ } ++ if (!(o instanceof FaultItem)) { + return false; ++ } + + final FaultItem faultItem = (FaultItem) o; + +- if (getCurrentLatency() != faultItem.getCurrentLatency()) ++ if (getCurrentLatency() != faultItem.getCurrentLatency()) { + return false; +- if (getStartTimestamp() != faultItem.getStartTimestamp()) ++ } ++ if (getStartTimestamp() != faultItem.getStartTimestamp()) { + return false; ++ } + return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null; +- + } + + @Override + public String toString() { + return "FaultItem{" + +- "name='" + name + '\'' + +- ", currentLatency=" + currentLatency + +- ", startTimestamp=" + startTimestamp + +- '}'; ++ "name='" + name + '\'' + ++ ", currentLatency=" + currentLatency + ++ ", startTimestamp=" + startTimestamp + ++ ", reachableFlag=" + reachableFlag + ++ '}'; + } + + public String getName() { +diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +index 1e1953fad..c01490784 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java ++++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +@@ -17,25 +17,86 @@ + + package org.apache.rocketmq.client.latency; + +-import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.client.ClientConfig; + import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; ++import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter; + import org.apache.rocketmq.common.message.MessageQueue; +-import org.apache.rocketmq.logging.org.slf4j.Logger; +-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + + public class MQFaultStrategy { +- private final static Logger log = LoggerFactory.getLogger(MQFaultStrategy.class); +- private final LatencyFaultTolerance latencyFaultTolerance = new LatencyFaultToleranceImpl(); ++ private LatencyFaultTolerance latencyFaultTolerance; ++ private boolean sendLatencyFaultEnable; ++ private boolean startDetectorEnable; ++ private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L}; ++ private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L}; + +- private boolean sendLatencyFaultEnable = false; ++ public static class BrokerFilter implements QueueFilter { ++ private String lastBrokerName; ++ ++ public void setLastBrokerName(String lastBrokerName) { ++ this.lastBrokerName = lastBrokerName; ++ } ++ ++ @Override public boolean filter(MessageQueue mq) { ++ if (lastBrokerName != null) { ++ return !mq.getBrokerName().equals(lastBrokerName); ++ } ++ return true; ++ } ++ } ++ ++ private ThreadLocal threadBrokerFilter = new ThreadLocal() { ++ @Override protected BrokerFilter initialValue() { ++ return new BrokerFilter(); ++ } ++ }; ++ ++ private QueueFilter reachableFilter = new QueueFilter() { ++ @Override public boolean filter(MessageQueue mq) { ++ return latencyFaultTolerance.isReachable(mq.getBrokerName()); ++ } ++ }; ++ ++ private QueueFilter availableFilter = new QueueFilter() { ++ @Override public boolean filter(MessageQueue mq) { ++ return latencyFaultTolerance.isAvailable(mq.getBrokerName()); ++ } ++ }; ++ ++ ++ public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) { ++ this.setStartDetectorEnable(cc.isStartDetectorEnable()); ++ this.setSendLatencyFaultEnable(cc.isSendLatencyEnable()); ++ this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector); ++ this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval()); ++ this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout()); ++ } ++ ++ // For unit test. ++ public MQFaultStrategy(ClientConfig cc, LatencyFaultTolerance tolerance) { ++ this.setStartDetectorEnable(cc.isStartDetectorEnable()); ++ this.setSendLatencyFaultEnable(cc.isSendLatencyEnable()); ++ this.latencyFaultTolerance = tolerance; ++ this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval()); ++ this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout()); ++ } + +- private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; +- private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; + + public long[] getNotAvailableDuration() { + return notAvailableDuration; + } + ++ public QueueFilter getAvailableFilter() { ++ return availableFilter; ++ } ++ ++ public QueueFilter getReachableFilter() { ++ return reachableFilter; ++ } ++ ++ public ThreadLocal getThreadBrokerFilter() { ++ return threadBrokerFilter; ++ } ++ + public void setNotAvailableDuration(final long[] notAvailableDuration) { + this.notAvailableDuration = notAvailableDuration; + } +@@ -56,51 +117,69 @@ public class MQFaultStrategy { + this.sendLatencyFaultEnable = sendLatencyFaultEnable; + } + +- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { ++ public boolean isStartDetectorEnable() { ++ return startDetectorEnable; ++ } ++ ++ public void setStartDetectorEnable(boolean startDetectorEnable) { ++ this.startDetectorEnable = startDetectorEnable; ++ } ++ ++ public void startDetector() { ++ // user should start the detector ++ // and the thread should not be in running state. ++ if (this.sendLatencyFaultEnable && this.startDetectorEnable) { ++ // start the detector. ++ this.latencyFaultTolerance.startDetector(); ++ } ++ } ++ ++ public void shutdown() { ++ if (this.sendLatencyFaultEnable && this.startDetectorEnable) { ++ this.latencyFaultTolerance.shutdown(); ++ } ++ } ++ ++ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) { ++ BrokerFilter brokerFilter = threadBrokerFilter.get(); ++ brokerFilter.setLastBrokerName(lastBrokerName); + if (this.sendLatencyFaultEnable) { +- try { +- int index = tpInfo.getSendWhichQueue().incrementAndGet(); +- for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { +- int pos = index++ % tpInfo.getMessageQueueList().size(); +- MessageQueue mq = tpInfo.getMessageQueueList().get(pos); +- if (!StringUtils.equals(lastBrokerName, mq.getBrokerName()) && latencyFaultTolerance.isAvailable(mq.getBrokerName())) { +- return mq; +- } +- } +- +- final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); +- int writeQueueNums = tpInfo.getWriteQueueNumsByBroker(notBestBroker); +- if (writeQueueNums > 0) { +- final MessageQueue mq = tpInfo.selectOneMessageQueue(); +- if (notBestBroker != null) { +- mq.setBrokerName(notBestBroker); +- mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); +- } +- return mq; +- } else { +- latencyFaultTolerance.remove(notBestBroker); +- } +- } catch (Exception e) { +- log.error("Error occurred when selecting message queue", e); ++ if (resetIndex) { ++ tpInfo.resetIndex(); ++ } ++ MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter); ++ if (mq != null) { ++ return mq; ++ } ++ ++ mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter); ++ if (mq != null) { ++ return mq; + } + + return tpInfo.selectOneMessageQueue(); + } + +- return tpInfo.selectOneMessageQueue(lastBrokerName); ++ MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter); ++ if (mq != null) { ++ return mq; ++ } ++ return tpInfo.selectOneMessageQueue(); + } + +- public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { ++ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, ++ final boolean reachable) { + if (this.sendLatencyFaultEnable) { +- long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); +- this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); ++ long duration = computeNotAvailableDuration(isolation ? 10000 : currentLatency); ++ this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration, reachable); + } + } + + private long computeNotAvailableDuration(final long currentLatency) { + for (int i = latencyMax.length - 1; i >= 0; i--) { +- if (currentLatency >= latencyMax[i]) ++ if (currentLatency >= latencyMax[i]) { + return this.notAvailableDuration[i]; ++ } + } + + return 0; +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java b/client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java +similarity index 65% +rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java +rename to client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java +index 6aa547047..1c29ba334 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java ++++ b/client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java +@@ -14,20 +14,9 @@ + * See the License for the specific language governing permissions and + * limitations under the License. + */ +-package org.apache.rocketmq.remoting.protocol.body; ++package org.apache.rocketmq.client.latency; + +-import java.util.ArrayList; +-import java.util.List; +-import org.apache.rocketmq.remoting.protocol.RemotingSerializable; ++public interface Resolver { + +-public class GetRemoteClientConfigBody extends RemotingSerializable { +- private List keys = new ArrayList<>(); +- +- public List getKeys() { +- return keys; +- } +- +- public void setKeys(List keys) { +- this.keys = keys; +- } ++ String resolve(String name); + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java b/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java +new file mode 100644 +index 000000000..c6ffbad1c +--- /dev/null ++++ b/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java +@@ -0,0 +1,30 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.client.latency; ++ ++/** ++ * Detect whether the remote service state is normal. ++ */ ++public interface ServiceDetector { ++ ++ /** ++ * Check if the remote service is normal. ++ * @param endpoint Service endpoint to check against ++ * @return true if the service is back to normal; false otherwise. ++ */ ++ boolean detect(String endpoint, long timeoutMillis); ++} +diff --git a/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java +index 86690e40b..42ccdae5a 100644 +--- a/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java ++++ b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java +@@ -16,11 +16,14 @@ + */ + package org.apache.rocketmq.client.latency; + +-import java.util.concurrent.TimeUnit; ++import org.awaitility.core.ThrowingRunnable; + import org.junit.Before; + import org.junit.Test; + ++import java.util.concurrent.TimeUnit; ++ + import static org.assertj.core.api.Assertions.assertThat; ++import static org.awaitility.Awaitility.await; + + public class LatencyFaultToleranceImplTest { + private LatencyFaultTolerance latencyFaultTolerance; +@@ -29,28 +32,31 @@ public class LatencyFaultToleranceImplTest { + + @Before + public void init() { +- latencyFaultTolerance = new LatencyFaultToleranceImpl(); ++ latencyFaultTolerance = new LatencyFaultToleranceImpl(null, null); + } + + @Test + public void testUpdateFaultItem() throws Exception { +- latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000); ++ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000, true); + assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse(); + assertThat(latencyFaultTolerance.isAvailable(anotherBrokerName)).isTrue(); + } + + @Test + public void testIsAvailable() throws Exception { +- latencyFaultTolerance.updateFaultItem(brokerName, 3000, 50); ++ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 50, true); + assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse(); + +- TimeUnit.MILLISECONDS.sleep(70); +- assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue(); ++ await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(new ThrowingRunnable() { ++ @Override public void run() throws Throwable { ++ assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue(); ++ } ++ }); + } + + @Test + public void testRemove() throws Exception { +- latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000); ++ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000, true); + assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse(); + latencyFaultTolerance.remove(brokerName); + assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue(); +@@ -58,10 +64,20 @@ public class LatencyFaultToleranceImplTest { + + @Test + public void testPickOneAtLeast() throws Exception { +- latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000); ++ latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000, true); + assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName); + +- latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000); +- assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName); ++ // Bad case, since pickOneAtLeast's behavior becomes random ++ // latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000, "127.0.0.1:12011", true); ++ // assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName); ++ } ++ ++ @Test ++ public void testIsReachable() throws Exception { ++ latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000, true); ++ assertThat(latencyFaultTolerance.isReachable(brokerName)).isEqualTo(true); ++ ++ latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000, false); ++ assertThat(latencyFaultTolerance.isReachable(anotherBrokerName)).isEqualTo(false); + } + } +\ No newline at end of file +diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +index fada0efd7..485b95c42 100644 +--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java ++++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +@@ -41,7 +41,6 @@ import org.apache.rocketmq.remoting.protocol.RequestCode; + import org.apache.rocketmq.remoting.protocol.ResponseCode; + import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; + import org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody; +-import org.apache.rocketmq.remoting.protocol.body.GetRemoteClientConfigBody; + import org.apache.rocketmq.remoting.protocol.body.RegisterBrokerBody; + import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; + import org.apache.rocketmq.remoting.protocol.body.TopicList; +@@ -132,8 +131,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { + return this.updateConfig(ctx, request); + case RequestCode.GET_NAMESRV_CONFIG: + return this.getConfig(ctx, request); +- case RequestCode.GET_CLIENT_CONFIG: +- return this.getClientConfigs(ctx, request); + default: + String error = " request type " + request.getCode() + " not supported"; + return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); +@@ -661,25 +658,4 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { + return response; + } + +- private RemotingCommand getClientConfigs(ChannelHandlerContext ctx, RemotingCommand request) { +- final RemotingCommand response = RemotingCommand.createResponseCommand(null); +- final GetRemoteClientConfigBody body = GetRemoteClientConfigBody.decode(request.getBody(), GetRemoteClientConfigBody.class); +- +- String content = this.namesrvController.getConfiguration().getClientConfigsFormatString(body.getKeys()); +- if (StringUtils.isNotBlank(content)) { +- try { +- response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); +- } catch (UnsupportedEncodingException e) { +- log.error("getConfig error, ", e); +- response.setCode(ResponseCode.SYSTEM_ERROR); +- response.setRemark("UnsupportedEncodingException " + e); +- return response; +- } +- } +- +- response.setCode(ResponseCode.SUCCESS); +- response.setRemark(null); +- return response; +- } +- + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +index 2994893d7..b2478fec3 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +@@ -232,6 +232,12 @@ public class ProxyConfig implements ConfigFile { + private String remotingAccessAddr = ""; + private int remotingListenPort = 8080; + ++ // related to proxy's send strategy in cluster mode. ++ private boolean sendLatencyEnable = false; ++ private boolean startDetectorEnable = false; ++ private int detectTimeout = 200; ++ private int detectInterval = 2 * 1000; ++ + private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER; + private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER; + private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER; +@@ -1409,6 +1415,46 @@ public class ProxyConfig implements ConfigFile { + this.remotingWaitTimeMillsInDefaultQueue = remotingWaitTimeMillsInDefaultQueue; + } + ++ public boolean isSendLatencyEnable() { ++ return sendLatencyEnable; ++ } ++ ++ public boolean isStartDetectorEnable() { ++ return startDetectorEnable; ++ } ++ ++ public void setStartDetectorEnable(boolean startDetectorEnable) { ++ this.startDetectorEnable = startDetectorEnable; ++ } ++ ++ public void setSendLatencyEnable(boolean sendLatencyEnable) { ++ this.sendLatencyEnable = sendLatencyEnable; ++ } ++ ++ public boolean getStartDetectorEnable() { ++ return this.startDetectorEnable; ++ } ++ ++ public boolean getSendLatencyEnable() { ++ return this.sendLatencyEnable; ++ } ++ ++ public int getDetectTimeout() { ++ return detectTimeout; ++ } ++ ++ public void setDetectTimeout(int detectTimeout) { ++ this.detectTimeout = detectTimeout; ++ } ++ ++ public int getDetectInterval() { ++ return detectInterval; ++ } ++ ++ public void setDetectInterval(int detectInterval) { ++ this.detectInterval = detectInterval; ++ } ++ + public boolean isEnableBatchAck() { + return enableBatchAck; + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java +index 6146c80cd..f670df205 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java +@@ -382,7 +382,7 @@ public class SendMessageActivity extends AbstractMessingActivity { + int bucket = Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size()); + targetMessageQueue = writeQueues.get(bucket); + } else { +- targetMessageQueue = messageQueueView.getWriteSelector().selectOne(false); ++ targetMessageQueue = messageQueueView.getWriteSelector().selectOneByPipeline(false); + } + return targetMessageQueue; + } catch (Exception e) { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java +index 0d0c62168..a80f6df0b 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java +@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.processor; + import java.util.List; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ExecutorService; ++ + import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.client.producer.SendResult; + import org.apache.rocketmq.client.producer.SendStatus; +@@ -66,6 +67,8 @@ public class ProducerProcessor extends AbstractProcessor { + public CompletableFuture> sendMessage(ProxyContext ctx, QueueSelector queueSelector, + String producerGroup, int sysFlag, List messageList, long timeoutMillis) { + CompletableFuture> future = new CompletableFuture<>(); ++ long beginTimestampFirst = System.currentTimeMillis(); ++ AddressableMessageQueue messageQueue = null; + try { + Message message = messageList.get(0); + String topic = message.getTopic(); +@@ -79,7 +82,7 @@ public class ProducerProcessor extends AbstractProcessor { + } + } + } +- AddressableMessageQueue messageQueue = queueSelector.select(ctx, ++ messageQueue = queueSelector.select(ctx, + this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, topic)); + if (messageQueue == null) { + throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue"); +@@ -90,6 +93,7 @@ public class ProducerProcessor extends AbstractProcessor { + } + SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId()); + ++ AddressableMessageQueue finalMessageQueue = messageQueue; + future = this.serviceManager.getMessageService().sendMessage( + ctx, + messageQueue, +@@ -102,11 +106,19 @@ public class ProducerProcessor extends AbstractProcessor { + if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) && + tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE && + StringUtils.isNotBlank(sendResult.getTransactionId())) { +- fillTransactionData(ctx, producerGroup, messageQueue, sendResult, messageList); ++ fillTransactionData(ctx, producerGroup, finalMessageQueue, sendResult, messageList); + } + } + return sendResultList; +- }, this.executor); ++ }, this.executor) ++ .whenComplete((result, exception) -> { ++ long endTimestamp = System.currentTimeMillis(); ++ if (exception != null) { ++ this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(), endTimestamp - beginTimestampFirst, true, false); ++ } else { ++ this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(),endTimestamp - beginTimestampFirst, false, true); ++ } ++ }); + } catch (Throwable t) { + future.completeExceptionally(t); + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java +index d67b68f38..aced15cee 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java +@@ -54,7 +54,7 @@ public class LocalTopicRouteService extends TopicRouteService { + @Override + public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, String topic) throws Exception { + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic); +- return new MessageQueueView(topic, toTopicRouteData(topicConfig)); ++ return new MessageQueueView(topic, toTopicRouteData(topicConfig), null); + } + + @Override +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java +index 85cd18d45..f25fb907e 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java +@@ -17,6 +17,7 @@ + package org.apache.rocketmq.proxy.service.route; + + import com.google.common.base.MoreObjects; ++import com.google.common.base.Preconditions; + import com.google.common.math.IntMath; + import java.util.ArrayList; + import java.util.Collections; +@@ -30,6 +31,8 @@ import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; ++import org.apache.rocketmq.client.latency.MQFaultStrategy; + import org.apache.rocketmq.common.constant.PermName; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.remoting.protocol.route.QueueData; +@@ -44,8 +47,9 @@ public class MessageQueueSelector { + private final Map brokerNameQueueMap = new ConcurrentHashMap<>(); + private final AtomicInteger queueIndex; + private final AtomicInteger brokerIndex; ++ private MQFaultStrategy mqFaultStrategy; + +- public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read) { ++ public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy mqFaultStrategy, boolean read) { + if (read) { + this.queues.addAll(buildRead(topicRouteWrapper)); + } else { +@@ -55,6 +59,7 @@ public class MessageQueueSelector { + Random random = new Random(); + this.queueIndex = new AtomicInteger(random.nextInt()); + this.brokerIndex = new AtomicInteger(random.nextInt()); ++ this.mqFaultStrategy = mqFaultStrategy; + } + + private static List buildRead(TopicRouteWrapper topicRoute) { +@@ -154,6 +159,86 @@ public class MessageQueueSelector { + return selectOneByIndex(nextIndex, onlyBroker); + } + ++ public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) { ++ if (mqFaultStrategy != null && mqFaultStrategy.isSendLatencyFaultEnable()) { ++ List messageQueueList = null; ++ MessageQueue messageQueue = null; ++ if (onlyBroker) { ++ messageQueueList = transferAddressableQueues(brokerActingQueues); ++ } else { ++ messageQueueList = transferAddressableQueues(queues); ++ } ++ AddressableMessageQueue addressableMessageQueue = null; ++ ++ // use both available filter. ++ messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, ++ mqFaultStrategy.getAvailableFilter(), mqFaultStrategy.getReachableFilter()); ++ addressableMessageQueue = transferQueue2Addressable(messageQueue); ++ if (addressableMessageQueue != null) { ++ return addressableMessageQueue; ++ } ++ ++ // use available filter. ++ messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, ++ mqFaultStrategy.getAvailableFilter()); ++ addressableMessageQueue = transferQueue2Addressable(messageQueue); ++ if (addressableMessageQueue != null) { ++ return addressableMessageQueue; ++ } ++ ++ // no available filter, then use reachable filter. ++ messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, ++ mqFaultStrategy.getReachableFilter()); ++ addressableMessageQueue = transferQueue2Addressable(messageQueue); ++ if (addressableMessageQueue != null) { ++ return addressableMessageQueue; ++ } ++ } ++ ++ // SendLatency is not enabled, or no queue is selected, then select by index. ++ return selectOne(onlyBroker); ++ } ++ ++ private MessageQueue selectOneMessageQueue(List messageQueueList, AtomicInteger sendQueue, TopicPublishInfo.QueueFilter...filter) { ++ if (messageQueueList == null || messageQueueList.isEmpty()) { ++ return null; ++ } ++ if (filter != null && filter.length != 0) { ++ for (int i = 0; i < messageQueueList.size(); i++) { ++ int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); ++ MessageQueue mq = messageQueueList.get(index); ++ boolean filterResult = true; ++ for (TopicPublishInfo.QueueFilter f: filter) { ++ Preconditions.checkNotNull(f); ++ filterResult &= f.filter(mq); ++ } ++ if (filterResult) { ++ return mq; ++ } ++ } ++ } ++ return null; ++ } ++ ++ public List transferAddressableQueues(List addressableMessageQueueList) { ++ if (addressableMessageQueueList == null) { ++ return null; ++ } ++ ++ return addressableMessageQueueList.stream() ++ .map(AddressableMessageQueue::getMessageQueue) ++ .collect(Collectors.toList()); ++ } ++ ++ private AddressableMessageQueue transferQueue2Addressable(MessageQueue messageQueue) { ++ for (AddressableMessageQueue amq: queues) { ++ if (amq.getMessageQueue().equals(messageQueue)) { ++ return amq; ++ } ++ } ++ return null; ++ } ++ + public AddressableMessageQueue selectNextOne(AddressableMessageQueue last) { + boolean onlyBroker = last.getQueueId() < 0; + AddressableMessageQueue newOne = last; +@@ -190,6 +275,14 @@ public class MessageQueueSelector { + return brokerActingQueues; + } + ++ public MQFaultStrategy getMQFaultStrategy() { ++ return mqFaultStrategy; ++ } ++ ++ public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { ++ this.mqFaultStrategy = mqFaultStrategy; ++ } ++ + @Override + public boolean equals(Object o) { + if (this == o) { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +index fe5387cfd..8b3c2f7c8 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +@@ -17,20 +17,22 @@ + package org.apache.rocketmq.proxy.service.route; + + import com.google.common.base.MoreObjects; ++import org.apache.rocketmq.client.latency.MQFaultStrategy; + import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; + + public class MessageQueueView { +- public static final MessageQueueView WRAPPED_EMPTY_QUEUE = new MessageQueueView("", new TopicRouteData()); ++ public static final MessageQueueView WRAPPED_EMPTY_QUEUE = new MessageQueueView("", new TopicRouteData(), null); + + private final MessageQueueSelector readSelector; + private final MessageQueueSelector writeSelector; + private final TopicRouteWrapper topicRouteWrapper; ++ private MQFaultStrategy mqFaultStrategy; + +- public MessageQueueView(String topic, TopicRouteData topicRouteData) { ++ public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) { + this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic); + +- this.readSelector = new MessageQueueSelector(topicRouteWrapper, true); +- this.writeSelector = new MessageQueueSelector(topicRouteWrapper, false); ++ this.readSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, true); ++ this.writeSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, false); + } + + public TopicRouteData getTopicRouteData() { +@@ -65,4 +67,12 @@ public class MessageQueueView { + .add("topicRouteWrapper", topicRouteWrapper) + .toString(); + } ++ ++ public MQFaultStrategy getMQFaultStrategy() { ++ return mqFaultStrategy; ++ } ++ ++ public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { ++ this.mqFaultStrategy = mqFaultStrategy; ++ } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +index 84348adc3..74769a423 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +@@ -25,7 +25,13 @@ import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; ++ ++import com.google.common.base.Optional; ++import org.apache.rocketmq.client.ClientConfig; + import org.apache.rocketmq.client.exception.MQClientException; ++import org.apache.rocketmq.client.latency.MQFaultStrategy; ++import org.apache.rocketmq.client.latency.Resolver; ++import org.apache.rocketmq.client.latency.ServiceDetector; + import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; +@@ -39,6 +45,7 @@ import org.apache.rocketmq.proxy.common.ProxyContext; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; + import org.apache.rocketmq.remoting.protocol.ResponseCode; ++import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader; + import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; + import org.checkerframework.checker.nullness.qual.NonNull; + import org.checkerframework.checker.nullness.qual.Nullable; +@@ -47,6 +54,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { + private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + + private final MQClientAPIFactory mqClientAPIFactory; ++ private MQFaultStrategy mqFaultStrategy; + + protected final LoadingCache topicCache; + protected final ScheduledExecutorService scheduledExecutorService; +@@ -97,15 +105,83 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { + } + } + }); +- ++ ServiceDetector serviceDetector = new ServiceDetector() { ++ @Override ++ public boolean detect(String endpoint, long timeoutMillis) { ++ Optional candidateTopic = pickTopic(); ++ if (!candidateTopic.isPresent()) { ++ return false; ++ } ++ try { ++ GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); ++ requestHeader.setTopic(candidateTopic.get()); ++ requestHeader.setQueueId(0); ++ Long maxOffset = mqClientAPIFactory.getClient().getMaxOffset(endpoint, requestHeader, timeoutMillis).get(); ++ return true; ++ } catch (Exception e) { ++ return false; ++ } ++ } ++ }; ++ mqFaultStrategy = new MQFaultStrategy(extractClientConfigFromProxyConfig(config), new Resolver() { ++ @Override ++ public String resolve(String name) { ++ try { ++ String brokerAddr = getBrokerAddr(null, name); ++ return brokerAddr; ++ } catch (Exception e) { ++ return null; ++ } ++ } ++ }, serviceDetector); + this.init(); + } + ++ // pickup one topic in the topic cache ++ private Optional pickTopic() { ++ if (topicCache.asMap().isEmpty()) { ++ return Optional.absent(); ++ } ++ return Optional.of(topicCache.asMap().keySet().iterator().next()); ++ } ++ + protected void init() { + this.appendShutdown(this.scheduledExecutorService::shutdown); + this.appendStartAndShutdown(this.mqClientAPIFactory); + } + ++ @Override ++ public void shutdown() throws Exception { ++ if (this.mqFaultStrategy.isStartDetectorEnable()) { ++ mqFaultStrategy.shutdown(); ++ } ++ } ++ ++ @Override ++ public void start() throws Exception { ++ if (this.mqFaultStrategy.isStartDetectorEnable()) { ++ this.mqFaultStrategy.startDetector(); ++ } ++ } ++ ++ public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) { ++ ClientConfig tempClientConfig = new ClientConfig(); ++ tempClientConfig.setSendLatencyEnable(proxyConfig.getSendLatencyEnable()); ++ tempClientConfig.setStartDetectorEnable(proxyConfig.getStartDetectorEnable()); ++ tempClientConfig.setDetectTimeout(proxyConfig.getDetectTimeout()); ++ tempClientConfig.setDetectInterval(proxyConfig.getDetectInterval()); ++ return tempClientConfig; ++ } ++ ++ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, ++ boolean reachable) { ++ this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable); ++ } ++ ++ public MQFaultStrategy getMqFaultStrategy() { ++ return this.mqFaultStrategy; ++ } ++ + public MessageQueueView getAllMessageQueueView(ProxyContext ctx, String topicName) throws Exception { + return getCacheMessageQueueWrapper(this.topicCache, topicName); + } +@@ -136,7 +212,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { + + protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { + if (isTopicRouteValid(topicRouteData)) { +- MessageQueueView tmp = new MessageQueueView(topic, topicRouteData); ++ MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, TopicRouteService.this.getMqFaultStrategy()); + log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); + return tmp; + } +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +index 7fd9a9ffd..77ae5e4d1 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +@@ -93,7 +93,6 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { + pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); + +- + ProxyContext context = createContext(); + context.setRemainingMs(1L); + this.receiveMessageActivity.receiveMessage( +@@ -274,7 +273,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { + } + + @Test +- public void testReceiveMessageQueueSelector() { ++ public void testReceiveMessageQueueSelector() throws Exception { + TopicRouteData topicRouteData = new TopicRouteData(); + List queueDatas = new ArrayList<>(); + for (int i = 0; i < 2; i++) { +@@ -298,7 +297,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { + } + topicRouteData.setBrokerDatas(brokerDatas); + +- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData); ++ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null); + ReceiveMessageActivity.ReceiveMessageQueueSelector selector = new ReceiveMessageActivity.ReceiveMessageQueueSelector(""); + + AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java +index 588423bb9..4882a5ed8 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java +@@ -35,6 +35,8 @@ import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.client.ClientConfig; ++import org.apache.rocketmq.client.latency.MQFaultStrategy; + import org.apache.rocketmq.client.producer.SendResult; + import org.apache.rocketmq.client.producer.SendStatus; + import org.apache.rocketmq.common.MixAll; +@@ -49,6 +51,7 @@ import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; + import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException; + import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; + import org.apache.rocketmq.proxy.service.route.MessageQueueView; ++import org.apache.rocketmq.proxy.service.route.TopicRouteService; + import org.apache.rocketmq.remoting.protocol.route.BrokerData; + import org.apache.rocketmq.remoting.protocol.route.QueueData; + import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +@@ -62,15 +65,19 @@ import static org.junit.Assert.assertThrows; + import static org.mockito.ArgumentMatchers.any; + import static org.mockito.ArgumentMatchers.anyInt; + import static org.mockito.ArgumentMatchers.anyString; ++import static org.mockito.Mockito.mock; + import static org.mockito.Mockito.when; + + public class SendMessageActivityTest extends BaseActivityTest { + + protected static final String BROKER_NAME = "broker"; ++ protected static final String BROKER_NAME2 = "broker2"; + protected static final String CLUSTER_NAME = "cluster"; + protected static final String BROKER_ADDR = "127.0.0.1:10911"; ++ protected static final String BROKER_ADDR2 = "127.0.0.1:10912"; + private static final String TOPIC = "topic"; + private static final String CONSUMER_GROUP = "consumerGroup"; ++ MQFaultStrategy mqFaultStrategy; + + private SendMessageActivity sendMessageActivity; + +@@ -262,7 +269,7 @@ public class SendMessageActivityTest extends BaseActivityTest { + } + + @Test +- public void testSendOrderMessageQueueSelector() { ++ public void testSendOrderMessageQueueSelector() throws Exception { + TopicRouteData topicRouteData = new TopicRouteData(); + QueueData queueData = new QueueData(); + BrokerData brokerData = new BrokerData(); +@@ -277,7 +284,7 @@ public class SendMessageActivityTest extends BaseActivityTest { + brokerData.setBrokerAddrs(brokerAddrs); + topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData)); + +- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData); ++ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null); + SendMessageActivity.SendMessageQueueSelector selector1 = new SendMessageActivity.SendMessageQueueSelector( + SendMessageRequest.newBuilder() + .addMessages(Message.newBuilder() +@@ -288,6 +295,12 @@ public class SendMessageActivityTest extends BaseActivityTest { + .build() + ); + ++ TopicRouteService topicRouteService = mock(TopicRouteService.class); ++ MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class); ++ when(topicRouteService.getAllMessageQueueView(any(), any())).thenReturn(messageQueueView); ++ when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); ++ when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false); ++ + SendMessageActivity.SendMessageQueueSelector selector2 = new SendMessageActivity.SendMessageQueueSelector( + SendMessageRequest.newBuilder() + .addMessages(Message.newBuilder() +@@ -328,12 +341,17 @@ public class SendMessageActivityTest extends BaseActivityTest { + brokerData.setBrokerAddrs(brokerAddrs); + topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData)); + +- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData); ++ + SendMessageActivity.SendMessageQueueSelector selector = new SendMessageActivity.SendMessageQueueSelector( + SendMessageRequest.newBuilder() + .addMessages(Message.newBuilder().build()) + .build() + ); ++ TopicRouteService topicRouteService = mock(TopicRouteService.class); ++ MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class); ++ when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); ++ when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false); ++ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy()); + + AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); + AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView); +@@ -343,6 +361,45 @@ public class SendMessageActivityTest extends BaseActivityTest { + assertNotEquals(firstSelect, secondSelect); + } + ++ @Test ++ public void testSendNormalMessageQueueSelectorPipeLine() throws Exception { ++ TopicRouteData topicRouteData = new TopicRouteData(); ++ int queueNums = 2; ++ ++ QueueData queueData = createQueueData(BROKER_NAME, queueNums); ++ QueueData queueData2 = createQueueData(BROKER_NAME2, queueNums); ++ topicRouteData.setQueueDatas(Lists.newArrayList(queueData,queueData2)); ++ ++ ++ BrokerData brokerData = createBrokerData(CLUSTER_NAME, BROKER_NAME, BROKER_ADDR); ++ BrokerData brokerData2 = createBrokerData(CLUSTER_NAME, BROKER_NAME2, BROKER_ADDR2); ++ topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData, brokerData2)); ++ ++ SendMessageActivity.SendMessageQueueSelector selector = new SendMessageActivity.SendMessageQueueSelector( ++ SendMessageRequest.newBuilder() ++ .addMessages(Message.newBuilder().build()) ++ .build() ++ ); ++ ++ ClientConfig cc = new ClientConfig(); ++ this.mqFaultStrategy = new MQFaultStrategy(cc, null, null); ++ mqFaultStrategy.setSendLatencyFaultEnable(true); ++ mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, true); ++ mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, false); ++ ++ TopicRouteService topicRouteService = mock(TopicRouteService.class); ++ when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); ++ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy()); ++ ++ ++ AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); ++ assertEquals(firstSelect.getBrokerName(), BROKER_NAME2); ++ ++ mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, false); ++ mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, true); ++ AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView); ++ assertEquals(secondSelect.getBrokerName(), BROKER_NAME); ++ } + @Test + public void testParameterValidate() { + // too large message body +@@ -850,4 +907,23 @@ public class SendMessageActivityTest extends BaseActivityTest { + } + return sb.toString(); + } ++ ++ private static QueueData createQueueData(String brokerName, int writeQueueNums) { ++ QueueData queueData = new QueueData(); ++ queueData.setBrokerName(brokerName); ++ queueData.setWriteQueueNums(writeQueueNums); ++ queueData.setPerm(PermName.PERM_WRITE); ++ return queueData; ++ } ++ ++ private static BrokerData createBrokerData(String clusterName, String brokerName, String brokerAddrs) { ++ BrokerData brokerData = new BrokerData(); ++ brokerData.setCluster(clusterName); ++ brokerData.setBrokerName(brokerName); ++ HashMap brokerAddrsMap = new HashMap<>(); ++ brokerAddrsMap.put(MixAll.MASTER_ID, brokerAddrs); ++ brokerData.setBrokerAddrs(brokerAddrsMap); ++ ++ return brokerData; ++ } + } +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java +index c97bd5a72..ca6fe909e 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java +@@ -78,7 +78,7 @@ public class BaseServiceTest extends InitConfigTest { + topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData)); + + when(this.topicRouteService.getAllMessageQueueView(any(), eq(ERR_TOPIC))).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "")); +- when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData)); +- when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData)); ++ when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData, null)); ++ when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData, null)); + } + } +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java +index e44ed28f4..d150f87c4 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java +@@ -30,12 +30,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest { + public void testReadMessageQueue() { + queueData.setPerm(PermName.PERM_READ); + queueData.setReadQueueNums(0); +- MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true); ++ MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true); + assertTrue(messageQueueSelector.getQueues().isEmpty()); + + queueData.setPerm(PermName.PERM_READ); + queueData.setReadQueueNums(3); +- messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true); ++ messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true); + assertEquals(3, messageQueueSelector.getQueues().size()); + assertEquals(1, messageQueueSelector.getBrokerActingQueues().size()); + for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) { +@@ -58,12 +58,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest { + public void testWriteMessageQueue() { + queueData.setPerm(PermName.PERM_WRITE); + queueData.setReadQueueNums(0); +- MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false); ++ MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false); + assertTrue(messageQueueSelector.getQueues().isEmpty()); + + queueData.setPerm(PermName.PERM_WRITE); + queueData.setWriteQueueNums(3); +- messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false); ++ messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false); + assertEquals(3, messageQueueSelector.getQueues().size()); + assertEquals(1, messageQueueSelector.getBrokerActingQueues().size()); + for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) { +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java +index c67f4953d..43fba3d03 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java +@@ -132,7 +132,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { + brokerAddr.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddr); + topicRouteData.getBrokerDatas().add(brokerData); +- MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData); ++ MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData, null); + when(this.topicRouteService.getAllMessageQueueView(any(), anyString())).thenReturn(messageQueueView); + } + } +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java +index a0063544e..91af74cbe 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java +@@ -64,7 +64,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest { + this.clusterTransactionService = new ClusterTransactionService(this.topicRouteService, this.producerManager, + this.mqClientAPIFactory); + +- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData); ++ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null); + when(this.topicRouteService.getAllMessageQueueView(any(), anyString())) + .thenReturn(messageQueueView); + +@@ -127,7 +127,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest { + brokerData.setBrokerAddrs(brokerAddrs); + topicRouteData.getQueueDatas().add(queueData); + topicRouteData.getBrokerDatas().add(brokerData); +- when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData)); ++ when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData, null)); + + TopicRouteData clusterTopicRouteData = new TopicRouteData(); + QueueData clusterQueueData = new QueueData(); +@@ -141,7 +141,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest { + brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR); + clusterBrokerData.setBrokerAddrs(brokerAddrs); + clusterTopicRouteData.setBrokerDatas(Lists.newArrayList(clusterBrokerData)); +- when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, clusterTopicRouteData)); ++ when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, clusterTopicRouteData, null)); + + TopicRouteData clusterTopicRouteData2 = new TopicRouteData(); + QueueData clusterQueueData2 = new QueueData(); +@@ -155,7 +155,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest { + brokerAddrs.put(MixAll.MASTER_ID, brokerAddr2); + clusterBrokerData2.setBrokerAddrs(brokerAddrs); + clusterTopicRouteData2.setBrokerDatas(Lists.newArrayList(clusterBrokerData2)); +- when(this.topicRouteService.getAllMessageQueueView(any(), eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, clusterTopicRouteData2)); ++ when(this.topicRouteService.getAllMessageQueueView(any(), eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, clusterTopicRouteData2, null)); + + ConfigurationManager.getProxyConfig().setTransactionHeartbeatBatchNum(2); + this.clusterTransactionService.start(); +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 76a389cd7c70b4f795d0171c5def186ee7afccb5..169d0d0c053d95c1116a7c60105fb2024115b3de 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.3 -Release: 14 +Release: 15 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -23,6 +23,7 @@ Patch0010: patch010-backport-add-some-fixes.patch Patch0011: patch011-backport-optimize-config.patch Patch0012: patch012-backport-enhance-rockdbconfigtojson.patch Patch0013: patch013-backport-enhance-admin-output.patch +Patch0014: patch014-backport-Queue-Selection-Strategy-Optimization.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -57,6 +58,9 @@ exit 0 %changelog +* Mon Oct 30 2023 ShiZhili - 5.1.3-15 +- backport queue selection strategy + * Fri Oct 6 2023 ShiZhili - 5.1.3-14 - backport enhance admin output