diff --git a/awscrt/aio/http.py b/awscrt/aio/http.py index d37bec401..e31535d34 100644 --- a/awscrt/aio/http.py +++ b/awscrt/aio/http.py @@ -38,7 +38,9 @@ async def new(cls, bootstrap: Optional[ClientBootstrap] = None, socket_options: Optional[SocketOptions] = None, tls_connection_options: Optional[TlsConnectionOptions] = None, - proxy_options: Optional[HttpProxyOptions] = None) -> "AIOHttpClientConnectionUnified": + proxy_options: Optional[HttpProxyOptions] = None, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None) -> "AIOHttpClientConnectionUnified": """ Asynchronously establish a new AIOHttpClientConnectionUnified. @@ -60,6 +62,23 @@ async def new(cls, proxy_options (Optional[HttpProxyOptions]): Optional proxy options. If None is provided then a proxy is not used. + manual_window_management (bool): Set to True to manually manage the flow-control window + of each stream. If False, the connection maintains flow-control windows such that + no back-pressure is applied and data arrives as fast as possible. If True, the + flow-control window of each stream shrinks as body data is received (headers, + padding, and other metadata do not affect the window). `initial_window_size` + determines the starting size of each stream's window. When a stream's window + reaches 0, no further data is received until `update_window()` is called. + For HTTP/2, this only controls stream windows; connection window is controlled + by `conn_manual_window_management`. Default is False. + + initial_window_size (Optional[int]): The starting size of each stream's flow-control + window. Required if `manual_window_management` is True, ignored otherwise. + For HTTP/2, this becomes the `INITIAL_WINDOW_SIZE` setting and can be overridden + by `initial_settings`. Must be <= 2^31-1 or connection fails. If set to 0 with + `manual_window_management` True, streams start with zero window. If None, uses + default value. + Returns: AIOHttpClientConnectionUnified: A new unified HTTP client connection. """ @@ -70,7 +89,9 @@ async def new(cls, socket_options, tls_connection_options, proxy_options, - asyncio_connection=True) + asyncio_connection=True, + manual_window_management=manual_window_management, + initial_window_size=initial_window_size) return await asyncio.wrap_future(future) async def close(self) -> None: @@ -118,7 +139,10 @@ async def new(cls, bootstrap: Optional[ClientBootstrap] = None, socket_options: Optional[SocketOptions] = None, tls_connection_options: Optional[TlsConnectionOptions] = None, - proxy_options: Optional[HttpProxyOptions] = None) -> "AIOHttpClientConnection": + proxy_options: Optional[HttpProxyOptions] = None, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + read_buffer_capacity: Optional[int] = None) -> "AIOHttpClientConnection": """ Asynchronously establish a new AIOHttpClientConnection. @@ -140,6 +164,18 @@ async def new(cls, proxy_options (Optional[HttpProxyOptions]): Optional proxy options. If None is provided then a proxy is not used. + manual_window_management (bool): If True, enables manual flow control window management. + Default is False. + + initial_window_size (Optional[int]): Initial window size for flow control. + If None, uses default value. + + read_buffer_capacity (Optional[int]): Capacity in bytes of the HTTP/1.1 connection's + read buffer. The buffer grows when the flow-control window of the incoming stream + reaches zero. Ignored if `manual_window_management` is False. A capacity that is + too small may hinder throughput. A capacity that is too large may waste memory + without improving throughput. If None or zero, a default value is used. + Returns: AIOHttpClientConnection: A new HTTP client connection. """ @@ -151,7 +187,10 @@ async def new(cls, tls_connection_options, proxy_options, expected_version=HttpVersion.Http1_1, - asyncio_connection=True) + asyncio_connection=True, + manual_window_management=manual_window_management, + initial_window_size=initial_window_size, + read_buffer_capacity=read_buffer_capacity) return await asyncio.wrap_future(future) def request(self, @@ -189,8 +228,12 @@ async def new(cls, tls_connection_options: Optional[TlsConnectionOptions] = None, proxy_options: Optional[HttpProxyOptions] = None, initial_settings: Optional[List[Http2Setting]] = None, - on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], - None]] = None) -> "AIOHttp2ClientConnection": + on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + conn_manual_window_management: bool = False, + conn_window_size_threshold: Optional[int] = None, + stream_window_size_threshold: Optional[int] = None) -> "AIOHttp2ClientConnection": """ Asynchronously establish an HTTP/2 client connection. Notes: to set up the connection, the server must support HTTP/2 and TlsConnectionOptions @@ -205,6 +248,30 @@ async def new(cls, The function should take the following arguments and return nothing: * `settings` (List[Http2Setting]): List of settings that were changed. + + manual_window_management (bool): If True, enables manual flow control window management. + Default is False. + + initial_window_size (Optional[int]): Initial window size for flow control. + If None, uses default value. + + conn_manual_window_management (bool): If True, enables manual connection-level flow control + for the entire HTTP/2 connection. When enabled, the connection's flow-control window + shrinks as body data is received across all streams. The initial connection window is + 65,535 bytes. When the window reaches 0, all streams stop receiving data until + `update_window()` is called to increment the connection's window. + Note: Padding in DATA frames counts against the window, but window updates for padding + are sent automatically even in manual mode. Default is False. + + conn_window_size_threshold (Optional[int]): Threshold for sending connection-level WINDOW_UPDATE + frames. Ignored if `conn_manual_window_management` is False. When the connection's window + is above this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update + is sent. Default is 32,767 (half of the initial 65,535 window). + + stream_window_size_threshold (Optional[int]): Threshold for sending stream-level WINDOW_UPDATE + frames. Ignored if `manual_window_management` is False. When a stream's window is above + this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update is sent. + Default is half of `initial_window_size`. """ future = cls._generic_new( host_name, @@ -216,7 +283,12 @@ async def new(cls, expected_version=HttpVersion.Http2, initial_settings=initial_settings, on_remote_settings_changed=on_remote_settings_changed, - asyncio_connection=True) + asyncio_connection=True, + manual_window_management=manual_window_management, + initial_window_size=initial_window_size, + conn_manual_window_management=conn_manual_window_management, + conn_window_size_threshold=conn_window_size_threshold, + stream_window_size_threshold=stream_window_size_threshold) return await asyncio.wrap_future(future) def request(self, @@ -237,6 +309,15 @@ def request(self, """ return AIOHttp2ClientStream(self, request, request_body_generator, loop) + def update_window(self, increment_size: int) -> None: + """ + Update the connection's flow control window. + + Args: + increment_size (int): Number of bytes to increment the window by. + """ + _awscrt.http2_connection_update_window(self._binding, increment_size) + class AIOHttpClientStreamUnified(HttpClientStreamBase): __slots__ = ( diff --git a/awscrt/http.py b/awscrt/http.py index e8a9c2a73..a3bb6119f 100644 --- a/awscrt/http.py +++ b/awscrt/http.py @@ -131,7 +131,13 @@ def _generic_new( expected_version: Optional[HttpVersion] = None, initial_settings: Optional[List[Http2Setting]] = None, on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None, - asyncio_connection=False) -> "concurrent.futures.Future": + asyncio_connection=False, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + read_buffer_capacity: Optional[int] = None, + conn_manual_window_management: bool = False, + conn_window_size_threshold: Optional[int] = None, + stream_window_size_threshold: Optional[int] = None) -> "concurrent.futures.Future": """ Initialize the generic part of the HttpClientConnection class. """ @@ -170,7 +176,13 @@ def _generic_new( proxy_options, initial_settings, on_remote_settings_changed, - connection_core) + connection_core, + manual_window_management, + initial_window_size, + read_buffer_capacity, + conn_manual_window_management, + conn_window_size_threshold, + stream_window_size_threshold) except Exception as e: future.set_exception(e) @@ -203,7 +215,10 @@ def new(cls, bootstrap: Optional[ClientBootstrap] = None, socket_options: Optional[SocketOptions] = None, tls_connection_options: Optional[TlsConnectionOptions] = None, - proxy_options: Optional['HttpProxyOptions'] = None) -> "concurrent.futures.Future": + proxy_options: Optional['HttpProxyOptions'] = None, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + read_buffer_capacity: Optional[int] = None) -> "concurrent.futures.Future": """ Asynchronously establish a new HttpClientConnection. @@ -225,6 +240,26 @@ def new(cls, proxy_options (Optional[HttpProxyOptions]): Optional proxy options. If None is provided then a proxy is not used. + manual_window_management (bool): Set to True to manually manage the flow-control window + of each stream. If False, the connection maintains flow-control windows such that + no back-pressure is applied and data arrives as fast as possible. If True, the + flow-control window of each stream shrinks as body data is received (headers, + padding, and other metadata do not affect the window). `initial_window_size` + determines the starting size of each stream's window. When a stream's window + reaches 0, no further data is received until `update_window()` is called. + Default is False. + + initial_window_size (Optional[int]): The starting size of each stream's flow-control + window. Required if `manual_window_management` is True, ignored otherwise. + Must be <= 2^31-1 or connection fails. If set to 0 with `manual_window_management` + True, streams start with zero window. If None, uses default value. + + read_buffer_capacity (Optional[int]): Capacity in bytes of the HTTP/1.1 connection's + read buffer. The buffer grows when the flow-control window of the incoming stream + reaches zero. Ignored if `manual_window_management` is False. A capacity that is + too small may hinder throughput. A capacity that is too large may waste memory + without improving throughput. If None or zero, a default value is used. + Returns: concurrent.futures.Future: A Future which completes when connection succeeds or fails. If successful, the Future will contain a new :class:`HttpClientConnection`. @@ -236,7 +271,10 @@ def new(cls, bootstrap, socket_options, tls_connection_options, - proxy_options) + proxy_options, + manual_window_management=manual_window_management, + initial_window_size=initial_window_size, + read_buffer_capacity=read_buffer_capacity) def request(self, request: 'HttpRequest', @@ -311,7 +349,12 @@ def new(cls, proxy_options: Optional['HttpProxyOptions'] = None, initial_settings: Optional[List[Http2Setting]] = None, on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], - None]] = None) -> "concurrent.futures.Future": + None]] = None, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + conn_manual_window_management: bool = False, + conn_window_size_threshold: Optional[int] = None, + stream_window_size_threshold: Optional[int] = None) -> "concurrent.futures.Future": """ Asynchronously establish an HTTP/2 client connection. Notes: to set up the connection, the server must support HTTP/2 and TlsConnectionOptions @@ -326,6 +369,30 @@ def new(cls, The function should take the following arguments and return nothing: * `settings` (List[Http2Setting]): List of settings that were changed. + + manual_window_management (bool): If True, enables manual flow control window management. + Default is False. + + initial_window_size (Optional[int]): Initial window size for flow control. + If None, uses default value. + + conn_manual_window_management (bool): If True, enables manual connection-level flow control + for the entire HTTP/2 connection. When enabled, the connection's flow-control window + shrinks as body data is received across all streams. The initial connection window is + 65,535 bytes. When the window reaches 0, all streams stop receiving data until + `update_window()` is called to increment the connection's window. + Note: Padding in DATA frames counts against the window, but window updates for padding + are sent automatically even in manual mode. Default is False. + + conn_window_size_threshold (Optional[int]): Threshold for sending connection-level WINDOW_UPDATE + frames. Ignored if `conn_manual_window_management` is False. When the connection's window + is above this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update + is sent. Default is 32,767 (half of the initial 65,535 window). + + stream_window_size_threshold (Optional[int]): Threshold for sending stream-level WINDOW_UPDATE + frames. Ignored if `manual_window_management` is False. When a stream's window is above + this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update is sent. + Default is half of `initial_window_size`. """ return cls._generic_new( host_name, @@ -336,7 +403,12 @@ def new(cls, proxy_options, HttpVersion.Http2, initial_settings, - on_remote_settings_changed) + on_remote_settings_changed, + manual_window_management=manual_window_management, + initial_window_size=initial_window_size, + conn_manual_window_management=conn_manual_window_management, + conn_window_size_threshold=conn_window_size_threshold, + stream_window_size_threshold=stream_window_size_threshold) def request(self, request: 'HttpRequest', @@ -397,6 +469,15 @@ def close(self) -> "concurrent.futures.Future": _awscrt.http_connection_close(self._binding) return self.shutdown_future + def update_window(self, increment_size: int) -> None: + """ + Update the connection's flow control window. + + Args: + increment_size (int): Number of bytes to increment the window by. + """ + _awscrt.http2_connection_update_window(self._binding, increment_size) + class HttpStreamBase(NativeResource): """Base for HTTP stream classes. @@ -486,6 +567,15 @@ def _on_complete(self, error_code: int) -> None: else: self._completion_future.set_exception(awscrt.exceptions.from_code(error_code)) + def update_window(self, increment_size: int) -> None: + """ + Update the stream's flow control window. + + Args: + increment_size (int): Number of bytes to increment the window by. + """ + _awscrt.http_stream_update_window(self, increment_size) + class HttpClientStream(HttpClientStreamBase): """HTTP stream that sends a request and receives a response. diff --git a/source/http.h b/source/http.h index 2cfbf91e2..dbdab9af1 100644 --- a/source/http.h +++ b/source/http.h @@ -30,6 +30,16 @@ PyObject *aws_py_http_connection_close(PyObject *self, PyObject *args); */ PyObject *aws_py_http_connection_is_open(PyObject *self, PyObject *args); +/** + * Update HTTP/2 connection window size. + */ +PyObject *aws_py_http2_connection_update_window(PyObject *self, PyObject *args); + +/** + * Update HTTP stream window size. + */ +PyObject *aws_py_http_stream_update_window(PyObject *self, PyObject *args); + /** * Create a new connection. returns void. The on_setup callback will be invoked * upon either success or failure of the connection. diff --git a/source/http_connection.c b/source/http_connection.c index 7daa9823b..4301add98 100644 --- a/source/http_connection.c +++ b/source/http_connection.c @@ -232,11 +232,17 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { PyObject *initial_settings_py; PyObject *on_remote_settings_changed_py; PyObject *py_core; + int manual_window_management = 0; + PyObject *initial_window_size_py = Py_None; + PyObject *read_buffer_capacity_py = Py_None; + int conn_manual_window_management = 0; + PyObject *conn_window_size_threshold_py = Py_None; + PyObject *stream_window_size_threshold_py = Py_None; bool success = false; if (!PyArg_ParseTuple( args, - "Os#IOOOOOO", + "Os#IOOOOOOpOOpOO", &bootstrap_py, &host_name, &host_name_len, @@ -246,7 +252,13 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { &proxy_options_py, &initial_settings_py, &on_remote_settings_changed_py, - &py_core)) { + &py_core, + &manual_window_management, + &initial_window_size_py, + &read_buffer_capacity_py, + &conn_manual_window_management, + &conn_window_size_threshold_py, + &stream_window_size_threshold_py)) { return NULL; } @@ -286,6 +298,24 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { http2_options.on_remote_settings_change = s_http2_on_remote_settings_change; } + /* Set up HTTP/2 flow control options */ + if (conn_manual_window_management) { + http2_options.conn_manual_window_management = true; + } + if (conn_window_size_threshold_py != Py_None) { + http2_options.conn_window_size_threshold_to_send_update = PyLong_AsUnsignedLong(conn_window_size_threshold_py); + if (PyErr_Occurred()) { + goto done; + } + } + if (stream_window_size_threshold_py != Py_None) { + http2_options.stream_window_size_threshold_to_send_update = + PyLong_AsUnsignedLong(stream_window_size_threshold_py); + if (PyErr_Occurred()) { + goto done; + } + } + /* proxy options are optional */ struct aws_http_proxy_options proxy_options_storage; struct aws_http_proxy_options *proxy_options = NULL; @@ -296,6 +326,15 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { } } + /* Set up HTTP/1.1 options if needed */ + struct aws_http1_connection_options http1_options = {0}; + if (read_buffer_capacity_py != Py_None) { + http1_options.read_buffer_capacity = PyLong_AsSize_t(read_buffer_capacity_py); + if (PyErr_Occurred()) { + goto done; + } + } + struct aws_http_client_connection_options http_options = { .self_size = sizeof(http_options), .bootstrap = bootstrap, @@ -310,8 +349,18 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { .on_setup = s_on_client_connection_setup, .on_shutdown = s_on_connection_shutdown, .http2_options = &http2_options, + .http1_options = (read_buffer_capacity_py != Py_None) ? &http1_options : NULL, + .manual_window_management = manual_window_management, }; + /* Set initial window size if provided */ + if (initial_window_size_py != Py_None) { + http_options.initial_window_size = PyLong_AsSize_t(initial_window_size_py); + if (PyErr_Occurred()) { + goto done; + } + } + connection->py_core = py_core; Py_INCREF(connection->py_core); @@ -365,3 +414,39 @@ PyObject *aws_py_http_connection_is_open(PyObject *self, PyObject *args) { } Py_RETURN_FALSE; } + +PyObject *aws_py_http2_connection_update_window(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + size_t increment_size; + if (!PyArg_ParseTuple(args, "On", &capsule, &increment_size)) { + return NULL; + } + + struct http_connection_binding *connection = PyCapsule_GetPointer(capsule, s_capsule_name_http_connection); + if (!connection) { + return NULL; + } + + aws_http2_connection_update_window(connection->native, increment_size); + + Py_RETURN_NONE; +} + +PyObject *aws_py_http_stream_update_window(PyObject *self, PyObject *args) { + (void)self; + PyObject *stream_capsule; + uint32_t increment_size; + if (!PyArg_ParseTuple(args, "OI", &stream_capsule, &increment_size)) { + return NULL; + } + + struct aws_http_stream *stream = aws_py_get_http_stream(stream_capsule); + if (!stream) { + return NULL; + } + + aws_http_stream_update_window(stream, increment_size); + + Py_RETURN_NONE; +} diff --git a/source/module.c b/source/module.c index 84b5673d2..dd346747e 100644 --- a/source/module.c +++ b/source/module.c @@ -866,6 +866,8 @@ static PyMethodDef s_module_methods[] = { /* HTTP */ AWS_PY_METHOD_DEF(http_connection_close, METH_VARARGS), AWS_PY_METHOD_DEF(http_connection_is_open, METH_VARARGS), + AWS_PY_METHOD_DEF(http2_connection_update_window, METH_VARARGS), + AWS_PY_METHOD_DEF(http_stream_update_window, METH_VARARGS), AWS_PY_METHOD_DEF(http_client_connection_new, METH_VARARGS), AWS_PY_METHOD_DEF(http_client_stream_new, METH_VARARGS), AWS_PY_METHOD_DEF(http_client_stream_activate, METH_VARARGS), diff --git a/test/test_aiohttp_client.py b/test/test_aiohttp_client.py index f3fa54471..757d2dde5 100644 --- a/test/test_aiohttp_client.py +++ b/test/test_aiohttp_client.py @@ -43,6 +43,8 @@ async def collect_response(self, stream): if not chunk: break self.body.extend(chunk) + if hasattr(stream, 'update_window'): + stream.update_window(len(chunk)) # Return status code for convenience return self.status_code @@ -657,5 +659,153 @@ def test_h2_mock_server_settings(self): asyncio.run(self._test_h2_mock_server_settings()) +class AIOFlowControlTest(NativeResourceTest): + timeout = 10.0 + + async def _test_h1_manual_window_management_happy_path(self): + """Test HTTP/1.1 manual window management happy path""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['http/1.1'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + manual_window_management=True, + initial_window_size=5, + read_buffer_capacity=1000 + ) + + request = HttpRequest('GET', '/bytes/10') + request.headers.add('host', 'httpbin.org') + stream = connection.request(request) + + response = Response() + status_code = await response.collect_response(stream) + + self.assertEqual(200, status_code) + self.assertEqual(10, len(response.body)) + await connection.close() + + def test_h1_manual_window_management_happy_path(self): + asyncio.run(self._test_h1_manual_window_management_happy_path()) + + async def _test_h2_manual_window_management_happy_path(self): + """Test HTTP/2 manual window management happy path""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['h2'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttp2ClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + manual_window_management=True, + initial_window_size=65536 + ) + + request = HttpRequest('GET', '/get') + request.headers.add('host', 'httpbin.org') + stream = connection.request(request) + + response = Response() + status_code = await response.collect_response(stream) + + self.assertEqual(200, status_code) + self.assertGreater(len(response.body), 0, "No data received") + await connection.close() + + def test_h2_manual_window_management_happy_path(self): + asyncio.run(self._test_h2_manual_window_management_happy_path()) + + async def _test_h2_stream_flow_control_blocks_and_resumes(self): + """Test that stream flow control actually blocks and resumes""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['h2'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttp2ClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + manual_window_management=True, + initial_window_size=10 # Tiny window + ) + + request = HttpRequest('GET', '/bytes/100') + request.headers.add('host', 'httpbin.org') + stream = connection.request(request) + + chunks_received = [] + body = bytearray() + + await stream.get_response_status_code() + while True: + chunk = await stream.get_next_response_chunk() + if not chunk: + break + chunks_received.append(len(chunk)) + body.extend(chunk) + stream.update_window(len(chunk)) + + self.assertEqual(100, len(body)) + self.assertEqual(len(chunks_received), 10, "Should receive exactly 10 chunks") + await connection.close() + + def test_h2_stream_flow_control_blocks_and_resumes(self): + asyncio.run(self._test_h2_stream_flow_control_blocks_and_resumes()) + + async def _test_h1_stream_flow_control_blocks_and_resumes(self): + """Test that HTTP/1.1 stream flow control actually blocks and resumes""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['http/1.1'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + manual_window_management=True, + initial_window_size=1, # Tiny window + read_buffer_capacity=1000 + ) + + request = HttpRequest('GET', '/bytes/100') + request.headers.add('host', 'httpbin.org') + stream = connection.request(request) + + chunks_received = [] + body = bytearray() + + await stream.get_response_status_code() + while True: + chunk = await stream.get_next_response_chunk() + if not chunk: + break + chunks_received.append(len(chunk)) + body.extend(chunk) + stream.update_window(len(chunk)) + + self.assertEqual(100, len(body)) + self.assertEqual(len(chunks_received), 100, "Should receive exactly 100 chunks") + await connection.close() + + def test_h1_stream_flow_control_blocks_and_resumes(self): + asyncio.run(self._test_h1_stream_flow_control_blocks_and_resumes()) + + if __name__ == '__main__': unittest.main() diff --git a/test/test_http_client.py b/test/test_http_client.py index 6aa99174e..a8c217698 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -16,7 +16,7 @@ from http.server import HTTPServer, SimpleHTTPRequestHandler from concurrent.futures import Future, thread from awscrt.io import ClientBootstrap, ClientTlsContext, DefaultHostResolver, EventLoopGroup, TlsConnectionOptions, TlsContextOptions, TlsCipherPref -from awscrt.http import HttpClientConnection, HttpClientStream, HttpHeaders, HttpProxyOptions, HttpRequest, HttpVersion, Http2ClientConnection, Http2Setting, Http2SettingID +from awscrt.http import HttpClientConnection, HttpClientStreamBase, HttpHeaders, HttpProxyOptions, HttpRequest, HttpVersion, Http2ClientConnection, Http2Setting, Http2SettingID import awscrt.exceptions @@ -638,5 +638,192 @@ def test_h2_mock_server_settings(self): self.assertEqual(None, connection.close().exception(self.timeout)) +class Response: + def __init__(self): + self.status_code = None + self.headers = None + self.body = bytearray() + + def on_response(self, http_stream, status_code, headers, **kwargs): + self.status_code = status_code + self.headers = HttpHeaders(headers) + + def on_body(self, http_stream, chunk, **kwargs): + self.body.extend(chunk) + + +class FlowControlTest(NativeResourceTest): + timeout = 10.0 + + def setUp(self): + super().setUp() + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['h2', 'http/1.1'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + self.tls_options = tls_ctx.new_connection_options() + self.tls_options.set_server_name("httpbin.org") + + def test_h1_manual_window_management_happy_path(self): + """Test HTTP/1.1 manual window management happy path""" + connection_future = HttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=self.tls_options, + manual_window_management=True, + initial_window_size=5, + read_buffer_capacity=1000 + ) + + try: + connection = connection_future.result(timeout=self.timeout) + request = HttpRequest('GET', '/bytes/10') + request.headers.add('host', 'httpbin.org') + + response = Response() + received_chunks = [] + window_updates_sent = [] + + def on_body_with_window_update(http_stream, chunk, **kwargs): + received_chunks.append(len(chunk)) + response.body.extend(chunk) + if hasattr(http_stream, '_binding') and http_stream._binding: + http_stream.update_window(len(chunk)) + window_updates_sent.append(len(chunk)) + + stream = connection.request(request, response.on_response, on_body_with_window_update) + stream.activate() + stream_completion_result = stream.completion_future.result(timeout=self.timeout) + + self.assertEqual(200, response.status_code) + self.assertEqual(200, stream_completion_result) + self.assertEqual(10, len(response.body)) + + if len(response.body) > 0: + self.assertGreater(len(received_chunks), 0, "No data chunks received") + self.assertGreater(len(window_updates_sent), 0, "No window updates sent") + self.assertEqual(sum(received_chunks), sum(window_updates_sent), + "Window updates don't match received data") + + connection.close() + except Exception as e: + self.skipTest(f"HTTP/1.1 flow control test skipped due to connection issue: {e}") + + def test_h2_manual_window_management_happy_path(self): + """Test HTTP/2 manual window management happy path""" + connection_future = Http2ClientConnection.new( + host_name="nghttp2.org", + port=443, + tls_connection_options=self.tls_options, + manual_window_management=True, + initial_window_size=65536 + ) + + try: + connection = connection_future.result(timeout=self.timeout) + request = HttpRequest('GET', '/httpbin/get') + request.headers.add('host', 'nghttp2.org') + + response = Response() + received_chunks = [] + window_updates_sent = [] + + def on_body_with_window_update(http_stream, chunk, **kwargs): + received_chunks.append(len(chunk)) + response.body.extend(chunk) + if hasattr(http_stream, '_binding') and http_stream._binding: + http_stream.update_window(len(chunk)) + window_updates_sent.append(len(chunk)) + + stream = connection.request(request, response.on_response, on_body_with_window_update) + stream.activate() + stream_completion_result = stream.completion_future.result(timeout=self.timeout) + + self.assertEqual(200, response.status_code) + self.assertEqual(200, stream_completion_result) + self.assertGreater(len(received_chunks), 0, "No data chunks received") + self.assertGreater(len(window_updates_sent), 0, "No window updates sent") + + connection.close() + except Exception as e: + self.skipTest(f"HTTP/2 flow control test skipped due to connection issue: {e}") + + def test_h2_stream_flow_control_blocks_and_resumes(self): + """Test that stream flow control actually blocks and resumes""" + connection_future = Http2ClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=self.tls_options, + manual_window_management=True, + initial_window_size=1 # Tiny window - will block immediately + ) + + try: + connection = connection_future.result(timeout=self.timeout) + request = HttpRequest('GET', '/bytes/100') + request.headers.add('host', 'httpbin.org') + + response = Response() + chunks_received = [] + + def on_body(http_stream, chunk, **kwargs): + chunks_received.append(len(chunk)) + response.body.extend(chunk) + # Update window to allow more data + http_stream.update_window(len(chunk)) + + stream = connection.request(request, response.on_response, on_body) + stream.activate() + stream.completion_future.result(timeout=self.timeout) + + self.assertEqual(100, len(response.body)) + # With window=10, we should receive many small chunks + self.assertEqual(len(chunks_received), 100, "Expected multiple chunks with tiny window") + + connection.close() + except Exception as e: + self.skipTest(f"HTTP/2 flow control test skipped: {e}") + + def test_h1_stream_flow_control_blocks_and_resumes(self): + """Test that HTTP/1.1 stream flow control actually blocks and resumes""" + connection_future = HttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=self.tls_options, + manual_window_management=True, + initial_window_size=1, # Tiny window + read_buffer_capacity=1000 + ) + + try: + connection = connection_future.result(timeout=self.timeout) + request = HttpRequest('GET', '/bytes/100') + request.headers.add('host', 'httpbin.org') + + response = Response() + chunks_received = [] + + def on_body(http_stream, chunk, **kwargs): + chunks_received.append(len(chunk)) + response.body.extend(chunk) + http_stream.update_window(len(chunk)) + + stream = connection.request(request, response.on_response, on_body) + stream.activate() + stream.completion_future.result(timeout=self.timeout) + + self.assertEqual(100, len(response.body)) + # With window=1, we should receive many small chunks + self.assertEqual(len(chunks_received), 100, "Should receive exactly 100 chunks") + + connection.close() + except Exception as e: + self.skipTest(f"HTTP/1.1 flow control test skipped: {e}") + + def tearDown(self): + self.tls_options = None + super().tearDown() + + if __name__ == '__main__': unittest.main()