From 2612dbbece1975fe3573a8d4ef3d278859e8e1fb Mon Sep 17 00:00:00 2001 From: shizhili Date: Fri, 8 Dec 2023 17:47:21 +0800 Subject: [PATCH] backport Clear POP_CK when sending messages --- ...t-Clear-POP_CK-when-sending-messages.patch | 375 ++++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 380 insertions(+), 1 deletion(-) create mode 100644 patch032-backport-Clear-POP_CK-when-sending-messages.patch diff --git a/patch032-backport-Clear-POP_CK-when-sending-messages.patch b/patch032-backport-Clear-POP_CK-when-sending-messages.patch new file mode 100644 index 0000000..13ba916 --- /dev/null +++ b/patch032-backport-Clear-POP_CK-when-sending-messages.patch @@ -0,0 +1,375 @@ +From 26fa0501482bbf31c2a64a33f329ab9744ac3800 Mon Sep 17 00:00:00 2001 +From: fuyou001 +Date: Fri, 27 Oct 2023 16:28:17 +0800 +Subject: [PATCH 1/3] [ISSUE #7501] The broker supports idempotence in creating + topics (#7502) + +--- + .../rocketmq/broker/processor/AdminBrokerProcessor.java | 7 +++++++ + 1 file changed, 7 insertions(+) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +index 004bf12ac..fbba6633b 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +@@ -440,6 +440,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + return response; + } + ++ if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) { ++ LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}", ++ requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); ++ response.setCode(ResponseCode.SUCCESS); ++ return response; ++ } ++ + try { + this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); + if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { +-- +2.32.0.windows.2 + + +From 46962c262c37554ff09afe9e02c7baf66a5ecc73 Mon Sep 17 00:00:00 2001 +From: fujian-zfj <2573259572@qq.com> +Date: Thu, 2 Nov 2023 13:47:16 +0800 +Subject: [PATCH 2/3] [ISSUE #7523] Message will flush timeout when + transientStorePoolEnable=true and flushDiskType=SYNC_FLUSH (#7524) + +* typo int readme[ecosystem] + +* enableTransientPool and sync_flush will cause flush_time_out + +* polish + +* add log +--- + .../org/apache/rocketmq/store/CommitLog.java | 17 +++++++++++++---- + 1 file changed, 13 insertions(+), 4 deletions(-) + +diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +index 3d3ee86b8..6c3afde70 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ++++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +@@ -1634,12 +1634,21 @@ public class CommitLog implements Swappable { + private void doCommit() { + if (!this.requestsRead.isEmpty()) { + for (GroupCommitRequest req : this.requestsRead) { +- // There may be a message in the next file, so a maximum of +- // two times the flush + boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); +- for (int i = 0; i < 2 && !flushOK; i++) { ++ for (int i = 0; i < 1000 && !flushOK; i++) { + CommitLog.this.mappedFileQueue.flush(0); + flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); ++ if (flushOK) { ++ break; ++ } else { ++ // When transientStorePoolEnable is true, the messages in writeBuffer may not be committed ++ // to pageCache very quickly, and flushOk here may almost be false, so we can sleep 1ms to ++ // wait for the messages to be committed to pageCache. ++ try { ++ Thread.sleep(1); ++ } catch (InterruptedException ignored) { ++ } ++ } + } + + req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); +@@ -1846,7 +1855,7 @@ public class CommitLog implements Swappable { + // Record ConsumeQueue information + Long queueOffset = msgInner.getQueueOffset(); + +- // this msg maybe a inner-batch msg. ++ // this msg maybe an inner-batch msg. + short messageNum = getMessageNum(msgInner); + + // Transaction messages that require special handling +-- +2.32.0.windows.2 + + +From 00965d8c11833237d5c9cd925664a1c456493cee Mon Sep 17 00:00:00 2001 +From: lk +Date: Mon, 6 Nov 2023 09:46:39 +0800 +Subject: [PATCH 3/3] [ISSUE #7531] Clear POP_CK when sending messages (#7532) + +--- + .../processor/SendMessageProcessor.java | 9 ++ + .../common/message/MessageExtBrokerInner.java | 44 +------- + .../rocketmq/common/utils/MessageUtils.java | 48 +++++++++ + .../pop/PopMessageAndForwardingIT.java | 102 ++++++++++++++++++ + 4 files changed, 161 insertions(+), 42 deletions(-) + create mode 100644 test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopMessageAndForwardingIT.java + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +index 9625689a8..956ef43fb 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +@@ -47,6 +47,7 @@ import org.apache.rocketmq.common.message.MessageExtBrokerInner; + import org.apache.rocketmq.common.sysflag.MessageSysFlag; + import org.apache.rocketmq.common.topic.TopicValidator; + import org.apache.rocketmq.common.utils.CleanupPolicyUtils; ++import org.apache.rocketmq.common.utils.MessageUtils; + import org.apache.rocketmq.common.utils.QueueTypeUtils; + import org.apache.rocketmq.remoting.exception.RemotingCommandException; + import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +@@ -106,6 +107,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement + } + + RemotingCommand response; ++ clearReservedProperties(requestHeader); ++ + if (requestHeader.isBatch()) { + response = this.sendBatchMessage(ctx, request, sendMessageContext, requestHeader, mappingContext, + (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1)); +@@ -131,6 +134,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement + return false; + } + ++ private void clearReservedProperties(SendMessageRequestHeader requestHeader) { ++ String properties = requestHeader.getProperties(); ++ properties = MessageUtils.deleteProperty(properties, MessageConst.PROPERTY_POP_CK); ++ requestHeader.setProperties(properties); ++ } ++ + /** + * If the response is not null, it meets some errors + * +diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +index 4e5d3419a..52501dbca 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java ++++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +@@ -19,9 +19,7 @@ package org.apache.rocketmq.common.message; + import java.nio.ByteBuffer; + + import org.apache.rocketmq.common.TopicFilterType; +- +-import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; +-import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; ++import org.apache.rocketmq.common.utils.MessageUtils; + + public class MessageExtBrokerInner extends MessageExt { + private static final long serialVersionUID = 7256001576878700634L; +@@ -62,45 +60,7 @@ public class MessageExtBrokerInner extends MessageExt { + public void deleteProperty(String name) { + super.clearProperty(name); + if (propertiesString != null) { +- int idx0 = 0; +- int idx1; +- int idx2; +- idx1 = propertiesString.indexOf(name, idx0); +- if (idx1 != -1) { +- // cropping may be required +- StringBuilder stringBuilder = new StringBuilder(propertiesString.length()); +- while (true) { +- int startIdx = idx0; +- while (true) { +- idx1 = propertiesString.indexOf(name, startIdx); +- if (idx1 == -1) { +- break; +- } +- startIdx = idx1 + name.length(); +- if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) { +- if (propertiesString.length() > idx1 + name.length() +- && propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) { +- break; +- } +- } +- } +- if (idx1 == -1) { +- // there are no characters that need to be skipped. Append all remaining characters. +- stringBuilder.append(propertiesString, idx0, propertiesString.length()); +- break; +- } +- // there are characters that need to be cropped +- stringBuilder.append(propertiesString, idx0, idx1); +- // move idx2 to the end of the cropped character +- idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1); +- // all subsequent characters will be cropped +- if (idx2 == -1) { +- break; +- } +- idx0 = idx2 + 1; +- } +- this.setPropertiesString(stringBuilder.toString()); +- } ++ this.setPropertiesString(MessageUtils.deleteProperty(propertiesString, name)); + } + } + +diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/MessageUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/MessageUtils.java +index 4d6a150ad..a6563bc92 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/utils/MessageUtils.java ++++ b/common/src/main/java/org/apache/rocketmq/common/utils/MessageUtils.java +@@ -25,6 +25,9 @@ import com.google.common.hash.Hashing; + import org.apache.rocketmq.common.message.MessageConst; + import org.apache.rocketmq.common.message.MessageExt; + ++import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; ++import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; ++ + public class MessageUtils { + + public static int getShardingKeyIndex(String shardingKey, int indexSize) { +@@ -47,4 +50,49 @@ public class MessageUtils { + } + return indexSet; + } ++ ++ public static String deleteProperty(String propertiesString, String name) { ++ if (propertiesString != null) { ++ int idx0 = 0; ++ int idx1; ++ int idx2; ++ idx1 = propertiesString.indexOf(name, idx0); ++ if (idx1 != -1) { ++ // cropping may be required ++ StringBuilder stringBuilder = new StringBuilder(propertiesString.length()); ++ while (true) { ++ int startIdx = idx0; ++ while (true) { ++ idx1 = propertiesString.indexOf(name, startIdx); ++ if (idx1 == -1) { ++ break; ++ } ++ startIdx = idx1 + name.length(); ++ if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) { ++ if (propertiesString.length() > idx1 + name.length() ++ && propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) { ++ break; ++ } ++ } ++ } ++ if (idx1 == -1) { ++ // there are no characters that need to be skipped. Append all remaining characters. ++ stringBuilder.append(propertiesString, idx0, propertiesString.length()); ++ break; ++ } ++ // there are characters that need to be cropped ++ stringBuilder.append(propertiesString, idx0, idx1); ++ // move idx2 to the end of the cropped character ++ idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1); ++ // all subsequent characters will be cropped ++ if (idx2 == -1) { ++ break; ++ } ++ idx0 = idx2 + 1; ++ } ++ return stringBuilder.toString(); ++ } ++ } ++ return propertiesString; ++ } + } +diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopMessageAndForwardingIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopMessageAndForwardingIT.java +new file mode 100644 +index 000000000..52a0c277c +--- /dev/null ++++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopMessageAndForwardingIT.java +@@ -0,0 +1,102 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.test.client.consumer.pop; ++ ++import java.time.Duration; ++import java.util.concurrent.atomic.AtomicReference; ++import org.apache.rocketmq.client.consumer.PopResult; ++import org.apache.rocketmq.client.consumer.PopStatus; ++import org.apache.rocketmq.common.attribute.CQType; ++import org.apache.rocketmq.common.attribute.TopicMessageType; ++import org.apache.rocketmq.common.constant.ConsumeInitMode; ++import org.apache.rocketmq.common.filter.ExpressionType; ++import org.apache.rocketmq.common.message.MessageConst; ++import org.apache.rocketmq.common.message.MessageExt; ++import org.apache.rocketmq.common.message.MessageQueue; ++import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; ++import org.apache.rocketmq.test.base.IntegrationTestBase; ++import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; ++import org.apache.rocketmq.test.client.rmq.RMQPopClient; ++import org.apache.rocketmq.test.util.MQRandomUtils; ++import org.junit.Before; ++import org.junit.Test; ++ ++import static org.awaitility.Awaitility.await; ++import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.assertNotEquals; ++ ++public class PopMessageAndForwardingIT extends BasePop { ++ ++ protected String topic; ++ protected String group; ++ protected RMQNormalProducer producer = null; ++ protected RMQPopClient client = null; ++ protected String broker1Addr; ++ protected MessageQueue broker1MessageQueue; ++ protected String broker2Addr; ++ protected MessageQueue broker2MessageQueue; ++ ++ @Before ++ public void setUp() { ++ broker1Addr = brokerController1.getBrokerAddr(); ++ broker2Addr = brokerController2.getBrokerAddr(); ++ topic = MQRandomUtils.getRandomTopic(); ++ group = initConsumerGroup(); ++ IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL); ++ IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER2_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL); ++ producer = getProducer(NAMESRV_ADDR, topic); ++ client = getRMQPopClient(); ++ broker1MessageQueue = new MessageQueue(topic, BROKER1_NAME, -1); ++ broker2MessageQueue = new MessageQueue(topic, BROKER2_NAME, -1); ++ } ++ ++ @Test ++ public void test() { ++ producer.send(1, broker1MessageQueue); ++ ++ AtomicReference firstMessageExtRef = new AtomicReference<>(); ++ await().atMost(Duration.ofSeconds(3)).until(() -> { ++ PopResult popResult = client.popMessageAsync(broker1Addr, broker1MessageQueue, 3000, 32, group, 1000, ++ true, ConsumeInitMode.MIN, false, ExpressionType.TAG, "*").get(); ++ if (!popResult.getPopStatus().equals(PopStatus.FOUND)) { ++ return false; ++ } ++ firstMessageExtRef.set(popResult.getMsgFoundList().get(0)); ++ return true; ++ }); ++ ++ producer.sendMQ(firstMessageExtRef.get(), broker2MessageQueue); ++ AtomicReference secondMessageExtRef = new AtomicReference<>(); ++ await().atMost(Duration.ofSeconds(3)).until(() -> { ++ PopResult popResult = client.popMessageAsync(broker2Addr, broker2MessageQueue, 3000, 32, group, 1000, ++ true, ConsumeInitMode.MIN, false, ExpressionType.TAG, "*").get(); ++ if (!popResult.getPopStatus().equals(PopStatus.FOUND)) { ++ return false; ++ } ++ secondMessageExtRef.set(popResult.getMsgFoundList().get(0)); ++ return true; ++ }); ++ ++ assertEquals(firstMessageExtRef.get().getMsgId(), secondMessageExtRef.get().getMsgId()); ++ String firstPopCk = firstMessageExtRef.get().getProperty(MessageConst.PROPERTY_POP_CK); ++ String secondPopCk = secondMessageExtRef.get().getProperty(MessageConst.PROPERTY_POP_CK); ++ assertNotEquals(firstPopCk, secondPopCk); ++ assertEquals(BROKER1_NAME, ExtraInfoUtil.getBrokerName(ExtraInfoUtil.split(firstPopCk))); ++ assertEquals(BROKER2_NAME, ExtraInfoUtil.getBrokerName(ExtraInfoUtil.split(secondPopCk))); ++ } ++} +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index a28ac53..887a59d 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.5 -Release: 32 +Release: 33 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -41,6 +41,7 @@ Patch0028: patch028-backport-Fix-proxy-client-language-error.patch Patch0029: patch029-backport-Introduce-new-event-NettyEventType.ACTIVEr.patch Patch0030: patch030-backport-remove-some-code.patch Patch0031: patch031-backport-Add-CRC-check-of-commitlog.patch +Patch0032: patch032-backport-Clear-POP_CK-when-sending-messages.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -81,6 +82,9 @@ exit 0 %changelog +* Fri Dec 8 2023 ShiZhili - 5.1.3-33 +- backport Clear POP_CK when sending messages + * Fri Dec 8 2023 ShiZhili - 5.1.3-32 - backport Add the CRC check of commitlog -- Gitee