Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
413be42
[fix][broker] Fix markDeletedPosition race condition in maybeUpdateCu…
oneby-wang Dec 24, 2025
3f77d84
[fix][broker] Add debug log
oneby-wang Dec 24, 2025
9f8161e
[fix][broker] Wait updating cursor finished when opening managed ledger
oneby-wang Dec 24, 2025
0091bf5
[fix][broker] Modify uncorrected comment
oneby-wang Dec 24, 2025
76bed7a
[fix][broker] Add test for maybeUpdateCursorBeforeTrimmingConsumedLedger
oneby-wang Dec 24, 2025
7232f6d
[fix][broker] Fix test
oneby-wang Dec 24, 2025
8e5a110
[fix][broker] Fix test
oneby-wang Dec 24, 2025
3de1d97
[fix][test] Rollback some codes for test
oneby-wang Dec 24, 2025
a808715
[fix][broker] Try to fix maybeUpdateCursorBeforeTrimmingConsumedLedge…
oneby-wang Dec 27, 2025
47ff863
[fix][broker] Fix race condition in ManagedCursorImpl.getNumberOfEntries
oneby-wang Dec 27, 2025
a51d9ec
[fix][broker] Fix CompactionTest.testCompactorReadsCompacted
oneby-wang Dec 27, 2025
6a5fffb
[fix][broker] Fix race condition in ManagedCursorImpl.getNumberOfEntries
oneby-wang Dec 28, 2025
d4d79e9
[fix][broker] Fix ManagedCursorTest.testFlushCursorAfterError test
oneby-wang Dec 28, 2025
3294d78
[fix][broker] Modify logic
oneby-wang Dec 28, 2025
ca720cf
[fix][broker] Fix ManagedCursorTest testFlushCursorAfterInactivity an…
oneby-wang Dec 28, 2025
a61156f
[fix][broker] Fix ManagedCursorTest.testLazyCursorLedgerCreation test
oneby-wang Dec 28, 2025
db29e55
[fix][broker] Fix ManagedCursorTest.testPersistentMarkDeleteIfCreateC…
oneby-wang Dec 28, 2025
5d6a41d
[fix][broker] Fix ManagedCursorTest.testPersistentMarkDeleteIfSwitchC…
oneby-wang Dec 28, 2025
3569bde
[fix][broker] Fix tests
oneby-wang Dec 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1243,14 +1243,17 @@ public boolean hasMoreEntries() {

@Override
public long getNumberOfEntries() {
if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) {
Position readPos = readPosition;
Position lastPosition = ledger.getLastPosition();
Position nextPosition = lastPosition.getNext();
if (readPos.compareTo(nextPosition) > 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read position {} is ahead of last position {}. There are no entries to read",
ledger.getName(), name, readPosition, ledger.getLastPosition());
ledger.getName(), name, readPos, lastPosition);
}
return 0;
} else {
return getNumberOfEntries(Range.closedOpen(readPosition, ledger.getLastPosition().getNext()));
return getNumberOfEntries(Range.closedOpen(readPos, nextPosition));
}
}

Expand Down Expand Up @@ -2182,25 +2185,27 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
}

Position newPosition = ackBatchPosition(position);
if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
Position markDeletePos = markDeletePosition;
Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
if (lastConfirmedEntry.compareTo(newPosition) < 0) {
boolean shouldCursorMoveForward = false;
try {
long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
long ledgerEntries = ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries();
Long nextValidLedger = ledger.getNextValidLedger(lastConfirmedEntry.getLedgerId());
shouldCursorMoveForward = nextValidLedger != null
&& (markDeletePosition.getEntryId() + 1 >= ledgerEntries)
&& (markDeletePos.getEntryId() + 1 >= ledgerEntries)
&& (newPosition.getLedgerId() == nextValidLedger);
} catch (Exception e) {
log.warn("Failed to get ledger entries while setting mark-delete-position", e);
}

if (shouldCursorMoveForward) {
log.info("[{}] move mark-delete-position from {} to {} since all the entries have been consumed",
ledger.getName(), markDeletePosition, newPosition);
ledger.getName(), markDeletePos, newPosition);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
+ " for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
+ " for cursor [{}]", ledger.getName(), position, lastConfirmedEntry, name);
}
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,14 +481,15 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
public void initializeComplete() {
log.info("[{}] Successfully initialize managed ledger", name);
pendingInitializeLedgers.remove(name, pendingLedger);
future.complete(newledger);

// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
// May need to trigger offloading
if (config.isTriggerOffloadOnTopicLoad()) {
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
}
// May need to update the cursor positions; wait for those updates to finish
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) -> {
// ignore ex since it is handled in maybeUpdateCursorBeforeTrimmingConsumedLedger
future.complete(newledger);
// May need to trigger offloading
if (config.isTriggerOffloadOnTopicLoad()) {
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1761,11 +1761,10 @@ public void operationComplete(Void v, Stat stat) {
updateLedgersIdsComplete(originalCurrentLedger);
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
- lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
// May need to update the cursor position
maybeUpdateCursorBeforeTrimmingConsumedLedger();
}
metadataMutex.unlock();

// May need to update the cursor position
maybeUpdateCursorBeforeTrimmingConsumedLedger();
}

@Override
Expand Down Expand Up @@ -2709,18 +2708,23 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
this.waitingEntryCallBacks.add(cb);
}

