From c4a48bbf185a01f377b69e5f22d7d7b44c35cc38 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Wed, 24 Dec 2025 09:30:42 +0800 Subject: [PATCH 01/27] [fix][broker] Fix markDeletedPosition race condition in maybeUpdateCursorBeforeTrimmingConsumedLedger --- .../mledger/impl/ManagedLedgerImpl.java | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4b278cf6664d4..acf5fbb563f3c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2711,28 +2711,37 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) { public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { for (ManagedCursor cursor : cursors) { + // Snapshot cursor.getMarkDeletedPosition() into a local variable to avoid race condition. + Position markDeletedPosition = PositionFactory.create(cursor.getMarkDeletedPosition()); Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null - ? cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition(); - LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); + ? cursor.getPersistentMarkDeletedPosition() : markDeletedPosition; + LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId())) .map(Map.Entry::getValue).orElse(null); - if (currPointedLedger != null) { + if (curPointedLedger != null) { if (nextPointedLedger != null) { if (lastAckedPosition.getEntryId() != -1 - && lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) { + && lastAckedPosition.getEntryId() + 1 >= curPointedLedger.getEntries()) { lastAckedPosition = PositionFactory.create(nextPointedLedger.getLedgerId(), -1); } } else { log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor); } } else { + // TODO curPointedLedger==null, should we move cursor mark deleted position to nextPointedLedger:-1? + // Sample case: Opening an empty ledger with ledgers:(ledgerId:-1) will cause curPointedLedger==null, + // then recovery read will create a new ledger: (ledgerId+1:-1). + // If old markDeletePosition==(ledgerId:-1), I think we should move it to (ledgerId+1:-1), + // or it will casue cursor position and ledger inconsistency. + // See PR https://github.com/apache/pulsar/pull/25087 log.warn("Cursor: {} does not exist in the managed-ledger.", cursor); } - if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) { + int compareResult = lastAckedPosition.compareTo(markDeletedPosition); + if (compareResult > 0) { Position finalPosition = lastAckedPosition; - log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); + log.info("Mark delete cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(), new MarkDeleteCallback() { @Override @@ -2743,10 +2752,14 @@ public void markDeleteComplete(Object ctx) { @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.", - cursor, cursor.getMarkDeletedPosition(), finalPosition, exception); + log.warn("Failed to mark delete cursor: {} from {} to {}. ", cursor, + cursor.getMarkDeletedPosition(), finalPosition, exception); } }, null); + } else if (compareResult < 0) { + // Should not happen + log.warn("Trying to mark delete cursor to an already mark-deleted position. Current mark-delete:" + + " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition); } } } From 732264fff21359f766e9764c46f07118321fb8b0 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Wed, 24 Dec 2025 09:47:31 +0800 Subject: [PATCH 02/27] [fix][broker] Add debug log --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index acf5fbb563f3c..f01844995dda8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2756,7 +2756,10 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { cursor.getMarkDeletedPosition(), finalPosition, exception); } }, null); - } else if (compareResult < 0) { + } else if (compareResult == 0) { + log.debug("No need to reset cursor: {}, last acked position equals to current mark-delete position {}.", + cursor, markDeletedPosition); + } else { // Should not happen log.warn("Trying to mark delete cursor to an already mark-deleted position. Current mark-delete:" + " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition); From 1d34d9789a63beb165fbbc66440045650afef2a0 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Wed, 24 Dec 2025 12:50:26 +0800 Subject: [PATCH 03/27] [fix][broker] Wait updating cursor finished when opening managed ledger --- .../mledger/impl/ManagedLedgerFactoryImpl.java | 15 +++++++++------ .../mledger/impl/ManagedLedgerImpl.java | 11 ++++++++++- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index a452c6682a53b..ba17ee9a8a531 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -483,12 +483,15 @@ public void initializeComplete() { pendingInitializeLedgers.remove(name, pendingLedger); future.complete(newledger); - // May need to update the cursor position - newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); - // May need to trigger offloading - if (config.isTriggerOffloadOnTopicLoad()) { - newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); - } + // May need to update the cursor position and wait them finished + newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) -> { + // ignore ex since it is handled in maybeUpdateCursorBeforeTrimmingConsumedLedger + future.complete(newledger); + // May need to trigger offloading + if (config.isTriggerOffloadOnTopicLoad()) { + newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + } + }); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f01844995dda8..15d30e74ad7d6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2709,8 +2709,12 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) { this.waitingEntryCallBacks.add(cb); } - public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { + public CompletableFuture maybeUpdateCursorBeforeTrimmingConsumedLedger() { + List> cursorMarkDeleteFutures = new ArrayList<>(); for (ManagedCursor cursor : cursors) { + CompletableFuture future = new CompletableFuture<>(); + cursorMarkDeleteFutures.add(future); + // Snapshot cursor.getMarkDeletedPosition() into a local variable to avoid race condition. Position markDeletedPosition = PositionFactory.create(cursor.getMarkDeletedPosition()); Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null @@ -2748,23 +2752,28 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { public void markDeleteComplete(Object ctx) { log.info("Successfully persisted cursor position for cursor:{} to {}", cursor, finalPosition); + future.complete(null); } @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { log.warn("Failed to mark delete cursor: {} from {} to {}. ", cursor, cursor.getMarkDeletedPosition(), finalPosition, exception); + future.completeExceptionally(exception); } }, null); } else if (compareResult == 0) { log.debug("No need to reset cursor: {}, last acked position equals to current mark-delete position {}.", cursor, markDeletedPosition); + future.complete(null); } else { // Should not happen log.warn("Trying to mark delete cursor to an already mark-deleted position. Current mark-delete:" + " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition); + future.complete(null); } } + return FutureUtil.waitForAll(cursorMarkDeleteFutures); } private void trimConsumedLedgersInBackground() { From e3812ca37fefe6d6d4a98429ce041c40eb8257d0 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Wed, 24 Dec 2025 13:58:18 +0800 Subject: [PATCH 04/27] [fix][broker] Modify uncorrected comment --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 15d30e74ad7d6..c00c51681b34f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2734,11 +2734,10 @@ public CompletableFuture maybeUpdateCursorBeforeTrimmingConsumedLedger() { } } else { // TODO curPointedLedger==null, should we move cursor mark deleted position to nextPointedLedger:-1? - // Sample case: Opening an empty ledger with ledgers:(ledgerId:-1) will cause curPointedLedger==null, - // then recovery read will create a new ledger: (ledgerId+1:-1). - // If old markDeletePosition==(ledgerId:-1), I think we should move it to (ledgerId+1:-1), - // or it will casue cursor position and ledger inconsistency. - // See PR https://github.com/apache/pulsar/pull/25087 + // Sample case: Opening an empty ledger with ledgers:(ledgerId:-1), if cursor position is + // (ledgerId-1:entryNum-1) and old ledger is trimmed, then curPointedLedger wil be null. + // I think we should move cursor position to (ledgerId:-1), or it will cause cursor position and ledger + // inconsistency. See PR https://github.com/apache/pulsar/pull/25087 log.warn("Cursor: {} does not exist in the managed-ledger.", cursor); } From aa25bcb8b157b3ba726d267da57cb22eb0e691b1 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Wed, 24 Dec 2025 15:33:41 +0800 Subject: [PATCH 05/27] [fix][broker] Add test for maybeUpdateCursorBeforeTrimmingConsumedLedger --- .../mledger/impl/ManagedLedgerTest.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 1cdeb415b7c21..f9529fbed5be5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3784,6 +3784,51 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio }); } + @Test(timeOut = 20000) + public void testNeverThrowsMarkDeletingMarkedPositionInMaybeUpdateCursorBeforeTrimmingConsumedLedger() + throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + initManagedLedgerConfig(config); + int entryNum = 100; + config.setMaxEntriesPerLedger(entryNum); + + ManagedLedgerImpl managedLedger = + (ManagedLedgerImpl) factory.open("maybeUpdateCursorBeforeTrimmingConsumed_ledger", config); + ManagedCursor cursor = managedLedger.openCursor("c1"); + + final CountDownLatch latch = new CountDownLatch(entryNum); + List> updateCursorFutures = new ArrayList<>(entryNum); + for (int i = 0; i < entryNum; i++) { + managedLedger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + cursor.asyncMarkDelete(position, new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + }, null); + + } + }, null); + CompletableFuture future = managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); + updateCursorFutures.add(future); + } + + latch.countDown(); + assertEquals(cursor.getNumberOfEntries(), 0); + // Will not throw exception + FutureUtil.waitForAll(updateCursorFutures).get(); + } + @Test(timeOut = 20000) public void testAsyncTruncateLedgerRetention() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); From 7d03b0ade6d03c0568644dac590c77889ce78d02 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Wed, 24 Dec 2025 16:09:48 +0800 Subject: [PATCH 06/27] [fix][broker] Fix test --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 12 ++++++++---- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 8 ++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c00c51681b34f..f6bcf9ccac2ca 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2744,7 +2744,8 @@ public CompletableFuture maybeUpdateCursorBeforeTrimmingConsumedLedger() { int compareResult = lastAckedPosition.compareTo(markDeletedPosition); if (compareResult > 0) { Position finalPosition = lastAckedPosition; - log.info("Mark delete cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); + log.info("Mark deleting cursor:{} from {} to {} since ledger consumed completely.", cursor, + markDeletedPosition, lastAckedPosition); cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(), new MarkDeleteCallback() { @Override @@ -2756,7 +2757,7 @@ public void markDeleteComplete(Object ctx) { @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - log.warn("Failed to mark delete cursor: {} from {} to {}. ", cursor, + log.warn("Failed to mark delete: {} from {} to {}. ", cursor, cursor.getMarkDeletedPosition(), finalPosition, exception); future.completeExceptionally(exception); } @@ -2767,9 +2768,12 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { future.complete(null); } else { // Should not happen - log.warn("Trying to mark delete cursor to an already mark-deleted position. Current mark-delete:" + log.warn("Trying to mark delete an already mark-deleted position. Current mark-delete:" + " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition); - future.complete(null); + future.completeExceptionally(new ManagedLedgerException( + "Trying to mark delete an already mark-deleted position when update cursor. Current " + + "mark-delete: " + markDeletedPosition + " -- attempted mark delete: " + + lastAckedPosition)); } } return FutureUtil.waitForAll(cursorMarkDeleteFutures); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index f9529fbed5be5..fb3582863760c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3789,6 +3789,7 @@ public void testNeverThrowsMarkDeletingMarkedPositionInMaybeUpdateCursorBeforeTr throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); initManagedLedgerConfig(config); + config.setMaxEntriesPerLedger(1); int entryNum = 100; config.setMaxEntriesPerLedger(entryNum); @@ -3798,7 +3799,12 @@ public void testNeverThrowsMarkDeletingMarkedPositionInMaybeUpdateCursorBeforeTr final CountDownLatch latch = new CountDownLatch(entryNum); List> updateCursorFutures = new ArrayList<>(entryNum); + // Three asyncMarkDelete operations running concurrently: + // 1. ledger rollover triggered maybeUpdateCursorBeforeTrimmingConsumedLedger. + // 2. user triggered asyncMarkDelete. + // 3. user triggered maybeUpdateCursorBeforeTrimmingConsumedLedger. for (int i = 0; i < entryNum; i++) { + CountDownLatch taskFireLatch = new CountDownLatch(1); managedLedger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { @Override public void addFailed(ManagedLedgerException exception, Object ctx) { @@ -3806,6 +3812,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { + taskFireLatch.countDown(); cursor.asyncMarkDelete(position, new MarkDeleteCallback() { @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { @@ -3819,6 +3826,7 @@ public void markDeleteComplete(Object ctx) { } }, null); + taskFireLatch.wait(); CompletableFuture future = managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); updateCursorFutures.add(future); } From 93b0255ce28e4568b0be050cdd30487eace4e4fe Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Wed, 24 Dec 2025 16:58:27 +0800 Subject: [PATCH 07/27] [fix][broker] Fix test --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index fb3582863760c..b5b5d51ce4bbc 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3826,12 +3826,12 @@ public void markDeleteComplete(Object ctx) { } }, null); - taskFireLatch.wait(); + taskFireLatch.await(); CompletableFuture future = managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); updateCursorFutures.add(future); } - latch.countDown(); + latch.await(); assertEquals(cursor.getNumberOfEntries(), 0); // Will not throw exception FutureUtil.waitForAll(updateCursorFutures).get(); From 40932eb14a7a54b56e20c9ee7200246bf48054b6 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Wed, 24 Dec 2025 22:46:47 +0800 Subject: [PATCH 08/27] [fix][test] Rollback some codes for test --- .../impl/ManagedLedgerFactoryImpl.java | 23 +++++++++------- .../mledger/impl/ManagedLedgerImpl.java | 11 ++++---- .../mledger/impl/ManagedLedgerTest.java | 26 +++++++++++-------- 3 files changed, 35 insertions(+), 25 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index ba17ee9a8a531..0fe949948e45d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -483,15 +483,20 @@ public void initializeComplete() { pendingInitializeLedgers.remove(name, pendingLedger); future.complete(newledger); - // May need to update the cursor position and wait them finished - newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) -> { - // ignore ex since it is handled in maybeUpdateCursorBeforeTrimmingConsumedLedger - future.complete(newledger); - // May need to trigger offloading - if (config.isTriggerOffloadOnTopicLoad()) { - newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); - } - }); + newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); + if (config.isTriggerOffloadOnTopicLoad()) { + newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + } + +// // May need to update the cursor position and wait them finished +// newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) -> { +// // ignore ex since it is handled in maybeUpdateCursorBeforeTrimmingConsumedLedger +// future.complete(newledger); +// // May need to trigger offloading +// if (config.isTriggerOffloadOnTopicLoad()) { +// newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); +// } +// }); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f6bcf9ccac2ca..886460f33763b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2716,7 +2716,7 @@ public CompletableFuture maybeUpdateCursorBeforeTrimmingConsumedLedger() { cursorMarkDeleteFutures.add(future); // Snapshot cursor.getMarkDeletedPosition() into a local variable to avoid race condition. - Position markDeletedPosition = PositionFactory.create(cursor.getMarkDeletedPosition()); + Position markDeletedPosition = cursor.getMarkDeletedPosition(); Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null ? cursor.getPersistentMarkDeletedPosition() : markDeletedPosition; LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); @@ -2770,10 +2770,11 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { // Should not happen log.warn("Trying to mark delete an already mark-deleted position. Current mark-delete:" + " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition); - future.completeExceptionally(new ManagedLedgerException( - "Trying to mark delete an already mark-deleted position when update cursor. Current " - + "mark-delete: " + markDeletedPosition + " -- attempted mark delete: " - + lastAckedPosition)); + future.completeExceptionally(null); +// future.completeExceptionally(new ManagedLedgerException( +// "Trying to mark delete an already mark-deleted position when update cursor. Current " +// + "mark-delete: " + markDeletedPosition + " -- attempted mark delete: " +// + lastAckedPosition)); } } return FutureUtil.waitForAll(cursorMarkDeleteFutures); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index b5b5d51ce4bbc..6d56ec4bbf3ed 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -56,6 +56,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -69,6 +70,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; @@ -3784,6 +3786,7 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio }); } + // Huh, maybeUpdateCursorBeforeTrimmingConsumedLedger seems not working well. @Test(timeOut = 20000) public void testNeverThrowsMarkDeletingMarkedPositionInMaybeUpdateCursorBeforeTrimmingConsumedLedger() throws Exception { @@ -3791,20 +3794,24 @@ public void testNeverThrowsMarkDeletingMarkedPositionInMaybeUpdateCursorBeforeTr initManagedLedgerConfig(config); config.setMaxEntriesPerLedger(1); int entryNum = 100; - config.setMaxEntriesPerLedger(entryNum); - ManagedLedgerImpl managedLedger = + ManagedLedgerImpl realManagedLedger = (ManagedLedgerImpl) factory.open("maybeUpdateCursorBeforeTrimmingConsumed_ledger", config); + ManagedLedgerImpl managedLedger = spy(realManagedLedger); ManagedCursor cursor = managedLedger.openCursor("c1"); + Deque> futures = new ConcurrentLinkedDeque<>(); + doAnswer(invocation -> { + CompletableFuture result = (CompletableFuture) invocation.callRealMethod(); + futures.offer(result); + return result; + }).when(managedLedger).maybeUpdateCursorBeforeTrimmingConsumedLedger(); + final CountDownLatch latch = new CountDownLatch(entryNum); - List> updateCursorFutures = new ArrayList<>(entryNum); - // Three asyncMarkDelete operations running concurrently: + // Two asyncMarkDelete operations running concurrently: // 1. ledger rollover triggered maybeUpdateCursorBeforeTrimmingConsumedLedger. // 2. user triggered asyncMarkDelete. - // 3. user triggered maybeUpdateCursorBeforeTrimmingConsumedLedger. for (int i = 0; i < entryNum; i++) { - CountDownLatch taskFireLatch = new CountDownLatch(1); managedLedger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { @Override public void addFailed(ManagedLedgerException exception, Object ctx) { @@ -3812,7 +3819,6 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { - taskFireLatch.countDown(); cursor.asyncMarkDelete(position, new MarkDeleteCallback() { @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { @@ -3826,15 +3832,13 @@ public void markDeleteComplete(Object ctx) { } }, null); - taskFireLatch.await(); - CompletableFuture future = managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); - updateCursorFutures.add(future); } latch.await(); assertEquals(cursor.getNumberOfEntries(), 0); + // Will not throw exception - FutureUtil.waitForAll(updateCursorFutures).get(); + FutureUtil.waitForAll(futures).get(); } @Test(timeOut = 20000) From 892a372c50cb1f91fae031a64adacd9145d3fb6e Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sat, 27 Dec 2025 19:08:52 +0800 Subject: [PATCH 09/27] [fix][broker] Try to fix maybeUpdateCursorBeforeTrimmingConsumedLedger method --- .../mledger/impl/ManagedCursorImpl.java | 14 ++++++---- .../impl/ManagedLedgerFactoryImpl.java | 25 ++++++----------- .../mledger/impl/ManagedLedgerImpl.java | 28 +++++++------------ .../mledger/impl/ManagedLedgerTest.java | 4 +-- 4 files changed, 29 insertions(+), 42 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index b3cb8dc45962b..1e30b65bee453 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2255,13 +2255,15 @@ public void asyncMarkDelete(final Position position, Map propertie } Position newPosition = ackBatchPosition(position); - if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) { + Position markDeletePos = markDeletePosition; + Position lastConfirmedEntry = ledger.getLastConfirmedEntry(); + if (lastConfirmedEntry.compareTo(newPosition) < 0) { boolean shouldCursorMoveForward = false; try { - long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries(); - Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId()); + long ledgerEntries = ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries(); + Long nextValidLedger = ledger.getNextValidLedger(lastConfirmedEntry.getLedgerId()); shouldCursorMoveForward = nextValidLedger != null - && (markDeletePosition.getEntryId() + 1 >= ledgerEntries) + && (markDeletePos.getEntryId() + 1 >= ledgerEntries) && (newPosition.getLedgerId() == nextValidLedger); } catch (Exception e) { log.warn("Failed to get ledger entries while setting mark-delete-position", e); @@ -2269,11 +2271,11 @@ public void asyncMarkDelete(final Position position, Map propertie if (shouldCursorMoveForward) { log.info("[{}] move mark-delete-position from {} to {} since all the entries have been consumed", - ledger.getName(), markDeletePosition, newPosition); + ledger.getName(), markDeletePos, newPosition); } else { if (log.isDebugEnabled()) { log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}" - + " for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name); + + " for cursor [{}]", ledger.getName(), position, lastConfirmedEntry, name); } callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); return; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 0fe949948e45d..97333fbb1e36c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -481,22 +481,15 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final public void initializeComplete() { log.info("[{}] Successfully initialize managed ledger", name); pendingInitializeLedgers.remove(name, pendingLedger); - future.complete(newledger); - - newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); - if (config.isTriggerOffloadOnTopicLoad()) { - newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); - } - -// // May need to update the cursor position and wait them finished -// newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) -> { -// // ignore ex since it is handled in maybeUpdateCursorBeforeTrimmingConsumedLedger -// future.complete(newledger); -// // May need to trigger offloading -// if (config.isTriggerOffloadOnTopicLoad()) { -// newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); -// } -// }); + // May need to update the cursor position and wait them finished + newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) -> { + // ignore ex since it is handled in maybeUpdateCursorBeforeTrimmingConsumedLedger + future.complete(newledger); + // May need to trigger offloading + if (config.isTriggerOffloadOnTopicLoad()) { + newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + } + }); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 886460f33763b..202426f2bd2a0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1761,11 +1761,10 @@ public void operationComplete(Void v, Stat stat) { updateLedgersIdsComplete(originalCurrentLedger); mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS); + // May need to update the cursor position + maybeUpdateCursorBeforeTrimmingConsumedLedger(); } metadataMutex.unlock(); - - // May need to update the cursor position - maybeUpdateCursorBeforeTrimmingConsumedLedger(); } @Override @@ -2715,10 +2714,12 @@ public CompletableFuture maybeUpdateCursorBeforeTrimmingConsumedLedger() { CompletableFuture future = new CompletableFuture<>(); cursorMarkDeleteFutures.add(future); - // Snapshot cursor.getMarkDeletedPosition() into a local variable to avoid race condition. + // Snapshot positions into a local variables to avoid race condition. + Position persistentMarkDeletedPosition = cursor.getPersistentMarkDeletedPosition(); Position markDeletedPosition = cursor.getMarkDeletedPosition(); - Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null - ? cursor.getPersistentMarkDeletedPosition() : markDeletedPosition; + Position lastAckedPosition = + persistentMarkDeletedPosition != null ? persistentMarkDeletedPosition : markDeletedPosition; +// Position lastAckedPosition = markDeletedPosition; LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId())) .map(Map.Entry::getValue).orElse(null); @@ -2733,11 +2734,6 @@ public CompletableFuture maybeUpdateCursorBeforeTrimmingConsumedLedger() { log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor); } } else { - // TODO curPointedLedger==null, should we move cursor mark deleted position to nextPointedLedger:-1? - // Sample case: Opening an empty ledger with ledgers:(ledgerId:-1), if cursor position is - // (ledgerId-1:entryNum-1) and old ledger is trimmed, then curPointedLedger wil be null. - // I think we should move cursor position to (ledgerId:-1), or it will cause cursor position and ledger - // inconsistency. See PR https://github.com/apache/pulsar/pull/25087 log.warn("Cursor: {} does not exist in the managed-ledger.", cursor); } @@ -2767,14 +2763,10 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { cursor, markDeletedPosition); future.complete(null); } else { - // Should not happen - log.warn("Trying to mark delete an already mark-deleted position. Current mark-delete:" + // May happen, persistentMarkDeletedPosition is updated after markDeletedPosition + log.info("Ledger rollover tries to mark delete an already mark-deleted position. Current mark-delete:" + " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition); - future.completeExceptionally(null); -// future.completeExceptionally(new ManagedLedgerException( -// "Trying to mark delete an already mark-deleted position when update cursor. Current " -// + "mark-delete: " + markDeletedPosition + " -- attempted mark delete: " -// + lastAckedPosition)); + future.complete(null); } } return FutureUtil.waitForAll(cursorMarkDeleteFutures); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 6d56ec4bbf3ed..12c4157e63a4b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3786,8 +3786,7 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio }); } - // Huh, maybeUpdateCursorBeforeTrimmingConsumedLedger seems not working well. - @Test(timeOut = 20000) + @Test(timeOut = 20000, invocationCount = 1000) public void testNeverThrowsMarkDeletingMarkedPositionInMaybeUpdateCursorBeforeTrimmingConsumedLedger() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ -3822,6 +3821,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { cursor.asyncMarkDelete(position, new MarkDeleteCallback() { @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Should never fail", exception); } @Override From 15ec22e1e1986a8881856f0795f46bbc74e415b0 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sat, 27 Dec 2025 22:19:23 +0800 Subject: [PATCH 10/27] [fix][broker] Fix race condition in ManagedCursorImpl.getNumberOfEntries --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 1e30b65bee453..1f1041c05f11d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1250,14 +1250,16 @@ public boolean hasMoreEntries() { @Override public long getNumberOfEntries() { - if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) { + Position lastPosition = ledger.getLastPosition(); + Position nextPosition = lastPosition.getNext(); + if (readPosition.compareTo(nextPosition) > 0) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Read position {} is ahead of last position {}. There are no entries to read", - ledger.getName(), name, readPosition, ledger.getLastPosition()); + ledger.getName(), name, readPosition, lastPosition); } return 0; } else { - return getNumberOfEntries(Range.closedOpen(readPosition, ledger.getLastPosition().getNext())); + return getNumberOfEntries(Range.closedOpen(readPosition, nextPosition)); } } From 33b327d891b1cb8a0e60eece1589cdb28d400640 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sat, 27 Dec 2025 23:18:53 +0800 Subject: [PATCH 11/27] [fix][broker] Fix CompactionTest.testCompactorReadsCompacted --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 ++-- .../java/org/apache/pulsar/compaction/CompactionTest.java | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 202426f2bd2a0..ec4bc3c0da958 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2763,8 +2763,8 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { cursor, markDeletedPosition); future.complete(null); } else { - // May happen, persistentMarkDeletedPosition is updated after markDeletedPosition - log.info("Ledger rollover tries to mark delete an already mark-deleted position. Current mark-delete:" + // Should not happen + log.warn("Ledger rollover tries to mark delete an already mark-deleted position. Current mark-delete:" + " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition); future.complete(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 077cf9d0b11b0..5b4601e21130f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -995,10 +995,9 @@ public void testCompactorReadsCompacted() throws Exception { // compact the topic again compact(topic); - // shouldn't have opened first ledger (already compacted), penultimate would have some uncompacted data. - // last ledger already open for writing + // shouldn't have opened first and penultimate ledger (already compacted), last ledger already open for writing assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); - assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); + assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId)); // all three messages should be there when we read compacted From 6f4b2fe19a54c5ca8a1b0adc6803cee5faa999b6 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 28 Dec 2025 09:26:35 +0800 Subject: [PATCH 12/27] [fix][broker] Fix race condition in ManagedCursorImpl.getNumberOfEntries --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 1f1041c05f11d..2e648a1e9d8d9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1250,16 +1250,17 @@ public boolean hasMoreEntries() { @Override public long getNumberOfEntries() { + Position readPos = readPosition; Position lastPosition = ledger.getLastPosition(); Position nextPosition = lastPosition.getNext(); - if (readPosition.compareTo(nextPosition) > 0) { + if (readPos.compareTo(nextPosition) > 0) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Read position {} is ahead of last position {}. There are no entries to read", - ledger.getName(), name, readPosition, lastPosition); + ledger.getName(), name, readPos, lastPosition); } return 0; } else { - return getNumberOfEntries(Range.closedOpen(readPosition, nextPosition)); + return getNumberOfEntries(Range.closedOpen(readPos, nextPosition)); } } From 33cb384d52401ed0cd9e2e98eb321ff9e72af2bd Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 28 Dec 2025 09:28:30 +0800 Subject: [PATCH 13/27] [fix][broker] Fix ManagedCursorTest.testFlushCursorAfterError test --- .../apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index c87477a95a0ed..45ecdd9ef8d9d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4506,7 +4506,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { - @Test + @Test(invocationCount = 100) public void testFlushCursorAfterError() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setThrottleMarkDelete(1.0); @@ -4551,8 +4551,7 @@ public void testFlushCursorAfterError() throws Exception { ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); ManagedCursor c2 = ledger2.openCursor("c"); - - assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1)); }); } From f320c559718dbdf46f655240506504cda2288704 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 28 Dec 2025 10:03:45 +0800 Subject: [PATCH 14/27] [fix][broker] Modify logic --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 +---- .../apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 2 +- .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ec4bc3c0da958..07566e82ee8e8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2715,11 +2715,8 @@ public CompletableFuture maybeUpdateCursorBeforeTrimmingConsumedLedger() { cursorMarkDeleteFutures.add(future); // Snapshot positions into a local variables to avoid race condition. - Position persistentMarkDeletedPosition = cursor.getPersistentMarkDeletedPosition(); Position markDeletedPosition = cursor.getMarkDeletedPosition(); - Position lastAckedPosition = - persistentMarkDeletedPosition != null ? persistentMarkDeletedPosition : markDeletedPosition; -// Position lastAckedPosition = markDeletedPosition; + Position lastAckedPosition = markDeletedPosition; LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId())) .map(Map.Entry::getValue).orElse(null); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 45ecdd9ef8d9d..8402816cb381a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4506,7 +4506,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { - @Test(invocationCount = 100) + @Test public void testFlushCursorAfterError() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setThrottleMarkDelete(1.0); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 12c4157e63a4b..038e95d70b191 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3786,7 +3786,7 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio }); } - @Test(timeOut = 20000, invocationCount = 1000) + @Test(timeOut = 20000) public void testNeverThrowsMarkDeletingMarkedPositionInMaybeUpdateCursorBeforeTrimmingConsumedLedger() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); From 4b73a1525bf00b4088948caf4fd75362ce7fb7b4 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 28 Dec 2025 13:07:31 +0800 Subject: [PATCH 15/27] [fix][broker] Fix ManagedCursorTest testFlushCursorAfterInactivity and testFlushCursorAfterIndividualDeleteInactivity test --- .../apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 8402816cb381a..4bf618fb3e54d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4441,7 +4441,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); ManagedCursor c2 = ledger2.openCursor("c"); - assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1)); }); } @@ -4500,7 +4500,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { ManagedLedger ledger2 = factory2.open("testFlushCursorAfterIndDelInactivity", config); ManagedCursor c2 = ledger2.openCursor("c"); - assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1)); }); } @@ -4551,6 +4551,7 @@ public void testFlushCursorAfterError() throws Exception { ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); ManagedCursor c2 = ledger2.openCursor("c"); + assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1)); }); } From 3d1bb130ba0bb6974069e3a30c837f8fa0755d3d Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 28 Dec 2025 13:11:52 +0800 Subject: [PATCH 16/27] [fix][broker] Fix ManagedCursorTest.testLazyCursorLedgerCreation test --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 4bf618fb3e54d..8f04b8046460d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4840,8 +4840,8 @@ public void testLazyCursorLedgerCreation() throws Exception { ledger = (ManagedLedgerImpl) factory .open("testLazyCursorLedgerCreation", managedLedgerConfig); ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test"); - assertEquals(cursor1.getState(), "NoLedger"); - assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition); + assertEquals(cursor1.getState(), "Open"); + assertThat(cursor1.getMarkDeletedPosition()).isGreaterThan(finalLastPosition); // Verify the recovered cursor can work with new mark delete. lastPosition = null; From 87b25c67c681c6a5f81ab0c33ea1797dba9b60ea Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 28 Dec 2025 13:16:51 +0800 Subject: [PATCH 17/27] [fix][broker] Fix ManagedCursorTest.testPersistentMarkDeleteIfCreateCursorLedgerFailed test --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 8f04b8046460d..3bea931e66759 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -407,7 +407,8 @@ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception { ml.close(); ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig); ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName); - assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry); + assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); + assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThan(lastEntry); // cleanup. ml.delete(); From d7b1f6aef2335653b301500c22c9d83e743a94ad Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 28 Dec 2025 13:32:21 +0800 Subject: [PATCH 18/27] [fix][broker] Fix ManagedCursorTest.testPersistentMarkDeleteIfSwitchCursorLedgerFailed test --- .../apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 3bea931e66759..d5fb2529e9390 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -499,12 +499,18 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception { assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId()); assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId()); assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry); + assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); + assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); // Verify the mark delete position can be recovered properly. ml.close(); ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig); ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName); - assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry); + assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); + // If previous ledger is trimmed, Cursor: ManagedCursorImpl{ledger=ml_test, name=c1, ackPos=12:0, readPos=15:0} + // does not exist in the managed-ledger. + // TODO should be handled in ledger trim process. + assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); // cleanup. ml.delete(); From e52772dd7553f99b7559135dd695ed48909493ef Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 28 Dec 2025 19:40:47 +0800 Subject: [PATCH 19/27] [fix][broker] Fix tests --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 4 ++-- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index d5fb2529e9390..bfb9b6ecca1b3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -508,7 +508,7 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception { ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName); assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); // If previous ledger is trimmed, Cursor: ManagedCursorImpl{ledger=ml_test, name=c1, ackPos=12:0, readPos=15:0} - // does not exist in the managed-ledger. + // does not exist in the managed-ledger. Recovered cursor's position will not be moved forward. // TODO should be handled in ledger trim process. assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); @@ -4822,7 +4822,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } @Test - public void testLazyCursorLedgerCreation() throws Exception { + public void testEagerCursorLedgerCreation() throws Exception { ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory .open("testLazyCursorLedgerCreation", managedLedgerConfig); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 038e95d70b191..3f5ff9c728f54 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3787,7 +3787,7 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio } @Test(timeOut = 20000) - public void testNeverThrowsMarkDeletingMarkedPositionInMaybeUpdateCursorBeforeTrimmingConsumedLedger() + public void testNeverThrowExceptionInMaybeUpdateCursorBeforeTrimmingConsumedLedger() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); initManagedLedgerConfig(config); From 16deef761d4c9d5cef092c985b1e77c0d0c312e8 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Thu, 8 Jan 2026 23:22:44 +0800 Subject: [PATCH 20/27] [fix][broker] Fix some flaky assert in ManagedLedgerTest.testTrimmerRaceCondition --- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 3f5ff9c728f54..c7f032f049a03 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; @@ -5201,18 +5202,19 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { }, null); latch.await(); - assertEquals(cursor.getPersistentMarkDeletedPosition(), lastPosition); - assertEquals(ledger.getCursors().getSlowestCursorPosition(), lastPosition); + assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition); + assertThat(ledger.getCursors().getSlowestCursorPosition()).isGreaterThanOrEqualTo(lastPosition); assertEquals(cursor.getProperties(), properties); - // 3. Add Entry 2. Triggers Rollover. + // 3. Add Entry 2. Triggers second rollover process. // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover Position p = ledger.addEntry("entry-2".getBytes(Encoding)); // Wait for background tasks (metadata callback) to complete. // We expect at least 2 ledgers (Rollover happened). Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> ledger.getLedgersInfo().size() >= 2); - assertEquals(cursor.getPersistentMarkDeletedPosition(), new ImmutablePositionImpl(p.getLedgerId(), -1)); + // First ledger is all consumed and trimmed, left current ledger and next empty ledger. + assertEquals(cursor.getPersistentMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), -1)); // Verify properties are preserved after cursor reset assertEquals(cursor.getProperties(), properties); From 6475bca93624cb247bd3091a51d0ecdddc082eb4 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 9 Jan 2026 10:00:25 +0800 Subject: [PATCH 21/27] [fix][broker] Remove some flaky asserts in ManagedLedgerTest.testTrimmerRaceCondition --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index c7f032f049a03..280c9c2fce923 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5204,7 +5204,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { latch.await(); assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition); assertThat(ledger.getCursors().getSlowestCursorPosition()).isGreaterThanOrEqualTo(lastPosition); - assertEquals(cursor.getProperties(), properties); + // assertEquals(cursor.getProperties(), properties); // 3. Add Entry 2. Triggers second rollover process. // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover @@ -5217,6 +5217,6 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { assertEquals(cursor.getPersistentMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), -1)); // Verify properties are preserved after cursor reset - assertEquals(cursor.getProperties(), properties); + // assertEquals(cursor.getProperties(), properties); } } From 0589f5ad47078f0b1c583e5f4e524ffb4f45d637 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 9 Jan 2026 10:53:02 +0800 Subject: [PATCH 22/27] [fix][broker] Try to fix race condition when setting mark delete properties in ManagedLedgerTest.testTrimmerRaceCondition --- .../mledger/impl/ManagedCursorImpl.java | 10 +++++-- .../mledger/impl/ManagedLedgerImpl.java | 28 +++++++++---------- .../mledger/impl/ManagedLedgerTest.java | 4 +-- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 2e648a1e9d8d9..f809c38de9012 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2334,11 +2334,15 @@ protected void internalAsyncMarkDelete(final Position newPosition, Map propertiesToUse = + properties != null ? properties : (last != null ? last.properties : getProperties()); + MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, callback, ctx, + alignAcknowledgeStatusAfterPersisted); + // The state might have changed while we were waiting on the queue mutex switch (state) { case Closed: diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 07566e82ee8e8..bb682cdb293da 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2739,22 +2739,20 @@ public CompletableFuture maybeUpdateCursorBeforeTrimmingConsumedLedger() { Position finalPosition = lastAckedPosition; log.info("Mark deleting cursor:{} from {} to {} since ledger consumed completely.", cursor, markDeletedPosition, lastAckedPosition); - cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(), - new MarkDeleteCallback() { - @Override - public void markDeleteComplete(Object ctx) { - log.info("Successfully persisted cursor position for cursor:{} to {}", - cursor, finalPosition); - future.complete(null); - } + cursor.asyncMarkDelete(lastAckedPosition, null, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + log.info("Successfully persisted cursor position for cursor:{} to {}", cursor, finalPosition); + future.complete(null); + } - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - log.warn("Failed to mark delete: {} from {} to {}. ", cursor, - cursor.getMarkDeletedPosition(), finalPosition, exception); - future.completeExceptionally(exception); - } - }, null); + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + log.warn("Failed to mark delete: {} from {} to {}. ", cursor, cursor.getMarkDeletedPosition(), + finalPosition, exception); + future.completeExceptionally(exception); + } + }, null); } else if (compareResult == 0) { log.debug("No need to reset cursor: {}, last acked position equals to current mark-delete position {}.", cursor, markDeletedPosition); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 280c9c2fce923..c7f032f049a03 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5204,7 +5204,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { latch.await(); assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition); assertThat(ledger.getCursors().getSlowestCursorPosition()).isGreaterThanOrEqualTo(lastPosition); - // assertEquals(cursor.getProperties(), properties); + assertEquals(cursor.getProperties(), properties); // 3. Add Entry 2. Triggers second rollover process. // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover @@ -5217,6 +5217,6 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { assertEquals(cursor.getPersistentMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), -1)); // Verify properties are preserved after cursor reset - // assertEquals(cursor.getProperties(), properties); + assertEquals(cursor.getProperties(), properties); } } From bd62af6791dfc5520c7ee3e48d84a66e559602ec Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 9 Jan 2026 12:40:11 +0800 Subject: [PATCH 23/27] [fix][broker] Support null mark delete properties in NonDurableCursorImpl.internalAsyncMarkDelete --- .../bookkeeper/mledger/impl/NonDurableCursorImpl.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 91b46b3660c3b..129163eb317d2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -103,11 +103,13 @@ void recover(final VoidCallback callback) { protected void internalAsyncMarkDelete(final Position newPosition, Map properties, final MarkDeleteCallback callback, final Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) { // Bypass persistence of mark-delete position and individually deleted messages info - - MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx, - alignAcknowledgeStatusAfterPersisted); + MarkDeleteEntry mdEntry; lock.writeLock().lock(); try { + // use given properties or when missing, use the properties from the previous field value + Map propertiesToUse = properties != null ? properties : getProperties(); + mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, callback, ctx, + alignAcknowledgeStatusAfterPersisted); lastMarkDeleteEntry = mdEntry; mdEntry.alignAcknowledgeStatus(); } finally { From 4cb904636626fc515036687fadb9387515b65557 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 9 Jan 2026 13:59:27 +0800 Subject: [PATCH 24/27] [fix][broker] Rollback CompactionTest.testCompactorReadsCompacted test --- .../java/org/apache/pulsar/compaction/CompactionTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 5b4601e21130f..077cf9d0b11b0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -995,9 +995,10 @@ public void testCompactorReadsCompacted() throws Exception { // compact the topic again compact(topic); - // shouldn't have opened first and penultimate ledger (already compacted), last ledger already open for writing + // shouldn't have opened first ledger (already compacted), penultimate would have some uncompacted data. + // last ledger already open for writing assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId)); - assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); + assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId)); assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId)); // all three messages should be there when we read compacted From 139957837909d5053534d31a11e3e94813a9885b Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 9 Jan 2026 14:30:25 +0800 Subject: [PATCH 25/27] [fix][broker] Add ManagedLedgerTest.testTrimmerRaceConditionInNonDurableCursor test --- .../mledger/impl/ManagedLedgerTest.java | 60 ++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index c7f032f049a03..2a8ea917e740d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5167,13 +5167,13 @@ public void testComparePositions() throws Exception { } @Test - public void testTrimmerRaceCondition() throws Exception { + public void testTrimmerRaceConditionInDurableCursor() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(1); config.setRetentionTime(0, TimeUnit.MILLISECONDS); config.setRetentionSizeInMB(0); - ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testTrimmerRaceCondition", config); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testTrimmerRaceConditionInDurableCursor", config); ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); // 1. Add Entry 1 (Ledger 1) @@ -5219,4 +5219,60 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { // Verify properties are preserved after cursor reset assertEquals(cursor.getProperties(), properties); } + + @Test + public void testTrimmerRaceConditionInNonDurableCursor() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(1); + config.setRetentionTime(0, TimeUnit.MILLISECONDS); + config.setRetentionSizeInMB(0); + + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testTrimmerRaceConditionInNonDurableCursor", config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.newNonDurableCursor(PositionFactory.EARLIEST); + + // 1. Add Entry 1 (Ledger 1) + ledger.addEntry("entry-1".getBytes(Encoding)); + + // 2. Ack Entry 1. Verify Persistence with properties. + List entries = cursor.readEntries(1); + assertEquals(entries.size(), 1); + Position lastPosition = entries.get(0).getPosition(); + entries.forEach(Entry::release); + + // Mark delete with properties + Map properties = new HashMap<>(); + properties.put("test-property", 12345L); + CountDownLatch latch = new CountDownLatch(1); + cursor.asyncMarkDelete(lastPosition, properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + + latch.await(); + assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition); + assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition); + assertEquals(cursor.getProperties(), properties); + + // 3. Add Entry 2. Triggers second rollover process. + // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover + Position p = ledger.addEntry("entry-2".getBytes(Encoding)); + + // Wait for background tasks (metadata callback and trim) to complete. + // We expect only one ledger (Rollover and trim happened). + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> ledger.getLedgersInfo().size() == 1); + // All ledgers are trimmed, left one empty ledger, trim process moves markDeletedPosition to p.getLedgerId():0 + assertEquals(cursor.getMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), 0)); + assertEquals(cursor.getPersistentMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), 0)); + + // Verify properties are preserved after cursor reset + assertEquals(cursor.getProperties(), properties); + } } From 925b3ed3f06bc35044b0014ba729dd665ae22448 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 9 Jan 2026 16:37:02 +0800 Subject: [PATCH 26/27] [fix][broker] Fix race condition in ManagedCursorImpl.updateLastMarkDeleteEntryToLatest, add test --- .../mledger/impl/ManagedCursorImpl.java | 25 ++++---- .../mledger/impl/ManagedLedgerTest.java | 57 +++++++++++++++++++ 2 files changed, 71 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index f809c38de9012..c120b9fa719ec 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2710,17 +2710,20 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { // update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition private void updateLastMarkDeleteEntryToLatest(final Position newPosition, final Map properties) { - LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { - if (last != null && last.newPosition.compareTo(newPosition) > 0) { - // keep current value, don't update - return last; - } else { - // use given properties or when missing, use the properties from the previous field value - Map propertiesToUse = - properties != null ? properties : (last != null ? last.properties : Collections.emptyMap()); - return new MarkDeleteEntry(newPosition, propertiesToUse, null, null); - } - }); + synchronized (pendingMarkDeleteOps) { + // use given properties or when missing, use the properties from the previous field value + MarkDeleteEntry lastPending = pendingMarkDeleteOps.peekLast(); + Map propertiesToUse = + properties != null ? properties : (lastPending != null ? lastPending.properties : getProperties()); + LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { + if (last != null && last.newPosition.compareTo(newPosition) > 0) { + // keep current value, don't update + return last; + } else { + return new MarkDeleteEntry(newPosition, propertiesToUse, null, null); + } + }); + } } /** diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 2a8ea917e740d..a1675a328ce30 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5275,4 +5275,61 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { // Verify properties are preserved after cursor reset assertEquals(cursor.getProperties(), properties); } + + @Test + public void testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + int maxEntriesPerLedger = 1; + config.setMaxEntriesPerLedger(maxEntriesPerLedger); + config.setThrottleMarkDelete(1); + config.setRetentionTime(0, TimeUnit.MILLISECONDS); + config.setRetentionSizeInMB(0); + + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor", config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + CountDownLatch latch = new CountDownLatch(1); + ledger.asyncAddEntry("entry-1".getBytes(Encoding), new AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + // Mark delete with properties + Map properties = new HashMap<>(); + properties.put("test-property", 12345L); + cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + fail("Add entry should succeed"); + } + }, null); + + latch.await(); + + Map expectedProperties = new HashMap<>(); + expectedProperties.put("test-property", 12345L); + assertEquals(cursor.getProperties(), expectedProperties); + + // 3. Add Entry 2. Triggers second rollover process. + // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover + ledger.addEntry(("entry-2").getBytes(Encoding)); + + // Wait for background tasks (metadata callback) to complete. + // We expect at least 2 ledgers (Rollover happened). + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> ledger.getLedgersInfo().size() >= 2); + + // Verify properties are preserved after cursor reset + assertEquals(cursor.getProperties(), expectedProperties); + } } From df9ac2985cd6bba6451848ddb84a3d2d0272b167 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 9 Jan 2026 16:41:40 +0800 Subject: [PATCH 27/27] [fix][broker] Fix checkstyle --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index a1675a328ce30..eff0058a12ac1 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5286,7 +5286,8 @@ public void testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor() thro config.setRetentionSizeInMB(0); ManagedLedgerImpl ledger = - (ManagedLedgerImpl) factory.open("testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor", config); + (ManagedLedgerImpl) factory.open("testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor", + config); ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); CountDownLatch latch = new CountDownLatch(1);