diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java index 4a32b1f2..7834511d 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java @@ -73,12 +73,12 @@ public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exceptio // Phase 1: server ACKs every frame. Sender writes a few rows, // flushes, then close() blocks for the default 5s drain — by the // time close returns, every frame has been ACK'd. - int port1 = TestPorts.findUnusedPort(); AckHandler ack1 = new AckHandler(); - try (TestWebSocketServer s1 = new TestWebSocketServer(port1, ack1)) { + try (TestWebSocketServer s1 = new TestWebSocketServer(ack1)) { s1.start(); Assert.assertTrue(s1.awaitStart(5, TimeUnit.SECONDS)); + int port1 = s1.getPort(); String cfg1 = "ws::addr=localhost:" + port1 + ";sf_dir=" + sfDir + ";"; try (Sender sender = Sender.fromConfig(cfg1)) { @@ -105,12 +105,12 @@ public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exceptio // SAME slot dir. There is no unacked work — both rings should agree // there's nothing to send. The expected count of binary frames at // server 2 is zero. - int port2 = port1 + 50; AckHandler ack2 = new AckHandler(); - try (TestWebSocketServer s2 = new TestWebSocketServer(port2, ack2)) { + try (TestWebSocketServer s2 = new TestWebSocketServer(ack2)) { s2.start(); Assert.assertTrue(s2.awaitStart(5, TimeUnit.SECONDS)); + int port2 = s2.getPort(); String cfg2 = "ws::addr=localhost:" + port2 + ";sf_dir=" + sfDir + ";"; try (Sender ignored = Sender.fromConfig(cfg2)) { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java index d4a77713..7f63ab11 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java @@ -54,13 +54,13 @@ public void testCloseBlocksUntilAckArrives() throws Exception { // Server delays every ACK by 800ms. With the default // close_flush_timeout_millis=60000, close() must wait for that // ACK before returning. Pre-fix close() returned within milliseconds. - int port = TestPorts.findUnusedPort(); long ackDelayMs = 800; DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String cfg = "ws::addr=localhost:" + port + ";"; // memory mode long elapsedMs; try (Sender sender = Sender.fromConfig(cfg)) { @@ -82,13 +82,13 @@ public void testCloseFastWhenTimeoutIsZero() throws Exception { // Same delayed-ACK server, but with close_flush_timeout_millis=0 // (fast close). close() must return immediately, well before the // ACK delay would have elapsed. - int port = TestPorts.findUnusedPort(); long ackDelayMs = 1500; DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String cfg = "ws::addr=localhost:" + port + ";close_flush_timeout_millis=0;"; long elapsedMs; @@ -115,13 +115,13 @@ public void testCloseFastWhenTimeoutIsMinusOne() throws Exception { // sentinel in LineSenderBuilder, so the build path silently substitutes // DEFAULT_CLOSE_FLUSH_TIMEOUT_MILLIS (60s) and close() blocks for the // full ACK delay instead of returning fast. - int port = TestPorts.findUnusedPort(); long ackDelayMs = 1500; DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String cfg = "ws::addr=localhost:" + port + ";close_flush_timeout_millis=-1;"; long elapsedMs; @@ -144,13 +144,13 @@ public void testCloseDrainTimesOutWhenAcksNeverArrive() throws Exception { // Server that buffers frames silently and never ACKs. close() must // throw a drain-timeout LineSenderException after roughly the // configured timeout — not hang forever and not return immediately. - int port = TestPorts.findUnusedPort(); long timeoutMs = 500; SilentHandler handler = new SilentHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String cfg = "ws::addr=localhost:" + port + ";close_flush_timeout_millis=" + timeoutMs + ";"; long elapsedMs; @@ -182,13 +182,13 @@ public void testDrainBlocksUntilAckArrivesAndReturnsTrue() throws Exception { // testCloseBlocksUntilAckArrives, but the wait happens inside the // explicit drain() call. The subsequent close() should be a near- // instant no-op because everything is already acked. - int port = TestPorts.findUnusedPort(); long ackDelayMs = 600; DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String cfg = "ws::addr=localhost:" + port + ";"; try (Sender sender = Sender.fromConfig(cfg)) { sender.table("foo").longColumn("v", 1L).atNow(); @@ -218,12 +218,12 @@ public void testDrainReturnsFalseOnTimeoutAndSenderStillUsable() throws Exceptio // usable for further row writes after a false return; the // outstanding frames remain pending and close()'s own drain still // runs. - int port = TestPorts.findUnusedPort(); SilentHandler handler = new SilentHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String cfg = "ws::addr=localhost:" + port + ";close_flush_timeout_millis=0;"; try (Sender sender = Sender.fromConfig(cfg)) { sender.table("foo").longColumn("v", 1L).atNow(); @@ -246,12 +246,12 @@ public void testDrainNonZeroTimeoutOnFastServerReturnsImmediately() throws Excep // Fast server: every frame is acked promptly. drain(longTimeout) // must return true quickly -- no spurious wait when there is // nothing to wait for. - int port = TestPorts.findUnusedPort(); DelayingAckHandler handler = new DelayingAckHandler(0); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String cfg = "ws::addr=localhost:" + port + ";"; try (Sender sender = Sender.fromConfig(cfg)) { sender.table("foo").longColumn("v", 1L).atNow(); @@ -266,9 +266,9 @@ public void testDrainNonZeroTimeoutOnFastServerReturnsImmediately() throws Excep @Test public void testAsyncCloseDrainSucceedsWhenServerStartsDuringDrain() throws Exception { - int port = TestPorts.findUnusedPort(); DelayingAckHandler handler = new DelayingAckHandler(0); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); String cfg = "ws::addr=localhost:" + port + sfDirOpt() + ";initial_connect_retry=async" @@ -303,12 +303,12 @@ public void testAsyncCloseDrainSucceedsWhenServerStartsDuringDrain() throws Exce @Test public void testAsyncCloseDrainSucceedsWhenServerWasUpAllAlong() throws Exception { - int port = TestPorts.findUnusedPort(); DelayingAckHandler handler = new DelayingAckHandler(0); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); for (int i = 0; i < 20; i++) { String cfg = "ws::addr=localhost:" + port + sfDirOpt() diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InitialConnectAsyncTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InitialConnectAsyncTest.java index a864e595..4e28b14b 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InitialConnectAsyncTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InitialConnectAsyncTest.java @@ -60,9 +60,9 @@ public void testAsyncAuthFailureDeliversToErrorInbox() throws Exception { // Server returns HTTP 401 on every upgrade attempt. Auth failures // are terminal at the I/O thread; in async mode they are // delivered as a SenderError, not thrown from fromConfig. - int port = TestPorts.findUnusedPort(); - try (Always401Fixture fixture = new Always401Fixture(port)) { + try (Always401Fixture fixture = new Always401Fixture()) { fixture.start(); + int port = fixture.getPort(); ErrorInbox inbox = new ErrorInbox(); String cfg = "ws::addr=localhost:" + port + sfDirOpt() + ";initial_connect_retry=async" @@ -159,9 +159,9 @@ public void testAsyncDeliversBufferedRowsWhenServerArrivesLate() { // appended to the cursor SF engine on the producer thread. The // I/O thread retries connect in the background; once the server // comes up, the buffered frame is sent and ACKed. - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); String cfg = "ws::addr=localhost:" + port + sfDirOpt() + ";initial_connect_retry=async" + ";reconnect_max_duration_millis=10000" @@ -239,9 +239,9 @@ public void testConnectionLostBudgetExhaustionTagsDifferently() { // Because the loop did connect at least once before the outage, // the SenderError must use the connection-lost tag and the sender // must report wasEverConnected()==true. - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, java.util.concurrent.TimeUnit.SECONDS)); @@ -299,8 +299,8 @@ public void testWasEverConnectedTrueImmediatelyInSyncMode() { // caller — there is no observable "never connected" window in // those modes, so misclassifying a budget exhaustion as // never-connected is impossible. - int port = TestPorts.findUnusedPort(); - try (TestWebSocketServer server = new TestWebSocketServer(port, new AckHandler())) { + try (TestWebSocketServer server = new TestWebSocketServer(new AckHandler())) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, java.util.concurrent.TimeUnit.SECONDS)); String cfg = "ws::addr=localhost:" + port @@ -446,8 +446,16 @@ private static class Always401Fixture implements AutoCloseable { private Thread acceptThread; private volatile boolean running; - Always401Fixture(int port) throws IOException { - this.serverSocket = new ServerSocket(port); + Always401Fixture() throws IOException { + // Bind the listener up front on an OS-assigned loopback port and + // hold it for the fixture's lifetime; read it back via getPort(). + // Owning the port from allocation to teardown avoids the bind race + // a pre-selected port would carry. + this.serverSocket = new ServerSocket(0, 50, java.net.InetAddress.getLoopbackAddress()); + } + + int getPort() { + return serverSocket.getLocalPort(); } @Override diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InitialConnectRetryTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InitialConnectRetryTest.java index efdd4ac5..d5c5d5af 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InitialConnectRetryTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InitialConnectRetryTest.java @@ -100,14 +100,14 @@ public void testWithRetryGivesUpAfterCap() { } @Test - public void testWithRetrySucceedsWhenServerComesUpInTime() { + public void testWithRetrySucceedsWhenServerComesUpInTime() throws Exception { // initial_connect_retry=true; we open the sender BEFORE starting // the server, then start the server in a background thread after // a short delay. The retry loop should see the server come up and // proceed cleanly. - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - TestWebSocketServer server = new TestWebSocketServer(port, handler); + TestWebSocketServer server = new TestWebSocketServer(handler); + int port = server.getPort(); Thread starter = new Thread(() -> { try { Thread.sleep(300); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java index 3300ae86..a966dc42 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java @@ -64,9 +64,9 @@ public class IoThreadErrorSurfacedOnRowApiTest { @Test public void testRowApiMethodSurfacesIoThreadTerminalError() throws Exception { - int port = TestPorts.findUnusedPort(); ErrorAckHandler handler = new ErrorAckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java index e3f80e96..27a26975 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java @@ -78,13 +78,13 @@ public class PrReviewRedTestsE2e { @Test public void testC4_handlerMustObserveTerminalErrorWhenInvoked() throws Exception { TestUtils.assertMemoryLeak(() -> { - int port = TestPorts.findUnusedPort(); int iterations = 30; AtomicInteger nullObservations = new AtomicInteger(); AtomicInteger totalObservations = new AtomicInteger(); ParseErrorAckHandler serverHandler = new ParseErrorAckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, serverHandler)) { + try (TestWebSocketServer server = new TestWebSocketServer(serverHandler)) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); @@ -161,9 +161,9 @@ public void testC4_handlerMustObserveTerminalErrorWhenInvoked() throws Exception @Test public void testC11_postHaltFlushThrowsTypedLineSenderServerException() throws Exception { TestUtils.assertMemoryLeak(() -> { - int port = TestPorts.findUnusedPort(); ParseErrorAckHandler serverHandler = new ParseErrorAckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, serverHandler)) { + try (TestWebSocketServer server = new TestWebSocketServer(serverHandler)) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientWalkTrackerTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientWalkTrackerTest.java index acf434ed..ee5909ce 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientWalkTrackerTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientWalkTrackerTest.java @@ -76,12 +76,12 @@ public void testWalk_404NotFoundIsTransportNotTerminal() throws Exception { // 404 (per failover.md §4.1: a single mid-deploy node serving the // wrong path while peers are healthy is a routing glitch, not an // auth failure). Walk must continue. - int port404 = TestPorts.findUnusedPort(); - int portOk = TestPorts.findUnusedPort(); - TestWebSocketServer notFound = new TestWebSocketServer(port404, NOOP_HANDLER); + TestWebSocketServer notFound = new TestWebSocketServer(NOOP_HANDLER); notFound.setRejectWithStatus(404, "Not Found"); - TestWebSocketServer ok = new TestWebSocketServer(portOk, NOOP_HANDLER); + int port404 = notFound.getPort(); + TestWebSocketServer ok = new TestWebSocketServer(NOOP_HANDLER); ok.setSendServerInfo(true); + int portOk = ok.getPort(); try { notFound.start(); ok.start(); @@ -103,12 +103,12 @@ public void testWalk_404NotFoundIsTransportNotTerminal() throws Exception { public void testWalk_426UpgradeRequiredIsTransportNotTerminal() throws Exception { // 426 Upgrade Required is per failover.md §6 a transient/transport // failure (NOT terminal). The walk must continue to the next host. - int port426 = TestPorts.findUnusedPort(); - int portOk = TestPorts.findUnusedPort(); - TestWebSocketServer rejecting = new TestWebSocketServer(port426, NOOP_HANDLER); + TestWebSocketServer rejecting = new TestWebSocketServer(NOOP_HANDLER); rejecting.setRejectWithStatus(426, "Upgrade Required"); - TestWebSocketServer ok = new TestWebSocketServer(portOk, NOOP_HANDLER); + int port426 = rejecting.getPort(); + TestWebSocketServer ok = new TestWebSocketServer(NOOP_HANDLER); ok.setSendServerInfo(true); + int portOk = ok.getPort(); try { rejecting.start(); ok.start(); @@ -133,12 +133,12 @@ public void testWalk_AllReplicasThrowsRoleMismatch() throws Exception { // TopologyRejects from prior outages -- here there are none), // exhausts again, and surfaces QwpRoleMismatchException with the // last observed role attached. - int port1 = TestPorts.findUnusedPort(); - int port2 = TestPorts.findUnusedPort(); - TestWebSocketServer rep1 = new TestWebSocketServer(port1, NOOP_HANDLER); + TestWebSocketServer rep1 = new TestWebSocketServer(NOOP_HANDLER); rep1.setRejectWithRole("REPLICA"); - TestWebSocketServer rep2 = new TestWebSocketServer(port2, NOOP_HANDLER); + int port1 = rep1.getPort(); + TestWebSocketServer rep2 = new TestWebSocketServer(NOOP_HANDLER); rep2.setRejectWithRole("REPLICA"); + int port2 = rep2.getPort(); try { rep1.start(); rep2.start(); @@ -187,11 +187,11 @@ public void testWalk_AllUnreachableThrowsHttpClientException() { @Test public void testWalk_AuthFailure403IsTerminal() throws Exception { // 403 is symmetric to 401: same terminal classification. - int port403 = TestPorts.findUnusedPort(); - int portOk = TestPorts.findUnusedPort(); - TestWebSocketServer forbidden = new TestWebSocketServer(port403, NOOP_HANDLER); + TestWebSocketServer forbidden = new TestWebSocketServer(NOOP_HANDLER); forbidden.setRejectWithStatus(403, "Forbidden"); - TestWebSocketServer ok = new TestWebSocketServer(portOk, NOOP_HANDLER); + int port403 = forbidden.getPort(); + TestWebSocketServer ok = new TestWebSocketServer(NOOP_HANDLER); + int portOk = ok.getPort(); try { forbidden.start(); ok.start(); @@ -220,11 +220,11 @@ public void testWalk_AuthFailureFirstHostIsTerminal() throws Exception { // loop would continue to the second host, producing a // QwpRoleMismatchException or accepting the second host -- both // mask the credential failure (failover.md §6 AuthError). - int port401 = TestPorts.findUnusedPort(); - int portOk = TestPorts.findUnusedPort(); - TestWebSocketServer auth = new TestWebSocketServer(port401, NOOP_HANDLER); + TestWebSocketServer auth = new TestWebSocketServer(NOOP_HANDLER); auth.setRejectWithStatus(401, "Unauthorized"); - TestWebSocketServer ok = new TestWebSocketServer(portOk, NOOP_HANDLER); + int port401 = auth.getPort(); + TestWebSocketServer ok = new TestWebSocketServer(NOOP_HANDLER); + int portOk = ok.getPort(); try { auth.start(); ok.start(); @@ -266,13 +266,13 @@ public void testWalk_FallThroughResetRehabilitatesPriorTopologyRejects() throws // exercises the fall-through reset, not the role filter. The // SERVER_INFO-driven role filter (target=primary/replica) is // covered by a separate integration test in the parent QuestDB repo. - int portA = TestPorts.findUnusedPort(); - int portB = TestPorts.findUnusedPort(); - TestWebSocketServer a = new TestWebSocketServer(portA, NOOP_HANDLER); + TestWebSocketServer a = new TestWebSocketServer(NOOP_HANDLER); a.setRejectWithRole("REPLICA"); a.setSendServerInfo(true); - TestWebSocketServer b = new TestWebSocketServer(portB, NOOP_HANDLER); + int portA = a.getPort(); + TestWebSocketServer b = new TestWebSocketServer(NOOP_HANDLER); b.setRejectWithRole("REPLICA"); + int portB = b.getPort(); try { a.start(); b.start(); @@ -305,12 +305,12 @@ public void testWalk_FallThroughResetRehabilitatesPriorTopologyRejects() throws public void testWalk_FirstReachablePrimaryWins() throws Exception { // First host is REPLICA-rejecting; second is a PRIMARY-advertising // server. WalkTracker must skip the first and bind to the second. - int portReplica = TestPorts.findUnusedPort(); - int portPrimary = TestPorts.findUnusedPort(); - TestWebSocketServer rep = new TestWebSocketServer(portReplica, NOOP_HANDLER); + TestWebSocketServer rep = new TestWebSocketServer(NOOP_HANDLER); rep.setRejectWithRole("REPLICA"); - TestWebSocketServer prim = new TestWebSocketServer(portPrimary, NOOP_HANDLER, false, "PRIMARY"); + int portReplica = rep.getPort(); + TestWebSocketServer prim = new TestWebSocketServer(NOOP_HANDLER, false, "PRIMARY"); prim.setSendServerInfo(true); + int portPrimary = prim.getPort(); try { rep.start(); prim.start(); @@ -337,10 +337,10 @@ public void testWalk_ServerInfoReplicaRejectedForTargetPrimary() throws Exceptio // info is the decoded SERVER_INFO -- the branch that outlived the // v1-mismatch removal. A clean 101 ignores the upgrade-time role header, // so the rejection here is driven purely by the SERVER_INFO role. - int port = TestPorts.findUnusedPort(); - TestWebSocketServer replica = new TestWebSocketServer(port, NOOP_HANDLER); + TestWebSocketServer replica = new TestWebSocketServer(NOOP_HANDLER); replica.setAdvertisedRole("REPLICA"); replica.setSendServerInfo(true); + int port = replica.getPort(); try { replica.start(); Assert.assertTrue(replica.awaitStart(5, TimeUnit.SECONDS)); @@ -372,14 +372,14 @@ public void testWalk_ServerInfoRoleFilterSkipsReplicaBindsPrimary() throws Excep // REPLICA endpoint -- a SERVER_INFO role mismatch is a skip, not a // terminal failure -- and bind the PRIMARY one. Exercises the // walk-continues side of matchesTarget(info.getRole(), target). - int portReplica = TestPorts.findUnusedPort(); - int portPrimary = TestPorts.findUnusedPort(); - TestWebSocketServer replica = new TestWebSocketServer(portReplica, NOOP_HANDLER); + TestWebSocketServer replica = new TestWebSocketServer(NOOP_HANDLER); replica.setAdvertisedRole("REPLICA"); replica.setSendServerInfo(true); - TestWebSocketServer primary = new TestWebSocketServer(portPrimary, NOOP_HANDLER); + int portReplica = replica.getPort(); + TestWebSocketServer primary = new TestWebSocketServer(NOOP_HANDLER); primary.setAdvertisedRole("PRIMARY"); primary.setSendServerInfo(true); + int portPrimary = primary.getPort(); try { replica.start(); primary.start(); @@ -408,12 +408,12 @@ public void testWalk_ServerInfoTimeoutIsTransportNotTerminal() throws Exception // every connect, so a silent post-upgrade peer would otherwise stall the // client until the server-info timeout; bound it short here and verify // the walk falls through to a healthy node. - int portSilent = TestPorts.findUnusedPort(); - int portOk = TestPorts.findUnusedPort(); - TestWebSocketServer silent = new TestWebSocketServer(portSilent, NOOP_HANDLER); + TestWebSocketServer silent = new TestWebSocketServer(NOOP_HANDLER); + int portSilent = silent.getPort(); // sendServerInfo left off: the 101 upgrade succeeds, then the node stays silent. - TestWebSocketServer ok = new TestWebSocketServer(portOk, NOOP_HANDLER); + TestWebSocketServer ok = new TestWebSocketServer(NOOP_HANDLER); ok.setSendServerInfo(true); + int portOk = ok.getPort(); try { silent.start(); ok.start(); @@ -438,9 +438,9 @@ public void testWalk_TransportFailureContinuesWalk() throws Exception { // WalkTracker must classify the first as TransportError and bind // the second on the same walk (no fall-through reset needed yet). int portDead = TestPorts.findUnusedPort(); - int portOk = TestPorts.findUnusedPort(); - try (TestWebSocketServer ok = new TestWebSocketServer(portOk, NOOP_HANDLER)) { + try (TestWebSocketServer ok = new TestWebSocketServer(NOOP_HANDLER)) { ok.setSendServerInfo(true); + int portOk = ok.getPort(); ok.start(); Assert.assertTrue(ok.awaitStart(5, TimeUnit.SECONDS)); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java index d8976f41..f7bb4598 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java @@ -334,9 +334,9 @@ public void testDoubleColumnAfterCloseThrows() throws Exception { @Test public void testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse() throws Exception { assertMemoryLeak(() -> { - int port = TestPorts.findUnusedPort(); - try (TestWebSocketServer server = new TestWebSocketServer(port, new TestWebSocketServer.WebSocketServerHandler() { + try (TestWebSocketServer server = new TestWebSocketServer(new TestWebSocketServer.WebSocketServerHandler() { })) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/ReconnectTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/ReconnectTest.java index 1e9d99d1..5c0f9bd2 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/ReconnectTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/ReconnectTest.java @@ -67,9 +67,9 @@ public void testReconnectAfterServerInducedDisconnect() throws Exception { // Without reconnect, the next batch's flush() would throw. With // reconnect, the I/O loop opens a fresh connection (same port, // same server) and the second batch goes through. - int port = TestPorts.findUnusedPort(); DisconnectAfterFirstAckHandler handler = new DisconnectAfterFirstAckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); @@ -101,14 +101,14 @@ public void testReconnectAfterServerInducedDisconnect() throws Exception { } @Test - public void testReconnectGivesUpAfterCap() { + public void testReconnectGivesUpAfterCap() throws Exception { // Server is up at first (initial connect succeeds + ACKs batch 1), // then we tear it down — subsequent reconnect attempts get TCP // connection-refused and accumulate against the budget. With a // 500ms cap, the loop should give up well inside the test's 5s // poll window and the next user-thread flush() must throw. - int port = TestPorts.findUnusedPort(); - try (TestWebSocketServer server = new TestWebSocketServer(port, new AckHandler())) { + try (TestWebSocketServer server = new TestWebSocketServer(new AckHandler())) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); @@ -167,10 +167,10 @@ public void testTerminalUpgradeErrorAbortsReconnect() throws Exception { // a 401 happening on the very first reconnect, the cursor I/O // loop should surface the terminal error within hundreds of ms, // not after 10s. - int port = TestPorts.findUnusedPort(); try (Auth401AfterFirstConnectionFixture fixture = - new Auth401AfterFirstConnectionFixture(port)) { + new Auth401AfterFirstConnectionFixture()) { fixture.start(); + int port = fixture.getPort(); String cfg = "ws::addr=localhost:" + port + ";reconnect_max_duration_millis=10000" + ";close_flush_timeout_millis=0;"; @@ -220,9 +220,9 @@ public void testReplayResendsUnackedFramesAcrossReconnect() throws Exception { // ackedFsn is still -1. On reconnect, the cursor must reposition at // FSN 0 and replay it — the new connection should observe the // *same* batch a second time before any new batch arrives. - int port = TestPorts.findUnusedPort(); ReceiveThenDisconnectHandler handler = new ReceiveThenDisconnectHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); @@ -373,8 +373,16 @@ private static class Auth401AfterFirstConnectionFixture implements AutoCloseable private volatile boolean running; private final java.util.List openSockets = new java.util.concurrent.CopyOnWriteArrayList<>(); - Auth401AfterFirstConnectionFixture(int port) throws IOException { - this.serverSocket = new ServerSocket(port); + Auth401AfterFirstConnectionFixture() throws IOException { + // Bind the listener up front on an OS-assigned loopback port and + // hold it for the fixture's lifetime; read it back via getPort(). + // Owning the port from allocation to teardown avoids the bind race + // a pre-selected port would carry. + this.serverSocket = new ServerSocket(0, 50, java.net.InetAddress.getLoopbackAddress()); + } + + int getPort() { + return serverSocket.getLocalPort(); } void start() { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java index d94926b8..32d2725d 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java @@ -74,8 +74,8 @@ public void testRestartReplaysSealedSegmentsAgainstFreshServer() throws Exceptio // Phase 1: silent server, sender 1 writes enough to rotate at // least once, closes fast (no drain). Slot ends up with sealed + // active segments holding unacked data. - int port1 = TestPorts.findUnusedPort(); - try (TestWebSocketServer silent = new TestWebSocketServer(port1, new SilentHandler())) { + try (TestWebSocketServer silent = new TestWebSocketServer(new SilentHandler())) { + int port1 = silent.getPort(); silent.start(); Assert.assertTrue(silent.awaitStart(5, TimeUnit.SECONDS)); @@ -111,9 +111,9 @@ public void testRestartReplaysSealedSegmentsAgainstFreshServer() throws Exceptio // sender 1 wrote (50 of them) reaches the new server. Without // the fix, the sender would only ship the active segment's data // (≪ 50) and orphan the sealed segments forever. - int port2 = TestPorts.findUnusedPort(); AckHandler ack = new AckHandler(); - try (TestWebSocketServer good = new TestWebSocketServer(port2, ack)) { + try (TestWebSocketServer good = new TestWebSocketServer(ack)) { + int port2 = good.getPort(); good.start(); Assert.assertTrue(good.awaitStart(5, TimeUnit.SECONDS)); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/SelfSufficientFramesTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/SelfSufficientFramesTest.java index ac2e0d54..45275528 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/SelfSufficientFramesTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/SelfSufficientFramesTest.java @@ -62,9 +62,9 @@ public void testEverySymbolBatchIncludesFullDeltaFromZero() throws Exception { // batch 2 would emit deltaStart=1, deltaCount=1 — only the new // symbol. With self-sufficient frames, batch 2 must emit // deltaStart=0 covering BOTH symbols. - int port = TestPorts.findUnusedPort(); CapturingHandler handler = new CapturingHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/ServerErrorAckTerminalTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/ServerErrorAckTerminalTest.java index 7e868444..f7d59f75 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/ServerErrorAckTerminalTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/ServerErrorAckTerminalTest.java @@ -66,12 +66,12 @@ public class ServerErrorAckTerminalTest { @Test public void testServerErrorAckIsTerminalAndDoesNotBurnReconnectBudget() throws Exception { - int port = TestPorts.findUnusedPort(); ErrorAckHandler handler = new ErrorAckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); // Tight reconnect cadence so the pre-fix loop accumulates // attempts quickly inside our observation window. String cfg = "ws::addr=localhost:" + port @@ -155,12 +155,12 @@ public void testServerErrorAckIsTerminalAndDoesNotBurnReconnectBudget() throws E */ @Test public void testDropPolicyNackDoesNotHaltAndAdvancesAck() throws Exception { - int port = TestPorts.findUnusedPort(); SchemaMismatchAckHandler handler = new SchemaMismatchAckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String cfg = "ws::addr=localhost:" + port + ";reconnect_max_duration_millis=10000" + ";reconnect_initial_backoff_millis=10" diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WriteFailoverTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WriteFailoverTest.java index 93b859dc..bb33eb1c 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WriteFailoverTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WriteFailoverTest.java @@ -84,8 +84,8 @@ public void testAuthTimeoutBoundsHungUpgrade() throws Exception { acceptor.start(); AckHandler ack = new AckHandler(); - goodPort = TestPorts.findUnusedPort(); - try (TestWebSocketServer good = new TestWebSocketServer(goodPort, ack, false, "PRIMARY")) { + try (TestWebSocketServer good = new TestWebSocketServer(ack, false, "PRIMARY")) { + goodPort = good.getPort(); good.start(); Assert.assertTrue(good.awaitStart(5, TimeUnit.SECONDS)); @@ -117,12 +117,12 @@ public void testFailoverPastReplicaToPrimary() throws Exception { // Two servers: server1 always rejects with 421 + REPLICA, server2 // accepts with 101 + PRIMARY. The sender's connect path must walk // past server1 and land on server2 within the reconnect budget. - int port1 = TestPorts.findUnusedPort(); - int port2 = TestPorts.findUnusedPort(); AckHandler ack = new AckHandler(); - TestWebSocketServer replica = new TestWebSocketServer(port1, ack); + TestWebSocketServer replica = new TestWebSocketServer(ack); + int port1 = replica.getPort(); replica.setRejectWithRole("REPLICA"); - TestWebSocketServer primary = new TestWebSocketServer(port2, ack, false, "PRIMARY"); + TestWebSocketServer primary = new TestWebSocketServer(ack, false, "PRIMARY"); + int port2 = primary.getPort(); try { replica.start(); primary.start(); @@ -152,12 +152,12 @@ public void testFailoverPromotedReplicaJoinsRotation() throws Exception { // is promoted (clear the reject) and we verify the sender stays on // server2 — currentAddressIndex stickiness means we don't rotate // off a healthy primary just because another node became writable. - int port1 = TestPorts.findUnusedPort(); - int port2 = TestPorts.findUnusedPort(); AckHandler ack = new AckHandler(); - TestWebSocketServer s1 = new TestWebSocketServer(port1, ack); + TestWebSocketServer s1 = new TestWebSocketServer(ack); + int port1 = s1.getPort(); s1.setRejectWithRole("REPLICA"); - TestWebSocketServer s2 = new TestWebSocketServer(port2, ack, false, "PRIMARY"); + TestWebSocketServer s2 = new TestWebSocketServer(ack, false, "PRIMARY"); + int port2 = s2.getPort(); try { s1.start(); s2.start(); @@ -194,11 +194,11 @@ public void testOffModeSinglePassExhaustionThrowsRoleMismatch() throws Exception // Off-mode walk that hits only replicas must surface // QwpRoleMismatchException — the walked address list resembles // a deployment mid-failover, distinguishable from "all hosts down". - int port1 = TestPorts.findUnusedPort(); - int port2 = TestPorts.findUnusedPort(); - TestWebSocketServer r1 = new TestWebSocketServer(port1, new AckHandler()); + TestWebSocketServer r1 = new TestWebSocketServer(new AckHandler()); + int port1 = r1.getPort(); r1.setRejectWithRole("REPLICA"); - TestWebSocketServer r2 = new TestWebSocketServer(port2, new AckHandler()); + TestWebSocketServer r2 = new TestWebSocketServer(new AckHandler()); + int port2 = r2.getPort(); r2.setRejectWithRole("PRIMARY_CATCHUP"); try { r1.start(); @@ -232,12 +232,12 @@ public void testOffModeSinglePassWalkFindsPrimary() throws Exception { // with NO inter-host backoff; only after every host has been // tried does it fail terminally. Java's prior off-mode tried // hosts[0] alone. - int port1 = TestPorts.findUnusedPort(); - int port2 = TestPorts.findUnusedPort(); AckHandler ack = new AckHandler(); - TestWebSocketServer replica = new TestWebSocketServer(port1, ack); + TestWebSocketServer replica = new TestWebSocketServer(ack); + int port1 = replica.getPort(); replica.setRejectWithRole("REPLICA"); - TestWebSocketServer primary = new TestWebSocketServer(port2, ack, false, "PRIMARY"); + TestWebSocketServer primary = new TestWebSocketServer(ack, false, "PRIMARY"); + int port2 = primary.getPort(); try { replica.start(); primary.start(); @@ -265,12 +265,12 @@ public void testRoleMismatchExceptionWhenAllReplicas() throws Exception { // QwpRoleMismatchException (not a generic LineSenderException) so // operators can distinguish "no primary elected yet" from // "everything is down". - int port1 = TestPorts.findUnusedPort(); - int port2 = TestPorts.findUnusedPort(); AckHandler ack = new AckHandler(); - TestWebSocketServer r1 = new TestWebSocketServer(port1, ack); + TestWebSocketServer r1 = new TestWebSocketServer(ack); + int port1 = r1.getPort(); r1.setRejectWithRole("REPLICA"); - TestWebSocketServer r2 = new TestWebSocketServer(port2, ack); + TestWebSocketServer r2 = new TestWebSocketServer(ack); + int port2 = r2.getPort(); r2.setRejectWithRole("PRIMARY_CATCHUP"); try { r1.start(); @@ -311,9 +311,9 @@ public void testStandaloneIsTreatedAsWritable() throws Exception { // OSS / single-node deployments advertise STANDALONE. The client // must accept that handshake without rotation since standalone // nodes are writable. - int port = TestPorts.findUnusedPort(); AckHandler ack = new AckHandler(); - try (TestWebSocketServer standalone = new TestWebSocketServer(port, ack, false, "STANDALONE")) { + try (TestWebSocketServer standalone = new TestWebSocketServer(ack, false, "STANDALONE")) { + int port = standalone.getPort(); standalone.start(); Assert.assertTrue(standalone.awaitStart(5, TimeUnit.SECONDS)); @@ -335,8 +335,8 @@ public void testUpgradeException421CarriesRoleHeader() throws Exception { // diagnostics. With a single-replica config the off-mode walk // wraps it in QwpRoleMismatchException; the WebSocketUpgradeException // sits on the cause chain via initCause(). - int port = TestPorts.findUnusedPort(); - TestWebSocketServer replica = new TestWebSocketServer(port, new AckHandler()); + TestWebSocketServer replica = new TestWebSocketServer(new AckHandler()); + int port = replica.getPort(); try (replica) { replica.setRejectWithRole("REPLICA"); replica.start(); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java index 9cd50443..6081ff08 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java @@ -73,9 +73,9 @@ public void tearDown() { @Test public void testDrainerEmptiesOrphanSlotAgainstAckServer() throws Exception { TestUtils.assertMemoryLeak(() -> { - int port1 = TestPorts.findUnusedPort(); // Phase 1: ghost sender against silent server. 30 frames; close fast. - try (TestWebSocketServer silent = new TestWebSocketServer(port1, new SilentHandler())) { + try (TestWebSocketServer silent = new TestWebSocketServer(new SilentHandler())) { + int port1 = silent.getPort(); silent.start(); Assert.assertTrue(silent.awaitStart(5, TimeUnit.SECONDS)); @@ -95,9 +95,9 @@ public void testDrainerEmptiesOrphanSlotAgainstAckServer() throws Exception { 1, OrphanScanner.scan(sfDir, "primary").size()); // Phase 2: foreground sender against ack server, with drain_orphans=on. - int port2 = TestPorts.findUnusedPort(); AckHandler ack = new AckHandler(); - try (TestWebSocketServer good = new TestWebSocketServer(port2, ack)) { + try (TestWebSocketServer good = new TestWebSocketServer(ack)) { + int port2 = good.getPort(); good.start(); Assert.assertTrue(good.awaitStart(5, TimeUnit.SECONDS)); @@ -139,8 +139,8 @@ public void testDrainerEmptiesOrphanSlotAgainstAckServer() throws Exception { public void testDrainerLeavesFailedSentinelOnTerminalError() throws Exception { TestUtils.assertMemoryLeak(() -> { // Drainer can't connect → exhausts its budget → drops .failed. - int port1 = TestPorts.findUnusedPort(); - try (TestWebSocketServer silent = new TestWebSocketServer(port1, new SilentHandler())) { + try (TestWebSocketServer silent = new TestWebSocketServer(new SilentHandler())) { + int port1 = silent.getPort(); silent.start(); Assert.assertTrue(silent.awaitStart(5, TimeUnit.SECONDS)); String cfg1 = "ws::addr=localhost:" + port1 @@ -158,10 +158,10 @@ public void testDrainerLeavesFailedSentinelOnTerminalError() throws Exception { // drainer should give up and drop .failed. // The foreground sender does need to start successfully, so we // give it its own working server on a different port. - int port2 = TestPorts.findUnusedPort(); int unreachablePort = TestPorts.findUnusedPort(); AckHandler fgAck = new AckHandler(); - try (TestWebSocketServer fgServer = new TestWebSocketServer(port2, fgAck)) { + try (TestWebSocketServer fgServer = new TestWebSocketServer(fgAck)) { + int port2 = fgServer.getPort(); fgServer.start(); Assert.assertTrue(fgServer.awaitStart(5, TimeUnit.SECONDS)); // Sender targets fgServer; drainer would inherit the same diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/DurableAckIntegrationTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/DurableAckIntegrationTest.java index 34d0b6de..3378f717 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/DurableAckIntegrationTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/DurableAckIntegrationTest.java @@ -52,8 +52,6 @@ */ public class DurableAckIntegrationTest { - private static int nextPort = 19_500; - private String sfDir; @Before @@ -89,12 +87,12 @@ public void testConnectStringOffParsesAndDoesNotOptIn() throws Exception { // the connection succeeds against a server that does NOT echo the // durable-ack confirmation, because the client never asked for it. TestUtils.assertMemoryLeak(() -> { - int port = allocPort(); DurableAckCapableHandler handler = new DurableAckCapableHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler, false)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler, false)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";request_durable_ack=off;"; try (Sender sender = Sender.fromConfig(config)) { sender.table("trades").longColumn("v", 1L).atNow(); @@ -110,12 +108,12 @@ public void testConnectStringOnRequiresServerSupport() throws Exception { // Opting in must throw at connect, not silently leave the SF store // to grow until disk fills. TestUtils.assertMemoryLeak(() -> { - int port = allocPort(); DurableAckCapableHandler handler = new DurableAckCapableHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler, false)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler, false)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";request_durable_ack=on;"; try (Sender ignored = Sender.fromConfig(config)) { Assert.fail("expected connect to fail with QwpDurableAckMismatchException"); @@ -135,12 +133,12 @@ public void testEndToEndDurableTrimDefersUntilUploadAck() throws Exception { // drains. The pair "OK-but-no-durable" -> grow, "durable-ack" -> drain // is the central durable-mode contract. TestUtils.assertMemoryLeak(() -> { - int port = allocPort(); DurableAckCapableHandler handler = new DurableAckCapableHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler, true)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler, true)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";request_durable_ack=on;close_flush_timeout_millis=5000;"; try (Sender sender = Sender.fromConfig(config)) { @@ -171,10 +169,6 @@ public void testEndToEndDurableTrimDefersUntilUploadAck() throws Exception { }); } - private static int allocPort() { - return nextPort++; - } - private static byte[] buildDurableAckFrame(long seqTxn) { byte[] name = DurableAckCapableHandler.TABLE_NAME.getBytes(StandardCharsets.UTF_8); ByteBuffer bb = ByteBuffer.allocate(1 + 2 + 2 + name.length + 8).order(ByteOrder.LITTLE_ENDIAN); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/OrphanScanIntegrationTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/OrphanScanIntegrationTest.java index 36ae59e1..2ec7b836 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/OrphanScanIntegrationTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/OrphanScanIntegrationTest.java @@ -28,7 +28,6 @@ import io.questdb.client.cutlass.qwp.client.sf.cursor.OrphanScanner; import io.questdb.client.std.Files; import io.questdb.client.std.ObjList; -import io.questdb.client.test.cutlass.qwp.client.TestPorts; import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer; import io.questdb.client.test.tools.TestUtils; import org.junit.After; @@ -74,14 +73,14 @@ public void testScanFindsOrphanFromPriorSenderUnderSameGroupRoot() throws Except // unacked .sfa files when the sender shuts down. Then the same // slot should be reported as an orphan when a second sender opens // with sender_id=primary and drain_orphans=true. - int port = TestPorts.findUnusedPort(); // Phase 1: ghost writes + closes; never acked. - try (TestWebSocketServer ghostServer = new TestWebSocketServer(port, new SilentHandler())) { + try (TestWebSocketServer ghostServer = new TestWebSocketServer(new SilentHandler())) { ghostServer.start(); Assert.assertTrue(ghostServer.awaitStart(5, TimeUnit.SECONDS)); - String ghostCfg = "ws::addr=localhost:" + port + int ghostPort = ghostServer.getPort(); + String ghostCfg = "ws::addr=localhost:" + ghostPort + ";sf_dir=" + sfDir + ";sender_id=ghost;close_flush_timeout_millis=0;"; try (Sender ghost = Sender.fromConfig(ghostCfg)) { ghost.table("foo").longColumn("v", 7L).atNow(); @@ -101,11 +100,12 @@ public void testScanFindsOrphanFromPriorSenderUnderSameGroupRoot() throws Except // can't directly assert the log output in this test, but the // call must not throw, and the primary's own slot must NOT // appear in a fresh scan (sender_id-filtered). - try (TestWebSocketServer primaryServer = new TestWebSocketServer(port + 1000, new AckHandler())) { + try (TestWebSocketServer primaryServer = new TestWebSocketServer(new AckHandler())) { primaryServer.start(); Assert.assertTrue(primaryServer.awaitStart(5, TimeUnit.SECONDS)); - String primaryCfg = "ws::addr=localhost:" + (port + 1000) + int primaryPort = primaryServer.getPort(); + String primaryCfg = "ws::addr=localhost:" + primaryPort + ";sf_dir=" + sfDir + ";sender_id=primary" + ";drain_orphans=true;"; diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java index bce1e191..0c27d698 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java @@ -27,7 +27,6 @@ import io.questdb.client.Sender; import io.questdb.client.cutlass.line.LineSenderException; import io.questdb.client.std.Files; -import io.questdb.client.test.cutlass.qwp.client.TestPorts; import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer; import io.questdb.client.test.tools.TestUtils; import org.junit.After; @@ -60,12 +59,12 @@ public void tearDown() { @Test public void testFromConfigEnablesSfAndOwnsLog() throws Exception { TestUtils.assertMemoryLeak(() -> { - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; try (Sender sender = Sender.fromConfig(config)) { sender.table("foo").longColumn("v", 42L).atNow(); @@ -95,12 +94,12 @@ public void testSfDirOnTcpRejected() throws Exception { @Test public void testSfMaxBytesParsing() throws Exception { TestUtils.assertMemoryLeak(() -> { - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";sf_max_bytes=131072;"; try (Sender sender = Sender.fromConfig(config)) { @@ -123,12 +122,12 @@ public void testNoSfDirMeansNoSf() throws Exception { // Absence of sf_dir is the only way to disable SF — no separate // off switch. Verify a basic SF-less sender still works end-to-end // and creates no directory. - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String config = "ws::addr=localhost:" + port + ";"; try (Sender sender = Sender.fromConfig(config)) { sender.table("foo").longColumn("v", 1L).atNow(); @@ -148,12 +147,12 @@ public void testNoSfDirMeansNoSf() throws Exception { @Test public void testSfMaxTotalBytesAcceptsLargeValue() throws Exception { TestUtils.assertMemoryLeak(() -> { - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); // 4 GiB > Integer.MAX_VALUE; pre-fix this would throw "invalid sf_max_total_bytes". String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir @@ -225,12 +224,12 @@ public void testSfDurabilityOnTcpRejected() throws Exception { @Test public void testSfMaxBytesAcceptsSizeSuffixes() throws Exception { TestUtils.assertMemoryLeak(() -> { - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); // 64m / 4g should parse identically to their byte-count equivalents. String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir @@ -250,12 +249,12 @@ public void testSenderIdCreatesNamedSlotUnderSfDir() throws Exception { TestUtils.assertMemoryLeak(() -> { // sender_id="primary" => slot dir /primary; the engine writes // its segments and lock there, leaving sibling slot dirs untouched. - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";sender_id=primary;"; try (Sender sender = Sender.fromConfig(config)) { @@ -277,12 +276,12 @@ public void testTwoSendersSameSlotIdCollideOnLock() throws Exception { // share a group root. The second open with a colliding id must // refuse to start — silently allowing it would interleave FSN // sequences on disk and corrupt recovery. - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; try (Sender first = Sender.fromConfig(config)) { @@ -306,12 +305,12 @@ public void testTwoSendersDistinctSlotIdsCoexist() throws Exception { TestUtils.assertMemoryLeak(() -> { // Two senders against the same group root with distinct sender_id // values are independent slots — both must start cleanly. - int port = TestPorts.findUnusedPort(); AckHandler handler = new AckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + int port = server.getPort(); String cfgA = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";sender_id=a;"; String cfgB = "ws::addr=localhost:" + port diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopReconnectLeakTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopReconnectLeakTest.java index 7e7c30da..759ea873 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopReconnectLeakTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopReconnectLeakTest.java @@ -28,7 +28,6 @@ import io.questdb.client.cutlass.http.client.WebSocketClient; import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorWebSocketSendLoop; -import io.questdb.client.test.cutlass.qwp.client.TestPorts; import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer; import io.questdb.client.test.tools.TestUtils; import org.junit.Assert; @@ -61,9 +60,9 @@ public class CursorWebSocketSendLoopReconnectLeakTest { @Test public void testCloseClosesLivePostReconnectClient() throws Exception { TestUtils.assertMemoryLeak(() -> { - int port = TestPorts.findUnusedPort(); DisconnectAfterFirstAckHandler handler = new DisconnectAfterFirstAckHandler(); - try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); server.start(); Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java index fadc3eed..ecdc42fa 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java @@ -59,6 +59,7 @@ public class TestWebSocketServer implements Closeable { private final WebSocketServerHandler handler; private final int port; private final AtomicBoolean running = new AtomicBoolean(false); + private final ServerSocket serverSocket; private final CountDownLatch startLatch = new CountDownLatch(1); private Thread acceptThread; // X-QuestDB-Role value to emit on handshake responses. null = omit the @@ -83,10 +84,9 @@ public class TestWebSocketServer implements Closeable { // 401, 403, 404, 426, 503, etc. that the failover loop should // classify per failover.md §6. private volatile String rejectingStatusReason; - private ServerSocket serverSocket; - public TestWebSocketServer(int port, WebSocketServerHandler handler) { - this(port, handler, false); + public TestWebSocketServer(WebSocketServerHandler handler) throws IOException { + this(handler, false); } /** @@ -97,8 +97,8 @@ public TestWebSocketServer(int port, WebSocketServerHandler handler) { * that silently ignores the request and force * the client's early-fail check. */ - public TestWebSocketServer(int port, WebSocketServerHandler handler, boolean emitDurableAckHeader) { - this(port, handler, emitDurableAckHeader, null); + public TestWebSocketServer(WebSocketServerHandler handler, boolean emitDurableAckHeader) throws IOException { + this(handler, emitDurableAckHeader, null); } /** @@ -108,12 +108,20 @@ public TestWebSocketServer(int port, WebSocketServerHandler handler, boolean emi * handshakes. Pass {@code null} for legacy handshakes * without the header. */ - public TestWebSocketServer(int port, WebSocketServerHandler handler, - boolean emitDurableAckHeader, String advertisedRole) { - this.port = port; + public TestWebSocketServer(WebSocketServerHandler handler, + boolean emitDurableAckHeader, String advertisedRole) throws IOException { this.handler = handler; this.emitDurableAckHeader = emitDurableAckHeader; this.advertisedRole = advertisedRole; + // Bind the listener up front and hold it open for the server's whole + // lifetime, then read the OS-assigned ephemeral port back via getPort(). + // Owning the socket from allocation to teardown closes the window in + // which another process could grab a pre-selected port before start() + // binds it. Pinning to loopback keeps client "localhost" connections + // routed here rather than to a wildcard listener on the same port. + serverSocket = new ServerSocket(0, 50, java.net.InetAddress.getLoopbackAddress()); + serverSocket.setSoTimeout(100); + this.port = serverSocket.getLocalPort(); } public boolean awaitStart(long timeout, TimeUnit unit) throws InterruptedException { @@ -150,6 +158,14 @@ public void close() { } } + /** + * Returns the loopback port the listener is bound to. Stable for the + * server's whole lifetime; safe to read immediately after construction. + */ + public int getPort() { + return port; + } + /** * Replaces the advertised role for subsequent handshakes (live update). */ @@ -232,15 +248,8 @@ public void start() throws IOException { return; } - // Bind explicitly to the loopback address. The wildcard 0.0.0.0 - // default lets a leftover process holding 127.0.0.1:port coexist - // on the same port under BSD/macOS SO_REUSEADDR semantics, and - // client connections to "localhost" then route to the more-specific - // listener instead of this mock. Pinning to loopback forces the - // kernel to detect the conflict and pick a different ephemeral port. - serverSocket = new ServerSocket(port, 50, java.net.InetAddress.getLoopbackAddress()); - serverSocket.setSoTimeout(100); - + // The listener is already bound (see the constructor); just spin up the + // accept loop on it. acceptThread = new Thread(() -> { startLatch.countDown(); while (running.get()) {