Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 88 additions & 7 deletions awscrt/aio/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ async def new(cls,
bootstrap: Optional[ClientBootstrap] = None,
socket_options: Optional[SocketOptions] = None,
tls_connection_options: Optional[TlsConnectionOptions] = None,
proxy_options: Optional[HttpProxyOptions] = None) -> "AIOHttpClientConnectionUnified":
proxy_options: Optional[HttpProxyOptions] = None,
manual_window_management: bool = False,
initial_window_size: Optional[int] = None) -> "AIOHttpClientConnectionUnified":
"""
Asynchronously establish a new AIOHttpClientConnectionUnified.

Expand All @@ -60,6 +62,23 @@ async def new(cls,
proxy_options (Optional[HttpProxyOptions]): Optional proxy options.
If None is provided then a proxy is not used.

manual_window_management (bool): Set to True to manually manage the flow-control window
of each stream. If False, the connection maintains flow-control windows such that
no back-pressure is applied and data arrives as fast as possible. If True, the
flow-control window of each stream shrinks as body data is received (headers,
padding, and other metadata do not affect the window). `initial_window_size`
determines the starting size of each stream's window. When a stream's window
reaches 0, no further data is received until `update_window()` is called.
For HTTP/2, this only controls stream windows; connection window is controlled
by `conn_manual_window_management`. Default is False.

initial_window_size (Optional[int]): The starting size of each stream's flow-control
window. Required if `manual_window_management` is True, ignored otherwise.
For HTTP/2, this becomes the `INITIAL_WINDOW_SIZE` setting and can be overridden
by `initial_settings`. Must be <= 2^31-1 or connection fails. If set to 0 with
`manual_window_management` True, streams start with zero window. If None, uses
default value.

Returns:
AIOHttpClientConnectionUnified: A new unified HTTP client connection.
"""
Expand All @@ -70,7 +89,9 @@ async def new(cls,
socket_options,
tls_connection_options,
proxy_options,
asyncio_connection=True)
asyncio_connection=True,
manual_window_management=manual_window_management,
initial_window_size=initial_window_size)
return await asyncio.wrap_future(future)

async def close(self) -> None:
Expand Down Expand Up @@ -118,7 +139,10 @@ async def new(cls,
bootstrap: Optional[ClientBootstrap] = None,
socket_options: Optional[SocketOptions] = None,
tls_connection_options: Optional[TlsConnectionOptions] = None,
proxy_options: Optional[HttpProxyOptions] = None) -> "AIOHttpClientConnection":
proxy_options: Optional[HttpProxyOptions] = None,
manual_window_management: bool = False,
initial_window_size: Optional[int] = None,
read_buffer_capacity: Optional[int] = None) -> "AIOHttpClientConnection":
"""
Asynchronously establish a new AIOHttpClientConnection.

Expand All @@ -140,6 +164,18 @@ async def new(cls,
proxy_options (Optional[HttpProxyOptions]): Optional proxy options.
If None is provided then a proxy is not used.

manual_window_management (bool): If True, enables manual flow control window management.
Default is False.

initial_window_size (Optional[int]): Initial window size for flow control.
If None, uses default value.

read_buffer_capacity (Optional[int]): Capacity in bytes of the HTTP/1.1 connection's
read buffer. The buffer grows when the flow-control window of the incoming stream
reaches zero. Ignored if `manual_window_management` is False. A capacity that is
too small may hinder throughput. A capacity that is too large may waste memory
without improving throughput. If None or zero, a default value is used.

Returns:
AIOHttpClientConnection: A new HTTP client connection.
"""
Expand All @@ -151,7 +187,10 @@ async def new(cls,
tls_connection_options,
proxy_options,
expected_version=HttpVersion.Http1_1,
asyncio_connection=True)
asyncio_connection=True,
manual_window_management=manual_window_management,
initial_window_size=initial_window_size,
read_buffer_capacity=read_buffer_capacity)
return await asyncio.wrap_future(future)

