Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions httpcore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ._backends.sync import SyncBackend
from ._exceptions import (
ConnectError,
ConnectionGoingAway,
ConnectionNotAvailable,
ConnectTimeout,
LocalProtocolError,
Expand Down Expand Up @@ -114,6 +115,7 @@ def __init__(self, *args, **kwargs): # type: ignore
"default_ssl_context",
"SOCKET_OPTION",
# exceptions
"ConnectionGoingAway",
"ConnectionNotAvailable",
"ProxyError",
"ProtocolError",
Expand Down
28 changes: 27 additions & 1 deletion httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@

from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._exceptions import (
ConnectionGoingAway,
ConnectionNotAvailable,
RemoteProtocolError,
UnsupportedProtocol,
)
from .._models import Origin, Proxy, Request, Response
from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock
from .connection import AsyncHTTPConnection
Expand Down Expand Up @@ -236,6 +241,27 @@ async def handle_async_request(self, request: Request) -> Response:
response = await connection.handle_async_request(
pool_request.request
)
except ConnectionGoingAway as exc:
# GOAWAY frame recieved during request processing.
# Determine if we can safely retry based on RFC 7540 semantics.
# NOTE: This must be caught before ConnectionNotAvailable since
# ConnectionGoingAway is a subclass of ConnectionNotAvailable.
pool_request.clear_connection()

if exc.is_safe_to_retry:
# stream_id > last_stream_id: guaranteed unprocessed per RFC 7540
# Safe to retry on a new connection
continue
elif exc.is_graceful_shutdown and not exc.may_have_side_effects:
# Graceful shutdown and headers weren't sent yet.
# Likely safe to retry.
continue
else:
# Request may have been processed. Propagate error with context so application can decide
# whether to retry.
msg = "GOAWAY recieved: request may have been processed"
# QUESTION: What is the best way to propagate the context for the applications?
raise RemoteProtocolError(msg) from exc
except ConnectionNotAvailable:
# In some cases a connection may initially be available to
# handle a request, but then become unavailable.
Expand Down
151 changes: 140 additions & 11 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from .._backends.base import AsyncNetworkStream
from .._exceptions import (
ConnectionGoingAway,
ConnectionNotAvailable,
LocalProtocolError,
RemoteProtocolError,
Expand All @@ -36,7 +37,8 @@ def has_body_headers(request: Request) -> bool:
class HTTPConnectionState(enum.IntEnum):
ACTIVE = 1
IDLE = 2
CLOSED = 3
DRAINING = 3
CLOSED = 4


class AsyncHTTP2Connection(AsyncConnectionInterface):
Expand Down Expand Up @@ -82,6 +84,11 @@ def __init__(
self._read_exception: Exception | None = None
self._write_exception: Exception | None = None

# Track request phases for GOAWAY retry safety determination.
# Maps stream_id -> {"headers_sent": bool, "body_sent": bool}
# TODO: Consider shifting this to a dataclass or typeddict
self._stream_requests: dict[int, dict[str, bool]] = {}

async def handle_async_request(self, request: Request) -> Response:
if not self.can_handle_request(request.url.origin):
# This cannot occur in normal operation, since the connection pool
Expand Down Expand Up @@ -133,6 +140,11 @@ async def handle_async_request(self, request: Request) -> Response:
try:
stream_id = self._h2_state.get_next_available_stream_id()
self._events[stream_id] = []
# Initialize phase tracking for this stream
self._stream_requests[stream_id] = {
"headers_sent": False,
"body_sent": False,
}
except h2.exceptions.NoAvailableStreamIDError: # pragma: nocover
self._used_all_stream_ids = True
self._request_count -= 1
Expand All @@ -142,8 +154,10 @@ async def handle_async_request(self, request: Request) -> Response:
kwargs = {"request": request, "stream_id": stream_id}
async with Trace("send_request_headers", logger, request, kwargs):
await self._send_request_headers(request=request, stream_id=stream_id)
self._stream_requests[stream_id]["headers_sent"] = True
async with Trace("send_request_body", logger, request, kwargs):
await self._send_request_body(request=request, stream_id=stream_id)
self._stream_requests[stream_id]["body_sent"] = True
async with Trace(
"receive_response_headers", logger, request, kwargs
) as trace:
Expand Down Expand Up @@ -177,9 +191,46 @@ async def handle_async_request(self, request: Request) -> Response:
# a protocol error at any point they interact with the 'h2_state'.
#
# In this case we'll have stored the event, and should raise
# it as a RemoteProtocolError.
# it as a ConnectionGoingAway if applicable, or RemoteProtocolError.
if self._connection_terminated: # pragma: nocover
raise RemoteProtocolError(self._connection_terminated)
phase = self._stream_requests.get(
stream_id,
{
"headers_sent": False,
"body_sent": False,
},
)
raise ConnectionGoingAway(
self._connection_terminated, # type: ignore[arg-type,unused-ignore]
last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type,unused-ignore]
error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore]
request_stream_id=stream_id,
headers_sent=phase["headers_sent"],
body_sent=phase["body_sent"],
)
# Check if h2 is in CLOSED state due to GOAWAY. This can happen when
# GOAWAY was recieved but we haven't processed the event yet (race condition).
if (
self._h2_state.state_machine.state
== h2.connection.ConnectionState.CLOSED
):
phase = self._stream_requests.get(
stream_id,
{
"headers_sent": False,
"body_sent": False,
},
)
msg = f"Connection closed: {exc}"
raise ConnectionGoingAway(
msg,
last_stream_id=stream_id, # Conservative: assume this stream may have been processed
error_code=0, # Assume graceful shutdown
request_stream_id=stream_id,
headers_sent=phase["headers_sent"],
body_sent=phase["body_sent"],
)

# If h2 raises a protocol error in some other state then we
# must somehow have made a protocol violation.
raise LocalProtocolError(exc) # pragma: nocover
Expand Down Expand Up @@ -349,10 +400,39 @@ async def _receive_events(
async with self._read_lock:
if self._connection_terminated is not None:
last_stream_id = self._connection_terminated.last_stream_id
if stream_id and last_stream_id and stream_id > last_stream_id:
self._request_count -= 1
raise ConnectionNotAvailable()
raise RemoteProtocolError(self._connection_terminated)
if stream_id is not None:
phase = self._stream_requests.get(
stream_id,
{
"headers_sent": False,
"body_sent": False,
},
)
if last_stream_id is not None and stream_id > last_stream_id:
# stream_id > last_stream_id: guaranteed unprocessed, safe to retry
self._request_count -= 1
raise ConnectionGoingAway(
f"GOAWAY received: stream {stream_id} > last_stream_id {last_stream_id}",
last_stream_id=last_stream_id,
error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore]
request_stream_id=stream_id,
headers_sent=phase["headers_sent"],
body_sent=phase["body_sent"],
)
if self._state != HTTPConnectionState.DRAINING:
# stream_id <= last_stream_id: may have been processed
raise ConnectionGoingAway(
f"GOAWAY received: stream {stream_id} <= last_stream_id {last_stream_id}",
last_stream_id=last_stream_id
if last_stream_id is not None
else 0,
error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore]
request_stream_id=stream_id,
headers_sent=phase["headers_sent"],
body_sent=phase["body_sent"],
)
else:
raise RemoteProtocolError(self._connection_terminated)