public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
public CompletableFuture<Void> maybeUpdateCursorBeforeTrimmingConsumedLedger() {
List<CompletableFuture<Void>> cursorMarkDeleteFutures = new ArrayList<>();
for (ManagedCursor cursor : cursors) {
Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null
? cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition();
LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
CompletableFuture<Void> future = new CompletableFuture<>();
cursorMarkDeleteFutures.add(future);

// Snapshot the position into local variables to avoid a race condition.
Position markDeletedPosition = cursor.getMarkDeletedPosition();
Position lastAckedPosition = markDeletedPosition;
LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
.map(Map.Entry::getValue).orElse(null);

if (currPointedLedger != null) {
if (curPointedLedger != null) {
if (nextPointedLedger != null) {
if (lastAckedPosition.getEntryId() != -1
&& lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
&& lastAckedPosition.getEntryId() + 1 >= curPointedLedger.getEntries()) {
lastAckedPosition = PositionFactory.create(nextPointedLedger.getLedgerId(), -1);
}
} else {
Expand All @@ -2730,25 +2734,39 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
log.warn("Cursor: {} does not exist in the managed-ledger.", cursor);
}

if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
int compareResult = lastAckedPosition.compareTo(markDeletedPosition);
if (compareResult > 0) {
Position finalPosition = lastAckedPosition;
log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
log.info("Mark deleting cursor:{} from {} to {} since ledger consumed completely.", cursor,
markDeletedPosition, lastAckedPosition);
cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(),
new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
log.info("Successfully persisted cursor position for cursor:{} to {}",
cursor, finalPosition);
future.complete(null);
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
cursor, cursor.getMarkDeletedPosition(), finalPosition, exception);
log.warn("Failed to mark delete: {} from {} to {}. ", cursor,
cursor.getMarkDeletedPosition(), finalPosition, exception);
future.completeExceptionally(exception);
}
}, null);
} else if (compareResult == 0) {
log.debug("No need to reset cursor: {}, last acked position equals to current mark-delete position {}.",
cursor, markDeletedPosition);
future.complete(null);
} else {
// Should not happen
log.warn("Ledger rollover tries to mark delete an already mark-deleted position. Current mark-delete:"
+ " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition);
future.complete(null);
}
}
return FutureUtil.waitForAll(cursorMarkDeleteFutures);
}

private void trimConsumedLedgersInBackground() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
ml.close();
ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThan(lastEntry);

// cleanup.
ml.delete();
Expand Down Expand Up @@ -384,12 +385,18 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception {
assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);

// Verify the mark delete position can be recovered properly.
ml.close();
ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
// If previous ledger is trimmed, Cursor: ManagedCursorImpl{ledger=ml_test, name=c1, ackPos=12:0, readPos=15:0}
// does not exist in the managed-ledger. Recovered cursor's position will not be moved forward.
// TODO should be handled in ledger trim process.
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);

// cleanup.
ml.delete();
Expand Down Expand Up @@ -4252,7 +4259,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");

assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
});
}

Expand Down Expand Up @@ -4311,7 +4318,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterIndDelInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");

assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
});
}

Expand Down Expand Up @@ -4363,7 +4370,7 @@ public void testFlushCursorAfterError() throws Exception {
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");

assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
});
}

Expand Down Expand Up @@ -4626,7 +4633,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}

