From 25062c671588d85aac50206532da2f33f716d95f Mon Sep 17 00:00:00 2001 From: Manan Bhatt Date: Fri, 12 Jun 2026 22:03:35 +0530 Subject: [PATCH] Fix worker poll hang: use client default timeout instead of disabling it The REST clients passed timeout=None per request, which httpx interprets as "no timeout" (infinite) rather than "use the client default". On a half-open connection the poll then hangs forever and the worker silently stops polling until restarted (affects both sync and async workers). Pass httpx.USE_CLIENT_DEFAULT instead so the client's configured timeout applies. Adds a regression test that points each client at a half-open server and asserts the request fails on a bounded timeout instead of hanging. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/conductor/client/http/async_rest.py | 5 +- src/conductor/client/http/rest.py | 5 +- tests/unit/api_client/test_poll_timeout.py | 105 +++++++++++++++++++++ 3 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 tests/unit/api_client/test_poll_timeout.py diff --git a/src/conductor/client/http/async_rest.py b/src/conductor/client/http/async_rest.py index fb2a0ff6..8586b921 100644 --- a/src/conductor/client/http/async_rest.py +++ b/src/conductor/client/http/async_rest.py @@ -141,7 +141,10 @@ async def request(self, method, url, query_params=None, headers=None, else: timeout = httpx.Timeout(_request_timeout) else: - timeout = None # Use client default + # httpx treats timeout=None as "no timeout" (infinite), so a + # half-open connection would hang forever. Use the sentinel so the + # client's configured timeout actually applies. + timeout = httpx.USE_CLIENT_DEFAULT if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' diff --git a/src/conductor/client/http/rest.py b/src/conductor/client/http/rest.py index c82708f8..5a790c3f 100644 --- a/src/conductor/client/http/rest.py +++ b/src/conductor/client/http/rest.py @@ -201,7 +201,10 @@ def request(self, method, url, query_params=None, headers=None, else: timeout = httpx.Timeout(_request_timeout) else: - timeout = None # Use client default + # httpx treats timeout=None as "no timeout" (infinite), so a + # half-open connection would hang forever. Use the sentinel so the + # client's configured timeout actually applies. + timeout = httpx.USE_CLIENT_DEFAULT if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' diff --git a/tests/unit/api_client/test_poll_timeout.py b/tests/unit/api_client/test_poll_timeout.py new file mode 100644 index 00000000..945eb2cc --- /dev/null +++ b/tests/unit/api_client/test_poll_timeout.py @@ -0,0 +1,105 @@ +"""Regression tests: a request with no explicit per-request timeout must use the +client's configured timeout, NOT disable it. Previously the code passed +timeout=None, which httpx interprets as "no timeout" — so a half-open +connection (request sent, no response, never closed) hung forever and the +worker stopped polling permanently. These tests point each client at a +half-open server and assert the request fails on a bounded timeout instead of +hanging.""" +import asyncio +import socket +import threading +import time +import unittest + +import httpx + +from conductor.client.http.rest import RESTClientObject, ApiException as SyncApiException +from conductor.client.http.async_rest import AsyncRESTClientObject, ApiException as AsyncApiException + +CLIENT_TIMEOUT = 2.0 # short client default; a hung request must fail near this +HANG_GUARD = 15.0 # if a request runs longer than this, it's "hanging" + + +def _start_blackhole_sync(): + """TCP server that accepts, reads the request, then never responds or closes.""" + srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + srv.bind(("127.0.0.1", 0)) + srv.listen(8) + port = srv.getsockname()[1] + conns = [] + + def loop(): + while True: + try: + c, _ = srv.accept() + except OSError: + return + conns.append(c) + try: + c.recv(65536) # consume request; never reply, never close + except OSError: + pass + + threading.Thread(target=loop, daemon=True).start() + return srv, port, conns + + +class TestPollTimeout(unittest.TestCase): + + def test_sync_request_does_not_hang_on_half_open(self): + srv, port, conns = _start_blackhole_sync() + client = httpx.Client(timeout=httpx.Timeout(CLIENT_TIMEOUT)) + rest = RESTClientObject(connection=client) + try: + start = time.monotonic() + with self.assertRaises(SyncApiException): + rest.GET(f"http://127.0.0.1:{port}/", _request_timeout=None) + elapsed = time.monotonic() - start + self.assertLess( + elapsed, HANG_GUARD, + "sync request hung — client default timeout was not applied", + ) + finally: + client.close() + srv.close() + for c in conns: + try: + c.close() + except OSError: + pass + + def test_async_request_does_not_hang_on_half_open(self): + asyncio.run(self._async_body()) + + async def _async_body(self): + async def handle(reader, writer): + try: + await reader.read(65536) + except Exception: + pass + await asyncio.Event().wait() # never respond, never close + + server = await asyncio.start_server(handle, "127.0.0.1", 0) + port = server.sockets[0].getsockname()[1] + client = httpx.AsyncClient(timeout=httpx.Timeout(CLIENT_TIMEOUT)) + rest = AsyncRESTClientObject(connection=client) + try: + start = time.monotonic() + with self.assertRaises(AsyncApiException): + await asyncio.wait_for( + rest.GET(f"http://127.0.0.1:{port}/", _request_timeout=None), + timeout=HANG_GUARD, + ) + elapsed = time.monotonic() - start + self.assertLess( + elapsed, HANG_GUARD, + "async request hung — client default timeout was not applied", + ) + finally: + await client.aclose() + server.close() + + +if __name__ == "__main__": + unittest.main()