def request(self,
Expand Down Expand Up @@ -189,8 +228,12 @@ async def new(cls,
tls_connection_options: Optional[TlsConnectionOptions] = None,
proxy_options: Optional[HttpProxyOptions] = None,
initial_settings: Optional[List[Http2Setting]] = None,
on_remote_settings_changed: Optional[Callable[[List[Http2Setting]],
None]] = None) -> "AIOHttp2ClientConnection":
on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None,
manual_window_management: bool = False,
initial_window_size: Optional[int] = None,
conn_manual_window_management: bool = False,
conn_window_size_threshold: Optional[int] = None,
stream_window_size_threshold: Optional[int] = None) -> "AIOHttp2ClientConnection":
"""
Asynchronously establish an HTTP/2 client connection.
Notes: to set up the connection, the server must support HTTP/2 and TlsConnectionOptions
Expand All @@ -205,6 +248,30 @@ async def new(cls,
The function should take the following arguments and return nothing:

* `settings` (List[Http2Setting]): List of settings that were changed.

manual_window_management (bool): If True, enables manual flow control window management.
Default is False.

initial_window_size (Optional[int]): Initial window size for flow control.
If None, uses default value.

conn_manual_window_management (bool): If True, enables manual connection-level flow control
for the entire HTTP/2 connection. When enabled, the connection's flow-control window
shrinks as body data is received across all streams. The initial connection window is
65,535 bytes. When the window reaches 0, all streams stop receiving data until
`update_window()` is called to increment the connection's window.
Note: Padding in DATA frames counts against the window, but window updates for padding
are sent automatically even in manual mode. Default is False.

conn_window_size_threshold (Optional[int]): Threshold for sending connection-level WINDOW_UPDATE
frames. Ignored if `conn_manual_window_management` is False. When the connection's window
is above this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update
is sent. Default is 32,767 (half of the initial 65,535 window).

stream_window_size_threshold (Optional[int]): Threshold for sending stream-level WINDOW_UPDATE
frames. Ignored if `manual_window_management` is False. When a stream's window is above
this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update is sent.
Default is half of `initial_window_size`.
"""
future = cls._generic_new(
host_name,
Expand All @@ -216,7 +283,12 @@ async def new(cls,
expected_version=HttpVersion.Http2,
initial_settings=initial_settings,
on_remote_settings_changed=on_remote_settings_changed,
asyncio_connection=True)
asyncio_connection=True,
manual_window_management=manual_window_management,
initial_window_size=initial_window_size,
conn_manual_window_management=conn_manual_window_management,
conn_window_size_threshold=conn_window_size_threshold,
stream_window_size_threshold=stream_window_size_threshold)
return await asyncio.wrap_future(future)

