diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorSendEngine.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorSendEngine.java index 1585d09d..e23b2252 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorSendEngine.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorSendEngine.java @@ -284,20 +284,18 @@ private CursorSendEngine(String sfDir, long segmentSizeBytes, SegmentManager man this.ring = ringInProgress; this.watermark = watermarkInProgress; } catch (Throwable t) { - // Order: ring first (releases mmap/fd), then manager (joins - // worker thread, but only if we started it AND we own it), - // then watermark (releases its own mmap/fd), then slot lock. - // Each in its own try/catch so a single failure doesn't - // strand later cleanups. - if (ringInProgress != null) { + // Stop an owned manager before freeing the ring and watermark it may + // touch, then release the slot lock. Each cleanup is in its own + // try/catch so a single failure doesn't strand later cleanups. 
+ if (ownsManager && managerStarted) { try { - ringInProgress.close(); + manager.close(); } catch (Throwable ignored) { } } - if (ownsManager && managerStarted) { + if (ringInProgress != null) { try { - manager.close(); + ringInProgress.close(); } catch (Throwable ignored) { } } diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java index ce2ff990..2519a002 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java @@ -30,6 +30,7 @@ import io.questdb.client.std.ObjList; import io.questdb.client.std.QuietCloseable; import io.questdb.client.std.str.DirectUtf8Sink; +import org.jetbrains.annotations.TestOnly; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,17 +87,18 @@ public final class SegmentManager implements QuietCloseable { private final long segmentSizeBytes; // Test seam: runs on the worker thread just before the install path's // synchronized(lock) entry (the one that performs installHotSpare + the - // totalBytes += segmentSize commit). Null in production; only - // SegmentManagerInstallDeregisterRaceTest installs it, to deterministically - // inject a deregister(ring) call into the exact race window that the - // stillRegistered guard inside the install block closes. - volatile Runnable beforeInstallSyncHook; + // totalBytes += segmentSize commit). Null in production; tests use it to + // pause after the worker has snapshotted a RingEntry and created a spare, + // but before ownership/accounting commit. Callers may inject a deregister + // or hold this stale worker snapshot while caller-side cleanup runs. + private volatile Runnable beforeInstallSyncHook; // Test seam: runs on the worker thread just before the trim block's // synchronized(lock) entry. 
Null in production; only // SegmentManagerTrimDeregisterRaceTest installs it, to deterministically // inject a deregister(ring) call into the exact race window that the - // stillRegistered guard inside the trim block closes. - volatile Runnable beforeTrimSyncHook; + // registered flag check inside the trim block closes for watermark writes + // and totalBytes accounting. + private volatile Runnable beforeTrimSyncHook; private long lastDiskFullLogNs; private volatile boolean running; // Total bytes currently allocated across every segment owned by every @@ -122,23 +124,23 @@ public SegmentManager(long segmentSizeBytes, long pollNanos) { * Full constructor. * * @param segmentSizeBytes per-segment file size in bytes - * @param pollNanos how often the worker polls each registered ring; - * default {@link #DEFAULT_POLL_NANOS} - * @param maxTotalBytes upper bound on total bytes the manager tracks - * across all registered rings — counts every segment - * the ring owns (initial active + sealed + hot - * spare), including bytes already on disk at - * register-time (e.g. after recovery or orphan - * adoption). When provisioning a hot spare would - * exceed this, the manager skips the install — the - * requesting ring stays in the - * {@link SegmentRing#BACKPRESSURE_NO_SPARE} state - * until ACK-driven trim frees space. Pass - * {@link #UNLIMITED_TOTAL_BYTES} to disable. Must be - * at least one {@code segmentSizeBytes}; a sensible - * lower bound for a single ring is - * {@code 2 × segmentSizeBytes} so the manager can - * hold an initial active plus one hot spare. + * @param pollNanos how often the worker polls each registered ring; + * default {@link #DEFAULT_POLL_NANOS} + * @param maxTotalBytes upper bound on total bytes the manager tracks + * across all registered rings — counts every segment + * the ring owns (initial active + sealed + hot + * spare), including bytes already on disk at + * register-time (e.g. after recovery or orphan + * adoption). 
When provisioning a hot spare would + * exceed this, the manager skips the install — the + * requesting ring stays in the + * {@link SegmentRing#BACKPRESSURE_NO_SPARE} state + * until ACK-driven trim frees space. Pass + * {@link #UNLIMITED_TOTAL_BYTES} to disable. Must be + * at least one {@code segmentSizeBytes}; a sensible + * lower bound for a single ring is + * {@code 2 × segmentSizeBytes} so the manager can + * hold an initial active plus one hot spare. */ public SegmentManager(long segmentSizeBytes, long pollNanos, long maxTotalBytes) { if (segmentSizeBytes < MmapSegment.HEADER_SIZE + MmapSegment.FRAME_HEADER_SIZE + 1) { @@ -157,18 +159,26 @@ public SegmentManager(long segmentSizeBytes, long pollNanos, long maxTotalBytes) @Override public synchronized void close() { running = false; - if (workerThread != null) { - LockSupport.unpark(workerThread); + Thread t = workerThread; + if (t != null) { + LockSupport.unpark(t); try { - workerThread.join(5_000); + t.join(5_000); } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } + if (t.isAlive()) { + LOG.warn("SegmentManager worker did not stop before close wait completed; " + + "leaving worker-owned resources allocated"); + return; + } workerThread = null; } - // Free the rotation-path native scratch buffer. Safe to do here - // (after the worker has joined) since the buffer is only touched - // on the worker thread. + // Free the rotation-path native scratch buffer only after worker + // termination has been observed. The worker is the only thread that + // touches the buffer, but close() uses a bounded join; if the worker is + // still alive, leaking this one scratch allocation is safer than freeing + // native memory it may still read or write. 
pathScratch.close(); } @@ -181,7 +191,8 @@ public synchronized void close() { public void deregister(SegmentRing ring) { synchronized (lock) { for (int i = 0, n = rings.size(); i < n; i++) { - if (rings.get(i).ring == ring) { + RingEntry e = rings.get(i); + if (e.ring == ring) { // Reverse the ring's contribution to totalBytes — // mirrors the seed in register(). Any spares the // manager provisioned during the ring's lifetime @@ -189,6 +200,7 @@ public void deregister(SegmentRing ring) { // single subtraction covers both the initial seed // and the net manager activity (provisions minus // trims) for this ring. + e.registered = false; totalBytes -= ring.totalSegmentBytes(); rings.remove(i); return; @@ -222,31 +234,30 @@ public void register(SegmentRing ring, String dir) { * {@code lowestSurvivingBaseSeq - 1}. */ public void register(SegmentRing ring, String dir, AckWatermark watermark) { + // Account for bytes the ring already owns when it joins. A recovered + // ring (post-restart, orphan adoption) can come up at-or-above the cap; + // without this seed, totalBytes stays at 0 and the per-tick cap check + // at serviceRing would let the manager keep provisioning new spares on + // top of the recovered set, effectively doubling the documented cap. + long ringBytes = ring.totalSegmentBytes(); + // Skip the file-generation counter past whatever's already on disk in + // this slot. Without this, on recovery the manager would mint a new + // spare at sf-0000000000000000.sfa — and openCleanRW would truncate the + // user's existing active file out from under the I/O loop, scrambling + // the in-flight mmap. Memory-mode rings have no dir; nothing to scan. + long minNextGeneration = dir == null ? -1L : scanMaxGeneration(dir) + 1L; + Runnable managerWakeup = this::wakeWorker; + RingEntry e = new RingEntry(ring, dir, watermark); + // ObjList.add either throws before storing e or makes the entry visible. + // Once visible, only non-throwing state commits may remain. 
synchronized (lock) { - rings.add(new RingEntry(ring, dir, watermark)); - // Account for bytes the ring already owns when it joins. A - // recovered ring (post-restart, orphan adoption) can come up - // at-or-above the cap; without this seed, totalBytes stays - // at 0 and the per-tick cap check at serviceRing would let - // the manager keep provisioning new spares on top of the - // recovered set, effectively doubling the documented cap. - totalBytes += ring.totalSegmentBytes(); - // Skip the file-generation counter past whatever's already on - // disk in this slot. Without this, on recovery the manager - // would mint a new spare at sf-0000000000000000.sfa — and - // openCleanRW would truncate the user's existing active file - // out from under the I/O loop, scrambling the in-flight mmap. - // Memory-mode rings have no dir; nothing to scan there. if (dir != null) { - long minNext = scanMaxGeneration(dir) + 1L; - while (true) { - long cur = fileGeneration.get(); - if (cur >= minNext) break; - if (fileGeneration.compareAndSet(cur, minNext)) break; - } + advanceFileGeneration(minNextGeneration); } + rings.add(e); + totalBytes += ringBytes; } - ring.setManagerWakeup(this::wakeWorker); + ring.setManagerWakeup(managerWakeup); // Nudge the worker so it picks up the new ring on its very next // iteration. 
Without this, register-after-start has a race window: // start() schedules the worker thread, and if that thread reaches @@ -260,6 +271,16 @@ public void register(SegmentRing ring, String dir, AckWatermark watermark) { wakeWorker(); } + @TestOnly + public void setBeforeInstallSyncHook(Runnable hook) { + this.beforeInstallSyncHook = hook; + } + + @TestOnly + public void setBeforeTrimSyncHook(Runnable hook) { + this.beforeTrimSyncHook = hook; + } + public synchronized void start() { if (workerThread != null) { throw new IllegalStateException("already started"); @@ -322,6 +343,14 @@ private static long scanMaxGeneration(String dir) { return max; } + private void advanceFileGeneration(long minNext) { + while (true) { + long cur = fileGeneration.get(); + if (cur >= minNext) break; + if (fileGeneration.compareAndSet(cur, minNext)) break; + } + } + /** * Spare files are named with a JVM-wide monotonic generation counter * rather than a baseSeq-derived name, because the spare's baseSeq is @@ -411,19 +440,12 @@ private void serviceRing(RingEntry e) { // spare, since it wasn't installed yet) so a commit at // this point would inflate totalBytes by one segment // with no future subtractor. By holding `lock` across - // installHotSpare AND the += commit AND the still- - // registered check, deregister is forced to either + // installHotSpare AND the += commit AND the registration + // check, deregister is forced to either // observe the spare in the ring (and subtract it) or // run before installation (so no install happens). synchronized (lock) { - boolean stillRegistered = false; - for (int i = 0, n = rings.size(); i < n; i++) { - if (rings.get(i) == e) { - stillRegistered = true; - break; - } - } - if (stillRegistered) { + if (e.registered) { e.ring.installHotSpare(spare); totalBytes += segmentSizeBytes; installed = true; @@ -458,74 +480,47 @@ private void serviceRing(RingEntry e) { // memory-mode rings, "trim" is just close() (Unsafe.free) — no // file to unlink. 
// - // drainTrimmable + the totalBytes commit run together under - // `lock` so deregister cannot observe the intermediate state - // where segments have left ring.sealedSegments but the worker - // has not yet subtracted their bytes. Two interleavings would - // otherwise drift the counter: - // (a) deregister snapshots ring.totalSegmentBytes() before - // drainTrimmable mutates the ring → deregister subtracts - // including the about-to-be-drained bytes, then this - // loop subtracts them again. Drift: -drained. - // (b) drainTrimmable runs first (without holding lock) so - // ring.totalSegmentBytes() at deregister-time is already - // short by the drained bytes; if the worker then skips - // the subtract on a stillRegistered=false check, those - // bytes are never accounted. Drift: +drained. - // Atomic drain+commit under lock collapses both windows: any - // deregister sees either the pre-drain ring (with everything - // still counted) or the post-drain ring with the worker's - // subtraction already applied. Mirrors the spare-install - // path's stillRegistered guard above. + // The watermark write and totalBytes commit are registration-gated + // under `lock` so stale worker snapshots cannot touch the + // engine-owned watermark or mutate accounting after deregister() + // returns. drainTrimmable still runs for stale snapshots: it + // transfers ownership of fully-acked sealed segments to this + // worker, preserving the old close + unlink behavior. // // munmap + unlink stay outside the lock — they can be slow // and shouldn't block register/deregister or sibling rings. - // Persist the current ackedFsn watermark BEFORE the trim runs. - // On host crash between the persist and the unlinks below, the - // segments survive and the watermark is correct. 
On crash AFTER - // the unlinks but before next tick, the segments are gone and - // the watermark is stale, but recovery clamps with - // max(lowestSurvivingBaseSeq - 1, watermark) so either ordering - // is safe. Memory-mode rings (and callers that didn't supply a - // watermark) skip the write. - // Persist only on advance to avoid pointless mmap stores when - // ackedFsn is steady. The store is a single 8-byte put against - // an already-mapped region -- no syscall, no allocation -- but - // the gate keeps the dirty-page footprint minimal under - // steady-state load with no new acks arriving. - if (e.watermark != null) { - long currentAck = e.ring.ackedFsn(); - if (currentAck > e.lastPersistedAck) { - e.watermark.write(currentAck); - e.lastPersistedAck = currentAck; - } - } ObjList trim; Runnable hook = beforeTrimSyncHook; if (hook != null) { hook.run(); } synchronized (lock) { - trim = e.ring.drainTrimmable(); - if (trim != null) { - boolean stillRegistered = false; - for (int i = 0, n = rings.size(); i < n; i++) { - if (rings.get(i) == e) { - stillRegistered = true; - break; - } + boolean registered = e.registered; + // Persist the current ackedFsn watermark BEFORE the trim runs. + // On host crash between the persist and the unlinks below, the + // segments survive and the watermark is correct. On crash AFTER + // the unlinks but before next tick, the segments are gone and + // the watermark is stale, but recovery clamps with + // max(lowestSurvivingBaseSeq - 1, watermark) so either ordering + // is safe. Memory-mode rings (and callers that didn't supply a + // watermark) skip the write. + // Persist only on advance to avoid pointless mmap stores when + // ackedFsn is steady. The store is a single 8-byte put against + // an already-mapped region -- no syscall, no allocation -- but + // the gate keeps the dirty-page footprint minimal under + // steady-state load with no new acks arriving. 
+ if (registered && e.watermark != null) { + long currentAck = e.ring.ackedFsn(); + if (currentAck > e.lastPersistedAck) { + e.watermark.write(currentAck); + e.lastPersistedAck = currentAck; } - if (stillRegistered) { - for (int i = 0, n = trim.size(); i < n; i++) { - totalBytes -= trim.get(i).sizeBytes(); - } + } + trim = e.ring.drainTrimmable(); + if (registered && trim != null) { + for (int i = 0, n = trim.size(); i < n; i++) { + totalBytes -= trim.get(i).sizeBytes(); } - // else: deregister already subtracted these bytes via - // ring.totalSegmentBytes() (the drained segments were - // still in sealedSegments at that read), so subtracting - // again here would double-count. The segments are still - // ours to close + unlink below — drainTrimmable has - // already transferred ownership. } } if (trim != null) { @@ -581,6 +576,11 @@ private static final class RingEntry { // Survives across multiple serviceRing ticks and avoids a // write-storm when ackedFsn is steady. long lastPersistedAck = -1L; + // Guarded by SegmentManager.lock. A worker snapshot may retain this + // entry after deregister() removes it from rings; registered=false is + // the O(1) ownership check that prevents post-deregister writes through + // the engine-owned watermark, hot-spare installs, and accounting. 
+ boolean registered = true; RingEntry(SegmentRing ring, String dir, AckWatermark watermark) { this.ring = ring; diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorSendEngineTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorSendEngineTest.java index 7411f07e..f4de1ff2 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorSendEngineTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorSendEngineTest.java @@ -28,6 +28,8 @@ import io.questdb.client.cutlass.qwp.client.sf.cursor.AckWatermark; import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorSendEngine; import io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment; +import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentManager; +import io.questdb.client.cutlass.qwp.client.sf.cursor.SlotLock; import io.questdb.client.std.Files; import io.questdb.client.std.MemoryTag; import io.questdb.client.std.ObjList; @@ -37,7 +39,12 @@ import org.junit.Before; import org.junit.Test; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -190,6 +197,50 @@ public void testCloseIsIdempotent() throws Exception { }); } + @Test + public void testConstructorFailureAfterOwnedManagerStartCleansResources() throws Exception { + TestUtils.assertMemoryLeak(() -> { + SegmentManager manager = new SegmentManager(4096); + poisonRegisterGeneration(manager); + + Throwable thrown = invokeOwnedPrivateConstructorExpectingFailure(tmpDir, 4096, manager); + assertTrue("register sabotage should surface from constructor catch: " + thrown, + thrown instanceof NullPointerException); + + assertNull("owned manager worker 
must be stopped by constructor catch", + workerThread(manager)); + assertSlotCanBeReacquired(tmpDir); + }); + } + + @Test + public void testConstructorFailureWithSharedManagerReleasesSlotButKeepsManagerRunning() throws Exception { + TestUtils.assertMemoryLeak(() -> { + SegmentManager manager = new SegmentManager(4096); + try { + manager.start(); + Thread originalWorker = workerThread(manager); + assertNotNull("shared manager must be running before constructor", originalWorker); + assertTrue("shared manager worker must be alive before constructor", + originalWorker.isAlive()); + + poisonRegisterGeneration(manager); + Throwable thrown = invokeSharedConstructorExpectingFailure(tmpDir, 4096, manager); + assertTrue("register sabotage should surface from constructor catch: " + thrown, + thrown instanceof NullPointerException); + + Thread stillOwnedByCaller = workerThread(manager); + assertNotNull("constructor catch must not close caller-owned manager", + stillOwnedByCaller); + assertTrue("caller-owned manager worker must remain alive", + stillOwnedByCaller.isAlive()); + assertSlotCanBeReacquired(tmpDir); + } finally { + manager.close(); + } + }); + } + @Test public void testMemoryModeSkipsDirAndStillWorks() throws Exception { TestUtils.assertMemoryLeak(() -> { @@ -283,6 +334,53 @@ public void testRecoveryIgnoresWatermarkAbovePublishedFsn() throws Exception { }); } + @Test(timeout = 30_000L) + public void testManagerPersistedWatermarkSurvivesRestart() throws Exception { + // Positive twin of testRecoveryAdvancesAckedFsnPastWatermark. That test + // FORGES the .ack-watermark by hand; this one drives a real, started + // SegmentManager to PERSIST it from real acks, then proves a second + // session recovers the manager-written value. Without this, a regression + // that silently stopped the manager's trim-path watermark.write() (e.g. 
+ // an inverted `registered` gate) would pass the whole suite: the durable- + // ack tests assert on the in-memory engine.ackedFsn(), and the recovery + // tests forge the watermark, so nothing observes the manager doing the + // write. + TestUtils.assertMemoryLeak(() -> { + long segSize = MmapSegment.HEADER_SIZE + + 4 * (MmapSegment.FRAME_HEADER_SIZE + 64); + long buf = Unsafe.malloc(64, MemoryTag.NATIVE_DEFAULT); + try { + // Session 1: four frames (publishedFsn = 3), partially acked at 2. + // The ack is below publishedFsn so close() does not treat the slot + // as fully drained — segments and watermark survive for recovery. + // All four frames stay in the active segment, so nothing is + // trimmed and the segment-derived recovery seed is + // lowestBase - 1 == -1; the manager-written watermark (2) is the + // only thing that can lift the recovered ackedFsn above it. + try (CursorSendEngine engine = new CursorSendEngine(tmpDir, segSize)) { + for (int i = 0; i < 4; i++) { + engine.appendBlocking(buf, 64); + } + assertTrue("ack must advance", engine.acknowledge(2L)); + // Block until the background worker has actually written the + // watermark to disk. If the trim-path write were gated off this + // never reaches 2 and the helper fails with a clear message, + // rather than the test flaking on a close()-before-tick race. + awaitManagerPersistedWatermark(tmpDir, 2L); + } + // Session 2: recovery must seed ackedFsn from the manager-written + // watermark (2), not the bare segment-derived seed (-1). 
+ try (CursorSendEngine engine = new CursorSendEngine(tmpDir, segSize)) { + assertEquals("recovery must consume the manager-persisted watermark", + 2L, engine.ackedFsn()); + assertEquals(3L, engine.publishedFsn()); + } + } finally { + Unsafe.free(buf, 64, MemoryTag.NATIVE_DEFAULT); + } + }); + } + @Test public void testRestartIntoNonEmptySfDirContinuesFsnSequence() throws Exception { TestUtils.assertMemoryLeak(() -> { @@ -500,4 +598,73 @@ public void testWasRecoveredFromDiskTrueOnReopen() throws Exception { } }); } + + private static void assertSlotCanBeReacquired(String sfDir) { + try (SlotLock ignored = SlotLock.acquire(sfDir)) { + // good + } + } + + private static Throwable invokeOwnedPrivateConstructorExpectingFailure( + String sfDir, long segmentSizeBytes, SegmentManager manager) throws Exception { + Constructor ctor = CursorSendEngine.class.getDeclaredConstructor( + String.class, long.class, SegmentManager.class, boolean.class, long.class); + ctor.setAccessible(true); + try { + ctor.newInstance(sfDir, segmentSizeBytes, manager, true, + CursorSendEngine.DEFAULT_APPEND_DEADLINE_NANOS); + fail("expected constructor failure"); + return null; + } catch (InvocationTargetException e) { + return e.getCause(); + } + } + + private static Throwable invokeSharedConstructorExpectingFailure( + String sfDir, long segmentSizeBytes, SegmentManager manager) { + try { + new CursorSendEngine(sfDir, segmentSizeBytes, manager); + fail("expected constructor failure"); + return null; + } catch (Throwable t) { + return t; + } + } + + private static void poisonRegisterGeneration(SegmentManager manager) throws Exception { + // register() advances fileGeneration before publishing the ring. Nulling + // it forces a deterministic constructor failure after the ring and + // watermark exist, without adding a production test hook. 
+ Field f = SegmentManager.class.getDeclaredField("fileGeneration"); + f.setAccessible(true); + f.set(manager, null); + } + + private static Thread workerThread(SegmentManager manager) throws Exception { + Field f = SegmentManager.class.getDeclaredField("workerThread"); + f.setAccessible(true); + return (Thread) f.get(manager); + } + + // Polls the on-disk watermark until it reads {@code expected}, or fails after + // a bounded wait. The probe is a second mapping of the same file the manager + // worker writes through; its MAP_SHARED reads observe the worker's writes, and + // it is closed before the next session opens the slot. + private static void awaitManagerPersistedWatermark(String slotDir, long expected) { + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + long last = AckWatermark.INVALID; + try (AckWatermark probe = AckWatermark.open(slotDir)) { + assertNotNull("watermark file must exist after register", probe); + while (System.nanoTime() < deadline) { + last = probe.read(); + if (last == expected) { + return; + } + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(2)); + } + } + fail("manager did not persist watermark=" + expected + + " within 10s (last on-disk read=" + last + ")"); + } + } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerCloseRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerCloseRaceTest.java index 945f598b..384cd7ad 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerCloseRaceTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerCloseRaceTest.java @@ -27,7 +27,9 @@ import io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment; import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentManager; import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentRing; +import io.questdb.client.std.bytes.DirectByteSink; import 
io.questdb.client.std.Files; +import io.questdb.client.std.str.DirectUtf8Sink; import io.questdb.client.test.tools.TestUtils; import org.junit.After; import org.junit.Assert; @@ -36,6 +38,10 @@ import java.lang.reflect.Field; import java.nio.file.Paths; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * Concurrent regression for the {@code SegmentManager} worker race vs @@ -130,6 +136,76 @@ public void testManagerDoesNotInstallSpareIntoClosedRing() throws Exception { }); } + @Test(timeout = 15_000L) + public void testCloseDoesNotFreePathScratchWhenWorkerStillAlive() throws Exception { + TestUtils.assertMemoryLeak(() -> { + long segSize = MmapSegment.HEADER_SIZE + (MmapSegment.FRAME_HEADER_SIZE + 32); + String slot = tmpDir + "/timeout-slot"; + Assert.assertEquals(0, Files.mkdir(slot, Files.DIR_MODE_DEFAULT)); + MmapSegment initial = MmapSegment.create(slot + "/sf-initial.sfa", 0L, segSize); + SegmentRing ring = new SegmentRing(initial, segSize); + SegmentManager manager = new SegmentManager(segSize, TimeUnit.SECONDS.toNanos(60)); + CountDownLatch workerBlocked = new CountDownLatch(1); + CountDownLatch releaseWorker = new CountDownLatch(1); + AtomicBoolean fired = new AtomicBoolean(); + AtomicReference hookErr = new AtomicReference<>(); + boolean managerClosed = false; + try { + manager.register(ring, slot); + manager.setBeforeInstallSyncHook(() -> { + if (!fired.compareAndSet(false, true)) return; + workerBlocked.countDown(); + try { + if (!releaseWorker.await(10, TimeUnit.SECONDS)) { + hookErr.compareAndSet(null, + new AssertionError("timed out waiting for test to release worker")); + } + } catch (Throwable t) { + hookErr.compareAndSet(null, t); + } + }); + manager.start(); + Assert.assertTrue("worker did not reach install hook", + workerBlocked.await(5, TimeUnit.SECONDS)); + Assert.assertTrue("precondition: path scratch 
should be allocated", + readPathScratchImpl(manager) != 0L); + + // Exercise the same branch as a timed-out join without making + // the test sleep for 5 seconds: join() returns while the worker + // is still alive. close() must leave worker-owned native memory + // alone so the worker can resume safely. + Thread.currentThread().interrupt(); + manager.close(); + Assert.assertTrue("close should preserve interrupted status", + Thread.interrupted()); + Thread worker = readWorkerThread(manager); + Assert.assertTrue("worker should still be tracked after incomplete close", + worker != null && worker.isAlive()); + Assert.assertTrue("path scratch was freed while worker was still alive", + readPathScratchImpl(manager) != 0L); + + releaseWorker.countDown(); + manager.close(); + managerClosed = true; + Assert.assertNull("successful close should clear workerThread", + readWorkerThread(manager)); + Assert.assertEquals("successful close should free path scratch", + 0L, readPathScratchImpl(manager)); + if (hookErr.get() != null) { + throw new AssertionError("install hook failed", hookErr.get()); + } + } finally { + manager.setBeforeInstallSyncHook(null); + releaseWorker.countDown(); + if (!managerClosed) { + Thread.interrupted(); + manager.close(); + } + ring.close(); + } + }); + } + private static void cleanupRecursively(String dir) { if (!Files.exists(dir)) return; long find = Files.findFirst(dir); @@ -152,4 +228,22 @@ private static void cleanupRecursively(String dir) { Files.findClose(find); } } + + private static long readPathScratchImpl(SegmentManager manager) throws Exception { + Field pathScratchF = SegmentManager.class.getDeclaredField("pathScratch"); + pathScratchF.setAccessible(true); + DirectUtf8Sink pathScratch = (DirectUtf8Sink) pathScratchF.get(manager); + Field sinkF = DirectUtf8Sink.class.getDeclaredField("sink"); + sinkF.setAccessible(true); + DirectByteSink sink = (DirectByteSink) sinkF.get(pathScratch); + Field implF = 
DirectByteSink.class.getDeclaredField("impl"); + implF.setAccessible(true); + return implF.getLong(sink); + } + + private static Thread readWorkerThread(SegmentManager manager) throws Exception { + Field workerThreadF = SegmentManager.class.getDeclaredField("workerThread"); + workerThreadF.setAccessible(true); + return (Thread) workerThreadF.get(manager); + } } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTotalBytesRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTotalBytesRaceTest.java index ed884f25..431dc25e 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTotalBytesRaceTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTotalBytesRaceTest.java @@ -122,7 +122,7 @@ public void testInstallPathDoesNotCommitAfterDeregister() throws Exception { CountDownLatch hookDone = new CountDownLatch(1); AtomicBoolean fired = new AtomicBoolean(); AtomicReference hookErr = new AtomicReference<>(); - setBeforeInstallSyncHook(mgr, () -> { + mgr.setBeforeInstallSyncHook(() -> { if (!fired.compareAndSet(false, true)) return; try { mgr.deregister(ring); @@ -170,7 +170,7 @@ public void testInstallPathDoesNotCommitAfterDeregister() throws Exception { + "under the same lock that covers deregister.", 0L, observed); } finally { - setBeforeInstallSyncHook(mgr, null); + mgr.setBeforeInstallSyncHook(null); try { ring.close(); } catch (Throwable ignored) { @@ -226,12 +226,6 @@ private static void rmDirRecursive(String dir) { Files.remove(dir); } - private static void setBeforeInstallSyncHook(SegmentManager mgr, Runnable hook) throws Exception { - Field f = SegmentManager.class.getDeclaredField("beforeInstallSyncHook"); - f.setAccessible(true); - f.set(mgr, hook); - } - private static Thread workerThread(SegmentManager mgr) throws Exception { Field f = 
SegmentManager.class.getDeclaredField("workerThread"); f.setAccessible(true); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTrimDeregisterRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTrimDeregisterRaceTest.java index 3648bab2..85efd40c 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTrimDeregisterRaceTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTrimDeregisterRaceTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** @@ -143,7 +144,7 @@ public void testTrimPathDoesNotDoubleSubtractAfterDeregister() throws Exception AtomicBoolean fired = new AtomicBoolean(); CountDownLatch hookDone = new CountDownLatch(1); AtomicReference hookErr = new AtomicReference<>(); - setBeforeTrimSyncHook(mgr, () -> { + mgr.setBeforeTrimSyncHook(() -> { if (!fired.compareAndSet(false, true)) return; try { mgr.deregister(ring); @@ -185,8 +186,14 @@ public void testTrimPathDoesNotDoubleSubtractAfterDeregister() throws Exception + "`totalBytes -= sz` on a stillRegistered re-check " + "under the same lock that covers deregister.", 0L, observed); + assertFalse("stale SegmentManager snapshot skipped drainTrimmable() " + + "after deregister and left a fully-acked sealed " + + "segment on disk. 
The registration guard should " + + "protect watermark/accounting only; trim ownership " + + "transfer must still close and unlink " + activePath, + Files.exists(activePath)); } finally { - setBeforeTrimSyncHook(mgr, null); + mgr.setBeforeTrimSyncHook(null); Unsafe.free(buf, 32, MemoryTag.NATIVE_DEFAULT); try { ring.close(); @@ -220,12 +227,6 @@ private static void awaitSpare(SegmentRing ring, String where) { } } - private static void setBeforeTrimSyncHook(SegmentManager mgr, Runnable hook) throws Exception { - Field f = SegmentManager.class.getDeclaredField("beforeTrimSyncHook"); - f.setAccessible(true); - f.set(mgr, hook); - } - private static long readTotalBytes(SegmentManager mgr) throws Exception { Field f = SegmentManager.class.getDeclaredField("totalBytes"); f.setAccessible(true); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerWatermarkDeregisterRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerWatermarkDeregisterRaceTest.java new file mode 100644 index 00000000..29794b68 --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerWatermarkDeregisterRaceTest.java @@ -0,0 +1,206 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client.sf.cursor; + +import io.questdb.client.cutlass.qwp.client.sf.cursor.AckWatermark; +import io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment; +import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentManager; +import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentRing; +import io.questdb.client.std.Files; +import io.questdb.client.std.MemoryTag; +import io.questdb.client.std.Unsafe; +import io.questdb.client.test.tools.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.nio.file.Paths; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Deterministic regression for the stale snapshot path where the manager worker + * keeps servicing a snapshotted ring entry after + * {@link SegmentManager#deregister(SegmentRing)} has removed it from the live + * registry. + * + *

This is intentionally a crash-capable regression. The worker is stopped
 * at the stale-snapshot window, the test deregisters the ring and releases the
 * watermark mapping, then the worker is allowed to continue. Correct code
 * observes {@code registered=false} and never calls {@code watermark.write()}.
 * Buggy code writes through the stale {@link AckWatermark} object into an
 * unmapped page and should take down the test JVM.
 */
public class SegmentManagerWatermarkDeregisterRaceTest {

    // Per-test scratch directory; created in setUp() and removed in tearDown().
    private String tmpDir;

    /**
     * Creates a uniquely named scratch directory under {@code java.io.tmpdir}
     * and fails the test if the directory cannot be created.
     */
    @Before
    public void setUp() {
        tmpDir = Paths.get(System.getProperty("java.io.tmpdir"),
                "qdb-segmgr-watermark-race-" + System.nanoTime()).toString();
        assertEquals(0, Files.mkdir(tmpDir, Files.DIR_MODE_DEFAULT));
    }

    /**
     * Removes the scratch directory and everything below it. Does nothing when
     * {@link #setUp()} never assigned {@code tmpDir}.
     */
    @After
    public void tearDown() {
        if (tmpDir == null) return;
        rmDirRecursive(tmpDir);
    }

    /**
     * Parks the manager worker in the before-install hook, deregisters the ring
     * and releases the watermark's mmap/fd from the test thread, then lets the
     * worker continue and closes the manager.
     *
     * @throws Exception if setup, reflection access or the install hook fails
     */
    @Test(timeout = 15_000L)
    public void testStaleWorkerDoesNotWriteThroughUnmappedWatermarkAfterDeregister() throws Exception {
        TestUtils.assertMemoryLeak(() -> {
            long segSize = MmapSegment.HEADER_SIZE
                    + 4 * (MmapSegment.FRAME_HEADER_SIZE + 32);
            String slot = tmpDir + "/single-ring";
            assertEquals(0, Files.mkdir(slot, Files.DIR_MODE_DEFAULT));

            MmapSegment seg0 = MmapSegment.create(slot + "/sf-initial.sfa", 0L, segSize);
            SegmentRing ring = new SegmentRing(seg0, segSize);
            AckWatermark watermark = AckWatermark.open(slot);
            assertNotNull(watermark);
            SegmentManager mgr = new SegmentManager(segSize, TimeUnit.SECONDS.toNanos(60));
            long buf = Unsafe.malloc(32, MemoryTag.NATIVE_DEFAULT);
            boolean managerClosed = false;
            boolean watermarkStorageReleased = false;
            CountDownLatch workerAtStaleSnapshot = new CountDownLatch(1);
            CountDownLatch resumeWorker = new CountDownLatch(1);
            try {
                Unsafe.getUnsafe().putInt(buf, 42);
                assertEquals(0L, ring.appendOrFsn(buf, 32));
                assertTrue("precondition: ack must advance",
                        ring.acknowledge(0L));

                mgr.register(ring, slot, watermark);

                AtomicBoolean fired = new AtomicBoolean();
                // Typed as Throwable so the recorded failure can be passed
                // directly as the cause of the AssertionError below.
                AtomicReference<Throwable> hookErr = new AtomicReference<>();
                mgr.setBeforeInstallSyncHook(() -> {
                    // Only the first invocation parks; later ticks fall through.
                    if (!fired.compareAndSet(false, true)) return;
                    try {
                        workerAtStaleSnapshot.countDown();
                        if (!resumeWorker.await(10, TimeUnit.SECONDS)) {
                            hookErr.compareAndSet(null,
                                    new AssertionError("timed out waiting for test to release stale worker"));
                        }
                    } catch (Throwable t) {
                        hookErr.compareAndSet(null, t);
                    }
                });

                // The first manager tick snapshots the registered entry, creates
                // a spare, and stops before the install block. The main test
                // thread now performs the production-close sequence's dangerous
                // part: deregister, then release the engine-owned watermark
                // mapping while the stale worker snapshot is still live.
                mgr.start();

                assertTrue("install hook never fired",
                        workerAtStaleSnapshot.await(5, TimeUnit.SECONDS));
                mgr.deregister(ring);
                // Do not call watermark.close(): synchronizing the worker with
                // a latch after close() would give it a happens-before edge to
                // closed=true, masking the original bug's plain-boolean guard.
                // Releasing the mmap/fd directly leaves the object in the stale
                // state that a racing worker is allowed to observe.
                releaseWatermarkStorageButLeaveObjectWritable(watermark);
                watermarkStorageReleased = true;
                resumeWorker.countDown();
                if (hookErr.get() != null) {
                    throw new AssertionError("install hook failed", hookErr.get());
                }
                mgr.close();
                managerClosed = true;
            } finally {
                mgr.setBeforeInstallSyncHook(null);
                if (!watermarkStorageReleased) {
                    resumeWorker.countDown();
                }
                if (!managerClosed) {
                    mgr.close();
                }
                try {
                    ring.close();
                } catch (Throwable ignored) {
                    // best-effort
                }
                // Once the storage has been released by hand, close() must not
                // run: it would operate on the already-released mmap/fd.
                if (!watermarkStorageReleased) {
                    try {
                        watermark.close();
                    } catch (Throwable ignored) {
                        // best-effort
                    }
                }
                Unsafe.free(buf, 32, MemoryTag.NATIVE_DEFAULT);
            }
        });
    }

    /**
     * Unmaps the watermark's mapping and closes its file descriptor via
     * reflection, without going through {@code AckWatermark.close()}.
     *
     * <p>Both private fields are resolved and read before anything is released,
     * so a reflective failure cannot leave the watermark half-released while the
     * caller still believes its storage is intact.
     *
     * @param watermark the watermark whose {@code mmapAddress} and {@code fd} are released
     * @throws Exception if either private field cannot be located or read
     */
    private static void releaseWatermarkStorageButLeaveObjectWritable(AckWatermark watermark) throws Exception {
        Field mmapAddressF = AckWatermark.class.getDeclaredField("mmapAddress");
        mmapAddressF.setAccessible(true);
        Field fdF = AckWatermark.class.getDeclaredField("fd");
        fdF.setAccessible(true);

        long mmapAddress = mmapAddressF.getLong(watermark);
        int fd = fdF.getInt(watermark);

        if (mmapAddress != 0L && mmapAddress != Files.FAILED_MMAP_ADDRESS) {
            Files.munmap(mmapAddress, AckWatermark.FILE_SIZE, MemoryTag.MMAP_DEFAULT);
        }
        if (fd >= 0) {
            Files.close(fd);
        }
    }

    /**
     * Deletes {@code dir} and its contents. Each entry other than {@code .} and
     * {@code ..} is first removed directly; when that fails the entry is
     * descended into recursively. The directory itself is removed last.
     *
     * @param dir path of the directory to delete
     */
    private static void rmDirRecursive(String dir) {
        long find = Files.findFirst(dir);
        if (find > 0) {
            try {
                int rc = 1;
                while (rc > 0) {
                    String name = Files.utf8ToString(Files.findName(find));
                    if (name != null && !".".equals(name) && !"..".equals(name)) {
                        String child = dir + "/" + name;
                        if (!Files.remove(child)) {
                            rmDirRecursive(child);
                        }
                    }
                    rc = Files.findNext(find);
                }
            } finally {
                Files.findClose(find);
            }
        }
        Files.remove(dir);
    }
}