diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index c8af5fc5..545b848a 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -928,14 +928,20 @@ public void close() { // SCHEMA_MISMATCH HALT) from users who only call close() and // never call flush() afterwards. Throwable terminalError = null; - // Snapshot the latched terminal error that the user thread has - // ALREADY caught (via flush()/at()) before close() ran. If - // flushPendingRows/drainOnClose below also rethrow the same + // Snapshot the exact terminal error instance that a user-thread + // API call ALREADY caught (via flush()/at()) before close() ran. + // If flushPendingRows/drainOnClose below also rethrow the same // instance, dropping it at the final rethrow avoids // try-with-resources self-suppression: Throwable.addSuppressed // raises IllegalArgumentException when primary == suppressed. - Throwable alreadyOwnedByUser = (cursorSendLoop != null && !cursorSendLoop.hasUnsurfacedError()) - ? cursorSendLoop.getLastError() : null; + // Must stay this single read: the snapshot needs the identity of + // the error the user already owns, and only + // getSynchronouslySurfacedError() holds it. Deriving it from two + // separate latch reads races the I/O thread -- a terminal latched + // between the reads would be adopted as user-owned and silently + // dropped (see CloseOwnershipRaceTest). + Throwable alreadyOwnedByUser = cursorSendLoop != null + ? cursorSendLoop.getSynchronouslySurfacedError() : null; try { // Only drain when both the engine and the I/O loop are wired @@ -957,21 +963,20 @@ public void close() { // only when no other channel has already delivered it // to the user. "Already delivered" means either the // producer thread saw it synchronously via - // flush()/append() (errorSurfacedSynchronously) or the - // async dispatcher delivered it to a user-installed - // custom handler at any point in this sender's life - // (deliveredToCustomHandler). The latter survives a - // setErrorHandler(null) cleanup in test helpers -- - // once the user has owned an error, close() should - // not double-signal it. The default no-op logging - // handler does not count as "delivered to user", so a - // config-string-only caller still gets the loud - // rethrow on shutdown. + // flush()/append() (checkUnsurfacedError is silent in + // that case) or the async dispatcher delivered it to a + // user-installed custom handler at any point in this + // sender's life (deliveredToCustomHandler, checked + // here). The latter survives a setErrorHandler(null) + // cleanup in test helpers -- once the user has owned + // an error, close() should not double-signal it. The + // default no-op logging handler does not count as + // "delivered to user", so a config-string-only caller + // still gets the loud rethrow on shutdown. boolean alreadyDeliveredToCustomHandler = errorDispatcher != null && errorDispatcher.hasDeliveredToCustomHandler(); - if (!alreadyDeliveredToCustomHandler - && cursorSendLoop.hasUnsurfacedError()) { - cursorSendLoop.checkError(); + if (!alreadyDeliveredToCustomHandler) { + cursorSendLoop.checkUnsurfacedError(); } // 3) Bounded drain: block until the server has ACK'd // everything we just published, or until the @@ -2558,7 +2563,7 @@ private void checkConnectionError() { error.fillInStackTrace(); throw error; } - // Poll the cursor I/O loop's lastError too. Without this, a fatal + // Poll the cursor I/O loop's terminalError too. Without this, a fatal // wire / server-rejection error recorded by the I/O thread would // only surface on the next flush() / close() — every row-level // method (table, longColumn, atNow, etc.) routes through diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java index 19997267..77db27c8 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java @@ -71,7 +71,7 @@ * into the engine; this thread reads. {@code engine.publishedFsn()} is * the volatile publish barrier. *
- * Errors are reported via {@link #getLastError()}; the I/O thread sets it + * Errors are reported via {@link #getTerminalError()}; the I/O thread sets it * and exits. Producers polling {@link #checkError()} surface the failure. */ public final class CursorWebSocketSendLoop implements QuietCloseable { @@ -88,14 +88,22 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { */ public static final long DEFAULT_DURABLE_ACK_KEEPALIVE_INTERVAL_MILLIS = 200L; public static final long DEFAULT_PARK_NANOS = 50_000L; // 50us idle backoff - /** Default initial reconnect backoff (100 ms). */ + /** + * Default initial reconnect backoff (100 ms). + */ public static final long DEFAULT_RECONNECT_INITIAL_BACKOFF_MILLIS = 100L; - /** Default reconnect max backoff (5 s). */ + /** + * Default reconnect max backoff (5 s). + */ public static final long DEFAULT_RECONNECT_MAX_BACKOFF_MILLIS = 5_000L; - /** Default per-outage reconnect time cap (5 min). */ + /** + * Default per-outage reconnect time cap (5 min). + */ public static final long DEFAULT_RECONNECT_MAX_DURATION_MILLIS = 300_000L; private static final Logger LOG = LoggerFactory.getLogger(CursorWebSocketSendLoop.class); - /** Throttle "reconnect attempt N failed" WARN logs to one per 5 s. */ + /** + * Throttle "reconnect attempt N failed" WARN logs to one per 5 s. + */ private static final long RECONNECT_LOG_THROTTLE_NANOS = 5_000_000_000L; // Pre-converted to nanos for the comparison in sendDurableAckKeepaliveIfDue. // Zero or negative disables the keepalive entirely. @@ -170,14 +178,6 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // directly to the same dispatcher from QwpWebSocketSender. private volatile SenderConnectionDispatcher connectionDispatcher; private volatile SenderErrorDispatcher errorDispatcher; - // Set by checkError() the first time it actually rethrows lastError to a - // synchronous user-thread caller (flush/append/close). close() consults - // this to decide whether to rethrow the latched terminal -- if a producer - // thread already saw the error from a flush() call, throwing again from - // close() would mask any in-flight test assertion or user exception. The - // async dispatcher path does NOT set this flag: a user who only watches - // the async error inbox still gets a loud failure on shutdown. - private volatile boolean errorSurfacedSynchronously; // The send cursor has two coordinate systems: // // FSN: durable frame sequence number in the local cursor engine. This is @@ -199,18 +199,21 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // up" (looks transient). private volatile boolean hasEverConnected; private volatile Thread ioThread; - private volatile Throwable lastError; + // The latched terminal failure — THE exception every checkError() call + // rethrows. Write-once for the loop's lifetime: the only writer is + // recordFatal on the I/O thread (first-writer-wins). The whole + // close()-ownership protocol rests on that — the identity comparisons + // in hasUnsurfacedError() and in close()'s suppression are only + // meaningful because the latched instance never changes. + // Non-LineSenderException causes are wrapped once at latch time, so + // rethrows always deliver the same instance. + private volatile LineSenderException terminalError; // Wall clock of the last outbound activity on the wire -- a sent frame // (trySendOne) or a keepalive PING (sendDurableAckKeepaliveIfDue). // Throttles the durable-ack keepalive PING so it fires only when the // configured interval has elapsed since the most recent outbound event. // Zero until the first send; reset to zero on reconnect. private long lastFrameOrPingNanos; - // Typed payload sibling to lastError. Set when recordFatal is called with - // a SenderError (HALT-policy server rejection or terminal protocol violation); - // remains null for wire-level fatals (reconnect-budget exhaustion, etc). - // Read by QwpWebSocketSender.getLastTerminalError() for ops visibility. - private volatile SenderError lastTerminalServerError; private long nextWireSeq; private volatile SenderProgressDispatcher progressDispatcher; // Frames sent during the post-reconnect catch-up window — i.e. frames @@ -229,6 +232,10 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // at engine.activeSegment(); advances to newer sealed segments / the new // active as the producer rotates. private MmapSegment sendingSegment; + // Exact terminalError instance that checkError() has thrown to a synchronous + // user-thread caller (flush/append/close). close() uses the instance so it + // only suppresses errors the user already owned before close() began. + private volatile LineSenderException synchronouslySurfacedError; /** * Full constructor with explicit reconnect-policy knobs. When @@ -320,7 +327,9 @@ public CursorWebSocketSendLoop(WebSocketClient client, CursorSendEngine engine, this.hasEverConnected = client != null; } - /** Maps a server status byte to a {@link SenderError.Category}. Exposed for unit tests. */ + /** + * Maps a server status byte to a {@link SenderError.Category}. Exposed for unit tests. + */ @TestOnly public static SenderError.Category classify(byte status) { switch (status) { @@ -378,7 +387,7 @@ public static WebSocketClient connectWithRetry( return c; } } catch (QwpAuthFailedException | QwpDurableAckMismatchException - | WebSocketUpgradeException e) { + | WebSocketUpgradeException e) { // Terminal across all configured endpoints per sf-client.md sections // 8.1 (durable-ack mismatch) and 13.3 (auth). Version mismatch is // NOT terminal here -- it falls through to the Throwable branch and @@ -466,15 +475,30 @@ public static boolean isTerminalCloseCode(int code) { /** * Surfaces any error the I/O thread recorded. Called by the producer * thread (typically from inside its append wrapper) so failures don't - * stay silent. Idempotent; once an error is set the loop has already - * exited. + * stay silent. Every call rethrows the exact latched instance — close() + * relies on that identity to suppress double-signals. Idempotent; once + * an error is set the loop has already exited. */ public void checkError() { - Throwable e = lastError; + LineSenderException e = terminalError; if (e != null) { - errorSurfacedSynchronously = true; - if (e instanceof LineSenderException) throw (LineSenderException) e; - throw new LineSenderException("I/O thread failed: " + e.getMessage(), e); + synchronouslySurfacedError = e; + throw e; + } + } + + /** + * Safety-net variant of {@link #checkError()} for + * {@code QwpWebSocketSender.close()}: rethrows the latched terminal error + * only when no synchronous caller has owned it yet. A user who already + * caught the error from flush()/append() stays undisturbed — throwing + * again from close() would double-signal an error they already handled. + * A user who only ever called close() (e.g. async-initial-connect that + * never reached the server) still gets the loud failure. + */ + public void checkUnsurfacedError() { + if (hasUnsurfacedError()) { + checkError(); } } @@ -524,17 +548,41 @@ public synchronized void close() { } } - public Throwable getLastError() { - return lastError; + /** + * Typed server-rejection payload of the latched terminal error, or + * {@code null} when the loop latched a wire-level failure (or nothing). + * Derived from the latch — a server-rejection terminal is always latched + * as a {@link LineSenderServerException} carrying its {@link SenderError}. + */ + public SenderError getLastTerminalServerError() { + LineSenderException e = terminalError; + return e instanceof LineSenderServerException + ? ((LineSenderServerException) e).getServerError() : null; } /** - * Snapshot of the typed server-rejection payload for the latched terminal error, - * or {@code null} if the loop has not latched a server-rejection terminal (or has - * latched only a wire-level failure with no SenderError associated). + * Returns the exact latched throwable instance already thrown by + * {@link #checkError()}, or {@code null} when no synchronous caller has + * owned the terminal error yet. + *
+ * This is the single read {@code QwpWebSocketSender.close()} uses to learn + * which terminal error the user already owns. The ownership decision must + * be taken from this one field only: deriving it from two separate latch + * reads (e.g. an unsurfaced-check followed by {@link #getTerminalError()}) + * races the I/O thread — a terminal latched between the reads gets + * mis-captured as already-owned and is then silently dropped on close(). + * Guarded by {@code CloseOwnershipRaceTest}. */ - public SenderError getLastTerminalServerError() { - return lastTerminalServerError; + public Throwable getSynchronouslySurfacedError() { + return synchronouslySurfacedError; + } + + /** + * The latched terminal failure, or {@code null} while the loop is + * healthy. Read-only — does not mark the error as surfaced. + */ + public Throwable getTerminalError() { + return terminalError; } public long getTotalAcks() { @@ -575,7 +623,9 @@ public long getTotalFramesSent() { return totalFramesSent.get(); } - /** Total reconnect attempts (succeeded + failed). */ + /** + * Total reconnect attempts (succeeded + failed). + */ public long getTotalReconnectAttempts() { return totalReconnectAttempts.get(); } @@ -605,18 +655,6 @@ public boolean hasEverConnected() { return hasEverConnected; } - /** - * True when {@link #lastError} is set AND no synchronous user-thread - * caller has yet seen it via {@link #checkError()}. close() uses this - * to decide whether to rethrow as a safety net: a user who only ever - * called close() (e.g. async-initial-connect that never reached the - * server) needs to see the error from somewhere; a user who already - * caught it from flush() does not. - */ - public boolean hasUnsurfacedError() { - return lastError != null && !errorSurfacedSynchronously; - } - public boolean isRunning() { return running; } @@ -753,7 +791,7 @@ private void applyDurableAck() { */ private void attemptInitialConnect() { connectLoop(new LineSenderException( - "async initial connect deferred to I/O thread"), + "async initial connect deferred to I/O thread"), "initial connect"); } @@ -830,7 +868,7 @@ private void connectLoop(Throwable initial, String phase) { System.nanoTime() ); totalServerErrors.incrementAndGet(); - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); return; } catch (QwpDurableAckMismatchException e) { @@ -855,7 +893,7 @@ private void connectLoop(Throwable initial, String phase) { System.nanoTime() ); totalServerErrors.incrementAndGet(); - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); return; } catch (QwpRoleMismatchException | QwpIngressRoleRejectedException e) { @@ -935,7 +973,7 @@ private void connectLoop(Throwable initial, String phase) { totalServerErrors.incrementAndGet(); // recordFatal MUST run before dispatchError so the producer-observable // terminal error is latched before the handler is invoked. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); // Surface the terminal classification through the connection-event // dispatcher too. Listeners learn about budget exhaustion without @@ -1041,6 +1079,18 @@ private void fail(Throwable initial) { connectLoop(initial, "reconnect"); } + /** + * True when {@link #terminalError} is set AND no synchronous user-thread + * caller has yet seen that same instance via {@link #checkError()}. + * The {@link #checkUnsurfacedError()} safety net composes this with + * checkError(); reads {@code terminalError} once so the comparison cannot + * tear against a concurrent latch. + */ + private boolean hasUnsurfacedError() { + Throwable e = terminalError; + return e != null && synchronouslySurfacedError != e; + } + private void ioLoop() { try { // Async-initial-connect path: ctor accepted a null client because @@ -1142,27 +1192,25 @@ private void positionCursorInSegment(MmapSegment seg, long targetFsn) { /** * Mark the loop as fatally failed. Caller has decided no reconnect - * is possible (or it ran out of budget) — record the error so + * is possible (or it ran out of budget) — latch the error so * {@link #checkError} can surface it to the producer thread, then - * stop the loop. + * stop the loop. First-writer-wins: only the first failure latches. + * The check-then-latch is unsynchronized and is safe ONLY because + * every caller runs on the I/O thread (connectLoop and the + * receive-path rejection handlers are all pumped by ioLoop); calling + * this from any other thread would be a lost-update race. + * Non-{@link LineSenderException} causes are wrapped once here, so + * every rethrow delivers the same instance. */ private void recordFatal(Throwable t) { - recordFatal(t, null); - } - - /** - * Server-rejection-aware variant. Stashes a typed {@link SenderError} alongside - * the throwable so {@code QwpWebSocketSender.getLastTerminalError()} can surface - * the structured payload for ops/observability. Idempotent — only the first - * failure latches. - */ - private void recordFatal(Throwable t, SenderError serverError) { - if (lastError == null) { - lastError = t; - lastTerminalServerError = serverError; + if (terminalError == null) { + terminalError = t instanceof LineSenderException + ? (LineSenderException) t + : new LineSenderException("I/O thread failed: " + t.getMessage(), t); } running = false; - if (serverError != null) { + if (t instanceof LineSenderServerException) { + // server rejections carry a structured message; the stack adds noise LOG.error("Cursor I/O loop failure: {}", t.getMessage()); } else { LOG.error("Cursor I/O loop failure: {}", t.getMessage(), t); @@ -1382,7 +1430,9 @@ boolean isDurableUnder(CharSequenceLongHashMap watermarks) { } } - /** Inner ACK handler — parses the binary frame, calls engine.acknowledge. */ + /** + * Inner ACK handler — parses the binary frame, calls engine.acknowledge. + */ private final class ResponseHandler implements WebSocketFrameHandler { @Override public void onBinaryMessage(long payloadPtr, int payloadLen) { @@ -1474,7 +1524,7 @@ public void onClose(int code, String reason) { // recordFatal MUST run before dispatchError so the producer- // observable terminal error is latched before the handler is // invoked. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); return; } @@ -1513,7 +1563,7 @@ private void handlePreSendRejection(long wireSeq, byte status, // so a synchronous probe of getLastTerminalError() / flush() // from inside the handler observes the typed error. Mirrors // the ordering in the post-send HALT path below. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); } // DROP_AND_CONTINUE: no watermark advance -- there is nothing // sent on this connection to drop. The dispatch is the user's @@ -1578,7 +1628,7 @@ private void handleServerRejection(long wireSeq) { // probes getLastTerminalError() (or calls flush()) sees the // typed error rather than null. Bytes on disk are the bytes // the server rejected; reconnect/replay cannot fix them. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); } else { // DROP_AND_CONTINUE: advance ackedFsn past the rejected span diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java new file mode 100644 index 00000000..a874063a --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java @@ -0,0 +1,151 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client; + +import io.questdb.client.LineSenderServerException; +import io.questdb.client.Sender; +import io.questdb.client.SenderError; +import io.questdb.client.SenderErrorHandler; +import io.questdb.client.cutlass.line.LineSenderException; +import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Pins both branches of {@code QwpWebSocketSender.close()}'s safety-net + * rethrow, strictly — unlike the close assertions in + * {@link InitialConnectAsyncTest}, which tolerate either outcome: + *
* Concrete consequence the spec calls out: a user-supplied error handler * that synchronously calls {@code sender.flush()} from inside - * {@code onError} can observe {@code lastError == null} and pass — + * {@code onError} can observe {@code terminalError == null} and pass — * landing post-HALT bytes in the engine. *
* This test asserts the spec invariant directly: by the time the diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java new file mode 100644 index 00000000..f4cbffd1 --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java @@ -0,0 +1,92 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client.sf.cursor; + +import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorSendEngine; +import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorWebSocketSendLoop; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Regression guard for the close()-ownership race in {@code QwpWebSocketSender.close()}: + * its {@code alreadyOwnedByUser} snapshot was once computed from two separate volatile + * reads ({@code !hasUnsurfacedError() ? getLastError() : null}), so a terminal error + * latched by the I/O thread between the reads was mis-captured as already-owned and + * silently dropped on close(). The fix is the single read + * {@link CursorWebSocketSendLoop#getSynchronouslySurfacedError()}. + *
+ * Scope: this test races that accessor and fails if its implementation ever recomputes + * the snapshot from two reads. It does not execute {@code QwpWebSocketSender.close()} + * itself — old and new snapshots agree in every non-racy state, so no black-box test + * at the sender level can detect a callsite that bypasses the accessor; that half of + * the contract is pinned by the "must stay this single read" comments at the callsite + * and on the accessor. + */ +public class CloseOwnershipRaceTest { + + private static final int ROUNDS = 1000; + + @Rule + public final TemporaryFolder sfDir = TemporaryFolder.builder().assureDeletion().build(); + + @Test(timeout = 60_000) + public void closeOwnershipSnapshotNeverClaimsAnUnsurfacedError() { + try (CursorSendEngine engine = new CursorSendEngine( + sfDir.getRoot().getAbsolutePath(), 16_384)) { + Throwable leaked = null; + for (int i = 0; i < ROUNDS && leaked == null; i++) { + // A null client, a reconnect factory that never produces one, + // and a zero reconnect budget: start()'s real I/O thread walks + // the production async-initial-connect path and latches a + // genuine RECONNECT_BUDGET_EXHAUSTED terminal within + // microseconds. One authentic null->error latch transition + // per round. + CursorWebSocketSendLoop loop = new CursorWebSocketSendLoop( + null, engine, 0, 1_000_000L, + () -> null, + 0, // reconnect budget: exhausted on arrival + 1, 1); + loop.start(); + // Race close()'s exact ownership snapshot against the latch + // transition, stopping once the latch has landed. Nothing in + // this test calls checkError(), so no synchronous caller ever + // owns the error and the snapshot must stay null. On fixed + // code it reads a field nobody here writes — it cannot flake; + // a reintroduced two-read snapshot tears when the latch lands + // between its reads, with near-certainty across all rounds. + while (leaked == null && loop.getTerminalError() == null) { + leaked = loop.getSynchronouslySurfacedError(); + } + loop.close(); + } + Assert.assertNull( + "close() captured a terminal error as 'already owned by user' that no " + + "synchronous caller ever saw -- close() would silently drop it: " + leaked, + leaked); + } + } +} diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java index 8b86c006..80c23ac2 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java @@ -37,7 +37,7 @@ /** * Pinpointed tests for the latched-error contract on {@link CursorWebSocketSendLoop}: - * {@code recordFatal} → {@link CursorWebSocketSendLoop#getLastError} + + * {@code recordFatal} → {@link CursorWebSocketSendLoop#getTerminalError} + * {@link CursorWebSocketSendLoop#getLastTerminalServerError} + * {@link CursorWebSocketSendLoop#checkError}. Bypasses the constructor entirely * via {@code Unsafe.allocateInstance} to avoid the live wire/engine dependencies @@ -53,7 +53,8 @@ public void testCheckErrorRethrowsLineSenderException() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); SenderError err = newSenderError(); LineSenderServerException original = new LineSenderServerException(err); - setField(loop, "lastError", original); + setField(loop, "terminalError", original); + Assert.assertNull(loop.getSynchronouslySurfacedError()); try { loop.checkError(); @@ -64,47 +65,96 @@ public void testCheckErrorRethrowsLineSenderException() throws Exception { Assert.assertSame(err, ((LineSenderServerException) thrown).getServerError()); } + Assert.assertSame("checkError must mark the exact latched throwable as user-owned", + original, loop.getSynchronouslySurfacedError()); + // a synchronously surfaced latch is owned -- the close() safety net + // must stay silent now + loop.checkUnsurfacedError(); } @Test - public void testCheckErrorWrapsNonLineSenderThrowable() throws Exception { - // For non-LineSenderException throwables (NPE, IOException, etc.), - // checkError wraps in a fresh LineSenderException with the original - // as cause so producers always see one exception type. + public void testRecordFatalWrapsNonLineSenderThrowableOnce() throws Exception { + // Non-LineSenderException causes (NPE, IOException, etc.) are wrapped + // in a LineSenderException ONCE, at latch time -- so producers always + // see one exception type AND every rethrow delivers the same instance, + // which close() relies on to suppress double-signals by identity. CursorWebSocketSendLoop loop = newBareLoop(); Throwable raw = new RuntimeException("oh no"); - setField(loop, "lastError", raw); + invokeRecordFatal(loop, raw); + Assert.assertNull(loop.getSynchronouslySurfacedError()); + LineSenderException first = null; try { loop.checkError(); Assert.fail("expected throw"); } catch (LineSenderException thrown) { Assert.assertNotSame(raw, thrown); - Assert.assertEquals(raw, thrown.getCause()); + Assert.assertSame(raw, thrown.getCause()); Assert.assertTrue(thrown.getMessage().contains("oh no")); + first = thrown; + } + Assert.assertSame("the latch must hold the wrapper, not the raw cause", + first, loop.getTerminalError()); + Assert.assertSame("ownership tracks the latched wrapper", + first, loop.getSynchronouslySurfacedError()); + loop.checkUnsurfacedError(); // owned -> silent + + try { + loop.checkError(); + Assert.fail("expected throw"); + } catch (LineSenderException thrownAgain) { + Assert.assertSame("every rethrow must deliver the same instance", + first, thrownAgain); } } @Test public void testCheckErrorIsNoopWhenNoLatch() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); - Assert.assertNull(loop.getLastError()); + Assert.assertNull(loop.getTerminalError()); loop.checkError(); // must not throw } @Test - public void testGetLastErrorReturnsLatchedThrowable() throws Exception { + public void testCheckUnsurfacedErrorThrowsOnceThenStaysSilent() throws Exception { + // The close() safety net: an unowned latch must rethrow exactly like + // checkError; once any synchronous caller has owned the error, it + // must stay silent -- unlike checkError, which keeps throwing. + CursorWebSocketSendLoop loop = newBareLoop(); + loop.checkUnsurfacedError(); // no latch -> silent + + LineSenderException e = new LineSenderException("wire fail"); + setField(loop, "terminalError", e); + try { + loop.checkUnsurfacedError(); + Assert.fail("an unowned latch must rethrow from the safety net"); + } catch (LineSenderException thrown) { + Assert.assertSame(e, thrown); + } + + loop.checkUnsurfacedError(); // the throw above made the caller owner -> silent + + try { + loop.checkError(); + Assert.fail("checkError must keep rethrowing a latched error"); + } catch (LineSenderException thrown) { + Assert.assertSame(e, thrown); + } + } + + @Test + public void testGetTerminalErrorReturnsLatchedThrowable() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); Throwable e = new LineSenderException("boom"); - setField(loop, "lastError", e); - Assert.assertSame(e, loop.getLastError()); + setField(loop, "terminalError", e); + Assert.assertSame(e, loop.getTerminalError()); } @Test - public void testGetLastErrorIsNullBeforeAnyFailure() throws Exception { + public void testGetTerminalErrorIsNullBeforeAnyFailure() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); Assert.assertNull("loops with no latched error must report null", - loop.getLastError()); + loop.getTerminalError()); } @Test @@ -114,10 +164,10 @@ public void testRecordFatalLatchesThrowableOnly() throws Exception { setField(loop, "running", true); Throwable e = new LineSenderException("wire fail"); - invokeRecordFatal(loop, e, null); + invokeRecordFatal(loop, e); - Assert.assertSame(e, loop.getLastError()); - Assert.assertNull("typed payload must be null when recordFatal called without one", + Assert.assertSame(e, loop.getTerminalError()); + Assert.assertNull("typed payload must be null for a wire-level fatal", loop.getLastTerminalServerError()); Assert.assertFalse("recordFatal must stop the loop", (Boolean) getField(loop, "running")); @@ -130,10 +180,11 @@ public void testRecordFatalLatchesBothThrowableAndSenderError() throws Exception SenderError err = newSenderError(); LineSenderServerException ex = new LineSenderServerException(err); - invokeRecordFatal(loop, ex, err); + invokeRecordFatal(loop, ex); - Assert.assertSame(ex, loop.getLastError()); - Assert.assertSame(err, loop.getLastTerminalServerError()); + Assert.assertSame(ex, loop.getTerminalError()); + Assert.assertSame("typed payload is derived from the latched LineSenderServerException", + err, loop.getLastTerminalServerError()); Assert.assertFalse((Boolean) getField(loop, "running")); } @@ -141,19 +192,19 @@ public void testRecordFatalLatchesBothThrowableAndSenderError() throws Exception public void testRecordFatalIsIdempotent() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); setField(loop, "running", true); - Throwable first = new LineSenderException("first"); - Throwable second = new LineSenderException("second"); SenderError firstErr = newSenderError(); SenderError secondErr = newSenderError(); + LineSenderServerException first = new LineSenderServerException(firstErr); + LineSenderServerException second = new LineSenderServerException(secondErr); - invokeRecordFatal(loop, first, firstErr); - invokeRecordFatal(loop, second, secondErr); + invokeRecordFatal(loop, first); + invokeRecordFatal(loop, second); // Only the first failure latches — subsequent calls must not // overwrite, otherwise a follow-on cascade would mask the original // root cause. Assert.assertSame("first throwable must remain latched", - first, loop.getLastError()); + first, loop.getTerminalError()); Assert.assertSame("first SenderError must remain latched", firstErr, loop.getLastTerminalServerError()); } @@ -190,11 +241,11 @@ private static Object getField(Object target, String name) throws Exception { return f.get(target); } - private static void invokeRecordFatal(CursorWebSocketSendLoop loop, Throwable t, SenderError err) + private static void invokeRecordFatal(CursorWebSocketSendLoop loop, Throwable t) throws Exception { Method m = CursorWebSocketSendLoop.class.getDeclaredMethod( - "recordFatal", Throwable.class, SenderError.class); + "recordFatal", Throwable.class); m.setAccessible(true); - m.invoke(loop, t, err); + m.invoke(loop, t); } }