def request(self,
Expand All @@ -237,6 +309,15 @@ def request(self,
"""
return AIOHttp2ClientStream(self, request, request_body_generator, loop)

def update_window(self, increment_size: int) -> None:
"""
Update the connection's flow control window.

Args:
increment_size (int): Number of bytes to increment the window by.
"""
_awscrt.http2_connection_update_window(self._binding, increment_size)


class AIOHttpClientStreamUnified(HttpClientStreamBase):
__slots__ = (
Expand Down
102 changes: 96 additions & 6 deletions awscrt/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ def _generic_new(
expected_version: Optional[HttpVersion] = None,
initial_settings: Optional[List[Http2Setting]] = None,
on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None,
asyncio_connection=False) -> "concurrent.futures.Future":
asyncio_connection=False,
manual_window_management: bool = False,
initial_window_size: Optional[int] = None,
read_buffer_capacity: Optional[int] = None,
conn_manual_window_management: bool = False,
conn_window_size_threshold: Optional[int] = None,
stream_window_size_threshold: Optional[int] = None) -> "concurrent.futures.Future":
"""
Initialize the generic part of the HttpClientConnection class.
"""
Expand Down Expand Up @@ -170,7 +176,13 @@ def _generic_new(
proxy_options,
initial_settings,
on_remote_settings_changed,
connection_core)
connection_core,
manual_window_management,
initial_window_size,
read_buffer_capacity,
conn_manual_window_management,
conn_window_size_threshold,
stream_window_size_threshold)

except Exception as e:
future.set_exception(e)
Expand Down Expand Up @@ -203,7 +215,10 @@ def new(cls,
bootstrap: Optional[ClientBootstrap] = None,
socket_options: Optional[SocketOptions] = None,
tls_connection_options: Optional[TlsConnectionOptions] = None,
proxy_options: Optional['HttpProxyOptions'] = None) -> "concurrent.futures.Future":
proxy_options: Optional['HttpProxyOptions'] = None,
manual_window_management: bool = False,
initial_window_size: Optional[int] = None,
read_buffer_capacity: Optional[int] = None) -> "concurrent.futures.Future":
"""
Asynchronously establish a new HttpClientConnection.

Expand All @@ -225,6 +240,26 @@ def new(cls,
proxy_options (Optional[HttpProxyOptions]): Optional proxy options.
If None is provided then a proxy is not used.

manual_window_management (bool): Set to True to manually manage the flow-control window
of each stream. If False, the connection maintains flow-control windows such that
no back-pressure is applied and data arrives as fast as possible. If True, the
flow-control window of each stream shrinks as body data is received (headers,
padding, and other metadata do not affect the window). `initial_window_size`
determines the starting size of each stream's window. When a stream's window
reaches 0, no further data is received until `update_window()` is called.
Default is False.

initial_window_size (Optional[int]): The starting size of each stream's flow-control
window. Required if `manual_window_management` is True, ignored otherwise.
Must be <= 2^31-1 or connection fails. If set to 0 with `manual_window_management`
True, streams start with zero window. If None, uses default value.

read_buffer_capacity (Optional[int]): Capacity in bytes of the HTTP/1.1 connection's
read buffer. The buffer grows when the flow-control window of the incoming stream
reaches zero. Ignored if `manual_window_management` is False. A capacity that is
too small may hinder throughput. A capacity that is too large may waste memory
without improving throughput. If None or zero, a default value is used.

Returns:
concurrent.futures.Future: A Future which completes when connection succeeds or fails.
If successful, the Future will contain a new :class:`HttpClientConnection`.
Expand All @@ -236,7 +271,10 @@ def new(cls,
bootstrap,
socket_options,
tls_connection_options,
proxy_options)
proxy_options,
manual_window_management=manual_window_management,
initial_window_size=initial_window_size,
read_buffer_capacity=read_buffer_capacity)

def request(self,
request: 'HttpRequest',
Expand Down Expand Up @@ -311,7 +349,12 @@ def new(cls,
proxy_options: Optional['HttpProxyOptions'] = None,
initial_settings: Optional[List[Http2Setting]] = None,
on_remote_settings_changed: Optional[Callable[[List[Http2Setting]],
None]] = None) -> "concurrent.futures.Future":
None]] = None,
manual_window_management: bool = False,
initial_window_size: Optional[int] = None,
conn_manual_window_management: bool = False,
conn_window_size_threshold: Optional[int] = None,
stream_window_size_threshold: Optional[int] = None) -> "concurrent.futures.Future":
"""
Asynchronously establish an HTTP/2 client connection.
Notes: to set up the connection, the server must support HTTP/2 and TlsConnectionOptions
Expand All @@ -326,6 +369,30 @@ def new(cls,
The function should take the following arguments and return nothing:

* `settings` (List[Http2Setting]): List of settings that were changed.

manual_window_management (bool): If True, enables manual flow control window management.
Default is False.

initial_window_size (Optional[int]): Initial window size for flow control.
If None, uses default value.

conn_manual_window_management (bool): If True, enables manual connection-level flow control
for the entire HTTP/2 connection. When enabled, the connection's flow-control window
shrinks as body data is received across all streams. The initial connection window is
65,535 bytes. When the window reaches 0, all streams stop receiving data until
`update_window()` is called to increment the connection's window.
Note: Padding in DATA frames counts against the window, but window updates for padding
are sent automatically even in manual mode. Default is False.

conn_window_size_threshold (Optional[int]): Threshold for sending connection-level WINDOW_UPDATE
frames. Ignored if `conn_manual_window_management` is False. When the connection's window
is above this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update
is sent. Default is 32,767 (half of the initial 65,535 window).

stream_window_size_threshold (Optional[int]): Threshold for sending stream-level WINDOW_UPDATE
frames. Ignored if `manual_window_management` is False. When a stream's window is above
this threshold, WINDOW_UPDATE frames are batched. When it drops below, the update is sent.
Default is half of `initial_window_size`.
"""
return cls._generic_new(
host_name,
Expand All @@ -336,7 +403,12 @@ def new(cls,
proxy_options,
HttpVersion.Http2,
initial_settings,
on_remote_settings_changed)
on_remote_settings_changed,
manual_window_management=manual_window_management,
initial_window_size=initial_window_size,
conn_manual_window_management=conn_manual_window_management,
conn_window_size_threshold=conn_window_size_threshold,
stream_window_size_threshold=stream_window_size_threshold)

def request(self,
request: 'HttpRequest',
Expand Down Expand Up @@ -397,6 +469,15 @@ def close(self) -> "concurrent.futures.Future":
_awscrt.http_connection_close(self._binding)
return self.shutdown_future

def update_window(self, increment_size: int) -> None:
"""
Update the connection's flow control window.

Args:
increment_size (int): Number of bytes to increment the window by.
"""
_awscrt.http2_connection_update_window(self._binding, increment_size)


class HttpStreamBase(NativeResource):
"""Base for HTTP stream classes.
Expand Down Expand Up @@ -486,6 +567,15 @@ def _on_complete(self, error_code: int) -> None:
else:
self._completion_future.set_exception(awscrt.exceptions.from_code(error_code))

def update_window(self, increment_size: int) -> None:
"""
Update the stream's flow control window.

Args:
increment_size (int): Number of bytes to increment the window by.
"""
_awscrt.http_stream_update_window(self, increment_size)


class HttpClientStream(HttpClientStreamBase):
"""HTTP stream that sends a request and receives a response.
Expand Down
10 changes: 10 additions & 0 deletions source/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ PyObject *aws_py_http_connection_close(PyObject *self, PyObject *args);
*/
PyObject *aws_py_http_connection_is_open(PyObject *self, PyObject *args);

/**
* Update HTTP/2 connection window size.
*/
PyObject *aws_py_http2_connection_update_window(PyObject *self, PyObject *args);

/**
* Update HTTP stream window size.
*/
PyObject *aws_py_http_stream_update_window(PyObject *self, PyObject *args);

/**
* Create a new connection. returns void. The on_setup callback will be invoked
* upon either success or failure of the connection.
Expand Down
Loading