diff --git a/core/src/main/java/io/questdb/client/Sender.java b/core/src/main/java/io/questdb/client/Sender.java index dccefb86..8e9513b1 100644 --- a/core/src/main/java/io/questdb/client/Sender.java +++ b/core/src/main/java/io/questdb/client/Sender.java @@ -469,11 +469,21 @@ default Sender decimalColumn(CharSequence name, CharSequence value) { /** * Convenience: flush every buffered row and block until the server has * acknowledged the resulting frame, or until {@code timeoutMillis} elapses. - * Equivalent to {@code awaitAckedFsn(flushAndGetSequence(), timeoutMillis)}, + *
+ * The default implementation is equivalent to + * {@code awaitAckedFsn(flushAndGetSequence(), timeoutMillis)}, * which is the same shape as the implicit drain {@link #close()} runs -- * with the caller controlling the timeout per call-site rather than * relying on the builder-time {@code close_flush_timeout_millis}. *
+ * Note: Implementations that support frame-level acknowledgements + * (e.g. WebSocket QWP) may override this method with watermark + * semantics: flushing first, then waiting for the current global + * published FSN rather than the per-call value from + * {@code flushAndGetSequence()}. This ensures that {@code drain()} after + * a previous publish with unacknowledged frames still blocks until those + * frames are acknowledged, even when the current flush publishes nothing. + *
* Returns immediately on transports that do not track frame sequence * numbers ({@code HTTP}, {@code TCP}, {@code UDP}): the flush still * happens, the wait is a no-op, and the return value is {@code true}. diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index c8af5fc5..80f5d662 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -1341,6 +1341,8 @@ public long flushAndGetSequence() { ensureNoInProgressRow(); ensureConnected(); + long beforeFsn = cursorEngine != null ? cursorEngine.publishedFsn() : -1L; + // Cursor SF: SF.append happens on the user thread inside // sealAndSwapBuffer, so by the time we reach here every encoded // batch is durable on its mmap'd segment. No processingCount to @@ -1354,7 +1356,40 @@ public long flushAndGetSequence() { } cursorSendLoop.checkError(); checkConnectionError(); - return cursorEngine != null ? cursorEngine.publishedFsn() : -1L; + + long afterFsn = cursorEngine != null ? cursorEngine.publishedFsn() : -1L; + return afterFsn > beforeFsn ? afterFsn : -1L; + } + + /** + * Flushes pending rows and blocks until the server has acknowledged + * every frame published so far (the current published-FSN watermark), + * or until {@code timeoutMillis} elapses. + *

+ * This override uses watermark semantics rather than per-call + * semantics: it waits for the global {@code publishedFsn()}, not just + * the FSN returned by the flush in this call. This is necessary because + * {@link #flushAndGetSequence()} now returns {@code -1} when no data + * was published by the call, and the default {@link Sender#drain} + * implementation ({@code awaitAckedFsn(flushAndGetSequence(), timeout)}) + * would short-circuit immediately on an empty flush even when prior + * publishes remain unacknowledged. + *

+ * Close-time drain ({@link #drainOnClose()}) already uses the same + * watermark approach directly. + * + * @param timeoutMillis upper bound on the wait; {@code <= 0} returns + * the current state without blocking (the flush + * still happens before the check) + * @return {@code true} if the server has acknowledged every published + * frame on return, {@code false} on timeout + * @throws LineSenderException if the transport has latched a terminal error + */ + @Override + public boolean drain(long timeoutMillis) { + flush(); + long targetFsn = cursorEngine != null ? cursorEngine.publishedFsn() : -1L; + return awaitAckedFsn(targetFsn, timeoutMillis); } /** diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java index d4a77713..84068ac4 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java @@ -210,6 +210,47 @@ public void testDrainBlocksUntilAckArrivesAndReturnsTrue() throws Exception { } } + /** + * Regression test for #7142: drain() after a prior flush() with unacked + * frames must block for those frames, even though the inner + * flushAndGetSequence() publishes nothing and returns -1. + *

+ * On buggy code (no drain() override): drain() calls the default + * Sender.drain() → flushAndGetSequence() returns -1 → awaitAckedFsn(-1, ...) + * returns true immediately (ackedFsn >= -1 is always true) at elapsed≈0ms. + * The elapsed >= 300 assertion fails deterministically. + *

+ * On fixed code: drain() uses the watermark override → waits for the + * delayed ACK (~600ms) → passes. + */ + @Test + public void testDrainAfterFlushWaitsForPriorUnackedFrames() throws Exception { + int port = TestPorts.findUnusedPort(); + long ackDelayMs = 600; + DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs); + try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String cfg = "ws::addr=localhost:" + port + ";"; + try (Sender sender = Sender.fromConfig(cfg)) { + sender.table("foo").longColumn("v", 1L).atNow(); + sender.flush(); // publish FSN 0; ACK delayed ~600ms + + long t0 = System.nanoTime(); + boolean drained = sender.drain(5_000); // empty flush → -1 on buggy code + long elapsedMs = (System.nanoTime() - t0) / 1_000_000; + + Assert.assertTrue("drain() must return true when ACK arrives within budget", + drained); + Assert.assertTrue( + "drain() must wait for prior unacked frame, but returned in only " + + elapsedMs + "ms (expected >= " + (ackDelayMs / 2) + "ms)", + elapsedMs >= ackDelayMs / 2); + } + } + } + @Test public void testDrainReturnsFalseOnTimeoutAndSenderStillUsable() throws Exception { // Server never ACKs. drain() with a small timeout must return false