# This conditional is a bit icky. We don't want to block reading if we've
# actually got an event to return for a given stream. We need to do that
Expand All @@ -361,7 +441,7 @@ async def _receive_events(
# block until we've available flow control, event when we have events
# pending for the stream ID we're attempting to send on.
if stream_id is None or not self._events.get(stream_id):
events = await self._read_incoming_data(request)
events = await self._read_incoming_data(request, stream_id)
for event in events:
if isinstance(event, h2.events.RemoteSettingsChanged):
async with Trace(
Expand All @@ -384,6 +464,13 @@ async def _receive_events(

elif isinstance(event, h2.events.ConnectionTerminated):
self._connection_terminated = event
# Transition to DRAINING on graceful shutdown (NO_ERROR),
# allowing in-flight streams to complete.
# Non-graceful shutdown closes immediately.
if event.error_code == 0:
self._state = HTTPConnectionState.DRAINING
else:
self._state = HTTPConnectionState.CLOSED

await self._write_outgoing_data(request)

Expand All @@ -409,6 +496,7 @@ async def _receive_remote_settings_change(
async def _response_closed(self, stream_id: int) -> None:
await self._max_streams_semaphore.release()
del self._events[stream_id]
self._stream_requests.pop(stream_id, None) # Clean up phase tracking
async with self._state_lock:
if self._connection_terminated and not self._events:
await self.aclose()
Expand All @@ -430,7 +518,9 @@ async def aclose(self) -> None:

# Wrappers around network read/write operations...

async def _read_incoming_data(self, request: Request) -> list[h2.events.Event]:
async def _read_incoming_data(
self, request: Request, stream_id: int | None = None
) -> list[h2.events.Event]:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("read", None)

Expand All @@ -440,6 +530,39 @@ async def _read_incoming_data(self, request: Request) -> list[h2.events.Event]:
try:
data = await self._network_stream.read(self.READ_NUM_BYTES, timeout)
if data == b"":
# Server disconnected. Check if this is related to GOAWAY.
if stream_id is not None:
phase = self._stream_requests.get(
stream_id,
{
"headers_sent": False,
"body_sent": False,
},
)
# If we have a GOAWAY recorded, this disconnect is GOAWAY-related
if self._connection_terminated is not None:
last_stream_id = self._connection_terminated.last_stream_id
raise ConnectionGoingAway(
"Server disconnected after GOAWAY",
last_stream_id=last_stream_id if last_stream_id else 0,
error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore]
request_stream_id=stream_id,
headers_sent=phase["headers_sent"],
body_sent=phase["body_sent"],
)
# Check if h2 is in CLOSED state (GOAWAY received but not processed)
if (
self._h2_state.state_machine.state
== h2.connection.ConnectionState.CLOSED
):
raise ConnectionGoingAway(
"Server disconnected (connection closed)",
last_stream_id=stream_id, # Conservative
error_code=0, # Assume graceful
request_stream_id=stream_id,
headers_sent=phase["headers_sent"],
body_sent=phase["body_sent"],
)
raise RemoteProtocolError("Server disconnected")
except Exception as exc:
# If we get a network error we should:
Expand Down Expand Up @@ -510,7 +633,8 @@ def can_handle_request(self, origin: Origin) -> bool:

def is_available(self) -> bool:
return (
self._state != HTTPConnectionState.CLOSED
self._state
not in (HTTPConnectionState.DRAINING, HTTPConnectionState.CLOSED)
and not self._connection_error
and not self._used_all_stream_ids
and not (
Expand All @@ -521,7 +645,12 @@ def is_available(self) -> bool:

def has_expired(self) -> bool:
now = time.monotonic()
return self._expire_at is not None and now > self._expire_at
keepalive_expired = self._expire_at is not None and now > self._expire_at
# Draining connections with no active streams are considered expired
draining_complete = (
self._state == HTTPConnectionState.DRAINING and not self._events
)
return keepalive_expired or draining_complete

def is_idle(self) -> bool:
return self._state == HTTPConnectionState.IDLE
Expand Down
78 changes: 78 additions & 0 deletions httpcore/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,84 @@ class ConnectionNotAvailable(Exception):
pass


class ConnectionGoingAway(ConnectionNotAvailable):
"""
Raised when a GOAWAY frame is received during HTTP/2 request processing.

This exception provides context for determining whether a request is safe
to retry, based on RFC 7540 Section 6.8 semantics.

Per RFC 7540: streams with IDs > last_stream_id are guaranteed unprocessed
and safe to retry. Streams with IDs <= last_stream_id may have been processed.

Attributes:
last_stream_id: The highest stream ID the server may have processed.
error_code: The GOAWAY error code (0 = NO_ERROR for graceful shutdown).
request_stream_id: The stream ID assigned to this specific request.
headers_sent: Whether request headers were transmitted before GOAWAY.
body_sent: Whether request body was transmitted before GOAWAY.
"""

def __init__(
self,
message: str,
*,
last_stream_id: int,
error_code: int,
request_stream_id: int,
headers_sent: bool = False,
body_sent: bool = False,
) -> None:
super().__init__(message)
self.last_stream_id = last_stream_id
self.error_code = error_code
self.request_stream_id = request_stream_id
self.headers_sent = headers_sent
self.body_sent = body_sent

@property
def is_safe_to_retry(self) -> bool:
"""
Returns True if the request is guaranteed unprocessed and safe to retry.

Per RFC 7540 Section 6.8: any stream with ID > last_stream_id was never
seen by the server and can be safely retried.
"""
return self.request_stream_id > self.last_stream_id

@property
def is_graceful_shutdown(self) -> bool:
"""
Returns True if this is a graceful shutdown (NO_ERROR).

NO_ERROR (0x0) indicates administrative shutdown such as server restart,
connection limit reached, or idle timeout.
"""
return self.error_code == 0

@property
def may_have_side_effects(self) -> bool:
"""
Returns True if the request may have been processed by the server.

If stream_id > last_stream_id: guaranteed no side effects (unprocessed).
If stream_id <= last_stream_id AND (headers or body sent): possibly processed.
"""
if self.request_stream_id > self.last_stream_id:
return False # Guaranteed unprocessed per RFC 7540
return self.headers_sent or self.body_sent

def __repr__(self) -> str:
return (
f"ConnectionGoingAway("
f"last_stream_id={self.last_stream_id}, "
f"error_code={self.error_code}, "
f"request_stream_id={self.request_stream_id}, "
f"is_safe_to_retry={self.is_safe_to_retry}, "
f"is_graceful_shutdown={self.is_graceful_shutdown})"
)


class ProxyError(Exception):
pass

Expand Down
Loading