Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,21 @@ default Sender decimalColumn(CharSequence name, CharSequence value) {
/**
* Convenience: flush every buffered row and block until the server has
* acknowledged the resulting frame, or until {@code timeoutMillis} elapses.
* Equivalent to {@code awaitAckedFsn(flushAndGetSequence(), timeoutMillis)},
* <br>
* The default implementation is equivalent to
* {@code awaitAckedFsn(flushAndGetSequence(), timeoutMillis)},
* which is the same shape as the implicit drain {@link #close()} runs --
* with the caller controlling the timeout per call-site rather than
* relying on the builder-time {@code close_flush_timeout_millis}.
* <br>
* <b>Note:</b> Implementations that support frame-level acknowledgements
* (e.g. WebSocket QWP) may override this method with <i>watermark
* semantics</i>: flushing first, then waiting for the current global
* published FSN rather than the per-call value from
* {@code flushAndGetSequence()}. This ensures that {@code drain()} after
* a previous publish with unacknowledged frames still blocks until those
* frames are acknowledged, even when the current flush publishes nothing.
* <br>
* Returns immediately on transports that do not track frame sequence
* numbers ({@code HTTP}, {@code TCP}, {@code UDP}): the flush still
* happens, the wait is a no-op, and the return value is {@code true}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,8 @@ public long flushAndGetSequence() {
ensureNoInProgressRow();
ensureConnected();

long beforeFsn = cursorEngine != null ? cursorEngine.publishedFsn() : -1L;

// Cursor SF: SF.append happens on the user thread inside
// sealAndSwapBuffer, so by the time we reach here every encoded
// batch is durable on its mmap'd segment. No processingCount to
Expand All @@ -1354,7 +1356,40 @@ public long flushAndGetSequence() {
}
cursorSendLoop.checkError();
checkConnectionError();
return cursorEngine != null ? cursorEngine.publishedFsn() : -1L;

long afterFsn = cursorEngine != null ? cursorEngine.publishedFsn() : -1L;
return afterFsn > beforeFsn ? afterFsn : -1L;
}

/**
* Flushes pending rows and blocks until the server has acknowledged
* every frame published so far (the current published-FSN watermark),
* or until {@code timeoutMillis} elapses.
* <p>
* This override uses <b>watermark semantics</b> rather than per-call
* semantics: it waits for the global {@code publishedFsn()}, not just
* the FSN returned by the flush in this call. This is necessary because
* {@link #flushAndGetSequence()} now returns {@code -1} when no data
* was published by the call, and the default {@link Sender#drain}
* implementation ({@code awaitAckedFsn(flushAndGetSequence(), timeout)})
* would short-circuit immediately on an empty flush even when prior
* publishes remain unacknowledged.
* <p>
* Close-time drain ({@link #drainOnClose()}) already uses the same
* watermark approach directly.
*
* @param timeoutMillis upper bound on the wait; {@code <= 0} returns
* the current state without blocking (the flush
* still happens before the check)
* @return {@code true} if the server has acknowledged every published
* frame on return, {@code false} on timeout
* @throws LineSenderException if the transport has latched a terminal error
*/
@Override
public boolean drain(long timeoutMillis) {
flush();
long targetFsn = cursorEngine != null ? cursorEngine.publishedFsn() : -1L;
return awaitAckedFsn(targetFsn, timeoutMillis);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,47 @@ public void testDrainBlocksUntilAckArrivesAndReturnsTrue() throws Exception {
}
}

/**
* Regression test for #7142: drain() after a prior flush() with unacked
* frames must block for those frames, even though the inner
* flushAndGetSequence() publishes nothing and returns -1.
* <p>
* On buggy code (no drain() override): drain() calls the default
* Sender.drain() → flushAndGetSequence() returns -1 → awaitAckedFsn(-1, ...)
* returns true immediately (ackedFsn >= -1 is always true) at elapsed≈0ms.
* The elapsed >= 300 assertion fails deterministically.
* <p>
* On fixed code: drain() uses the watermark override → waits for the
* delayed ACK (~600ms) → passes.
*/
@Test
public void testDrainAfterFlushWaitsForPriorUnackedFrames() throws Exception {
int port = TestPorts.findUnusedPort();
long ackDelayMs = 600;
DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs);
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

String cfg = "ws::addr=localhost:" + port + ";";
try (Sender sender = Sender.fromConfig(cfg)) {
sender.table("foo").longColumn("v", 1L).atNow();
sender.flush(); // publish FSN 0; ACK delayed ~600ms

long t0 = System.nanoTime();
boolean drained = sender.drain(5_000); // empty flush → -1 on buggy code
long elapsedMs = (System.nanoTime() - t0) / 1_000_000;

Assert.assertTrue("drain() must return true when ACK arrives within budget",
drained);
Assert.assertTrue(
"drain() must wait for prior unacked frame, but returned in only "
+ elapsedMs + "ms (expected >= " + (ackDelayMs / 2) + "ms)",
elapsedMs >= ackDelayMs / 2);
}
}
}

@Test
public void testDrainReturnsFalseOnTimeoutAndSenderStillUsable() throws Exception {
// Server never ACKs. drain() with a small timeout must return false
Expand Down
Loading