diff --git a/patch013-backport-enhance-admin-output.patch b/patch013-backport-enhance-admin-output.patch new file mode 100644 index 0000000000000000000000000000000000000000..3fa60916f1cb2ca3fa43c468846bb7ed3d9328d4 --- /dev/null +++ b/patch013-backport-enhance-admin-output.patch @@ -0,0 +1,892 @@ +From 7e018520ef707a841c66c55d621f6560d03b631b Mon Sep 17 00:00:00 2001 +From: Zhouxiang Zhan +Date: Fri, 25 Aug 2023 09:49:22 +0800 +Subject: [PATCH 1/6] Add expireAfterAccess for cache (#7247) + +Add expireAfterAccess for cache +--- + .../rocketmq/proxy/config/ProxyConfig.java | 59 ++++++++++++++----- + .../metadata/ClusterMetadataService.java | 6 +- + .../service/route/TopicRouteService.java | 14 +++-- + 3 files changed, 56 insertions(+), 23 deletions(-) + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +index 76a243919..2994893d7 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +@@ -155,14 +155,17 @@ public class ProxyConfig implements ConfigFile { + private int consumerProcessorThreadPoolQueueCapacity = 10000; + + private boolean useEndpointPortFromRequest = false; +- private int topicRouteServiceCacheExpiredInSeconds = 20; ++ ++ private int topicRouteServiceCacheExpiredSeconds = 300; ++ private int topicRouteServiceCacheRefreshSeconds = 20; + private int topicRouteServiceCacheMaxNum = 20000; + private int topicRouteServiceThreadPoolNums = PROCESSOR_NUMBER; + private int topicRouteServiceThreadPoolQueueCapacity = 5000; +- +- private int topicConfigCacheExpiredInSeconds = 20; ++ private int topicConfigCacheExpiredSeconds = 300; ++ private int topicConfigCacheRefreshSeconds = 20; + private int topicConfigCacheMaxNum = 20000; +- private int subscriptionGroupConfigCacheExpiredInSeconds = 20; ++ private int subscriptionGroupConfigCacheExpiredSeconds = 300; ++ private int subscriptionGroupConfigCacheRefreshSeconds = 20; + private int subscriptionGroupConfigCacheMaxNum = 20000; + private int metadataThreadPoolNums = 3; + private int metadataThreadPoolQueueCapacity = 100000; +@@ -794,12 +797,20 @@ public class ProxyConfig implements ConfigFile { + this.consumerProcessorThreadPoolQueueCapacity = consumerProcessorThreadPoolQueueCapacity; + } + +- public int getTopicRouteServiceCacheExpiredInSeconds() { +- return topicRouteServiceCacheExpiredInSeconds; ++ public int getTopicRouteServiceCacheExpiredSeconds() { ++ return topicRouteServiceCacheExpiredSeconds; ++ } ++ ++ public void setTopicRouteServiceCacheExpiredSeconds(int topicRouteServiceCacheExpiredSeconds) { ++ this.topicRouteServiceCacheExpiredSeconds = topicRouteServiceCacheExpiredSeconds; + } + +- public void setTopicRouteServiceCacheExpiredInSeconds(int topicRouteServiceCacheExpiredInSeconds) { +- this.topicRouteServiceCacheExpiredInSeconds = topicRouteServiceCacheExpiredInSeconds; ++ public int getTopicRouteServiceCacheRefreshSeconds() { ++ return topicRouteServiceCacheRefreshSeconds; ++ } ++ ++ public void setTopicRouteServiceCacheRefreshSeconds(int topicRouteServiceCacheRefreshSeconds) { ++ this.topicRouteServiceCacheRefreshSeconds = topicRouteServiceCacheRefreshSeconds; + } + + public int getTopicRouteServiceCacheMaxNum() { +@@ -826,12 +837,20 @@ public class ProxyConfig implements ConfigFile { + this.topicRouteServiceThreadPoolQueueCapacity = topicRouteServiceThreadPoolQueueCapacity; + } + +- public int getTopicConfigCacheExpiredInSeconds() { +- return topicConfigCacheExpiredInSeconds; ++ public int getTopicConfigCacheRefreshSeconds() { ++ return topicConfigCacheRefreshSeconds; ++ } ++ ++ public void setTopicConfigCacheRefreshSeconds(int topicConfigCacheRefreshSeconds) { ++ this.topicConfigCacheRefreshSeconds = topicConfigCacheRefreshSeconds; ++ } ++ ++ public int getTopicConfigCacheExpiredSeconds() { ++ return topicConfigCacheExpiredSeconds; + } + +- public void setTopicConfigCacheExpiredInSeconds(int topicConfigCacheExpiredInSeconds) { +- this.topicConfigCacheExpiredInSeconds = topicConfigCacheExpiredInSeconds; ++ public void setTopicConfigCacheExpiredSeconds(int topicConfigCacheExpiredSeconds) { ++ this.topicConfigCacheExpiredSeconds = topicConfigCacheExpiredSeconds; + } + + public int getTopicConfigCacheMaxNum() { +@@ -842,12 +861,20 @@ public class ProxyConfig implements ConfigFile { + this.topicConfigCacheMaxNum = topicConfigCacheMaxNum; + } + +- public int getSubscriptionGroupConfigCacheExpiredInSeconds() { +- return subscriptionGroupConfigCacheExpiredInSeconds; ++ public int getSubscriptionGroupConfigCacheRefreshSeconds() { ++ return subscriptionGroupConfigCacheRefreshSeconds; ++ } ++ ++ public void setSubscriptionGroupConfigCacheRefreshSeconds(int subscriptionGroupConfigCacheRefreshSeconds) { ++ this.subscriptionGroupConfigCacheRefreshSeconds = subscriptionGroupConfigCacheRefreshSeconds; ++ } ++ ++ public int getSubscriptionGroupConfigCacheExpiredSeconds() { ++ return subscriptionGroupConfigCacheExpiredSeconds; + } + +- public void setSubscriptionGroupConfigCacheExpiredInSeconds(int subscriptionGroupConfigCacheExpiredInSeconds) { +- this.subscriptionGroupConfigCacheExpiredInSeconds = subscriptionGroupConfigCacheExpiredInSeconds; ++ public void setSubscriptionGroupConfigCacheExpiredSeconds(int subscriptionGroupConfigCacheExpiredSeconds) { ++ this.subscriptionGroupConfigCacheExpiredSeconds = subscriptionGroupConfigCacheExpiredSeconds; + } + + public int getSubscriptionGroupConfigCacheMaxNum() { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java +index bc9582ad8..d34a0efd9 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java +@@ -69,11 +69,13 @@ public class ClusterMetadataService extends AbstractStartAndShutdown implements + ); + this.topicConfigCache = CacheBuilder.newBuilder() + .maximumSize(config.getTopicConfigCacheMaxNum()) +- .refreshAfterWrite(config.getTopicConfigCacheExpiredInSeconds(), TimeUnit.SECONDS) ++ .expireAfterAccess(config.getTopicConfigCacheExpiredSeconds(), TimeUnit.SECONDS) ++ .refreshAfterWrite(config.getTopicConfigCacheRefreshSeconds(), TimeUnit.SECONDS) + .build(new ClusterTopicConfigCacheLoader()); + this.subscriptionGroupConfigCache = CacheBuilder.newBuilder() + .maximumSize(config.getSubscriptionGroupConfigCacheMaxNum()) +- .refreshAfterWrite(config.getSubscriptionGroupConfigCacheExpiredInSeconds(), TimeUnit.SECONDS) ++ .expireAfterAccess(config.getSubscriptionGroupConfigCacheExpiredSeconds(), TimeUnit.SECONDS) ++ .refreshAfterWrite(config.getSubscriptionGroupConfigCacheRefreshSeconds(), TimeUnit.SECONDS) + .build(new ClusterSubscriptionGroupConfigCacheLoader()); + + this.init(); +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +index e012a5465..84348adc3 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +@@ -68,10 +68,13 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { + ); + this.mqClientAPIFactory = mqClientAPIFactory; + +- this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()). +- refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(), TimeUnit.SECONDS). +- executor(cacheRefreshExecutor).build(new CacheLoader() { +- @Override public @Nullable MessageQueueView load(String topic) throws Exception { ++ this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()) ++ .expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(), TimeUnit.SECONDS) ++ .refreshAfterWrite(config.getTopicRouteServiceCacheRefreshSeconds(), TimeUnit.SECONDS) ++ .executor(cacheRefreshExecutor) ++ .build(new CacheLoader() { ++ @Override ++ public @Nullable MessageQueueView load(String topic) throws Exception { + try { + TopicRouteData topicRouteData = mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, Duration.ofSeconds(3).toMillis()); + return buildMessageQueueView(topic, topicRouteData); +@@ -83,7 +86,8 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { + } + } + +- @Override public @Nullable MessageQueueView reload(@NonNull String key, ++ @Override ++ public @Nullable MessageQueueView reload(@NonNull String key, + @NonNull MessageQueueView oldValue) throws Exception { + try { + return load(key); +-- +2.32.0.windows.2 + + +From 5f6dc90f9dab35809fcb0407d4d5cc2737d2335e Mon Sep 17 00:00:00 2001 +From: Ziyi Tan +Date: Fri, 25 Aug 2023 11:17:23 +0800 +Subject: [PATCH 2/6] [ISSUE #7250] Beautify command rocksDBConfigToJson output + +Co-authored-by: Ziy1-Tan +--- + .../metadata/RocksDBConfigToJsonCommand.java | 32 +++++++++++-------- + 1 file changed, 18 insertions(+), 14 deletions(-) + +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java +index 3053f4684..3fc63e4dd 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java +@@ -21,13 +21,13 @@ import org.apache.commons.cli.CommandLine; + import org.apache.commons.cli.Option; + import org.apache.commons.cli.Options; + import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.UtilAll; + import org.apache.rocketmq.common.config.RocksDBConfigManager; + import org.apache.rocketmq.common.utils.DataConverter; + import org.apache.rocketmq.remoting.RPCHook; + import org.apache.rocketmq.tools.command.SubCommand; + import org.apache.rocketmq.tools.command.SubCommandException; + +-import java.io.File; + import java.util.HashMap; + import java.util.Map; + +@@ -48,7 +48,7 @@ public class RocksDBConfigToJsonCommand implements SubCommand { + @Override + public Options buildCommandlineOptions(Options options) { + Option pathOption = new Option("p", "path", true, +- "Absolute path to the metadata directory"); ++ "Absolute path for the metadata directory"); + pathOption.setRequired(true); + options.addOption(pathOption); + +@@ -63,15 +63,14 @@ public class RocksDBConfigToJsonCommand implements SubCommand { + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + String path = commandLine.getOptionValue("path").trim(); +- if (StringUtils.isEmpty(path) || !new File(path).exists()) { ++ if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) { + System.out.print("Rocksdb path is invalid.\n"); + return; + } + + String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); + +- final long memTableFlushInterval = 60 * 60 * 1000L; +- RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(memTableFlushInterval); ++ RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(60 * 60 * 1000L); + try { + if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) { + // for topics.json +@@ -84,13 +83,16 @@ public class RocksDBConfigToJsonCommand implements SubCommand { + topicConfigTable.put(topic, jsonObject); + }); + +- if (isLoad) { +- topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable)); +- final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true); +- System.out.print(topicsJsonStr + "\n"); ++ if (!isLoad) { ++ System.out.print("RocksDB load error, path=" + path); + return; + } ++ topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable)); ++ final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true); ++ System.out.print(topicsJsonStr + "\n"); ++ return; + } ++ + if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) { + // for subscriptionGroup.json + final Map subscriptionGroupJsonConfig = new HashMap<>(); +@@ -102,13 +104,15 @@ public class RocksDBConfigToJsonCommand implements SubCommand { + subscriptionGroupTable.put(subscriptionGroup, jsonObject); + }); + +- if (isLoad) { +- subscriptionGroupJsonConfig.put("subscriptionGroupTable", +- (JSONObject) JSONObject.toJSON(subscriptionGroupTable)); +- final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true); +- System.out.print(subscriptionGroupJsonStr + "\n"); ++ if (!isLoad) { ++ System.out.print("RocksDB load error, path=" + path); + return; + } ++ subscriptionGroupJsonConfig.put("subscriptionGroupTable", ++ (JSONObject) JSONObject.toJSON(subscriptionGroupTable)); ++ final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true); ++ System.out.print(subscriptionGroupJsonStr + "\n"); ++ return; + } + System.out.print("Config type was not recognized, configType=" + configType + "\n"); + } finally { +-- +2.32.0.windows.2 + + +From b4f73e2aabc1b141cec98431899e4090340adf0f Mon Sep 17 00:00:00 2001 +From: mxsm +Date: Sun, 27 Aug 2023 20:58:58 +0800 +Subject: [PATCH 3/6] [ISSUE #7271] Optimize the configuration for setting the + quantity of TimerDequeuePutMessageService (#7272) + +--- + .../java/org/apache/rocketmq/store/timer/TimerMessageStore.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +index 690f4863e..181f7087a 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +@@ -222,7 +222,7 @@ public class TimerMessageStore { + dequeueGetMessageServices[i] = new TimerDequeueGetMessageService(); + } + +- int putThreadNum = Math.max(storeConfig.getTimerGetMessageThreadNum(), 1); ++ int putThreadNum = Math.max(storeConfig.getTimerPutMessageThreadNum(), 1); + dequeuePutMessageServices = new TimerDequeuePutMessageService[putThreadNum]; + for (int i = 0; i < dequeuePutMessageServices.length; i++) { + dequeuePutMessageServices[i] = new TimerDequeuePutMessageService(); +-- +2.32.0.windows.2 + + +From 3e100103af68588528bf32f3752a85e8023f46f8 Mon Sep 17 00:00:00 2001 +From: Ziyi Tan +Date: Tue, 29 Aug 2023 13:48:51 +0800 +Subject: [PATCH 4/6] [ISSUE #7277] Enhance rocksDBConfigToJson to support + metadata counting (#7276) + +--- + .../common/config/AbstractRocksDBStorage.java | 4 +- + .../common/config/ConfigRocksDBStorage.java | 6 + + .../tools/command/MQAdminStartup.java | 4 +- + .../ExportMetadataInRocksDBCommand.java | 138 ++++++++++++++++++ + .../metadata/RocksDBConfigToJsonCommand.java | 122 ---------------- + ...> ExportMetadataInRocksDBCommandTest.java} | 38 +++-- + 6 files changed, 173 insertions(+), 139 deletions(-) + create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java + delete mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java + rename tools/src/test/java/org/apache/rocketmq/tools/command/metadata/{KvConfigToJsonCommandTest.java => ExportMetadataInRocksDBCommandTest.java} (62%) + +diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +index e3673baad..a720a5be3 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java ++++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +@@ -385,8 +385,10 @@ public abstract class AbstractRocksDBStorage { + this.options.close(); + } + //4. close db. +- if (db != null) { ++ if (db != null && !this.readOnly) { + this.db.syncWal(); ++ } ++ if (db != null) { + this.db.closeE(); + } + //5. help gc. +diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java +index 9d05ed282..463bd8fed 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java ++++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java +@@ -60,6 +60,12 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage { + this.readOnly = false; + } + ++ public ConfigRocksDBStorage(final String dbPath, boolean readOnly) { ++ super(); ++ this.dbPath = dbPath; ++ this.readOnly = readOnly; ++ } ++ + private void initOptions() { + this.options = createConfigDBOptions(); + +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +index 324aa1856..788fa83c2 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +@@ -80,7 +80,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; + import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; + import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand; + import org.apache.rocketmq.tools.command.message.SendMessageCommand; +-import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand; ++import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand; + import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand; + import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; + import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; +@@ -212,7 +212,6 @@ public class MQAdminStartup { + + initCommand(new ClusterListSubCommand()); + initCommand(new TopicListSubCommand()); +- initCommand(new RocksDBConfigToJsonCommand()); + + initCommand(new UpdateKvConfigCommand()); + initCommand(new DeleteKvConfigCommand()); +@@ -257,6 +256,7 @@ public class MQAdminStartup { + initCommand(new ExportMetadataCommand()); + initCommand(new ExportConfigsCommand()); + initCommand(new ExportMetricsCommand()); ++ initCommand(new ExportMetadataInRocksDBCommand()); + + initCommand(new HAStatusSubCommand()); + +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java +new file mode 100644 +index 000000000..2a7d3fba4 +--- /dev/null ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java +@@ -0,0 +1,138 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tools.command.export; ++ ++import com.alibaba.fastjson.JSONObject; ++import org.apache.commons.cli.CommandLine; ++import org.apache.commons.cli.Option; ++import org.apache.commons.cli.Options; ++import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.UtilAll; ++import org.apache.rocketmq.common.config.ConfigRocksDBStorage; ++import org.apache.rocketmq.common.utils.DataConverter; ++import org.apache.rocketmq.remoting.RPCHook; ++import org.apache.rocketmq.tools.command.SubCommand; ++import org.apache.rocketmq.tools.command.SubCommandException; ++import org.rocksdb.RocksIterator; ++ ++import java.util.HashMap; ++import java.util.Map; ++import java.util.concurrent.atomic.AtomicLong; ++import java.util.function.BiConsumer; ++ ++public class ExportMetadataInRocksDBCommand implements SubCommand { ++ private static final String TOPICS_JSON_CONFIG = "topics"; ++ private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; ++ ++ @Override ++ public String commandName() { ++ return "exportMetadataInRocksDB"; ++ } ++ ++ @Override ++ public String commandDesc() { ++ return "export RocksDB kv config (topics/subscriptionGroups)"; ++ } ++ ++ @Override ++ public Options buildCommandlineOptions(Options options) { ++ Option pathOption = new Option("p", "path", true, ++ "Absolute path for the metadata directory"); ++ pathOption.setRequired(true); ++ options.addOption(pathOption); ++ ++ Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " + ++ "topics/subscriptionGroups"); ++ configTypeOption.setRequired(true); ++ options.addOption(configTypeOption); ++ ++ Option jsonEnableOption = new Option("j", "jsonEnable", true, ++ "Json format enable, Default: false"); ++ jsonEnableOption.setRequired(false); ++ options.addOption(jsonEnableOption); ++ ++ return options; ++ } ++ ++ @Override ++ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { ++ String path = commandLine.getOptionValue("path").trim(); ++ if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) { ++ System.out.print("RocksDB path is invalid.\n"); ++ return; ++ } ++ ++ String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); ++ ++ boolean jsonEnable = false; ++ if (commandLine.hasOption("jsonEnable")) { ++ jsonEnable = Boolean.parseBoolean(commandLine.getOptionValue("jsonEnable").trim()); ++ } ++ ++ ++ ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* readOnly */); ++ if (!kvStore.start()) { ++ System.out.print("RocksDB load error, path=" + path + "\n"); ++ return; ++ } ++ ++ try { ++ if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType) || SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) { ++ handleExportMetadata(kvStore, configType, jsonEnable); ++ } else { ++ System.out.printf("Invalid config type=%s, Options: topics,subscriptionGroups\n", configType); ++ } ++ } finally { ++ kvStore.shutdown(); ++ } ++ } ++ ++ private static void handleExportMetadata(ConfigRocksDBStorage kvStore, String configType, boolean jsonEnable) { ++ if (jsonEnable) { ++ final Map jsonConfig = new HashMap<>(); ++ final Map configTable = new HashMap<>(); ++ iterateKvStore(kvStore, (key, value) -> { ++ final String configKey = new String(key, DataConverter.charset); ++ final String configValue = new String(value, DataConverter.charset); ++ final JSONObject jsonObject = JSONObject.parseObject(configValue); ++ configTable.put(configKey, jsonObject); ++ } ++ ); ++ ++ jsonConfig.put(configType.equalsIgnoreCase(TOPICS_JSON_CONFIG) ? "topicConfigTable" : "subscriptionGroupTable", ++ (JSONObject) JSONObject.toJSON(configTable)); ++ final String jsonConfigStr = JSONObject.toJSONString(jsonConfig, true); ++ System.out.print(jsonConfigStr + "\n"); ++ } else { ++ AtomicLong count = new AtomicLong(0); ++ iterateKvStore(kvStore, (key, value) -> { ++ final String configKey = new String(key, DataConverter.charset); ++ final String configValue = new String(value, DataConverter.charset); ++ System.out.printf("%d, Key: %s, Value: %s%n", count.incrementAndGet(), configKey, configValue); ++ }); ++ } ++ } ++ ++ private static void iterateKvStore(ConfigRocksDBStorage kvStore, BiConsumer biConsumer) { ++ try (RocksIterator iterator = kvStore.iterator()) { ++ iterator.seekToFirst(); ++ for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) { ++ biConsumer.accept(iterator.key(), iterator.value()); ++ } ++ } ++ } ++} +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java +deleted file mode 100644 +index 3fc63e4dd..000000000 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java ++++ /dev/null +@@ -1,122 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.rocketmq.tools.command.metadata; +- +-import com.alibaba.fastjson.JSONObject; +-import org.apache.commons.cli.CommandLine; +-import org.apache.commons.cli.Option; +-import org.apache.commons.cli.Options; +-import org.apache.commons.lang3.StringUtils; +-import org.apache.rocketmq.common.UtilAll; +-import org.apache.rocketmq.common.config.RocksDBConfigManager; +-import org.apache.rocketmq.common.utils.DataConverter; +-import org.apache.rocketmq.remoting.RPCHook; +-import org.apache.rocketmq.tools.command.SubCommand; +-import org.apache.rocketmq.tools.command.SubCommandException; +- +-import java.util.HashMap; +-import java.util.Map; +- +-public class RocksDBConfigToJsonCommand implements SubCommand { +- private static final String TOPICS_JSON_CONFIG = "topics"; +- private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; +- +- @Override +- public String commandName() { +- return "rocksDBConfigToJson"; +- } +- +- @Override +- public String commandDesc() { +- return "Convert RocksDB kv config (topics/subscriptionGroups) to json"; +- } +- +- @Override +- public Options buildCommandlineOptions(Options options) { +- Option pathOption = new Option("p", "path", true, +- "Absolute path for the metadata directory"); +- pathOption.setRequired(true); +- options.addOption(pathOption); +- +- Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " + +- "topics/subscriptionGroups"); +- configTypeOption.setRequired(true); +- options.addOption(configTypeOption); +- +- return options; +- } +- +- @Override +- public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { +- String path = commandLine.getOptionValue("path").trim(); +- if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) { +- System.out.print("Rocksdb path is invalid.\n"); +- return; +- } +- +- String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); +- +- RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(60 * 60 * 1000L); +- try { +- if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) { +- // for topics.json +- final Map topicsJsonConfig = new HashMap<>(); +- final Map topicConfigTable = new HashMap<>(); +- boolean isLoad = kvConfigManager.load(path, (key, value) -> { +- final String topic = new String(key, DataConverter.charset); +- final String topicConfig = new String(value, DataConverter.charset); +- final JSONObject jsonObject = JSONObject.parseObject(topicConfig); +- topicConfigTable.put(topic, jsonObject); +- }); +- +- if (!isLoad) { +- System.out.print("RocksDB load error, path=" + path); +- return; +- } +- topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable)); +- final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true); +- System.out.print(topicsJsonStr + "\n"); +- return; +- } +- +- if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) { +- // for subscriptionGroup.json +- final Map subscriptionGroupJsonConfig = new HashMap<>(); +- final Map subscriptionGroupTable = new HashMap<>(); +- boolean isLoad = kvConfigManager.load(path, (key, value) -> { +- final String subscriptionGroup = new String(key, DataConverter.charset); +- final String subscriptionGroupConfig = new String(value, DataConverter.charset); +- final JSONObject jsonObject = JSONObject.parseObject(subscriptionGroupConfig); +- subscriptionGroupTable.put(subscriptionGroup, jsonObject); +- }); +- +- if (!isLoad) { +- System.out.print("RocksDB load error, path=" + path); +- return; +- } +- subscriptionGroupJsonConfig.put("subscriptionGroupTable", +- (JSONObject) JSONObject.toJSON(subscriptionGroupTable)); +- final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true); +- System.out.print(subscriptionGroupJsonStr + "\n"); +- return; +- } +- System.out.print("Config type was not recognized, configType=" + configType + "\n"); +- } finally { +- kvConfigManager.stop(); +- } +- } +-} +diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java +similarity index 62% +rename from tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java +rename to tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java +index b2f66c7b0..2b938c90f 100644 +--- a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java ++++ b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java +@@ -21,43 +21,53 @@ import org.apache.commons.cli.DefaultParser; + import org.apache.commons.cli.Options; + import org.apache.rocketmq.srvutil.ServerUtil; + import org.apache.rocketmq.tools.command.SubCommandException; ++import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand; + import org.junit.Test; + + import java.io.File; + + import static org.assertj.core.api.Assertions.assertThat; + +-public class KvConfigToJsonCommandTest { ++public class ExportMetadataInRocksDBCommandTest { + private static final String BASE_PATH = System.getProperty("user.home") + File.separator + "store/config/"; + + @Test + public void testExecute() throws SubCommandException { + { +- String[] cases = new String[]{"topics", "subscriptionGroups"}; +- for (String c : cases) { +- RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand(); ++ String[][] cases = new String[][] { ++ {"topics", "false"}, ++ {"topics", "false1"}, ++ {"topics", "true"}, ++ {"subscriptionGroups", "false"}, ++ {"subscriptionGroups", "false2"}, ++ {"subscriptionGroups", "true"} ++ }; ++ ++ for (String[] c : cases) { ++ ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); +- String[] subargs = new String[]{"-p " + BASE_PATH + c, "-t " + c}; ++ String[] subargs = new String[] {"-p " + BASE_PATH + c[0], "-t " + c[0], "-j " + c[1]}; + final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, +- cmd.buildCommandlineOptions(options), new DefaultParser()); ++ cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.execute(commandLine, options, null); +- assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c); +- assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c); ++ assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c[0]); ++ assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c[0]); ++ assertThat(commandLine.getOptionValue("j").trim()).isEqualTo(c[1]); + } + } + // invalid cases + { +- String[][] cases = new String[][]{ +- {"-p " + BASE_PATH + "tmpPath", "-t topics"}, +- {"-p ", "-t topics"}, +- {"-p " + BASE_PATH + "topics", "-t invalid_type"} ++ String[][] cases = new String[][] { ++ {"-p " + BASE_PATH + "tmpPath", "-t topics", "-j true"}, ++ {"-p ", "-t topics", "-j true"}, ++ {"-p " + BASE_PATH + "topics", "-t invalid_type", "-j true"} + }; + + for (String[] c : cases) { +- RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand(); ++ ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), c, +- cmd.buildCommandlineOptions(options), new DefaultParser()); ++ cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.execute(commandLine, options, null); + } + } +-- +2.32.0.windows.2 + + +From fa549154370cb866a90e37c13a90d2c598d6b1f6 Mon Sep 17 00:00:00 2001 +From: yuz10 <845238369@qq.com> +Date: Tue, 29 Aug 2023 15:22:09 +0800 +Subject: [PATCH 5/6] [ISSUE #7261] Slave high CPU usage when + enableScheduleAsyncDeliver=true (#7262) + +* [ISSUE #6390] Add break to the exception of WHEEL_TIMER_NOT_ENABLE. + +* fix broker start fail if mapped file size is 0 + +* log + +* only delete the last empty file + +* change dataReadAheadEnable default to true + +* fix endless loop when master change to slave. +--- + .../rocketmq/broker/schedule/ScheduleMessageService.java | 7 ++++++- + 1 file changed, 6 insertions(+), 1 deletion(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +index aed0ee19f..297b14207 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +@@ -566,7 +566,8 @@ public class ScheduleMessageService extends ConfigManager { + pendingQueue.remove(); + break; + case RUNNING: +- break; ++ scheduleNextTask(); ++ return; + case EXCEPTION: + if (!isStarted()) { + log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString()); +@@ -586,6 +587,10 @@ public class ScheduleMessageService extends ConfigManager { + } + } + ++ scheduleNextTask(); ++ } ++ ++ private void scheduleNextTask() { + if (isStarted()) { + ScheduleMessageService.this.handleExecutorService + .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS); +-- +2.32.0.windows.2 + + +From 9f34f55e1dac495730c9cd5469f2ab3225b8f0b9 Mon Sep 17 00:00:00 2001 +From: ShuangxiDing +Date: Tue, 29 Aug 2023 15:48:46 +0800 +Subject: [PATCH 6/6] [ISSUE #7226] Filter tlvs in ppv2 which contents not are + spec-compliant ASCII characters and space (#7227) + +Filter tlvs in ppv2 which not are spec-compliant ASCII characters and space +--- + .../rocketmq/common/utils/BinaryUtil.java | 17 +++++++++++++++++ + .../grpc/ProxyAndTlsProtocolNegotiator.java | 8 +++++++- + .../remoting/netty/NettyRemotingServer.java | 8 +++++++- + 3 files changed, 31 insertions(+), 2 deletions(-) + +diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java +index 421adaca4..7b4b24819 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java ++++ b/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java +@@ -43,4 +43,21 @@ public class BinaryUtil { + byte[] bytes = calculateMd5(content); + return Hex.encodeHexString(bytes, false); + } ++ ++ /** ++ * Returns true if subject contains only bytes that are spec-compliant ASCII characters. ++ * @param subject ++ * @return ++ */ ++ public static boolean isAscii(byte[] subject) { ++ if (subject == null) { ++ return false; ++ } ++ for (byte b : subject) { ++ if ((b & 0x80) != 0) { ++ return false; ++ } ++ } ++ return true; ++ } + } +\ No newline at end of file +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java +index ee167bd7b..b584ddfbd 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java +@@ -24,6 +24,7 @@ import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator; + import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators; + import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent; + import io.grpc.netty.shaded.io.netty.buffer.ByteBuf; ++import io.grpc.netty.shaded.io.netty.buffer.ByteBufUtil; + import io.grpc.netty.shaded.io.netty.channel.ChannelHandler; + import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext; + import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter; +@@ -44,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils; + import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.common.constant.HAProxyConstants; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.utils.BinaryUtil; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.proxy.config.ConfigurationManager; +@@ -191,9 +193,13 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator + } + if (CollectionUtils.isNotEmpty(msg.tlvs())) { + msg.tlvs().forEach(tlv -> { ++ byte[] valueBytes = ByteBufUtil.getBytes(tlv.content()); ++ if (!BinaryUtil.isAscii(valueBytes)) { ++ return; ++ } + Attributes.Key key = AttributeKeys.valueOf( + HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); +- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); ++ String value = StringUtils.trim(new String(valueBytes, CharsetUtil.UTF_8)); + builder.set(key, value); + }); + } +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +index 17f138f86..e626260c9 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +@@ -18,6 +18,7 @@ package org.apache.rocketmq.remoting.netty; + + import io.netty.bootstrap.ServerBootstrap; + import io.netty.buffer.ByteBuf; ++import io.netty.buffer.ByteBufUtil; + import io.netty.buffer.PooledByteBufAllocator; + import io.netty.channel.Channel; + import io.netty.channel.ChannelDuplexHandler; +@@ -58,6 +59,7 @@ import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.HAProxyConstants; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.utils.BinaryUtil; + import org.apache.rocketmq.common.utils.NetworkUtil; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +@@ -787,9 +789,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + } + if (CollectionUtils.isNotEmpty(msg.tlvs())) { + msg.tlvs().forEach(tlv -> { ++ byte[] valueBytes = ByteBufUtil.getBytes(tlv.content()); ++ if (!BinaryUtil.isAscii(valueBytes)) { ++ return; ++ } + AttributeKey key = AttributeKeys.valueOf( + HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); +- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); ++ String value = StringUtils.trim(new String(valueBytes, CharsetUtil.UTF_8)); + channel.attr(key).set(value); + }); + } +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 8cbf2248905cc883fa590bb2bccda5adb7c198aa..76a389cd7c70b4f795d0171c5def186ee7afccb5 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.3 -Release: 13 +Release: 14 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -22,6 +22,7 @@ Patch0009: patch009-backport-Support-KV-Storage.patch Patch0010: patch010-backport-add-some-fixes.patch Patch0011: patch011-backport-optimize-config.patch Patch0012: patch012-backport-enhance-rockdbconfigtojson.patch +Patch0013: patch013-backport-enhance-admin-output.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -56,6 +57,9 @@ exit 0 %changelog +* Fri Oct 6 2023 ShiZhili - 5.1.3-14 +- backport enhance admin output + * Wed Oct 2 2023 ShiZhili - 5.1.3-13 - backport enhance medata to json