From 9e6d2c94fcdcb61250a5f2b2581b268a450719e7 Mon Sep 17 00:00:00 2001 From: Krish Date: Thu, 8 Jan 2026 13:16:05 -0800 Subject: [PATCH 01/12] preliminary changes --- awscrt/aio/http.py | 83 +++++++++++++++++++++++++++++++++--- awscrt/http.py | 82 ++++++++++++++++++++++++++++++++--- source/http.h | 10 +++++ source/http_connection.c | 92 +++++++++++++++++++++++++++++++++++++++- source/module.c | 2 + 5 files changed, 254 insertions(+), 15 deletions(-) diff --git a/awscrt/aio/http.py b/awscrt/aio/http.py index d37bec401..c6040e0e0 100644 --- a/awscrt/aio/http.py +++ b/awscrt/aio/http.py @@ -38,7 +38,13 @@ async def new(cls, bootstrap: Optional[ClientBootstrap] = None, socket_options: Optional[SocketOptions] = None, tls_connection_options: Optional[TlsConnectionOptions] = None, - proxy_options: Optional[HttpProxyOptions] = None) -> "AIOHttpClientConnectionUnified": + proxy_options: Optional[HttpProxyOptions] = None, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + read_buffer_capacity: Optional[int] = None, + conn_manual_window_management: bool = False, + conn_window_size_threshold: Optional[int] = None, + stream_window_size_threshold: Optional[int] = None) -> "AIOHttpClientConnectionUnified": """ Asynchronously establish a new AIOHttpClientConnectionUnified. @@ -60,6 +66,24 @@ async def new(cls, proxy_options (Optional[HttpProxyOptions]): Optional proxy options. If None is provided then a proxy is not used. + manual_window_management (bool): If True, enables manual flow control window management. + Default is False. + + initial_window_size (Optional[int]): Initial window size for flow control. + If None, uses default value. + + read_buffer_capacity (Optional[int]): Read buffer capacity for the connection. + If None, uses default value. + + conn_manual_window_management (bool): If True, enables manual connection-level window management. + Default is False. + + conn_window_size_threshold (Optional[int]): Connection window size threshold. + If None, uses default value. + + stream_window_size_threshold (Optional[int]): Stream window size threshold. + If None, uses default value. + Returns: AIOHttpClientConnectionUnified: A new unified HTTP client connection. """ @@ -70,7 +94,13 @@ async def new(cls, socket_options, tls_connection_options, proxy_options, - asyncio_connection=True) + asyncio_connection=True, + manual_window_management=manual_window_management, + initial_window_size=initial_window_size, + read_buffer_capacity=read_buffer_capacity, + conn_manual_window_management=conn_manual_window_management, + conn_window_size_threshold=conn_window_size_threshold, + stream_window_size_threshold=stream_window_size_threshold) return await asyncio.wrap_future(future) async def close(self) -> None: @@ -118,7 +148,10 @@ async def new(cls, bootstrap: Optional[ClientBootstrap] = None, socket_options: Optional[SocketOptions] = None, tls_connection_options: Optional[TlsConnectionOptions] = None, - proxy_options: Optional[HttpProxyOptions] = None) -> "AIOHttpClientConnection": + proxy_options: Optional[HttpProxyOptions] = None, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + read_buffer_capacity: Optional[int] = None) -> "AIOHttpClientConnection": """ Asynchronously establish a new AIOHttpClientConnection. @@ -140,6 +173,15 @@ async def new(cls, proxy_options (Optional[HttpProxyOptions]): Optional proxy options. If None is provided then a proxy is not used. + manual_window_management (bool): If True, enables manual flow control window management. + Default is False. + + initial_window_size (Optional[int]): Initial window size for flow control. + If None, uses default value. + + read_buffer_capacity (Optional[int]): Read buffer capacity for the connection. + If None, uses default value. + Returns: AIOHttpClientConnection: A new HTTP client connection. """ @@ -151,7 +193,10 @@ async def new(cls, tls_connection_options, proxy_options, expected_version=HttpVersion.Http1_1, - asyncio_connection=True) + asyncio_connection=True, + manual_window_management=manual_window_management, + initial_window_size=initial_window_size, + read_buffer_capacity=read_buffer_capacity) return await asyncio.wrap_future(future) def request(self, @@ -189,8 +234,12 @@ async def new(cls, tls_connection_options: Optional[TlsConnectionOptions] = None, proxy_options: Optional[HttpProxyOptions] = None, initial_settings: Optional[List[Http2Setting]] = None, - on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], - None]] = None) -> "AIOHttp2ClientConnection": + on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + conn_manual_window_management: bool = False, + conn_window_size_threshold: Optional[int] = None, + stream_window_size_threshold: Optional[int] = None) -> "AIOHttp2ClientConnection": """ Asynchronously establish an HTTP/2 client connection. Notes: to set up the connection, the server must support HTTP/2 and TlsConnectionOptions @@ -205,6 +254,21 @@ async def new(cls, The function should take the following arguments and return nothing: * `settings` (List[Http2Setting]): List of settings that were changed. + + manual_window_management (bool): If True, enables manual flow control window management. + Default is False. + + initial_window_size (Optional[int]): Initial window size for flow control. + If None, uses default value. + + conn_manual_window_management (bool): If True, enables manual connection-level window management. + Default is False. + + conn_window_size_threshold (Optional[int]): Connection window size threshold. + If None, uses default value. + + stream_window_size_threshold (Optional[int]): Stream window size threshold. + If None, uses default value. """ future = cls._generic_new( host_name, @@ -216,7 +280,12 @@ async def new(cls, expected_version=HttpVersion.Http2, initial_settings=initial_settings, on_remote_settings_changed=on_remote_settings_changed, - asyncio_connection=True) + asyncio_connection=True, + manual_window_management=manual_window_management, + initial_window_size=initial_window_size, + conn_manual_window_management=conn_manual_window_management, + conn_window_size_threshold=conn_window_size_threshold, + stream_window_size_threshold=stream_window_size_threshold) return await asyncio.wrap_future(future) def request(self, diff --git a/awscrt/http.py b/awscrt/http.py index e8a9c2a73..70b03f786 100644 --- a/awscrt/http.py +++ b/awscrt/http.py @@ -116,6 +116,15 @@ def is_open(self) -> bool: """ return _awscrt.http_connection_is_open(self._binding) + def update_window(self, increment_size: int) -> None: + """ + Update the connection's flow control window. + + Args: + increment_size (int): Number of bytes to increment the window by. + """ + _awscrt.http2_connection_update_window(self._binding, increment_size) + class HttpClientConnectionBase(HttpConnectionBase): __slots__ = ('_host_name', '_port') @@ -131,7 +140,13 @@ def _generic_new( expected_version: Optional[HttpVersion] = None, initial_settings: Optional[List[Http2Setting]] = None, on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None, - asyncio_connection=False) -> "concurrent.futures.Future": + asyncio_connection=False, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + read_buffer_capacity: Optional[int] = None, + conn_manual_window_management: bool = False, + conn_window_size_threshold: Optional[int] = None, + stream_window_size_threshold: Optional[int] = None) -> "concurrent.futures.Future": """ Initialize the generic part of the HttpClientConnection class. """ @@ -170,7 +185,13 @@ def _generic_new( proxy_options, initial_settings, on_remote_settings_changed, - connection_core) + connection_core, + manual_window_management, + initial_window_size, + read_buffer_capacity, + conn_manual_window_management, + conn_window_size_threshold, + stream_window_size_threshold) except Exception as e: future.set_exception(e) @@ -203,7 +224,10 @@ def new(cls, bootstrap: Optional[ClientBootstrap] = None, socket_options: Optional[SocketOptions] = None, tls_connection_options: Optional[TlsConnectionOptions] = None, - proxy_options: Optional['HttpProxyOptions'] = None) -> "concurrent.futures.Future": + proxy_options: Optional['HttpProxyOptions'] = None, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + read_buffer_capacity: Optional[int] = None) -> "concurrent.futures.Future": """ Asynchronously establish a new HttpClientConnection. @@ -225,6 +249,15 @@ def new(cls, proxy_options (Optional[HttpProxyOptions]): Optional proxy options. If None is provided then a proxy is not used. + manual_window_management (bool): If True, enables manual flow control window management. + Default is False. + + initial_window_size (Optional[int]): Initial window size for flow control. + If None, uses default value. + + read_buffer_capacity (Optional[int]): Read buffer capacity for the connection. + If None, uses default value. + Returns: concurrent.futures.Future: A Future which completes when connection succeeds or fails. If successful, the Future will contain a new :class:`HttpClientConnection`. @@ -236,7 +269,10 @@ def new(cls, bootstrap, socket_options, tls_connection_options, - proxy_options) + proxy_options, + manual_window_management=manual_window_management, + initial_window_size=initial_window_size, + read_buffer_capacity=read_buffer_capacity) def request(self, request: 'HttpRequest', @@ -311,7 +347,12 @@ def new(cls, proxy_options: Optional['HttpProxyOptions'] = None, initial_settings: Optional[List[Http2Setting]] = None, on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], - None]] = None) -> "concurrent.futures.Future": + None]] = None, + manual_window_management: bool = False, + initial_window_size: Optional[int] = None, + conn_manual_window_management: bool = False, + conn_window_size_threshold: Optional[int] = None, + stream_window_size_threshold: Optional[int] = None) -> "concurrent.futures.Future": """ Asynchronously establish an HTTP/2 client connection. Notes: to set up the connection, the server must support HTTP/2 and TlsConnectionOptions @@ -326,6 +367,21 @@ def new(cls, The function should take the following arguments and return nothing: * `settings` (List[Http2Setting]): List of settings that were changed. + + manual_window_management (bool): If True, enables manual flow control window management. + Default is False. + + initial_window_size (Optional[int]): Initial window size for flow control. + If None, uses default value. + + conn_manual_window_management (bool): If True, enables manual connection-level window management. + Default is False. + + conn_window_size_threshold (Optional[int]): Connection window size threshold. + If None, uses default value. + + stream_window_size_threshold (Optional[int]): Stream window size threshold. + If None, uses default value. """ return cls._generic_new( host_name, @@ -336,7 +392,12 @@ def new(cls, proxy_options, HttpVersion.Http2, initial_settings, - on_remote_settings_changed) + on_remote_settings_changed, + manual_window_management=manual_window_management, + initial_window_size=initial_window_size, + conn_manual_window_management=conn_manual_window_management, + conn_window_size_threshold=conn_window_size_threshold, + stream_window_size_threshold=stream_window_size_threshold) def request(self, request: 'HttpRequest', @@ -486,6 +547,15 @@ def _on_complete(self, error_code: int) -> None: else: self._completion_future.set_exception(awscrt.exceptions.from_code(error_code)) + def update_window(self, increment_size: int) -> None: + """ + Update the stream's flow control window. + + Args: + increment_size (int): Number of bytes to increment the window by. + """ + _awscrt.http_stream_update_window(self._binding, increment_size) + class HttpClientStream(HttpClientStreamBase): """HTTP stream that sends a request and receives a response. diff --git a/source/http.h b/source/http.h index 2cfbf91e2..dbdab9af1 100644 --- a/source/http.h +++ b/source/http.h @@ -30,6 +30,16 @@ PyObject *aws_py_http_connection_close(PyObject *self, PyObject *args); */ PyObject *aws_py_http_connection_is_open(PyObject *self, PyObject *args); +/** + * Update HTTP/2 connection window size. + */ +PyObject *aws_py_http2_connection_update_window(PyObject *self, PyObject *args); + +/** + * Update HTTP stream window size. + */ +PyObject *aws_py_http_stream_update_window(PyObject *self, PyObject *args); + /** * Create a new connection. returns void. The on_setup callback will be invoked * upon either success or failure of the connection. diff --git a/source/http_connection.c b/source/http_connection.c index 7daa9823b..e57eaea0d 100644 --- a/source/http_connection.c +++ b/source/http_connection.c @@ -232,11 +232,17 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { PyObject *initial_settings_py; PyObject *on_remote_settings_changed_py; PyObject *py_core; + int manual_window_management = 0; + PyObject *initial_window_size_py = Py_None; + PyObject *read_buffer_capacity_py = Py_None; + int conn_manual_window_management = 0; + PyObject *conn_window_size_threshold_py = Py_None; + PyObject *stream_window_size_threshold_py = Py_None; bool success = false; if (!PyArg_ParseTuple( args, - "Os#IOOOOOO", + "Os#IOOOOOOpOOpOO", &bootstrap_py, &host_name, &host_name_len, @@ -246,7 +252,13 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { &proxy_options_py, &initial_settings_py, &on_remote_settings_changed_py, - &py_core)) { + &py_core, + &manual_window_management, + &initial_window_size_py, + &read_buffer_capacity_py, + &conn_manual_window_management, + &conn_window_size_threshold_py, + &stream_window_size_threshold_py)) { return NULL; } @@ -286,6 +298,23 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { http2_options.on_remote_settings_change = s_http2_on_remote_settings_change; } + /* Set up HTTP/2 flow control options */ + if (conn_manual_window_management) { + http2_options.conn_manual_window_management = true; + } + if (conn_window_size_threshold_py != Py_None) { + http2_options.conn_window_size_threshold = PyLong_AsSize_t(conn_window_size_threshold_py); + if (PyErr_Occurred()) { + goto done; + } + } + if (stream_window_size_threshold_py != Py_None) { + http2_options.stream_window_size_threshold = PyLong_AsSize_t(stream_window_size_threshold_py); + if (PyErr_Occurred()) { + goto done; + } + } + /* proxy options are optional */ struct aws_http_proxy_options proxy_options_storage; struct aws_http_proxy_options *proxy_options = NULL; @@ -310,8 +339,25 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { .on_setup = s_on_client_connection_setup, .on_shutdown = s_on_connection_shutdown, .http2_options = &http2_options, + .manual_window_management = manual_window_management, }; + /* Set initial window size if provided */ + if (initial_window_size_py != Py_None) { + http_options.initial_window_size = PyLong_AsSize_t(initial_window_size_py); + if (PyErr_Occurred()) { + goto done; + } + } + + /* Set read buffer capacity if provided */ + if (read_buffer_capacity_py != Py_None) { + http_options.read_buffer_capacity = PyLong_AsSize_t(read_buffer_capacity_py); + if (PyErr_Occurred()) { + goto done; + } + } + connection->py_core = py_core; Py_INCREF(connection->py_core); @@ -365,3 +411,45 @@ PyObject *aws_py_http_connection_is_open(PyObject *self, PyObject *args) { } Py_RETURN_FALSE; } + +PyObject *aws_py_http2_connection_update_window(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + uint32_t increment_size; + if (!PyArg_ParseTuple(args, "OI", &capsule, &increment_size)) { + return NULL; + } + + struct http_connection_binding *connection = PyCapsule_GetPointer(capsule, s_capsule_name_http_connection); + if (!connection) { + return NULL; + } + + if (aws_http2_connection_update_window(connection->native, increment_size)) { + PyErr_SetAwsLastError(); + return NULL; + } + + Py_RETURN_NONE; +} + +PyObject *aws_py_http_stream_update_window(PyObject *self, PyObject *args) { + (void)self; + PyObject *stream_capsule; + uint32_t increment_size; + if (!PyArg_ParseTuple(args, "OI", &stream_capsule, &increment_size)) { + return NULL; + } + + struct aws_http_stream *stream = aws_py_get_http_stream(stream_capsule); + if (!stream) { + return NULL; + } + + if (aws_http_stream_update_window(stream, increment_size)) { + PyErr_SetAwsLastError(); + return NULL; + } + + Py_RETURN_NONE; +} diff --git a/source/module.c b/source/module.c index 84b5673d2..dd346747e 100644 --- a/source/module.c +++ b/source/module.c @@ -866,6 +866,8 @@ static PyMethodDef s_module_methods[] = { /* HTTP */ AWS_PY_METHOD_DEF(http_connection_close, METH_VARARGS), AWS_PY_METHOD_DEF(http_connection_is_open, METH_VARARGS), + AWS_PY_METHOD_DEF(http2_connection_update_window, METH_VARARGS), + AWS_PY_METHOD_DEF(http_stream_update_window, METH_VARARGS), AWS_PY_METHOD_DEF(http_client_connection_new, METH_VARARGS), AWS_PY_METHOD_DEF(http_client_stream_new, METH_VARARGS), AWS_PY_METHOD_DEF(http_client_stream_activate, METH_VARARGS), From a50ce80d2c79847b24872f71962d3fd3b472a8b0 Mon Sep 17 00:00:00 2001 From: Krish Date: Thu, 8 Jan 2026 16:26:19 -0800 Subject: [PATCH 02/12] fix variable names for binding --- source/http_connection.c | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/source/http_connection.c b/source/http_connection.c index e57eaea0d..391bab332 100644 --- a/source/http_connection.c +++ b/source/http_connection.c @@ -303,13 +303,13 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { http2_options.conn_manual_window_management = true; } if (conn_window_size_threshold_py != Py_None) { - http2_options.conn_window_size_threshold = PyLong_AsSize_t(conn_window_size_threshold_py); + http2_options.conn_window_size_threshold_to_send_update = PyLong_AsUnsignedLong(conn_window_size_threshold_py); if (PyErr_Occurred()) { goto done; } } if (stream_window_size_threshold_py != Py_None) { - http2_options.stream_window_size_threshold = PyLong_AsSize_t(stream_window_size_threshold_py); + http2_options.stream_window_size_threshold_to_send_update = PyLong_AsUnsignedLong(stream_window_size_threshold_py); if (PyErr_Occurred()) { goto done; } @@ -325,6 +325,15 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { } } + /* Set up HTTP/1.1 options if needed */ + struct aws_http1_connection_options http1_options = {0}; + if (read_buffer_capacity_py != Py_None) { + http1_options.read_buffer_capacity = PyLong_AsSize_t(read_buffer_capacity_py); + if (PyErr_Occurred()) { + goto done; + } + } + struct aws_http_client_connection_options http_options = { .self_size = sizeof(http_options), .bootstrap = bootstrap, @@ -339,6 +348,7 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { .on_setup = s_on_client_connection_setup, .on_shutdown = s_on_connection_shutdown, .http2_options = &http2_options, + .http1_options = (read_buffer_capacity_py != Py_None) ? &http1_options : NULL, .manual_window_management = manual_window_management, }; @@ -350,14 +360,6 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { } } - /* Set read buffer capacity if provided */ - if (read_buffer_capacity_py != Py_None) { - http_options.read_buffer_capacity = PyLong_AsSize_t(read_buffer_capacity_py); - if (PyErr_Occurred()) { - goto done; - } - } - connection->py_core = py_core; Py_INCREF(connection->py_core); @@ -425,10 +427,7 @@ PyObject *aws_py_http2_connection_update_window(PyObject *self, PyObject *args) return NULL; } - if (aws_http2_connection_update_window(connection->native, increment_size)) { - PyErr_SetAwsLastError(); - return NULL; - } + aws_http2_connection_update_window(connection->native, increment_size); Py_RETURN_NONE; } @@ -446,10 +445,7 @@ PyObject *aws_py_http_stream_update_window(PyObject *self, PyObject *args) { return NULL; } - if (aws_http_stream_update_window(stream, increment_size)) { - PyErr_SetAwsLastError(); - return NULL; - } + aws_http_stream_update_window(stream, increment_size); Py_RETURN_NONE; } From ae922b9b57ee7478360e7e806c61e5202a90754b Mon Sep 17 00:00:00 2001 From: Krish Date: Fri, 9 Jan 2026 18:10:48 -0800 Subject: [PATCH 03/12] code fix, test and lint --- awscrt/http.py | 2 +- source/http_connection.c | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/awscrt/http.py b/awscrt/http.py index 70b03f786..d948f0780 100644 --- a/awscrt/http.py +++ b/awscrt/http.py @@ -554,7 +554,7 @@ def update_window(self, increment_size: int) -> None: Args: increment_size (int): Number of bytes to increment the window by. """ - _awscrt.http_stream_update_window(self._binding, increment_size) + _awscrt.http_stream_update_window(self, increment_size) class HttpClientStream(HttpClientStreamBase): diff --git a/source/http_connection.c b/source/http_connection.c index 391bab332..a7feb5049 100644 --- a/source/http_connection.c +++ b/source/http_connection.c @@ -309,7 +309,8 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { } } if (stream_window_size_threshold_py != Py_None) { - http2_options.stream_window_size_threshold_to_send_update = PyLong_AsUnsignedLong(stream_window_size_threshold_py); + http2_options.stream_window_size_threshold_to_send_update = + PyLong_AsUnsignedLong(stream_window_size_threshold_py); if (PyErr_Occurred()) { goto done; } From 44a5f3ea289f0ad5b620431316a058e900be6790 Mon Sep 17 00:00:00 2001 From: Krish Date: Mon, 12 Jan 2026 11:19:05 -0800 Subject: [PATCH 04/12] fix tests --- test/test_aiohttp_client.py | 142 +++++++++++++++++++++++++++ test/test_http_client.py | 185 +++++++++++++++++++++++++++++++++++- 2 files changed, 326 insertions(+), 1 deletion(-) diff --git a/test/test_aiohttp_client.py b/test/test_aiohttp_client.py index f3fa54471..d44c9f0fb 100644 --- a/test/test_aiohttp_client.py +++ b/test/test_aiohttp_client.py @@ -43,6 +43,8 @@ async def collect_response(self, stream): if not chunk: break self.body.extend(chunk) + if hasattr(stream, 'update_window'): + stream.update_window(len(chunk)) # Return status code for convenience return self.status_code @@ -657,5 +659,145 @@ def test_h2_mock_server_settings(self): asyncio.run(self._test_h2_mock_server_settings()) +class AIOFlowControlTest(NativeResourceTest): + timeout = 10.0 + + async def _test_http1_manual_window_management_parameters(self): + """Test HTTP/1.1 connection accepts flow control parameters""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['http/1.1'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + manual_window_management=True, + initial_window_size=32768, + read_buffer_capacity=16384 + ) + await connection.close() + + def test_http1_manual_window_management_parameters(self): + asyncio.run(self._test_http1_manual_window_management_parameters()) + + async def _test_http2_manual_window_management_parameters(self): + """Test HTTP/2 connection accepts flow control parameters""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['h2'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttp2ClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + manual_window_management=True, + initial_window_size=65536, + conn_manual_window_management=True, + conn_window_size_threshold=32767, + stream_window_size_threshold=16384 + ) + await connection.close() + + def test_http2_manual_window_management_parameters(self): + asyncio.run(self._test_http2_manual_window_management_parameters()) + + async def _test_connection_has_update_window_method(self): + """Test connection has update_window method""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['http/1.1'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options + ) + self.assertTrue(hasattr(connection, 'update_window'), + "Connection missing update_window method") + self.assertTrue(callable(getattr(connection, 'update_window')), + "update_window is not callable") + await connection.close() + + def test_connection_has_update_window_method(self): + asyncio.run(self._test_connection_has_update_window_method()) + + def test_stream_has_update_window_method(self): + """Test stream has update_window method""" + pass + + async def _test_h2_manual_window_management_happy_path(self): + """Test HTTP/2 manual window management happy path""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['h2'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttp2ClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + manual_window_management=True, + initial_window_size=65536 + ) + + request = HttpRequest('GET', '/get') + request.headers.add('host', 'httpbin.org') + stream = connection.request(request) + + response = Response() + status_code = await response.collect_response(stream) + + self.assertEqual(200, status_code) + self.assertGreater(len(response.body), 0, "No data received") + await connection.close() + + def test_h2_manual_window_management_happy_path(self): + asyncio.run(self._test_h2_manual_window_management_happy_path()) + + async def _test_h1_manual_window_management_happy_path(self): + """Test HTTP/1.1 manual window management happy path""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['http/1.1'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + manual_window_management=True, + initial_window_size=5, + read_buffer_capacity=1000 + ) + + request = HttpRequest('GET', '/bytes/10') + request.headers.add('host', 'httpbin.org') + stream = connection.request(request) + + response = Response() + status_code = await response.collect_response(stream) + + self.assertEqual(200, status_code) + self.assertEqual(10, len(response.body)) + await connection.close() + + def test_h1_manual_window_management_happy_path(self): + asyncio.run(self._test_h1_manual_window_management_happy_path()) + + if __name__ == '__main__': unittest.main() diff --git a/test/test_http_client.py b/test/test_http_client.py index 6aa99174e..8ef63ad25 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -16,7 +16,7 @@ from http.server import HTTPServer, SimpleHTTPRequestHandler from concurrent.futures import Future, thread from awscrt.io import ClientBootstrap, ClientTlsContext, DefaultHostResolver, EventLoopGroup, TlsConnectionOptions, TlsContextOptions, TlsCipherPref -from awscrt.http import HttpClientConnection, HttpClientStream, HttpHeaders, HttpProxyOptions, HttpRequest, HttpVersion, Http2ClientConnection, Http2Setting, Http2SettingID +from awscrt.http import HttpClientConnection, HttpClientStreamBase, HttpHeaders, HttpProxyOptions, HttpRequest, HttpVersion, Http2ClientConnection, Http2Setting, Http2SettingID import awscrt.exceptions @@ -638,5 +638,188 @@ def test_h2_mock_server_settings(self): self.assertEqual(None, connection.close().exception(self.timeout)) +class Response: + def __init__(self): + self.status_code = None + self.headers = None + self.body = bytearray() + + def on_response(self, http_stream, status_code, headers, **kwargs): + self.status_code = status_code + self.headers = HttpHeaders(headers) + + def on_body(self, http_stream, chunk, **kwargs): + self.body.extend(chunk) + + +class FlowControlTest(NativeResourceTest): + timeout = 10.0 + + def setUp(self): + super().setUp() + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['h2', 'http/1.1'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + self.tls_options = tls_ctx.new_connection_options() + self.tls_options.set_server_name("httpbin.org") + + def test_http1_manual_window_management_parameters(self): + """Test HTTP/1.1 connection accepts flow control parameters""" + try: + future = HttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=self.tls_options, + manual_window_management=True, + initial_window_size=32768, + read_buffer_capacity=16384 + ) + self.assertTrue(True, "HTTP/1.1 flow control parameters accepted") + try: + connection = future.result(timeout=1) + connection.close() + except: + future.cancel() + except Exception as e: + self.fail(f"HTTP/1.1 flow control parameters rejected: {e}") + + def test_http2_manual_window_management_parameters(self): + """Test HTTP/2 connection accepts flow control parameters""" + try: + future = Http2ClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=self.tls_options, + manual_window_management=True, + initial_window_size=65536, + conn_manual_window_management=True, + conn_window_size_threshold=32767, + stream_window_size_threshold=16384 + ) + self.assertTrue(True, "HTTP/2 flow control parameters accepted") + try: + connection = future.result(timeout=1) + connection.close() + except: + future.cancel() + except Exception as e: + self.fail(f"HTTP/2 flow control parameters rejected: {e}") + + def test_connection_has_update_window_method(self): + """Test connection has update_window method""" + future = HttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=self.tls_options + ) + + try: + connection = future.result(timeout=self.timeout) + self.assertTrue(hasattr(connection, 'update_window'), + "Connection missing update_window method") + self.assertTrue(callable(getattr(connection, 'update_window')), + "update_window is not callable") + connection.close() + except Exception as e: + self.assertTrue(hasattr(HttpClientConnectionBase, 'update_window'), + "HttpClientConnectionBase missing update_window method") + + def test_stream_has_update_window_method(self): + """Test stream has update_window method""" + self.assertTrue(hasattr(HttpClientStreamBase, 'update_window'), + "HttpClientStreamBase missing update_window method") + self.assertTrue(callable(getattr(HttpClientStreamBase, 'update_window')), + "Stream update_window is not callable") + + def test_h2_manual_window_management_happy_path(self): + """Test HTTP/2 manual window management happy path""" + connection_future = Http2ClientConnection.new( + host_name="nghttp2.org", + port=443, + tls_connection_options=self.tls_options, + manual_window_management=True, + initial_window_size=65536 + ) + + try: + connection = connection_future.result(timeout=self.timeout) + request = HttpRequest('GET', '/httpbin/get') + request.headers.add('host', 'nghttp2.org') + + response = Response() + received_chunks = [] + window_updates_sent = [] + + def on_body_with_window_update(http_stream, chunk, **kwargs): + received_chunks.append(len(chunk)) + response.body.extend(chunk) + if hasattr(http_stream, '_binding') and http_stream._binding: + http_stream.update_window(len(chunk)) + window_updates_sent.append(len(chunk)) + + stream = connection.request(request, response.on_response, on_body_with_window_update) + stream.activate() + stream_completion_result = stream.completion_future.result(timeout=self.timeout) + + self.assertEqual(200, response.status_code) + self.assertEqual(200, stream_completion_result) + self.assertGreater(len(received_chunks), 0, "No data chunks received") + self.assertGreater(len(window_updates_sent), 0, "No window updates sent") + + connection.close() + except Exception as e: + self.skipTest(f"HTTP/2 flow control test skipped due to connection issue: {e}") + + def test_h1_manual_window_management_happy_path(self): + """Test HTTP/1.1 manual window management happy path""" + connection_future = HttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=self.tls_options, + manual_window_management=True, + initial_window_size=5, + read_buffer_capacity=1000 + ) + + try: + connection = connection_future.result(timeout=self.timeout) + request = HttpRequest('GET', '/bytes/10') + request.headers.add('host', 'httpbin.org') + + response = Response() + received_chunks = [] + window_updates_sent = [] + + def on_body_with_window_update(http_stream, chunk, **kwargs): + received_chunks.append(len(chunk)) + response.body.extend(chunk) + if hasattr(http_stream, '_binding') and http_stream._binding: + http_stream.update_window(len(chunk)) + window_updates_sent.append(len(chunk)) + + stream = connection.request(request, response.on_response, on_body_with_window_update) + stream.activate() + stream_completion_result = stream.completion_future.result(timeout=self.timeout) + + self.assertEqual(200, response.status_code) + self.assertEqual(200, stream_completion_result) + self.assertEqual(10, len(response.body)) + + if len(response.body) > 0: + self.assertGreater(len(received_chunks), 0, "No data chunks received") + self.assertGreater(len(window_updates_sent), 0, "No window updates sent") + self.assertEqual(sum(received_chunks), sum(window_updates_sent), + "Window updates don't match received data") + + connection.close() + except Exception as e: + self.skipTest(f"HTTP/1.1 flow control test skipped due to connection issue: {e}") + + def tearDown(self): + self.tls_options = None + super().tearDown() + + if __name__ == '__main__': unittest.main() From b3b14880543bee3fd2bc0f437024befec3b66227 Mon Sep 17 00:00:00 2001 From: Krish Date: Mon, 12 Jan 2026 12:01:18 -0800 Subject: [PATCH 05/12] fix lint --- test/test_aiohttp_client.py | 14 +++++++------- test/test_http_client.py | 24 ++++++++++++------------ 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/test/test_aiohttp_client.py b/test/test_aiohttp_client.py index d44c9f0fb..38621ceed 100644 --- a/test/test_aiohttp_client.py +++ b/test/test_aiohttp_client.py @@ -670,7 +670,7 @@ async def _test_http1_manual_window_management_parameters(self): tls_ctx = ClientTlsContext(tls_ctx_opt) tls_options = tls_ctx.new_connection_options() tls_options.set_server_name("httpbin.org") - + connection = await AIOHttpClientConnection.new( host_name="httpbin.org", port=443, @@ -692,7 +692,7 @@ async def _test_http2_manual_window_management_parameters(self): tls_ctx = ClientTlsContext(tls_ctx_opt) tls_options = tls_ctx.new_connection_options() tls_options.set_server_name("httpbin.org") - + connection = await AIOHttp2ClientConnection.new( host_name="httpbin.org", port=443, @@ -716,16 +716,16 @@ async def _test_connection_has_update_window_method(self): tls_ctx = ClientTlsContext(tls_ctx_opt) tls_options = tls_ctx.new_connection_options() tls_options.set_server_name("httpbin.org") - + connection = await AIOHttpClientConnection.new( host_name="httpbin.org", port=443, tls_connection_options=tls_options ) - self.assertTrue(hasattr(connection, 'update_window'), - "Connection missing update_window method") - self.assertTrue(callable(getattr(connection, 'update_window')), - "update_window is not callable") + self.assertTrue(hasattr(connection, 'update_window'), + "Connection missing update_window method") + self.assertTrue(callable(getattr(connection, 'update_window')), + "update_window is not callable") await connection.close() def test_connection_has_update_window_method(self): diff --git a/test/test_http_client.py b/test/test_http_client.py index 8ef63ad25..a797946b4 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -679,7 +679,7 @@ def test_http1_manual_window_management_parameters(self): try: connection = future.result(timeout=1) connection.close() - except: + except BaseException: future.cancel() except Exception as e: self.fail(f"HTTP/1.1 flow control parameters rejected: {e}") @@ -688,7 +688,7 @@ def test_http2_manual_window_management_parameters(self): """Test HTTP/2 connection accepts flow control parameters""" try: future = Http2ClientConnection.new( - host_name="httpbin.org", + host_name="httpbin.org", port=443, tls_connection_options=self.tls_options, manual_window_management=True, @@ -701,7 +701,7 @@ def test_http2_manual_window_management_parameters(self): try: connection = future.result(timeout=1) connection.close() - except: + except BaseException: future.cancel() except Exception as e: self.fail(f"HTTP/2 flow control parameters rejected: {e}") @@ -716,21 +716,21 @@ def test_connection_has_update_window_method(self): try: connection = future.result(timeout=self.timeout) - self.assertTrue(hasattr(connection, 'update_window'), - "Connection missing update_window method") - self.assertTrue(callable(getattr(connection, 'update_window')), - "update_window is not callable") + self.assertTrue(hasattr(connection, 'update_window'), + "Connection missing update_window method") + self.assertTrue(callable(getattr(connection, 'update_window')), + "update_window is not callable") connection.close() except Exception as e: self.assertTrue(hasattr(HttpClientConnectionBase, 'update_window'), - "HttpClientConnectionBase missing update_window method") + "HttpClientConnectionBase missing update_window method") def test_stream_has_update_window_method(self): """Test stream has update_window method""" self.assertTrue(hasattr(HttpClientStreamBase, 'update_window'), - "HttpClientStreamBase missing update_window method") + "HttpClientStreamBase missing update_window method") self.assertTrue(callable(getattr(HttpClientStreamBase, 'update_window')), - "Stream update_window is not callable") + "Stream update_window is not callable") def test_h2_manual_window_management_happy_path(self): """Test HTTP/2 manual window management happy path""" @@ -809,8 +809,8 @@ def on_body_with_window_update(http_stream, chunk, **kwargs): if len(response.body) > 0: self.assertGreater(len(received_chunks), 0, "No data chunks received") self.assertGreater(len(window_updates_sent), 0, "No window updates sent") - self.assertEqual(sum(received_chunks), sum(window_updates_sent), - "Window updates don't match received data") + self.assertEqual(sum(received_chunks), sum(window_updates_sent), + "Window updates don't match received data") connection.close() except Exception as e: From ad4bdef609b866512bc5ed639d39e69a6cca0b75 Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 20 Jan 2026 14:21:55 -0800 Subject: [PATCH 06/12] fix update_window and tests --- awscrt/aio/http.py | 9 +++++++++ awscrt/http.py | 18 ++++++++--------- source/http_connection.c | 4 ++-- test/test_aiohttp_client.py | 39 +++++++++++++++++++++++++++++-------- test/test_http_client.py | 27 +++++++++++++++++++------ 5 files changed, 72 insertions(+), 25 deletions(-) diff --git a/awscrt/aio/http.py b/awscrt/aio/http.py index c6040e0e0..e49aeb3f3 100644 --- a/awscrt/aio/http.py +++ b/awscrt/aio/http.py @@ -306,6 +306,15 @@ def request(self, """ return AIOHttp2ClientStream(self, request, request_body_generator, loop) + def update_window(self, increment_size: int) -> None: + """ + Update the connection's flow control window. + + Args: + increment_size (int): Number of bytes to increment the window by. + """ + _awscrt.http2_connection_update_window(self._binding, increment_size) + class AIOHttpClientStreamUnified(HttpClientStreamBase): __slots__ = ( diff --git a/awscrt/http.py b/awscrt/http.py index d948f0780..f3e467fec 100644 --- a/awscrt/http.py +++ b/awscrt/http.py @@ -116,15 +116,6 @@ def is_open(self) -> bool: """ return _awscrt.http_connection_is_open(self._binding) - def update_window(self, increment_size: int) -> None: - """ - Update the connection's flow control window. - - Args: - increment_size (int): Number of bytes to increment the window by. - """ - _awscrt.http2_connection_update_window(self._binding, increment_size) - class HttpClientConnectionBase(HttpConnectionBase): __slots__ = ('_host_name', '_port') @@ -458,6 +449,15 @@ def close(self) -> "concurrent.futures.Future": _awscrt.http_connection_close(self._binding) return self.shutdown_future + def update_window(self, increment_size: int) -> None: + """ + Update the connection's flow control window. + + Args: + increment_size (int): Number of bytes to increment the window by. + """ + _awscrt.http2_connection_update_window(self._binding, increment_size) + class HttpStreamBase(NativeResource): """Base for HTTP stream classes. diff --git a/source/http_connection.c b/source/http_connection.c index a7feb5049..4301add98 100644 --- a/source/http_connection.c +++ b/source/http_connection.c @@ -418,8 +418,8 @@ PyObject *aws_py_http_connection_is_open(PyObject *self, PyObject *args) { PyObject *aws_py_http2_connection_update_window(PyObject *self, PyObject *args) { (void)self; PyObject *capsule; - uint32_t increment_size; - if (!PyArg_ParseTuple(args, "OI", &capsule, &increment_size)) { + size_t increment_size; + if (!PyArg_ParseTuple(args, "On", &capsule, &increment_size)) { return NULL; } diff --git a/test/test_aiohttp_client.py b/test/test_aiohttp_client.py index 38621ceed..ae5f9404a 100644 --- a/test/test_aiohttp_client.py +++ b/test/test_aiohttp_client.py @@ -708,32 +708,55 @@ async def _test_http2_manual_window_management_parameters(self): def test_http2_manual_window_management_parameters(self): asyncio.run(self._test_http2_manual_window_management_parameters()) - async def _test_connection_has_update_window_method(self): - """Test connection has update_window method""" + async def _test_h2_connection_has_update_window_method(self): + """Test HTTP/2 connection has update_window method""" tls_ctx_opt = TlsContextOptions() tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['http/1.1'] + tls_ctx_opt.alpn_list = ['h2'] tls_ctx = ClientTlsContext(tls_ctx_opt) tls_options = tls_ctx.new_connection_options() tls_options.set_server_name("httpbin.org") - connection = await AIOHttpClientConnection.new( + connection = await AIOHttp2ClientConnection.new( host_name="httpbin.org", port=443, tls_connection_options=tls_options ) self.assertTrue(hasattr(connection, 'update_window'), - "Connection missing update_window method") + "HTTP/2 Connection missing update_window method") self.assertTrue(callable(getattr(connection, 'update_window')), "update_window is not callable") await connection.close() - def test_connection_has_update_window_method(self): - asyncio.run(self._test_connection_has_update_window_method()) + async def _test_h1_connection_no_update_window_method(self): + """Test HTTP/1.1 connection does NOT have update_window method""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['http/1.1'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options + ) + self.assertFalse(hasattr(connection, 'update_window'), + "HTTP/1.1 Connection should NOT have update_window method") + await connection.close() + + def test_h2_connection_has_update_window_method(self): + asyncio.run(self._test_h2_connection_has_update_window_method()) + + def test_h1_connection_no_update_window_method(self): + asyncio.run(self._test_h1_connection_no_update_window_method()) def test_stream_has_update_window_method(self): """Test stream has update_window method""" - pass + from awscrt.http import HttpClientStreamBase + self.assertTrue(hasattr(HttpClientStreamBase, 'update_window'), + "HttpClientStreamBase missing update_window method") async def _test_h2_manual_window_management_happy_path(self): """Test HTTP/2 manual window management happy path""" diff --git a/test/test_http_client.py b/test/test_http_client.py index a797946b4..48090349c 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -706,9 +706,9 @@ def test_http2_manual_window_management_parameters(self): except Exception as e: self.fail(f"HTTP/2 flow control parameters rejected: {e}") - def test_connection_has_update_window_method(self): - """Test connection has update_window method""" - future = HttpClientConnection.new( + def test_h2_connection_has_update_window_method(self): + """Test HTTP/2 connection has update_window method""" + future = Http2ClientConnection.new( host_name="httpbin.org", port=443, tls_connection_options=self.tls_options @@ -717,13 +717,28 @@ def test_connection_has_update_window_method(self): try: connection = future.result(timeout=self.timeout) self.assertTrue(hasattr(connection, 'update_window'), - "Connection missing update_window method") + "HTTP/2 Connection missing update_window method") self.assertTrue(callable(getattr(connection, 'update_window')), "update_window is not callable") connection.close() except Exception as e: - self.assertTrue(hasattr(HttpClientConnectionBase, 'update_window'), - "HttpClientConnectionBase missing update_window method") + self.skipTest(f"HTTP/2 connection test skipped: {e}") + + def test_h1_connection_no_update_window_method(self): + """Test HTTP/1.1 connection does NOT have update_window method""" + future = HttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=self.tls_options + ) + + try: + connection = future.result(timeout=self.timeout) + self.assertFalse(hasattr(connection, 'update_window'), + "HTTP/1.1 Connection should NOT have update_window method") + connection.close() + except Exception as e: + self.skipTest(f"HTTP/1.1 connection test skipped: {e}") def test_stream_has_update_window_method(self): """Test stream has update_window method""" From 35d50a1cf9ef75fd0a69a6ca061f9a4cb3cd371c Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 20 Jan 2026 16:27:11 -0800 Subject: [PATCH 07/12] add more tests --- test/test_aiohttp_client.py | 62 ++++++++++++++++++++++++++ test/test_http_client.py | 89 +++++++++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+) diff --git a/test/test_aiohttp_client.py b/test/test_aiohttp_client.py index ae5f9404a..dd9976b61 100644 --- a/test/test_aiohttp_client.py +++ b/test/test_aiohttp_client.py @@ -821,6 +821,68 @@ async def _test_h1_manual_window_management_happy_path(self): def test_h1_manual_window_management_happy_path(self): asyncio.run(self._test_h1_manual_window_management_happy_path()) + async def _test_h2_connection_update_window_callable(self): + """Test HTTP/2 connection.update_window() can be called without error""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['h2'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttp2ClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + conn_manual_window_management=True + ) + # Should not raise + connection.update_window(65535) + await connection.close() + + def test_h2_connection_update_window_callable(self): + asyncio.run(self._test_h2_connection_update_window_callable()) + + async def _test_h2_stream_flow_control_blocks_and_resumes(self): + """Test that stream flow control actually blocks and resumes""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['h2'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttp2ClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + manual_window_management=True, + initial_window_size=1 # Tiny window + ) + + request = HttpRequest('GET', '/bytes/100') + request.headers.add('host', 'httpbin.org') + stream = connection.request(request) + + chunks_received = [] + body = bytearray() + + await stream.get_response_status_code() + while True: + chunk = await stream.get_next_response_chunk() + if not chunk: + break + chunks_received.append(len(chunk)) + body.extend(chunk) + stream.update_window(len(chunk)) + + self.assertEqual(100, len(body)) + self.assertGreater(len(chunks_received), 1, "Expected multiple chunks with tiny window") + await connection.close() + + def test_h2_stream_flow_control_blocks_and_resumes(self): + asyncio.run(self._test_h2_stream_flow_control_blocks_and_resumes()) + if __name__ == '__main__': unittest.main() diff --git a/test/test_http_client.py b/test/test_http_client.py index 48090349c..c20fe3ac9 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -831,6 +831,95 @@ def on_body_with_window_update(http_stream, chunk, **kwargs): except Exception as e: self.skipTest(f"HTTP/1.1 flow control test skipped due to connection issue: {e}") + def test_h2_connection_update_window_callable(self): + """Test HTTP/2 connection.update_window() can be called without error""" + future = Http2ClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=self.tls_options, + conn_manual_window_management=True + ) + + try: + connection = future.result(timeout=self.timeout) + # Should not raise + connection.update_window(65535) + connection.close() + except Exception as e: + self.skipTest(f"HTTP/2 connection test skipped: {e}") + + def test_h2_stream_flow_control_blocks_and_resumes(self): + """Test that stream flow control actually blocks and resumes""" + connection_future = Http2ClientConnection.new( + host_name="nghttp2.org", + port=443, + tls_connection_options=self.tls_options, + manual_window_management=True, + initial_window_size=1 # Tiny window - will block immediately + ) + + try: + connection = connection_future.result(timeout=self.timeout) + request = HttpRequest('GET', '/httpbin/bytes/100') + request.headers.add('host', 'nghttp2.org') + + response = Response() + chunks_received = [] + + def on_body(http_stream, chunk, **kwargs): + chunks_received.append(len(chunk)) + response.body.extend(chunk) + # Update window to allow more data + http_stream.update_window(len(chunk)) + + stream = connection.request(request, response.on_response, on_body) + stream.activate() + stream.completion_future.result(timeout=self.timeout) + + self.assertEqual(100, len(response.body)) + # With window=1, we should receive many small chunks + self.assertGreater(len(chunks_received), 1, "Expected multiple chunks with tiny window") + + connection.close() + except Exception as e: + self.skipTest(f"HTTP/2 flow control test skipped: {e}") + + def test_h1_stream_flow_control_blocks_and_resumes(self): + """Test that HTTP/1.1 stream flow control actually blocks and resumes""" + connection_future = HttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=self.tls_options, + manual_window_management=True, + initial_window_size=1, # Tiny window + read_buffer_capacity=1000 + ) + + try: + connection = connection_future.result(timeout=self.timeout) + request = HttpRequest('GET', '/bytes/100') + request.headers.add('host', 'httpbin.org') + + response = Response() + chunks_received = [] + + def on_body(http_stream, chunk, **kwargs): + chunks_received.append(len(chunk)) + response.body.extend(chunk) + http_stream.update_window(len(chunk)) + + stream = connection.request(request, response.on_response, on_body) + stream.activate() + stream.completion_future.result(timeout=self.timeout) + + self.assertEqual(100, len(response.body)) + # With window=1, we should receive many small chunks + self.assertGreater(len(chunks_received), 1, "Expected multiple chunks with tiny window") + + connection.close() + except Exception as e: + self.skipTest(f"HTTP/1.1 flow control test skipped: {e}") + def tearDown(self): self.tls_options = None super().tearDown() From c1b5bbdd0d9cb8095e7031b74823d49cc067efa2 Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 20 Jan 2026 17:07:20 -0800 Subject: [PATCH 08/12] remove basic tests --- test/test_aiohttp_client.py | 144 ++++-------------------------------- test/test_http_client.py | 133 +++++++-------------------------- 2 files changed, 38 insertions(+), 239 deletions(-) diff --git a/test/test_aiohttp_client.py b/test/test_aiohttp_client.py index dd9976b61..4320c6342 100644 --- a/test/test_aiohttp_client.py +++ b/test/test_aiohttp_client.py @@ -662,8 +662,8 @@ def test_h2_mock_server_settings(self): class AIOFlowControlTest(NativeResourceTest): timeout = 10.0 - async def _test_http1_manual_window_management_parameters(self): - """Test HTTP/1.1 connection accepts flow control parameters""" + async def _test_h1_manual_window_management_happy_path(self): + """Test HTTP/1.1 manual window management happy path""" tls_ctx_opt = TlsContextOptions() tls_ctx_opt.verify_peer = False tls_ctx_opt.alpn_list = ['http/1.1'] @@ -676,87 +676,23 @@ async def _test_http1_manual_window_management_parameters(self): port=443, tls_connection_options=tls_options, manual_window_management=True, - initial_window_size=32768, - read_buffer_capacity=16384 - ) - await connection.close() - - def test_http1_manual_window_management_parameters(self): - asyncio.run(self._test_http1_manual_window_management_parameters()) - - async def _test_http2_manual_window_management_parameters(self): - """Test HTTP/2 connection accepts flow control parameters""" - tls_ctx_opt = TlsContextOptions() - tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['h2'] - tls_ctx = ClientTlsContext(tls_ctx_opt) - tls_options = tls_ctx.new_connection_options() - tls_options.set_server_name("httpbin.org") - - connection = await AIOHttp2ClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=tls_options, - manual_window_management=True, - initial_window_size=65536, - conn_manual_window_management=True, - conn_window_size_threshold=32767, - stream_window_size_threshold=16384 + initial_window_size=5, + read_buffer_capacity=1000 ) - await connection.close() - - def test_http2_manual_window_management_parameters(self): - asyncio.run(self._test_http2_manual_window_management_parameters()) - async def _test_h2_connection_has_update_window_method(self): - """Test HTTP/2 connection has update_window method""" - tls_ctx_opt = TlsContextOptions() - tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['h2'] - tls_ctx = ClientTlsContext(tls_ctx_opt) - tls_options = tls_ctx.new_connection_options() - tls_options.set_server_name("httpbin.org") - - connection = await AIOHttp2ClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=tls_options - ) - self.assertTrue(hasattr(connection, 'update_window'), - "HTTP/2 Connection missing update_window method") - self.assertTrue(callable(getattr(connection, 'update_window')), - "update_window is not callable") - await connection.close() + request = HttpRequest('GET', '/bytes/10') + request.headers.add('host', 'httpbin.org') + stream = connection.request(request) - async def _test_h1_connection_no_update_window_method(self): - """Test HTTP/1.1 connection does NOT have update_window method""" - tls_ctx_opt = TlsContextOptions() - tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['http/1.1'] - tls_ctx = ClientTlsContext(tls_ctx_opt) - tls_options = tls_ctx.new_connection_options() - tls_options.set_server_name("httpbin.org") + response = Response() + status_code = await response.collect_response(stream) - connection = await AIOHttpClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=tls_options - ) - self.assertFalse(hasattr(connection, 'update_window'), - "HTTP/1.1 Connection should NOT have update_window method") + self.assertEqual(200, status_code) + self.assertEqual(10, len(response.body)) await connection.close() - def test_h2_connection_has_update_window_method(self): - asyncio.run(self._test_h2_connection_has_update_window_method()) - - def test_h1_connection_no_update_window_method(self): - asyncio.run(self._test_h1_connection_no_update_window_method()) - - def test_stream_has_update_window_method(self): - """Test stream has update_window method""" - from awscrt.http import HttpClientStreamBase - self.assertTrue(hasattr(HttpClientStreamBase, 'update_window'), - "HttpClientStreamBase missing update_window method") + def test_h1_manual_window_management_happy_path(self): + asyncio.run(self._test_h1_manual_window_management_happy_path()) async def _test_h2_manual_window_management_happy_path(self): """Test HTTP/2 manual window management happy path""" @@ -789,60 +725,6 @@ async def _test_h2_manual_window_management_happy_path(self): def test_h2_manual_window_management_happy_path(self): asyncio.run(self._test_h2_manual_window_management_happy_path()) - async def _test_h1_manual_window_management_happy_path(self): - """Test HTTP/1.1 manual window management happy path""" - tls_ctx_opt = TlsContextOptions() - tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['http/1.1'] - tls_ctx = ClientTlsContext(tls_ctx_opt) - tls_options = tls_ctx.new_connection_options() - tls_options.set_server_name("httpbin.org") - - connection = await AIOHttpClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=tls_options, - manual_window_management=True, - initial_window_size=5, - read_buffer_capacity=1000 - ) - - request = HttpRequest('GET', '/bytes/10') - request.headers.add('host', 'httpbin.org') - stream = connection.request(request) - - response = Response() - status_code = await response.collect_response(stream) - - self.assertEqual(200, status_code) - self.assertEqual(10, len(response.body)) - await connection.close() - - def test_h1_manual_window_management_happy_path(self): - asyncio.run(self._test_h1_manual_window_management_happy_path()) - - async def _test_h2_connection_update_window_callable(self): - """Test HTTP/2 connection.update_window() can be called without error""" - tls_ctx_opt = TlsContextOptions() - tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['h2'] - tls_ctx = ClientTlsContext(tls_ctx_opt) - tls_options = tls_ctx.new_connection_options() - tls_options.set_server_name("httpbin.org") - - connection = await AIOHttp2ClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=tls_options, - conn_manual_window_management=True - ) - # Should not raise - connection.update_window(65535) - await connection.close() - - def test_h2_connection_update_window_callable(self): - asyncio.run(self._test_h2_connection_update_window_callable()) - async def _test_h2_stream_flow_control_blocks_and_resumes(self): """Test that stream flow control actually blocks and resumes""" tls_ctx_opt = TlsContextOptions() diff --git a/test/test_http_client.py b/test/test_http_client.py index c20fe3ac9..2803cd7b0 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -664,103 +664,21 @@ def setUp(self): self.tls_options = tls_ctx.new_connection_options() self.tls_options.set_server_name("httpbin.org") - def test_http1_manual_window_management_parameters(self): - """Test HTTP/1.1 connection accepts flow control parameters""" - try: - future = HttpClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=self.tls_options, - manual_window_management=True, - initial_window_size=32768, - read_buffer_capacity=16384 - ) - self.assertTrue(True, "HTTP/1.1 flow control parameters accepted") - try: - connection = future.result(timeout=1) - connection.close() - except BaseException: - future.cancel() - except Exception as e: - self.fail(f"HTTP/1.1 flow control parameters rejected: {e}") - - def test_http2_manual_window_management_parameters(self): - """Test HTTP/2 connection accepts flow control parameters""" - try: - future = Http2ClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=self.tls_options, - manual_window_management=True, - initial_window_size=65536, - conn_manual_window_management=True, - conn_window_size_threshold=32767, - stream_window_size_threshold=16384 - ) - self.assertTrue(True, "HTTP/2 flow control parameters accepted") - try: - connection = future.result(timeout=1) - connection.close() - except BaseException: - future.cancel() - except Exception as e: - self.fail(f"HTTP/2 flow control parameters rejected: {e}") - - def test_h2_connection_has_update_window_method(self): - """Test HTTP/2 connection has update_window method""" - future = Http2ClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=self.tls_options - ) - - try: - connection = future.result(timeout=self.timeout) - self.assertTrue(hasattr(connection, 'update_window'), - "HTTP/2 Connection missing update_window method") - self.assertTrue(callable(getattr(connection, 'update_window')), - "update_window is not callable") - connection.close() - except Exception as e: - self.skipTest(f"HTTP/2 connection test skipped: {e}") - - def test_h1_connection_no_update_window_method(self): - """Test HTTP/1.1 connection does NOT have update_window method""" - future = HttpClientConnection.new( + def test_h1_manual_window_management_happy_path(self): + """Test HTTP/1.1 manual window management happy path""" + connection_future = HttpClientConnection.new( host_name="httpbin.org", port=443, - tls_connection_options=self.tls_options - ) - - try: - connection = future.result(timeout=self.timeout) - self.assertFalse(hasattr(connection, 'update_window'), - "HTTP/1.1 Connection should NOT have update_window method") - connection.close() - except Exception as e: - self.skipTest(f"HTTP/1.1 connection test skipped: {e}") - - def test_stream_has_update_window_method(self): - """Test stream has update_window method""" - self.assertTrue(hasattr(HttpClientStreamBase, 'update_window'), - "HttpClientStreamBase missing update_window method") - self.assertTrue(callable(getattr(HttpClientStreamBase, 'update_window')), - "Stream update_window is not callable") - - def test_h2_manual_window_management_happy_path(self): - """Test HTTP/2 manual window management happy path""" - connection_future = Http2ClientConnection.new( - host_name="nghttp2.org", - port=443, tls_connection_options=self.tls_options, manual_window_management=True, - initial_window_size=65536 + initial_window_size=5, + read_buffer_capacity=1000 ) try: connection = connection_future.result(timeout=self.timeout) - request = HttpRequest('GET', '/httpbin/get') - request.headers.add('host', 'nghttp2.org') + request = HttpRequest('GET', '/bytes/10') + request.headers.add('host', 'httpbin.org') response = Response() received_chunks = [] @@ -779,28 +697,32 @@ def on_body_with_window_update(http_stream, chunk, **kwargs): self.assertEqual(200, response.status_code) self.assertEqual(200, stream_completion_result) - self.assertGreater(len(received_chunks), 0, "No data chunks received") - self.assertGreater(len(window_updates_sent), 0, "No window updates sent") + self.assertEqual(10, len(response.body)) + + if len(response.body) > 0: + self.assertGreater(len(received_chunks), 0, "No data chunks received") + self.assertGreater(len(window_updates_sent), 0, "No window updates sent") + self.assertEqual(sum(received_chunks), sum(window_updates_sent), + "Window updates don't match received data") connection.close() except Exception as e: - self.skipTest(f"HTTP/2 flow control test skipped due to connection issue: {e}") + self.skipTest(f"HTTP/1.1 flow control test skipped due to connection issue: {e}") - def test_h1_manual_window_management_happy_path(self): - """Test HTTP/1.1 manual window management happy path""" - connection_future = HttpClientConnection.new( - host_name="httpbin.org", + def test_h2_manual_window_management_happy_path(self): + """Test HTTP/2 manual window management happy path""" + connection_future = Http2ClientConnection.new( + host_name="nghttp2.org", port=443, tls_connection_options=self.tls_options, manual_window_management=True, - initial_window_size=5, - read_buffer_capacity=1000 + initial_window_size=65536 ) try: connection = connection_future.result(timeout=self.timeout) - request = HttpRequest('GET', '/bytes/10') - request.headers.add('host', 'httpbin.org') + request = HttpRequest('GET', '/httpbin/get') + request.headers.add('host', 'nghttp2.org') response = Response() received_chunks = [] @@ -819,17 +741,12 @@ def on_body_with_window_update(http_stream, chunk, **kwargs): self.assertEqual(200, response.status_code) self.assertEqual(200, stream_completion_result) - self.assertEqual(10, len(response.body)) - - if len(response.body) > 0: - self.assertGreater(len(received_chunks), 0, "No data chunks received") - self.assertGreater(len(window_updates_sent), 0, "No window updates sent") - self.assertEqual(sum(received_chunks), sum(window_updates_sent), - "Window updates don't match received data") + self.assertGreater(len(received_chunks), 0, "No data chunks received") + self.assertGreater(len(window_updates_sent), 0, "No window updates sent") connection.close() except Exception as e: - self.skipTest(f"HTTP/1.1 flow control test skipped due to connection issue: {e}") + self.skipTest(f"HTTP/2 flow control test skipped due to connection issue: {e}") def test_h2_connection_update_window_callable(self): """Test HTTP/2 connection.update_window() can be called without error""" From 3cd0899c3c41116591b8956891f77f0f58ff0112 Mon Sep 17 00:00:00 2001 From: Krish Date: Wed, 21 Jan 2026 09:36:32 -0800 Subject: [PATCH 09/12] fix block-resume test --- test/test_aiohttp_client.py | 2 +- test/test_http_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_aiohttp_client.py b/test/test_aiohttp_client.py index 4320c6342..db3e9ce82 100644 --- a/test/test_aiohttp_client.py +++ b/test/test_aiohttp_client.py @@ -759,7 +759,7 @@ async def _test_h2_stream_flow_control_blocks_and_resumes(self): stream.update_window(len(chunk)) self.assertEqual(100, len(body)) - self.assertGreater(len(chunks_received), 1, "Expected multiple chunks with tiny window") + self.assertEqual(len(chunks_received), 100, "Should receive exactly 100 chunks") await connection.close() def test_h2_stream_flow_control_blocks_and_resumes(self): diff --git a/test/test_http_client.py b/test/test_http_client.py index 2803cd7b0..99fba1741 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -831,7 +831,7 @@ def on_body(http_stream, chunk, **kwargs): self.assertEqual(100, len(response.body)) # With window=1, we should receive many small chunks - self.assertGreater(len(chunks_received), 1, "Expected multiple chunks with tiny window") + self.assertEqual(len(chunks_received), 100, "Should receive exactly 100 chunks") connection.close() except Exception as e: From 3dd4195e5c73049572416f8cc761516f17a6bd68 Mon Sep 17 00:00:00 2001 From: Krish Date: Wed, 21 Jan 2026 10:23:52 -0800 Subject: [PATCH 10/12] fix tests again --- test/test_aiohttp_client.py | 45 +++++++++++++++++++++++++++++++++++-- test/test_http_client.py | 21 ++--------------- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/test/test_aiohttp_client.py b/test/test_aiohttp_client.py index db3e9ce82..757d2dde5 100644 --- a/test/test_aiohttp_client.py +++ b/test/test_aiohttp_client.py @@ -739,7 +739,7 @@ async def _test_h2_stream_flow_control_blocks_and_resumes(self): port=443, tls_connection_options=tls_options, manual_window_management=True, - initial_window_size=1 # Tiny window + initial_window_size=10 # Tiny window ) request = HttpRequest('GET', '/bytes/100') @@ -759,12 +759,53 @@ async def _test_h2_stream_flow_control_blocks_and_resumes(self): stream.update_window(len(chunk)) self.assertEqual(100, len(body)) - self.assertEqual(len(chunks_received), 100, "Should receive exactly 100 chunks") + self.assertEqual(len(chunks_received), 10, "Should receive exactly 10 chunks") await connection.close() def test_h2_stream_flow_control_blocks_and_resumes(self): asyncio.run(self._test_h2_stream_flow_control_blocks_and_resumes()) + async def _test_h1_stream_flow_control_blocks_and_resumes(self): + """Test that HTTP/1.1 stream flow control actually blocks and resumes""" + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx_opt.alpn_list = ['http/1.1'] + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_options = tls_ctx.new_connection_options() + tls_options.set_server_name("httpbin.org") + + connection = await AIOHttpClientConnection.new( + host_name="httpbin.org", + port=443, + tls_connection_options=tls_options, + manual_window_management=True, + initial_window_size=1, # Tiny window + read_buffer_capacity=1000 + ) + + request = HttpRequest('GET', '/bytes/100') + request.headers.add('host', 'httpbin.org') + stream = connection.request(request) + + chunks_received = [] + body = bytearray() + + await stream.get_response_status_code() + while True: + chunk = await stream.get_next_response_chunk() + if not chunk: + break + chunks_received.append(len(chunk)) + body.extend(chunk) + stream.update_window(len(chunk)) + + self.assertEqual(100, len(body)) + self.assertEqual(len(chunks_received), 100, "Should receive exactly 100 chunks") + await connection.close() + + def test_h1_stream_flow_control_blocks_and_resumes(self): + asyncio.run(self._test_h1_stream_flow_control_blocks_and_resumes()) + if __name__ == '__main__': unittest.main() diff --git a/test/test_http_client.py b/test/test_http_client.py index 99fba1741..9b4876db1 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -748,23 +748,6 @@ def on_body_with_window_update(http_stream, chunk, **kwargs): except Exception as e: self.skipTest(f"HTTP/2 flow control test skipped due to connection issue: {e}") - def test_h2_connection_update_window_callable(self): - """Test HTTP/2 connection.update_window() can be called without error""" - future = Http2ClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=self.tls_options, - conn_manual_window_management=True - ) - - try: - connection = future.result(timeout=self.timeout) - # Should not raise - connection.update_window(65535) - connection.close() - except Exception as e: - self.skipTest(f"HTTP/2 connection test skipped: {e}") - def test_h2_stream_flow_control_blocks_and_resumes(self): """Test that stream flow control actually blocks and resumes""" connection_future = Http2ClientConnection.new( @@ -772,7 +755,7 @@ def test_h2_stream_flow_control_blocks_and_resumes(self): port=443, tls_connection_options=self.tls_options, manual_window_management=True, - initial_window_size=1 # Tiny window - will block immediately + initial_window_size=10 # Tiny window - will block immediately ) try: @@ -795,7 +778,7 @@ def on_body(http_stream, chunk, **kwargs): self.assertEqual(100, len(response.body)) # With window=1, we should receive many small chunks - self.assertGreater(len(chunks_received), 1, "Expected multiple chunks with tiny window") + self.assertEqual(len(chunks_received), 10, "Expected multiple chunks with tiny window") connection.close() except Exception as e: From 03fec8d317e71cc34726226b791bca282e5718bc Mon Sep 17 00:00:00 2001 From: Krish Date: Wed, 21 Jan 2026 10:29:56 -0800 Subject: [PATCH 11/12] comment fix --- test/test_http_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_http_client.py b/test/test_http_client.py index 9b4876db1..2bc5edf07 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -777,7 +777,7 @@ def on_body(http_stream, chunk, **kwargs): stream.completion_future.result(timeout=self.timeout) self.assertEqual(100, len(response.body)) - # With window=1, we should receive many small chunks + # With window=10, we should receive many small chunks self.assertEqual(len(chunks_received), 10, "Expected multiple chunks with tiny window") connection.close() From 855e4ffa89d513cbc572756087fdc791185d7597 Mon Sep 17 00:00:00 2001 From: Krish Date: Fri, 23 Jan 2026 15:15:44 -0800 Subject: [PATCH 12/12] address comments --- awscrt/aio/http.py | 77 +++++++++++++++++++++------------------- awscrt/http.py | 46 +++++++++++++++++------- test/test_http_client.py | 10 +++--- 3 files changed, 78 insertions(+), 55 deletions(-) diff --git a/awscrt/aio/http.py b/awscrt/aio/http.py index e49aeb3f3..e31535d34 100644 --- a/awscrt/aio/http.py +++ b/awscrt/aio/http.py @@ -40,11 +40,7 @@ async def new(cls, tls_connection_options: Optional[TlsConnectionOptions] = None, proxy_options: Optional[HttpProxyOptions] = None, manual_window_management: bool = False, - initial_window_size: Optional[int] = None, - read_buffer_capacity: Optional[int] = None, - conn_manual_window_management: bool = False, - conn_window_size_threshold: Optional[int] = None, - stream_window_size_threshold: Optional[int] = None) -> "AIOHttpClientConnectionUnified": + initial_window_size: Optional[int] = None) -> "AIOHttpClientConnectionUnified": """ Asynchronously establish a new AIOHttpClientConnectionUnified. @@ -66,23 +62,22 @@ async def new(cls, proxy_options (Optional[HttpProxyOptions]): Optional proxy options. If None is provided then a proxy is not used. - manual_window_management (bool): If True, enables manual flow control window management. - Default is False. - - initial_window_size (Optional[int]): Initial window size for flow control. - If None, uses default value. - - read_buffer_capacity (Optional[int]): Read buffer capacity for the connection. - If None, uses default value. - - conn_manual_window_management (bool): If True, enables manual connection-level window management. - Default is False. - - conn_window_size_threshold (Optional[int]): Connection window size threshold. - If None, uses default value. - - stream_window_size_threshold (Optional[int]): Stream window size threshold. - If None, uses default value. + manual_window_management (bool): Set to True to manually manage the flow-control window + of each stream. If False, the connection maintains flow-control windows such that + no back-pressure is applied and data arrives as fast as possible. If True, the + flow-control window of each stream shrinks as body data is received (headers, + padding, and other metadata do not affect the window). `initial_window_size` + determines the starting size of each stream's window. When a stream's window + reaches 0, no further data is received until `update_window()` is called. + For HTTP/2, this only controls stream windows; connection window is controlled + by `conn_manual_window_management`. Default is False. + + initial_window_size (Optional[int]): The starting size of each stream's flow-control + window. Required if `manual_window_management` is True, ignored otherwise. + For HTTP/2, this becomes the `INITIAL_WINDOW_SIZE` setting and can be overridden + by `initial_settings`. Must be <= 2^31-1 or connection fails. If set to 0 with + `manual_window_management` True, streams start with zero window. If None, uses + default value. Returns: AIOHttpClientConnectionUnified: A new unified HTTP client connection. @@ -96,11 +91,7 @@ async def new(cls, proxy_options, asyncio_connection=True, manual_window_management=manual_window_management, - initial_window_size=initial_window_size, - read_buffer_capacity=read_buffer_capacity, - conn_manual_window_management=conn_manual_window_management, - conn_window_size_threshold=conn_window_size_threshold, - stream_window_size_threshold=stream_window_size_threshold) + initial_window_size=initial_window_size) return await asyncio.wrap_future(future) async def close(self) -> None: @@ -179,8 +170,11 @@ async def new(cls, initial_window_size (Optional[int]): Initial window size for flow control. If None, uses default value. - read_buffer_capacity (Optional[int]): Read buffer capacity for the connection. - If None, uses default value. + read_buffer_capacity (Optional[int]): Capacity in bytes of the HTTP/1.1 connection's + read buffer. The buffer grows when the flow-control window of the incoming stream + reaches zero. Ignored if `manual_window_management` is False. A capacity that is + too small may hinder throughput. A capacity that is too large may waste memory + without improving throughput. If None or zero, a default value is used. Returns: AIOHttpClientConnection: A new HTTP client connection. @@ -261,14 +255,23 @@ async def new(cls, initial_window_size (Optional[int]): Initial window size for flow control. If None, uses default value. - conn_manual_window_management (bool): If True, enables manual connection-level window management. - Default is False. - - conn_window_size_threshold (Optional[int]): Connection window size threshold. - If None, uses default value. - - stream_window_size_threshold (Optional[int]): Stream window size threshold. - If None, uses default value. + conn_manual_window_management (bool): If True, enables manual connection-level flow control + for the entire HTTP/2 connection. When enabled, the connection's flow-control window + shrinks as body data is received across all streams. The initial connection window is + 65,535 bytes. When the window reaches 0, all streams stop receiving data until + `update_window()` is called to increment the connection's window. + Note: Padding in DATA frames counts against the window, but window updates for padding + are sent automatically even in manual mode. Default is False. + + conn_window_size_threshold (Optional[int]): Threshold for sending connection-level WINDOW_UPDATE + frames. Ignored if `conn_manual_window_management` is False. When the connection's window + is above this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update + is sent. Default is 32,767 (half of the initial 65,535 window). + + stream_window_size_threshold (Optional[int]): Threshold for sending stream-level WINDOW_UPDATE + frames. Ignored if `manual_window_management` is False. When a stream's window is above + this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update is sent. + Default is half of `initial_window_size`. """ future = cls._generic_new( host_name, diff --git a/awscrt/http.py b/awscrt/http.py index f3e467fec..a3bb6119f 100644 --- a/awscrt/http.py +++ b/awscrt/http.py @@ -240,14 +240,25 @@ def new(cls, proxy_options (Optional[HttpProxyOptions]): Optional proxy options. If None is provided then a proxy is not used. - manual_window_management (bool): If True, enables manual flow control window management. + manual_window_management (bool): Set to True to manually manage the flow-control window + of each stream. If False, the connection maintains flow-control windows such that + no back-pressure is applied and data arrives as fast as possible. If True, the + flow-control window of each stream shrinks as body data is received (headers, + padding, and other metadata do not affect the window). `initial_window_size` + determines the starting size of each stream's window. When a stream's window + reaches 0, no further data is received until `update_window()` is called. Default is False. - initial_window_size (Optional[int]): Initial window size for flow control. - If None, uses default value. + initial_window_size (Optional[int]): The starting size of each stream's flow-control + window. Required if `manual_window_management` is True, ignored otherwise. + Must be <= 2^31-1 or connection fails. If set to 0 with `manual_window_management` + True, streams start with zero window. If None, uses default value. - read_buffer_capacity (Optional[int]): Read buffer capacity for the connection. - If None, uses default value. + read_buffer_capacity (Optional[int]): Capacity in bytes of the HTTP/1.1 connection's + read buffer. The buffer grows when the flow-control window of the incoming stream + reaches zero. Ignored if `manual_window_management` is False. A capacity that is + too small may hinder throughput. A capacity that is too large may waste memory + without improving throughput. If None or zero, a default value is used. Returns: concurrent.futures.Future: A Future which completes when connection succeeds or fails. @@ -365,14 +376,23 @@ def new(cls, initial_window_size (Optional[int]): Initial window size for flow control. If None, uses default value. - conn_manual_window_management (bool): If True, enables manual connection-level window management. - Default is False. - - conn_window_size_threshold (Optional[int]): Connection window size threshold. - If None, uses default value. - - stream_window_size_threshold (Optional[int]): Stream window size threshold. - If None, uses default value. + conn_manual_window_management (bool): If True, enables manual connection-level flow control + for the entire HTTP/2 connection. When enabled, the connection's flow-control window + shrinks as body data is received across all streams. The initial connection window is + 65,535 bytes. When the window reaches 0, all streams stop receiving data until + `update_window()` is called to increment the connection's window. + Note: Padding in DATA frames counts against the window, but window updates for padding + are sent automatically even in manual mode. Default is False. + + conn_window_size_threshold (Optional[int]): Threshold for sending connection-level WINDOW_UPDATE + frames. Ignored if `conn_manual_window_management` is False. When the connection's window + is above this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update + is sent. Default is 32,767 (half of the initial 65,535 window). + + stream_window_size_threshold (Optional[int]): Threshold for sending stream-level WINDOW_UPDATE + frames. Ignored if `manual_window_management` is False. When a stream's window is above + this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update is sent. + Default is half of `initial_window_size`. """ return cls._generic_new( host_name, diff --git a/test/test_http_client.py b/test/test_http_client.py index 2bc5edf07..a8c217698 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -751,17 +751,17 @@ def on_body_with_window_update(http_stream, chunk, **kwargs): def test_h2_stream_flow_control_blocks_and_resumes(self): """Test that stream flow control actually blocks and resumes""" connection_future = Http2ClientConnection.new( - host_name="nghttp2.org", + host_name="httpbin.org", port=443, tls_connection_options=self.tls_options, manual_window_management=True, - initial_window_size=10 # Tiny window - will block immediately + initial_window_size=1 # Tiny window - will block immediately ) try: connection = connection_future.result(timeout=self.timeout) - request = HttpRequest('GET', '/httpbin/bytes/100') - request.headers.add('host', 'nghttp2.org') + request = HttpRequest('GET', '/bytes/100') + request.headers.add('host', 'httpbin.org') response = Response() chunks_received = [] @@ -778,7 +778,7 @@ def on_body(http_stream, chunk, **kwargs): self.assertEqual(100, len(response.body)) # With window=10, we should receive many small chunks - self.assertEqual(len(chunks_received), 10, "Expected multiple chunks with tiny window") + self.assertEqual(len(chunks_received), 100, "Expected multiple chunks with tiny window") connection.close() except Exception as e: