From 847a97c5f530abde5a63137a16f674f198846de0 Mon Sep 17 00:00:00 2001 From: sundapeng Date: Thu, 7 Dec 2023 11:31:19 +0000 Subject: [PATCH] Can not receive any messages after switch to standby cluster --- 0015-fix-no-messages.patch | 149 +++++++++++++++++++++++++++++++++++++ pulsar.spec | 6 +- 2 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 0015-fix-no-messages.patch diff --git a/0015-fix-no-messages.patch b/0015-fix-no-messages.patch new file mode 100644 index 0000000..e4c0aad --- /dev/null +++ b/0015-fix-no-messages.patch @@ -0,0 +1,149 @@ +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +index 1e1245ed36..cf1603788f 100644 +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +@@ -192,10 +192,14 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P + sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap()); + } else { + // Subscription doesn't exist. 
We need to force the creation of the subscription in this cluster, because +- log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription", ++ log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subscription", + topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos); +- topic.createSubscription(update.getSubscriptionName(), +- InitialPosition.Latest, true /* replicateSubscriptionState */, null); ++ topic.createSubscription(update.getSubscriptionName(), InitialPosition.Earliest, ++ true /* replicateSubscriptionState */, Collections.emptyMap()) ++ .thenAccept(subscriptionCreated -> { ++ subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos), ++ AckType.Cumulative, Collections.emptyMap()); ++ }); + } + } + +diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +index 046adaa5ec..fb5bae08f6 100644 +--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java ++++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +@@ -25,9 +25,11 @@ import static org.testng.Assert.assertNotNull; + import static org.testng.Assert.assertNull; + import static org.testng.Assert.assertTrue; + import com.google.common.collect.Sets; ++import static org.testng.Assert.fail; + import java.lang.reflect.Method; + import java.nio.charset.StandardCharsets; + import java.util.ArrayList; ++import java.util.Collections; + import java.util.HashSet; + import java.util.LinkedHashSet; + import java.util.Map; +@@ -41,6 +43,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; + import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController; + import org.apache.pulsar.client.api.Consumer; + import 
org.apache.pulsar.client.api.Message; ++import org.apache.pulsar.client.api.MessageId; + import org.apache.pulsar.client.api.MessageRoutingMode; + import org.apache.pulsar.client.api.Producer; + import org.apache.pulsar.client.api.PulsarClient; +@@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClientException; + import org.apache.pulsar.client.api.Schema; + import org.apache.pulsar.common.policies.data.PartitionedTopicStats; + import org.apache.pulsar.common.policies.data.TopicStats; ++import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; + import org.awaitility.Awaitility; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; +@@ -154,6 +158,94 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { + "messages don't match."); + } + ++ @Test ++ public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception { ++ final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_"); ++ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); ++ final String subscriptionName = "s1"; ++ final boolean isReplicatedSubscription = true; ++ final int messagesCount = 20; ++ final LinkedHashSet sentMessages = new LinkedHashSet<>(); ++ final Set receivedMessages = Collections.synchronizedSet(new LinkedHashSet<>()); ++ admin1.namespaces().createNamespace(namespace); ++ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); ++ admin1.topics().createNonPartitionedTopic(topicName); ++ admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest, isReplicatedSubscription); ++ final PersistentTopic topic1 = ++ (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); ++ ++ // Send messages ++ // Wait for the topic created on the cluster2. ++ // Wait for the snapshot created. 
++ final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); ++ Producer producer1 = client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create(); ++ Consumer consumer1 = client1.newConsumer(Schema.STRING).topic(topicName) ++ .subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe(); ++ for (int i = 0; i < messagesCount / 2; i++) { ++ String msg = i + ""; ++ producer1.send(msg); ++ sentMessages.add(msg); ++ } ++ Awaitility.await().untilAsserted(() -> { ++ ConcurrentOpenHashMap replicators = topic1.getReplicators(); ++ assertTrue(replicators != null && replicators.size() == 1, "Replicator should started"); ++ assertTrue(replicators.values().iterator().next().isConnected(), "Replicator should be connected"); ++ assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(), ++ "One snapshot should be finished"); ++ }); ++ final PersistentTopic topic2 = ++ (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); ++ Awaitility.await().untilAsserted(() -> { ++ assertTrue(topic2.getReplicatedSubscriptionController().isPresent(), ++ "Replicated subscription controller should created"); ++ }); ++ for (int i = messagesCount / 2; i < messagesCount; i++) { ++ String msg = i + ""; ++ producer1.send(msg); ++ sentMessages.add(msg); ++ } ++ ++ // Consume half messages and wait the subscription created on the cluster2. ++ for (int i = 0; i < messagesCount / 2; i++){ ++ Message message = consumer1.receive(2, TimeUnit.SECONDS); ++ if (message == null) { ++ fail("Should not receive null."); ++ } ++ receivedMessages.add(message.getValue()); ++ consumer1.acknowledge(message); ++ } ++ Awaitility.await().untilAsserted(() -> { ++ assertNotNull(topic2.getSubscriptions().get(subscriptionName), "Subscription should created"); ++ }); ++ ++ // Switch client to cluster2. 
++ // Since the cluster1 was not crash, all messages will be replicated to the cluster2. ++ consumer1.close(); ++ final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).build(); ++ final Consumer consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName) ++ .subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe(); ++ ++ // Verify all messages will be consumed. ++ Awaitility.await().untilAsserted(() -> { ++ while (true) { ++ Message message = consumer2.receive(2, TimeUnit.SECONDS); ++ if (message != null) { ++ receivedMessages.add(message.getValue().toString()); ++ consumer2.acknowledge(message); ++ } else { ++ break; ++ } ++ } ++ assertEquals(receivedMessages.size(), sentMessages.size()); ++ }); ++ ++ consumer2.close(); ++ producer1.close(); ++ client1.close(); ++ client2.close(); ++ } ++ ++ + /** + * If there's no traffic, the snapshot creation should stop and then resume when traffic comes back + */ diff --git a/pulsar.spec b/pulsar.spec index cfbcb46..c2bfccc 100644 --- a/pulsar.spec +++ b/pulsar.spec @@ -1,6 +1,6 @@ %define debug_package %{nil} %define pulsar_ver 2.10.4 -%define pkg_ver 14 +%define pkg_ver 15 %define _prefix /opt/pulsar Summary: Cloud-Native, Distributed Messaging and Streaming Name: pulsar @@ -24,6 +24,7 @@ Patch0011: 0011-CVE-2023-25194.patch Patch0012: 0012-CVE-2023-2976.patch Patch0013: 0013-fix-deadlock.patch Patch0014: 0014-CVE-2023-32732.patch +Patch0015: 0015-fix-no-messages.patch BuildRoot: /root/rpmbuild/BUILDROOT/ BuildRequires: java-1.8.0-openjdk-devel,maven,systemd Requires: java-1.8.0-openjdk,systemd @@ -50,6 +51,7 @@ Pulsar is a distributed pub-sub messaging platform with a very flexible messagin %patch0012 -p1 %patch0013 -p1 %patch0014 -p1 +%patch0015 -p1 %build mvn clean install -Pcore-modules,-main -DskipTests @@ -75,6 +77,8 @@ getent passwd pulsar >/dev/null || useradd -r -g pulsar -d / -s /sbin/nologin pu exit 0 %changelog +* Thu Dec 7 2023 
Dapeng Sun - 2.10.4-15 +- fix: cannot receive any messages after switching to standby cluster * Thu Dec 7 2023 Dapeng Sun - 2.10.4-14 - resolve cve-2023-32732 * Thu Dec 7 2023 Dapeng Sun - 2.10.4-13 -- Gitee