diff --git a/0019-clean-inactive-bundle.patch b/0019-clean-inactive-bundle.patch new file mode 100644 index 0000000000000000000000000000000000000000..e18d319e219d0446dc0590b8a64441983d89c034 --- /dev/null +++ b/0019-clean-inactive-bundle.patch @@ -0,0 +1,37 @@ +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +index d81f6949f4..8bf8a73ff1 100644 +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +@@ -523,6 +523,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { + // load management decisions may be made. + private void updateBundleData() { + final Map bundleData = loadData.getBundleData(); ++ final Set activeBundles = new HashSet<>(); + // Iterate over the broker data. + for (Map.Entry brokerEntry : loadData.getBrokerData().entrySet()) { + final String broker = brokerEntry.getKey(); +@@ -534,6 +535,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { + for (Map.Entry entry : statsMap.entrySet()) { + final String bundle = entry.getKey(); + final NamespaceBundleStats stats = entry.getValue(); ++ activeBundles.add(bundle); + if (bundleData.containsKey(bundle)) { + // If we recognize the bundle, add these stats as a new sample. + bundleData.get(bundle).update(stats); +@@ -545,6 +547,16 @@ public class ModularLoadManagerImpl implements ModularLoadManager { + bundleData.put(bundle, currentBundleData); + } + } ++ //Remove not active bundle from loadData ++ for (String bundle : bundleData.keySet()) { ++ if (!activeBundles.contains(bundle)){ ++ bundleData.remove(bundle); ++ if (pulsar.getLeaderElectionService().isLeader()){ ++ deleteBundleDataFromMetadataStore(bundle); ++ } ++ } ++ } ++ + + // Remove all loaded bundles from the preallocated maps. + final Map preallocatedBundleData = brokerData.getPreallocatedBundleData(); diff --git a/pulsar.spec b/pulsar.spec index e1f93aeec402768df2c6a5e05b857a504a90d7e3..dd2e0c29feb6fac9f4de52b6ae9954d548d5d565 100644 --- a/pulsar.spec +++ b/pulsar.spec @@ -1,6 +1,6 @@ %define debug_package %{nil} %define pulsar_ver 2.10.4 -%define pkg_ver 18 +%define pkg_ver 19 %define _prefix /opt/pulsar Summary: Cloud-Native, Distributed Messaging and Streaming Name: pulsar @@ -28,6 +28,7 @@ Patch0015: 0015-fix-no-messages.patch Patch0016: 0016-handle-exception.patch Patch0017: 0017-return-earliest-position.patch Patch0018: 0018-return-when-AbstractDispatcherSingleActiveConsumer-closed.patch +Patch0019: 0019-clean-inactive-bundle.patch BuildRoot: /root/rpmbuild/BUILDROOT/ BuildRequires: java-1.8.0-openjdk-devel,maven,systemd Requires: java-1.8.0-openjdk,systemd @@ -58,6 +59,7 @@ Pulsar is a distributed pub-sub messaging platform with a very flexible messagin %patch0016 -p1 %patch0017 -p1 %patch0018 -p1 +%patch0019 -p1 %build mvn clean install -Pcore-modules,-main -DskipTests @@ -83,6 +85,8 @@ getent passwd pulsar >/dev/null || useradd -r -g pulsar -d / -s /sbin/nologin pu exit 0 %changelog +* Fri Dec 8 2023 Dapeng Sun - 2.10.4-19 +- clean inactive bundle from bundleData in loadData and bundlesCache * Fri Dec 8 2023 Dapeng Sun - 2.10.4-18 - Return if AbstractDispatcherSingleActiveConsumer closed * Fri Dec 8 2023 Dapeng Sun - 2.10.4-17