diff --git a/0013-AlterIsr.patch b/0013-AlterIsr.patch
new file mode 100644
index 0000000000000000000000000000000000000000..aef5085631d8f968999860a0a4888f78f4a96d29
--- /dev/null
+++ b/0013-AlterIsr.patch
@@ -0,0 +1,223 @@
+diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
+index a58f4238ff..88b337311d 100755
+--- a/core/src/main/scala/kafka/cluster/Partition.scala
++++ b/core/src/main/scala/kafka/cluster/Partition.scala
+@@ -581,9 +581,6 @@ class Partition(val topicPartition: TopicPartition,
+       leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
+       zkVersion = partitionState.zkVersion
+ 
+-      // Clear any pending AlterIsr requests and check replica state
+-      alterIsrManager.clearPending(topicPartition)
+-
+       // In the case of successive leader elections in a short time period, a follower may have
+       // entries in its log from a later epoch than any entry in the new leader's log. In order
+       // to ensure that these followers can truncate to the right offset, we must cache the new
+@@ -661,9 +658,6 @@ class Partition(val topicPartition: TopicPartition,
+       leaderEpochStartOffsetOpt = None
+       zkVersion = partitionState.zkVersion
+ 
+-      // Since we might have been a leader previously, still clear any pending AlterIsr requests
+-      alterIsrManager.clearPending(topicPartition)
+-
+       if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
+         false
+       } else {
+@@ -1373,13 +1367,15 @@ class Partition(val topicPartition: TopicPartition,
+     isrState = proposedIsrState
+ 
+     if (!alterIsrManager.submit(alterIsrItem)) {
+-      // If the ISR manager did not accept our update, we need to revert back to previous state
++      // If the ISR manager did not accept our update, we need to revert the proposed state.
++      // This can happen if the ISR state was updated by the controller (via LeaderAndIsr in ZK-mode or
++      // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request still in-flight.
+       isrState = oldState
+       isrChangeListener.markFailed()
+-      throw new IllegalStateException(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
++      warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
++    } else {
++      debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
+     }
+-
+-    debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
+   }
+ 
+   /**
+diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
+index 9ad734f708..1059a3df3e 100644
+--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
++++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
+@@ -49,8 +49,6 @@ trait AlterIsrManager {
+   def shutdown(): Unit = {}
+ 
+   def submit(alterIsrItem: AlterIsrItem): Boolean
+-
+-  def clearPending(topicPartition: TopicPartition): Unit
+ }
+ 
+ case class AlterIsrItem(topicPartition: TopicPartition,
+@@ -134,9 +132,6 @@ class DefaultAlterIsrManager(
+     enqueued
+   }
+ 
+-  override def clearPending(topicPartition: TopicPartition): Unit = {
+-    unsentIsrUpdates.remove(topicPartition)
+-  }
+ 
+   private[server] def maybePropagateIsrChanges(): Unit = {
+     // Send all pending items if there is not already a request in-flight.
+diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala b/core/src/main/scala/kafka/server/ZkIsrManager.scala
+index 2d88aac6b4..8dffcdf307 100644
+--- a/core/src/main/scala/kafka/server/ZkIsrManager.scala
++++ b/core/src/main/scala/kafka/server/ZkIsrManager.scala
+@@ -55,12 +55,6 @@ class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) ex
+       period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
+   }
+ 
+-  override def clearPending(topicPartition: TopicPartition): Unit = {
+-    // Since we always immediately process ZK updates and never actually enqueue anything, there is nothing to
+-    // clear here so this is a no-op. Even if there are changes that have not been propagated, the write to ZK
+-    // has already happened, so we may as well send the notification to the controller.
+-  }
+-
+   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
+     debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with version " +
+       s"${alterIsrItem.leaderAndIsr.zkVersion} for partition ${alterIsrItem.topicPartition}")
+diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+index 5eedb63ae5..4dbd735753 100644
+--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
++++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+@@ -18,10 +18,10 @@
+ package kafka.controller
+ 
+ import java.util.Properties
+-import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
+-
++import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit}
+ import com.yammer.metrics.core.Timer
+ import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_2_7_IV0, LeaderAndIsr}
++import kafka.controller.KafkaController.AlterIsrCallback
+ import kafka.metrics.KafkaYammerMetrics
+ import kafka.server.{KafkaConfig, KafkaServer}
+ import kafka.utils.{LogCaptureAppender, TestUtils}
+@@ -849,6 +849,67 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
+     latch.await()
+   }
+ 
++  @Test
++  def testAlterIsrErrors(): Unit = {
++    servers = makeServers(1)
++    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
++    val tp = new TopicPartition("t", 0)
++    val assignment = Map(tp.partition -> Seq(controllerId))
++    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
++    val controller = getController().kafkaController
++    var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1,
++      Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
++    var capturedError = future.get(5, TimeUnit.SECONDS)
++    assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
++
++    future = captureAlterIsrError(99, controller.brokerEpoch,
++      Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
++    capturedError = future.get(5, TimeUnit.SECONDS)
++    assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
++
++    val unknownTopicPartition = new TopicPartition("unknown", 99)
++    future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
++      Map(unknownTopicPartition -> LeaderAndIsr(controllerId, List(controllerId))), unknownTopicPartition)
++    capturedError = future.get(5, TimeUnit.SECONDS)
++    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError)
++
++    future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
++      Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId), 99)), tp)
++    capturedError = future.get(5, TimeUnit.SECONDS)
++    assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError)
++  }
++
++  def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = {
++    val future = new CompletableFuture[Errors]()
++    val controller = getController().kafkaController
++    val callback: AlterIsrCallback = {
++      case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
++        future.completeExceptionally(new AssertionError(s"Should have seen top-level error"))
++      case Right(error: Errors) =>
++        future.complete(error)
++    }
++    controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
++    future
++  }
++
++  def captureAlterIsrPartitionError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], tp: TopicPartition): CompletableFuture[Errors] = {
++    val future = new CompletableFuture[Errors]()
++    val controller = getController().kafkaController
++    val callback: AlterIsrCallback = {
++      case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
++        partitionResults.get(tp) match {
++          case Some(Left(error: Errors)) => future.complete(error)
++          case Some(Right(_: LeaderAndIsr)) => future.completeExceptionally(new AssertionError(s"Should have seen an error for $tp in result"))
++          case None => future.completeExceptionally(new AssertionError(s"Should have seen $tp in result"))
++        }
++      case Right(_: Errors) =>
++        future.completeExceptionally(new AssertionError(s"Should not seen top-level error"))
++    }
++    controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
++    future
++  }
++
++
+   @Test
+   def testTopicIdsAreAdded(): Unit = {
+     servers = makeServers(1)
+diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+index 1074fd3157..1c8c81471f 100644
+--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
++++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+@@ -70,8 +70,10 @@ class AlterIsrManagerTest {
+   @Test
+   def testOverwriteWithinBatch(): Unit = {
+     val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]()
++    val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]()
++
+     EasyMock.expect(brokerToController.start())
+-    EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once()
++    EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2)
+     EasyMock.replay(brokerToController)
+ 
+     val scheduler = new MockScheduler(time)
+@@ -81,11 +83,21 @@ class AlterIsrManagerTest {
+     // Only send one ISR update for a given topic+partition
+     assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)))
+     assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0)))
++
++    // Simulate response
++    val alterIsrResp = partitionResponse(tp0, Errors.NONE)
++    val resp = new ClientResponse(null, null, "", 0L, 0L,
++      false, null, null, alterIsrResp)
++    callbackCapture.getValue.onComplete(resp)
++
++    // Now we can submit this partition again
++    assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1), 10), _ => {}, 0)))
+     EasyMock.verify(brokerToController)
+ 
++    // Make sure we sent the right request ISR={1}
+     val request = capture.getValue.build()
+     assertEquals(request.data().topics().size(), 1)
+-    assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 3)
++    assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 1)
+   }
+ 
+   @Test
+diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+index 43df2b97f4..8e52007bc7 100755
+--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
++++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+@@ -1106,10 +1106,6 @@ object TestUtils extends Logging {
+       }
+     }
+ 
+-    override def clearPending(topicPartition: TopicPartition): Unit = {
+-      inFlight.set(false);
+-    }
+-
+     def completeIsrUpdate(newZkVersion: Int): Unit = {
+       if (inFlight.compareAndSet(true, false)) {
+         val item = isrUpdates.head
diff --git a/kafka.spec b/kafka.spec
index 6573124807cda2ec7d02d488181181f42f8cd726..af977fa01505a48e0908bc3328baa9b078a63f3e 100644
--- a/kafka.spec
+++ b/kafka.spec
@@ -4,7 +4,7 @@
 
 Name:           kafka
 Version:        2.8.2
-Release:        12
+Release:        13
 Summary:        A Distributed Streaming Platform.
 
 License:        Apache-2.0
@@ -24,6 +24,7 @@ Patch8:         0009-format-RocksDBConfigSetter.patch
 Patch9:         0010-not-update-connection.patch
 Patch10:        0011-ConfigEntry.patch
 Patch11:        0012-incorrectly-LeaderElectionCommand.patch
+Patch12:        0013-AlterIsr.patch
 
 BuildRequires:  systemd java-1.8.0-openjdk-devel
 Provides:       kafka = %{version}
@@ -75,6 +76,8 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
 rm -rf %{buildroot}
 
 %changelog
+* Fri Dec 08 2023 sundapeng - 2.8.2-13
+- AlterIsr and LeaderAndIsr race condition
 * Fri Dec 08 2023 sundapeng - 2.8.2-12
 - Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
 * Fri Dec 08 2023 sundapeng - 2.8.2-11