diff --git a/CHANGES/10556.bugfix.rst b/CHANGES/10556.bugfix.rst new file mode 100644 index 00000000000..aad4eccbe48 --- /dev/null +++ b/CHANGES/10556.bugfix.rst @@ -0,0 +1,3 @@ +Break cyclic references at connection close when there was a traceback -- by :user:`bdraco`. + +Special thanks to :user:`availov` for reporting the issue. diff --git a/CHANGES/10564.feature.rst b/CHANGES/10564.feature.rst new file mode 100644 index 00000000000..24e2ecad76d --- /dev/null +++ b/CHANGES/10564.feature.rst @@ -0,0 +1 @@ +Improved logging on non-overlapping WebSocket client protocols to include the remote address -- by :user:`bdraco`. diff --git a/CHANGES/10569.bugfix.rst b/CHANGES/10569.bugfix.rst new file mode 100644 index 00000000000..7d817e867d4 --- /dev/null +++ b/CHANGES/10569.bugfix.rst @@ -0,0 +1 @@ +Break cyclic references when there is an exception handling a request -- by :user:`bdraco`. diff --git a/aiohttp/_websocket/reader_py.py b/aiohttp/_websocket/reader_py.py index 711c9c8bbbe..201b5e84de2 100644 --- a/aiohttp/_websocket/reader_py.py +++ b/aiohttp/_websocket/reader_py.py @@ -96,6 +96,7 @@ def _release_waiter(self) -> None: def feed_eof(self) -> None: self._eof = True self._release_waiter() + self._exception = None # Break cyclic references def feed_data(self, data: "WSMessage") -> None: size = data.size diff --git a/aiohttp/client_proto.py b/aiohttp/client_proto.py index 6e628a7c2fe..4d559af0a78 100644 --- a/aiohttp/client_proto.py +++ b/aiohttp/client_proto.py @@ -68,6 +68,7 @@ def force_close(self) -> None: self._should_close = True def close(self) -> None: + self._exception = None # Break cyclic references transport = self.transport if transport is not None: transport.close() diff --git a/aiohttp/web_protocol.py b/aiohttp/web_protocol.py index 0b99caf114a..d162a729f54 100644 --- a/aiohttp/web_protocol.py +++ b/aiohttp/web_protocol.py @@ -543,8 +543,6 @@ async def start(self) -> None: keep_alive(True) specified. """ loop = self._loop - handler = asyncio.current_task(loop) - assert handler is not None manager = self._manager assert manager is not None keepalive_timeout = self._keepalive_timeout @@ -574,7 +572,16 @@ async def start(self) -> None: request_handler = self._make_error_handler(message) message = ERROR - request = self._request_factory(message, payload, self, writer, handler) + # Important don't hold a reference to the current task + # as on traceback it will prevent the task from being + # collected and will cause a memory leak. + request = self._request_factory( + message, + payload, + self, + writer, + self._task_handler or asyncio.current_task(loop), # type: ignore[arg-type] + ) try: # a new task is used for copy context vars (#3406) coro = self._handle_request(request, start, request_handler) @@ -642,6 +649,7 @@ async def start(self) -> None: self.force_close() raise finally: + request._task = None # type: ignore[assignment] # Break reference cycle in case of exception if self.transport is None and resp is not None: self.log_debug("Ignored premature client disconnection.") diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 78c130179f5..9421dc2ac76 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -256,7 +256,8 @@ def _handshake( else: # No overlap found: Return no protocol as per spec ws_logger.warning( - "Client protocols %r don’t overlap server-known ones %r", + "%s: Client protocols %r don’t overlap server-known ones %r", + request.remote, req_protocols, self._protocols, ) diff --git a/tests/isolated/check_for_client_response_leak.py b/tests/isolated/check_for_client_response_leak.py new file mode 100644 index 00000000000..67393c2c2d8 --- /dev/null +++ b/tests/isolated/check_for_client_response_leak.py @@ -0,0 +1,47 @@ +import asyncio +import contextlib +import gc +import sys + +from aiohttp import ClientError, ClientSession, web +from aiohttp.test_utils import get_unused_port_socket + +gc.set_debug(gc.DEBUG_LEAK) + + +async def main() -> None: + app = web.Application() + + async def stream_handler(request: web.Request) -> web.Response: + assert request.transport is not None + request.transport.close() # Forcefully closing connection + return web.Response() + + app.router.add_get("/stream", stream_handler) + sock = get_unused_port_socket("127.0.0.1") + port = sock.getsockname()[1] + + runner = web.AppRunner(app) + await runner.setup() + site = web.SockSite(runner, sock) + await site.start() + + session = ClientSession() + + async def fetch_stream(url: str) -> None: + """Fetch a stream and read a few bytes from it.""" + with contextlib.suppress(ClientError): + await session.get(url) + + client_task = asyncio.create_task(fetch_stream(f"http://localhost:{port}/stream")) + await client_task + gc.collect() + client_response_present = any( + type(obj).__name__ == "ClientResponse" for obj in gc.garbage + ) + await session.close() + await runner.cleanup() + sys.exit(1 if client_response_present else 0) + + +asyncio.run(main()) diff --git a/tests/isolated/check_for_request_leak.py b/tests/isolated/check_for_request_leak.py new file mode 100644 index 00000000000..6f340a05277 --- /dev/null +++ b/tests/isolated/check_for_request_leak.py @@ -0,0 +1,41 @@ +import asyncio +import gc +import sys +from typing import NoReturn + +from aiohttp import ClientSession, web +from aiohttp.test_utils import get_unused_port_socket + +gc.set_debug(gc.DEBUG_LEAK) + + +async def main() -> None: + app = web.Application() + + async def handler(request: web.Request) -> NoReturn: + await request.json() + assert False + + app.router.add_route("GET", "/json", handler) + sock = get_unused_port_socket("127.0.0.1") + port = sock.getsockname()[1] + + runner = web.AppRunner(app) + await runner.setup() + site = web.SockSite(runner, sock) + await site.start() + + async with ClientSession() as session: + async with session.get(f"http://127.0.0.1:{port}/json") as resp: + await resp.read() + + # Give time for the cancelled task to be collected + await asyncio.sleep(0.5) + gc.collect() + request_present = any(type(obj).__name__ == "Request" for obj in gc.garbage) + await session.close() + await runner.cleanup() + sys.exit(1 if request_present else 0) + + +asyncio.run(main()) diff --git a/tests/test_leaks.py b/tests/test_leaks.py new file mode 100644 index 00000000000..f527ce18cae --- /dev/null +++ b/tests/test_leaks.py @@ -0,0 +1,42 @@ +import pathlib +import platform +import subprocess +import sys + +import pytest + +IS_PYPY = platform.python_implementation() == "PyPy" + + +@pytest.mark.skipif(IS_PYPY, reason="gc.DEBUG_LEAK not available on PyPy") +def test_client_response_does_not_leak_on_server_disconnected_error() -> None: + """Test that ClientResponse is collected after server disconnects. + + https://github.com/aio-libs/aiohttp/issues/10535 + """ + leak_test_script = pathlib.Path(__file__).parent.joinpath( + "isolated", "check_for_client_response_leak.py" + ) + + with subprocess.Popen( + [sys.executable, "-u", str(leak_test_script)], + stdout=subprocess.PIPE, + ) as proc: + assert proc.wait() == 0, "ClientResponse leaked" + + +@pytest.mark.skipif(IS_PYPY, reason="gc.DEBUG_LEAK not available on PyPy") +def test_request_does_not_leak_when_request_handler_raises() -> None: + """Test that the Request object is collected when the handler raises. + + https://github.com/aio-libs/aiohttp/issues/10548 + """ + leak_test_script = pathlib.Path(__file__).parent.joinpath( + "isolated", "check_for_request_leak.py" + ) + + with subprocess.Popen( + [sys.executable, "-u", str(leak_test_script)], + stdout=subprocess.PIPE, + ) as proc: + assert proc.wait() == 0, "Request leaked" diff --git a/tests/test_websocket_handshake.py b/tests/test_websocket_handshake.py index aed9d43bff8..e069795af73 100644 --- a/tests/test_websocket_handshake.py +++ b/tests/test_websocket_handshake.py @@ -175,7 +175,7 @@ async def test_handshake_protocol_unsupported(caplog: pytest.LogCaptureFixture) assert ( caplog.records[-1].msg - == "Client protocols %r don’t overlap server-known ones %r" + == "%s: Client protocols %r don’t overlap server-known ones %r" ) assert ws.ws_protocol is None