diff --git a/0017-return-earliest-position.patch b/0017-return-earliest-position.patch new file mode 100644 index 0000000000000000000000000000000000000000..7a3e8ad8a51b8ec927c28c2f9f30ee7cf952570d --- /dev/null +++ b/0017-return-earliest-position.patch @@ -0,0 +1,132 @@ +diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +index 46ca0f1400..0653e40cb3 100644 +--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java ++++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +@@ -532,6 +532,23 @@ public interface ManagedCursor { + void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + FindEntryCallback callback, Object ctx); + ++ /** ++ * Find the newest entry that matches the given predicate. ++ * ++ * @param constraint ++ * search only active entries or all entries ++ * @param condition ++ * predicate that reads an entry an applies a condition ++ * @param callback ++ * callback object returning the resultant position ++ * @param ctx ++ * opaque context ++ * @param isFindFromLedger ++ * find the newest entry from ledger ++ */ ++ void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, ++ FindEntryCallback callback, Object ctx, boolean isFindFromLedger); ++ + /** + * reset the cursor to specified position to enable replay of messages. + * +diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +index 011d3df77f..6045412398 100644 +--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java ++++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +@@ -1078,6 +1078,12 @@ public class ManagedCursorImpl implements ManagedCursor { + @Override + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + FindEntryCallback callback, Object ctx) { ++ asyncFindNewestMatching(constraint, condition, callback, ctx, false); ++ } ++ ++ @Override ++ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, ++ FindEntryCallback callback, Object ctx, boolean isFindFromLedger) { + OpFindNewest op; + PositionImpl startPosition = null; + long max = 0; +@@ -1099,7 +1105,11 @@ public class ManagedCursorImpl implements ManagedCursor { + Optional.empty(), ctx); + return; + } +- op = new OpFindNewest(this, startPosition, condition, max, callback, ctx); ++ if (isFindFromLedger) { ++ op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx); ++ } else { ++ op = new OpFindNewest(this, startPosition, condition, max, callback, ctx); ++ } + op.find(); + } + +diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +index 18fe4dba31..675f28e2d2 100644 +--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java ++++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +@@ -241,6 +241,11 @@ public class ManagedCursorContainerTest { + AsyncCallbacks.FindEntryCallback callback, Object ctx) { + } + ++ @Override ++ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, ++ AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) { ++ } ++ + @Override + public void asyncResetCursor(final Position position, boolean forceReset, + AsyncCallbacks.ResetCursorCallback callback) { +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +index 825bc546f4..838771e6d3 100644 +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +@@ -71,7 +71,7 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback + entry.release(); + } + return false; +- }, this, callback); ++ }, this, callback, true); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", topicName, +diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +index e70d0dc2b5..da78a0411d 100644 +--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java ++++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +@@ -243,6 +243,40 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { + factory.shutdown(); + } + ++ @Test ++ void testPersistentMessageFinderWhenLastMessageDelete() throws Exception { ++ final String ledgerAndCursorName = "testPersistentMessageFinderWhenLastMessageDelete"; ++ ++ ManagedLedgerConfig config = new ManagedLedgerConfig(); ++ config.setRetentionSizeInMB(10); ++ config.setMaxEntriesPerLedger(10); ++ config.setRetentionTime(1, TimeUnit.HOURS); ++ ManagedLedger ledger = factory.open(ledgerAndCursorName, config); ++ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); ++ ++ ledger.addEntry(createMessageWrittenToLedger("msg1")); ++ ledger.addEntry(createMessageWrittenToLedger("msg2")); ++ ledger.addEntry(createMessageWrittenToLedger("msg3")); ++ Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message")); ++ ++ long endTimestamp = System.currentTimeMillis() + 1000; ++ ++ Result result = new Result(); ++ // delete last position message ++ cursor.delete(lastPosition); ++ CompletableFuture future = findMessage(result, cursor, endTimestamp); ++ future.get(); ++ assertNull(result.exception); ++ assertNotEquals(result.position, null); ++ assertEquals(result.position, lastPosition); ++ ++ result.reset(); ++ cursor.close(); ++ ledger.close(); ++ factory.shutdown(); ++ } ++ ++ + @Test + void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception { + diff --git a/pulsar.spec b/pulsar.spec index a3d3e4d2969785e50178dfe48c1235cdc58fa614..d8ae4cd95b84974babcf2c09bfde697af864f144 100644 --- a/pulsar.spec +++ b/pulsar.spec @@ -1,6 +1,6 @@ %define debug_package %{nil} %define pulsar_ver 2.10.4 -%define pkg_ver 16 +%define pkg_ver 17 %define _prefix /opt/pulsar Summary: Cloud-Native, Distributed Messaging and Streaming Name: pulsar @@ -26,6 +26,7 @@ Patch0013: 0013-fix-deadlock.patch Patch0014: 0014-CVE-2023-32732.patch Patch0015: 0015-fix-no-messages.patch Patch0016: 0016-handle-exception.patch +Patch0017: 0017-return-earliest-position.patch BuildRoot: /root/rpmbuild/BUILDROOT/ BuildRequires: java-1.8.0-openjdk-devel,maven,systemd Requires: java-1.8.0-openjdk,systemd @@ -54,6 +55,7 @@ Pulsar is a distributed pub-sub messaging platform with a very flexible messagin %patch0014 -p1 %patch0015 -p1 %patch0016 -p1 +%patch0017 -p1 %build mvn clean install -Pcore-modules,-main -DskipTests @@ -79,6 +81,8 @@ getent passwd pulsar >/dev/null || useradd -r -g pulsar -d / -s /sbin/nologin pu exit 0 %changelog +* Fri Dec 8 2023 Dapeng Sun - 2.10.4-17 +- Fix return the earliest position when query position by timestamp. * Fri Dec 8 2023 Dapeng Sun - 2.10.4-16 - Only handle exception when there has * Thu Dec 7 2023 Dapeng Sun - 2.10.4-15