From 452458a959814ef7bf67860f49606e5ed4649af9 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 24 May 2025 12:05:33 -0500 Subject: [PATCH] Optimize small HTTP requests/responses by coalescing headers and body into a single packet (#10991) --- CHANGES/10991.feature.rst | 7 + aiohttp/abc.py | 7 + aiohttp/client_reqrep.py | 5 + aiohttp/http_writer.py | 158 ++++++- aiohttp/web_response.py | 6 + docs/spelling_wordlist.txt | 1 + tests/test_client_functional.py | 79 +++- tests/test_http_writer.py | 780 +++++++++++++++++++++++++++++++- tests/test_web_response.py | 43 ++ tests/test_web_sendfile.py | 30 ++ tests/test_web_server.py | 5 +- 11 files changed, 1095 insertions(+), 26 deletions(-) create mode 100644 CHANGES/10991.feature.rst diff --git a/CHANGES/10991.feature.rst b/CHANGES/10991.feature.rst new file mode 100644 index 00000000000..687a1a752f6 --- /dev/null +++ b/CHANGES/10991.feature.rst @@ -0,0 +1,7 @@ +Optimized small HTTP requests/responses by coalescing headers and body into a single TCP packet -- by :user:`bdraco`. + +This change enhances network efficiency by reducing the number of packets sent for small HTTP payloads, improving latency and reducing overhead. Most importantly, this fixes compatibility with memory-constrained IoT devices that can only perform a single read operation and expect HTTP requests in one packet. The optimization uses zero-copy ``writelines`` when coalescing data and works with both regular and chunked transfer encoding. + +When ``aiohttp`` uses client middleware to communicate with an ``aiohttp`` server, connection reuse is more likely to occur since complete responses arrive in a single packet for small payloads. + +This aligns ``aiohttp`` with other popular HTTP clients that already coalesce small requests. diff --git a/aiohttp/abc.py b/aiohttp/abc.py index 53ce6ec495e..ce414012ea6 100644 --- a/aiohttp/abc.py +++ b/aiohttp/abc.py @@ -230,6 +230,13 @@ async def write_headers( ) -> None: """Write HTTP headers""" + def send_headers(self) -> None: + """Force sending buffered headers if not already sent. + + Required only if write_headers() buffers headers instead of sending immediately. + For backwards compatibility, this method does nothing by default. + """ + class AbstractAccessLogger(ABC): """Abstract writer to access log.""" diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index cf3c48a1c16..384087cd8b3 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -646,6 +646,8 @@ async def write_bytes( """ # 100 response if self._continue is not None: + # Force headers to be sent before waiting for 100-continue + writer.send_headers() await writer.drain() await self._continue @@ -763,7 +765,10 @@ async def send(self, conn: "Connection") -> "ClientResponse": # status + headers status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}" + + # Buffer headers for potential coalescing with body await writer.write_headers(status_line, self.headers) + task: Optional["asyncio.Task[None]"] if self.body or self._continue is not None or protocol.writing_paused: coro = self.write_bytes(writer, conn, self._get_content_length()) diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index 6b13e3cdd1d..1772e3c518b 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -3,6 +3,7 @@ import asyncio import sys from typing import ( # noqa + TYPE_CHECKING, Any, Awaitable, Callable, @@ -71,6 +72,8 @@ def __init__( self.loop = loop self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent self._on_headers_sent: _T_OnHeadersSent = on_headers_sent + self._headers_buf: Optional[bytes] = None + self._headers_written: bool = False @property def transport(self) -> Optional[asyncio.Transport]: @@ -118,6 +121,49 @@ def _writelines( else: transport.writelines(chunks) # type: ignore[arg-type] + def _write_chunked_payload( + self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] + ) -> None: + """Write a chunk with proper chunked encoding.""" + chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii") + self._writelines((chunk_len_pre, chunk, b"\r\n")) + + def _send_headers_with_payload( + self, + chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"], + is_eof: bool, + ) -> None: + """Send buffered headers with payload, coalescing into single write.""" + # Mark headers as written + self._headers_written = True + headers_buf = self._headers_buf + self._headers_buf = None + + if TYPE_CHECKING: + # Safe because callers (write() and write_eof()) only invoke this method + # after checking that self._headers_buf is truthy + assert headers_buf is not None + + if not self.chunked: + # Non-chunked: coalesce headers with body + if chunk: + self._writelines((headers_buf, chunk)) + else: + self._write(headers_buf) + return + + # Coalesce headers with chunked data + if chunk: + chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii") + if is_eof: + self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n0\r\n\r\n")) + else: + self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n")) + elif is_eof: + self._writelines((headers_buf, b"0\r\n\r\n")) + else: + self._write(headers_buf) + async def write( self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"], @@ -125,7 +171,8 @@ async def write( drain: bool = True, LIMIT: int = 0x10000, ) -> None: - """Writes chunk of data to a stream. + """ + Writes chunk of data to a stream. write_eof() indicates end of stream. writer can't be used after write_eof() method being called. @@ -154,31 +201,75 @@ async def write( if not chunk: return + # Handle buffered headers for small payload optimization + if self._headers_buf and not self._headers_written: + self._send_headers_with_payload(chunk, False) + if drain and self.buffer_size > LIMIT: + self.buffer_size = 0 + await self.drain() + return + if chunk: if self.chunked: - self._writelines( - (f"{len(chunk):x}\r\n".encode("ascii"), chunk, b"\r\n") - ) + self._write_chunked_payload(chunk) else: self._write(chunk) - if self.buffer_size > LIMIT and drain: + if drain and self.buffer_size > LIMIT: self.buffer_size = 0 await self.drain() async def write_headers( self, status_line: str, headers: "CIMultiDict[str]" ) -> None: - """Write request/response status and headers.""" + """Write headers to the stream.""" if self._on_headers_sent is not None: await self._on_headers_sent(headers) - # status + headers buf = _serialize_headers(status_line, headers) - self._write(buf) + self._headers_written = False + self._headers_buf = buf + + def send_headers(self) -> None: + """Force sending buffered headers if not already sent.""" + if not self._headers_buf or self._headers_written: + return + + self._headers_written = True + headers_buf = self._headers_buf + self._headers_buf = None + + if TYPE_CHECKING: + # Safe because we only enter this block when self._headers_buf is truthy + assert headers_buf is not None + + self._write(headers_buf) def set_eof(self) -> None: """Indicate that the message is complete.""" + if self._eof: + return + + # If headers haven't been sent yet, send them now + # This handles the case where there's no body at all + if self._headers_buf and not self._headers_written: + self._headers_written = True + headers_buf = self._headers_buf + self._headers_buf = None + + if TYPE_CHECKING: + # Safe because we only enter this block when self._headers_buf is truthy + assert headers_buf is not None + + # Combine headers and chunked EOF marker in a single write + if self.chunked: + self._writelines((headers_buf, b"0\r\n\r\n")) + else: + self._write(headers_buf) + elif self.chunked and self._headers_written: + # Headers already sent, just send the final chunk marker + self._write(b"0\r\n\r\n") + self._eof = True async def write_eof(self, chunk: bytes = b"") -> None: @@ -188,6 +279,7 @@ async def write_eof(self, chunk: bytes = b"") -> None: if chunk and self._on_chunk_sent is not None: await self._on_chunk_sent(chunk) + # Handle body/compression if self._compress: chunks: List[bytes] = [] chunks_len = 0 @@ -200,6 +292,26 @@ async def write_eof(self, chunk: bytes = b"") -> None: chunks.append(flush_chunk) assert chunks_len + # Send buffered headers with compressed data if not yet sent + if self._headers_buf and not self._headers_written: + self._headers_written = True + headers_buf = self._headers_buf + self._headers_buf = None + + if self.chunked: + # Coalesce headers with compressed chunked data + chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii") + self._writelines( + (headers_buf, chunk_len_pre, *chunks, b"\r\n0\r\n\r\n") + ) + else: + # Coalesce headers with compressed data + self._writelines((headers_buf, *chunks)) + await self.drain() + self._eof = True + return + + # Headers already sent, just write compressed data if self.chunked: chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii") self._writelines((chunk_len_pre, *chunks, b"\r\n0\r\n\r\n")) @@ -207,16 +319,34 @@ async def write_eof(self, chunk: bytes = b"") -> None: self._writelines(chunks) else: self._write(chunks[0]) - elif self.chunked: + await self.drain() + self._eof = True + return + + # No compression - send buffered headers if not yet sent + if self._headers_buf and not self._headers_written: + # Use helper to send headers with payload + self._send_headers_with_payload(chunk, True) + await self.drain() + self._eof = True + return + + # Handle remaining body + if self.chunked: if chunk: - chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii") - self._writelines((chunk_len_pre, chunk, b"\r\n0\r\n\r\n")) + # Write final chunk with EOF marker + self._writelines( + (f"{len(chunk):x}\r\n".encode("ascii"), chunk, b"\r\n0\r\n\r\n") + ) else: self._write(b"0\r\n\r\n") - elif chunk: - self._write(chunk) + await self.drain() + self._eof = True + return - await self.drain() + if chunk: + self._write(chunk) + await self.drain() self._eof = True diff --git a/aiohttp/web_response.py b/aiohttp/web_response.py index 85389963616..21fcde45968 100644 --- a/aiohttp/web_response.py +++ b/aiohttp/web_response.py @@ -89,6 +89,7 @@ class StreamResponse(BaseClass, HeadersMixin, CookieMixin): _eof_sent: bool = False _must_be_empty_body: Optional[bool] = None _body_length = 0 + _send_headers_immediately = True def __init__( self, @@ -441,6 +442,10 @@ async def _write_headers(self) -> None: status_line = f"HTTP/{version[0]}.{version[1]} {self._status} {self._reason}" await writer.write_headers(status_line, self._headers) + # Send headers immediately if not opted into buffering + if self._send_headers_immediately: + writer.send_headers() + async def write( self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] ) -> None: @@ -519,6 +524,7 @@ def __bool__(self) -> bool: class Response(StreamResponse): _compressed_body: Optional[bytes] = None + _send_headers_immediately = False def __init__( self, diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 48266e67486..68d4693bac0 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -154,6 +154,7 @@ initializer inline intaking io +IoT ip IP ipdb diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index 0fdea98cdd0..44009094431 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -1582,6 +1582,7 @@ async def handler(request: web.Request) -> web.Response: return web.json_response({"ok": True}) write_mock = None + writelines_mock = None original_write_bytes = ClientRequest.write_bytes async def write_bytes( @@ -1590,12 +1591,26 @@ async def write_bytes( conn: Connection, content_length: Optional[int] = None, ) -> None: - nonlocal write_mock + nonlocal write_mock, writelines_mock original_write = writer._write - - with mock.patch.object( - writer, "_write", autospec=True, spec_set=True, side_effect=original_write - ) as write_mock: + original_writelines = writer._writelines + + with ( + mock.patch.object( + writer, + "_write", + autospec=True, + spec_set=True, + side_effect=original_write, + ) as write_mock, + mock.patch.object( + writer, + "_writelines", + autospec=True, + spec_set=True, + side_effect=original_writelines, + ) as writelines_mock, + ): await original_write_bytes(self, writer, conn, content_length) with mock.patch.object(ClientRequest, "write_bytes", write_bytes): @@ -1608,9 +1623,20 @@ async def write_bytes( content = await resp.json() assert content == {"ok": True} - assert write_mock is not None - # No chunks should have been sent for an empty body. - write_mock.assert_not_called() + # With packet coalescing, headers are buffered and may be written + # during write_bytes if there's an empty body to process. + # The test should verify no body chunks are written, but headers + # may be written as part of the coalescing optimization. + # If _write was called, it should only be for headers ending with \r\n\r\n + # and not any body content + for call in write_mock.call_args_list: # type: ignore[union-attr] + data = call[0][0] + assert data.endswith( + b"\r\n\r\n" + ), "Only headers should be written, not body chunks" + + # No body data should be written via writelines either + writelines_mock.assert_not_called() # type: ignore[union-attr] async def test_GET_DEFLATE_no_body(aiohttp_client: AiohttpClient) -> None: @@ -4510,3 +4536,40 @@ async def handler(request: web.Request) -> NoReturn: assert client._session.connector is not None # Connection should be closed due to client-side network error assert len(client._session.connector._conns) == 0 + + +async def test_empty_response_non_chunked(aiohttp_client: AiohttpClient) -> None: + """Test non-chunked response with empty body.""" + + async def handler(request: web.Request) -> web.Response: + # Return empty response with Content-Length: 0 + return web.Response(body=b"", headers={"Content-Length": "0"}) + + app = web.Application() + app.router.add_get("/empty", handler) + client = await aiohttp_client(app) + + resp = await client.get("/empty") + assert resp.status == 200 + assert resp.headers.get("Content-Length") == "0" + data = await resp.read() + assert data == b"" + resp.close() + + +async def test_set_eof_on_empty_response(aiohttp_client: AiohttpClient) -> None: + """Test that triggers set_eof() method.""" + + async def handler(request: web.Request) -> web.Response: + # Return response that completes immediately + return web.Response(status=204) # No Content + + app = web.Application() + app.router.add_get("/no-content", handler) + client = await aiohttp_client(app) + + resp = await client.get("/no-content") + assert resp.status == 204 + data = await resp.read() + assert data == b"" + resp.close() diff --git a/tests/test_http_writer.py b/tests/test_http_writer.py index 3add5a0a073..6c83e08cc91 100644 --- a/tests/test_http_writer.py +++ b/tests/test_http_writer.py @@ -91,6 +91,95 @@ def test_payloadwriter_properties( assert writer.transport == transport +async def test_write_headers_buffered_small_payload( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + msg = http.StreamWriter(protocol, loop) + headers = CIMultiDict({"Content-Length": "11", "Host": "example.com"}) + + # Write headers - should be buffered + await msg.write_headers("GET / HTTP/1.1", headers) + assert len(buf) == 0 # Headers not sent yet + + # Write small body - should coalesce with headers + await msg.write(b"Hello World", drain=False) + + # Verify content + assert b"GET / HTTP/1.1\r\n" in buf + assert b"Host: example.com\r\n" in buf + assert b"Content-Length: 11\r\n" in buf + assert b"\r\n\r\nHello World" in buf + + +async def test_write_headers_chunked_coalescing( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + msg = http.StreamWriter(protocol, loop) + msg.enable_chunking() + headers = CIMultiDict({"Transfer-Encoding": "chunked", "Host": "example.com"}) + + # Write headers - should be buffered + await msg.write_headers("POST /upload HTTP/1.1", headers) + assert len(buf) == 0 # Headers not sent yet + + # Write first chunk - should coalesce with headers + await msg.write(b"First chunk", drain=False) + + # Verify content + assert b"POST /upload HTTP/1.1\r\n" in buf + assert b"Transfer-Encoding: chunked\r\n" in buf + # "b" is hex for 11 (length of "First chunk") + assert b"\r\n\r\nb\r\nFirst chunk\r\n" in buf + + +async def test_write_eof_with_buffered_headers( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + msg = http.StreamWriter(protocol, loop) + headers = CIMultiDict({"Content-Length": "9", "Host": "example.com"}) + + # Write headers - should be buffered + await msg.write_headers("POST /data HTTP/1.1", headers) + assert len(buf) == 0 + + # Call write_eof with body - should coalesce + await msg.write_eof(b"Last data") + + # Verify content + assert b"POST /data HTTP/1.1\r\n" in buf + assert b"\r\n\r\nLast data" in buf + + +async def test_set_eof_sends_buffered_headers( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + msg = http.StreamWriter(protocol, loop) + headers = CIMultiDict({"Host": "example.com"}) + + # Write headers - should be buffered + await msg.write_headers("GET /empty HTTP/1.1", headers) + assert len(buf) == 0 + + # Call set_eof without body - headers should be sent + msg.set_eof() + + # Headers should be sent + assert len(buf) > 0 + assert b"GET /empty HTTP/1.1\r\n" in buf + + async def test_write_payload_eof( transport: asyncio.Transport, protocol: BaseProtocol, @@ -886,14 +975,66 @@ async def test_set_eof_after_write_headers( msg = http.StreamWriter(protocol, loop) status_line = "HTTP/1.1 200 OK" good_headers = CIMultiDict({"Set-Cookie": "abc=123"}) + + # Write headers - should be buffered await msg.write_headers(status_line, good_headers) + assert not transport.write.called # Headers are buffered + + # set_eof should send the buffered headers + msg.set_eof() assert transport.write.called + + # Subsequent write_eof should do nothing transport.write.reset_mock() - msg.set_eof() await msg.write_eof() assert not transport.write.called +async def test_write_headers_does_not_write_immediately( + protocol: BaseProtocol, + transport: mock.Mock, + loop: asyncio.AbstractEventLoop, +) -> None: + msg = http.StreamWriter(protocol, loop) + status_line = "HTTP/1.1 200 OK" + headers = CIMultiDict({"Content-Type": "text/plain"}) + + # write_headers should buffer, not write immediately + await msg.write_headers(status_line, headers) + assert not transport.write.called + assert not transport.writelines.called + + # Headers should be sent when set_eof is called + msg.set_eof() + assert transport.write.called + + +async def test_write_headers_with_compression_coalescing( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + msg = http.StreamWriter(protocol, loop) + msg.enable_compression("deflate") + headers = CIMultiDict({"Content-Encoding": "deflate", "Host": "example.com"}) + + # Write headers - should be buffered + await msg.write_headers("POST /data HTTP/1.1", headers) + assert len(buf) == 0 + + # Write compressed data via write_eof - should coalesce + await msg.write_eof(b"Hello World") + + # Verify headers are present + assert b"POST /data HTTP/1.1\r\n" in buf + assert b"Content-Encoding: deflate\r\n" in buf + + # Verify compressed data is present + # The data should contain headers + compressed payload + assert len(buf) > 50 # Should have headers + some compressed data + + @pytest.mark.parametrize( "char", [ @@ -918,3 +1059,640 @@ def test_serialize_headers_raises_on_new_line_or_carriage_return(char: str) -> N ), ): _serialize_headers(status_line, headers) + + +async def test_write_compressed_data_with_headers_coalescing( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test that headers are coalesced with compressed data in write() method.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_compression("deflate") + headers = CIMultiDict({"Content-Encoding": "deflate", "Host": "example.com"}) + + # Write headers - should be buffered + await msg.write_headers("POST /data HTTP/1.1", headers) + assert len(buf) == 0 + + # Write compressed data - should coalesce with headers + await msg.write(b"Hello World") + + # Headers and compressed data should be written together + assert b"POST /data HTTP/1.1\r\n" in buf + assert b"Content-Encoding: deflate\r\n" in buf + assert len(buf) > 50 # Headers + compressed data + + +async def test_write_compressed_chunked_with_headers_coalescing( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test headers coalescing with compressed chunked data.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_compression("deflate") + msg.enable_chunking() + headers = CIMultiDict( + {"Content-Encoding": "deflate", "Transfer-Encoding": "chunked"} + ) + + # Write headers - should be buffered + await msg.write_headers("POST /data HTTP/1.1", headers) + assert len(buf) == 0 + + # Write compressed chunked data - should coalesce + await msg.write(b"Hello World") + + # Check headers are present + assert b"POST /data HTTP/1.1\r\n" in buf + assert b"Transfer-Encoding: chunked\r\n" in buf + + # Should have chunk size marker for compressed data + output = buf.decode("latin-1", errors="ignore") + assert "\r\n" in output # Should have chunk markers + + +async def test_write_multiple_compressed_chunks_after_headers_sent( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test multiple compressed writes after headers are already sent.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_compression("deflate") + headers = CIMultiDict({"Content-Encoding": "deflate"}) + + # Write headers and send them immediately by writing first chunk + await msg.write_headers("POST /data HTTP/1.1", headers) + assert len(buf) == 0 # Headers buffered + + # Write first chunk - this will send headers + compressed data + await msg.write(b"First chunk of data that should compress") + len_after_first = len(buf) + assert len_after_first > 0 # Headers + first chunk written + + # Write second chunk and force flush via EOF + await msg.write(b"Second chunk of data that should also compress well") + await msg.write_eof() + + # After EOF, all compressed data should be flushed + final_len = len(buf) + assert final_len > len_after_first + + +async def test_write_eof_empty_compressed_with_buffered_headers( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test write_eof with no data but compression enabled and buffered headers.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_compression("deflate") + headers = CIMultiDict({"Content-Encoding": "deflate"}) + + # Write headers - should be buffered + await msg.write_headers("GET /data HTTP/1.1", headers) + assert len(buf) == 0 + + # Write EOF with no data - should still coalesce headers with compression flush + await msg.write_eof() + + # Headers should be present + assert b"GET /data HTTP/1.1\r\n" in buf + assert b"Content-Encoding: deflate\r\n" in buf + # Should have compression flush data + assert len(buf) > 40 + + +async def test_write_compressed_gzip_with_headers_coalescing( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test gzip compression with header coalescing.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_compression("gzip") + headers = CIMultiDict({"Content-Encoding": "gzip"}) + + # Write headers - should be buffered + await msg.write_headers("POST /data HTTP/1.1", headers) + assert len(buf) == 0 + + # Write gzip compressed data via write_eof + await msg.write_eof(b"Test gzip compression") + + # Verify coalescing happened + assert b"POST /data HTTP/1.1\r\n" in buf + assert b"Content-Encoding: gzip\r\n" in buf + # Gzip typically produces more overhead than deflate + assert len(buf) > 60 + + +async def test_compression_with_content_length_constraint( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test compression respects content length constraints.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_compression("deflate") + msg.length = 5 # Set small content length + headers = CIMultiDict({"Content-Length": "5"}) + + await msg.write_headers("POST /data HTTP/1.1", headers) + # Write some initial data to trigger headers to be sent + await msg.write(b"12345") # This matches our content length of 5 + headers_and_first_chunk_len = len(buf) + + # Try to write more data than content length allows + await msg.write(b"This is a longer message") + + # The second write should not add any data since content length is exhausted + # After writing 5 bytes, length becomes 0, so additional writes are ignored + assert len(buf) == headers_and_first_chunk_len # No additional data written + + +async def test_write_compressed_zero_length_chunk( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test writing empty chunk with compression.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_compression("deflate") + + await msg.write_headers("POST /data HTTP/1.1", CIMultiDict()) + # Force headers to be sent by writing something + await msg.write(b"x") # Write something to trigger header send + buf.clear() + + # Write empty chunk - compression may still produce output + await msg.write(b"") + + # With compression, even empty input might produce small output + # due to compression state, but it should be minimal + assert len(buf) < 10 # Should be very small if anything + + +async def test_chunked_compressed_eof_coalescing( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test chunked compressed data with EOF marker coalescing.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_compression("deflate") + msg.enable_chunking() + headers = CIMultiDict( + {"Content-Encoding": "deflate", "Transfer-Encoding": "chunked"} + ) + + # Buffer headers + await msg.write_headers("POST /data HTTP/1.1", headers) + assert len(buf) == 0 + + # Write compressed chunked data with EOF + await msg.write_eof(b"Final compressed chunk") + + # Should have headers + assert b"POST /data HTTP/1.1\r\n" in buf + + # Should end with chunked EOF marker + assert buf.endswith(b"0\r\n\r\n") + + # Should have chunk size in hex before the compressed data + output = buf + # Verify we have chunk markers - look for \r\n followed by hex digits + # The chunk size should be between the headers and the compressed data + assert b"\r\n\r\n" in output # End of headers + # After headers, we should have a hex chunk size + headers_end = output.find(b"\r\n\r\n") + 4 + chunk_data = output[headers_end:] + # Should start with hex digits followed by \r\n + assert ( + chunk_data[:10] + .strip() + .decode("ascii", errors="ignore") + .replace("\r\n", "") + .isalnum() + ) + + +async def test_compression_different_strategies( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test compression with different strategies.""" + # Test with best speed strategy (default) + msg1 = http.StreamWriter(protocol, loop) + msg1.enable_compression("deflate") # Default strategy + + await msg1.write_headers("POST /fast HTTP/1.1", CIMultiDict()) + await msg1.write_eof(b"Test data for compression test data for compression") + + buf1_len = len(buf) + + # Both should produce output + assert buf1_len > 0 + # Headers should be present + assert b"POST /fast HTTP/1.1\r\n" in buf + + # Since we can't easily test different compression strategies + # (the compressor initialization might not support strategy parameter), + # we just verify that compression works + + +async def test_chunked_headers_single_write_with_set_eof( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test that set_eof combines headers and chunked EOF in single write.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_chunking() + + # Write headers - should be buffered + headers = CIMultiDict({"Transfer-Encoding": "chunked", "Host": "example.com"}) + await msg.write_headers("GET /test HTTP/1.1", headers) + assert len(buf) == 0 # Headers not sent yet + assert not transport.writelines.called # type: ignore[attr-defined] # No writelines calls yet + + # Call set_eof - should send headers + chunked EOF in single write call + msg.set_eof() + + # Should have exactly one write call (since payload is small, writelines falls back to write) + assert transport.write.call_count == 1 # type: ignore[attr-defined] + assert transport.writelines.call_count == 0 # type: ignore[attr-defined] # Not called for small payloads + + # The write call should have the combined headers and chunked EOF marker + write_data = transport.write.call_args[0][0] # type: ignore[attr-defined] + assert write_data.startswith(b"GET /test HTTP/1.1\r\n") + assert b"Transfer-Encoding: chunked\r\n" in write_data + assert write_data.endswith(b"\r\n\r\n0\r\n\r\n") # Headers end + chunked EOF + + # Verify final output + assert b"GET /test HTTP/1.1\r\n" in buf + assert b"Transfer-Encoding: chunked\r\n" in buf + assert buf.endswith(b"0\r\n\r\n") + + +async def test_send_headers_forces_header_write( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test that send_headers() forces writing buffered headers.""" + msg = http.StreamWriter(protocol, loop) + headers = CIMultiDict({"Content-Length": "10", "Host": "example.com"}) + + # Write headers (should be buffered) + await msg.write_headers("GET /test HTTP/1.1", headers) + assert len(buf) == 0 # Headers buffered + + # Force send headers + msg.send_headers() + + # Headers should now be written + assert b"GET /test HTTP/1.1\r\n" in buf + assert b"Content-Length: 10\r\n" in buf + assert b"Host: example.com\r\n" in buf + + # Writing body should not resend headers + buf.clear() + await msg.write(b"0123456789") + assert b"GET /test" not in buf # Headers not repeated + assert buf == b"0123456789" # Just the body + + +async def test_send_headers_idempotent( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test that send_headers() is idempotent and safe to call multiple times.""" + msg = http.StreamWriter(protocol, loop) + headers = CIMultiDict({"Content-Length": "5", "Host": "example.com"}) + + # Write headers (should be buffered) + await msg.write_headers("GET /test HTTP/1.1", headers) + assert len(buf) == 0 # Headers buffered + + # Force send headers + msg.send_headers() + headers_output = bytes(buf) + + # Call send_headers again - should be no-op + msg.send_headers() + assert buf == headers_output # No additional output + + # Call send_headers after headers already sent - should be no-op + await msg.write(b"hello") + msg.send_headers() + assert buf[len(headers_output) :] == b"hello" # Only body added + + +async def test_send_headers_no_buffered_headers( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test that send_headers() is safe when no headers are buffered.""" + msg = http.StreamWriter(protocol, loop) + + # Call send_headers without writing headers first + msg.send_headers() # Should not crash + assert len(buf) == 0 # No output + + +async def test_write_drain_condition_with_small_buffer( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test that drain is not called when buffer_size <= LIMIT.""" + msg = http.StreamWriter(protocol, loop) + + # Write headers first + await msg.write_headers("GET /test HTTP/1.1", CIMultiDict()) + msg.send_headers() # Send headers to start with clean state + + # Reset buffer size manually since send_headers doesn't do it + msg.buffer_size = 0 + + # Reset drain helper mock + protocol._drain_helper.reset_mock() # type: ignore[attr-defined] + + # Write small amount of data with drain=True but buffer under limit + small_data = b"x" * 100 # Much less than LIMIT (2**16) + await msg.write(small_data, drain=True) + + # Drain should NOT be called because buffer_size <= LIMIT + assert not protocol._drain_helper.called # type: ignore[attr-defined] + assert msg.buffer_size == 100 + assert small_data in buf + + +async def test_write_drain_condition_with_large_buffer( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test that drain is called only when drain=True AND buffer_size > LIMIT.""" + msg = http.StreamWriter(protocol, loop) + + # Write headers first + await msg.write_headers("GET /test HTTP/1.1", CIMultiDict()) + msg.send_headers() # Send headers to start with clean state + + # Reset buffer size manually since send_headers doesn't do it + msg.buffer_size = 0 + + # Reset drain helper mock + protocol._drain_helper.reset_mock() # type: ignore[attr-defined] + + # Write large amount of data with drain=True + large_data = b"x" * (2**16 + 1) # Just over LIMIT + await msg.write(large_data, drain=True) + + # Drain should be called because drain=True AND buffer_size > LIMIT + assert protocol._drain_helper.called # type: ignore[attr-defined] + assert msg.buffer_size == 0 # Buffer reset after drain + assert large_data in buf + + +async def test_write_no_drain_with_large_buffer( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test that drain is not called when drain=False even with large buffer.""" + msg = http.StreamWriter(protocol, loop) + + # Write headers first + await msg.write_headers("GET /test HTTP/1.1", CIMultiDict()) + msg.send_headers() # Send headers to start with clean state + + # Reset buffer size manually since send_headers doesn't do it + msg.buffer_size = 0 + + # Reset drain helper mock + protocol._drain_helper.reset_mock() # type: ignore[attr-defined] + + # Write large amount of data with drain=False + large_data = b"x" * (2**16 + 1) # Just over LIMIT + await msg.write(large_data, drain=False) + + # Drain should NOT be called because drain=False + assert not protocol._drain_helper.called # type: ignore[attr-defined] + assert msg.buffer_size == (2**16 + 1) # Buffer not reset + assert large_data in buf + + +async def test_set_eof_idempotent( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test that set_eof() is idempotent and can be called multiple times safely.""" + msg = http.StreamWriter(protocol, loop) + + # Test 1: Multiple set_eof calls with buffered headers + headers = CIMultiDict({"Content-Length": "0"}) + await msg.write_headers("GET /test HTTP/1.1", headers) + + # First set_eof should send headers + msg.set_eof() + first_output = buf + assert b"GET /test HTTP/1.1\r\n" in first_output + assert b"Content-Length: 0\r\n" in first_output + + # Second set_eof should be no-op + msg.set_eof() + assert bytes(buf) == first_output # No additional output + + # Third set_eof should also be no-op + msg.set_eof() + assert bytes(buf) == first_output # Still no additional output + + # Test 2: set_eof with chunked encoding + buf.clear() + msg2 = http.StreamWriter(protocol, loop) + msg2.enable_chunking() + + headers2 = CIMultiDict({"Transfer-Encoding": "chunked"}) + await msg2.write_headers("POST /data HTTP/1.1", headers2) + + # First set_eof should send headers + chunked EOF + msg2.set_eof() + chunked_output = buf + assert b"POST /data HTTP/1.1\r\n" in buf + assert b"Transfer-Encoding: chunked\r\n" in buf + assert b"0\r\n\r\n" in buf # Chunked EOF marker + + # Second set_eof should be no-op + msg2.set_eof() + assert buf == chunked_output # No additional output + + # Test 3: set_eof after headers already sent + buf.clear() + msg3 = http.StreamWriter(protocol, loop) + + headers3 = CIMultiDict({"Content-Length": "5"}) + await msg3.write_headers("PUT /update HTTP/1.1", headers3) + + # Send headers by writing some data + await msg3.write(b"hello") + headers_and_body = buf + + # set_eof after headers sent should be no-op + msg3.set_eof() + assert buf == headers_and_body # No additional output + + # Another set_eof should still be no-op + msg3.set_eof() + assert buf == headers_and_body # Still no additional output + + +async def test_non_chunked_write_empty_body( + buf: bytearray, + protocol: BaseProtocol, + transport: mock.Mock, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test non-chunked response with empty body.""" + msg = http.StreamWriter(protocol, loop) + + # Non-chunked response with Content-Length: 0 + headers = CIMultiDict({"Content-Length": "0"}) + await msg.write_headers("GET /empty HTTP/1.1", headers) + + # Write empty body + await msg.write(b"") + + # Check the output + assert b"GET /empty HTTP/1.1\r\n" in buf + assert b"Content-Length: 0\r\n" in buf + + +async def test_chunked_headers_sent_with_empty_chunk_not_eof( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test chunked encoding where headers are sent without data and not EOF.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_chunking() + + headers = CIMultiDict({"Transfer-Encoding": "chunked"}) + await msg.write_headers("POST /upload HTTP/1.1", headers) + + # This should trigger the else case in _send_headers_with_payload + # by having no chunk data and is_eof=False + await msg.write(b"") + + # Headers should be sent alone + assert b"POST /upload HTTP/1.1\r\n" in buf + assert b"Transfer-Encoding: chunked\r\n" in buf + # Should not have any chunk markers yet + assert b"0\r\n" not in buf + + +async def test_chunked_set_eof_after_headers_sent( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test chunked encoding where set_eof is called after headers already sent.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_chunking() + + headers = CIMultiDict({"Transfer-Encoding": "chunked"}) + await msg.write_headers("POST /data HTTP/1.1", headers) + + # Send headers by writing some data + await msg.write(b"test data") + buf.clear() # Clear buffer to check only what set_eof writes + + # This should trigger writing chunked EOF when headers already sent + msg.set_eof() + + # Should only have the chunked EOF marker + assert buf == b"0\r\n\r\n" + + +@pytest.mark.usefixtures("enable_writelines") +@pytest.mark.usefixtures("force_writelines_small_payloads") +async def test_write_eof_chunked_with_data_using_writelines( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test write_eof with chunked data that uses writelines (line 336).""" + msg = http.StreamWriter(protocol, loop) + msg.enable_chunking() + + headers = CIMultiDict({"Transfer-Encoding": "chunked"}) + await msg.write_headers("POST /data HTTP/1.1", headers) + + # Send headers first + await msg.write(b"initial") + transport.writelines.reset_mock() # type: ignore[attr-defined] + + # This should trigger writelines for final chunk with EOF + await msg.write_eof(b"final chunk data") + + # Should have used writelines + assert transport.writelines.called # type: ignore[attr-defined] + # Get the data from writelines call + writelines_data = transport.writelines.call_args[0][0] # type: ignore[attr-defined] + combined = b"".join(writelines_data) + + # Should have chunk size, data, and EOF marker + assert b"10\r\n" in combined # hex for 16 (length of "final chunk data") + assert b"final chunk data" in combined + assert b"0\r\n\r\n" in combined + + +async def test_send_headers_with_payload_chunked_eof_no_data( + buf: bytearray, + protocol: BaseProtocol, + transport: asyncio.Transport, + loop: asyncio.AbstractEventLoop, +) -> None: + """Test _send_headers_with_payload with chunked, is_eof=True but no chunk data.""" + msg = http.StreamWriter(protocol, loop) + msg.enable_chunking() + + headers = CIMultiDict({"Transfer-Encoding": "chunked"}) + await msg.write_headers("GET /test HTTP/1.1", headers) + + # This triggers the elif is_eof branch in _send_headers_with_payload + # by calling write_eof with empty chunk + await msg.write_eof(b"") + + # Should have headers and chunked EOF marker together + assert b"GET /test HTTP/1.1\r\n" in buf + assert b"Transfer-Encoding: chunked\r\n" in buf + assert buf.endswith(b"0\r\n\r\n") diff --git a/tests/test_web_response.py b/tests/test_web_response.py index d84201afa2d..13425026f1e 100644 --- a/tests/test_web_response.py +++ b/tests/test_web_response.py @@ -1444,3 +1444,46 @@ async def test_passing_cimultidict_to_web_response_not_mutated( await resp.prepare(req) assert resp.content_length == 6 assert not headers + + +async def test_stream_response_sends_headers_immediately() -> None: + """Test that StreamResponse sends headers immediately.""" + writer = mock.create_autospec(StreamWriter, spec_set=True) + writer.write_headers = mock.AsyncMock() + writer.send_headers = mock.Mock() + writer.write_eof = mock.AsyncMock() + + req = make_request("GET", "/", writer=writer) + resp = web.StreamResponse() + + # StreamResponse should have _send_headers_immediately = True + assert resp._send_headers_immediately is True + + # Prepare the response + await resp.prepare(req) + + # Headers should be sent immediately + writer.send_headers.assert_called_once() + + +async def test_response_buffers_headers() -> None: + """Test that Response buffers headers for packet coalescing.""" + writer = mock.create_autospec(StreamWriter, spec_set=True) + writer.write_headers = mock.AsyncMock() + writer.send_headers = mock.Mock() + writer.write_eof = mock.AsyncMock() + + req = make_request("GET", "/", writer=writer) + resp = web.Response(body=b"hello") + + # Response should have _send_headers_immediately = False + assert resp._send_headers_immediately is False + + # Prepare the response + await resp.prepare(req) + + # Headers should NOT be sent immediately + writer.send_headers.assert_not_called() + + # But write_headers should have been called + writer.write_headers.assert_called_once() diff --git a/tests/test_web_sendfile.py b/tests/test_web_sendfile.py index f7f35a0b388..81308af4d54 100644 --- a/tests/test_web_sendfile.py +++ b/tests/test_web_sendfile.py @@ -4,6 +4,7 @@ from unittest import mock from aiohttp import hdrs +from aiohttp.http_writer import StreamWriter from aiohttp.test_utils import make_mocked_request from aiohttp.web_fileresponse import FileResponse @@ -134,3 +135,32 @@ def test_status_controlled_by_user(loop: asyncio.AbstractEventLoop) -> None: loop.run_until_complete(file_sender.prepare(request)) assert file_sender._status == 203 + + +async def test_file_response_sends_headers_immediately() -> None: + """Test that FileResponse sends headers immediately (inherits from StreamResponse).""" + writer = mock.create_autospec(StreamWriter, spec_set=True) + writer.write_headers = mock.AsyncMock() + writer.send_headers = mock.Mock() + writer.write_eof = mock.AsyncMock() + + request = make_mocked_request("GET", "http://python.org/logo.png", writer=writer) + + filepath = mock.create_autospec(Path, spec_set=True) + filepath.name = "logo.png" + filepath.stat.return_value.st_size = 1024 + filepath.stat.return_value.st_mtime_ns = 1603733507222449291 + filepath.stat.return_value.st_mode = MOCK_MODE + + file_sender = FileResponse(filepath) + file_sender._path = filepath + file_sender._sendfile = mock.AsyncMock(return_value=None) # type: ignore[method-assign] + + # FileResponse inherits from StreamResponse, so should send immediately + assert file_sender._send_headers_immediately is True + + # Prepare the response + await file_sender.prepare(request) + + # Headers should be sent immediately + writer.send_headers.assert_called_once() diff --git a/tests/test_web_server.py b/tests/test_web_server.py index d4a678468ea..488e6f8d843 100644 --- a/tests/test_web_server.py +++ b/tests/test_web_server.py @@ -283,9 +283,8 @@ async def handler(request: web.BaseRequest) -> MyResponse: server = await aiohttp_raw_server(handler, logger=logger) cli = await aiohttp_client(server) - resp = await cli.get("/path/to") - with pytest.raises(client.ClientPayloadError): - await resp.read() + with pytest.raises(client.ServerDisconnectedError): + await cli.get("/path/to") logger.debug.assert_called_with("Ignored premature client disconnection")