Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,20 +284,18 @@ private CursorSendEngine(String sfDir, long segmentSizeBytes, SegmentManager man
this.ring = ringInProgress;
this.watermark = watermarkInProgress;
} catch (Throwable t) {
// Order: ring first (releases mmap/fd), then manager (joins
// worker thread, but only if we started it AND we own it),
// then watermark (releases its own mmap/fd), then slot lock.
// Each in its own try/catch so a single failure doesn't
// strand later cleanups.
if (ringInProgress != null) {
// Stop an owned manager before freeing the ring and watermark it may
// touch, then release the slot lock. Each cleanup is in its own
// try/catch so a single failure doesn't strand later cleanups.
if (ownsManager && managerStarted) {
try {
ringInProgress.close();
manager.close();
} catch (Throwable ignored) {
}
}
if (ownsManager && managerStarted) {
if (ringInProgress != null) {
try {
manager.close();
ringInProgress.close();
} catch (Throwable ignored) {
}
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.questdb.client.cutlass.qwp.client.sf.cursor.AckWatermark;
import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorSendEngine;
import io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment;
import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentManager;
import io.questdb.client.cutlass.qwp.client.sf.cursor.SlotLock;
import io.questdb.client.std.Files;
import io.questdb.client.std.MemoryTag;
import io.questdb.client.std.ObjList;
Expand All @@ -37,7 +39,12 @@
import org.junit.Before;
import org.junit.Test;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -190,6 +197,50 @@ public void testCloseIsIdempotent() throws Exception {
});
}

/**
 * Sabotages a fresh {@code SegmentManager}, drives the private engine constructor through
 * the reflective helper, and checks the aftermath: the failure is a
 * {@code NullPointerException}, the manager no longer tracks a worker thread, and the slot
 * lock for {@code tmpDir} can be taken again.
 */
@Test
public void testConstructorFailureAfterOwnedManagerStartCleansResources() throws Exception {
    TestUtils.assertMemoryLeak(() -> {
        final SegmentManager ownedManager = new SegmentManager(4096);
        poisonRegisterGeneration(ownedManager);

        final Throwable failure =
                invokeOwnedPrivateConstructorExpectingFailure(tmpDir, 4096, ownedManager);
        final boolean failedWithNpe = failure instanceof NullPointerException;
        assertTrue("register sabotage should surface from constructor catch: " + failure,
                failedWithNpe);

        // The constructor's cleanup path is expected to have stopped the worker it started.
        final Thread workerAfterFailure = workerThread(ownedManager);
        assertNull("owned manager worker must be stopped by constructor catch",
                workerAfterFailure);
        assertSlotCanBeReacquired(tmpDir);
    });
}

/**
 * Starts a caller-owned {@code SegmentManager}, sabotages it, and checks that a failing
 * engine constructor leaves the caller's worker thread alive while still letting the slot
 * lock for {@code tmpDir} be taken again. The test itself closes the manager at the end.
 */
@Test
public void testConstructorFailureWithSharedManagerReleasesSlotButKeepsManagerRunning() throws Exception {
    TestUtils.assertMemoryLeak(() -> {
        final SegmentManager sharedManager = new SegmentManager(4096);
        try {
            sharedManager.start();
            final Thread workerBefore = workerThread(sharedManager);
            assertNotNull("shared manager must be running before constructor", workerBefore);
            final boolean aliveBefore = workerBefore.isAlive();
            assertTrue("shared manager worker must be alive before constructor", aliveBefore);

            poisonRegisterGeneration(sharedManager);
            final Throwable failure =
                    invokeSharedConstructorExpectingFailure(tmpDir, 4096, sharedManager);
            final boolean failedWithNpe = failure instanceof NullPointerException;
            assertTrue("register sabotage should surface from constructor catch: " + failure,
                    failedWithNpe);

            // The manager belongs to the caller, so the failed constructor must not stop it.
            final Thread workerAfter = workerThread(sharedManager);
            assertNotNull("constructor catch must not close caller-owned manager", workerAfter);
            final boolean aliveAfter = workerAfter.isAlive();
            assertTrue("caller-owned manager worker must remain alive", aliveAfter);
            assertSlotCanBeReacquired(tmpDir);
        } finally {
            sharedManager.close();
        }
    });
}

@Test
public void testMemoryModeSkipsDirAndStillWorks() throws Exception {
TestUtils.assertMemoryLeak(() -> {
Expand Down Expand Up @@ -283,6 +334,53 @@ public void testRecoveryIgnoresWatermarkAbovePublishedFsn() throws Exception {
});
}

@Test(timeout = 30_000L)
public void testManagerPersistedWatermarkSurvivesRestart() throws Exception {
    // Companion to testRecoveryAdvancesAckedFsnPastWatermark, which writes the
    // .ack-watermark file by hand. Here a real, started SegmentManager persists
    // the watermark from real acks and a second session must recover that
    // value. Otherwise a regression that silently disabled the manager's
    // trim-path watermark.write() (e.g. an inverted `registered` gate) would
    // go unnoticed: the durable-ack tests only look at the in-memory
    // engine.ackedFsn(), and the recovery tests forge the file themselves.
    TestUtils.assertMemoryLeak(() -> {
        final int payloadLen = 64;
        final int frameCount = 4;
        final long ackTarget = 2L;
        final long lastPublishedFsn = 3L;
        final long segmentBytes = MmapSegment.HEADER_SIZE
                + frameCount * (MmapSegment.FRAME_HEADER_SIZE + payloadLen);
        final long payload = Unsafe.malloc(payloadLen, MemoryTag.NATIVE_DEFAULT);
        try {
            // First session: publish four frames (publishedFsn = 3) and ack only
            // up to 2. Because the ack is below publishedFsn, close() does not
            // see the slot as fully drained, so segments and watermark remain
            // for recovery. Every frame stays in the active segment, nothing is
            // trimmed, and the segment-derived seed is lowestBase - 1 == -1;
            // only the manager-written watermark (2) can raise the recovered
            // ackedFsn above that.
            try (CursorSendEngine firstSession = new CursorSendEngine(tmpDir, segmentBytes)) {
                for (int frame = 0; frame < frameCount; frame++) {
                    firstSession.appendBlocking(payload, payloadLen);
                }
                final boolean ackAdvanced = firstSession.acknowledge(ackTarget);
                assertTrue("ack must advance", ackAdvanced);
                // Wait for the background worker to really write the watermark
                // to disk. With the trim-path write disabled this never reaches
                // 2 and the helper fails with a clear message instead of the
                // test flaking on a close()-before-tick race.
                awaitManagerPersistedWatermark(tmpDir, ackTarget);
            }
            // Second session: ackedFsn must come from the manager-written
            // watermark (2) rather than the bare segment-derived seed (-1).
            try (CursorSendEngine secondSession = new CursorSendEngine(tmpDir, segmentBytes)) {
                assertEquals("recovery must consume the manager-persisted watermark",
                        ackTarget, secondSession.ackedFsn());
                assertEquals(lastPublishedFsn, secondSession.publishedFsn());
            }
        } finally {
            Unsafe.free(payload, payloadLen, MemoryTag.NATIVE_DEFAULT);
        }
    });
}

@Test
public void testRestartIntoNonEmptySfDirContinuesFsnSequence() throws Exception {
TestUtils.assertMemoryLeak(() -> {
Expand Down Expand Up @@ -500,4 +598,73 @@ public void testWasRecoveredFromDiskTrueOnReopen() throws Exception {
}
});
}

/**
 * Acquires the slot lock for {@code sfDir} and releases it straight away. The call only
 * succeeds if whoever held the lock before has let go of it.
 *
 * @param sfDir slot directory passed to {@code SlotLock.acquire}
 */
private static void assertSlotCanBeReacquired(String sfDir) {
    try (SlotLock reacquiredLock = SlotLock.acquire(sfDir)) {
        // Nothing to do: acquiring without an exception is the assertion.
    }
}

/**
 * Reflectively calls the private five-argument {@code CursorSendEngine} constructor with
 * its boolean argument set to {@code true} and the default append deadline, expecting it
 * to throw.
 *
 * @param sfDir            slot directory handed to the constructor
 * @param segmentSizeBytes segment size handed to the constructor
 * @param manager          segment manager handed to the constructor
 * @return the exception the constructor threw, unwrapped from the reflective
 *         {@code InvocationTargetException}
 * @throws Exception if the constructor cannot be looked up or invoked reflectively
 */
private static Throwable invokeOwnedPrivateConstructorExpectingFailure(
        String sfDir, long segmentSizeBytes, SegmentManager manager) throws Exception {
    final Constructor<CursorSendEngine> privateCtor = CursorSendEngine.class.getDeclaredConstructor(
            String.class, long.class, SegmentManager.class, boolean.class, long.class);
    privateCtor.setAccessible(true);
    try {
        privateCtor.newInstance(
                sfDir,
                segmentSizeBytes,
                manager,
                true,
                CursorSendEngine.DEFAULT_APPEND_DEADLINE_NANOS);
    } catch (InvocationTargetException wrapped) {
        return wrapped.getCause();
    }
    // Reaching this point means the constructor returned normally.
    fail("expected constructor failure");
    return null;
}

/**
 * Calls the public shared-manager {@code CursorSendEngine} constructor, expecting it to
 * throw, and hands the thrown exception back to the caller for inspection.
 * <p>
 * The "expected a failure" assertion is raised outside the {@code try}. If it were inside,
 * the broad {@code catch (Throwable)} would capture the {@code AssertionError} from
 * {@code fail()} and return it as though the constructor had thrown it.
 *
 * @param sfDir            slot directory handed to the constructor
 * @param segmentSizeBytes segment size handed to the constructor
 * @param manager          caller-owned segment manager handed to the constructor
 * @return whatever the constructor threw
 */
private static Throwable invokeSharedConstructorExpectingFailure(
        String sfDir, long segmentSizeBytes, SegmentManager manager) {
    CursorSendEngine engine;
    try {
        engine = new CursorSendEngine(sfDir, segmentSizeBytes, manager);
    } catch (Throwable t) {
        return t;
    }
    // Unexpected success: release what the engine holds before failing, so the test
    // reports the missing constructor failure and not a secondary leak.
    try {
        engine.close();
    } catch (Throwable ignored) {
        // Best effort only; the fail() below is the error that should be reported.
    }
    fail("expected constructor failure");
    return null;
}

/**
 * Nulls the manager's private {@code fileGeneration} field via reflection.
 * <p>
 * register() advances fileGeneration before publishing the ring, so clearing it gives a
 * deterministic constructor failure once the ring and watermark already exist, with no
 * test hook needed in production code.
 *
 * @param manager manager whose {@code fileGeneration} field is set to {@code null}
 * @throws Exception if the field cannot be found or written reflectively
 */
private static void poisonRegisterGeneration(SegmentManager manager) throws Exception {
    final Field generationField = SegmentManager.class.getDeclaredField("fileGeneration");
    generationField.setAccessible(true);
    generationField.set(manager, null);
}

/**
 * Reads the manager's private {@code workerThread} field via reflection.
 *
 * @param manager manager to inspect
 * @return the current value of the field, which may be {@code null}
 * @throws Exception if the field cannot be found or read reflectively
 */
private static Thread workerThread(SegmentManager manager) throws Exception {
    final Field workerField = SegmentManager.class.getDeclaredField("workerThread");
    workerField.setAccessible(true);
    final Object currentWorker = workerField.get(manager);
    return (Thread) currentWorker;
}

/**
 * Polls the on-disk watermark until it reads {@code expected}, or fails after a bounded
 * wait of ten seconds. The probe is a second mapping of the same file the manager worker
 * writes through; its MAP_SHARED reads observe the worker's writes, and it is closed
 * before the next session opens the slot.
 *
 * @param slotDir  slot directory whose watermark file is probed
 * @param expected watermark value to wait for
 */
private static void awaitManagerPersistedWatermark(String slotDir, long expected) {
    final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
    long last = AckWatermark.INVALID;
    try (AckWatermark probe = AckWatermark.open(slotDir)) {
        assertNotNull("watermark file must exist after register", probe);
        // Compare by difference, not with '<': System.nanoTime() values may wrap, and only
        // the signed difference between two readings is meaningful.
        while (System.nanoTime() - deadline < 0) {
            last = probe.read();
            if (last == expected) {
                return;
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(2));
        }
    }
    fail("manager did not persist watermark=" + expected
            + " within 10s (last on-disk read=" + last + ")");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment;
import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentManager;
import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentRing;
import io.questdb.client.std.bytes.DirectByteSink;
import io.questdb.client.std.Files;
import io.questdb.client.std.str.DirectUtf8Sink;
import io.questdb.client.test.tools.TestUtils;
import org.junit.After;
import org.junit.Assert;
Expand All @@ -36,6 +38,10 @@

import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* Concurrent regression for the {@code SegmentManager} worker race vs
Expand Down Expand Up @@ -130,6 +136,76 @@ public void testManagerDoesNotInstallSpareIntoClosedRing() throws Exception {
});
}

/**
 * Checks that {@code SegmentManager.close()} leaves the manager's path scratch allocated
 * while its worker thread is still alive. The worker is parked inside the before-install
 * hook, {@code close()} is called with the calling thread's interrupt flag set, and the test
 * asserts that the worker is still tracked and the scratch pointer is still non-zero. Once
 * the worker is released, a second {@code close()} must clear both.
 */
@Test(timeout = 15_000L)
public void testCloseDoesNotFreePathScratchWhenWorkerStillAlive() throws Exception {
TestUtils.assertMemoryLeak(() -> {
// NOTE(review): presumably sized to hold the header plus a single frame with a
// 32-byte payload — confirm against MmapSegment.
long segSize = MmapSegment.HEADER_SIZE + (MmapSegment.FRAME_HEADER_SIZE + 32);
String slot = tmpDir + "/timeout-slot";
Assert.assertEquals(0, Files.mkdir(slot, Files.DIR_MODE_DEFAULT));
MmapSegment initial = MmapSegment.create(slot + "/sf-initial.sfa", 0L, segSize);
SegmentRing ring = new SegmentRing(initial, segSize);
SegmentManager manager = new SegmentManager(segSize, TimeUnit.SECONDS.toNanos(60));
// workerBlocked: counted down by the hook once the worker has entered it.
// releaseWorker: counted down by the test to let the hook return.
CountDownLatch workerBlocked = new CountDownLatch(1);
CountDownLatch releaseWorker = new CountDownLatch(1);
// Only the first hook invocation blocks; later invocations return immediately.
AtomicBoolean fired = new AtomicBoolean();
// First failure seen inside the hook; rethrown on the test thread after the final close().
AtomicReference<Throwable> hookErr = new AtomicReference<>();
// Set once the second close() has returned, so the finally block skips its fallback close().
boolean managerClosed = false;
try {
manager.register(ring, slot);
manager.setBeforeInstallSyncHook(() -> {
if (!fired.compareAndSet(false, true)) return;
workerBlocked.countDown();
try {
// Park the worker here until the test releases it (or 10s elapse).
if (!releaseWorker.await(10, TimeUnit.SECONDS)) {
hookErr.compareAndSet(null,
new AssertionError("timed out waiting for test to release worker"));
}
} catch (Throwable t) {
hookErr.compareAndSet(null, t);
}
});
manager.start();
Assert.assertTrue("worker did not reach install hook",
workerBlocked.await(5, TimeUnit.SECONDS));
Assert.assertTrue("precondition: path scratch should be allocated",
readPathScratchImpl(manager) != 0L);

// Exercise the same branch as a timed-out join without making
// the test sleep for 5 seconds: join() returns while the worker
// is still alive. close() must leave worker-owned native memory
// alone so the worker can resume safely.
Thread.currentThread().interrupt();
manager.close();
// Thread.interrupted() also clears the flag, so the rest of the test runs uninterrupted.
Assert.assertTrue("close should preserve interrupted status",
Thread.interrupted());
Thread worker = readWorkerThread(manager);
Assert.assertTrue("worker should still be tracked after incomplete close",
worker != null && worker.isAlive());
Assert.assertTrue("path scratch was freed while worker was still alive",
readPathScratchImpl(manager) != 0L);

// Let the hook return, then close again; this time the close is expected to complete.
releaseWorker.countDown();
manager.close();
managerClosed = true;
Assert.assertNull("successful close should clear workerThread",
readWorkerThread(manager));
Assert.assertEquals("successful close should free path scratch",
0L, readPathScratchImpl(manager));
if (hookErr.get() != null) {
throw new AssertionError("install hook failed", hookErr.get());
}
} finally {
// Remove the hook and open the latch unconditionally so a failed assertion above
// cannot leave the worker parked in the hook.
manager.setBeforeInstallSyncHook(null);
releaseWorker.countDown();
if (!managerClosed) {
// Clears any interrupt flag still set on this thread — presumably so the fallback
// close() below is not cut short by it.
Thread.interrupted();
manager.close();
}
ring.close();
}
});
}

private static void cleanupRecursively(String dir) {
if (!Files.exists(dir)) return;
long find = Files.findFirst(dir);
Expand All @@ -152,4 +228,22 @@ private static void cleanupRecursively(String dir) {
Files.findClose(find);
}
}

/**
 * Follows three private fields by reflection — {@code SegmentManager.pathScratch}, then
 * {@code DirectUtf8Sink.sink}, then {@code DirectByteSink.impl} — and returns the final
 * {@code long} value. The tests treat a non-zero result as "scratch still allocated".
 *
 * @param manager manager whose path scratch is inspected
 * @return the value of the innermost {@code impl} field
 * @throws Exception if any field cannot be found or read reflectively
 */
private static long readPathScratchImpl(SegmentManager manager) throws Exception {
    final Field scratchField = SegmentManager.class.getDeclaredField("pathScratch");
    scratchField.setAccessible(true);
    final DirectUtf8Sink utf8Sink = (DirectUtf8Sink) scratchField.get(manager);

    final Field byteSinkField = DirectUtf8Sink.class.getDeclaredField("sink");
    byteSinkField.setAccessible(true);
    final DirectByteSink byteSink = (DirectByteSink) byteSinkField.get(utf8Sink);

    final Field implField = DirectByteSink.class.getDeclaredField("impl");
    implField.setAccessible(true);
    final long implValue = implField.getLong(byteSink);
    return implValue;
}

/**
 * Reads the manager's private {@code workerThread} field via reflection.
 *
 * @param manager manager to inspect
 * @return the current value of the field, which may be {@code null}
 * @throws Exception if the field cannot be found or read reflectively
 */
private static Thread readWorkerThread(SegmentManager manager) throws Exception {
    final Field workerField = SegmentManager.class.getDeclaredField("workerThread");
    workerField.setAccessible(true);
    final Object trackedWorker = workerField.get(manager);
    return (Thread) trackedWorker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testInstallPathDoesNotCommitAfterDeregister() throws Exception {
CountDownLatch hookDone = new CountDownLatch(1);
AtomicBoolean fired = new AtomicBoolean();
AtomicReference<Throwable> hookErr = new AtomicReference<>();
setBeforeInstallSyncHook(mgr, () -> {
mgr.setBeforeInstallSyncHook(() -> {
if (!fired.compareAndSet(false, true)) return;
try {
mgr.deregister(ring);
Expand Down Expand Up @@ -170,7 +170,7 @@ public void testInstallPathDoesNotCommitAfterDeregister() throws Exception {
+ "under the same lock that covers deregister.",
0L, observed);
} finally {
setBeforeInstallSyncHook(mgr, null);
mgr.setBeforeInstallSyncHook(null);
try {
ring.close();
} catch (Throwable ignored) {
Expand Down Expand Up @@ -226,12 +226,6 @@ private static void rmDirRecursive(String dir) {
Files.remove(dir);
}

/**
 * Writes {@code hook} into the manager's private {@code beforeInstallSyncHook} field via
 * reflection; passing {@code null} clears it.
 *
 * @param mgr  manager whose hook field is overwritten
 * @param hook value stored in the field, or {@code null}
 * @throws Exception if the field cannot be found or written reflectively
 */
private static void setBeforeInstallSyncHook(SegmentManager mgr, Runnable hook) throws Exception {
    final Field hookField = SegmentManager.class.getDeclaredField("beforeInstallSyncHook");
    hookField.setAccessible(true);
    hookField.set(mgr, hook);
}

private static Thread workerThread(SegmentManager mgr) throws Exception {
Field f = SegmentManager.class.getDeclaredField("workerThread");
f.setAccessible(true);
Expand Down
Loading
Loading