diff --git a/patch037-backport-Retry-topic-v2-in-pop.patch b/patch037-backport-Retry-topic-v2-in-pop.patch new file mode 100644 index 0000000000000000000000000000000000000000..6b0c8d191941c5f9666fedae0f7a23b523a43e0a --- /dev/null +++ b/patch037-backport-Retry-topic-v2-in-pop.patch @@ -0,0 +1,845 @@ +From ca721b0145994d7f5e67b4d2fe3b7a4ad7a1c132 Mon Sep 17 00:00:00 2001 +From: zhanghong <985492783@qq.com> +Date: Tue, 21 Nov 2023 14:03:24 +0800 +Subject: [PATCH 1/3] [ISSUE #7462] Remove deprecated LocalTransactionExecuter + (#7463) + +--- + .../impl/producer/DefaultMQProducerImpl.java | 9 +++---- + .../client/producer/DefaultMQProducer.java | 16 ----------- + .../producer/LocalTransactionExecuter.java | 27 ------------------- + .../rocketmq/client/producer/MQProducer.java | 3 --- + .../producer/TransactionMQProducer.java | 16 ----------- + .../client.producer.DefaultMQProducer.schema | 1 - + 6 files changed, 4 insertions(+), 68 deletions(-) + delete mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +index b0c212e46..545f17d93 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +@@ -54,7 +54,6 @@ import org.apache.rocketmq.client.latency.MQFaultStrategy; + import org.apache.rocketmq.client.latency.Resolver; + import org.apache.rocketmq.client.latency.ServiceDetector; + import org.apache.rocketmq.client.producer.DefaultMQProducer; +-import org.apache.rocketmq.client.producer.LocalTransactionExecuter; + import org.apache.rocketmq.client.producer.LocalTransactionState; + import org.apache.rocketmq.client.producer.MessageQueueSelector; + import org.apache.rocketmq.client.producer.RequestCallback; +@@ -1379,10 +1378,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { + } + + public TransactionSendResult sendMessageInTransaction(final Message msg, +- final LocalTransactionExecuter localTransactionExecuter, final Object arg) ++ final TransactionListener localTransactionListener, final Object arg) + throws MQClientException { + TransactionListener transactionListener = getCheckListener(); +- if (null == localTransactionExecuter && null == transactionListener) { ++ if (null == localTransactionListener && null == transactionListener) { + throw new MQClientException("tranExecutor is null", null); + } + +@@ -1414,8 +1413,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { + if (null != transactionId && !"".equals(transactionId)) { + msg.setTransactionId(transactionId); + } +- if (null != localTransactionExecuter) { +- localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); ++ if (null != localTransactionListener) { ++ localTransactionState = localTransactionListener.executeLocalTransaction(msg, arg); + } else { + log.debug("Used new transaction API"); + localTransactionState = transactionListener.executeLocalTransaction(msg, arg); +diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +index c5b1b5223..7bd3876f5 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java ++++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +@@ -853,22 +853,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + this.defaultMQProducerImpl.sendOneway(msg, selector, arg); + } + +- /** +- * This method is to send transactional messages. +- * +- * @param msg Transactional message to send. +- * @param tranExecuter local transaction executor. +- * @param arg Argument used along with local transaction executor. +- * @return Transaction result. +- * @throws MQClientException if there is any client error. +- */ +- @Override +- public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, +- final Object arg) +- throws MQClientException { +- throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); +- } +- + /** + * This method is used to send transactional messages. + * +diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java +deleted file mode 100644 +index 267ba10bd..000000000 +--- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java ++++ /dev/null +@@ -1,27 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.rocketmq.client.producer; +- +-import org.apache.rocketmq.common.message.Message; +- +-/** +- * @deprecated This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended. +- */ +-@Deprecated +-public interface LocalTransactionExecuter { +- LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg); +-} +diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +index 78657e623..8bd30e98d 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java ++++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +@@ -81,9 +81,6 @@ public interface MQProducer extends MQAdmin { + void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) + throws MQClientException, RemotingException, InterruptedException; + +- TransactionSendResult sendMessageInTransaction(final Message msg, +- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; +- + TransactionSendResult sendMessageInTransaction(final Message msg, + final Object arg) throws MQClientException; + +diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +index baa8b4408..d529f3e77 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java ++++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +@@ -67,22 +67,6 @@ public class TransactionMQProducer extends DefaultMQProducer { + this.defaultMQProducerImpl.destroyTransactionEnv(); + } + +- /** +- * This method will be removed in the version 5.0.0, method sendMessageInTransaction(Message,Object)} +- * is recommended. +- */ +- @Override +- @Deprecated +- public TransactionSendResult sendMessageInTransaction(final Message msg, +- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { +- if (null == this.transactionCheckListener) { +- throw new MQClientException("localTransactionBranchCheckListener is null", null); +- } +- +- msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); +- return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); +- } +- + @Override + public TransactionSendResult sendMessageInTransaction(final Message msg, + final Object arg) throws MQClientException { +diff --git a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema +index 0418c73fe..d1111fb45 100644 +--- a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema ++++ b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema +@@ -122,7 +122,6 @@ Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq + Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (void) + Method send(org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.SendResult) + Method send(org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (org.apache.rocketmq.client.producer.SendResult) +-Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.client.producer.LocalTransactionExecuter,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult) + Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult) + Method sendOneway(java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (void) + Method sendOneway(org.apache.rocketmq.common.message.Message) : public throws (void) +-- +2.32.0.windows.2 + + +From a7d493b2fbc153cc6cbdf2b2ffcbf19cf7cba803 Mon Sep 17 00:00:00 2001 +From: panzhi +Date: Tue, 21 Nov 2023 20:55:35 +0800 +Subject: [PATCH 2/3] transactionProducer get the topic route before sending + the message (#7569) + +--- + .../impl/producer/DefaultMQProducerImpl.java | 15 +++++ + .../client/producer/DefaultMQProducer.java | 63 +++++++++++++++++++ + .../producer/TransactionMQProducer.java | 23 +++++-- + .../transaction/TransactionProducer.java | 3 +- + 4 files changed, 98 insertions(+), 6 deletions(-) + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +index 545f17d93..088bff089 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +@@ -262,6 +262,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { + mQClientFactory.start(); + } + ++ this.initTopicRoute(); ++ + this.mqFaultStrategy.startDetector(); + + log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), +@@ -1740,6 +1742,19 @@ public class DefaultMQProducerImpl implements MQProducerInner { + } + } + ++ private void initTopicRoute() { ++ List topics = this.defaultMQProducer.getTopics(); ++ if (topics != null && topics.size() > 0) { ++ topics.forEach(topic -> { ++ String newTopic = NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic); ++ TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(newTopic); ++ if (topicPublishInfo == null || !topicPublishInfo.ok()) { ++ log.warn("No route info of this topic: " + newTopic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO)); ++ } ++ }); ++ } ++ } ++ + public ConcurrentMap getTopicPublishInfoTable() { + return topicPublishInfoTable; + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +index 7bd3876f5..700e00aac 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java ++++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +@@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + */ + private String producerGroup; + ++ /** ++ * Topics that need to be initialized for transaction producer ++ */ ++ private List topics; ++ + /** + * Just for testing or demo program + */ +@@ -235,6 +240,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); + } + ++ /** ++ * Constructor specifying namespace, producer group, topics and RPC hook. ++ * ++ * @param namespace Namespace for this MQ Producer instance. ++ * @param producerGroup Producer group, see the name-sake field. ++ * @param topics Topic that needs to be initialized for routing ++ * @param rpcHook RPC hook to execute per each remoting command execution. ++ */ ++ public DefaultMQProducer(final String namespace, final String producerGroup, final List topics, RPCHook rpcHook) { ++ this.namespace = namespace; ++ this.producerGroup = producerGroup; ++ this.topics = topics; ++ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); ++ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); ++ } ++ + /** + * Constructor specifying producer group and enabled msg trace flag. + * +@@ -290,6 +311,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + } + } + ++ /** ++ * Constructor specifying namespace, producer group, topics, RPC hook, enabled msgTrace flag and customized trace topic ++ * name. ++ * ++ * @param namespace Namespace for this MQ Producer instance. ++ * @param producerGroup Producer group, see the name-sake field. ++ * @param topics Topic that needs to be initialized for routing ++ * @param rpcHook RPC hook to execute per each remoting command execution. ++ * @param enableMsgTrace Switch flag instance for message trace. ++ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default ++ * trace topic name. ++ */ ++ public DefaultMQProducer(final String namespace, final String producerGroup, final List topics, ++ RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { ++ this.namespace = namespace; ++ this.producerGroup = producerGroup; ++ this.topics = topics; ++ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); ++ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); ++ //if client open the message trace feature ++ if (enableMsgTrace) { ++ try { ++ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); ++ dispatcher.setHostProducer(this.defaultMQProducerImpl); ++ traceDispatcher = dispatcher; ++ this.defaultMQProducerImpl.registerSendMessageHook( ++ new SendMessageTraceHookImpl(traceDispatcher)); ++ this.defaultMQProducerImpl.registerEndTransactionHook( ++ new EndTransactionTraceHookImpl(traceDispatcher)); ++ } catch (Throwable e) { ++ logger.error("system mqtrace hook init failed ,maybe can't send msg trace data"); ++ } ++ } ++ } ++ + @Override + public void setUseTLS(boolean useTLS) { + super.setUseTLS(useTLS); +@@ -1316,4 +1372,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize); + } + ++ public List getTopics() { ++ return topics; ++ } ++ ++ public void setTopics(List topics) { ++ this.topics = topics; ++ } + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +index d529f3e77..2c3b479f7 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java ++++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +@@ -16,6 +16,7 @@ + */ + package org.apache.rocketmq.client.producer; + ++import java.util.List; + import java.util.concurrent.ExecutorService; + import org.apache.rocketmq.client.exception.MQClientException; + import org.apache.rocketmq.common.message.Message; +@@ -36,19 +37,31 @@ public class TransactionMQProducer extends DefaultMQProducer { + } + + public TransactionMQProducer(final String producerGroup) { +- this(null, producerGroup, null); ++ this(null, producerGroup, null, null); ++ } ++ ++ public TransactionMQProducer(final String producerGroup, final List topics) { ++ this(null, producerGroup, topics, null); + } + + public TransactionMQProducer(final String namespace, final String producerGroup) { +- this(namespace, producerGroup, null); ++ this(namespace, producerGroup, null, null); ++ } ++ ++ public TransactionMQProducer(final String namespace, final String producerGroup, final List topics) { ++ this(namespace, producerGroup, topics, null); + } + + public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { +- this(null, producerGroup, rpcHook); ++ this(null, producerGroup, null, rpcHook); ++ } ++ ++ public TransactionMQProducer(final String producerGroup, final List topics, RPCHook rpcHook) { ++ this(null, producerGroup, topics, rpcHook); + } + +- public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { +- super(namespace, producerGroup, rpcHook); ++ public TransactionMQProducer(final String namespace, final String producerGroup, final List topics, RPCHook rpcHook) { ++ super(namespace, producerGroup, topics, rpcHook); + } + + public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { +diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java +index 5973c3c30..d1d57c55e 100644 +--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java ++++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java +@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.Message; + import org.apache.rocketmq.remoting.common.RemotingHelper; + + import java.io.UnsupportedEncodingException; ++import java.util.Arrays; + import java.util.concurrent.ArrayBlockingQueue; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.ThreadPoolExecutor; +@@ -39,7 +40,7 @@ public class TransactionProducer { + + public static void main(String[] args) throws MQClientException, InterruptedException { + TransactionListener transactionListener = new TransactionListenerImpl(); +- TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP); ++ TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC)); + + // Uncomment the following line while debugging, namesrvAddr should be set to your local address + // producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); +-- +2.32.0.windows.2 + + +From 5b43387be33506e4c19df4783724d06b1dfdc062 Mon Sep 17 00:00:00 2001 +From: Zhouxiang Zhan +Date: Thu, 23 Nov 2023 14:53:48 +0800 +Subject: [PATCH 3/3] [ISSUE #7543] Retry topic v2 in pop (#7544) + +* Implement pop retry topic v2 + +* Use pop retry topic v2 to notify the origin topic + +* add parse group + +* retry topic v2 compatibility + + * calculate consumer lag + + * delete retry topic +--- + .../acl/plain/PlainAccessResource.java | 3 +- + .../ExpressionForRetryMessageFilter.java | 3 +- + .../NotifyMessageArrivingListener.java | 3 +- + .../longpolling/PopLongPollingService.java | 10 +++ + .../broker/metrics/ConsumerLagCalculator.java | 11 ++++ + .../processor/AdminBrokerProcessor.java | 4 ++ + .../processor/NotificationProcessor.java | 2 +- + .../broker/processor/PopMessageProcessor.java | 24 ++++++- + .../broker/processor/PopReviveService.java | 9 --- + .../processor/SendMessageProcessor.java | 3 +- + .../apache/rocketmq/common/BrokerConfig.java | 10 +++ + .../apache/rocketmq/common/KeyBuilder.java | 37 +++++++++-- + .../rocketmq/common/KeyBuilderTest.java | 65 +++++++++++++++++++ + .../consumer/ConsumerProgressSubCommand.java | 3 +- + .../tools/monitor/MonitorService.java | 3 +- + 15 files changed, 168 insertions(+), 22 deletions(-) + create mode 100644 common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java + +diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java +index 72aa8ca71..1e185afff 100644 +--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java ++++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java +@@ -48,6 +48,7 @@ import org.apache.rocketmq.acl.common.AuthenticationHeader; + import org.apache.rocketmq.acl.common.AuthorizationHeader; + import org.apache.rocketmq.acl.common.Permission; + import org.apache.rocketmq.acl.common.SessionCredentials; ++import org.apache.rocketmq.common.KeyBuilder; + import org.apache.rocketmq.common.MQVersion; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.PlainAccessConfig; +@@ -341,7 +342,7 @@ public class PlainAccessResource implements AccessResource { + if (retryTopic == null) { + return null; + } +- return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); ++ return KeyBuilder.parseGroup(retryTopic); + } + + public static String getRetryTopic(String group) { +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java +index bc01b21cb..cc3e37bf4 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java +@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.filter; + + import java.nio.ByteBuffer; + import java.util.Map; ++import org.apache.rocketmq.common.KeyBuilder; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.filter.ExpressionType; + import org.apache.rocketmq.common.message.MessageConst; +@@ -62,7 +63,7 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter { + tempProperties = MessageDecoder.decodeProperties(msgBuffer); + } + String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC); +- String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); ++ String group = KeyBuilder.parseGroup(subscriptionData.getTopic()); + realFilterData = this.consumerFilterManager.get(realTopic, group); + } + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java +index 3c099fe2f..e55ed2778 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java +@@ -17,12 +17,11 @@ + + package org.apache.rocketmq.broker.longpolling; + ++import java.util.Map; + import org.apache.rocketmq.broker.processor.NotificationProcessor; + import org.apache.rocketmq.broker.processor.PopMessageProcessor; + import org.apache.rocketmq.store.MessageArrivingListener; + +-import java.util.Map; +- + public class NotifyMessageArrivingListener implements MessageArrivingListener { + private final PullRequestHoldService pullRequestHoldService; + private final PopMessageProcessor popMessageProcessor; +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +index 113c91297..f1bc9adc4 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +@@ -144,6 +144,16 @@ public class PopLongPollingService extends ServiceThread { + } + } + ++ public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId) { ++ String notifyTopic; ++ if (KeyBuilder.isPopRetryTopicV2(topic)) { ++ notifyTopic = KeyBuilder.parseNormalTopic(topic); ++ } else { ++ notifyTopic = topic; ++ } ++ notifyMessageArriving(notifyTopic, queueId); ++ } ++ + public void notifyMessageArriving(final String topic, final int queueId) { + ConcurrentHashMap cids = topicCidMap.get(topic); + if (cids == null) { +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java +index af08a83c7..d1f3fffde 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java +@@ -185,6 +185,17 @@ public class ConsumerLagCalculator { + continue; + } + } ++ if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { ++ String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); ++ TopicConfig retryTopicConfigV1 = topicConfigManager.selectTopicConfig(retryTopicV1); ++ if (retryTopicConfigV1 != null) { ++ int retryTopicPerm = retryTopicConfigV1.getPerm() & brokerConfig.getBrokerPermission(); ++ if (PermName.isReadable(retryTopicPerm) || PermName.isWriteable(retryTopicPerm)) { ++ consumer.accept(new ProcessGroupInfo(group, topic, true, retryTopicV1)); ++ continue; ++ } ++ } ++ } + consumer.accept(new ProcessGroupInfo(group, topic, true, null)); + } else { + consumer.accept(new ProcessGroupInfo(group, topic, false, null)); +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +index fbba6633b..863b275d1 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +@@ -548,6 +548,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) { + deleteTopicInBroker(popRetryTopic); + } ++ final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); ++ if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) { ++ deleteTopicInBroker(popRetryTopicV1); ++ } + } + // delete topic + deleteTopicInBroker(topic); +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +index a15340383..91d275dfe 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +@@ -58,7 +58,7 @@ public class NotificationProcessor implements NettyRequestProcessor { + } + + public void notifyMessageArriving(final String topic, final int queueId) { +- popLongPollingService.notifyMessageArriving(topic, queueId); ++ popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId); + } + + @Override +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +index 7ed4d53ab..58baecc05 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +@@ -185,7 +185,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { + } + + public void notifyMessageArriving(final String topic, final int queueId) { +- popLongPollingService.notifyMessageArriving(topic, queueId); ++ popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId); + } + + public boolean notifyMessageArriving(final String topic, final String cid, final int queueId) { +@@ -364,6 +364,17 @@ public class PopMessageProcessor implements NettyRequestProcessor { + startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + } + } ++ if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) { ++ TopicConfig retryTopicConfigV1 = ++ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup())); ++ if (retryTopicConfigV1 != null) { ++ for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) { ++ int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums(); ++ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, ++ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); ++ } ++ } ++ } + } + if (requestHeader.getQueueId() < 0) { + // read all queue +@@ -388,6 +399,17 @@ public class PopMessageProcessor implements NettyRequestProcessor { + startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + } + } ++ if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) { ++ TopicConfig retryTopicConfigV1 = ++ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup())); ++ if (retryTopicConfigV1 != null) { ++ for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) { ++ int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums(); ++ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, ++ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); ++ } ++ } ++ } + } + + final RemotingCommand finalResponse = response; +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +index 3fb689ed6..8d25bc57e 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +@@ -142,15 +142,6 @@ public class PopReviveService extends ServiceThread { + this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(), 1); + this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic()); + this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); +- if (brokerController.getPopMessageProcessor() != null) { +- brokerController.getPopMessageProcessor().notifyMessageArriving( +- KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), +- popCheckPoint.getCId(), +- -1 +- ); +- brokerController.getNotificationProcessor().notifyMessageArriving( +- KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1); +- } + return true; + } + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +index 956ef43fb..4ec84c146 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +@@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.BrokerController; + import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; + import org.apache.rocketmq.broker.mqtrace.SendMessageContext; + import org.apache.rocketmq.common.AbortProcessException; ++import org.apache.rocketmq.common.KeyBuilder; + import org.apache.rocketmq.common.MQVersion; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.TopicConfig; +@@ -178,7 +179,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement + MessageExt msg, TopicConfig topicConfig, Map properties) { + String newTopic = requestHeader.getTopic(); + if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { +- String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); ++ String groupName = KeyBuilder.parseGroup(newTopic); + SubscriptionGroupConfig subscriptionGroupConfig = + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); + if (null == subscriptionGroupConfig) { +diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +index 0d248c4e1..c186352d1 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java ++++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +@@ -223,6 +223,8 @@ public class BrokerConfig extends BrokerIdentity { + private boolean enablePopBatchAck = false; + private boolean enableNotifyAfterPopOrderLockRelease = true; + private boolean initPopOffsetByCheckMsgInMem = true; ++ // read message from pop retry topic v1, for the compatibility, will be removed in the future version ++ private boolean retrieveMessageFromPopRetryTopicV1 = true; + + private boolean realTimeNotifyConsumerChange = true; + +@@ -1284,6 +1286,14 @@ public class BrokerConfig extends BrokerIdentity { + this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem; + } + ++ public boolean isRetrieveMessageFromPopRetryTopicV1() { ++ return retrieveMessageFromPopRetryTopicV1; ++ } ++ ++ public void setRetrieveMessageFromPopRetryTopicV1(boolean retrieveMessageFromPopRetryTopicV1) { ++ this.retrieveMessageFromPopRetryTopicV1 = retrieveMessageFromPopRetryTopicV1; ++ } ++ + public boolean isRealTimeNotifyConsumerChange() { + return realTimeNotifyConsumerChange; + } +diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java +index e1532d939..f2a8c4089 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java ++++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java +@@ -18,24 +18,53 @@ package org.apache.rocketmq.common; + + public class KeyBuilder { + public static final int POP_ORDER_REVIVE_QUEUE = 999; ++ private static final String POP_RETRY_SEPARATOR_V1 = "_"; ++ private static final String POP_RETRY_SEPARATOR_V2 = ":"; + + public static String buildPopRetryTopic(String topic, String cid) { +- return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic; ++ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2 + topic; ++ } ++ ++ public static String buildPopRetryTopicV1(String topic, String cid) { ++ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1 + topic; + } + + public static String parseNormalTopic(String topic, String cid) { + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { +- return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_").length()); ++ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2)) { ++ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2).length()); ++ } ++ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1).length()); + } else { + return topic; + } + } + ++ public static String parseNormalTopic(String retryTopic) { ++ if (isPopRetryTopicV2(retryTopic)) { ++ String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2); ++ if (result.length == 2) { ++ return result[1]; ++ } ++ } ++ return retryTopic; ++ } ++ ++ public static String parseGroup(String retryTopic) { ++ if (isPopRetryTopicV2(retryTopic)) { ++ String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2); ++ if (result.length == 2) { ++ return result[0].substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); ++ } ++ } ++ return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); ++ } ++ + public static String buildPollingKey(String topic, String cid, int queueId) { + return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId; + } + +- public static String buildPollingNotificationKey(String topic, int queueId) { +- return topic + PopAckConstants.SPLIT + queueId; ++ public static boolean isPopRetryTopicV2(String retryTopic) { ++ return retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && retryTopic.contains(POP_RETRY_SEPARATOR_V2); + } + } +diff --git a/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java +new file mode 100644 +index 000000000..f83e0aa14 +--- /dev/null ++++ b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java +@@ -0,0 +1,65 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.common; ++ ++import org.junit.Test; ++ ++import static org.assertj.core.api.Assertions.assertThat; ++ ++public class KeyBuilderTest { ++ String topic = "test-topic"; ++ String group = "test-group"; ++ ++ @Test ++ public void buildPopRetryTopic() { ++ assertThat(KeyBuilder.buildPopRetryTopic(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + ":" + topic); ++ } ++ ++ @Test ++ public void buildPopRetryTopicV1() { ++ assertThat(KeyBuilder.buildPopRetryTopicV1(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + "_" + topic); ++ } ++ ++ @Test ++ public void parseNormalTopic() { ++ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); ++ assertThat(KeyBuilder.parseNormalTopic(popRetryTopic, group)).isEqualTo(topic); ++ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); ++ assertThat(KeyBuilder.parseNormalTopic(popRetryTopicV1, group)).isEqualTo(topic); ++ } ++ ++ @Test ++ public void testParseNormalTopic() { ++ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); ++ assertThat(KeyBuilder.parseNormalTopic(popRetryTopic)).isEqualTo(topic); ++ } ++ ++ @Test ++ public void parseGroup() { ++ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); ++ assertThat(KeyBuilder.parseGroup(popRetryTopic)).isEqualTo(group); ++ } ++ ++ @Test ++ public void isPopRetryTopicV2() { ++ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); ++ assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopic)).isEqualTo(true); ++ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); ++ assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopicV1)).isEqualTo(false); ++ } ++} +\ No newline at end of file +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +index 97125b854..c489cad68 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +@@ -25,6 +25,7 @@ import java.util.Map; + import org.apache.commons.cli.CommandLine; + import org.apache.commons.cli.Option; + import org.apache.commons.cli.Options; ++import org.apache.rocketmq.common.KeyBuilder; + import org.apache.rocketmq.common.MQVersion; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.UtilAll; +@@ -212,7 +213,7 @@ public class ConsumerProgressSubCommand implements SubCommand { + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); + for (String topic : topicList.getTopicList()) { + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { +- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); ++ String consumerGroup = KeyBuilder.parseGroup(topic); + try { + ConsumeStats consumeStats = null; + try { +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java +index 45dc3a036..b66dfad20 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java +@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; + import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; + import org.apache.rocketmq.client.exception.MQBrokerException; + import org.apache.rocketmq.client.exception.MQClientException; ++import org.apache.rocketmq.common.KeyBuilder; + import org.apache.rocketmq.common.MQVersion; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.ThreadFactoryImpl; +@@ -172,7 +173,7 @@ public class MonitorService { + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); + for (String topic : topicList.getTopicList()) { + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { +- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); ++ String consumerGroup = KeyBuilder.parseGroup(topic); + + try { + this.reportUndoneMsgs(consumerGroup); +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index a2e4839bbeba70bce623526b3ea1d9160f6fc718..e99cff83e6fb551c9b3f90409fb96c9803178cc7 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.5 -Release: 37 +Release: 38 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -46,6 +46,7 @@ Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.pat Patch0034: patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch Patch0035: patch035-backport-fix-some-bugs.patch Patch0036: patch036-backport-RIP65.patch +Patch0037: patch037-backport-Retry-topic-v2-in-pop.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -86,6 +87,9 @@ exit 0 %changelog +* Mon Dec 11 2023 ShiZhili - 5.1.3-38 +- backport Retry topic v2 in pop + * Mon Dec 11 2023 ShiZhili - 5.1.3-37 - backport rip 65