From 58af6db3b47a82754cd4e900a64e59b6e670557a Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 9 Jun 2026 14:45:43 +0200 Subject: [PATCH 1/6] fix(qwp): fix missed terminal send errors during Sender close Fixes a race where QwpWebSocketSender.close() could return successfully even after a terminal QWP/WebSocket error was latched during close. **implementation details for reviewers** Track the exact `lastError` instance that `checkError()` has synchronously surfaced and make `QwpWebSocketSender.close()` suppress only that pre-owned instance. This avoids treating a freshly latched terminal error as already owned when it appears between separate `hasUnsurfacedError()` and `getLastError()` reads. --- .../qwp/client/QwpWebSocketSender.java | 12 +++---- .../sf/cursor/CursorWebSocketSendLoop.java | 36 +++++++++++-------- ...CursorWebSocketSendLoopErrorLatchTest.java | 9 +++++ 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 7d43d72f..ce78431e 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -962,14 +962,14 @@ public void close() { // SCHEMA_MISMATCH HALT) from users who only call close() and // never call flush() afterwards. Throwable terminalError = null; - // Snapshot the latched terminal error that the user thread has - // ALREADY caught (via flush()/at()) before close() ran. If - // flushPendingRows/drainOnClose below also rethrow the same + // Snapshot the exact terminal error instance that a user-thread + // API call ALREADY caught (via flush()/at()) before close() ran. 
+ // If flushPendingRows/drainOnClose below also rethrow the same // instance, dropping it at the final rethrow avoids // try-with-resources self-suppression: Throwable.addSuppressed // raises IllegalArgumentException when primary == suppressed. - Throwable alreadyOwnedByUser = (cursorSendLoop != null && !cursorSendLoop.hasUnsurfacedError()) - ? cursorSendLoop.getLastError() : null; + Throwable alreadyOwnedByUser = cursorSendLoop != null + ? cursorSendLoop.getSynchronouslySurfacedError() : null; try { // Only drain when both the engine and the I/O loop are wired @@ -991,7 +991,7 @@ public void close() { // only when no other channel has already delivered it // to the user. "Already delivered" means either the // producer thread saw it synchronously via - // flush()/append() (errorSurfacedSynchronously) or the + // flush()/append() (synchronouslySurfacedError) or the // async dispatcher delivered it to a user-installed // custom handler at any point in this sender's life // (deliveredToCustomHandler). The latter survives a diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java index 9f7182da..2e9ce6de 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java @@ -170,14 +170,10 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // directly to the same dispatcher from QwpWebSocketSender. private volatile SenderConnectionDispatcher connectionDispatcher; private volatile SenderErrorDispatcher errorDispatcher; - // Set by checkError() the first time it actually rethrows lastError to a - // synchronous user-thread caller (flush/append/close). 
close() consults - // this to decide whether to rethrow the latched terminal -- if a producer - // thread already saw the error from a flush() call, throwing again from - // close() would mask any in-flight test assertion or user exception. The - // async dispatcher path does NOT set this flag: a user who only watches - // the async error inbox still gets a loud failure on shutdown. - private volatile boolean errorSurfacedSynchronously; + // Exact lastError instance that checkError() has thrown to a synchronous + // user-thread caller (flush/append/close). close() uses the instance so it + // only suppresses errors the user already owned before close() began. + private volatile Throwable synchronouslySurfacedError; // The send cursor has two coordinate systems: // // FSN: durable frame sequence number in the local cursor engine. This is @@ -472,7 +468,7 @@ public static boolean isTerminalCloseCode(int code) { public void checkError() { Throwable e = lastError; if (e != null) { - errorSurfacedSynchronously = true; + synchronouslySurfacedError = e; if (e instanceof LineSenderException) throw (LineSenderException) e; throw new LineSenderException("I/O thread failed: " + e.getMessage(), e); } @@ -528,6 +524,15 @@ public Throwable getLastError() { return lastError; } + /** + * Returns the exact latched throwable instance already thrown by + * {@link #checkError()}, or {@code null} when no synchronous caller has + * owned the terminal error yet. + */ + public Throwable getSynchronouslySurfacedError() { + return synchronouslySurfacedError; + } + /** * Snapshot of the typed server-rejection payload for the latched terminal error, * or {@code null} if the loop has not latched a server-rejection terminal (or has @@ -607,14 +612,15 @@ public boolean hasEverConnected() { /** * True when {@link #lastError} is set AND no synchronous user-thread - * caller has yet seen it via {@link #checkError()}. 
close() uses this - * to decide whether to rethrow as a safety net: a user who only ever - * called close() (e.g. async-initial-connect that never reached the - * server) needs to see the error from somewhere; a user who already - * caught it from flush() does not. + * caller has yet seen that same instance via {@link #checkError()}. + * close() uses this to decide whether to rethrow as a safety net: a user + * who only ever called close() (e.g. async-initial-connect that never + * reached the server) needs to see the error from somewhere; a user who + * already caught it from flush() does not. */ public boolean hasUnsurfacedError() { - return lastError != null && !errorSurfacedSynchronously; + Throwable e = lastError; + return e != null && synchronouslySurfacedError != e; } public boolean isRunning() { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java index 8b86c006..56ae0577 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java @@ -54,6 +54,7 @@ public void testCheckErrorRethrowsLineSenderException() throws Exception { SenderError err = newSenderError(); LineSenderServerException original = new LineSenderServerException(err); setField(loop, "lastError", original); + Assert.assertNull(loop.getSynchronouslySurfacedError()); try { loop.checkError(); @@ -64,6 +65,10 @@ public void testCheckErrorRethrowsLineSenderException() throws Exception { Assert.assertSame(err, ((LineSenderServerException) thrown).getServerError()); } + Assert.assertSame("checkError must mark the exact latched throwable as user-owned", + original, loop.getSynchronouslySurfacedError()); + Assert.assertFalse("a 
synchronously surfaced latch is no longer unsurfaced", + loop.hasUnsurfacedError()); } @Test @@ -74,6 +79,7 @@ public void testCheckErrorWrapsNonLineSenderThrowable() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); Throwable raw = new RuntimeException("oh no"); setField(loop, "lastError", raw); + Assert.assertNull(loop.getSynchronouslySurfacedError()); try { loop.checkError(); @@ -83,6 +89,9 @@ public void testCheckErrorWrapsNonLineSenderThrowable() throws Exception { Assert.assertEquals(raw, thrown.getCause()); Assert.assertTrue(thrown.getMessage().contains("oh no")); } + Assert.assertSame("wrapped throwables are owned by their latched source instance", + raw, loop.getSynchronouslySurfacedError()); + Assert.assertFalse(loop.hasUnsurfacedError()); } @Test From e6088bd7d5121e8d9fc0330597eb9bfd0984ac00 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 11 Jun 2026 15:50:17 +0200 Subject: [PATCH 2/6] probabilistic test --- .../sf/cursor/CloseOwnershipRaceTest.java | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java new file mode 100644 index 00000000..eb4d161c --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java @@ -0,0 +1,86 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may 
not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client.sf.cursor; + +import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorSendEngine; +import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorWebSocketSendLoop; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Regression guard for the close()-ownership race in {@code QwpWebSocketSender.close()}: + * its {@code alreadyOwnedByUser} snapshot was once computed from two separate volatile + * reads ({@code !hasUnsurfacedError() ? getLastError() : null}), so a terminal error + * latched by the I/O thread between the reads was mis-captured as already-owned and + * silently dropped on close(). The fix is the single read + * {@link CursorWebSocketSendLoop#getSynchronouslySurfacedError()}; this test races that + * read against real latch transitions and fails on any torn snapshot. 
+ */ +public class CloseOwnershipRaceTest { + + private static final int ROUNDS = 1000; + + @Rule + public final TemporaryFolder sfDir = TemporaryFolder.builder().assureDeletion().build(); + + @Test(timeout = 60_000) + public void closeOwnershipSnapshotNeverClaimsAnUnsurfacedError() { + try (CursorSendEngine engine = new CursorSendEngine( + sfDir.getRoot().getAbsolutePath(), 16_384)) { + Throwable leaked = null; + for (int i = 0; i < ROUNDS && leaked == null; i++) { + // A null client, a reconnect factory that never produces one, + // and a zero reconnect budget: start()'s real I/O thread walks + // the production async-initial-connect path and latches a + // genuine RECONNECT_BUDGET_EXHAUSTED terminal within + // microseconds. One authentic null->error latch transition + // per round. + CursorWebSocketSendLoop loop = new CursorWebSocketSendLoop( + null, engine, 0, 1_000_000L, + () -> null, + 0, // reconnect budget: exhausted on arrival + 1, 1); + loop.start(); + // Race close()'s exact ownership snapshot against the latch + // transition, stopping once the latch has landed. Nothing in + // this test calls checkError(), so no synchronous caller ever + // owns the error and the snapshot must stay null. On fixed + // code it reads a field nobody here writes — it cannot flake; + // a reintroduced two-read snapshot tears when the latch lands + // between its reads, with near-certainty across all rounds. 
+ while (leaked == null && loop.getLastError() == null) { + leaked = loop.getSynchronouslySurfacedError(); + } + loop.close(); + } + Assert.assertNull( + "close() captured a terminal error as 'already owned by user' that no " + + "synchronous caller ever saw -- close() would silently drop it: " + leaked, + leaked); + } + } +} From 2b0e443765f776d1316aa24e844a37eb7c65ec8c Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 11 Jun 2026 17:47:09 +0200 Subject: [PATCH 3/6] Simplify the cursor loop's terminal-error latch The latch leaked its invariants into callers: close() had to compose hasUnsurfacedError() + checkError() for the safety-net rethrow, and checkError() minted a fresh wrapper per call for raw causes, so drainOnClose() could rethrow an error the user had already caught. - recordFatal() wraps non-LineSenderException causes once, at latch time: every rethrow delivers the same instance, making close()'s identity suppression correct in every state. - checkUnsurfacedError() encapsulates the close() safety net; hasUnsurfacedError() becomes private, so the torn two-read ownership snapshot is no longer writable outside the class. - getLastTerminalServerError() derives the SenderError from the latched LineSenderServerException; the sibling field is gone. 
--- .../qwp/client/QwpWebSocketSender.java | 31 +++-- .../sf/cursor/CursorWebSocketSendLoop.java | 128 ++++++++++-------- .../sf/cursor/CloseOwnershipRaceTest.java | 10 +- ...CursorWebSocketSendLoopErrorLatchTest.java | 86 +++++++++--- 4 files changed, 163 insertions(+), 92 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 80e4c9dd..beec7512 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -934,6 +934,12 @@ public void close() { // instance, dropping it at the final rethrow avoids // try-with-resources self-suppression: Throwable.addSuppressed // raises IllegalArgumentException when primary == suppressed. + // Must stay this single read: the snapshot needs the identity of + // the error the user already owns, and only + // getSynchronouslySurfacedError() holds it. Deriving it from two + // separate latch reads races the I/O thread -- a terminal latched + // between the reads would be adopted as user-owned and silently + // dropped (see CloseOwnershipRaceTest). Throwable alreadyOwnedByUser = cursorSendLoop != null ? cursorSendLoop.getSynchronouslySurfacedError() : null; @@ -957,21 +963,20 @@ public void close() { // only when no other channel has already delivered it // to the user. "Already delivered" means either the // producer thread saw it synchronously via - // flush()/append() (synchronouslySurfacedError) or the - // async dispatcher delivered it to a user-installed - // custom handler at any point in this sender's life - // (deliveredToCustomHandler). The latter survives a - // setErrorHandler(null) cleanup in test helpers -- - // once the user has owned an error, close() should - // not double-signal it. 
The default no-op logging - // handler does not count as "delivered to user", so a - // config-string-only caller still gets the loud - // rethrow on shutdown. + // flush()/append() (checkUnsurfacedError is silent in + // that case) or the async dispatcher delivered it to a + // user-installed custom handler at any point in this + // sender's life (deliveredToCustomHandler, checked + // here). The latter survives a setErrorHandler(null) + // cleanup in test helpers -- once the user has owned + // an error, close() should not double-signal it. The + // default no-op logging handler does not count as + // "delivered to user", so a config-string-only caller + // still gets the loud rethrow on shutdown. boolean alreadyDeliveredToCustomHandler = errorDispatcher != null && errorDispatcher.hasDeliveredToCustomHandler(); - if (!alreadyDeliveredToCustomHandler - && cursorSendLoop.hasUnsurfacedError()) { - cursorSendLoop.checkError(); + if (!alreadyDeliveredToCustomHandler) { + cursorSendLoop.checkUnsurfacedError(); } // 3) Bounded drain: block until the server has ACK'd // everything we just published, or until the diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java index 5e701308..89179e96 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java @@ -173,7 +173,7 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // Exact lastError instance that checkError() has thrown to a synchronous // user-thread caller (flush/append/close). close() uses the instance so it // only suppresses errors the user already owned before close() began. 
- private volatile Throwable synchronouslySurfacedError; + private volatile LineSenderException synchronouslySurfacedError; // The send cursor has two coordinate systems: // // FSN: durable frame sequence number in the local cursor engine. This is @@ -195,18 +195,17 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // up" (looks transient). private volatile boolean hasEverConnected; private volatile Thread ioThread; - private volatile Throwable lastError; + // The latched terminal failure — THE exception every checkError() call + // rethrows. Non-LineSenderException causes are wrapped once at latch time + // (recordFatal), so rethrows always deliver the same instance and close() + // can suppress double-signals by identity. + private volatile LineSenderException lastError; // Wall clock of the last outbound activity on the wire -- a sent frame // (trySendOne) or a keepalive PING (sendDurableAckKeepaliveIfDue). // Throttles the durable-ack keepalive PING so it fires only when the // configured interval has elapsed since the most recent outbound event. // Zero until the first send; reset to zero on reconnect. private long lastFrameOrPingNanos; - // Typed payload sibling to lastError. Set when recordFatal is called with - // a SenderError (HALT-policy server rejection or terminal protocol violation); - // remains null for wire-level fatals (reconnect-budget exhaustion, etc). - // Read by QwpWebSocketSender.getLastTerminalError() for ops visibility. - private volatile SenderError lastTerminalServerError; private long nextWireSeq; private volatile SenderProgressDispatcher progressDispatcher; // Frames sent during the post-reconnect catch-up window — i.e. frames @@ -462,15 +461,30 @@ public static boolean isTerminalCloseCode(int code) { /** * Surfaces any error the I/O thread recorded. Called by the producer * thread (typically from inside its append wrapper) so failures don't - * stay silent. 
Idempotent; once an error is set the loop has already - * exited. + * stay silent. Every call rethrows the exact latched instance — close() + * relies on that identity to suppress double-signals. Idempotent; once + * an error is set the loop has already exited. */ public void checkError() { - Throwable e = lastError; + LineSenderException e = lastError; if (e != null) { synchronouslySurfacedError = e; - if (e instanceof LineSenderException) throw (LineSenderException) e; - throw new LineSenderException("I/O thread failed: " + e.getMessage(), e); + throw e; + } + } + + /** + * Safety-net variant of {@link #checkError()} for + * {@code QwpWebSocketSender.close()}: rethrows the latched terminal error + * only when no synchronous caller has owned it yet. A user who already + * caught the error from flush()/append() stays undisturbed — throwing + * again from close() would double-signal an error they already handled. + * A user who only ever called close() (e.g. async-initial-connect that + * never reached the server) still gets the loud failure. + */ + public void checkUnsurfacedError() { + if (hasUnsurfacedError()) { + checkError(); } } @@ -524,24 +538,35 @@ public Throwable getLastError() { return lastError; } + /** + * Typed server-rejection payload of the latched terminal error, or + * {@code null} when the loop latched a wire-level failure (or nothing). + * Derived from the latch — a server-rejection terminal is always latched + * as a {@link LineSenderServerException} carrying its {@link SenderError}. + */ + public SenderError getLastTerminalServerError() { + LineSenderException e = lastError; + return e instanceof LineSenderServerException + ? ((LineSenderServerException) e).getServerError() : null; + } + /** * Returns the exact latched throwable instance already thrown by * {@link #checkError()}, or {@code null} when no synchronous caller has * owned the terminal error yet. + *
<p>
+ * This is the single read {@code QwpWebSocketSender.close()} uses to learn + * which terminal error the user already owns. The ownership decision must + * be taken from this one field only: deriving it from two separate latch + * reads (e.g. an unsurfaced-check followed by {@link #getLastError()}) + * races the I/O thread — a terminal latched between the reads gets + * mis-captured as already-owned and is then silently dropped on close(). + * Guarded by {@code CloseOwnershipRaceTest}. */ public Throwable getSynchronouslySurfacedError() { return synchronouslySurfacedError; } - /** - * Snapshot of the typed server-rejection payload for the latched terminal error, - * or {@code null} if the loop has not latched a server-rejection terminal (or has - * latched only a wire-level failure with no SenderError associated). - */ - public SenderError getLastTerminalServerError() { - return lastTerminalServerError; - } - public long getTotalAcks() { return totalAcks.get(); } @@ -610,19 +635,6 @@ public boolean hasEverConnected() { return hasEverConnected; } - /** - * True when {@link #lastError} is set AND no synchronous user-thread - * caller has yet seen that same instance via {@link #checkError()}. - * close() uses this to decide whether to rethrow as a safety net: a user - * who only ever called close() (e.g. async-initial-connect that never - * reached the server) needs to see the error from somewhere; a user who - * already caught it from flush() does not. 
- */ - public boolean hasUnsurfacedError() { - Throwable e = lastError; - return e != null && synchronouslySurfacedError != e; - } - public boolean isRunning() { return running; } @@ -836,7 +848,7 @@ private void connectLoop(Throwable initial, String phase) { System.nanoTime() ); totalServerErrors.incrementAndGet(); - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); return; } catch (QwpDurableAckMismatchException e) { @@ -861,7 +873,7 @@ private void connectLoop(Throwable initial, String phase) { System.nanoTime() ); totalServerErrors.incrementAndGet(); - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); return; } catch (QwpRoleMismatchException | QwpIngressRoleRejectedException e) { @@ -941,7 +953,7 @@ private void connectLoop(Throwable initial, String phase) { totalServerErrors.incrementAndGet(); // recordFatal MUST run before dispatchError so the producer-observable // terminal error is latched before the handler is invoked. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); // Surface the terminal classification through the connection-event // dispatcher too. Listeners learn about budget exhaustion without @@ -1047,6 +1059,18 @@ private void fail(Throwable initial) { connectLoop(initial, "reconnect"); } + /** + * True when {@link #lastError} is set AND no synchronous user-thread + * caller has yet seen that same instance via {@link #checkError()}. + * The {@link #checkUnsurfacedError()} safety net composes this with + * checkError(); reads {@code lastError} once so the comparison cannot + * tear against a concurrent latch. 
+ */ + private boolean hasUnsurfacedError() { + Throwable e = lastError; + return e != null && synchronouslySurfacedError != e; + } + private void ioLoop() { try { // Async-initial-connect path: ctor accepted a null client because @@ -1148,27 +1172,21 @@ private void positionCursorInSegment(MmapSegment seg, long targetFsn) { /** * Mark the loop as fatally failed. Caller has decided no reconnect - * is possible (or it ran out of budget) — record the error so + * is possible (or it ran out of budget) — latch the error so * {@link #checkError} can surface it to the producer thread, then - * stop the loop. + * stop the loop. Idempotent — only the first failure latches. + * Non-{@link LineSenderException} causes are wrapped once here, so + * every rethrow delivers the same instance. */ private void recordFatal(Throwable t) { - recordFatal(t, null); - } - - /** - * Server-rejection-aware variant. Stashes a typed {@link SenderError} alongside - * the throwable so {@code QwpWebSocketSender.getLastTerminalError()} can surface - * the structured payload for ops/observability. Idempotent — only the first - * failure latches. - */ - private void recordFatal(Throwable t, SenderError serverError) { if (lastError == null) { - lastError = t; - lastTerminalServerError = serverError; + lastError = t instanceof LineSenderException + ? (LineSenderException) t + : new LineSenderException("I/O thread failed: " + t.getMessage(), t); } running = false; - if (serverError != null) { + if (t instanceof LineSenderServerException) { + // server rejections carry a structured message; the stack adds noise LOG.error("Cursor I/O loop failure: {}", t.getMessage()); } else { LOG.error("Cursor I/O loop failure: {}", t.getMessage(), t); @@ -1480,7 +1498,7 @@ public void onClose(int code, String reason) { // recordFatal MUST run before dispatchError so the producer- // observable terminal error is latched before the handler is // invoked. 
- recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); return; } @@ -1519,7 +1537,7 @@ private void handlePreSendRejection(long wireSeq, byte status, // so a synchronous probe of getLastTerminalError() / flush() // from inside the handler observes the typed error. Mirrors // the ordering in the post-send HALT path below. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); } // DROP_AND_CONTINUE: no watermark advance -- there is nothing // sent on this connection to drop. The dispatch is the user's @@ -1584,7 +1602,7 @@ private void handleServerRejection(long wireSeq) { // probes getLastTerminalError() (or calls flush()) sees the // typed error rather than null. Bytes on disk are the bytes // the server rejected; reconnect/replay cannot fix them. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); } else { // DROP_AND_CONTINUE: advance ackedFsn past the rejected span diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java index eb4d161c..5f8a0e8d 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java @@ -37,8 +37,14 @@ * reads ({@code !hasUnsurfacedError() ? getLastError() : null}), so a terminal error * latched by the I/O thread between the reads was mis-captured as already-owned and * silently dropped on close(). The fix is the single read - * {@link CursorWebSocketSendLoop#getSynchronouslySurfacedError()}; this test races that - * read against real latch transitions and fails on any torn snapshot. 
+ * {@link CursorWebSocketSendLoop#getSynchronouslySurfacedError()}. + *
<p>
+ * Scope: this test races that accessor and fails if its implementation ever recomputes + * the snapshot from two reads. It does not execute {@code QwpWebSocketSender.close()} + * itself — old and new snapshots agree in every non-racy state, so no black-box test + * at the sender level can detect a callsite that bypasses the accessor; that half of + * the contract is pinned by the "must stay this single read" comments at the callsite + * and on the accessor. */ public class CloseOwnershipRaceTest { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java index 56ae0577..d238da42 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java @@ -67,31 +67,45 @@ public void testCheckErrorRethrowsLineSenderException() throws Exception { } Assert.assertSame("checkError must mark the exact latched throwable as user-owned", original, loop.getSynchronouslySurfacedError()); - Assert.assertFalse("a synchronously surfaced latch is no longer unsurfaced", - loop.hasUnsurfacedError()); + // a synchronously surfaced latch is owned -- the close() safety net + // must stay silent now + loop.checkUnsurfacedError(); } @Test - public void testCheckErrorWrapsNonLineSenderThrowable() throws Exception { - // For non-LineSenderException throwables (NPE, IOException, etc.), - // checkError wraps in a fresh LineSenderException with the original - // as cause so producers always see one exception type. + public void testRecordFatalWrapsNonLineSenderThrowableOnce() throws Exception { + // Non-LineSenderException causes (NPE, IOException, etc.) 
are wrapped + // in a LineSenderException ONCE, at latch time -- so producers always + // see one exception type AND every rethrow delivers the same instance, + // which close() relies on to suppress double-signals by identity. CursorWebSocketSendLoop loop = newBareLoop(); Throwable raw = new RuntimeException("oh no"); - setField(loop, "lastError", raw); + invokeRecordFatal(loop, raw); Assert.assertNull(loop.getSynchronouslySurfacedError()); + LineSenderException first = null; try { loop.checkError(); Assert.fail("expected throw"); } catch (LineSenderException thrown) { Assert.assertNotSame(raw, thrown); - Assert.assertEquals(raw, thrown.getCause()); + Assert.assertSame(raw, thrown.getCause()); Assert.assertTrue(thrown.getMessage().contains("oh no")); + first = thrown; + } + Assert.assertSame("the latch must hold the wrapper, not the raw cause", + first, loop.getLastError()); + Assert.assertSame("ownership tracks the latched wrapper", + first, loop.getSynchronouslySurfacedError()); + loop.checkUnsurfacedError(); // owned -> silent + + try { + loop.checkError(); + Assert.fail("expected throw"); + } catch (LineSenderException thrownAgain) { + Assert.assertSame("every rethrow must deliver the same instance", + first, thrownAgain); } - Assert.assertSame("wrapped throwables are owned by their latched source instance", - raw, loop.getSynchronouslySurfacedError()); - Assert.assertFalse(loop.hasUnsurfacedError()); } @Test @@ -101,6 +115,33 @@ public void testCheckErrorIsNoopWhenNoLatch() throws Exception { loop.checkError(); // must not throw } + @Test + public void testCheckUnsurfacedErrorThrowsOnceThenStaysSilent() throws Exception { + // The close() safety net: an unowned latch must rethrow exactly like + // checkError; once any synchronous caller has owned the error, it + // must stay silent -- unlike checkError, which keeps throwing. 
+ CursorWebSocketSendLoop loop = newBareLoop(); + loop.checkUnsurfacedError(); // no latch -> silent + + LineSenderException e = new LineSenderException("wire fail"); + setField(loop, "lastError", e); + try { + loop.checkUnsurfacedError(); + Assert.fail("an unowned latch must rethrow from the safety net"); + } catch (LineSenderException thrown) { + Assert.assertSame(e, thrown); + } + + loop.checkUnsurfacedError(); // the throw above made the caller owner -> silent + + try { + loop.checkError(); + Assert.fail("checkError must keep rethrowing a latched error"); + } catch (LineSenderException thrown) { + Assert.assertSame(e, thrown); + } + } + @Test public void testGetLastErrorReturnsLatchedThrowable() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); @@ -123,10 +164,10 @@ public void testRecordFatalLatchesThrowableOnly() throws Exception { setField(loop, "running", true); Throwable e = new LineSenderException("wire fail"); - invokeRecordFatal(loop, e, null); + invokeRecordFatal(loop, e); Assert.assertSame(e, loop.getLastError()); - Assert.assertNull("typed payload must be null when recordFatal called without one", + Assert.assertNull("typed payload must be null for a wire-level fatal", loop.getLastTerminalServerError()); Assert.assertFalse("recordFatal must stop the loop", (Boolean) getField(loop, "running")); @@ -139,10 +180,11 @@ public void testRecordFatalLatchesBothThrowableAndSenderError() throws Exception SenderError err = newSenderError(); LineSenderServerException ex = new LineSenderServerException(err); - invokeRecordFatal(loop, ex, err); + invokeRecordFatal(loop, ex); Assert.assertSame(ex, loop.getLastError()); - Assert.assertSame(err, loop.getLastTerminalServerError()); + Assert.assertSame("typed payload is derived from the latched LineSenderServerException", + err, loop.getLastTerminalServerError()); Assert.assertFalse((Boolean) getField(loop, "running")); } @@ -150,13 +192,13 @@ public void 
testRecordFatalLatchesBothThrowableAndSenderError() throws Exception public void testRecordFatalIsIdempotent() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); setField(loop, "running", true); - Throwable first = new LineSenderException("first"); - Throwable second = new LineSenderException("second"); SenderError firstErr = newSenderError(); SenderError secondErr = newSenderError(); + LineSenderServerException first = new LineSenderServerException(firstErr); + LineSenderServerException second = new LineSenderServerException(secondErr); - invokeRecordFatal(loop, first, firstErr); - invokeRecordFatal(loop, second, secondErr); + invokeRecordFatal(loop, first); + invokeRecordFatal(loop, second); // Only the first failure latches — subsequent calls must not // overwrite, otherwise a follow-on cascade would mask the original @@ -199,11 +241,11 @@ private static Object getField(Object target, String name) throws Exception { return f.get(target); } - private static void invokeRecordFatal(CursorWebSocketSendLoop loop, Throwable t, SenderError err) + private static void invokeRecordFatal(CursorWebSocketSendLoop loop, Throwable t) throws Exception { Method m = CursorWebSocketSendLoop.class.getDeclaredMethod( - "recordFatal", Throwable.class, SenderError.class); + "recordFatal", Throwable.class); m.setAccessible(true); - m.invoke(loop, t, err); + m.invoke(loop, t); } } From 26a9685841a31a2bd7a2f6c375927a1afab79ebb Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 11 Jun 2026 18:22:17 +0200 Subject: [PATCH 4/6] Pin both branches of close()'s safety-net rethrow No sender-level test asserted WHETHER close() throws: the existing close assertions (InitialConnectAsyncTest) install a handler and tolerate both outcomes, so deleting the safety net, inverting its handler gate, or always-suppressing the snapshot all survived the suite. 
CloseSafetyNetTest adds the strict pair, each awaiting the latched terminal deterministically before close(): - a config-string-only caller who never flushed must get the typed terminal thrown from close(); - a caller whose custom handler already received the error must get a silent close(). All three mutations above now fail the suite. The snapshot race itself stays covered by CloseOwnershipRaceTest at the accessor. Co-Authored-By: Claude Fable 5 --- .../qwp/client/CloseSafetyNetTest.java | 151 ++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java new file mode 100644 index 00000000..a874063a --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java @@ -0,0 +1,151 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client; + +import io.questdb.client.LineSenderServerException; +import io.questdb.client.Sender; +import io.questdb.client.SenderError; +import io.questdb.client.SenderErrorHandler; +import io.questdb.client.cutlass.line.LineSenderException; +import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Pins both branches of {@code QwpWebSocketSender.close()}'s safety-net + * rethrow, strictly — unlike the close assertions in + * {@link InitialConnectAsyncTest}, which tolerate either outcome: + *

<ul>
+ * <li>a config-string-only caller (no custom error handler) who never calls + * flush() MUST see the latched terminal thrown from close() — close() is the + * only channel left;</li>
+ * <li>a caller whose custom error handler already received the terminal MUST + * NOT have it thrown again from close() — that would double-signal an error + * the user already handled and mask try-with-resources cleanup.</li>
+ * </ul>
+ * Both tests latch the terminal deterministically (await it) before calling + * close(), so they pin the safety net's logic, not the snapshot race — + * {@code CloseOwnershipRaceTest} covers that separately. + */ +public class CloseSafetyNetTest { + + @Rule + public final TemporaryFolder sfDir = TemporaryFolder.builder().assureDeletion().build(); + + @Test(timeout = 30_000) + public void testCloseRethrowsUnsurfacedTerminalWithoutCustomHandler() throws Exception { + // No server, no handler, tight reconnect budget: the I/O thread + // latches a never-connected budget-exhaustion terminal that nothing + // has surfaced to the user. close() must throw it. + Sender sender = Sender.fromConfig(cfg()); + boolean closed = false; + try { + awaitLatchedTerminal((QwpWebSocketSender) sender); + try { + closed = true; + sender.close(); + Assert.fail("close() must rethrow a terminal error that no synchronous " + + "caller and no custom handler has seen"); + } catch (LineSenderException e) { + String msg = e.getMessage() == null ? "" : e.getMessage(); + Assert.assertTrue("close() must rethrow the latched terminal: " + msg, + msg.contains("never-connected-budget-exhausted")); + Assert.assertTrue("the latched instance is the typed server exception", + e instanceof LineSenderServerException); + } + } finally { + if (!closed) { + sender.close(); + } + } + } + + @Test(timeout = 30_000) + public void testCloseStaysSilentWhenCustomHandlerAlreadyDelivered() throws Exception { + // Same terminal, but the user installed a custom error handler and + // the dispatcher delivered the error to it. close() must NOT + // double-signal. + ErrorInbox inbox = new ErrorInbox(); + Sender sender = Sender.builder(cfg()) + .errorHandler(inbox) + .build(); + Assert.assertTrue("terminal must reach the custom handler within 10s", + inbox.await(10, TimeUnit.SECONDS)); + Assert.assertNotNull(inbox.get()); + // The handler owns the error now; a rethrow here would double-signal. 
+ sender.close(); + } + + /** + * Awaits the I/O thread's terminal latch via the read-only ops probe. + * getLastTerminalError() does not mark the error as surfaced, so the + * "no synchronous caller has seen it" precondition stays intact. + */ + private static void awaitLatchedTerminal(QwpWebSocketSender sender) { + long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + while (sender.getLastTerminalError() == null) { + if (System.nanoTime() > deadlineNanos) { + throw new AssertionError("I/O thread did not latch a terminal within 10s"); + } + Thread.onSpinWait(); + } + } + + private String cfg() { + return "ws::addr=localhost:" + TestPorts.findUnusedPort() + + ";sf_dir=" + sfDir.getRoot().getAbsolutePath() + + ";initial_connect_retry=async" + + ";reconnect_max_duration_millis=400" + + ";reconnect_initial_backoff_millis=10" + + ";reconnect_max_backoff_millis=50" + + ";close_flush_timeout_millis=0;"; + } + + private static class ErrorInbox implements SenderErrorHandler { + private final CountDownLatch latch = new CountDownLatch(1); + private final AtomicReference<SenderError> ref = new AtomicReference<>(); + + boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + SenderError get() { + return ref.get(); + } + + @Override + public void onError(SenderError err) { + if (ref.compareAndSet(null, err)) { + latch.countDown(); + } + } + } +} From 7a719efd8ddc1375b86fc63e29b91739684a6b20 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 11 Jun 2026 18:23:10 +0200 Subject: [PATCH 5/6] style --- .../sf/cursor/CursorWebSocketSendLoop.java | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java index 89179e96..1f33ed4c 100644 ---
a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java @@ -88,14 +88,22 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { */ public static final long DEFAULT_DURABLE_ACK_KEEPALIVE_INTERVAL_MILLIS = 200L; public static final long DEFAULT_PARK_NANOS = 50_000L; // 50us idle backoff - /** Default initial reconnect backoff (100 ms). */ + /** + * Default initial reconnect backoff (100 ms). + */ public static final long DEFAULT_RECONNECT_INITIAL_BACKOFF_MILLIS = 100L; - /** Default reconnect max backoff (5 s). */ + /** + * Default reconnect max backoff (5 s). + */ public static final long DEFAULT_RECONNECT_MAX_BACKOFF_MILLIS = 5_000L; - /** Default per-outage reconnect time cap (5 min). */ + /** + * Default per-outage reconnect time cap (5 min). + */ public static final long DEFAULT_RECONNECT_MAX_DURATION_MILLIS = 300_000L; private static final Logger LOG = LoggerFactory.getLogger(CursorWebSocketSendLoop.class); - /** Throttle "reconnect attempt N failed" WARN logs to one per 5 s. */ + /** + * Throttle "reconnect attempt N failed" WARN logs to one per 5 s. + */ private static final long RECONNECT_LOG_THROTTLE_NANOS = 5_000_000_000L; // Pre-converted to nanos for the comparison in sendDurableAckKeepaliveIfDue. // Zero or negative disables the keepalive entirely. @@ -170,10 +178,6 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // directly to the same dispatcher from QwpWebSocketSender. private volatile SenderConnectionDispatcher connectionDispatcher; private volatile SenderErrorDispatcher errorDispatcher; - // Exact lastError instance that checkError() has thrown to a synchronous - // user-thread caller (flush/append/close). close() uses the instance so it - // only suppresses errors the user already owned before close() began. 
- private volatile LineSenderException synchronouslySurfacedError; // The send cursor has two coordinate systems: // // FSN: durable frame sequence number in the local cursor engine. This is @@ -224,6 +228,10 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // at engine.activeSegment(); advances to newer sealed segments / the new // active as the producer rotates. private MmapSegment sendingSegment; + // Exact lastError instance that checkError() has thrown to a synchronous + // user-thread caller (flush/append/close). close() uses the instance so it + // only suppresses errors the user already owned before close() began. + private volatile LineSenderException synchronouslySurfacedError; /** * Full constructor with explicit reconnect-policy knobs. When @@ -315,7 +323,9 @@ public CursorWebSocketSendLoop(WebSocketClient client, CursorSendEngine engine, this.hasEverConnected = client != null; } - /** Maps a server status byte to a {@link SenderError.Category}. Exposed for unit tests. */ + /** + * Maps a server status byte to a {@link SenderError.Category}. Exposed for unit tests. + */ @TestOnly public static SenderError.Category classify(byte status) { switch (status) { @@ -373,7 +383,7 @@ public static WebSocketClient connectWithRetry( return c; } } catch (QwpAuthFailedException | QwpDurableAckMismatchException - | WebSocketUpgradeException e) { + | WebSocketUpgradeException e) { // Terminal across all configured endpoints per sf-client.md sections // 8.1 (durable-ack mismatch) and 13.3 (auth). Version mismatch is // NOT terminal here -- it falls through to the Throwable branch and @@ -605,7 +615,9 @@ public long getTotalFramesSent() { return totalFramesSent.get(); } - /** Total reconnect attempts (succeeded + failed). */ + /** + * Total reconnect attempts (succeeded + failed). 
+ */ public long getTotalReconnectAttempts() { return totalReconnectAttempts.get(); } @@ -771,7 +783,7 @@ private void applyDurableAck() { */ private void attemptInitialConnect() { connectLoop(new LineSenderException( - "async initial connect deferred to I/O thread"), + "async initial connect deferred to I/O thread"), "initial connect"); } @@ -1406,7 +1418,9 @@ boolean isDurableUnder(CharSequenceLongHashMap watermarks) { } } - /** Inner ACK handler — parses the binary frame, calls engine.acknowledge. */ + /** + * Inner ACK handler — parses the binary frame, calls engine.acknowledge. + */ private final class ResponseHandler implements WebSocketFrameHandler { @Override public void onBinaryMessage(long payloadPtr, int payloadLen) { From 5b7afcd4df252e51e7db2a40862f52ecb06ba5e3 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 11 Jun 2026 21:56:36 +0200 Subject: [PATCH 6/6] Rename the error latch field to terminalError MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit lastError suggested most-recent-wins, but the field is a write-once first-writer-wins latch: transient failures reconnect upstream and never reach it; only the error that ends the loop is latched. The name now matches the documented invariant and the "latched terminal" language used throughout. recordFatal's javadoc also states the single-writer rule explicitly: the unsynchronized check-then-latch is only safe because every caller runs on the I/O thread. The retry-loop locals keep the lastError name — there it is accurate. 
Co-Authored-By: Claude Fable 5 --- .../qwp/client/QwpWebSocketSender.java | 2 +- .../sf/cursor/CursorWebSocketSendLoop.java | 50 ++++++++++++------- .../IoThreadErrorSurfacedOnRowApiTest.java | 2 +- .../qwp/client/PrReviewRedTestsE2e.java | 2 +- .../sf/cursor/CloseOwnershipRaceTest.java | 2 +- ...CursorWebSocketSendLoopErrorLatchTest.java | 26 +++++----- 6 files changed, 48 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index beec7512..545b848a 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -2563,7 +2563,7 @@ private void checkConnectionError() { error.fillInStackTrace(); throw error; } - // Poll the cursor I/O loop's lastError too. Without this, a fatal + // Poll the cursor I/O loop's terminalError too. Without this, a fatal // wire / server-rejection error recorded by the I/O thread would // only surface on the next flush() / close() — every row-level // method (table, longColumn, atNow, etc.) routes through diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java index 1f33ed4c..77db27c8 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java @@ -71,7 +71,7 @@ * into the engine; this thread reads. {@code engine.publishedFsn()} is * the volatile publish barrier. *

- * Errors are reported via {@link #getLastError()}; the I/O thread sets it + * Errors are reported via {@link #getTerminalError()}; the I/O thread sets it * and exits. Producers polling {@link #checkError()} surface the failure. */ public final class CursorWebSocketSendLoop implements QuietCloseable { @@ -200,10 +200,14 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { private volatile boolean hasEverConnected; private volatile Thread ioThread; // The latched terminal failure — THE exception every checkError() call - // rethrows. Non-LineSenderException causes are wrapped once at latch time - // (recordFatal), so rethrows always deliver the same instance and close() - // can suppress double-signals by identity. - private volatile LineSenderException lastError; + // rethrows. Write-once for the loop's lifetime: the only writer is + // recordFatal on the I/O thread (first-writer-wins). The whole + // close()-ownership protocol rests on that — the identity comparisons + // in hasUnsurfacedError() and in close()'s suppression are only + // meaningful because the latched instance never changes. + // Non-LineSenderException causes are wrapped once at latch time, so + // rethrows always deliver the same instance. + private volatile LineSenderException terminalError; // Wall clock of the last outbound activity on the wire -- a sent frame // (trySendOne) or a keepalive PING (sendDurableAckKeepaliveIfDue). // Throttles the durable-ack keepalive PING so it fires only when the @@ -228,7 +232,7 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // at engine.activeSegment(); advances to newer sealed segments / the new // active as the producer rotates. private MmapSegment sendingSegment; - // Exact lastError instance that checkError() has thrown to a synchronous + // Exact terminalError instance that checkError() has thrown to a synchronous // user-thread caller (flush/append/close). 
close() uses the instance so it // only suppresses errors the user already owned before close() began. private volatile LineSenderException synchronouslySurfacedError; @@ -476,7 +480,7 @@ public static boolean isTerminalCloseCode(int code) { * an error is set the loop has already exited. */ public void checkError() { - LineSenderException e = lastError; + LineSenderException e = terminalError; if (e != null) { synchronouslySurfacedError = e; throw e; @@ -544,10 +548,6 @@ public synchronized void close() { } } - public Throwable getLastError() { - return lastError; - } - /** * Typed server-rejection payload of the latched terminal error, or * {@code null} when the loop latched a wire-level failure (or nothing). @@ -555,7 +555,7 @@ public Throwable getLastError() { * as a {@link LineSenderServerException} carrying its {@link SenderError}. */ public SenderError getLastTerminalServerError() { - LineSenderException e = lastError; + LineSenderException e = terminalError; return e instanceof LineSenderServerException ? ((LineSenderServerException) e).getServerError() : null; } @@ -568,7 +568,7 @@ public SenderError getLastTerminalServerError() { * This is the single read {@code QwpWebSocketSender.close()} uses to learn * which terminal error the user already owns. The ownership decision must * be taken from this one field only: deriving it from two separate latch - * reads (e.g. an unsurfaced-check followed by {@link #getLastError()}) + * reads (e.g. an unsurfaced-check followed by {@link #getTerminalError()}) * races the I/O thread — a terminal latched between the reads gets * mis-captured as already-owned and is then silently dropped on close(). * Guarded by {@code CloseOwnershipRaceTest}. @@ -577,6 +577,14 @@ public Throwable getSynchronouslySurfacedError() { return synchronouslySurfacedError; } + /** + * The latched terminal failure, or {@code null} while the loop is + * healthy. Read-only — does not mark the error as surfaced. 
+ */ + public Throwable getTerminalError() { + return terminalError; + } + public long getTotalAcks() { return totalAcks.get(); } @@ -1072,14 +1080,14 @@ private void fail(Throwable initial) { } /** - * True when {@link #lastError} is set AND no synchronous user-thread + * True when {@link #terminalError} is set AND no synchronous user-thread * caller has yet seen that same instance via {@link #checkError()}. * The {@link #checkUnsurfacedError()} safety net composes this with - * checkError(); reads {@code lastError} once so the comparison cannot + * checkError(); reads {@code terminalError} once so the comparison cannot * tear against a concurrent latch. */ private boolean hasUnsurfacedError() { - Throwable e = lastError; + Throwable e = terminalError; return e != null && synchronouslySurfacedError != e; } @@ -1186,13 +1194,17 @@ private void positionCursorInSegment(MmapSegment seg, long targetFsn) { * Mark the loop as fatally failed. Caller has decided no reconnect * is possible (or it ran out of budget) — latch the error so * {@link #checkError} can surface it to the producer thread, then - * stop the loop. Idempotent — only the first failure latches. + * stop the loop. First-writer-wins: only the first failure latches. + * The check-then-latch is unsynchronized and is safe ONLY because + * every caller runs on the I/O thread (connectLoop and the + * receive-path rejection handlers are all pumped by ioLoop); calling + * this from any other thread would be a lost-update race. * Non-{@link LineSenderException} causes are wrapped once here, so * every rethrow delivers the same instance. */ private void recordFatal(Throwable t) { - if (lastError == null) { - lastError = t instanceof LineSenderException + if (terminalError == null) { + terminalError = t instanceof LineSenderException ? 
(LineSenderException) t : new LineSenderException("I/O thread failed: " + t.getMessage(), t); } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java index 3300ae86..2f5a30a8 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java @@ -80,7 +80,7 @@ public void testRowApiMethodSurfacesIoThreadTerminalError() throws Exception { sender.flush(); // Wait for the I/O thread to record the error. After this, - // cursorSendLoop.lastError is populated and the loop has + // cursorSendLoop.terminalError is populated and the loop has // exited. QwpWebSocketSender wss = (QwpWebSocketSender) sender; long deadline = System.currentTimeMillis() + 3_000L; diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java index e3f80e96..669aaf23 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java @@ -66,7 +66,7 @@ public class PrReviewRedTestsE2e { *

* Concrete consequence the spec calls out: a user-supplied error handler * that synchronously calls {@code sender.flush()} from inside - * {@code onError} can observe {@code lastError == null} and pass — + * {@code onError} can observe {@code terminalError == null} and pass — * landing post-HALT bytes in the engine. *

* This test asserts the spec invariant directly: by the time the diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java index 5f8a0e8d..f4cbffd1 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java @@ -78,7 +78,7 @@ public void closeOwnershipSnapshotNeverClaimsAnUnsurfacedError() { // code it reads a field nobody here writes — it cannot flake; // a reintroduced two-read snapshot tears when the latch lands // between its reads, with near-certainty across all rounds. - while (leaked == null && loop.getLastError() == null) { + while (leaked == null && loop.getTerminalError() == null) { leaked = loop.getSynchronouslySurfacedError(); } loop.close(); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java index d238da42..80c23ac2 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java @@ -37,7 +37,7 @@ /** * Pinpointed tests for the latched-error contract on {@link CursorWebSocketSendLoop}: - * {@code recordFatal} → {@link CursorWebSocketSendLoop#getLastError} + + * {@code recordFatal} → {@link CursorWebSocketSendLoop#getTerminalError} + * {@link CursorWebSocketSendLoop#getLastTerminalServerError} + * {@link CursorWebSocketSendLoop#checkError}. 
Bypasses the constructor entirely * via {@code Unsafe.allocateInstance} to avoid the live wire/engine dependencies @@ -53,7 +53,7 @@ public void testCheckErrorRethrowsLineSenderException() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); SenderError err = newSenderError(); LineSenderServerException original = new LineSenderServerException(err); - setField(loop, "lastError", original); + setField(loop, "terminalError", original); Assert.assertNull(loop.getSynchronouslySurfacedError()); try { @@ -94,7 +94,7 @@ public void testRecordFatalWrapsNonLineSenderThrowableOnce() throws Exception { first = thrown; } Assert.assertSame("the latch must hold the wrapper, not the raw cause", - first, loop.getLastError()); + first, loop.getTerminalError()); Assert.assertSame("ownership tracks the latched wrapper", first, loop.getSynchronouslySurfacedError()); loop.checkUnsurfacedError(); // owned -> silent @@ -111,7 +111,7 @@ public void testRecordFatalWrapsNonLineSenderThrowableOnce() throws Exception { @Test public void testCheckErrorIsNoopWhenNoLatch() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); - Assert.assertNull(loop.getLastError()); + Assert.assertNull(loop.getTerminalError()); loop.checkError(); // must not throw } @@ -124,7 +124,7 @@ public void testCheckUnsurfacedErrorThrowsOnceThenStaysSilent() throws Exception loop.checkUnsurfacedError(); // no latch -> silent LineSenderException e = new LineSenderException("wire fail"); - setField(loop, "lastError", e); + setField(loop, "terminalError", e); try { loop.checkUnsurfacedError(); Assert.fail("an unowned latch must rethrow from the safety net"); @@ -143,18 +143,18 @@ public void testCheckUnsurfacedErrorThrowsOnceThenStaysSilent() throws Exception } @Test - public void testGetLastErrorReturnsLatchedThrowable() throws Exception { + public void testGetTerminalErrorReturnsLatchedThrowable() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); Throwable e = new 
LineSenderException("boom"); - setField(loop, "lastError", e); - Assert.assertSame(e, loop.getLastError()); + setField(loop, "terminalError", e); + Assert.assertSame(e, loop.getTerminalError()); } @Test - public void testGetLastErrorIsNullBeforeAnyFailure() throws Exception { + public void testGetTerminalErrorIsNullBeforeAnyFailure() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); Assert.assertNull("loops with no latched error must report null", - loop.getLastError()); + loop.getTerminalError()); } @Test @@ -166,7 +166,7 @@ public void testRecordFatalLatchesThrowableOnly() throws Exception { invokeRecordFatal(loop, e); - Assert.assertSame(e, loop.getLastError()); + Assert.assertSame(e, loop.getTerminalError()); Assert.assertNull("typed payload must be null for a wire-level fatal", loop.getLastTerminalServerError()); Assert.assertFalse("recordFatal must stop the loop", @@ -182,7 +182,7 @@ public void testRecordFatalLatchesBothThrowableAndSenderError() throws Exception invokeRecordFatal(loop, ex); - Assert.assertSame(ex, loop.getLastError()); + Assert.assertSame(ex, loop.getTerminalError()); Assert.assertSame("typed payload is derived from the latched LineSenderServerException", err, loop.getLastTerminalServerError()); Assert.assertFalse((Boolean) getField(loop, "running")); @@ -204,7 +204,7 @@ public void testRecordFatalIsIdempotent() throws Exception { // overwrite, otherwise a follow-on cascade would mask the original // root cause. Assert.assertSame("first throwable must remain latched", - first, loop.getLastError()); + first, loop.getTerminalError()); Assert.assertSame("first SenderError must remain latched", firstErr, loop.getLastTerminalServerError()); }