diff --git a/patch017-backport-Convergent-thread-pool-creation.patch b/patch017-backport-Convergent-thread-pool-creation.patch new file mode 100644 index 0000000000000000000000000000000000000000..92d0bd2e9028f8150a26fda2cef8174ba7e54d02 --- /dev/null +++ b/patch017-backport-Convergent-thread-pool-creation.patch @@ -0,0 +1,2243 @@ +From c100d815d754d7cb330bc63e145bafd2d9b59cb1 Mon Sep 17 00:00:00 2001 +From: guyinyou <36399867+guyinyou@users.noreply.github.com> +Date: Mon, 11 Sep 2023 10:13:56 +0800 +Subject: [PATCH 1/6] [ISSUE #7328] Convergent thread pool creation (#7329) + +* Convergence thread pool creation to facilitate subsequent iteration management + +* Convergence thread pool creation in ThreadPoolMonitor.java + +* fix unit test + +* Convergence ThreadPool constructor + +* Convergence ScheduledThreadPool constructor + +* remove unused import + +* Convergence ScheduledThreadPool constructor + +* remove unused import + +--------- +--- + .../rocketmq/broker/BrokerController.java | 39 +++++----- + .../client/ClientHousekeepingService.java | 4 +- + .../DefaultConsumerIdsChangeListener.java | 3 +- + .../broker/controller/ReplicasManager.java | 9 +-- + .../dledger/DLedgerRoleChangeHandler.java | 4 +- + .../broker/failover/EscapeBridge.java | 4 +- + .../broker/latency/BrokerFastFailure.java | 5 +- + .../BrokerFixedThreadPoolExecutor.java | 57 -------------- + .../broker/latency/FutureTaskExt.java | 39 ---------- + .../rocketmq/broker/out/BrokerOuterAPI.java | 7 +- + .../schedule/ScheduleMessageService.java | 7 +- + .../broker/topic/TopicRouteInfoManager.java | 4 +- + ...ractTransactionalMessageCheckListener.java | 4 +- + .../rocketmq/broker/BrokerControllerTest.java | 2 +- + .../broker/latency/BrokerFastFailureTest.java | 1 + + .../common/config/AbstractRocksDBStorage.java | 6 +- + .../FutureTaskExtThreadPoolExecutor.java | 3 +- + .../common/thread/ThreadPoolMonitor.java | 6 +- + .../rocketmq/common/utils/ThreadUtils.java | 74 ++++++++++++++++--- + .../rocketmq/container/BrokerContainer.java | 6 +- + .../controller/ControllerManager.java | 14 +--- + .../controller/impl/DLedgerController.java | 10 +-- + .../DefaultBrokerHeartbeatManager.java | 3 +- + .../rocketmq/namesrv/NamesrvController.java | 22 ++---- + .../grpc/v2/channel/GrpcChannelManager.java | 6 +- + .../remoting/RemotingProtocolServer.java | 4 +- + .../proxy/service/ClusterServiceManager.java | 12 +-- + .../proxy/service/LocalServiceManager.java | 4 +- + .../receipt/DefaultReceiptHandleManager.java | 8 +- + .../service/route/TopicRouteService.java | 9 +-- + .../remoting/netty/NettyRemotingClient.java | 4 +- + .../remoting/netty/NettyRemotingServer.java | 4 +- + .../rocketmq/store/DefaultMessageStore.java | 8 +- + .../ha/autoswitch/AutoSwitchHAService.java | 38 +++++----- + .../rocketmq/store/kv/CompactionStore.java | 21 +++--- + .../store/queue/ConsumeQueueStore.java | 4 +- + .../store/stats/BrokerStatsManager.java | 14 ++-- + .../store/timer/TimerMessageStore.java | 6 +- + .../apache/rocketmq/test/util/StatUtil.java | 1 - + .../common/TieredStoreExecutor.java | 14 ++-- + .../tools/admin/DefaultMQAdminExtImpl.java | 3 +- + 41 files changed, 215 insertions(+), 278 deletions(-) + delete mode 100644 broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java + delete mode 100644 broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +index 6aba70cb2..275b64b1a 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +@@ -34,7 +34,6 @@ import org.apache.rocketmq.broker.failover.EscapeBridge; + import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap; + import org.apache.rocketmq.broker.filter.ConsumerFilterManager; + import org.apache.rocketmq.broker.latency.BrokerFastFailure; +-import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; + import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService; + import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener; + import org.apache.rocketmq.broker.longpolling.PullRequestHoldService; +@@ -98,6 +97,7 @@ import org.apache.rocketmq.common.message.MessageExt; + import org.apache.rocketmq.common.message.MessageExtBrokerInner; + import org.apache.rocketmq.common.stats.MomentStatsItem; + import org.apache.rocketmq.common.utils.ServiceProvider; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.Configuration; +@@ -160,7 +160,6 @@ import java.util.concurrent.ExecutorService; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ScheduledFuture; +-import java.util.concurrent.ScheduledThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantLock; +@@ -455,10 +454,10 @@ public class BrokerController { + * Initialize resources including remoting server and thread executors. + */ + protected void initializeResources() { +- this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, ++ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, + new ThreadFactoryImpl("BrokerControllerScheduledThread", true, getBrokerIdentity())); + +- this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( ++ this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getSendMessageThreadPoolNums(), + this.brokerConfig.getSendMessageThreadPoolNums(), + 1000 * 60, +@@ -466,7 +465,7 @@ public class BrokerController { + this.sendThreadPoolQueue, + new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity())); + +- this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( ++ this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getPullMessageThreadPoolNums(), + this.brokerConfig.getPullMessageThreadPoolNums(), + 1000 * 60, +@@ -474,7 +473,7 @@ public class BrokerController { + this.pullThreadPoolQueue, + new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity())); + +- this.litePullMessageExecutor = new BrokerFixedThreadPoolExecutor( ++ this.litePullMessageExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getLitePullMessageThreadPoolNums(), + this.brokerConfig.getLitePullMessageThreadPoolNums(), + 1000 * 60, +@@ -482,7 +481,7 @@ public class BrokerController { + this.litePullThreadPoolQueue, + new ThreadFactoryImpl("LitePullMessageThread_", getBrokerIdentity())); + +- this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor( ++ this.putMessageFutureExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getPutMessageFutureThreadPoolNums(), + this.brokerConfig.getPutMessageFutureThreadPoolNums(), + 1000 * 60, +@@ -490,7 +489,7 @@ public class BrokerController { + this.putThreadPoolQueue, + new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity())); + +- this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor( ++ this.ackMessageExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getAckMessageThreadPoolNums(), + this.brokerConfig.getAckMessageThreadPoolNums(), + 1000 * 60, +@@ -498,7 +497,7 @@ public class BrokerController { + this.ackThreadPoolQueue, + new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity())); + +- this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( ++ this.queryMessageExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getQueryMessageThreadPoolNums(), + this.brokerConfig.getQueryMessageThreadPoolNums(), + 1000 * 60, +@@ -506,7 +505,7 @@ public class BrokerController { + this.queryThreadPoolQueue, + new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity())); + +- this.adminBrokerExecutor = new BrokerFixedThreadPoolExecutor( ++ this.adminBrokerExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getAdminBrokerThreadPoolNums(), + this.brokerConfig.getAdminBrokerThreadPoolNums(), + 1000 * 60, +@@ -514,7 +513,7 @@ public class BrokerController { + this.adminBrokerThreadPoolQueue, + new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity())); + +- this.clientManageExecutor = new BrokerFixedThreadPoolExecutor( ++ this.clientManageExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getClientManageThreadPoolNums(), + this.brokerConfig.getClientManageThreadPoolNums(), + 1000 * 60, +@@ -522,7 +521,7 @@ public class BrokerController { + this.clientManagerThreadPoolQueue, + new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity())); + +- this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( ++ this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getHeartbeatThreadPoolNums(), + this.brokerConfig.getHeartbeatThreadPoolNums(), + 1000 * 60, +@@ -530,7 +529,7 @@ public class BrokerController { + this.heartbeatThreadPoolQueue, + new ThreadFactoryImpl("HeartbeatThread_", true, getBrokerIdentity())); + +- this.consumerManageExecutor = new BrokerFixedThreadPoolExecutor( ++ this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getConsumerManageThreadPoolNums(), + this.brokerConfig.getConsumerManageThreadPoolNums(), + 1000 * 60, +@@ -538,7 +537,7 @@ public class BrokerController { + this.consumerManagerThreadPoolQueue, + new ThreadFactoryImpl("ConsumerManageThread_", true, getBrokerIdentity())); + +- this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor( ++ this.replyMessageExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getProcessReplyMessageThreadPoolNums(), + this.brokerConfig.getProcessReplyMessageThreadPoolNums(), + 1000 * 60, +@@ -546,7 +545,7 @@ public class BrokerController { + this.replyThreadPoolQueue, + new ThreadFactoryImpl("ProcessReplyMessageThread_", getBrokerIdentity())); + +- this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( ++ this.endTransactionExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getEndTransactionThreadPoolNums(), + this.brokerConfig.getEndTransactionThreadPoolNums(), + 1000 * 60, +@@ -554,7 +553,7 @@ public class BrokerController { + this.endTransactionThreadPoolQueue, + new ThreadFactoryImpl("EndTransactionThread_", getBrokerIdentity())); + +- this.loadBalanceExecutor = new BrokerFixedThreadPoolExecutor( ++ this.loadBalanceExecutor = ThreadUtils.newThreadPoolExecutor( + this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), + this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), + 1000 * 60, +@@ -562,9 +561,9 @@ public class BrokerController { + this.loadBalanceThreadPoolQueue, + new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity())); + +- this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1, ++ this.syncBrokerMemberGroupExecutorService = ThreadUtils.newScheduledThreadPool(1, + new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity())); +- this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1, ++ this.brokerHeartbeatExecutorService = ThreadUtils.newScheduledThreadPool(1, + new ThreadFactoryImpl("BrokerControllerHeartbeatScheduledThread", getBrokerIdentity())); + + this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this); +@@ -828,8 +827,6 @@ public class BrokerController { + + initializeResources(); + +- registerProcessor(); +- + initializeScheduledTasks(); + + initialTransaction(); +@@ -1690,6 +1687,8 @@ public class BrokerController { + } + } + }, 10, 5, TimeUnit.SECONDS); ++ ++ registerProcessor(); + } + + protected void scheduleSendHeartbeat() { +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +index 98e5f450f..cbb81f632 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +@@ -18,11 +18,11 @@ package org.apache.rocketmq.broker.client; + + import io.netty.channel.Channel; + import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ScheduledThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.broker.BrokerController; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.ChannelEventListener; +@@ -35,7 +35,7 @@ public class ClientHousekeepingService implements ChannelEventListener { + + public ClientHousekeepingService(final BrokerController brokerController) { + this.brokerController = brokerController; +- scheduledExecutorService = new ScheduledThreadPoolExecutor(1, ++ scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, + new ThreadFactoryImpl("ClientHousekeepingScheduledThread", brokerController.getBrokerIdentity())); + } + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java +index 2ce036a0f..d17a2a547 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java +@@ -22,7 +22,6 @@ import java.util.List; + import java.util.Map; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ScheduledThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.broker.BrokerController; + import org.apache.rocketmq.common.AbstractBrokerRunnable; +@@ -37,7 +36,7 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen + private final BrokerController brokerController; + private final int cacheSize = 8096; + +- private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, ++ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, + ThreadUtils.newGenericThreadFactory("DefaultConsumerIdsChangeListener", true)); + + private ConcurrentHashMap> consumerChannelMap = new ConcurrentHashMap<>(cacheSize); +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +index 37c82e434..a989e6e68 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +@@ -27,10 +27,8 @@ import java.util.concurrent.ArrayBlockingQueue; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ExecutorService; +-import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ScheduledFuture; +-import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + + import org.apache.commons.lang3.StringUtils; +@@ -42,6 +40,7 @@ import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.protocol.EpochEntry; +@@ -107,9 +106,9 @@ public class ReplicasManager { + public ReplicasManager(final BrokerController brokerController) { + this.brokerController = brokerController; + this.brokerOuterAPI = brokerController.getBrokerOuterAPI(); +- this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity())); +- this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity())); +- this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, ++ this.scheduledService = ThreadUtils.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity())); ++ this.executorService = ThreadUtils.newThreadPoolExecutor(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity())); ++ this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("ReplicasManager_scan_thread_", brokerController.getBrokerIdentity())); + this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService(); + this.brokerConfig = brokerController.getBrokerConfig(); +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java +index 75023ee1b..e6cb97640 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java +@@ -21,12 +21,12 @@ import io.openmessaging.storage.dledger.DLedgerServer; + import io.openmessaging.storage.dledger.MemberState; + import io.openmessaging.storage.dledger.utils.DLedgerUtils; + import java.util.concurrent.ExecutorService; +-import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.broker.BrokerController; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.store.DefaultMessageStore; +@@ -49,7 +49,7 @@ public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChange + this.messageStore = messageStore; + this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); + this.dLegerServer = dLedgerCommitLog.getdLedgerServer(); +- this.executorService = Executors.newSingleThreadExecutor( ++ this.executorService = ThreadUtils.newSingleThreadExecutor( + new ThreadFactoryImpl("DLegerRoleChangeHandler_", brokerController.getBrokerIdentity())); + } + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java +index 7c350fc1d..6a0817480 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java +@@ -24,7 +24,6 @@ import java.util.concurrent.BlockingQueue; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.LinkedBlockingQueue; +-import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.broker.BrokerController; +@@ -43,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; + import org.apache.rocketmq.common.message.MessageExt; + import org.apache.rocketmq.common.message.MessageExtBrokerInner; + import org.apache.rocketmq.common.message.MessageQueue; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.exception.RemotingException; +@@ -72,7 +72,7 @@ public class EscapeBridge { + public void start() throws Exception { + if (brokerController.getBrokerConfig().isEnableSlaveActingMaster() && brokerController.getBrokerConfig().isEnableRemoteEscape()) { + final BlockingQueue asyncSenderThreadPoolQueue = new LinkedBlockingQueue<>(50000); +- this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( ++ this.defaultAsyncSenderExecutor = ThreadUtils.newThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().availableProcessors(), + 1000 * 60, +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +index d3d0bc8ba..3b6e9dc67 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +@@ -18,13 +18,14 @@ package org.apache.rocketmq.broker.latency; + + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ScheduledThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.broker.BrokerController; + import org.apache.rocketmq.common.AbstractBrokerRunnable; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.UtilAll; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.future.FutureTaskExt; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.netty.RequestTask; +@@ -43,7 +44,7 @@ public class BrokerFastFailure { + + public BrokerFastFailure(final BrokerController brokerController) { + this.brokerController = brokerController; +- this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, ++ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, + new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true, + brokerController == null ? null : brokerController.getBrokerConfig())); + } +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java +deleted file mode 100644 +index d2d1143a3..000000000 +--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java ++++ /dev/null +@@ -1,57 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.rocketmq.broker.latency; +- +-import java.util.concurrent.BlockingQueue; +-import java.util.concurrent.RejectedExecutionHandler; +-import java.util.concurrent.RunnableFuture; +-import java.util.concurrent.ThreadFactory; +-import java.util.concurrent.ThreadPoolExecutor; +-import java.util.concurrent.TimeUnit; +- +-public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor { +- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, +- final TimeUnit unit, +- final BlockingQueue workQueue) { +- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); +- } +- +- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, +- final TimeUnit unit, +- final BlockingQueue workQueue, final ThreadFactory threadFactory) { +- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); +- } +- +- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, +- final TimeUnit unit, +- final BlockingQueue workQueue, final RejectedExecutionHandler handler) { +- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); +- } +- +- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, +- final TimeUnit unit, +- final BlockingQueue workQueue, final ThreadFactory threadFactory, +- final RejectedExecutionHandler handler) { +- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); +- } +- +- @Override +- protected RunnableFuture newTaskFor(final Runnable runnable, final T value) { +- return new FutureTaskExt<>(runnable, value); +- } +-} +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java +deleted file mode 100644 +index f132efaeb..000000000 +--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java ++++ /dev/null +@@ -1,39 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.rocketmq.broker.latency; +- +-import java.util.concurrent.Callable; +-import java.util.concurrent.FutureTask; +- +-public class FutureTaskExt extends FutureTask { +- private final Runnable runnable; +- +- public FutureTaskExt(final Callable callable) { +- super(callable); +- this.runnable = null; +- } +- +- public FutureTaskExt(final Runnable runnable, final V result) { +- super(runnable, result); +- this.runnable = runnable; +- } +- +- public Runnable getRunnable() { +- return runnable; +- } +-} +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +index ae81e8b11..9dfb8127d 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +@@ -27,9 +27,9 @@ import java.util.concurrent.ArrayBlockingQueue; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.CopyOnWriteArrayList; + import java.util.concurrent.CountDownLatch; ++import java.util.concurrent.ExecutorService; + import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.StringUtils; +-import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; + import org.apache.rocketmq.client.consumer.PullResult; + import org.apache.rocketmq.client.consumer.PullStatus; + import org.apache.rocketmq.client.exception.MQBrokerException; +@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.namesrv.DefaultTopAddressing; + import org.apache.rocketmq.common.namesrv.TopAddressing; + import org.apache.rocketmq.common.sysflag.PullSysFlag; + import org.apache.rocketmq.common.topic.TopicValidator; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.InvokeCallback; +@@ -144,7 +145,7 @@ public class BrokerOuterAPI { + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final RemotingClient remotingClient; + private final TopAddressing topAddressing = new DefaultTopAddressing(MixAll.getWSAddr()); +- private final BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, ++ private final ExecutorService brokerOuterExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true)); + private final ClientMetadata clientMetadata; + private final RpcClient rpcClient; +@@ -1092,7 +1093,7 @@ public class BrokerOuterAPI { + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + +- public BrokerFixedThreadPoolExecutor getBrokerOuterExecutor() { ++ public ExecutorService getBrokerOuterExecutor() { + return brokerOuterExecutor; + } + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +index 297b14207..0c2e6507b 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ScheduledThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; +@@ -91,7 +90,7 @@ public class ScheduleMessageService extends ConfigManager { + public ScheduleMessageService(final BrokerController brokerController) { + this.brokerController = brokerController; + this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver(); +- scheduledPersistService = new ScheduledThreadPoolExecutor(1, ++ scheduledPersistService = ThreadUtils.newScheduledThreadPool(1, + new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig())); + } + +@@ -134,9 +133,9 @@ public class ScheduleMessageService extends ConfigManager { + public void start() { + if (started.compareAndSet(false, true)) { + this.load(); +- this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); ++ this.deliverExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); + if (this.enableAsyncDeliver) { +- this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_")); ++ this.handleExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_")); + } + for (Map.Entry entry : this.delayLevelTable.entrySet()) { + Integer level = entry.getKey(); +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java +index b35564725..11bde5f5f 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java +@@ -23,7 +23,6 @@ import java.util.Objects; + import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; +-import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.locks.Lock; +@@ -36,6 +35,7 @@ import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.message.MessageQueue; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.exception.RemotingException; +@@ -66,7 +66,7 @@ public class TopicRouteInfoManager { + } + + public void start() { +- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread")); ++ this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread")); + + this.scheduledExecutorService.scheduleAtFixedRate(() -> { + try { +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +index 771d84300..982355d78 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker.transaction; + import io.netty.channel.Channel; + import java.util.concurrent.ArrayBlockingQueue; + import java.util.concurrent.ExecutorService; +-import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.broker.BrokerController; +@@ -27,6 +26,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.message.MessageConst; + import org.apache.rocketmq.common.message.MessageExt; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader; +@@ -97,7 +97,7 @@ public abstract class AbstractTransactionalMessageCheckListener { + + public synchronized void initExecutorService() { + if (executorService == null) { +- executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), ++ executorService = ThreadUtils.newThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), + new ThreadFactoryImpl("Transaction-msg-check-thread", brokerController.getBrokerIdentity()), new CallerRunsPolicy()); + } + } +diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +index 75ad961ce..6035a20ac 100644 +--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java ++++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +@@ -23,9 +23,9 @@ import java.util.concurrent.BlockingQueue; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.TimeUnit; + +-import org.apache.rocketmq.broker.latency.FutureTaskExt; + import org.apache.rocketmq.common.BrokerConfig; + import org.apache.rocketmq.common.UtilAll; ++import org.apache.rocketmq.common.future.FutureTaskExt; + import org.apache.rocketmq.remoting.netty.NettyClientConfig; + import org.apache.rocketmq.remoting.netty.NettyServerConfig; + import org.apache.rocketmq.remoting.netty.RequestTask; +diff --git a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java +index 5d0f7f9d7..31b547cf1 100644 +--- a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java ++++ b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java +@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.latency; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.TimeUnit; ++import org.apache.rocketmq.common.future.FutureTaskExt; + import org.apache.rocketmq.remoting.netty.RequestTask; + import org.junit.Test; + +diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +index a720a5be3..6f19a9815 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java ++++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +@@ -23,7 +23,6 @@ import java.util.List; + import java.util.Map; + import java.util.concurrent.ArrayBlockingQueue; + import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ScheduledThreadPoolExecutor; + import java.util.concurrent.Semaphore; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; +@@ -33,6 +32,7 @@ import com.google.common.collect.Maps; + + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.rocksdb.ColumnFamilyDescriptor; +@@ -82,8 +82,8 @@ public abstract class AbstractRocksDBStorage { + private volatile boolean closed; + + private final Semaphore reloadPermit = new Semaphore(1); +- private final ScheduledExecutorService reloadScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("RocksDBStorageReloadService_")); +- private final ThreadPoolExecutor manualCompactionThread = new ThreadPoolExecutor( ++ private final ScheduledExecutorService reloadScheduler = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("RocksDBStorageReloadService_")); ++ private final ThreadPoolExecutor manualCompactionThread = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor( + 1, 1, 1000 * 60, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(1), + new ThreadFactoryImpl("RocksDBManualCompactionService_"), +diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java +index 411da9221..7b68873a9 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java ++++ b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java +@@ -29,7 +29,8 @@ public class FutureTaskExtThreadPoolExecutor extends ThreadPoolExecutor { + + public FutureTaskExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, +- BlockingQueue workQueue, ThreadFactory threadFactory, ++ BlockingQueue workQueue, ++ ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } +diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java +index 49d97a5d7..1bfabbffe 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java ++++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java +@@ -22,12 +22,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.util.Collections; + import java.util.List; + import java.util.concurrent.CopyOnWriteArrayList; +-import java.util.concurrent.Executors; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.UtilAll; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + +@@ -36,7 +36,7 @@ public class ThreadPoolMonitor { + private static Logger waterMarkLogger = LoggerFactory.getLogger(ThreadPoolMonitor.class); + + private static final List MONITOR_EXECUTOR = new CopyOnWriteArrayList<>(); +- private static final ScheduledExecutorService MONITOR_SCHEDULED = Executors.newSingleThreadScheduledExecutor( ++ private static final ScheduledExecutorService MONITOR_SCHEDULED = ThreadUtils.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build() + ); + +@@ -81,7 +81,7 @@ public class ThreadPoolMonitor { + String name, + int queueCapacity, + List threadPoolStatusMonitors) { +- ThreadPoolExecutor executor = new FutureTaskExtThreadPoolExecutor( ++ ThreadPoolExecutor executor = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTime, +diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java +index 4b366d4e3..1644c6360 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java ++++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java +@@ -20,38 +20,94 @@ package org.apache.rocketmq.common.utils; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; ++import java.util.concurrent.LinkedBlockingQueue; ++import java.util.concurrent.RejectedExecutionHandler; + import java.util.concurrent.ScheduledExecutorService; ++import java.util.concurrent.ScheduledThreadPoolExecutor; + import java.util.concurrent.ThreadFactory; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.thread.FutureTaskExtThreadPoolExecutor; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + + public final class ThreadUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME); + +- public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, +- TimeUnit unit, BlockingQueue workQueue, String processName, boolean isDaemon) { +- return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon)); ++ public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) { ++ return ThreadUtils.newSingleThreadExecutor(newThreadFactory(processName, isDaemon)); + } + +- public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) { +- return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon)); ++ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { ++ return ThreadUtils.newThreadPoolExecutor(1, threadFactory); ++ } ++ ++ public static ExecutorService newThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { ++ return ThreadUtils.newThreadPoolExecutor(corePoolSize, corePoolSize, ++ 0L, TimeUnit.MILLISECONDS, ++ new LinkedBlockingQueue<>(), ++ threadFactory); ++ } ++ ++ public static ExecutorService newThreadPoolExecutor(int corePoolSize, ++ int maximumPoolSize, ++ long keepAliveTime, ++ TimeUnit unit, BlockingQueue workQueue, ++ String processName, ++ boolean isDaemon) { ++ return ThreadUtils.newThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon)); ++ } ++ ++ public static ExecutorService newThreadPoolExecutor(final int corePoolSize, ++ final int maximumPoolSize, ++ final long keepAliveTime, ++ final TimeUnit unit, ++ final BlockingQueue workQueue, ++ final ThreadFactory threadFactory) { ++ return ThreadUtils.newThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy()); ++ } ++ ++ public static ExecutorService newThreadPoolExecutor(int corePoolSize, ++ int maximumPoolSize, ++ long keepAliveTime, ++ TimeUnit unit, ++ BlockingQueue workQueue, ++ ThreadFactory threadFactory, ++ RejectedExecutionHandler handler) { ++ return new FutureTaskExtThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) { +- return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon)); ++ return ThreadUtils.newScheduledThreadPool(1, processName, isDaemon); ++ } ++ ++ public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { ++ return ThreadUtils.newScheduledThreadPool(1, threadFactory); ++ } ++ ++ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { ++ return ThreadUtils.newScheduledThreadPool(corePoolSize, Executors.defaultThreadFactory()); + } + +- public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName, ++ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, String processName, + boolean isDaemon) { +- return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon)); ++ return ThreadUtils.newScheduledThreadPool(corePoolSize, newThreadFactory(processName, isDaemon)); ++ } ++ ++ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { ++ return ThreadUtils.newScheduledThreadPool(corePoolSize, threadFactory, new ThreadPoolExecutor.AbortPolicy()); ++ } ++ ++ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ++ ThreadFactory threadFactory, ++ RejectedExecutionHandler handler) { ++ return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler); + } + + public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) { +- return newGenericThreadFactory("Remoting-" + processName, isDaemon); ++ return newGenericThreadFactory("ThreadUtils-" + processName, isDaemon); + } + + public static ThreadFactory newGenericThreadFactory(String processName) { +diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java +index c6446f058..5b712bc30 100644 +--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java ++++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java +@@ -47,14 +47,12 @@ import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ScheduledThreadPoolExecutor; +-import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + + public class BrokerContainer implements IBrokerContainer { + private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + +- private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, ++ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, + new BasicThreadFactory.Builder() + .namingPattern("BrokerContainerScheduledThread") + .daemon(true) +@@ -143,7 +141,7 @@ public class BrokerContainer implements IBrokerContainer { + this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.containerClientHouseKeepingService); + this.fastRemotingServer = this.remotingServer.newRemotingServer(this.nettyServerConfig.getListenPort() - 2); + +- this.brokerContainerExecutor = new ThreadPoolExecutor( ++ this.brokerContainerExecutor = ThreadUtils.newThreadPoolExecutor( + 1, + 1, + 1000 * 60, +diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java +index 7c91e70da..3e6b0eba5 100644 +--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java ++++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java +@@ -25,8 +25,6 @@ import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.LinkedBlockingQueue; +-import java.util.concurrent.RunnableFuture; +-import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + + import org.apache.commons.lang3.StringUtils; +@@ -34,8 +32,8 @@ import org.apache.rocketmq.common.ControllerConfig; + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; +-import org.apache.rocketmq.common.future.FutureTaskExt; + ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy; + import org.apache.rocketmq.controller.impl.DLedgerController; + import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager; +@@ -93,18 +91,14 @@ public class ControllerManager { + + public boolean initialize() { + this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity()); +- this.controllerRequestExecutor = new ThreadPoolExecutor( ++ this.controllerRequestExecutor = ThreadUtils.newThreadPoolExecutor( + this.controllerConfig.getControllerThreadPoolNums(), + this.controllerConfig.getControllerThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.controllerRequestThreadPoolQueue, +- new ThreadFactoryImpl("ControllerRequestExecutorThread_")) { +- @Override +- protected RunnableFuture newTaskFor(final Runnable runnable, final T value) { +- return new FutureTaskExt(runnable, value); +- } +- }; ++ new ThreadFactoryImpl("ControllerRequestExecutorThread_")); ++ + this.notifyService.initialize(); + if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) { + throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty"); +diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java +index fa91f288e..33e4406e4 100644 +--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java ++++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java +@@ -32,7 +32,6 @@ import java.util.Map; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ExecutorService; +-import java.util.concurrent.Executors; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ScheduledFuture; +@@ -44,6 +43,7 @@ import org.apache.rocketmq.common.ControllerConfig; + import org.apache.rocketmq.common.ServiceThread; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.controller.Controller; + import org.apache.rocketmq.controller.elect.ElectPolicy; + import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy; +@@ -66,11 +66,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; + import org.apache.rocketmq.remoting.protocol.ResponseCode; + import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; + import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader; +-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; +-import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; ++import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; + import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; + import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; ++import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; +@@ -136,7 +136,7 @@ public class DLedgerController implements Controller { + this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener); + this.dLedgerServer.registerStateMachine(this.statemachine); + this.dLedgerServer.getDLedgerLeaderElector().addRoleChangeHandler(this.roleHandler); +- this.scanInactiveMasterService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_")); ++ this.scanInactiveMasterService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_")); + this.brokerLifecycleListeners = new ArrayList<>(); + } + +@@ -513,7 +513,7 @@ public class DLedgerController implements Controller { + class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler { + + private final String selfId; +- private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_")); ++ private final ExecutorService executorService = ThreadUtils.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_")); + private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER; + + public RoleChangeHandler(final String selfId) { +diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java +index 2fbddb9cd..6ebb2c994 100644 +--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java ++++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java +@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.ControllerConfig; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.controller.BrokerHeartbeatManager; + import org.apache.rocketmq.controller.helper.BrokerLifecycleListener; + import org.apache.rocketmq.logging.org.slf4j.Logger; +@@ -66,7 +67,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager { + + @Override + public void initialize() { +- this.scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_")); ++ this.scheduledService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_")); + this.executor = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_")); + } + +diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java +index 15c65ebec..be327cffa 100644 +--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java ++++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java +@@ -20,10 +20,7 @@ import java.util.Collections; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.LinkedBlockingQueue; +-import java.util.concurrent.RunnableFuture; + import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ScheduledThreadPoolExecutor; +-import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.concurrent.BasicThreadFactory; + import org.apache.rocketmq.common.ThreadFactoryImpl; +@@ -31,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.future.FutureTaskExt; + import org.apache.rocketmq.common.namesrv.NamesrvConfig; + import org.apache.rocketmq.common.utils.NetworkUtil; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager; +@@ -62,10 +60,10 @@ public class NamesrvController { + private final NettyServerConfig nettyServerConfig; + private final NettyClientConfig nettyClientConfig; + +- private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, ++ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, + new BasicThreadFactory.Builder().namingPattern("NSScheduledThread").daemon(true).build()); + +- private final ScheduledExecutorService scanExecutorService = new ScheduledThreadPoolExecutor(1, ++ private final ScheduledExecutorService scanExecutorService = ThreadUtils.newScheduledThreadPool(1, + new BasicThreadFactory.Builder().namingPattern("NSScanScheduledThread").daemon(true).build()); + + private final KVConfigManager kvConfigManager; +@@ -138,20 +136,10 @@ public class NamesrvController { + + private void initiateThreadExecutors() { + this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity()); +- this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) { +- @Override +- protected RunnableFuture newTaskFor(final Runnable runnable, final T value) { +- return new FutureTaskExt<>(runnable, value); +- } +- }; ++ this.defaultExecutor = ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")); + + this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity()); +- this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) { +- @Override +- protected RunnableFuture newTaskFor(final Runnable runnable, final T value) { +- return new FutureTaskExt<>(runnable, value); +- } +- }; ++ this.clientRequestExecutor = ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")); + } + + private void initiateSslContext() { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java +index 14330dd8d..a18cf7600 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java +@@ -21,13 +21,13 @@ import java.util.Set; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; +-import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + import org.apache.rocketmq.common.ThreadFactoryImpl; +-import org.apache.rocketmq.proxy.common.ProxyContext; + import org.apache.rocketmq.common.utils.StartAndShutdown; ++import org.apache.rocketmq.common.utils.ThreadUtils; ++import org.apache.rocketmq.proxy.common.ProxyContext; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; + import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; +@@ -43,7 +43,7 @@ public class GrpcChannelManager implements StartAndShutdown { + protected final AtomicLong nonceIdGenerator = new AtomicLong(0); + protected final ConcurrentMap resultNonceFutureMap = new ConcurrentHashMap<>(); + +- protected final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( ++ protected final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor( + new ThreadFactoryImpl("GrpcChannelManager_") + ); + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +index bcc9edd09..fe07090d5 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +@@ -22,7 +22,6 @@ import io.netty.channel.Channel; + import java.util.List; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.CompletableFuture; +-import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; +@@ -33,6 +32,7 @@ import org.apache.rocketmq.common.future.FutureTaskExt; + import org.apache.rocketmq.common.thread.ThreadPoolMonitor; + import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor; + import org.apache.rocketmq.common.utils.StartAndShutdown; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.proxy.config.ConfigurationManager; +@@ -178,7 +178,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu + new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInDefaultQueue()) + ); + +- this.timerExecutor = Executors.newSingleThreadScheduledExecutor( ++ this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build() + ); + this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS); +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java +index d2ddfc352..9786cec55 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java +@@ -16,7 +16,6 @@ + */ + package org.apache.rocketmq.proxy.service; + +-import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.broker.client.ClientChannelInfo; +@@ -27,23 +26,24 @@ import org.apache.rocketmq.broker.client.ProducerChangeListener; + import org.apache.rocketmq.broker.client.ProducerGroupEvent; + import org.apache.rocketmq.broker.client.ProducerManager; + import org.apache.rocketmq.client.common.NameserverAccessConfig; ++import org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor; ++import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; + import org.apache.rocketmq.proxy.common.ProxyContext; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; + import org.apache.rocketmq.proxy.service.admin.AdminService; + import org.apache.rocketmq.proxy.service.admin.DefaultAdminService; + import org.apache.rocketmq.proxy.service.client.ClusterConsumerManager; ++import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor; + import org.apache.rocketmq.proxy.service.message.ClusterMessageService; + import org.apache.rocketmq.proxy.service.message.MessageService; + import org.apache.rocketmq.proxy.service.metadata.ClusterMetadataService; + import org.apache.rocketmq.proxy.service.metadata.MetadataService; +-import org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor; +-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; +-import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor; + import org.apache.rocketmq.proxy.service.relay.ClusterProxyRelayService; + import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; + import org.apache.rocketmq.proxy.service.route.ClusterTopicRouteService; +@@ -73,7 +73,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + NameserverAccessConfig nameserverAccessConfig = new NameserverAccessConfig(proxyConfig.getNamesrvAddr(), + proxyConfig.getNamesrvDomain(), proxyConfig.getNamesrvDomainSubgroup()); +- this.scheduledExecutorService = Executors.newScheduledThreadPool(3); ++ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(3); + + this.messagingClientAPIFactory = new MQClientAPIFactory( + nameserverAccessConfig, +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java +index 4d1ca7b66..59cd92685 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java +@@ -16,7 +16,6 @@ + */ + package org.apache.rocketmq.proxy.service; + +-import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.broker.BrokerController; +@@ -28,6 +27,7 @@ import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; + import org.apache.rocketmq.common.utils.StartAndShutdown; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; + import org.apache.rocketmq.proxy.service.admin.AdminService; +@@ -58,7 +58,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser + private final MQClientAPIFactory mqClientAPIFactory; + private final ChannelManager channelManager; + +- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( ++ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor( + new ThreadFactoryImpl("LocalServiceManagerScheduledThread")); + + public LocalServiceManager(BrokerController brokerController, RPCHook rpcHook) { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +index 69f44344a..207603fe8 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +@@ -24,7 +24,6 @@ import java.util.Set; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; +-import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; +@@ -42,20 +41,21 @@ import org.apache.rocketmq.common.thread.ThreadPoolMonitor; + import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; + import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; + import org.apache.rocketmq.common.utils.StartAndShutdown; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +-import org.apache.rocketmq.proxy.common.RenewEvent; + import org.apache.rocketmq.proxy.common.MessageReceiptHandle; + import org.apache.rocketmq.proxy.common.ProxyContext; + import org.apache.rocketmq.proxy.common.ProxyException; + import org.apache.rocketmq.proxy.common.ProxyExceptionCode; + import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; ++import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey; ++import org.apache.rocketmq.proxy.common.RenewEvent; + import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; + import org.apache.rocketmq.proxy.common.channel.ChannelHelper; + import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; +-import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey; + import org.apache.rocketmq.proxy.service.metadata.MetadataService; + import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; + import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +@@ -68,7 +68,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + protected final StateEventListener eventListener; + protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); + protected final ScheduledExecutorService scheduledExecutorService = +- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); ++ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); + protected final ThreadPoolExecutor renewalWorkerService; + + public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener eventListener) { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +index caf62a1e0..ccf094c03 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +@@ -19,25 +19,24 @@ package org.apache.rocketmq.proxy.service.route; + import com.github.benmanes.caffeine.cache.CacheLoader; + import com.github.benmanes.caffeine.cache.Caffeine; + import com.github.benmanes.caffeine.cache.LoadingCache; ++import com.google.common.base.Optional; + import java.time.Duration; + import java.util.List; +-import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; +- +-import com.google.common.base.Optional; + import org.apache.rocketmq.client.ClientConfig; + import org.apache.rocketmq.client.exception.MQClientException; ++import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; + import org.apache.rocketmq.client.latency.MQFaultStrategy; + import org.apache.rocketmq.client.latency.Resolver; + import org.apache.rocketmq.client.latency.ServiceDetector; +-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.common.thread.ThreadPoolMonitor; + import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.proxy.common.Address; +@@ -63,7 +62,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { + public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + +- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( ++ this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor( + new ThreadFactoryImpl("TopicRouteService_") + ); + this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +index 8491f4354..64621dd6c 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +@@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; +-import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.atomic.AtomicReference; +@@ -71,6 +70,7 @@ import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.ChannelEventListener; +@@ -142,7 +142,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti + + this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactoryImpl("NettyClientPublicExecutor_")); + +- this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, ++ this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("NettyClientScan_thread_")); + + if (eventLoopGroup != null) { +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +index e626260c9..aa0d46542 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +@@ -61,6 +61,7 @@ import org.apache.rocketmq.common.constant.HAProxyConstants; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.utils.BinaryUtil; + import org.apache.rocketmq.common.utils.NetworkUtil; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.ChannelEventListener; +@@ -83,7 +84,6 @@ import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ScheduledThreadPoolExecutor; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + +@@ -171,7 +171,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + } + + private ScheduledExecutorService buildScheduleExecutor() { +- return new ScheduledThreadPoolExecutor(1, ++ return ThreadUtils.newScheduledThreadPool(1, + new ThreadFactoryImpl("NettyServerScheduler_", true), + new ThreadPoolExecutor.DiscardOldestPolicy()); + } +diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +index f2a54ddf6..02ea47f13 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +@@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.ExecutorService; +-import java.util.concurrent.Executors; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ThreadPoolExecutor; +@@ -83,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; + import org.apache.rocketmq.common.utils.CleanupPolicyUtils; + import org.apache.rocketmq.common.utils.QueueTypeUtils; + import org.apache.rocketmq.common.utils.ServiceProvider; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; +@@ -205,7 +205,7 @@ public class DefaultMessageStore implements MessageStore { + private ConcurrentMap topicConfigTable; + + private final ScheduledExecutorService scheduledCleanQueueExecutorService = +- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread")); ++ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread")); + + public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, + final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap topicConfigTable) throws IOException { +@@ -253,7 +253,7 @@ public class DefaultMessageStore implements MessageStore { + this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog()); + + this.scheduledExecutorService = +- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity())); ++ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity())); + + this.dispatcherList = new LinkedList<>(); + this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); +@@ -2915,7 +2915,7 @@ public class DefaultMessageStore implements MessageStore { + private final ExecutorService batchDispatchRequestExecutor; + + public MainBatchDispatchRequestService() { +- batchDispatchRequestExecutor = new ThreadPoolExecutor( ++ batchDispatchRequestExecutor = ThreadUtils.newThreadPoolExecutor( + DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(), + DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(), + 1000 * 60, +diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +index d5393fdca..f20bc3e28 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java ++++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +@@ -17,10 +17,26 @@ + + package org.apache.rocketmq.store.ha.autoswitch; + +- ++import java.io.IOException; ++import java.nio.channels.SocketChannel; ++import java.util.ArrayList; ++import java.util.HashSet; ++import java.util.Iterator; ++import java.util.List; ++import java.util.Map; ++import java.util.Objects; ++import java.util.Set; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.locks.Lock; ++import java.util.concurrent.locks.ReadWriteLock; ++import java.util.concurrent.locks.ReentrantReadWriteLock; ++import java.util.function.Consumer; ++import java.util.stream.Collectors; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.protocol.EpochEntry; +@@ -36,30 +52,12 @@ import org.apache.rocketmq.store.ha.HAClient; + import org.apache.rocketmq.store.ha.HAConnection; + import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService; + +-import java.io.IOException; +-import java.nio.channels.SocketChannel; +-import java.util.ArrayList; +-import java.util.HashSet; +-import java.util.List; +-import java.util.Iterator; +-import java.util.Map; +-import java.util.Objects; +-import java.util.Set; +-import java.util.concurrent.ConcurrentHashMap; +-import java.util.concurrent.ExecutorService; +-import java.util.concurrent.Executors; +-import java.util.concurrent.locks.Lock; +-import java.util.concurrent.locks.ReadWriteLock; +-import java.util.concurrent.locks.ReentrantReadWriteLock; +-import java.util.function.Consumer; +-import java.util.stream.Collectors; +- + /** + * SwitchAble ha service, support switch role to master or slave. + */ + public class AutoSwitchHAService extends DefaultHAService { + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); +- private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_")); ++ private final ExecutorService executorService = ThreadUtils.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_")); + private final ConcurrentHashMap connectionCaughtUpTimeTable = new ConcurrentHashMap<>(); + private final List>> syncStateSetChangedListeners = new ArrayList<>(); + private final Set syncStateSet = new HashSet<>(); +diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java +index b37c90726..639084fa2 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java +@@ -16,17 +16,25 @@ + */ + package org.apache.rocketmq.store.kv; + +-import java.util.Random; ++import java.io.File; ++import java.io.IOException; ++import java.nio.file.Files; ++import java.nio.file.Paths; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.Objects; + import java.util.Optional; ++import java.util.Random; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.ScheduledExecutorService; ++import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.TopicConfig; + import org.apache.rocketmq.common.attribute.CleanupPolicy; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.utils.CleanupPolicyUtils; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.store.DefaultMessageStore; +@@ -35,15 +43,6 @@ import org.apache.rocketmq.store.GetMessageResult; + import org.apache.rocketmq.store.SelectMappedBufferResult; + import org.apache.rocketmq.store.config.MessageStoreConfig; + +-import java.io.File; +-import java.io.IOException; +-import java.nio.file.Files; +-import java.nio.file.Paths; +-import java.util.concurrent.ConcurrentHashMap; +-import java.util.concurrent.Executors; +-import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.TimeUnit; +- + public class CompactionStore { + + public static final String COMPACTION_DIR = "compaction"; +@@ -76,7 +75,7 @@ public class CompactionStore { + this.positionMgr = new CompactionPositionMgr(compactionPath); + this.compactionThreadNum = Math.min(Runtime.getRuntime().availableProcessors(), Math.max(1, config.getCompactionThreadNum())); + +- this.compactionSchedule = Executors.newScheduledThreadPool(this.compactionThreadNum, ++ this.compactionSchedule = ThreadUtils.newScheduledThreadPool(this.compactionThreadNum, + new ThreadFactoryImpl("compactionSchedule_")); + this.offsetMapSize = config.getMaxOffsetMapSize() / compactionThreadNum; + +diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +index 8d38503b3..d03d15d65 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.FutureTask; + import java.util.concurrent.LinkedBlockingQueue; +-import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.TopicConfig; +@@ -34,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; + import org.apache.rocketmq.common.message.MessageExt; + import org.apache.rocketmq.common.topic.TopicValidator; + import org.apache.rocketmq.common.utils.QueueTypeUtils; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.store.CommitLog; +@@ -175,7 +175,7 @@ public class ConsumeQueueStore { + } + + private ExecutorService buildExecutorService(BlockingQueue blockingQueue, String threadNamePrefix) { +- return new ThreadPoolExecutor( ++ return ThreadUtils.newThreadPoolExecutor( + this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(), + this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(), + 1000 * 60, +diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +index 2dd3fc5b5..489d7b4fb 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java ++++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +@@ -17,7 +17,6 @@ + package org.apache.rocketmq.store.stats; + + import java.util.HashMap; +-import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import org.apache.commons.lang3.tuple.Pair; + import org.apache.rocketmq.common.BrokerConfig; +@@ -32,13 +31,14 @@ import org.apache.rocketmq.common.statistics.StatisticsItemScheduledPrinter; + import org.apache.rocketmq.common.statistics.StatisticsItemStateGetter; + import org.apache.rocketmq.common.statistics.StatisticsKindMeta; + import org.apache.rocketmq.common.statistics.StatisticsManager; ++import org.apache.rocketmq.common.stats.MomentStatsItemSet; + import org.apache.rocketmq.common.stats.Stats; ++import org.apache.rocketmq.common.stats.StatsItem; ++import org.apache.rocketmq.common.stats.StatsItemSet; + import org.apache.rocketmq.common.topic.TopicValidator; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +-import org.apache.rocketmq.common.stats.MomentStatsItemSet; +-import org.apache.rocketmq.common.stats.StatsItem; +-import org.apache.rocketmq.common.stats.StatsItemSet; + + public class BrokerStatsManager { + +@@ -281,11 +281,11 @@ public class BrokerStatsManager { + + private void initScheduleService() { + this.scheduledExecutorService = +- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig)); ++ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig)); + this.commercialExecutor = +- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig)); ++ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig)); + this.accountExecutor = +- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig)); ++ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig)); + } + + public MomentStatsItemSet getMomentStatsItemSetFallSize() { +diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +index 181f7087a..0d50de65a 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +@@ -35,7 +35,6 @@ import java.util.Set; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.ConcurrentSkipListSet; + import java.util.concurrent.CountDownLatch; +-import java.util.concurrent.Executors; + import java.util.concurrent.LinkedBlockingDeque; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.TimeUnit; +@@ -54,6 +53,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; + import org.apache.rocketmq.common.message.MessageExt; + import org.apache.rocketmq.common.message.MessageExtBrokerInner; + import org.apache.rocketmq.common.topic.TopicValidator; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.store.ConsumeQueue; +@@ -174,11 +174,11 @@ public class TimerMessageStore { + this.lastBrokerRole = storeConfig.getBrokerRole(); + + if (messageStore instanceof DefaultMessageStore) { +- scheduler = Executors.newSingleThreadScheduledExecutor( ++ scheduler = ThreadUtils.newSingleThreadScheduledExecutor( + new ThreadFactoryImpl("TimerScheduledThread", + ((DefaultMessageStore) messageStore).getBrokerIdentity())); + } else { +- scheduler = Executors.newSingleThreadScheduledExecutor( ++ scheduler = ThreadUtils.newSingleThreadScheduledExecutor( + new ThreadFactoryImpl("TimerScheduledThread")); + } + +diff --git a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java +index f3d105bc6..080b7e385 100644 +--- a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java ++++ b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java +@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.atomic.AtomicLong; +- + import javax.annotation.Generated; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +index 6dd0e8846..65d586f43 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +@@ -20,10 +20,10 @@ import java.util.concurrent.BlockingQueue; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ScheduledThreadPoolExecutor; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.ThreadFactoryImpl; ++import org.apache.rocketmq.common.utils.ThreadUtils; + + public class TieredStoreExecutor { + +@@ -43,20 +43,20 @@ public class TieredStoreExecutor { + public static ExecutorService compactIndexFileExecutor; + + public static void init() { +- commonScheduledExecutor = new ScheduledThreadPoolExecutor( ++ commonScheduledExecutor = ThreadUtils.newScheduledThreadPool( + Math.max(4, Runtime.getRuntime().availableProcessors()), + new ThreadFactoryImpl("TieredCommonExecutor_")); + +- commitExecutor = new ScheduledThreadPoolExecutor( ++ commitExecutor = ThreadUtils.newScheduledThreadPool( + Math.max(16, Runtime.getRuntime().availableProcessors() * 4), + new ThreadFactoryImpl("TieredCommitExecutor_")); + +- cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor( ++ cleanExpiredFileExecutor = ThreadUtils.newScheduledThreadPool( + Math.max(4, Runtime.getRuntime().availableProcessors()), + new ThreadFactoryImpl("TieredCleanFileExecutor_")); + + dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); +- dispatchExecutor = new ThreadPoolExecutor( ++ dispatchExecutor = ThreadUtils.newThreadPoolExecutor( + Math.max(2, Runtime.getRuntime().availableProcessors()), + Math.max(16, Runtime.getRuntime().availableProcessors() * 4), + 1000 * 60, +@@ -66,7 +66,7 @@ public class TieredStoreExecutor { + new ThreadPoolExecutor.DiscardOldestPolicy()); + + fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); +- fetchDataExecutor = new ThreadPoolExecutor( ++ fetchDataExecutor = ThreadUtils.newThreadPoolExecutor( + Math.max(16, Runtime.getRuntime().availableProcessors() * 4), + Math.max(64, Runtime.getRuntime().availableProcessors() * 8), + 1000 * 60, +@@ -75,7 +75,7 @@ public class TieredStoreExecutor { + new ThreadFactoryImpl("TieredFetchExecutor_")); + + compactIndexFileThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); +- compactIndexFileExecutor = new ThreadPoolExecutor( ++ compactIndexFileExecutor = ThreadUtils.newThreadPoolExecutor( + 1, + 1, + 1000 * 60, +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +index fa3596d51..1ebff6d8a 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +@@ -66,6 +66,7 @@ import org.apache.rocketmq.common.namesrv.NamesrvUtil; + import org.apache.rocketmq.common.topic.TopicValidator; + import org.apache.rocketmq.common.utils.NetworkUtil; + import org.apache.rocketmq.common.BoundaryType; ++import org.apache.rocketmq.common.utils.ThreadUtils; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.RPCHook; +@@ -193,7 +194,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { + + int threadPoolCoreSize = Integer.parseInt(System.getProperty("rocketmq.admin.threadpool.coresize", "20")); + +- this.threadPoolExecutor = new ThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_")); ++ this.threadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_")); + + break; + case RUNNING: +-- +2.32.0.windows.2 + + +From dad6b4dadfec7a58e78a6715ec16c2eb6b17ff27 Mon Sep 17 00:00:00 2001 +From: Ziyi Tan +Date: Mon, 11 Sep 2023 14:34:10 +0800 +Subject: [PATCH 2/6] [ISSUE #7334] `registerIncrementBrokerData` for single + topic update (#7335) + +Signed-off-by: Ziy1-Tan +--- + .../broker/topic/TopicConfigManager.java | 30 +++++++++++++++---- + 1 file changed, 25 insertions(+), 5 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +index 4e3c1736c..754605438 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +@@ -290,7 +290,11 @@ public class TopicConfigManager extends ConfigManager { + } + + if (createNew) { +- this.brokerController.registerBrokerAll(false, true, true); ++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { ++ this.brokerController.registerSingleTopicAll(topicConfig); ++ } else { ++ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); ++ } + } + + return topicConfig; +@@ -394,7 +398,11 @@ public class TopicConfigManager extends ConfigManager { + } + + if (createNew) { +- this.brokerController.registerBrokerAll(false, true, true); ++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { ++ this.brokerController.registerSingleTopicAll(topicConfig); ++ } else { ++ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); ++ } + } + + return topicConfig; +@@ -435,7 +443,11 @@ public class TopicConfigManager extends ConfigManager { + } + + if (createNew) { +- this.brokerController.registerBrokerAll(false, true, true); ++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { ++ this.brokerController.registerSingleTopicAll(topicConfig); ++ } else { ++ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); ++ } + } + + return topicConfig; +@@ -461,7 +473,11 @@ public class TopicConfigManager extends ConfigManager { + dataVersion.nextVersion(stateMachineVersion); + + this.persist(); +- this.brokerController.registerBrokerAll(false, true, true); ++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { ++ this.brokerController.registerSingleTopicAll(topicConfig); ++ } else { ++ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); ++ } + } + } + +@@ -484,7 +500,11 @@ public class TopicConfigManager extends ConfigManager { + dataVersion.nextVersion(stateMachineVersion); + + this.persist(); +- this.brokerController.registerBrokerAll(false, true, true); ++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { ++ this.brokerController.registerSingleTopicAll(topicConfig); ++ } else { ++ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); ++ } + } + } + +-- +2.32.0.windows.2 + + +From 0dbd0772b99f618f757d42cd64542b83e2100e4f Mon Sep 17 00:00:00 2001 +From: Ziyi Tan +Date: Mon, 11 Sep 2023 15:48:07 +0800 +Subject: [PATCH 3/6] [ISSUE #7326] Split the request to register to the + nameserver (#7325) + +Signed-off-by: Ziy1-Tan +--- + .../rocketmq/broker/BrokerController.java | 41 +++++++++++-------- + .../broker/topic/TopicConfigManager.java | 21 ++++++++++ + .../apache/rocketmq/common/BrokerConfig.java | 24 +++++++++++ + .../test/route/CreateAndUpdateTopicIT.java | 31 ++++++++++++++ + 4 files changed, 99 insertions(+), 18 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +index 275b64b1a..9e49f636d 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +@@ -1765,29 +1765,34 @@ public class BrokerController { + } + + public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { ++ ConcurrentMap topicConfigMap = this.getTopicConfigManager().getTopicConfigTable(); ++ ConcurrentHashMap topicConfigTable = new ConcurrentHashMap<>(); + +- TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper(); +- +- topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion()); +- topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable()); +- +- topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map( +- entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())) +- ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); +- +- if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) +- || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { +- ConcurrentHashMap topicConfigTable = new ConcurrentHashMap<>(); +- for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { +- TopicConfig tmp = ++ for (TopicConfig topicConfig : topicConfigMap.values()) { ++ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) ++ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ++ topicConfigTable.put(topicConfig.getTopicName(), + new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), +- topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag()); +- topicConfigTable.put(topicConfig.getTopicName(), tmp); ++ topicConfig.getPerm() & getBrokerConfig().getBrokerPermission())); ++ } else { ++ topicConfigTable.put(topicConfig.getTopicName(), topicConfig); ++ } ++ ++ if (this.brokerConfig.isEnableSplitRegistration() ++ && topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) { ++ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable); ++ doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); ++ topicConfigTable.clear(); + } +- topicConfigWrapper.setTopicConfigTable(topicConfigTable); + } + +- if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), ++ Map topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream() ++ .map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))) ++ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); ++ ++ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager(). ++ buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap); ++ if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +index 754605438..8537929be 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock; + + import com.google.common.collect.ImmutableMap; + ++import com.google.common.collect.Maps; + import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.broker.BrokerController; + import org.apache.rocketmq.broker.BrokerPathConfigHelper; +@@ -47,7 +48,9 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.protocol.DataVersion; + import org.apache.rocketmq.remoting.protocol.body.KVTable; ++import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; + import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; ++import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo; + + import static com.google.common.base.Preconditions.checkNotNull; + +@@ -609,6 +612,24 @@ public class TopicConfigManager extends ConfigManager { + return topicConfigSerializeWrapper; + } + ++ public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(final ConcurrentMap topicConfigTable) { ++ return buildSerializeWrapper(topicConfigTable, Maps.newHashMap()); ++ } ++ ++ public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper( ++ final ConcurrentMap topicConfigTable, ++ final Map topicQueueMappingInfoMap ++ ) { ++ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper(); ++ topicConfigWrapper.setTopicConfigTable(topicConfigTable); ++ topicConfigWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap); ++ topicConfigWrapper.setDataVersion(this.getDataVersion()); ++ if (this.brokerController.getBrokerConfig().isEnableSplitRegistration()) { ++ this.getDataVersion().nextVersion(); ++ } ++ return topicConfigWrapper; ++ } ++ + @Override + public String encode() { + return encode(false); +diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +index 45d26b29c..0d248c4e1 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java ++++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +@@ -396,6 +396,14 @@ public class BrokerConfig extends BrokerIdentity { + + private boolean enableMixedMessageType = false; + ++ /** ++ * This flag and deleteTopicWithBrokerRegistration flag in the NameServer cannot be set to true at the same time, ++ * otherwise there will be a loss of routing ++ */ ++ private boolean enableSplitRegistration = false; ++ ++ private int splitRegistrationSize = 800; ++ + public long getMaxPopPollingSize() { + return maxPopPollingSize; + } +@@ -1731,4 +1739,20 @@ public class BrokerConfig extends BrokerIdentity { + public void setEnableMixedMessageType(boolean enableMixedMessageType) { + this.enableMixedMessageType = enableMixedMessageType; + } ++ ++ public boolean isEnableSplitRegistration() { ++ return enableSplitRegistration; ++ } ++ ++ public void setEnableSplitRegistration(boolean enableSplitRegistration) { ++ this.enableSplitRegistration = enableSplitRegistration; ++ } ++ ++ public int getSplitRegistrationSize() { ++ return splitRegistrationSize; ++ } ++ ++ public void setSplitRegistrationSize(int splitRegistrationSize) { ++ this.splitRegistrationSize = splitRegistrationSize; ++ } + } +diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java +index 7e3c7b871..2370e68c0 100644 +--- a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java ++++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java +@@ -17,6 +17,7 @@ + + package org.apache.rocketmq.test.route; + ++import org.apache.rocketmq.common.TopicConfig; + import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; + import org.apache.rocketmq.test.base.BaseConf; + import org.apache.rocketmq.test.util.MQAdminTestUtils; +@@ -111,4 +112,34 @@ public class CreateAndUpdateTopicIT extends BaseConf { + brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false); + namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false); + } ++ ++ @Test ++ public void testCreateOrUpdateTopic_EnableSplitRegistration() { ++ brokerController1.getBrokerConfig().setEnableSplitRegistration(true); ++ brokerController2.getBrokerConfig().setEnableSplitRegistration(true); ++ brokerController3.getBrokerConfig().setEnableSplitRegistration(true); ++ ++ String testTopic = "test-topic-"; ++ ++ for (int i = 0; i < 1000; i++) { ++ TopicConfig topicConfig = new TopicConfig(testTopic + i, 8, 8); ++ brokerController1.getTopicConfigManager().updateTopicConfig(topicConfig); ++ brokerController2.getTopicConfigManager().updateTopicConfig(topicConfig); ++ brokerController3.getTopicConfigManager().updateTopicConfig(topicConfig); ++ } ++ ++ brokerController1.registerBrokerAll(false, true, true); ++ brokerController2.registerBrokerAll(false, true, true); ++ brokerController3.registerBrokerAll(false, true, true); ++ ++ for (int i = 0; i < 1000; i++) { ++ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic + i); ++ assertThat(route.getBrokerDatas()).hasSize(3); ++ assertThat(route.getQueueDatas()).hasSize(3); ++ } ++ ++ brokerController1.getBrokerConfig().setEnableSplitRegistration(false); ++ brokerController2.getBrokerConfig().setEnableSplitRegistration(false); ++ brokerController3.getBrokerConfig().setEnableSplitRegistration(false); ++ } + } +-- +2.32.0.windows.2 + + +From a9e353285cea762b0c5eab567bdfa8e5c8c2d279 Mon Sep 17 00:00:00 2001 +From: rongtong +Date: Mon, 11 Sep 2023 15:55:18 +0800 +Subject: [PATCH 4/6] Add the configuration of topicQueueLock number to better + support different scenarios (#7317) + +--- + .../main/java/org/apache/rocketmq/store/CommitLog.java | 2 +- + .../java/org/apache/rocketmq/store/TopicQueueLock.java | 8 ++++++++ + .../rocketmq/store/config/MessageStoreConfig.java | 10 ++++++++++ + 3 files changed, 19 insertions(+), 1 deletion(-) + +diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +index e6ee3bacc..456bf2b86 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ++++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +@@ -122,7 +122,7 @@ public class CommitLog implements Swappable { + + this.flushDiskWatcher = new FlushDiskWatcher(); + +- this.topicQueueLock = new TopicQueueLock(); ++ this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum()); + + this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); + } +diff --git a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java +index a78eeed23..5a131b5c3 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java ++++ b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java +@@ -34,6 +34,14 @@ public class TopicQueueLock { + } + } + ++ public TopicQueueLock(int size) { ++ this.size = size; ++ this.lockList = new ArrayList<>(size); ++ for (int i = 0; i < this.size; i++) { ++ this.lockList.add(new ReentrantLock()); ++ } ++ } ++ + public void lock(String topicQueueKey) { + Lock lock = this.lockList.get((topicQueueKey.hashCode() & 0x7fffffff) % this.size); + lock.lock(); +diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +index efb728ac0..9fa448043 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java ++++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +@@ -401,6 +401,8 @@ public class MessageStoreConfig { + private long memTableFlushInterval = 60 * 60 * 1000L; + private boolean enableRocksDBLog = false; + ++ private int topicQueueLockNum = 32; ++ + public boolean isDebugLockEnable() { + return debugLockEnable; + } +@@ -1751,4 +1753,12 @@ public class MessageStoreConfig { + public void setEnableRocksDBLog(boolean enableRocksDBLog) { + this.enableRocksDBLog = enableRocksDBLog; + } ++ ++ public int getTopicQueueLockNum() { ++ return topicQueueLockNum; ++ } ++ ++ public void setTopicQueueLockNum(int topicQueueLockNum) { ++ this.topicQueueLockNum = topicQueueLockNum; ++ } + } +-- +2.32.0.windows.2 + + +From 57f04c95d3a2ba6b91583058a6e4eda209f72d6e Mon Sep 17 00:00:00 2001 +From: guyinyou <36399867+guyinyou@users.noreply.github.com> +Date: Mon, 11 Sep 2023 18:23:25 +0800 +Subject: [PATCH 5/6] [ISSUE #7343] Rollback modifications to registerProcessor + +Co-authored-by: guyinyou +--- + .../java/org/apache/rocketmq/broker/BrokerController.java | 4 ++-- + 1 file changed, 2 insertions(+), 2 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +index 9e49f636d..13a3feb4e 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +@@ -827,6 +827,8 @@ public class BrokerController { + + initializeResources(); + ++ registerProcessor(); ++ + initializeScheduledTasks(); + + initialTransaction(); +@@ -1687,8 +1689,6 @@ public class BrokerController { + } + } + }, 10, 5, TimeUnit.SECONDS); +- +- registerProcessor(); + } + + protected void scheduleSendHeartbeat() { +-- +2.32.0.windows.2 + + +From dad6ad09d13dadc36b6342671c77f619bbb8c522 Mon Sep 17 00:00:00 2001 +From: Ao Qiao +Date: Tue, 12 Sep 2023 08:28:45 +0800 +Subject: [PATCH 6/6] [ISSUE #7340] Abstract Duplicate code into a method in + `TopicConfigManager` (#7341) + +--- + .../broker/topic/TopicConfigManager.java | 44 ++++++------------- + 1 file changed, 14 insertions(+), 30 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +index 8537929be..511d29e12 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +@@ -293,11 +293,7 @@ public class TopicConfigManager extends ConfigManager { + } + + if (createNew) { +- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { +- this.brokerController.registerSingleTopicAll(topicConfig); +- } else { +- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); +- } ++ registerBrokerData(topicConfig); + } + + return topicConfig; +@@ -337,11 +333,7 @@ public class TopicConfigManager extends ConfigManager { + log.error("createTopicIfAbsent ", e); + } + if (createNew && register) { +- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { +- this.brokerController.registerSingleTopicAll(topicConfig); +- } else { +- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); +- } ++ registerBrokerData(topicConfig); + } + return getTopicConfig(topicConfig.getTopicName()); + } +@@ -401,11 +393,7 @@ public class TopicConfigManager extends ConfigManager { + } + + if (createNew) { +- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { +- this.brokerController.registerSingleTopicAll(topicConfig); +- } else { +- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); +- } ++ registerBrokerData(topicConfig); + } + + return topicConfig; +@@ -446,11 +434,7 @@ public class TopicConfigManager extends ConfigManager { + } + + if (createNew) { +- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { +- this.brokerController.registerSingleTopicAll(topicConfig); +- } else { +- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); +- } ++ registerBrokerData(topicConfig); + } + + return topicConfig; +@@ -476,11 +460,7 @@ public class TopicConfigManager extends ConfigManager { + dataVersion.nextVersion(stateMachineVersion); + + this.persist(); +- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { +- this.brokerController.registerSingleTopicAll(topicConfig); +- } else { +- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); +- } ++ registerBrokerData(topicConfig); + } + } + +@@ -503,11 +483,7 @@ public class TopicConfigManager extends ConfigManager { + dataVersion.nextVersion(stateMachineVersion); + + this.persist(); +- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { +- this.brokerController.registerSingleTopicAll(topicConfig); +- } else { +- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); +- } ++ registerBrokerData(topicConfig); + } + } + +@@ -699,6 +675,14 @@ public class TopicConfigManager extends ConfigManager { + } + } + ++ private void registerBrokerData(TopicConfig topicConfig) { ++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { ++ this.brokerController.registerSingleTopicAll(topicConfig); ++ } else { ++ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); ++ } ++ } ++ + public boolean containsTopic(String topic) { + return topicConfigTable.containsKey(topic); + } +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index f4c9fc69762e0ead83d5a429e66348387308dc58..2570f84f38f7b034c8ff7c8a4aa61fbc495e6574 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.3 -Release: 17 +Release: 18 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -26,6 +26,7 @@ Patch0013: patch013-backport-enhance-admin-output.patch Patch0014: patch014-backport-Queue-Selection-Strategy-Optimization.patch Patch0015: patch015-backport-fix-some-bugs.patch Patch0016: patch016-backport-Optimize-fault-tolerant-mechanism.patch +Patch0017: patch017-backport-Convergent-thread-pool-creation.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -60,6 +61,9 @@ exit 0 %changelog +* Mon Nov 20 2023 ShiZhili - 5.1.3-18 +- backport-Convergent-thread-pool-creation + * Mon Oct 30 2023 ShiZhili - 5.1.3-17 - backport Optimize fault tolerant-mechanism