@Test
public void testLazyCursorLedgerCreation() throws Exception {
public void testEagerCursorLedgerCreation() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
Expand All @@ -4651,8 +4658,8 @@ public void testLazyCursorLedgerCreation() throws Exception {
ledger = (ManagedLedgerImpl) factory
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test");
assertEquals(cursor1.getState(), "NoLedger");
assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition);
assertEquals(cursor1.getState(), "Open");
assertThat(cursor1.getMarkDeletedPosition()).isGreaterThan(finalLastPosition);

// Verify the recovered cursor can work with new mark delete.
lastPosition = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -69,6 +70,7 @@
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -3784,6 +3786,61 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio
});
}

/**
 * Verifies that user-triggered mark-deletes never fail while the managed ledger concurrently runs
 * {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()}.
 *
 * <p>Every added entry is mark-deleted from its add callback. Each future returned by
 * {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()} is captured through a spy and must complete
 * normally. Failures reported to the callbacks are recorded and rethrown on the test thread, so a
 * failure surfaces with its cause instead of only as a test timeout.
 *
 * @throws Exception if any add, mark-delete or cursor-update operation fails
 */
@Test(timeOut = 20000)
public void testNeverThrowExceptionInMaybeUpdateCursorBeforeTrimmingConsumedLedger()
        throws Exception {
    ManagedLedgerConfig config = new ManagedLedgerConfig();
    initManagedLedgerConfig(config);
    // At most one entry per ledger; presumably this makes nearly every add trigger a ledger rollover.
    config.setMaxEntriesPerLedger(1);
    int entryNum = 100;

    ManagedLedgerImpl realManagedLedger =
            (ManagedLedgerImpl) factory.open("maybeUpdateCursorBeforeTrimmingConsumed_ledger", config);
    ManagedLedgerImpl managedLedger = spy(realManagedLedger);
    ManagedCursor cursor = managedLedger.openCursor("c1");

    // Capture every future produced by the real method so all of them can be verified at the end.
    Deque<CompletableFuture<Void>> futures = new ConcurrentLinkedDeque<>();
    doAnswer(invocation -> {
        // callRealMethod() returns Object; the real method is declared to return CompletableFuture<Void>.
        @SuppressWarnings("unchecked")
        CompletableFuture<Void> result = (CompletableFuture<Void>) invocation.callRealMethod();
        futures.offer(result);
        return result;
    }).when(managedLedger).maybeUpdateCursorBeforeTrimmingConsumedLedger();

    final CountDownLatch latch = new CountDownLatch(entryNum);
    // Completed exceptionally by the first callback that reports a failure. Throwing an AssertionError
    // on a callback thread would not reach the test thread and would leave the latch unreleased.
    final CompletableFuture<Void> callbackFailure = new CompletableFuture<>();
    // Two asyncMarkDelete operations running concurrently:
    // 1. ledger rollover triggered maybeUpdateCursorBeforeTrimmingConsumedLedger.
    // 2. user triggered asyncMarkDelete.
    for (int i = 0; i < entryNum; i++) {
        managedLedger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() {
            @Override
            public void addFailed(ManagedLedgerException exception, Object ctx) {
                callbackFailure.completeExceptionally(exception);
                latch.countDown();
            }

            @Override
            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
                    @Override
                    public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                        callbackFailure.completeExceptionally(exception);
                        latch.countDown();
                    }

                    @Override
                    public void markDeleteComplete(Object ctx) {
                        latch.countDown();
                    }
                }, null);
            }
        }, null);
    }

    latch.await();
    if (callbackFailure.isCompletedExceptionally()) {
        // Rethrows as an ExecutionException wrapping the failure reported to the callback.
        callbackFailure.get();
    }
    assertEquals(cursor.getNumberOfEntries(), 0);

    // Will not throw exception
    FutureUtil.waitForAll(futures).get();
}

@Test(timeOut = 20000)
public void testAsyncTruncateLedgerRetention() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -975,10 +975,9 @@ public void testCompactorReadsCompacted() throws Exception {
// compact the topic again
compact(topic);

// shouldn't have opened first ledger (already compacted), penultimate would have some uncompacted data.
// last ledger already open for writing
// shouldn't have opened first and penultimate ledger (already compacted), last ledger already open for writing
assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));

// all three messages should be there when we read compacted
Expand Down