diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index b3cb8dc45962b..c120b9fa719ec 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1250,14 +1250,17 @@ public boolean hasMoreEntries() { @Override public long getNumberOfEntries() { - if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) { + Position readPos = readPosition; + Position lastPosition = ledger.getLastPosition(); + Position nextPosition = lastPosition.getNext(); + if (readPos.compareTo(nextPosition) > 0) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Read position {} is ahead of last position {}. There are no entries to read", - ledger.getName(), name, readPosition, ledger.getLastPosition()); + ledger.getName(), name, readPos, lastPosition); } return 0; } else { - return getNumberOfEntries(Range.closedOpen(readPosition, ledger.getLastPosition().getNext())); + return getNumberOfEntries(Range.closedOpen(readPos, nextPosition)); } } @@ -2255,13 +2258,15 @@ public void asyncMarkDelete(final Position position, Map propertie } Position newPosition = ackBatchPosition(position); - if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) { + Position markDeletePos = markDeletePosition; + Position lastConfirmedEntry = ledger.getLastConfirmedEntry(); + if (lastConfirmedEntry.compareTo(newPosition) < 0) { boolean shouldCursorMoveForward = false; try { - long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries(); - Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId()); + long ledgerEntries = ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries(); + Long nextValidLedger = ledger.getNextValidLedger(lastConfirmedEntry.getLedgerId()); shouldCursorMoveForward = nextValidLedger != null - && (markDeletePosition.getEntryId() + 1 >= ledgerEntries) + && (markDeletePos.getEntryId() + 1 >= ledgerEntries) && (newPosition.getLedgerId() == nextValidLedger); } catch (Exception e) { log.warn("Failed to get ledger entries while setting mark-delete-position", e); @@ -2269,11 +2274,11 @@ public void asyncMarkDelete(final Position position, Map propertie if (shouldCursorMoveForward) { log.info("[{}] move mark-delete-position from {} to {} since all the entries have been consumed", - ledger.getName(), markDeletePosition, newPosition); + ledger.getName(), markDeletePos, newPosition); } else { if (log.isDebugEnabled()) { log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}" - + " for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name); + + " for cursor [{}]", ledger.getName(), position, lastConfirmedEntry, name); } callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); return; @@ -2329,11 +2334,15 @@ protected void internalAsyncMarkDelete(final Position newPosition, Map propertiesToUse = + properties != null ? properties : (last != null ? last.properties : getProperties()); + MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, callback, ctx, + alignAcknowledgeStatusAfterPersisted); + // The state might have changed while we were waiting on the queue mutex switch (state) { case Closed: @@ -2701,17 +2710,20 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { // update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition private void updateLastMarkDeleteEntryToLatest(final Position newPosition, final Map properties) { - LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { - if (last != null && last.newPosition.compareTo(newPosition) > 0) { - // keep current value, don't update - return last; - } else { - // use given properties or when missing, use the properties from the previous field value - Map propertiesToUse = - properties != null ? properties : (last != null ? last.properties : Collections.emptyMap()); - return new MarkDeleteEntry(newPosition, propertiesToUse, null, null); - } - }); + synchronized (pendingMarkDeleteOps) { + // use given properties or when missing, use the properties from the previous field value + MarkDeleteEntry lastPending = pendingMarkDeleteOps.peekLast(); + Map propertiesToUse = + properties != null ? properties : (lastPending != null ? lastPending.properties : getProperties()); + LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { + if (last != null && last.newPosition.compareTo(newPosition) > 0) { + // keep current value, don't update + return last; + } else { + return new MarkDeleteEntry(newPosition, propertiesToUse, null, null); + } + }); + } } /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index a452c6682a53b..97333fbb1e36c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -481,14 +481,15 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final public void initializeComplete() { log.info("[{}] Successfully initialize managed ledger", name); pendingInitializeLedgers.remove(name, pendingLedger); - future.complete(newledger); - - // May need to update the cursor position - newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); - // May need to trigger offloading - if (config.isTriggerOffloadOnTopicLoad()) { - newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); - } + // May need to update the cursor position and wait them finished + newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) -> { + // ignore ex since it is handled in maybeUpdateCursorBeforeTrimmingConsumedLedger + future.complete(newledger); + // May need to trigger offloading + if (config.isTriggerOffloadOnTopicLoad()) { + newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + } + }); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4b278cf6664d4..bb682cdb293da 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1761,11 +1761,10 @@ public void operationComplete(Void v, Stat stat) { updateLedgersIdsComplete(originalCurrentLedger); mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS); + // May need to update the cursor position + maybeUpdateCursorBeforeTrimmingConsumedLedger(); } metadataMutex.unlock(); - - // May need to update the cursor position - maybeUpdateCursorBeforeTrimmingConsumedLedger(); } @Override @@ -2709,18 +2708,23 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) { this.waitingEntryCallBacks.add(cb); } - public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { + public CompletableFuture maybeUpdateCursorBeforeTrimmingConsumedLedger() { + List> cursorMarkDeleteFutures = new ArrayList<>(); for (ManagedCursor cursor : cursors) { - Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null - ? cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition(); - LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); + CompletableFuture future = new CompletableFuture<>(); + cursorMarkDeleteFutures.add(future); + + // Snapshot positions into a local variables to avoid race condition. + Position markDeletedPosition = cursor.getMarkDeletedPosition(); + Position lastAckedPosition = markDeletedPosition; + LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId())) .map(Map.Entry::getValue).orElse(null); - if (currPointedLedger != null) { + if (curPointedLedger != null) { if (nextPointedLedger != null) { if (lastAckedPosition.getEntryId() != -1 - && lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) { + && lastAckedPosition.getEntryId() + 1 >= curPointedLedger.getEntries()) { lastAckedPosition = PositionFactory.create(nextPointedLedger.getLedgerId(), -1); } } else { @@ -2730,25 +2734,37 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { log.warn("Cursor: {} does not exist in the managed-ledger.", cursor); } - if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) { + int compareResult = lastAckedPosition.compareTo(markDeletedPosition); + if (compareResult > 0) { Position finalPosition = lastAckedPosition; - log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); - cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(), - new MarkDeleteCallback() { - @Override - public void markDeleteComplete(Object ctx) { - log.info("Successfully persisted cursor position for cursor:{} to {}", - cursor, finalPosition); - } + log.info("Mark deleting cursor:{} from {} to {} since ledger consumed completely.", cursor, + markDeletedPosition, lastAckedPosition); + cursor.asyncMarkDelete(lastAckedPosition, null, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + log.info("Successfully persisted cursor position for cursor:{} to {}", cursor, finalPosition); + future.complete(null); + } - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.", - cursor, cursor.getMarkDeletedPosition(), finalPosition, exception); - } - }, null); + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + log.warn("Failed to mark delete: {} from {} to {}. ", cursor, cursor.getMarkDeletedPosition(), + finalPosition, exception); + future.completeExceptionally(exception); + } + }, null); + } else if (compareResult == 0) { + log.debug("No need to reset cursor: {}, last acked position equals to current mark-delete position {}.", + cursor, markDeletedPosition); + future.complete(null); + } else { + // Should not happen + log.warn("Ledger rollover tries to mark delete an already mark-deleted position. Current mark-delete:" + + " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition); + future.complete(null); } } + return FutureUtil.waitForAll(cursorMarkDeleteFutures); } private void trimConsumedLedgersInBackground() { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 91b46b3660c3b..129163eb317d2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -103,11 +103,13 @@ void recover(final VoidCallback callback) { protected void internalAsyncMarkDelete(final Position newPosition, Map properties, final MarkDeleteCallback callback, final Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) { // Bypass persistence of mark-delete position and individually deleted messages info - - MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx, - alignAcknowledgeStatusAfterPersisted); + MarkDeleteEntry mdEntry; lock.writeLock().lock(); try { + // use given properties or when missing, use the properties from the previous field value + Map propertiesToUse = properties != null ? properties : getProperties(); + mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, callback, ctx, + alignAcknowledgeStatusAfterPersisted); lastMarkDeleteEntry = mdEntry; mdEntry.alignAcknowledgeStatus(); } finally { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index c87477a95a0ed..bfb9b6ecca1b3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -407,7 +407,8 @@ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception { ml.close(); ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig); ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName); - assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry); + assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); + assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThan(lastEntry); // cleanup. ml.delete(); @@ -498,12 +499,18 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception { assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId()); assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId()); assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry); + assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); + assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); // Verify the mark delete position can be recovered properly. ml.close(); ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig); ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName); - assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry); + assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); + // If previous ledger is trimmed, Cursor: ManagedCursorImpl{ledger=ml_test, name=c1, ackPos=12:0, readPos=15:0} + // does not exist in the managed-ledger. Recovered cursor's position will not be moved forward. + // TODO should be handled in ledger trim process. + assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry); // cleanup. ml.delete(); @@ -4441,7 +4448,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); ManagedCursor c2 = ledger2.openCursor("c"); - assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1)); }); } @@ -4500,7 +4507,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { ManagedLedger ledger2 = factory2.open("testFlushCursorAfterIndDelInactivity", config); ManagedCursor c2 = ledger2.openCursor("c"); - assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1)); }); } @@ -4552,7 +4559,7 @@ public void testFlushCursorAfterError() throws Exception { ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); ManagedCursor c2 = ledger2.openCursor("c"); - assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1)); }); } @@ -4815,7 +4822,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } @Test - public void testLazyCursorLedgerCreation() throws Exception { + public void testEagerCursorLedgerCreation() throws Exception { ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory .open("testLazyCursorLedgerCreation", managedLedgerConfig); @@ -4840,8 +4847,8 @@ public void testLazyCursorLedgerCreation() throws Exception { ledger = (ManagedLedgerImpl) factory .open("testLazyCursorLedgerCreation", managedLedgerConfig); ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test"); - assertEquals(cursor1.getState(), "NoLedger"); - assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition); + assertEquals(cursor1.getState(), "Open"); + assertThat(cursor1.getMarkDeletedPosition()).isGreaterThan(finalLastPosition); // Verify the recovered cursor can work with new mark delete. lastPosition = null; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 1cdeb415b7c21..eff0058a12ac1 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; @@ -56,6 +57,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -69,6 +71,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; @@ -3784,6 +3787,61 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio }); } + @Test(timeOut = 20000) + public void testNeverThrowExceptionInMaybeUpdateCursorBeforeTrimmingConsumedLedger() + throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + initManagedLedgerConfig(config); + config.setMaxEntriesPerLedger(1); + int entryNum = 100; + + ManagedLedgerImpl realManagedLedger = + (ManagedLedgerImpl) factory.open("maybeUpdateCursorBeforeTrimmingConsumed_ledger", config); + ManagedLedgerImpl managedLedger = spy(realManagedLedger); + ManagedCursor cursor = managedLedger.openCursor("c1"); + + Deque> futures = new ConcurrentLinkedDeque<>(); + doAnswer(invocation -> { + CompletableFuture result = (CompletableFuture) invocation.callRealMethod(); + futures.offer(result); + return result; + }).when(managedLedger).maybeUpdateCursorBeforeTrimmingConsumedLedger(); + + final CountDownLatch latch = new CountDownLatch(entryNum); + // Two asyncMarkDelete operations running concurrently: + // 1. ledger rollover triggered maybeUpdateCursorBeforeTrimmingConsumedLedger. + // 2. user triggered asyncMarkDelete. + for (int i = 0; i < entryNum; i++) { + managedLedger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + cursor.asyncMarkDelete(position, new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Should never fail", exception); + } + + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + }, null); + + } + }, null); + } + + latch.await(); + assertEquals(cursor.getNumberOfEntries(), 0); + + // Will not throw exception + FutureUtil.waitForAll(futures).get(); + } + @Test(timeOut = 20000) public void testAsyncTruncateLedgerRetention() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ -5109,13 +5167,13 @@ public void testComparePositions() throws Exception { } @Test - public void testTrimmerRaceCondition() throws Exception { + public void testTrimmerRaceConditionInDurableCursor() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(1); config.setRetentionTime(0, TimeUnit.MILLISECONDS); config.setRetentionSizeInMB(0); - ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testTrimmerRaceCondition", config); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testTrimmerRaceConditionInDurableCursor", config); ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); // 1. Add Entry 1 (Ledger 1) @@ -5144,20 +5202,135 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { }, null); latch.await(); - assertEquals(cursor.getPersistentMarkDeletedPosition(), lastPosition); - assertEquals(ledger.getCursors().getSlowestCursorPosition(), lastPosition); + assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition); + assertThat(ledger.getCursors().getSlowestCursorPosition()).isGreaterThanOrEqualTo(lastPosition); assertEquals(cursor.getProperties(), properties); - // 3. Add Entry 2. Triggers Rollover. + // 3. Add Entry 2. Triggers second rollover process. // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover Position p = ledger.addEntry("entry-2".getBytes(Encoding)); // Wait for background tasks (metadata callback) to complete. // We expect at least 2 ledgers (Rollover happened). Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> ledger.getLedgersInfo().size() >= 2); - assertEquals(cursor.getPersistentMarkDeletedPosition(), new ImmutablePositionImpl(p.getLedgerId(), -1)); + // First ledger is all consumed and trimmed, left current ledger and next empty ledger. + assertEquals(cursor.getPersistentMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), -1)); // Verify properties are preserved after cursor reset assertEquals(cursor.getProperties(), properties); } + + @Test + public void testTrimmerRaceConditionInNonDurableCursor() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(1); + config.setRetentionTime(0, TimeUnit.MILLISECONDS); + config.setRetentionSizeInMB(0); + + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testTrimmerRaceConditionInNonDurableCursor", config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.newNonDurableCursor(PositionFactory.EARLIEST); + + // 1. Add Entry 1 (Ledger 1) + ledger.addEntry("entry-1".getBytes(Encoding)); + + // 2. Ack Entry 1. Verify Persistence with properties. + List entries = cursor.readEntries(1); + assertEquals(entries.size(), 1); + Position lastPosition = entries.get(0).getPosition(); + entries.forEach(Entry::release); + + // Mark delete with properties + Map properties = new HashMap<>(); + properties.put("test-property", 12345L); + CountDownLatch latch = new CountDownLatch(1); + cursor.asyncMarkDelete(lastPosition, properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + + latch.await(); + assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition); + assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition); + assertEquals(cursor.getProperties(), properties); + + // 3. Add Entry 2. Triggers second rollover process. + // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover + Position p = ledger.addEntry("entry-2".getBytes(Encoding)); + + // Wait for background tasks (metadata callback and trim) to complete. + // We expect only one ledger (Rollover and trim happened). + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> ledger.getLedgersInfo().size() == 1); + // All ledgers are trimmed, left one empty ledger, trim process moves markDeletedPosition to p.getLedgerId():0 + assertEquals(cursor.getMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), 0)); + assertEquals(cursor.getPersistentMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), 0)); + + // Verify properties are preserved after cursor reset + assertEquals(cursor.getProperties(), properties); + } + + @Test + public void testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + int maxEntriesPerLedger = 1; + config.setMaxEntriesPerLedger(maxEntriesPerLedger); + config.setThrottleMarkDelete(1); + config.setRetentionTime(0, TimeUnit.MILLISECONDS); + config.setRetentionSizeInMB(0); + + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor", + config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + CountDownLatch latch = new CountDownLatch(1); + ledger.asyncAddEntry("entry-1".getBytes(Encoding), new AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + // Mark delete with properties + Map properties = new HashMap<>(); + properties.put("test-property", 12345L); + cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail("Mark delete should succeed"); + } + }, null); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + fail("Add entry should succeed"); + } + }, null); + + latch.await(); + + Map expectedProperties = new HashMap<>(); + expectedProperties.put("test-property", 12345L); + assertEquals(cursor.getProperties(), expectedProperties); + + // 3. Add Entry 2. Triggers second rollover process. + // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover + ledger.addEntry(("entry-2").getBytes(Encoding)); + + // Wait for background tasks (metadata callback) to complete. + // We expect at least 2 ledgers (Rollover happened). + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> ledger.getLedgersInfo().size() >= 2); + + // Verify properties are preserved after cursor reset + assertEquals(cursor.getProperties(), expectedProperties); + } }