From 98aba19773e22019c4f4a1ff11fbafd37e15b238 Mon Sep 17 00:00:00 2001 From: shizhili Date: Mon, 11 Dec 2023 15:20:10 +0800 Subject: [PATCH] backport fix some bugs --- patch035-backport-fix-some-bugs.patch | 394 ++++++++++++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 399 insertions(+), 1 deletion(-) create mode 100644 patch035-backport-fix-some-bugs.patch diff --git a/patch035-backport-fix-some-bugs.patch b/patch035-backport-fix-some-bugs.patch new file mode 100644 index 0000000..00495f2 --- /dev/null +++ b/patch035-backport-fix-some-bugs.patch @@ -0,0 +1,394 @@ +From 1be5ebc7363e4bc6503c80688160a354f5a12f78 Mon Sep 17 00:00:00 2001 +From: Zhanhui Li +Date: Mon, 13 Nov 2023 09:45:37 +0800 +Subject: [PATCH 1/5] [ISSUE #7551] Reuse helper methods from Netty to free + direct byte buffer (#7550) + +* Reuse helper methods from Netty to free direct byte buffer, making codebase JDK 9+ compatible + +Signed-off-by: Li Zhanhui + +* Guard against null + +Signed-off-by: Li Zhanhui + +* fix #7552 + +Signed-off-by: Li Zhanhui + +--------- + +Signed-off-by: Li Zhanhui +--- + .../org/apache/rocketmq/common/UtilAll.java | 60 +------------------ + .../apache/rocketmq/common/UtilAllTest.java | 10 ---- + 2 files changed, 3 insertions(+), 67 deletions(-) + +diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +index 95b6b09b4..2808f106a 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java ++++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +@@ -16,21 +16,18 @@ + */ + package org.apache.rocketmq.common; + ++import io.netty.util.internal.PlatformDependent; + import java.io.ByteArrayInputStream; + import java.io.ByteArrayOutputStream; + import java.io.File; + import java.io.IOException; + import java.lang.management.ManagementFactory; +-import java.lang.reflect.Field; +-import java.lang.reflect.Method; + import java.net.Inet4Address; + import java.net.Inet6Address; + import java.net.InetAddress; + import java.net.NetworkInterface; + import java.nio.ByteBuffer; + import java.nio.file.Files; +-import java.security.AccessController; +-import java.security.PrivilegedAction; + import java.text.NumberFormat; + import java.text.ParseException; + import java.text.SimpleDateFormat; +@@ -46,15 +43,11 @@ import java.util.function.Supplier; + import java.util.zip.CRC32; + import java.util.zip.DeflaterOutputStream; + import java.util.zip.InflaterInputStream; +-import org.apache.commons.lang3.JavaVersion; + import org.apache.commons.lang3.StringUtils; +-import org.apache.commons.lang3.SystemUtils; + import org.apache.commons.validator.routines.InetAddressValidator; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +-import sun.misc.Unsafe; +-import sun.nio.ch.DirectBuffer; + + public class UtilAll { + private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); +@@ -707,57 +700,10 @@ public class UtilAll { + } + + public static void cleanBuffer(final ByteBuffer buffer) { +- if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0) { ++ if (null == buffer) { + return; + } +- if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { +- try { +- Field field = Unsafe.class.getDeclaredField("theUnsafe"); +- field.setAccessible(true); +- Unsafe unsafe = (Unsafe) field.get(null); +- Method cleaner = method(unsafe, "invokeCleaner", new Class[] {ByteBuffer.class}); +- cleaner.invoke(unsafe, viewed(buffer)); +- } catch (Exception e) { +- throw new IllegalStateException(e); +- } +- } else { +- invoke(invoke(viewed(buffer), "cleaner"), "clean"); +- } +- } +- +- public static Object invoke(final Object target, final String methodName, final Class... args) { +- return AccessController.doPrivileged(new PrivilegedAction() { +- @Override +- public Object run() { +- try { +- Method method = method(target, methodName, args); +- method.setAccessible(true); +- return method.invoke(target); +- } catch (Exception e) { +- throw new IllegalStateException(e); +- } +- } +- }); +- } +- +- public static Method method(Object target, String methodName, Class[] args) throws NoSuchMethodException { +- try { +- return target.getClass().getMethod(methodName, args); +- } catch (NoSuchMethodException e) { +- return target.getClass().getDeclaredMethod(methodName, args); +- } +- } +- +- private static ByteBuffer viewed(ByteBuffer buffer) { +- if (!buffer.isDirect()) { +- throw new IllegalArgumentException("buffer is non-direct"); +- } +- ByteBuffer viewedBuffer = (ByteBuffer) ((DirectBuffer) buffer).attachment(); +- if (viewedBuffer == null) { +- return buffer; +- } else { +- return viewed(viewedBuffer); +- } ++ PlatformDependent.freeDirectBuffer(buffer); + } + + public static void ensureDirOK(final String dirName) { +diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java +index 94bb390eb..cb288578c 100644 +--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java ++++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java +@@ -219,16 +219,6 @@ public class UtilAllTest { + UtilAll.cleanBuffer(ByteBuffer.allocate(0)); + } + +- @Test(expected = NoSuchMethodException.class) +- public void testMethod() throws NoSuchMethodException { +- UtilAll.method(new Object(), "noMethod", null); +- } +- +- @Test(expected = IllegalStateException.class) +- public void testInvoke() throws Exception { +- UtilAll.invoke(new Object(), "noMethod"); +- } +- + @Test + public void testCalculateFileSizeInPath() throws Exception { + /** +-- +2.32.0.windows.2 + + +From 4791d9a1f1a7c39e005da15f228473c04eafd007 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Tue, 14 Nov 2023 17:59:47 +0800 +Subject: [PATCH 2/5] [ISSUE #5923] Revert "Fix tiered store README.md error + about Configuration (#7436)" (#7557) + +This reverts commit 70dc93abbcb9bf161378d66fcaca55bedc78b905. +--- + .../tieredstore/common/TieredMessageStoreConfig.java | 10 +++++----- + .../tieredstore/provider/posix/PosixFileSegment.java | 4 ++-- + .../rocketmq/tieredstore/file/TieredCommitLogTest.java | 2 +- + .../provider/posix/PosixFileSegmentTest.java | 2 +- + 4 files changed, 9 insertions(+), 9 deletions(-) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java +index a112ea6b1..595db6b86 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java +@@ -115,7 +115,7 @@ public class TieredMessageStoreConfig { + private long readAheadCacheExpireDuration = 10 * 1000; + private double readAheadCacheSizeThresholdRate = 0.3; + +- private String tieredStoreFilepath = ""; ++ private String tieredStoreFilePath = ""; + + private String objectStoreEndpoint = ""; + +@@ -350,12 +350,12 @@ public class TieredMessageStoreConfig { + this.readAheadCacheSizeThresholdRate = rate; + } + +- public String getTieredStoreFilepath() { +- return tieredStoreFilepath; ++ public String getTieredStoreFilePath() { ++ return tieredStoreFilePath; + } + +- public void setTieredStoreFilepath(String tieredStoreFilepath) { +- this.tieredStoreFilepath = tieredStoreFilepath; ++ public void setTieredStoreFilePath(String tieredStoreFilePath) { ++ this.tieredStoreFilePath = tieredStoreFilePath; + } + + public void setObjectStoreEndpoint(String objectStoreEndpoint) { +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +index 708ce33f9..7e949cb28 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +@@ -66,8 +66,8 @@ public class PosixFileSegment extends TieredFileSegment { + super(storeConfig, fileType, filePath, baseOffset); + + // basePath +- String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilepath(), +- StringUtils.appendIfMissing(storeConfig.getTieredStoreFilepath(), File.separator)); ++ String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(), ++ StringUtils.appendIfMissing(storeConfig.getTieredStoreFilePath(), File.separator)); + + // fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset + String brokerClusterName = storeConfig.getBrokerClusterName(); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java +index 80cdba977..6693d3cb7 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java +@@ -49,7 +49,7 @@ public class TieredCommitLogTest { + TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig(); + storeConfig.setBrokerName("brokerName"); + storeConfig.setStorePathRootDir(storePath); +- storeConfig.setTieredStoreFilepath(storePath + File.separator); ++ storeConfig.setTieredStoreFilePath(storePath + File.separator); + storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); + storeConfig.setCommitLogRollingInterval(0); + storeConfig.setTieredStoreCommitLogMaxSize(1000); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java +index ede62b8ce..db33ae847 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java +@@ -42,7 +42,7 @@ public class PosixFileSegmentTest { + @Before + public void setUp() { + storeConfig = new TieredMessageStoreConfig(); +- storeConfig.setTieredStoreFilepath(storePath); ++ storeConfig.setTieredStoreFilePath(storePath); + mq = new MessageQueue("OSSFileSegmentTest", "broker", 0); + TieredStoreExecutor.init(); + } +-- +2.32.0.windows.2 + + +From 651a5ca992988b90c7e4884e9975db0938557def Mon Sep 17 00:00:00 2001 +From: Jixiang Jin +Date: Thu, 16 Nov 2023 10:16:16 +0800 +Subject: [PATCH 3/5] [ISSUE #7562] BugFix for estimating message accumulation + correctly (#7563) + +--- + .../broker/metrics/ConsumerLagCalculator.java | 11 +++++--- + .../proxy/common/utils/FilterUtilTest.java | 25 +++++++++++++++++++ + .../remoting/protocol/filter/FilterAPI.java | 8 ++++++ + 3 files changed, 40 insertions(+), 4 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java +index 7a5f1f765..af08a83c7 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java +@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.constant.PermName; + import org.apache.rocketmq.common.filter.ExpressionType; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; + import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; + import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; + import org.apache.rocketmq.remoting.protocol.subscription.SimpleSubscriptionData; +@@ -435,10 +436,12 @@ public class ConsumerLagCalculator { + if (subscriptionGroupConfig != null) { + for (SimpleSubscriptionData simpleSubscriptionData : subscriptionGroupConfig.getSubscriptionDataSet()) { + if (topic.equals(simpleSubscriptionData.getTopic())) { +- subscriptionData = new SubscriptionData(); +- subscriptionData.setTopic(simpleSubscriptionData.getTopic()); +- subscriptionData.setExpressionType(simpleSubscriptionData.getExpressionType()); +- subscriptionData.setSubString(simpleSubscriptionData.getExpression()); ++ try { ++ subscriptionData = FilterAPI.buildSubscriptionData(simpleSubscriptionData.getTopic(), ++ simpleSubscriptionData.getExpression(), simpleSubscriptionData.getExpressionType()); ++ } catch (Exception e) { ++ LOGGER.error("Try to build subscription for group:{}, topic:{} exception.", group, topic, e); ++ } + break; + } + } +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java +index 23389e9d3..7c9d84015 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java +@@ -48,4 +48,29 @@ public class FilterUtilTest { + assertThat(FilterUtils.isTagMatched(subscriptionData.getTagsSet(), null)).isFalse(); + } + ++ @Test ++ public void testBuildSubscriptionData() throws Exception { ++ // Test case 1: expressionType is null, will use TAG as default. ++ String topic = "topic"; ++ String subString = "substring"; ++ String expressionType = null; ++ SubscriptionData result = FilterAPI.buildSubscriptionData(topic, subString, expressionType); ++ assertThat(result).isNotNull(); ++ assertThat(topic).isEqualTo(result.getTopic()); ++ assertThat(subString).isEqualTo(result.getSubString()); ++ assertThat(result.getExpressionType()).isEqualTo("TAG"); ++ assertThat(result.getCodeSet().size()).isEqualTo(1); ++ ++ // Test case 2: expressionType is not null ++ topic = "topic"; ++ subString = "substring1||substring2"; ++ expressionType = "SQL92"; ++ result = FilterAPI.buildSubscriptionData(topic, subString, expressionType); ++ assertThat(result).isNotNull(); ++ assertThat(topic).isEqualTo(result.getTopic()); ++ assertThat(subString).isEqualTo(result.getSubString()); ++ assertThat(result.getExpressionType()).isEqualTo(expressionType); ++ assertThat(result.getCodeSet().size()).isEqualTo(2); ++ } ++ + } +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java +index 10a6bb463..f291bfccf 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java +@@ -46,6 +46,14 @@ public class FilterAPI { + return subscriptionData; + } + ++ public static SubscriptionData buildSubscriptionData(String topic, String subString, String expressionType) throws Exception { ++ final SubscriptionData subscriptionData = buildSubscriptionData(topic, subString); ++ if (StringUtils.isNotBlank(expressionType)) { ++ subscriptionData.setExpressionType(expressionType); ++ } ++ return subscriptionData; ++ } ++ + public static SubscriptionData build(final String topic, final String subString, + final String type) throws Exception { + if (ExpressionType.TAG.equals(type) || type == null) { +-- +2.32.0.windows.2 + + +From 01a2aef96bdfb17c5f82415141ef421efb4e3bc7 Mon Sep 17 00:00:00 2001 +From: cnScarb +Date: Fri, 17 Nov 2023 15:58:14 +0800 +Subject: [PATCH 4/5] [ISSUE #7570] Add default value for lastPopTimestamp + (#7571) + +--- + .../apache/rocketmq/client/impl/consumer/PopProcessQueue.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java +index 3b39b86cc..50827545b 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java +@@ -26,7 +26,7 @@ public class PopProcessQueue { + + private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000")); + +- private long lastPopTimestamp; ++ private long lastPopTimestamp = System.currentTimeMillis(); + private AtomicInteger waitAckCounter = new AtomicInteger(0); + private volatile boolean dropped = false; + +-- +2.32.0.windows.2 + + +From 8e7e2b5f50e0db14b77462ef1574d4020c0fd986 Mon Sep 17 00:00:00 2001 +From: guyinyou <36399867+guyinyou@users.noreply.github.com> +Date: Mon, 20 Nov 2023 19:32:57 +0800 +Subject: [PATCH 5/5] [ISSUE #7574] Fix RunningFlags conflict + +Co-authored-by: guyinyou +--- + store/src/main/java/org/apache/rocketmq/store/RunningFlags.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java +index 91fcb155a..88b398a77 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java ++++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java +@@ -30,7 +30,7 @@ public class RunningFlags { + + private static final int FENCED_BIT = 1 << 5; + +- private static final int LOGIC_DISK_FULL_BIT = 1 << 5; ++ private static final int LOGIC_DISK_FULL_BIT = 1 << 6; + + private volatile int flagBits = 0; + +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index de1e58a..743ffa7 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.5 -Release: 35 +Release: 36 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -44,6 +44,7 @@ Patch0031: patch031-backport-Add-CRC-check-of-commitlog.patch Patch0032: patch032-backport-Clear-POP_CK-when-sending-messages.patch Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch Patch0034: patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch +Patch0035: patch035-backport-fix-some-bugs.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -84,6 +85,9 @@ exit 0 %changelog +* Mon Dec 11 2023 ShiZhili - 5.1.3-36 +- backport fix some bugs + * Mon Dec 11 2023 ShiZhili - 5.1.3-35 - backport Let consumer be aware of message queue assignment change -- Gitee