From df52d543eb37df52c9bad90fa8d903784bc93100 Mon Sep 17 00:00:00 2001 From: shizhili Date: Mon, 11 Dec 2023 16:13:04 +0800 Subject: [PATCH] backport fix some bugs --- patch043-backport-fix-some-bugs.patch | 186 ++++++++++++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 191 insertions(+), 1 deletion(-) create mode 100644 patch043-backport-fix-some-bugs.patch diff --git a/patch043-backport-fix-some-bugs.patch b/patch043-backport-fix-some-bugs.patch new file mode 100644 index 0000000..588edf1 --- /dev/null +++ b/patch043-backport-fix-some-bugs.patch @@ -0,0 +1,186 @@ +From c2c29c2435e0626cfe4f49830fbdc0d9421d82b5 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Mon, 4 Dec 2023 16:13:07 +0800 +Subject: [PATCH 1/2] [ISSUE #7545] Fix set mapped file to null cause file can + not destroy (#7612) + +--- + .../rocketmq/tieredstore/index/IndexStoreFile.java | 2 -- + .../rocketmq/tieredstore/index/IndexStoreService.java | 10 ++++++++++ + 2 files changed, 10 insertions(+), 2 deletions(-) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +index 52a686f68..def5c8f2d 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +@@ -457,11 +457,9 @@ public class IndexStoreFile implements IndexFile { + this.fileStatus.set(IndexStatusEnum.SHUTDOWN); + if (this.mappedFile != null) { + this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); +- this.mappedFile = null; + } + if (this.compactMappedFile != null) { + this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); +- this.compactMappedFile = null; + } + } catch (Exception e) { + log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +index 14608aa58..e99ea0de1 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReadWriteLock; + import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.common.ServiceThread; ++import org.apache.rocketmq.common.UtilAll; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.store.logfile.DefaultMappedFile; +@@ -101,6 +102,10 @@ public class IndexStoreService extends ServiceThread implements IndexService { + private void recover() { + Stopwatch stopwatch = Stopwatch.createStarted(); + ++ // delete compact file directory ++ UtilAll.deleteFile(new File(Paths.get(storeConfig.getStorePathRootDir(), ++ FILE_DIRECTORY_NAME, FILE_COMPACTED_DIRECTORY_NAME).toString())); ++ + // recover local + File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME).toString()); + this.doConvertOldFormatFile(Paths.get(dir.getPath(), "0000").toString()); +@@ -141,6 +146,10 @@ public class IndexStoreService extends ServiceThread implements IndexService { + + for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) { + IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment); ++ IndexFile localFile = timeStoreTable.get(indexFile.getTimestamp()); ++ if (localFile != null) { ++ localFile.destroy(); ++ } + timeStoreTable.put(indexFile.getTimestamp(), indexFile); + log.info("IndexStoreService recover load remote file, timestamp: {}", indexFile.getTimestamp()); + } +@@ -248,6 +257,7 @@ public class IndexStoreService extends ServiceThread implements IndexService { + if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) { + log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}", + indexFile.getTimestamp(), indexFile.getFileStatus()); ++ indexFile.destroy(); + return; + } + +-- +2.32.0.windows.2 + + +From faae64715d917bb5d64b8d72581172d26ebe9501 Mon Sep 17 00:00:00 2001 +From: gaoyf +Date: Thu, 7 Dec 2023 11:25:22 +0800 +Subject: [PATCH 2/2] [ISSUE #7601] Fix slave acting master bug (#7603) + +* fix NullPointerException when message escape to remote + +* fix NumberFormatException when message retry to escape to remote + +* fix timerCheckPoint of the master is not updated, causing the timer message to be replayed after master is restarted + +* Use properties copies instead of referencing the same map when converting message +--- + .../org/apache/rocketmq/broker/BrokerController.java | 1 + + .../rocketmq/broker/slave/SlaveSynchronize.java | 4 +++- + .../rocketmq/common/message/MessageAccessor.java | 7 +++++++ + .../rocketmq/store/timer/TimerMessageStore.java | 12 +++++++++--- + 4 files changed, 20 insertions(+), 4 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +index 9f1fd0ad0..8d29d4438 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +@@ -2108,6 +2108,7 @@ public class BrokerController { + isScheduleServiceStart = shouldStart; + + if (timerMessageStore != null) { ++ timerMessageStore.syncLastReadTimeMs(); + timerMessageStore.setShouldRunningDequeue(shouldStart); + } + } +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +index 53cdecdf8..7f802adb9 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +@@ -215,11 +215,13 @@ public class SlaveSynchronize { + String masterAddrBak = this.masterAddr; + if (masterAddrBak != null) { + try { +- if (null != brokerController.getMessageStore().getTimerMessageStore()) { ++ if (null != brokerController.getMessageStore().getTimerMessageStore() && ++ !brokerController.getTimerMessageStore().isShouldRunningDequeue()) { + TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak); + if (null != this.brokerController.getTimerCheckpoint()) { + this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs()); + this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset()); ++ this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion()); + } + } + } catch (Exception e) { +diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java +index 1b7e2bba3..62e3bbd7e 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java ++++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java +@@ -17,6 +17,7 @@ + + package org.apache.rocketmq.common.message; + ++import java.util.HashMap; + import java.util.Map; + + public class MessageAccessor { +@@ -96,4 +97,10 @@ public class MessageAccessor { + return newMsg; + } + ++ public static Map deepCopyProperties(Map properties) { ++ if (properties == null) { ++ return null; ++ } ++ return new HashMap<>(properties); ++ } + } +diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +index d796e4467..872cd7105 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +@@ -602,6 +602,10 @@ public class TimerMessageStore { + this.shouldRunningDequeue = shouldRunningDequeue; + } + ++ public boolean isShouldRunningDequeue() { ++ return shouldRunningDequeue; ++ } ++ + public void addMetric(MessageExt msg, int value) { + try { + if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) { +@@ -1084,8 +1088,10 @@ public class TimerMessageStore { + case PUT_OK: + if (brokerStatsManager != null) { + this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1); +- this.brokerStatsManager.incTopicPutSize(message.getTopic(), +- putMessageResult.getAppendMessageResult().getWroteBytes()); ++ if (putMessageResult.getAppendMessageResult() != null) { ++ this.brokerStatsManager.incTopicPutSize(message.getTopic(), ++ putMessageResult.getAppendMessageResult().getWroteBytes()); ++ } + this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1); + } + return PUT_OK; +@@ -1119,7 +1125,7 @@ public class TimerMessageStore { + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setBody(msgExt.getBody()); + msgInner.setFlag(msgExt.getFlag()); +- MessageAccessor.setProperties(msgInner, msgExt.getProperties()); ++ MessageAccessor.setProperties(msgInner, MessageAccessor.deepCopyProperties(msgExt.getProperties())); + TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag()); + long tagsCodeValue = + MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 2a7a057..a4aa37f 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.5 -Release: 43 +Release: 44 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -52,6 +52,7 @@ Patch0039: patch039-backport-add-some-validations.patch Patch0040: patch040-backport-add-some-test-cases.patch Patch0041: patch041-backport-improve-performance.patch Patch0042: patch042-backport-Support-message-filtering.patch +Patch0043: patch043-backport-fix-some-bugs.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -92,6 +93,9 @@ exit 0 %changelog +* Mon Dec 11 2023 ShiZhili - 5.1.3-44 +- backport fix some bugs + * Mon Dec 11 2023 ShiZhili - 5.1.3-43 - backport Support message filtering -- Gitee