From e3f42198a27ee278e647ea4e23c3fba60a25539d Mon Sep 17 00:00:00 2001 From: shizhili Date: Mon, 2 Oct 2023 21:16:25 +0800 Subject: [PATCH] backport enhance medata to json --- ...-backport-enhance-rockdbconfigtojson.patch | 2920 +++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 2925 insertions(+), 1 deletion(-) create mode 100644 patch012-backport-enhance-rockdbconfigtojson.patch diff --git a/patch012-backport-enhance-rockdbconfigtojson.patch b/patch012-backport-enhance-rockdbconfigtojson.patch new file mode 100644 index 0000000..cf6128b --- /dev/null +++ b/patch012-backport-enhance-rockdbconfigtojson.patch @@ -0,0 +1,2920 @@ +From fec141481496c53a0db398367006c34264662d18 Mon Sep 17 00:00:00 2001 +From: yx9o +Date: Wed, 23 Aug 2023 08:22:34 +0800 +Subject: [PATCH 1/8] [ISSUE #7166] Optimize the display format of admin + (#7210) + +--- + .../java/org/apache/rocketmq/tools/command/MQAdminStartup.java | 2 +- + .../command/acl/ClusterAclConfigVersionListSubCommand.java | 2 +- + .../tools/command/acl/DeleteAccessConfigSubCommand.java | 2 +- + .../rocketmq/tools/command/acl/GetAccessConfigSubCommand.java | 2 +- + .../tools/command/acl/UpdateAccessConfigSubCommand.java | 2 +- + .../tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java | 2 +- + .../tools/command/broker/BrokerConsumeStatsSubCommad.java | 2 +- + .../rocketmq/tools/command/broker/BrokerStatusSubCommand.java | 2 +- + .../tools/command/broker/CommitLogSetReadAheadSubCommand.java | 2 +- + .../tools/command/broker/DeleteExpiredCommitLogSubCommand.java | 2 +- + .../rocketmq/tools/command/broker/GetBrokerConfigCommand.java | 2 +- + .../rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java | 2 +- + .../tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java | 2 +- + .../broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java | 2 +- + .../tools/command/broker/ResetMasterFlushOffsetSubCommand.java | 2 +- + .../tools/command/broker/UpdateBrokerConfigSubCommand.java | 2 +- + .../broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java | 2 +- + .../rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java | 2 +- + .../rocketmq/tools/command/cluster/ClusterListSubCommand.java | 2 +- + .../tools/command/connection/ConsumerConnectionSubCommand.java | 2 +- + .../tools/command/connection/ProducerConnectionSubCommand.java | 2 +- + .../tools/command/consumer/ConsumerStatusSubCommand.java | 2 +- + .../tools/command/consumer/GetConsumerConfigSubCommand.java | 2 +- + .../tools/command/consumer/StartMonitoringSubCommand.java | 2 +- + .../tools/command/consumer/UpdateSubGroupSubCommand.java | 2 +- + .../rocketmq/tools/command/container/AddBrokerSubCommand.java | 2 +- + .../tools/command/container/RemoveBrokerSubCommand.java | 2 +- + .../command/controller/CleanControllerBrokerMetaSubCommand.java | 2 +- + .../command/controller/GetControllerMetaDataSubCommand.java | 2 +- + .../tools/command/controller/ReElectMasterSubCommand.java | 2 +- + .../rocketmq/tools/command/export/ExportConfigsCommand.java | 2 +- + .../rocketmq/tools/command/export/ExportMetadataCommand.java | 2 +- + .../rocketmq/tools/command/export/ExportMetricsCommand.java | 2 +- + .../rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java | 2 +- + .../apache/rocketmq/tools/command/ha/HAStatusSubCommand.java | 2 +- + .../rocketmq/tools/command/message/CheckMsgSendRTCommand.java | 2 +- + .../rocketmq/tools/command/message/ConsumeMessageCommand.java | 2 +- + .../tools/command/message/DumpCompactionLogCommand.java | 2 +- + .../tools/command/message/PrintMessageByQueueCommand.java | 2 +- + .../rocketmq/tools/command/message/PrintMessageSubCommand.java | 2 +- + .../rocketmq/tools/command/message/QueryMsgByIdSubCommand.java | 2 +- + .../rocketmq/tools/command/message/QueryMsgByKeySubCommand.java | 2 +- + .../tools/command/message/QueryMsgByOffsetSubCommand.java | 2 +- + .../tools/command/message/QueryMsgByUniqueKeySubCommand.java | 2 +- + .../tools/command/message/QueryMsgTraceByIdSubCommand.java | 2 +- + .../rocketmq/tools/command/message/SendMessageCommand.java | 2 +- + .../rocketmq/tools/command/namesrv/AddWritePermSubCommand.java | 2 +- + .../rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java | 2 +- + .../tools/command/offset/SkipAccumulationSubCommand.java | 2 +- + .../apache/rocketmq/tools/command/stats/StatsAllSubCommand.java | 2 +- + .../rocketmq/tools/command/topic/AllocateMQSubCommand.java | 2 +- + .../rocketmq/tools/command/topic/TopicClusterSubCommand.java | 2 +- + .../rocketmq/tools/command/topic/TopicListSubCommand.java | 2 +- + .../rocketmq/tools/command/topic/TopicRouteSubCommand.java | 2 +- + .../rocketmq/tools/command/topic/TopicStatusSubCommand.java | 2 +- + .../rocketmq/tools/command/topic/UpdateOrderConfCommand.java | 2 +- + .../tools/command/topic/UpdateStaticTopicSubCommand.java | 2 +- + .../rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java | 2 +- + .../rocketmq/tools/command/topic/UpdateTopicSubCommand.java | 2 +- + 59 files changed, 59 insertions(+), 59 deletions(-) + +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +index 0c2618e91..890125ca0 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +@@ -278,7 +278,7 @@ public class MQAdminStartup { + System.out.printf("The most commonly used mqadmin commands are:%n"); + + for (SubCommand cmd : SUB_COMMANDS) { +- System.out.printf(" %-25s %s%n", cmd.commandName(), cmd.commandDesc()); ++ System.out.printf(" %-35s %s%n", cmd.commandName(), cmd.commandDesc()); + } + + System.out.printf("%nSee 'mqadmin help ' for more information on a specific command.%n"); +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java +index f8a00b1e0..26ed028fb 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java +@@ -47,7 +47,7 @@ public class ClusterAclConfigVersionListSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "List all of acl config version information in cluster"; ++ return "List all of acl config version information in cluster."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java +index fd3a92fff..a7f3d295a 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java +@@ -42,7 +42,7 @@ public class DeleteAccessConfigSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Delete Acl Config Account in broker"; ++ return "Delete Acl Config Account in broker."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java +index 25844d6a1..f1c9a1496 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java +@@ -49,7 +49,7 @@ public class GetAccessConfigSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "List all of acl config information in cluster"; ++ return "List all of acl config information in cluster."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java +index 3be40daa1..d8a06f92d 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java +@@ -40,7 +40,7 @@ public class UpdateAccessConfigSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Update acl config yaml file in broker"; ++ return "Update acl config yaml file in broker."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java +index ff662b506..9dacf1fae 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java +@@ -37,7 +37,7 @@ public class UpdateGlobalWhiteAddrSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Update global white address for acl Config File in broker"; ++ return "Update global white address for acl Config File in broker."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java +index 3f2f90673..7658a2139 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java +@@ -61,7 +61,7 @@ public class BrokerConsumeStatsSubCommad implements SubCommand { + + @Override + public String commandDesc() { +- return "Fetch broker consume stats data"; ++ return "Fetch broker consume stats data."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java +index 830ff3425..ce934f547 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java +@@ -44,7 +44,7 @@ public class BrokerStatusSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Fetch broker runtime status data"; ++ return "Fetch broker runtime status data."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java +index b00c7f5f5..4fdabfdf8 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java +@@ -44,7 +44,7 @@ public class CommitLogSetReadAheadSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "set read ahead mode for all commitlog files"; ++ return "Set read ahead mode for all commitlog files."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java +index a4b2a51ad..142bb7b3c 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java +@@ -37,7 +37,7 @@ public class DeleteExpiredCommitLogSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Delete expired CommitLog files"; ++ return "Delete expired CommitLog files."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java +index 5d86c10e4..c4762a296 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java +@@ -45,7 +45,7 @@ public class GetBrokerConfigCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Get broker config by cluster or special broker"; ++ return "Get broker config by cluster or special broker."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java +index abe8fc622..1a8961e04 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java +@@ -38,7 +38,7 @@ public class GetBrokerEpochSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Fetch broker epoch entries"; ++ return "Fetch broker epoch entries."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java +index 7c54e650c..34b3ba7d3 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java +@@ -47,7 +47,7 @@ public class GetColdDataFlowCtrInfoSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "get cold data flow ctr info"; ++ return "Get cold data flow ctr info."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java +index b0477924f..f20407480 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java +@@ -36,7 +36,7 @@ public class RemoveColdDataFlowCtrGroupConfigSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "remove consumer from cold ctr config"; ++ return "Remove consumer from cold ctr config."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java +index b2ac48c84..90451b51f 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java +@@ -33,7 +33,7 @@ public class ResetMasterFlushOffsetSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Reset master flush offset in slave"; ++ return "Reset master flush offset in slave."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java +index 98abeb6ae..62816ef03 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java +@@ -37,7 +37,7 @@ public class UpdateBrokerConfigSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Update broker's config"; ++ return "Update broker's config."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java +index d06a24b57..8d1a00077 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java +@@ -39,7 +39,7 @@ public class UpdateColdDataFlowCtrGroupConfigSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "addOrUpdate cold data flow ctr group config"; ++ return "Add or update cold data flow ctr group config."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java +index 7253970bd..d755e9e5d 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java +@@ -48,7 +48,7 @@ public class CLusterSendMsgRTCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "List All clusters Message Send RT"; ++ return "List All clusters Message Send RT."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java +index a7a840a44..ede0fa5cf 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java +@@ -41,7 +41,7 @@ public class ClusterListSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "List cluster infos"; ++ return "List cluster infos."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java +index 630961e31..35f73d8a0 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java +@@ -39,7 +39,7 @@ public class ConsumerConnectionSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Query consumer's socket connection, client version and subscription"; ++ return "Query consumer's socket connection, client version and subscription."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java +index 2533982c8..bde674ab2 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java +@@ -36,7 +36,7 @@ public class ProducerConnectionSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Query producer's socket connection and client version"; ++ return "Query producer's socket connection and client version."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java +index 72b9c975e..d8f6f9aa9 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java +@@ -47,7 +47,7 @@ public class ConsumerStatusSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Query consumer's internal data structure"; ++ return "Query consumer's internal data structure."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java +index 6095e7668..4a8253a02 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java +@@ -43,7 +43,7 @@ public class GetConsumerConfigSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Get consumer config by subscription group name"; ++ return "Get consumer config by subscription group name."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java +index 2d08d0bd0..f5e140433 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java +@@ -34,7 +34,7 @@ public class StartMonitoringSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Start Monitoring"; ++ return "Start Monitoring."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java +index f87bafc93..b17da4de4 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java +@@ -41,7 +41,7 @@ public class UpdateSubGroupSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Update or create subscription group"; ++ return "Update or create subscription group."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java +index e9e5be4a5..007d42ae6 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java +@@ -33,7 +33,7 @@ public class AddBrokerSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Add a broker to specified container"; ++ return "Add a broker to specified container."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java +index 7c455f858..ab25d8ebe 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java +@@ -33,7 +33,7 @@ public class RemoveBrokerSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Remove a broker from specified container"; ++ return "Remove a broker from specified container."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java +index 856e4b426..24ed02566 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java +@@ -37,7 +37,7 @@ public class CleanControllerBrokerMetaSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Clean metadata of broker on controller"; ++ return "Clean metadata of broker on controller."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java +index 70bd7f8e9..966443127 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java +@@ -34,7 +34,7 @@ public class GetControllerMetaDataSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Get controller cluster's metadata"; ++ return "Get controller cluster's metadata."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java +index 1affe81f9..a522a903d 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java +@@ -37,7 +37,7 @@ public class ReElectMasterSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Re-elect the specified broker as master"; ++ return "Re-elect the specified broker as master."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java +index b8191296d..03613b29c 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java +@@ -42,7 +42,7 @@ public class ExportConfigsCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Export configs"; ++ return "Export configs."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java +index 1f9cf7d96..748f7b16e 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java +@@ -46,7 +46,7 @@ public class ExportMetadataCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Export metadata"; ++ return "Export metadata."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java +index a793b4b84..5d8bb37ba 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java +@@ -56,7 +56,7 @@ public class ExportMetricsCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Export metrics"; ++ return "Export metrics."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java +index 44b3ec3e1..b6231e4f9 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java +@@ -40,7 +40,7 @@ public class GetSyncStateSetSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Fetch syncStateSet for target brokers"; ++ return "Fetch syncStateSet for target brokers."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java +index b1795e046..931658a08 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java +@@ -41,7 +41,7 @@ public class HAStatusSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Fetch ha runtime status data"; ++ return "Fetch ha runtime status data."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java +index 4c6d5ffb6..b15b59d50 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java +@@ -40,7 +40,7 @@ public class CheckMsgSendRTCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Check message send response time"; ++ return "Check message send response time."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java +index 8aed59ea4..02ff53269 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java +@@ -70,7 +70,7 @@ public class ConsumeMessageCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Consume message"; ++ return "Consume message."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java +index ae6d9bdcf..eee8f3d4b 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java +@@ -38,7 +38,7 @@ import java.nio.file.Paths; + public class DumpCompactionLogCommand implements SubCommand { + @Override + public String commandDesc() { +- return "parse compaction log to message"; ++ return "Parse compaction log to message."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java +index 654560167..0418e88a7 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java +@@ -108,7 +108,7 @@ public class PrintMessageByQueueCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Print Message Detail"; ++ return "Print Message Detail by queueId."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java +index d01c36d42..bb82f5079 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java +@@ -62,7 +62,7 @@ public class PrintMessageSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Print Message Detail"; ++ return "Print Message Detail."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java +index 2880477f1..b42612150 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java +@@ -186,7 +186,7 @@ public class QueryMsgByIdSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Query Message by Id"; ++ return "Query Message by Id."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java +index ba7b00c3b..64627fd19 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java +@@ -36,7 +36,7 @@ public class QueryMsgByKeySubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Query Message by Key"; ++ return "Query Message by Key."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java +index d27313af1..14d0625fd 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java +@@ -39,7 +39,7 @@ public class QueryMsgByOffsetSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Query Message by offset"; ++ return "Query Message by offset."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java +index 1b28f8be1..b71cee901 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java +@@ -141,7 +141,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Query Message by Unique key"; ++ return "Query Message by Unique key."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java +index 2b982efef..2c546ec56 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java +@@ -65,7 +65,7 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Query a message trace"; ++ return "Query a message trace."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java +index 836ee192b..970da6b16 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java +@@ -41,7 +41,7 @@ public class SendMessageCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Send a message"; ++ return "Send a message."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java +index 98542d065..0b0a075bd 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java +@@ -34,7 +34,7 @@ public class AddWritePermSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Add write perm of broker in all name server you defined in the -n param"; ++ return "Add write perm of broker in all name server you defined in the -n param."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java +index 213931ed8..637dd52c8 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java +@@ -34,7 +34,7 @@ public class WipeWritePermSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Wipe write perm of broker in all name server you defined in the -n param"; ++ return "Wipe write perm of broker in all name server you defined in the -n param."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java +index 139821f9c..b22491a59 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java +@@ -41,7 +41,7 @@ public class SkipAccumulationSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Skip all messages that are accumulated (not consumed) currently"; ++ return "Skip all messages that are accumulated (not consumed) currently."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java +index 1d49bbe11..96097a93e 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java +@@ -144,7 +144,7 @@ public class StatsAllSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Topic and Consumer tps stats"; ++ return "Topic and Consumer tps stats."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java +index 3fa42f297..6a9b81eb8 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java +@@ -41,7 +41,7 @@ public class AllocateMQSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Allocate MQ"; ++ return "Allocate MQ."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java +index 1dab693d9..098f34ff0 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java +@@ -34,7 +34,7 @@ public class TopicClusterSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Get cluster info for topic"; ++ return "Get cluster info for topic."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java +index 346bac704..d9a279f80 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java +@@ -45,7 +45,7 @@ public class TopicListSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Fetch all topic list from name server"; ++ return "Fetch all topic list from name server."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java +index f2dabec4e..70949d388 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java +@@ -42,7 +42,7 @@ public class TopicRouteSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Examine topic route info"; ++ return "Examine topic route info."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java +index fdb249fab..a1619eced 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java +@@ -40,7 +40,7 @@ public class TopicStatusSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Examine topic Status info"; ++ return "Examine topic Status info."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java +index bebc646b4..3040d04c2 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java +@@ -36,7 +36,7 @@ public class UpdateOrderConfCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Create or update or delete order conf"; ++ return "Create or update or delete order conf."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +index 85a18c654..3daeee86c 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +@@ -48,7 +48,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Update or create static topic, which has fixed number of queues"; ++ return "Update or create static topic, which has fixed number of queues."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java +index aaa881538..d27cd1861 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java +@@ -44,7 +44,7 @@ public class UpdateTopicPermSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Update topic perm"; ++ return "Update topic perm."; + } + + @Override +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java +index b68463396..298914175 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java +@@ -42,7 +42,7 @@ public class UpdateTopicSubCommand implements SubCommand { + + @Override + public String commandDesc() { +- return "Update or create topic"; ++ return "Update or create topic."; + } + + @Override +-- +2.32.0.windows.2 + + +From 744167bd01fab6821b4d5ae1794dc845153d5156 Mon Sep 17 00:00:00 2001 +From: Ziyi Tan +Date: Wed, 23 Aug 2023 08:32:17 +0800 +Subject: [PATCH 2/8] [ISSUE #7142] Add command `RocksDBConfigToJson` to + inspect rocksdb content (#7180) + +* feat: add command `RocksDBConfigToJson` to inspect rocksdb content + +Signed-off-by: Ziy1-Tan + +* refactor: fix style + +--------- + +Signed-off-by: Ziy1-Tan +Co-authored-by: Ziy1-Tan +--- + .../tools/command/MQAdminStartup.java | 2 + + .../metadata/RocksDBConfigToJsonCommand.java | 118 ++++++++++++++++++ + .../metadata/KvConfigToJsonCommandTest.java | 65 ++++++++++ + 3 files changed, 185 insertions(+) + create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java + create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java + +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +index 890125ca0..324aa1856 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +@@ -80,6 +80,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; + import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; + import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand; + import org.apache.rocketmq.tools.command.message.SendMessageCommand; ++import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand; + import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand; + import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; + import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; +@@ -211,6 +212,7 @@ public class MQAdminStartup { + + initCommand(new ClusterListSubCommand()); + initCommand(new TopicListSubCommand()); ++ initCommand(new RocksDBConfigToJsonCommand()); + + initCommand(new UpdateKvConfigCommand()); + initCommand(new DeleteKvConfigCommand()); +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java +new file mode 100644 +index 000000000..3053f4684 +--- /dev/null ++++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java +@@ -0,0 +1,118 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tools.command.metadata; ++ ++import com.alibaba.fastjson.JSONObject; ++import org.apache.commons.cli.CommandLine; ++import org.apache.commons.cli.Option; ++import org.apache.commons.cli.Options; ++import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.config.RocksDBConfigManager; ++import org.apache.rocketmq.common.utils.DataConverter; ++import org.apache.rocketmq.remoting.RPCHook; ++import org.apache.rocketmq.tools.command.SubCommand; ++import org.apache.rocketmq.tools.command.SubCommandException; ++ ++import java.io.File; ++import java.util.HashMap; ++import java.util.Map; ++ ++public class RocksDBConfigToJsonCommand implements SubCommand { ++ private static final String TOPICS_JSON_CONFIG = "topics"; ++ private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; ++ ++ @Override ++ public String commandName() { ++ return "rocksDBConfigToJson"; ++ } ++ ++ @Override ++ public String commandDesc() { ++ return "Convert RocksDB kv config (topics/subscriptionGroups) to json"; ++ } ++ ++ @Override ++ public Options buildCommandlineOptions(Options options) { ++ Option pathOption = new Option("p", "path", true, ++ "Absolute path to the metadata directory"); ++ pathOption.setRequired(true); ++ options.addOption(pathOption); ++ ++ Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " + ++ "topics/subscriptionGroups"); ++ configTypeOption.setRequired(true); ++ options.addOption(configTypeOption); ++ ++ return options; ++ } ++ ++ @Override ++ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { ++ String path = commandLine.getOptionValue("path").trim(); ++ if (StringUtils.isEmpty(path) || !new File(path).exists()) { ++ System.out.print("Rocksdb path is invalid.\n"); ++ return; ++ } ++ ++ String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); ++ ++ final long memTableFlushInterval = 60 * 60 * 1000L; ++ RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(memTableFlushInterval); ++ try { ++ if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) { ++ // for topics.json ++ final Map topicsJsonConfig = new HashMap<>(); ++ final Map topicConfigTable = new HashMap<>(); ++ boolean isLoad = kvConfigManager.load(path, (key, value) -> { ++ final String topic = new String(key, DataConverter.charset); ++ final String topicConfig = new String(value, DataConverter.charset); ++ final JSONObject jsonObject = JSONObject.parseObject(topicConfig); ++ topicConfigTable.put(topic, jsonObject); ++ }); ++ ++ if (isLoad) { ++ topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable)); ++ final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true); ++ System.out.print(topicsJsonStr + "\n"); ++ return; ++ } ++ } ++ if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) { ++ // for subscriptionGroup.json ++ final Map subscriptionGroupJsonConfig = new HashMap<>(); ++ final Map subscriptionGroupTable = new HashMap<>(); ++ boolean isLoad = kvConfigManager.load(path, (key, value) -> { ++ final String subscriptionGroup = new String(key, DataConverter.charset); ++ final String subscriptionGroupConfig = new String(value, DataConverter.charset); ++ final JSONObject jsonObject = JSONObject.parseObject(subscriptionGroupConfig); ++ subscriptionGroupTable.put(subscriptionGroup, jsonObject); ++ }); ++ ++ if (isLoad) { ++ subscriptionGroupJsonConfig.put("subscriptionGroupTable", ++ (JSONObject) JSONObject.toJSON(subscriptionGroupTable)); ++ final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true); ++ System.out.print(subscriptionGroupJsonStr + "\n"); ++ return; ++ } ++ } ++ System.out.print("Config type was not recognized, configType=" + configType + "\n"); ++ } finally { ++ kvConfigManager.stop(); ++ } ++ } ++} +diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java +new file mode 100644 +index 000000000..b2f66c7b0 +--- /dev/null ++++ b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java +@@ -0,0 +1,65 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tools.command.metadata; ++ ++import org.apache.commons.cli.CommandLine; ++import org.apache.commons.cli.DefaultParser; ++import org.apache.commons.cli.Options; ++import org.apache.rocketmq.srvutil.ServerUtil; ++import org.apache.rocketmq.tools.command.SubCommandException; ++import org.junit.Test; ++ ++import java.io.File; ++ ++import static org.assertj.core.api.Assertions.assertThat; ++ ++public class KvConfigToJsonCommandTest { ++ private static final String BASE_PATH = System.getProperty("user.home") + File.separator + "store/config/"; ++ ++ @Test ++ public void testExecute() throws SubCommandException { ++ { ++ String[] cases = new String[]{"topics", "subscriptionGroups"}; ++ for (String c : cases) { ++ RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand(); ++ Options options = ServerUtil.buildCommandlineOptions(new Options()); ++ String[] subargs = new String[]{"-p " + BASE_PATH + c, "-t " + c}; ++ final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, ++ cmd.buildCommandlineOptions(options), new DefaultParser()); ++ cmd.execute(commandLine, options, null); ++ assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c); ++ assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c); ++ } ++ } ++ // invalid cases ++ { ++ String[][] cases = new String[][]{ ++ {"-p " + BASE_PATH + "tmpPath", "-t topics"}, ++ {"-p ", "-t topics"}, ++ {"-p " + BASE_PATH + "topics", "-t invalid_type"} ++ }; ++ ++ for (String[] c : cases) { ++ RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand(); ++ Options options = ServerUtil.buildCommandlineOptions(new Options()); ++ final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), c, ++ cmd.buildCommandlineOptions(options), new DefaultParser()); ++ cmd.execute(commandLine, options, null); ++ } ++ } ++ } ++} +-- +2.32.0.windows.2 + + +From bdede35db365a49b211cdc249c68b0f60a3df46d Mon Sep 17 00:00:00 2001 +From: mxsm +Date: Wed, 23 Aug 2023 08:34:56 +0800 +Subject: [PATCH 3/8] [ISSUE #7124] Fix the typos in the code comments (#7125) + +--- + .../apache/rocketmq/broker/processor/ReplyMessageProcessor.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java +index b2db356c8..d3bb048f7 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java +@@ -234,7 +234,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor { + } else { + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); +- //set to zore to avoid client decoding exception ++ //set to zero to avoid client decoding exception + responseHeader.setMsgId("0"); + responseHeader.setQueueId(queueIdInt); + responseHeader.setQueueOffset(0L); +-- +2.32.0.windows.2 + + +From 9bb73b9a38548b99ac5126c40380c3c2e7fc586e Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Wed, 23 Aug 2023 09:46:27 +0800 +Subject: [PATCH 4/8] [#ISSUE 7222] Bug fix and refactoring of the Indexfile in + tiered storage (#7224) + +--- + .../tieredstore/file/TieredIndexFile.java | 38 +++++++-- + .../tieredstore/file/TieredIndexFileTest.java | 84 +++++-------------- + 2 files changed, 52 insertions(+), 70 deletions(-) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java +index 50beb01ae..eda5e0106 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java +@@ -16,6 +16,7 @@ + */ + package org.apache.rocketmq.tieredstore.file; + ++import com.google.common.annotations.VisibleForTesting; + import java.io.File; + import java.io.IOException; + import java.nio.ByteBuffer; +@@ -99,7 +100,7 @@ public class TieredIndexFile { + this::doScheduleTask, 10, 10, TimeUnit.SECONDS); + } + +- private void doScheduleTask() { ++ protected void doScheduleTask() { + try { + curFileLock.lock(); + try { +@@ -145,6 +146,11 @@ public class TieredIndexFile { + } + } + ++ @VisibleForTesting ++ public MappedFile getPreMappedFile() { ++ return preMappedFile; ++ } ++ + private void initFile() throws IOException { + curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize); + initIndexFileHeader(curMappedFile); +@@ -156,19 +162,26 @@ public class TieredIndexFile { + + if (isFileSealed(curMappedFile)) { + if (preFileExists) { +- preFile.delete(); ++ if (preFile.delete()) { ++ logger.info("Pre IndexFile deleted success", preFilepath); ++ } else { ++ logger.error("Pre IndexFile deleted failed", preFilepath); ++ } + } + boolean rename = curMappedFile.renameTo(preFilepath); + if (rename) { + preMappedFile = curMappedFile; + curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize); ++ initIndexFileHeader(curMappedFile); + preFileExists = true; + } + } ++ + if (preFileExists) { + synchronized (TieredIndexFile.class) { + if (inflightCompactFuture.isDone()) { +- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(new CompactTask(storeConfig, preMappedFile, flatFile), null); ++ inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit( ++ new CompactTask(storeConfig, preMappedFile, flatFile), null); + } + } + } +@@ -261,7 +274,8 @@ public class TieredIndexFile { + } + } + +- public CompletableFuture>> queryAsync(String topic, String key, long beginTime, long endTime) { ++ public CompletableFuture>> queryAsync(String topic, String key, long beginTime, ++ long endTime) { + int hashCode = indexKeyHashMethod(buildKey(topic, key)); + int slotPosition = hashCode % maxHashSlotNum; + List fileSegmentList = flatFile.getFileListByTime(beginTime, endTime); +@@ -355,7 +369,7 @@ public class TieredIndexFile { + private final int fileMaxSize; + private MappedFile originFile; + private TieredFlatFile fileQueue; +- private final MappedFile compactFile; ++ private MappedFile compactFile; + + public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile originFile, + TieredFlatFile fileQueue) throws IOException { +@@ -381,6 +395,17 @@ public class TieredIndexFile { + } catch (Throwable throwable) { + logger.error("TieredIndexFile#compactTask: compact index file failed:", throwable); + } ++ ++ try { ++ if (originFile != null) { ++ originFile.destroy(-1); ++ } ++ if (compactFile != null) { ++ compactFile.destroy(-1); ++ } ++ } catch (Throwable throwable) { ++ logger.error("TieredIndexFile#compactTask: destroy index file failed:", throwable); ++ } + } + + public void compact() { +@@ -396,6 +421,8 @@ public class TieredIndexFile { + fileQueue.commit(true); + compactFile.destroy(-1); + originFile.destroy(-1); ++ compactFile = null; ++ originFile = null; + } + + private void buildCompactFile() { +@@ -414,6 +441,7 @@ public class TieredIndexFile { + if (slotValue != -1) { + int indexTotalSize = 0; + int indexPosition = slotValue; ++ + while (indexPosition >= 0 && indexPosition < maxIndexNum) { + int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE + + indexPosition * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE; +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java +index 7ef49578d..262d6645b 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java +@@ -19,9 +19,8 @@ package org.apache.rocketmq.tieredstore.file; + import com.sun.jna.Platform; + import java.io.IOException; + import java.nio.ByteBuffer; ++import java.time.Duration; + import java.util.List; +-import java.util.concurrent.TimeUnit; +-import org.apache.commons.lang3.SystemUtils; + import org.apache.commons.lang3.tuple.Pair; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; +@@ -31,9 +30,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + import org.awaitility.Awaitility; + import org.junit.After; + import org.junit.Assert; +-import org.junit.Assume; + import org.junit.Before; +-import org.junit.Ignore; + import org.junit.Test; + + public class TieredIndexFileTest { +@@ -45,11 +42,12 @@ public class TieredIndexFileTest { + @Before + public void setUp() { + storeConfig = new TieredMessageStoreConfig(); ++ storeConfig.setBrokerName("IndexFileBroker"); + storeConfig.setStorePathRootDir(storePath); +- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment"); +- storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2); +- storeConfig.setTieredStoreIndexFileMaxIndexNum(3); +- mq = new MessageQueue("TieredIndexFileTest", storeConfig.getBrokerName(), 1); ++ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); ++ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5); ++ storeConfig.setTieredStoreIndexFileMaxIndexNum(20); ++ mq = new MessageQueue("IndexFileTest", storeConfig.getBrokerName(), 1); + TieredStoreUtil.getMetadataStore(storeConfig); + TieredStoreExecutor.init(); + } +@@ -61,77 +59,33 @@ public class TieredIndexFileTest { + TieredStoreExecutor.shutdown(); + } + +- @Ignore + @Test + public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoSuchMethodException { + if (Platform.isWindows()) { + return; + } + +- // skip this test on windows +- Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS); +- + TieredFileAllocator fileQueueFactory = new TieredFileAllocator(storeConfig); + TieredIndexFile indexFile = new TieredIndexFile(fileQueueFactory, storePath); ++ + indexFile.append(mq, 0, "key3", 3, 300, 1000); + indexFile.append(mq, 0, "key2", 2, 200, 1100); + indexFile.append(mq, 0, "key1", 1, 100, 1200); + +- Awaitility.waitAtMost(5, TimeUnit.SECONDS) +- .until(() -> { +- List> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); +- if (indexList.size() != 1) { +- return false; +- } +- +- ByteBuffer indexBuffer = indexList.get(0).getValue(); +- Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 2, indexBuffer.remaining()); +- +- Assert.assertEquals(1, indexBuffer.getLong(4 + 4 + 4)); +- Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8)); +- Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); +- +- Assert.assertEquals(3, indexBuffer.getLong(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4)); +- Assert.assertEquals(300, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8)); +- Assert.assertEquals(0, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8 + 4)); +- return true; +- }); +- +- indexFile.append(mq, 0, "key4", 4, 400, 1300); +- indexFile.append(mq, 0, "key4", 4, 400, 1300); +- indexFile.append(mq, 0, "key4", 4, 400, 1300); +- +- Awaitility.waitAtMost(5, TimeUnit.SECONDS) +- .until(() -> { +- List> indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1300, 1300).join(); +- if (indexList.size() != 1) { +- return false; +- } +- +- ByteBuffer indexBuffer = indexList.get(0).getValue(); +- Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining()); +- Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4)); +- Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8)); +- Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); +- return true; +- }); +- +- List> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1300, 1300).join(); ++ // do not do schedule task here ++ TieredStoreExecutor.shutdown(); ++ List> indexList = ++ indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); + Assert.assertEquals(0, indexList.size()); + +- indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1200, 1300).join(); +- Assert.assertEquals(2, indexList.size()); +- +- ByteBuffer indexBuffer = indexList.get(0).getValue(); +- Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining()); +- Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4)); +- Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8)); +- Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); ++ // do compaction once ++ TieredStoreExecutor.init(); ++ storeConfig.setTieredStoreIndexFileRollingIdleInterval(0); ++ indexFile.doScheduleTask(); ++ Awaitility.await().atMost(Duration.ofSeconds(10)) ++ .until(() -> !indexFile.getPreMappedFile().getFile().exists()); + +- indexBuffer = indexList.get(1).getValue(); +- Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE, indexBuffer.remaining()); +- Assert.assertEquals(2, indexBuffer.getLong(4 + 4 + 4)); +- Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8)); +- Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); ++ indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); ++ Assert.assertEquals(1, indexList.size()); + } + } +-- +2.32.0.windows.2 + + +From 69c26d3d29cde7b4484ecd112ab9224f9f42bf45 Mon Sep 17 00:00:00 2001 +From: guyinyou <36399867+guyinyou@users.noreply.github.com> +Date: Wed, 23 Aug 2023 10:27:52 +0800 +Subject: [PATCH 5/8] [ISSUE #7228] Converge the use of some important + variables for some class + +--- + .../apache/rocketmq/store/ConsumeQueue.java | 16 ++++++------ + .../rocketmq/store/MappedFileQueue.java | 26 +++++++++++-------- + .../store/MultiPathMappedFileQueue.java | 4 +-- + 3 files changed, 24 insertions(+), 22 deletions(-) + +diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +index a0b886eb0..56bee2af3 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ++++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +@@ -145,7 +145,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + + if (offset >= 0 && size > 0) { + mappedFileOffset = i + CQ_STORE_UNIT_SIZE; +- this.maxPhysicOffset = offset + size; ++ this.setMaxPhysicOffset(offset + size); + if (isExtAddr(tagsCode)) { + maxExtAddr = tagsCode; + } +@@ -409,7 +409,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + + int logicFileSize = this.mappedFileSize; + +- this.maxPhysicOffset = phyOffset; ++ this.setMaxPhysicOffset(phyOffset); + long maxExtAddr = 1; + boolean shouldDeleteFile = false; + while (true) { +@@ -435,7 +435,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + mappedFile.setWrotePosition(pos); + mappedFile.setCommittedPosition(pos); + mappedFile.setFlushedPosition(pos); +- this.maxPhysicOffset = offset + size; ++ this.setMaxPhysicOffset(offset + size); + // This maybe not take effect, when not every consume queue has extend file. + if (isExtAddr(tagsCode)) { + maxExtAddr = tagsCode; +@@ -453,7 +453,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + mappedFile.setWrotePosition(pos); + mappedFile.setCommittedPosition(pos); + mappedFile.setFlushedPosition(pos); +- this.maxPhysicOffset = offset + size; ++ this.setMaxPhysicOffset(offset + size); + if (isExtAddr(tagsCode)) { + maxExtAddr = tagsCode; + } +@@ -881,8 +881,8 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, + final long cqOffset) { + +- if (offset + size <= this.maxPhysicOffset) { +- log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); ++ if (offset + size <= this.getMaxPhysicOffset()) { ++ log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", this.getMaxPhysicOffset(), offset); + return true; + } + +@@ -926,7 +926,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + ); + } + } +- this.maxPhysicOffset = offset + size; ++ this.setMaxPhysicOffset(offset + size); + return mappedFile.appendMessage(this.byteBufferIndex.array()); + } + return false; +@@ -1130,7 +1130,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + + @Override + public void destroy() { +- this.maxPhysicOffset = -1; ++ this.setMaxPhysicOffset(-1); + this.minLogicOffset = 0; + this.mappedFileQueue.destroy(); + if (isExtReadEnable()) { +diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +index 0bc70642f..32b90d14f 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java ++++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +@@ -285,7 +285,7 @@ public class MappedFileQueue implements Swappable { + if (this.mappedFiles.isEmpty()) + return 0; + +- long committed = this.flushedWhere; ++ long committed = this.getFlushedWhere(); + if (committed != 0) { + MappedFile mappedFile = this.getLastMappedFile(0, false); + if (mappedFile != null) { +@@ -442,11 +442,11 @@ public class MappedFileQueue implements Swappable { + } + + public long remainHowManyDataToCommit() { +- return getMaxWrotePosition() - committedWhere; ++ return getMaxWrotePosition() - getCommittedWhere(); + } + + public long remainHowManyDataToFlush() { +- return getMaxOffset() - flushedWhere; ++ return getMaxOffset() - this.getFlushedWhere(); + } + + public void deleteLastMappedFile() { +@@ -616,15 +616,15 @@ public class MappedFileQueue implements Swappable { + + public boolean flush(final int flushLeastPages) { + boolean result = true; +- MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); ++ MappedFile mappedFile = this.findMappedFileByOffset(this.getFlushedWhere(), this.getFlushedWhere() == 0); + if (mappedFile != null) { + long tmpTimeStamp = mappedFile.getStoreTimestamp(); + int offset = mappedFile.flush(flushLeastPages); + long where = mappedFile.getFileFromOffset() + offset; +- result = where == this.flushedWhere; +- this.flushedWhere = where; ++ result = where == this.getFlushedWhere(); ++ this.setFlushedWhere(where); + if (0 == flushLeastPages) { +- this.storeTimestamp = tmpTimeStamp; ++ this.setStoreTimestamp(tmpTimeStamp); + } + } + +@@ -633,12 +633,12 @@ public class MappedFileQueue implements Swappable { + + public synchronized boolean commit(final int commitLeastPages) { + boolean result = true; +- MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); ++ MappedFile mappedFile = this.findMappedFileByOffset(this.getCommittedWhere(), this.getCommittedWhere() == 0); + if (mappedFile != null) { + int offset = mappedFile.commit(commitLeastPages); + long where = mappedFile.getFileFromOffset() + offset; +- result = where == this.committedWhere; +- this.committedWhere = where; ++ result = where == this.getCommittedWhere(); ++ this.setCommittedWhere(where); + } + + return result; +@@ -763,7 +763,7 @@ public class MappedFileQueue implements Swappable { + mf.destroy(1000 * 3); + } + this.mappedFiles.clear(); +- this.flushedWhere = 0; ++ this.setFlushedWhere(0); + + // delete parent directory + File file = new File(storePath); +@@ -848,6 +848,10 @@ public class MappedFileQueue implements Swappable { + return storeTimestamp; + } + ++ public void setStoreTimestamp(long storeTimestamp) { ++ this.storeTimestamp = storeTimestamp; ++ } ++ + public List getMappedFiles() { + return mappedFiles; + } +diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java +index 8f5af9438..8ff050dfe 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java ++++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java +@@ -16,7 +16,6 @@ + */ + package org.apache.rocketmq.store; + +- + import java.util.Arrays; + import java.util.HashSet; + import java.util.Set; +@@ -113,8 +112,7 @@ public class MultiPathMappedFileQueue extends MappedFileQueue { + mf.destroy(1000 * 3); + } + this.mappedFiles.clear(); +- this.flushedWhere = 0; +- ++ this.setFlushedWhere(0); + + Set storePathSet = getPaths(); + storePathSet.addAll(getReadonlyPaths()); +-- +2.32.0.windows.2 + + +From 3884f595949462044c5cb3c236199bc1d7ad2341 Mon Sep 17 00:00:00 2001 +From: =?UTF-8?q?=E7=9F=B3=E8=87=BB=E8=87=BB=28Steven=20shi=29?= + +Date: Wed, 23 Aug 2023 11:10:30 +0800 +Subject: [PATCH 6/8] [ISSUE #7149] When creating and updating Topic, there + will be problems with permission settings (#7151) +MIME-Version: 1.0 +Content-Type: text/plain; charset=UTF-8 +Content-Transfer-Encoding: 8bit + +* [ISSUE #7149] fix bug : When creating and updating Topic, there will be problems with permission settings + +* [ISSUE #7149] fix bug : When creating and updating Topic, there will be problems with permission settings + +* [issue#7249] + +--------- + +Co-authored-by: 十真 +--- + .../main/java/org/apache/rocketmq/broker/BrokerController.java | 3 ++- + 1 file changed, 2 insertions(+), 1 deletion(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +index 13f9d002b..e8f943702 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +@@ -1733,7 +1733,8 @@ public class BrokerController { + new TopicConfig(topicConfig.getTopicName(), + topicConfig.getReadQueueNums(), + topicConfig.getWriteQueueNums(), +- this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag()); ++ topicConfig.getPerm() ++ & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag()); + } else { + registerTopicConfig = new TopicConfig(topicConfig); + } +-- +2.32.0.windows.2 + + +From 017ad110475e8024585327b44f47e5e97aabc63b Mon Sep 17 00:00:00 2001 +From: echooymxq +Date: Wed, 23 Aug 2023 11:11:42 +0800 +Subject: [PATCH 7/8] [ISSUE #7219] Fix Concurrent modify syncStateSet and Mark + synchronizing frequently when shrink. (#7220) + +--- + .../broker/controller/ReplicasManager.java | 29 ++++++++++--------- + .../ha/autoswitch/AutoSwitchHAService.java | 21 ++++++++------ + 2 files changed, 28 insertions(+), 22 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +index abae7cdb0..37c82e434 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +@@ -542,7 +542,7 @@ public class ReplicasManager { + this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId()); + this.tempBrokerMetadata.clear(); + this.brokerControllerId = this.brokerMetadata.getBrokerId(); +- this.haService.setBrokerControllerId(this.brokerControllerId); ++ this.haService.setLocalBrokerId(this.brokerControllerId); + return true; + } catch (Exception e) { + LOGGER.error("fail to create metadata file", e); +@@ -594,7 +594,7 @@ public class ReplicasManager { + if (this.brokerMetadata.isLoaded()) { + this.registerState = RegisterState.CREATE_METADATA_FILE_DONE; + this.brokerControllerId = brokerMetadata.getBrokerId(); +- this.haService.setBrokerControllerId(this.brokerControllerId); ++ this.haService.setLocalBrokerId(this.brokerControllerId); + return; + } + // 2. check if temp metadata exist +@@ -735,23 +735,26 @@ public class ReplicasManager { + if (this.checkSyncStateSetTaskFuture != null) { + this.checkSyncStateSetTaskFuture.cancel(false); + } +- this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(() -> { +- checkSyncStateSetAndDoReport(); +- }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS); ++ this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(this::checkSyncStateSetAndDoReport, 3 * 1000, ++ this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS); + } + + private void checkSyncStateSetAndDoReport() { +- final Set newSyncStateSet = this.haService.maybeShrinkSyncStateSet(); +- newSyncStateSet.add(this.brokerControllerId); +- synchronized (this) { +- if (this.syncStateSet != null) { +- // Check if syncStateSet changed +- if (this.syncStateSet.size() == newSyncStateSet.size() && this.syncStateSet.containsAll(newSyncStateSet)) { +- return; ++ try { ++ final Set newSyncStateSet = this.haService.maybeShrinkSyncStateSet(); ++ newSyncStateSet.add(this.brokerControllerId); ++ synchronized (this) { ++ if (this.syncStateSet != null) { ++ // Check if syncStateSet changed ++ if (this.syncStateSet.size() == newSyncStateSet.size() && this.syncStateSet.containsAll(newSyncStateSet)) { ++ return; ++ } + } + } ++ doReportSyncStateSetChanged(newSyncStateSet); ++ } catch (Exception e) { ++ LOGGER.error("Check syncStateSet error", e); + } +- doReportSyncStateSetChanged(newSyncStateSet); + } + + private void doReportSyncStateSetChanged(Set newSyncStateSet) { +diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +index 6dc734e0c..d5393fdca 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java ++++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +@@ -41,6 +41,7 @@ import java.nio.channels.SocketChannel; + import java.util.ArrayList; + import java.util.HashSet; + import java.util.List; ++import java.util.Iterator; + import java.util.Map; + import java.util.Objects; + import java.util.Set; +@@ -73,7 +74,7 @@ public class AutoSwitchHAService extends DefaultHAService { + private EpochFileCache epochCache; + private AutoSwitchHAClient haClient; + +- private Long brokerControllerId = null; ++ private Long localBrokerId = null; + + public AutoSwitchHAService() { + } +@@ -287,9 +288,11 @@ public class AutoSwitchHAService extends DefaultHAService { + + // If the slaveBrokerId is in syncStateSet but not in connectionCaughtUpTimeTable, + // it means that the broker has not connected. +- for (Long slaveBrokerId : newSyncStateSet) { +- if (!this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) { +- newSyncStateSet.remove(slaveBrokerId); ++ Iterator iterator = newSyncStateSet.iterator(); ++ while (iterator.hasNext()) { ++ Long slaveBrokerId = iterator.next(); ++ if (!Objects.equals(slaveBrokerId, this.localBrokerId) && !this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) { ++ iterator.remove(); + isSyncStateSetChanged = true; + } + } +@@ -419,7 +422,7 @@ public class AutoSwitchHAService extends DefaultHAService { + // To avoid the syncStateSet is not consistent with connectionList. + // Fix issue: https://github.com/apache/rocketmq/issues/6662 + for (Long syncId : currentSyncStateSet) { +- if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) { ++ if (!idList.contains(syncId) && this.localBrokerId != null && !Objects.equals(syncId, this.localBrokerId)) { + LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId); + // Without check and re-compute, return the confirmOffset's value directly. + return this.defaultMessageStore.getConfirmOffsetDirectly(); +@@ -545,12 +548,12 @@ public class AutoSwitchHAService extends DefaultHAService { + return this.epochCache.getAllEntries(); + } + +- public Long getBrokerControllerId() { +- return brokerControllerId; ++ public Long getLocalBrokerId() { ++ return localBrokerId; + } + +- public void setBrokerControllerId(Long brokerControllerId) { +- this.brokerControllerId = brokerControllerId; ++ public void setLocalBrokerId(Long localBrokerId) { ++ this.localBrokerId = localBrokerId; + } + + class AutoSwitchAcceptSocketService extends AcceptSocketService { +-- +2.32.0.windows.2 + + +From 77e8e54b37c3fc3ea0beffc1ace6f5bf20af10d9 Mon Sep 17 00:00:00 2001 +From: lk +Date: Wed, 23 Aug 2023 15:56:39 +0800 +Subject: [PATCH 8/8] [ISSUE #7223] Support batch ack for grpc client in proxy + (#7225) + +--- + .../client/impl/mqclient/MQClientAPIExt.java | 26 +++ + .../rocketmq/proxy/config/ProxyConfig.java | 10 + + .../grpc/v2/consumer/AckMessageActivity.java | 136 ++++++++--- + .../proxy/processor/AbstractProcessor.java | 4 +- + .../proxy/processor/BatchAckResult.java | 53 +++++ + .../proxy/processor/ConsumerProcessor.java | 64 +++++ + .../processor/DefaultMessagingProcessor.java | 7 + + .../proxy/processor/MessagingProcessor.java | 18 ++ + .../message/ClusterMessageService.java | 16 +- + .../service/message/LocalMessageService.java | 58 +++++ + .../proxy/service/message/MessageService.java | 8 + + .../service/message/ReceiptHandleMessage.java | 39 ++++ + .../v2/consumer/AckMessageActivityTest.java | 221 +++++++++++++++--- + .../proxy/processor/BaseProcessorTest.java | 18 +- + .../processor/ConsumerProcessorTest.java | 115 +++++++++ + .../service/mqclient/MQClientAPIExtTest.java | 12 + + 16 files changed, 728 insertions(+), 77 deletions(-) + create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java + create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +index fb8f8d11f..d7c8ef8d9 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +@@ -306,6 +306,32 @@ public class MQClientAPIExt extends MQClientAPIImpl { + return future; + } + ++ public CompletableFuture batchAckMessageAsync( ++ String brokerAddr, ++ String topic, ++ String consumerGroup, ++ List extraInfoList, ++ long timeoutMillis ++ ) { ++ CompletableFuture future = new CompletableFuture<>(); ++ try { ++ this.batchAckMessageAsync(brokerAddr, timeoutMillis, new AckCallback() { ++ @Override ++ public void onSuccess(AckResult ackResult) { ++ future.complete(ackResult); ++ } ++ ++ @Override ++ public void onException(Throwable t) { ++ future.completeExceptionally(t); ++ } ++ }, topic, consumerGroup, extraInfoList); ++ } catch (Throwable t) { ++ future.completeExceptionally(t); ++ } ++ return future; ++ } ++ + public CompletableFuture changeInvisibleTimeAsync( + String brokerAddr, + String brokerName, +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +index 39caaa0d9..76a243919 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +@@ -250,6 +250,8 @@ public class ProxyConfig implements ConfigFile { + private long remotingWaitTimeMillsInTopicRouteQueue = 3 * 1000; + private long remotingWaitTimeMillsInDefaultQueue = 3 * 1000; + ++ private boolean enableBatchAck = false; ++ + @Override + public void initData() { + parseDelayLevel(); +@@ -1379,4 +1381,12 @@ public class ProxyConfig implements ConfigFile { + public void setRemotingWaitTimeMillsInDefaultQueue(long remotingWaitTimeMillsInDefaultQueue) { + this.remotingWaitTimeMillsInDefaultQueue = remotingWaitTimeMillsInDefaultQueue; + } ++ ++ public boolean isEnableBatchAck() { ++ return enableBatchAck; ++ } ++ ++ public void setEnableBatchAck(boolean enableBatchAck) { ++ this.enableBatchAck = enableBatchAck; ++ } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java +index 9a3a77201..97c716c8f 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java +@@ -31,12 +31,15 @@ import org.apache.rocketmq.client.consumer.AckStatus; + import org.apache.rocketmq.common.consumer.ReceiptHandle; + import org.apache.rocketmq.proxy.common.MessageReceiptHandle; + import org.apache.rocketmq.proxy.common.ProxyContext; ++import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; + import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; + import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; + import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; + import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; ++import org.apache.rocketmq.proxy.processor.BatchAckResult; + import org.apache.rocketmq.proxy.processor.MessagingProcessor; ++import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; + + public class AckMessageActivity extends AbstractMessingActivity { + +@@ -50,60 +53,98 @@ public class AckMessageActivity extends AbstractMessingActivity { + + try { + validateTopicAndConsumerGroup(request.getTopic(), request.getGroup()); +- +- CompletableFuture[] futures = new CompletableFuture[request.getEntriesCount()]; +- for (int i = 0; i < request.getEntriesCount(); i++) { +- futures[i] = processAckMessage(ctx, request, request.getEntries(i)); ++ String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); ++ String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()); ++ if (ConfigurationManager.getProxyConfig().isEnableBatchAck()) { ++ future = ackMessageInBatch(ctx, group, topic, request); ++ } else { ++ future = ackMessageOneByOne(ctx, group, topic, request); + } +- CompletableFuture.allOf(futures).whenComplete((val, throwable) -> { +- if (throwable != null) { +- future.completeExceptionally(throwable); +- return; +- } ++ } catch (Throwable t) { ++ future.completeExceptionally(t); ++ } ++ return future; ++ } ++ ++ protected CompletableFuture ackMessageInBatch(ProxyContext ctx, String group, String topic, AckMessageRequest request) { ++ List handleMessageList = new ArrayList<>(request.getEntriesCount()); + ++ for (AckMessageEntry ackMessageEntry : request.getEntriesList()) { ++ String handleString = getHandleString(ctx, group, request, ackMessageEntry); ++ handleMessageList.add(new ReceiptHandleMessage(ReceiptHandle.decode(handleString), ackMessageEntry.getMessageId())); ++ } ++ return this.messagingProcessor.batchAckMessage(ctx, handleMessageList, group, topic) ++ .thenApply(batchAckResultList -> { ++ AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder(); + Set responseCodes = new HashSet<>(); +- List entryList = new ArrayList<>(); +- for (CompletableFuture entryFuture : futures) { +- AckMessageResultEntry entryResult = entryFuture.join(); +- responseCodes.add(entryResult.getStatus().getCode()); +- entryList.add(entryResult); +- } +- AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder() +- .addAllEntries(entryList); +- if (responseCodes.size() > 1) { +- responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS, Code.MULTIPLE_RESULTS.name())); +- } else if (responseCodes.size() == 1) { +- Code code = responseCodes.stream().findAny().get(); +- responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(code, code.name())); +- } else { +- responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack message result is empty")); ++ for (BatchAckResult batchAckResult : batchAckResultList) { ++ AckMessageResultEntry entry = convertToAckMessageResultEntry(batchAckResult); ++ responseBuilder.addEntries(entry); ++ responseCodes.add(entry.getStatus().getCode()); + } +- future.complete(responseBuilder.build()); ++ setAckResponseStatus(responseBuilder, responseCodes); ++ return responseBuilder.build(); + }); +- } catch (Throwable t) { +- future.completeExceptionally(t); ++ } ++ ++ protected AckMessageResultEntry convertToAckMessageResultEntry(BatchAckResult batchAckResult) { ++ ReceiptHandleMessage handleMessage = batchAckResult.getReceiptHandleMessage(); ++ AckMessageResultEntry.Builder resultBuilder = AckMessageResultEntry.newBuilder() ++ .setMessageId(handleMessage.getMessageId()) ++ .setReceiptHandle(handleMessage.getReceiptHandle().getReceiptHandle()); ++ if (batchAckResult.getProxyException() != null) { ++ resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(batchAckResult.getProxyException())); ++ } else { ++ AckResult ackResult = batchAckResult.getAckResult(); ++ if (AckStatus.OK.equals(ackResult.getStatus())) { ++ resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())); ++ } else { ++ resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack failed: status is abnormal")); ++ } + } +- return future; ++ return resultBuilder.build(); + } + +- protected CompletableFuture processAckMessage(ProxyContext ctx, AckMessageRequest request, ++ protected CompletableFuture ackMessageOneByOne(ProxyContext ctx, String group, String topic, AckMessageRequest request) { ++ CompletableFuture resultFuture = new CompletableFuture<>(); ++ CompletableFuture[] futures = new CompletableFuture[request.getEntriesCount()]; ++ for (int i = 0; i < request.getEntriesCount(); i++) { ++ futures[i] = processAckMessage(ctx, group, topic, request, request.getEntries(i)); ++ } ++ CompletableFuture.allOf(futures).whenComplete((val, throwable) -> { ++ if (throwable != null) { ++ resultFuture.completeExceptionally(throwable); ++ return; ++ } ++ ++ Set responseCodes = new HashSet<>(); ++ List entryList = new ArrayList<>(); ++ for (CompletableFuture entryFuture : futures) { ++ AckMessageResultEntry entryResult = entryFuture.join(); ++ responseCodes.add(entryResult.getStatus().getCode()); ++ entryList.add(entryResult); ++ } ++ AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder() ++ .addAllEntries(entryList); ++ setAckResponseStatus(responseBuilder, responseCodes); ++ resultFuture.complete(responseBuilder.build()); ++ }); ++ return resultFuture; ++ } ++ ++ protected CompletableFuture processAckMessage(ProxyContext ctx, String group, String topic, AckMessageRequest request, + AckMessageEntry ackMessageEntry) { + CompletableFuture future = new CompletableFuture<>(); + + try { +- String handleString = ackMessageEntry.getReceiptHandle(); +- +- String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); +- MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle()); +- if (messageReceiptHandle != null) { +- handleString = messageReceiptHandle.getReceiptHandleStr(); +- } ++ String handleString = this.getHandleString(ctx, group, request, ackMessageEntry); + CompletableFuture ackResultFuture = this.messagingProcessor.ackMessage( + ctx, + ReceiptHandle.decode(handleString), + ackMessageEntry.getMessageId(), + group, +- GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic())); ++ topic ++ ); + ackResultFuture.thenAccept(result -> { + future.complete(convertToAckMessageResultEntry(ctx, ackMessageEntry, result)); + }).exceptionally(t -> { +@@ -139,4 +180,25 @@ public class AckMessageActivity extends AbstractMessingActivity { + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack failed: status is abnormal")) + .build(); + } ++ ++ protected void setAckResponseStatus(AckMessageResponse.Builder responseBuilder, Set responseCodes) { ++ if (responseCodes.size() > 1) { ++ responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS, Code.MULTIPLE_RESULTS.name())); ++ } else if (responseCodes.size() == 1) { ++ Code code = responseCodes.stream().findAny().get(); ++ responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(code, code.name())); ++ } else { ++ responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack message result is empty")); ++ } ++ } ++ ++ protected String getHandleString(ProxyContext ctx, String group, AckMessageRequest request, AckMessageEntry ackMessageEntry) { ++ String handleString = ackMessageEntry.getReceiptHandle(); ++ ++ MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle()); ++ if (messageReceiptHandle != null) { ++ handleString = messageReceiptHandle.getReceiptHandleStr(); ++ } ++ return handleString; ++ } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java +index b61c3df9e..c63212c23 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java +@@ -27,6 +27,8 @@ public abstract class AbstractProcessor extends AbstractStartAndShutdown { + protected MessagingProcessor messagingProcessor; + protected ServiceManager serviceManager; + ++ protected static final ProxyException EXPIRED_HANDLE_PROXY_EXCEPTION = new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired"); ++ + public AbstractProcessor(MessagingProcessor messagingProcessor, + ServiceManager serviceManager) { + this.messagingProcessor = messagingProcessor; +@@ -35,7 +37,7 @@ public abstract class AbstractProcessor extends AbstractStartAndShutdown { + + protected void validateReceiptHandle(ReceiptHandle handle) { + if (handle.isExpired()) { +- throw new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired"); ++ throw EXPIRED_HANDLE_PROXY_EXCEPTION; + } + } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java +new file mode 100644 +index 000000000..dfb9c9b9e +--- /dev/null ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java +@@ -0,0 +1,53 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.proxy.processor; ++ ++import org.apache.rocketmq.client.consumer.AckResult; ++import org.apache.rocketmq.proxy.common.ProxyException; ++import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; ++ ++public class BatchAckResult { ++ ++ private final ReceiptHandleMessage receiptHandleMessage; ++ private AckResult ackResult; ++ private ProxyException proxyException; ++ ++ public BatchAckResult(ReceiptHandleMessage receiptHandleMessage, ++ AckResult ackResult) { ++ this.receiptHandleMessage = receiptHandleMessage; ++ this.ackResult = ackResult; ++ } ++ ++ public BatchAckResult(ReceiptHandleMessage receiptHandleMessage, ++ ProxyException proxyException) { ++ this.receiptHandleMessage = receiptHandleMessage; ++ this.proxyException = proxyException; ++ } ++ ++ public ReceiptHandleMessage getReceiptHandleMessage() { ++ return receiptHandleMessage; ++ } ++ ++ public AckResult getAckResult() { ++ return ackResult; ++ } ++ ++ public ProxyException getProxyException() { ++ return proxyException; ++ } ++} +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +index 656a6339d..f3522b374 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +@@ -48,6 +48,7 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode; + import org.apache.rocketmq.proxy.common.utils.FutureUtils; + import org.apache.rocketmq.proxy.common.utils.ProxyUtils; + import org.apache.rocketmq.proxy.service.ServiceManager; ++import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; + import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; + import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; + import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; +@@ -241,6 +242,69 @@ public class ConsumerProcessor extends AbstractProcessor { + return FutureUtils.addExecutor(future, this.executor); + } + ++ public CompletableFuture> batchAckMessage( ++ ProxyContext ctx, ++ List handleMessageList, ++ String consumerGroup, ++ String topic, ++ long timeoutMillis ++ ) { ++ CompletableFuture> future = new CompletableFuture<>(); ++ try { ++ List batchAckResultList = new ArrayList<>(handleMessageList.size()); ++ Map> brokerHandleListMap = new HashMap<>(); ++ ++ for (ReceiptHandleMessage handleMessage : handleMessageList) { ++ if (handleMessage.getReceiptHandle().isExpired()) { ++ batchAckResultList.add(new BatchAckResult(handleMessage, EXPIRED_HANDLE_PROXY_EXCEPTION)); ++ continue; ++ } ++ List brokerHandleList = brokerHandleListMap.computeIfAbsent(handleMessage.getReceiptHandle().getBrokerName(), key -> new ArrayList<>()); ++ brokerHandleList.add(handleMessage); ++ } ++ ++ if (brokerHandleListMap.isEmpty()) { ++ return FutureUtils.addExecutor(CompletableFuture.completedFuture(batchAckResultList), this.executor); ++ } ++ Set>> brokerHandleListMapEntrySet = brokerHandleListMap.entrySet(); ++ CompletableFuture>[] futures = new CompletableFuture[brokerHandleListMapEntrySet.size()]; ++ int futureIndex = 0; ++ for (Map.Entry> entry : brokerHandleListMapEntrySet) { ++ futures[futureIndex++] = processBrokerHandle(ctx, consumerGroup, topic, entry.getValue(), timeoutMillis); ++ } ++ CompletableFuture.allOf(futures).whenComplete((val, throwable) -> { ++ if (throwable != null) { ++ future.completeExceptionally(throwable); ++ } ++ for (CompletableFuture> resultFuture : futures) { ++ batchAckResultList.addAll(resultFuture.join()); ++ } ++ future.complete(batchAckResultList); ++ }); ++ } catch (Throwable t) { ++ future.completeExceptionally(t); ++ } ++ return FutureUtils.addExecutor(future, this.executor); ++ } ++ ++ protected CompletableFuture> processBrokerHandle(ProxyContext ctx, String consumerGroup, String topic, List handleMessageList, long timeoutMillis) { ++ return this.serviceManager.getMessageService().batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis) ++ .thenApply(result -> { ++ List results = new ArrayList<>(); ++ for (ReceiptHandleMessage handleMessage : handleMessageList) { ++ results.add(new BatchAckResult(handleMessage, result)); ++ } ++ return results; ++ }) ++ .exceptionally(throwable -> { ++ List results = new ArrayList<>(); ++ for (ReceiptHandleMessage handleMessage : handleMessageList) { ++ results.add(new BatchAckResult(handleMessage, new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, throwable.getMessage(), throwable))); ++ } ++ return results; ++ }); ++ } ++ + public CompletableFuture changeInvisibleTime(ProxyContext ctx, ReceiptHandle handle, + String messageId, String groupName, String topicName, long invisibleTime, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +index 188cb7b9b..ba150051b 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +@@ -46,6 +46,7 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; + import org.apache.rocketmq.proxy.service.ServiceManager; + import org.apache.rocketmq.proxy.service.ServiceManagerFactory; ++import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; + import org.apache.rocketmq.proxy.service.metadata.MetadataService; + import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; + import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData; +@@ -183,6 +184,12 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen + return this.consumerProcessor.ackMessage(ctx, handle, messageId, consumerGroup, topic, timeoutMillis); + } + ++ @Override ++ public CompletableFuture> batchAckMessage(ProxyContext ctx, ++ List handleMessageList, String consumerGroup, String topic, long timeoutMillis) { ++ return this.consumerProcessor.batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis); ++ } ++ + @Override + public CompletableFuture changeInvisibleTime(ProxyContext ctx, ReceiptHandle handle, String messageId, + String groupName, String topicName, long invisibleTime, long timeoutMillis) { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +index d86be0bd8..2ae7418ba 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +@@ -37,6 +37,7 @@ import org.apache.rocketmq.proxy.common.Address; + import org.apache.rocketmq.proxy.common.MessageReceiptHandle; + import org.apache.rocketmq.proxy.common.ProxyContext; + import org.apache.rocketmq.common.utils.StartAndShutdown; ++import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; + import org.apache.rocketmq.proxy.service.metadata.MetadataService; + import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; + import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData; +@@ -155,6 +156,23 @@ public interface MessagingProcessor extends StartAndShutdown { + long timeoutMillis + ); + ++ default CompletableFuture> batchAckMessage( ++ ProxyContext ctx, ++ List handleMessageList, ++ String consumerGroup, ++ String topic ++ ) { ++ return batchAckMessage(ctx, handleMessageList, consumerGroup, topic, DEFAULT_TIMEOUT_MILLS); ++ } ++ ++ CompletableFuture> batchAckMessage( ++ ProxyContext ctx, ++ List handleMessageList, ++ String consumerGroup, ++ String topic, ++ long timeoutMillis ++ ); ++ + default CompletableFuture changeInvisibleTime( + ProxyContext ctx, + ReceiptHandle handle, +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java +index 9f163f1b9..70b72deae 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java +@@ -20,9 +20,11 @@ import com.google.common.collect.Lists; + import java.util.List; + import java.util.Set; + import java.util.concurrent.CompletableFuture; ++import java.util.stream.Collectors; + import org.apache.rocketmq.client.consumer.AckResult; + import org.apache.rocketmq.client.consumer.PopResult; + import org.apache.rocketmq.client.consumer.PullResult; ++import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; + import org.apache.rocketmq.client.producer.SendResult; + import org.apache.rocketmq.common.consumer.ReceiptHandle; + import org.apache.rocketmq.common.message.Message; +@@ -31,7 +33,6 @@ import org.apache.rocketmq.proxy.common.ProxyContext; + import org.apache.rocketmq.proxy.common.ProxyException; + import org.apache.rocketmq.proxy.common.ProxyExceptionCode; + import org.apache.rocketmq.proxy.common.utils.FutureUtils; +-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; + import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; + import org.apache.rocketmq.proxy.service.route.TopicRouteService; + import org.apache.rocketmq.remoting.protocol.RemotingCommand; +@@ -137,6 +138,19 @@ public class ClusterMessageService implements MessageService { + ); + } + ++ @Override ++ public CompletableFuture batchAckMessage(ProxyContext ctx, List handleList, String consumerGroup, ++ String topic, long timeoutMillis) { ++ List extraInfoList = handleList.stream().map(message -> message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList()); ++ return this.mqClientAPIFactory.getClient().batchAckMessageAsync( ++ this.resolveBrokerAddrInReceiptHandle(ctx, handleList.get(0).getReceiptHandle()), ++ topic, ++ consumerGroup, ++ extraInfoList, ++ timeoutMillis ++ ); ++ } ++ + @Override + public CompletableFuture pullMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, + PullMessageRequestHeader requestHeader, long timeoutMillis) { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +index eb2c4d9ee..ca7dcc9eb 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.service.message; + import io.netty.channel.ChannelHandlerContext; + import java.nio.ByteBuffer; + import java.util.ArrayList; ++import java.util.BitSet; + import java.util.Collections; + import java.util.HashMap; + import java.util.List; +@@ -54,6 +55,8 @@ import org.apache.rocketmq.remoting.RPCHook; + import org.apache.rocketmq.remoting.protocol.RemotingCommand; + import org.apache.rocketmq.remoting.protocol.RequestCode; + import org.apache.rocketmq.remoting.protocol.ResponseCode; ++import org.apache.rocketmq.remoting.protocol.body.BatchAck; ++import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody; + import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; + import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; + import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader; +@@ -364,6 +367,61 @@ public class LocalMessageService implements MessageService { + }); + } + ++ @Override ++ public CompletableFuture batchAckMessage(ProxyContext ctx, List handleList, ++ String consumerGroup, String topic, long timeoutMillis) { ++ SimpleChannel channel = channelManager.createChannel(ctx); ++ ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); ++ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null); ++ ++ Map batchAckMap = new HashMap<>(); ++ for (ReceiptHandleMessage receiptHandleMessage : handleList) { ++ String extraInfo = receiptHandleMessage.getReceiptHandle().getReceiptHandle(); ++ String[] extraInfoData = ExtraInfoUtil.split(extraInfo); ++ String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" + ++ ExtraInfoUtil.getQueueId(extraInfoData) + "@" + ++ ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" + ++ ExtraInfoUtil.getPopTime(extraInfoData); ++ BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> { ++ BatchAck newBatchAck = new BatchAck(); ++ newBatchAck.setConsumerGroup(consumerGroup); ++ newBatchAck.setTopic(topic); ++ newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData)); ++ newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData)); ++ newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData)); ++ newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData)); ++ newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData)); ++ newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData)); ++ newBatchAck.setBitSet(new BitSet()); ++ return newBatchAck; ++ }); ++ bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData))); ++ } ++ BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody(); ++ requestBody.setBrokerName(brokerController.getBrokerConfig().getBrokerName()); ++ requestBody.setAcks(new ArrayList<>(batchAckMap.values())); ++ ++ command.setBody(requestBody.encode()); ++ CompletableFuture future = new CompletableFuture<>(); ++ try { ++ RemotingCommand response = brokerController.getAckMessageProcessor() ++ .processRequest(channelHandlerContext, command); ++ future.complete(response); ++ } catch (Exception e) { ++ log.error("Fail to process batchAckMessage command", e); ++ future.completeExceptionally(e); ++ } ++ return future.thenApply(r -> { ++ AckResult ackResult = new AckResult(); ++ if (ResponseCode.SUCCESS == r.getCode()) { ++ ackResult.setStatus(AckStatus.OK); ++ } else { ++ ackResult.setStatus(AckStatus.NO_EXIST); ++ } ++ return ackResult; ++ }); ++ } ++ + @Override + public CompletableFuture pullMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, + PullMessageRequestHeader requestHeader, long timeoutMillis) { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java +index 15da17154..58a835adb 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java +@@ -91,6 +91,14 @@ public interface MessageService { + long timeoutMillis + ); + ++ CompletableFuture batchAckMessage( ++ ProxyContext ctx, ++ List handleList, ++ String consumerGroup, ++ String topic, ++ long timeoutMillis ++ ); ++ + CompletableFuture pullMessage( + ProxyContext ctx, + AddressableMessageQueue messageQueue, +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java +new file mode 100644 +index 000000000..ae63fed49 +--- /dev/null ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java +@@ -0,0 +1,39 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.proxy.service.message; ++ ++import org.apache.rocketmq.common.consumer.ReceiptHandle; ++ ++public class ReceiptHandleMessage { ++ ++ private final ReceiptHandle receiptHandle; ++ private final String messageId; ++ ++ public ReceiptHandleMessage(ReceiptHandle receiptHandle, String messageId) { ++ this.receiptHandle = receiptHandle; ++ this.messageId = messageId; ++ } ++ ++ public ReceiptHandle getReceiptHandle() { ++ return receiptHandle; ++ } ++ ++ public String getMessageId() { ++ return messageId; ++ } ++} +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java +index 49fdfc6a8..3c4746105 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java +@@ -20,21 +20,32 @@ package org.apache.rocketmq.proxy.grpc.v2.consumer; + import apache.rocketmq.v2.AckMessageEntry; + import apache.rocketmq.v2.AckMessageRequest; + import apache.rocketmq.v2.AckMessageResponse; ++import apache.rocketmq.v2.AckMessageResultEntry; + import apache.rocketmq.v2.Code; + import apache.rocketmq.v2.Resource; ++import java.util.ArrayList; ++import java.util.HashMap; ++import java.util.List; ++import java.util.Map; + import java.util.concurrent.CompletableFuture; + import org.apache.rocketmq.client.consumer.AckResult; + import org.apache.rocketmq.client.consumer.AckStatus; + import org.apache.rocketmq.proxy.common.ProxyException; + import org.apache.rocketmq.proxy.common.ProxyExceptionCode; ++import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; ++import org.apache.rocketmq.proxy.processor.BatchAckResult; ++import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; + import org.junit.Before; + import org.junit.Test; ++import org.mockito.stubbing.Answer; + + import static org.junit.Assert.assertEquals; + import static org.mockito.ArgumentMatchers.any; ++import static org.mockito.ArgumentMatchers.anyList; + import static org.mockito.ArgumentMatchers.anyString; + import static org.mockito.ArgumentMatchers.eq; ++import static org.mockito.Mockito.doAnswer; + import static org.mockito.Mockito.when; + + public class AckMessageActivityTest extends BaseActivityTest { +@@ -52,43 +63,197 @@ public class AckMessageActivityTest extends BaseActivityTest { + + @Test + public void testAckMessage() throws Throwable { +- when(this.messagingProcessor.ackMessage(any(), any(), eq("msg1"), anyString(), anyString())) ++ ConfigurationManager.getProxyConfig().setEnableBatchAck(false); ++ ++ String msg1 = "msg1"; ++ String msg2 = "msg2"; ++ String msg3 = "msg3"; ++ ++ when(this.messagingProcessor.ackMessage(any(), any(), eq(msg1), anyString(), anyString())) + .thenThrow(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired")); + + AckResult msg2AckResult = new AckResult(); + msg2AckResult.setStatus(AckStatus.OK); +- when(this.messagingProcessor.ackMessage(any(), any(), eq("msg2"), anyString(), anyString())) ++ when(this.messagingProcessor.ackMessage(any(), any(), eq(msg2), anyString(), anyString())) + .thenReturn(CompletableFuture.completedFuture(msg2AckResult)); + + AckResult msg3AckResult = new AckResult(); + msg3AckResult.setStatus(AckStatus.NO_EXIST); +- when(this.messagingProcessor.ackMessage(any(), any(), eq("msg3"), anyString(), anyString())) ++ when(this.messagingProcessor.ackMessage(any(), any(), eq(msg3), anyString(), anyString())) + .thenReturn(CompletableFuture.completedFuture(msg3AckResult)); + +- AckMessageResponse response = this.ackMessageActivity.ackMessage( +- createContext(), +- AckMessageRequest.newBuilder() +- .setTopic(Resource.newBuilder().setName(TOPIC).build()) +- .setGroup(Resource.newBuilder().setName(GROUP).build()) +- .addEntries(AckMessageEntry.newBuilder() +- .setMessageId("msg1") +- .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000)) +- .build()) +- .addEntries(AckMessageEntry.newBuilder() +- .setMessageId("msg2") +- .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) +- .build()) +- .addEntries(AckMessageEntry.newBuilder() +- .setMessageId("msg3") +- .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) +- .build()) +- .build() +- ).get(); +- +- assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode()); +- assertEquals(3, response.getEntriesCount()); +- assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getEntries(0).getStatus().getCode()); +- assertEquals(Code.OK, response.getEntries(1).getStatus().getCode()); +- assertEquals(Code.INTERNAL_SERVER_ERROR, response.getEntries(2).getStatus().getCode()); ++ { ++ AckMessageResponse response = this.ackMessageActivity.ackMessage( ++ createContext(), ++ AckMessageRequest.newBuilder() ++ .setTopic(Resource.newBuilder().setName(TOPIC).build()) ++ .setGroup(Resource.newBuilder().setName(GROUP).build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(msg1) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000)) ++ .build()) ++ .build() ++ ).get(); ++ assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getStatus().getCode()); ++ } ++ { ++ AckMessageResponse response = this.ackMessageActivity.ackMessage( ++ createContext(), ++ AckMessageRequest.newBuilder() ++ .setTopic(Resource.newBuilder().setName(TOPIC).build()) ++ .setGroup(Resource.newBuilder().setName(GROUP).build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(msg2) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000)) ++ .build()) ++ .build() ++ ).get(); ++ assertEquals(Code.OK, response.getStatus().getCode()); ++ } ++ { ++ AckMessageResponse response = this.ackMessageActivity.ackMessage( ++ createContext(), ++ AckMessageRequest.newBuilder() ++ .setTopic(Resource.newBuilder().setName(TOPIC).build()) ++ .setGroup(Resource.newBuilder().setName(GROUP).build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(msg3) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000)) ++ .build()) ++ .build() ++ ).get(); ++ assertEquals(Code.INTERNAL_SERVER_ERROR, response.getStatus().getCode()); ++ } ++ { ++ AckMessageResponse response = this.ackMessageActivity.ackMessage( ++ createContext(), ++ AckMessageRequest.newBuilder() ++ .setTopic(Resource.newBuilder().setName(TOPIC).build()) ++ .setGroup(Resource.newBuilder().setName(GROUP).build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(msg1) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000)) ++ .build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(msg2) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) ++ .build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(msg3) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) ++ .build()) ++ .build() ++ ).get(); ++ ++ assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode()); ++ assertEquals(3, response.getEntriesCount()); ++ assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getEntries(0).getStatus().getCode()); ++ assertEquals(Code.OK, response.getEntries(1).getStatus().getCode()); ++ assertEquals(Code.INTERNAL_SERVER_ERROR, response.getEntries(2).getStatus().getCode()); ++ } ++ } ++ ++ @Test ++ public void testAckMessageInBatch() throws Throwable { ++ ConfigurationManager.getProxyConfig().setEnableBatchAck(true); ++ ++ String successMessageId = "msg1"; ++ String notOkMessageId = "msg2"; ++ String exceptionMessageId = "msg3"; ++ ++ doAnswer((Answer>>) invocation -> { ++ List receiptHandleMessageList = invocation.getArgument(1, List.class); ++ List batchAckResultList = new ArrayList<>(); ++ for (ReceiptHandleMessage receiptHandleMessage : receiptHandleMessageList) { ++ BatchAckResult batchAckResult; ++ if (receiptHandleMessage.getMessageId().equals(successMessageId)) { ++ AckResult ackResult = new AckResult(); ++ ackResult.setStatus(AckStatus.OK); ++ batchAckResult = new BatchAckResult(receiptHandleMessage, ackResult); ++ } else if (receiptHandleMessage.getMessageId().equals(notOkMessageId)) { ++ AckResult ackResult = new AckResult(); ++ ackResult.setStatus(AckStatus.NO_EXIST); ++ batchAckResult = new BatchAckResult(receiptHandleMessage, ackResult); ++ } else { ++ batchAckResult = new BatchAckResult(receiptHandleMessage, new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "")); ++ } ++ batchAckResultList.add(batchAckResult); ++ } ++ return CompletableFuture.completedFuture(batchAckResultList); ++ }).when(this.messagingProcessor).batchAckMessage(any(), anyList(), anyString(), anyString()); ++ ++ { ++ AckMessageResponse response = this.ackMessageActivity.ackMessage( ++ createContext(), ++ AckMessageRequest.newBuilder() ++ .setTopic(Resource.newBuilder().setName(TOPIC).build()) ++ .setGroup(Resource.newBuilder().setName(GROUP).build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(successMessageId) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) ++ .build()) ++ .build() ++ ).get(); ++ assertEquals(Code.OK, response.getStatus().getCode()); ++ } ++ { ++ AckMessageResponse response = this.ackMessageActivity.ackMessage( ++ createContext(), ++ AckMessageRequest.newBuilder() ++ .setTopic(Resource.newBuilder().setName(TOPIC).build()) ++ .setGroup(Resource.newBuilder().setName(GROUP).build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(notOkMessageId) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) ++ .build()) ++ .build() ++ ).get(); ++ assertEquals(Code.INTERNAL_SERVER_ERROR, response.getStatus().getCode()); ++ } ++ { ++ AckMessageResponse response = this.ackMessageActivity.ackMessage( ++ createContext(), ++ AckMessageRequest.newBuilder() ++ .setTopic(Resource.newBuilder().setName(TOPIC).build()) ++ .setGroup(Resource.newBuilder().setName(GROUP).build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(exceptionMessageId) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) ++ .build()) ++ .build() ++ ).get(); ++ assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getStatus().getCode()); ++ } ++ { ++ AckMessageResponse response = this.ackMessageActivity.ackMessage( ++ createContext(), ++ AckMessageRequest.newBuilder() ++ .setTopic(Resource.newBuilder().setName(TOPIC).build()) ++ .setGroup(Resource.newBuilder().setName(GROUP).build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(successMessageId) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) ++ .build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(notOkMessageId) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) ++ .build()) ++ .addEntries(AckMessageEntry.newBuilder() ++ .setMessageId(exceptionMessageId) ++ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) ++ .build()) ++ .build() ++ ).get(); ++ ++ assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode()); ++ assertEquals(3, response.getEntriesCount()); ++ Map msgCode = new HashMap<>(); ++ for (AckMessageResultEntry entry : response.getEntriesList()) { ++ msgCode.put(entry.getMessageId(), entry.getStatus().getCode()); ++ } ++ assertEquals(Code.OK, msgCode.get(successMessageId)); ++ assertEquals(Code.INTERNAL_SERVER_ERROR, msgCode.get(notOkMessageId)); ++ assertEquals(Code.INVALID_RECEIPT_HANDLE, msgCode.get(exceptionMessageId)); ++ } + } + } +\ No newline at end of file +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java +index 5c1ea9627..072630e39 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java +@@ -66,14 +66,6 @@ public class BaseProcessorTest extends InitConfigTest { + protected ProxyRelayService proxyRelayService; + @Mock + protected MetadataService metadataService; +- @Mock +- protected ProducerProcessor producerProcessor; +- @Mock +- protected ConsumerProcessor consumerProcessor; +- @Mock +- protected TransactionProcessor transactionProcessor; +- @Mock +- protected ClientProcessor clientProcessor; + + public void before() throws Throwable { + super.before(); +@@ -92,6 +84,13 @@ public class BaseProcessorTest extends InitConfigTest { + } + + protected static MessageExt createMessageExt(String topic, String tags, int reconsumeTimes, long invisibleTime) { ++ return createMessageExt(topic, tags, reconsumeTimes, invisibleTime, System.currentTimeMillis(), ++ RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE), ++ RANDOM.nextInt(Integer.MAX_VALUE), "mockBroker"); ++ } ++ ++ protected static MessageExt createMessageExt(String topic, String tags, int reconsumeTimes, long invisibleTime, long popTime, ++ long startOffset, int reviveQid, int queueId, long queueOffset, String brokerName) { + MessageExt messageExt = new MessageExt(); + messageExt.setTopic(topic); + messageExt.setTags(tags); +@@ -100,8 +99,7 @@ public class BaseProcessorTest extends InitConfigTest { + messageExt.setMsgId(MessageClientIDSetter.createUniqID()); + messageExt.setCommitLogOffset(RANDOM.nextInt(Integer.MAX_VALUE)); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_POP_CK, +- ExtraInfoUtil.buildExtraInfo(RANDOM.nextInt(Integer.MAX_VALUE), System.currentTimeMillis(), invisibleTime, +- RANDOM.nextInt(Integer.MAX_VALUE), topic, "mockBroker", RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE))); ++ ExtraInfoUtil.buildExtraInfo(startOffset, popTime, invisibleTime, reviveQid, topic, brokerName, queueId, queueOffset)); + return messageExt; + } + +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java +index 717e86fc0..db268a06e 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java +@@ -20,8 +20,11 @@ package org.apache.rocketmq.proxy.processor; + import com.google.common.collect.Sets; + import java.time.Duration; + import java.util.ArrayList; ++import java.util.Collections; ++import java.util.HashMap; + import java.util.HashSet; + import java.util.List; ++import java.util.Map; + import java.util.Set; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.Executors; +@@ -39,7 +42,10 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter; + import org.apache.rocketmq.common.message.MessageExt; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.proxy.common.ProxyContext; ++import org.apache.rocketmq.proxy.common.ProxyExceptionCode; ++import org.apache.rocketmq.proxy.common.utils.FutureUtils; + import org.apache.rocketmq.proxy.common.utils.ProxyUtils; ++import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; + import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; + import org.apache.rocketmq.proxy.service.route.MessageQueueView; + import org.apache.rocketmq.remoting.protocol.RemotingCommand; +@@ -50,16 +56,22 @@ import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; + import org.junit.Before; + import org.junit.Test; + import org.mockito.ArgumentCaptor; ++import org.mockito.stubbing.Answer; + + import static org.assertj.core.api.Assertions.assertThat; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertNotNull; ++import static org.junit.Assert.assertNull; + import static org.junit.Assert.assertSame; + import static org.mockito.ArgumentMatchers.any; ++import static org.mockito.ArgumentMatchers.anyList; + import static org.mockito.ArgumentMatchers.anyLong; + import static org.mockito.ArgumentMatchers.anyString; + import static org.mockito.ArgumentMatchers.eq; ++import static org.mockito.Mockito.doAnswer; + import static org.mockito.Mockito.mock; ++import static org.mockito.Mockito.never; ++import static org.mockito.Mockito.verify; + import static org.mockito.Mockito.when; + + public class ConsumerProcessorTest extends BaseProcessorTest { +@@ -162,6 +174,109 @@ public class ConsumerProcessorTest extends BaseProcessorTest { + assertEquals(handle.getReceiptHandle(), requestHeaderArgumentCaptor.getValue().getExtraInfo()); + } + ++ @Test ++ public void testBatchAckExpireMessage() throws Throwable { ++ String brokerName1 = "brokerName1"; ++ ++ List receiptHandleMessageList = new ArrayList<>(); ++ for (int i = 0; i < 3; i++) { ++ MessageExt expireMessage = createMessageExt(TOPIC, "", 0, 3000, System.currentTimeMillis() - 10000, ++ 0, 0, 0, i, brokerName1); ++ ReceiptHandle expireHandle = create(expireMessage); ++ receiptHandleMessageList.add(new ReceiptHandleMessage(expireHandle, expireMessage.getMsgId())); ++ } ++ ++ List batchAckResultList = this.consumerProcessor.batchAckMessage(createContext(), receiptHandleMessageList, CONSUMER_GROUP, TOPIC, 3000).get(); ++ ++ verify(this.messageService, never()).batchAckMessage(any(), anyList(), anyString(), anyString(), anyLong()); ++ assertEquals(receiptHandleMessageList.size(), batchAckResultList.size()); ++ for (BatchAckResult batchAckResult : batchAckResultList) { ++ assertNull(batchAckResult.getAckResult()); ++ assertNotNull(batchAckResult.getProxyException()); ++ assertNotNull(batchAckResult.getReceiptHandleMessage()); ++ } ++ ++ } ++ ++ @Test ++ public void testBatchAckMessage() throws Throwable { ++ String brokerName1 = "brokerName1"; ++ String brokerName2 = "brokerName2"; ++ String errThrowBrokerName = "errThrowBrokerName"; ++ MessageExt expireMessage = createMessageExt(TOPIC, "", 0, 3000, System.currentTimeMillis() - 10000, ++ 0, 0, 0, 0, brokerName1); ++ ReceiptHandle expireHandle = create(expireMessage); ++ ++ List receiptHandleMessageList = new ArrayList<>(); ++ receiptHandleMessageList.add(new ReceiptHandleMessage(expireHandle, expireMessage.getMsgId())); ++ List broker1Msg = new ArrayList<>(); ++ List broker2Msg = new ArrayList<>(); ++ ++ long now = System.currentTimeMillis(); ++ int msgNum = 3; ++ for (int i = 0; i < msgNum; i++) { ++ MessageExt brokerMessage = createMessageExt(TOPIC, "", 0, 3000, now, ++ 0, 0, 0, i + 1, brokerName1); ++ ReceiptHandle brokerHandle = create(brokerMessage); ++ receiptHandleMessageList.add(new ReceiptHandleMessage(brokerHandle, brokerMessage.getMsgId())); ++ broker1Msg.add(brokerMessage.getMsgId()); ++ } ++ for (int i = 0; i < msgNum; i++) { ++ MessageExt brokerMessage = createMessageExt(TOPIC, "", 0, 3000, now, ++ 0, 0, 0, i + 1, brokerName2); ++ ReceiptHandle brokerHandle = create(brokerMessage); ++ receiptHandleMessageList.add(new ReceiptHandleMessage(brokerHandle, brokerMessage.getMsgId())); ++ broker2Msg.add(brokerMessage.getMsgId()); ++ } ++ ++ // for this message, will throw exception in batchAckMessage ++ MessageExt errThrowMessage = createMessageExt(TOPIC, "", 0, 3000, now, ++ 0, 0, 0, 0, errThrowBrokerName); ++ ReceiptHandle errThrowHandle = create(errThrowMessage); ++ receiptHandleMessageList.add(new ReceiptHandleMessage(errThrowHandle, errThrowMessage.getMsgId())); ++ ++ Collections.shuffle(receiptHandleMessageList); ++ ++ doAnswer((Answer>) invocation -> { ++ List handleMessageList = invocation.getArgument(1, List.class); ++ AckResult ackResult = new AckResult(); ++ String brokerName = handleMessageList.get(0).getReceiptHandle().getBrokerName(); ++ if (brokerName.equals(brokerName1)) { ++ ackResult.setStatus(AckStatus.OK); ++ } else if (brokerName.equals(brokerName2)) { ++ ackResult.setStatus(AckStatus.NO_EXIST); ++ } else { ++ return FutureUtils.completeExceptionally(new RuntimeException()); ++ } ++ ++ return CompletableFuture.completedFuture(ackResult); ++ }).when(this.messageService).batchAckMessage(any(), anyList(), anyString(), anyString(), anyLong()); ++ ++ List batchAckResultList = this.consumerProcessor.batchAckMessage(createContext(), receiptHandleMessageList, CONSUMER_GROUP, TOPIC, 3000).get(); ++ assertEquals(receiptHandleMessageList.size(), batchAckResultList.size()); ++ ++ // check ackResult for each msg ++ Map msgBatchAckResult = new HashMap<>(); ++ for (BatchAckResult batchAckResult : batchAckResultList) { ++ msgBatchAckResult.put(batchAckResult.getReceiptHandleMessage().getMessageId(), batchAckResult); ++ } ++ for (String msgId : broker1Msg) { ++ assertEquals(AckStatus.OK, msgBatchAckResult.get(msgId).getAckResult().getStatus()); ++ assertNull(msgBatchAckResult.get(msgId).getProxyException()); ++ } ++ for (String msgId : broker2Msg) { ++ assertEquals(AckStatus.NO_EXIST, msgBatchAckResult.get(msgId).getAckResult().getStatus()); ++ assertNull(msgBatchAckResult.get(msgId).getProxyException()); ++ } ++ assertNotNull(msgBatchAckResult.get(expireMessage.getMsgId()).getProxyException()); ++ assertEquals(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, msgBatchAckResult.get(expireMessage.getMsgId()).getProxyException().getCode()); ++ assertNull(msgBatchAckResult.get(expireMessage.getMsgId()).getAckResult()); ++ ++ assertNotNull(msgBatchAckResult.get(errThrowMessage.getMsgId()).getProxyException()); ++ assertEquals(ProxyExceptionCode.INTERNAL_SERVER_ERROR, msgBatchAckResult.get(errThrowMessage.getMsgId()).getProxyException().getCode()); ++ assertNull(msgBatchAckResult.get(errThrowMessage.getMsgId()).getAckResult()); ++ } ++ + @Test + public void testChangeInvisibleTime() throws Throwable { + ReceiptHandle handle = create(createMessageExt(MixAll.RETRY_GROUP_TOPIC_PREFIX + TOPIC, "", 0, 3000)); +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java +index 77a119a29..3f3a4ae40 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java +@@ -220,6 +220,18 @@ public class MQClientAPIExtTest { + assertSame(ackResult, mqClientAPI.ackMessageAsync(BROKER_ADDR, new AckMessageRequestHeader(), TIMEOUT).get()); + } + ++ @Test ++ public void testBatchAckMessageAsync() throws Exception { ++ AckResult ackResult = new AckResult(); ++ doAnswer((Answer) mock -> { ++ AckCallback ackCallback = mock.getArgument(2); ++ ackCallback.onSuccess(ackResult); ++ return null; ++ }).when(mqClientAPI).batchAckMessageAsync(anyString(), anyLong(), any(AckCallback.class), any()); ++ ++ assertSame(ackResult, mqClientAPI.batchAckMessageAsync(BROKER_ADDR, TOPIC, CONSUMER_GROUP, new ArrayList<>(), TIMEOUT).get()); ++ } ++ + @Test + public void testChangeInvisibleTimeAsync() throws Exception { + AckResult ackResult = new AckResult(); +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 1909b2f..8cbf224 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.3 -Release: 12 +Release: 13 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -21,6 +21,7 @@ Patch0008: patch008-backport-Allow-BoundaryType.patch Patch0009: patch009-backport-Support-KV-Storage.patch Patch0010: patch010-backport-add-some-fixes.patch Patch0011: patch011-backport-optimize-config.patch +Patch0012: patch012-backport-enhance-rockdbconfigtojson.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -55,6 +56,9 @@ exit 0 %changelog +* Wed Oct 2 2023 ShiZhili - 5.1.3-13 +- backport enhance medata to json + * Wed Oct 1 2023 ShiZhili - 5.1.3-12 - backport optimize config -- Gitee