Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 14 additions & 33 deletions httpcore/_async/socks_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async def _init_socks5_connection(
host: bytes,
port: int,
auth: tuple[bytes, bytes] | None = None,
timeout: float | None = None, # <--- FIX 1: Add timeout argument
) -> None:
conn = socksio.socks5.SOCKS5Connection()

Expand All @@ -56,10 +57,12 @@ async def _init_socks5_connection(
)
conn.send(socksio.socks5.SOCKS5AuthMethodsRequest([auth_method]))
outgoing_bytes = conn.data_to_send()
await stream.write(outgoing_bytes)
await stream.write(outgoing_bytes, timeout=timeout) # <--- FIX 2: Pass timeout

# Auth method response
incoming_bytes = await stream.read(max_bytes=4096)
incoming_bytes = await stream.read(
max_bytes=4096, timeout=timeout
) # <--- FIX 3: Pass timeout
response = conn.receive_data(incoming_bytes)
assert isinstance(response, socksio.socks5.SOCKS5AuthReply)
if response.method != auth_method:
Expand All @@ -75,10 +78,12 @@ async def _init_socks5_connection(
username, password = auth
conn.send(socksio.socks5.SOCKS5UsernamePasswordRequest(username, password))
outgoing_bytes = conn.data_to_send()
await stream.write(outgoing_bytes)
await stream.write(outgoing_bytes, timeout=timeout) # <--- FIX 4: Pass timeout

# Username/password response
incoming_bytes = await stream.read(max_bytes=4096)
incoming_bytes = await stream.read(
max_bytes=4096, timeout=timeout
) # <--- FIX 5: Pass timeout
response = conn.receive_data(incoming_bytes)
assert isinstance(response, socksio.socks5.SOCKS5UsernamePasswordReply)
if not response.success:
Expand All @@ -91,10 +96,12 @@ async def _init_socks5_connection(
)
)
outgoing_bytes = conn.data_to_send()
await stream.write(outgoing_bytes)
await stream.write(outgoing_bytes, timeout=timeout) # <--- FIX 6: Pass timeout

# Connect response
incoming_bytes = await stream.read(max_bytes=4096)
incoming_bytes = await stream.read(
max_bytes=4096, timeout=timeout
) # <--- FIX 7: Pass timeout
response = conn.receive_data(incoming_bytes)
assert isinstance(response, socksio.socks5.SOCKS5Reply)
if response.reply_code != socksio.socks5.SOCKS5ReplyCode.SUCCEEDED:
Expand Down Expand Up @@ -122,33 +129,6 @@ def __init__(
) -> None:
"""
A connection pool for making HTTP requests.

Parameters:
proxy_url: The URL to use when connecting to the proxy server.
For example `"http://127.0.0.1:8080/"`.
ssl_context: An SSL context to use for verifying connections.
If not specified, the default `httpcore.default_ssl_context()`
will be used.
max_connections: The maximum number of concurrent HTTP connections that
the pool should allow. Any attempt to send a request on a pool that
would exceed this amount will block until a connection is available.
max_keepalive_connections: The maximum number of idle HTTP connections
that will be maintained in the pool.
keepalive_expiry: The duration in seconds that an idle HTTP connection
may be maintained for before being expired from the pool.
http1: A boolean indicating if HTTP/1.1 requests should be supported
by the connection pool. Defaults to True.
http2: A boolean indicating if HTTP/2 requests should be supported by
the connection pool. Defaults to False.
retries: The maximum number of retries when trying to establish
a connection.
local_address: Local address to connect from. Can also be used to
connect using a particular address family. Using
`local_address="0.0.0.0"` will connect using an `AF_INET` address
(IPv4), while using `local_address="::"` will connect using an
`AF_INET6` address (IPv6).
uds: Path to a Unix Domain Socket to use instead of TCP sockets.
network_backend: A backend instance to use for handling network I/O.
"""
super().__init__(
ssl_context=ssl_context,
Expand Down Expand Up @@ -237,6 +217,7 @@ async def handle_async_request(self, request: Request) -> Response:
"host": self._remote_origin.host.decode("ascii"),
"port": self._remote_origin.port,
"auth": self._proxy_auth,
"timeout": timeout, # <--- FIX 8: Pass timeout argument
}
async with Trace(
"setup_socks5_connection", logger, request, kwargs
Expand Down
47 changes: 14 additions & 33 deletions httpcore/_sync/socks_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def _init_socks5_connection(
host: bytes,
port: int,
auth: tuple[bytes, bytes] | None = None,
timeout: float | None = None, # <--- FIX 1: Add timeout argument
) -> None:
conn = socksio.socks5.SOCKS5Connection()

Expand All @@ -56,10 +57,12 @@ def _init_socks5_connection(
)
conn.send(socksio.socks5.SOCKS5AuthMethodsRequest([auth_method]))
outgoing_bytes = conn.data_to_send()
stream.write(outgoing_bytes)
stream.write(outgoing_bytes, timeout=timeout) # <--- FIX 2: Pass timeout

# Auth method response
incoming_bytes = stream.read(max_bytes=4096)
incoming_bytes = stream.read(
max_bytes=4096, timeout=timeout
) # <--- FIX 3: Pass timeout
response = conn.receive_data(incoming_bytes)
assert isinstance(response, socksio.socks5.SOCKS5AuthReply)
if response.method != auth_method:
Expand All @@ -75,10 +78,12 @@ def _init_socks5_connection(
username, password = auth
conn.send(socksio.socks5.SOCKS5UsernamePasswordRequest(username, password))
outgoing_bytes = conn.data_to_send()
stream.write(outgoing_bytes)
stream.write(outgoing_bytes, timeout=timeout) # <--- FIX 4: Pass timeout

# Username/password response
incoming_bytes = stream.read(max_bytes=4096)
incoming_bytes = stream.read(
max_bytes=4096, timeout=timeout
) # <--- FIX 5: Pass timeout
response = conn.receive_data(incoming_bytes)
assert isinstance(response, socksio.socks5.SOCKS5UsernamePasswordReply)
if not response.success:
Expand All @@ -91,10 +96,12 @@ def _init_socks5_connection(
)
)
outgoing_bytes = conn.data_to_send()
stream.write(outgoing_bytes)
stream.write(outgoing_bytes, timeout=timeout) # <--- FIX 6: Pass timeout

# Connect response
incoming_bytes = stream.read(max_bytes=4096)
incoming_bytes = stream.read(
max_bytes=4096, timeout=timeout
) # <--- FIX 7: Pass timeout
response = conn.receive_data(incoming_bytes)
assert isinstance(response, socksio.socks5.SOCKS5Reply)
if response.reply_code != socksio.socks5.SOCKS5ReplyCode.SUCCEEDED:
Expand Down Expand Up @@ -122,33 +129,6 @@ def __init__(
) -> None:
"""
A connection pool for making HTTP requests.

Parameters:
proxy_url: The URL to use when connecting to the proxy server.
For example `"http://127.0.0.1:8080/"`.
ssl_context: An SSL context to use for verifying connections.
If not specified, the default `httpcore.default_ssl_context()`
will be used.
max_connections: The maximum number of concurrent HTTP connections that
the pool should allow. Any attempt to send a request on a pool that
would exceed this amount will block until a connection is available.
max_keepalive_connections: The maximum number of idle HTTP connections
that will be maintained in the pool.
keepalive_expiry: The duration in seconds that an idle HTTP connection
may be maintained for before being expired from the pool.
http1: A boolean indicating if HTTP/1.1 requests should be supported
by the connection pool. Defaults to True.
http2: A boolean indicating if HTTP/2 requests should be supported by
the connection pool. Defaults to False.
retries: The maximum number of retries when trying to establish
a connection.
local_address: Local address to connect from. Can also be used to
connect using a particular address family. Using
`local_address="0.0.0.0"` will connect using an `AF_INET` address
(IPv4), while using `local_address="::"` will connect using an
`AF_INET6` address (IPv6).
uds: Path to a Unix Domain Socket to use instead of TCP sockets.
network_backend: A backend instance to use for handling network I/O.
"""
super().__init__(
ssl_context=ssl_context,
Expand Down Expand Up @@ -237,6 +217,7 @@ def handle_request(self, request: Request) -> Response:
"host": self._remote_origin.host.decode("ascii"),
"port": self._remote_origin.port,
"auth": self._proxy_auth,
"timeout": timeout, # <--- FIX 8: Pass timeout argument
}
with Trace(
"setup_socks5_connection", logger, request, kwargs
Expand Down
63 changes: 63 additions & 0 deletions reproduce_httpcore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import httpcore
import socket
import threading
import time

# --- Same Server Setup as before ---
TIMEOUT = 2.0
HANG_TIME = 20

def get_free_port():
with socket.socket() as s:
s.bind(('', 0))
return s.getsockname()[1]

def blackhole_proxy_server(port, stop_event):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', port))
server.listen(1)
server.settimeout(1.0)
while not stop_event.is_set():
try:
client, _ = server.accept()
time.sleep(HANG_TIME) # Hang the handshake
client.close()
except socket.timeout: continue
except: break
server.close()

# --- The Test ---
def run_test():
proxy_port = get_free_port()
stop_event = threading.Event()
t = threading.Thread(target=blackhole_proxy_server, args=(proxy_port, stop_event))
t.start()
time.sleep(0.5)

print(f"[*] Testing httpcore SOCKSProxy with {TIMEOUT}s timeout...")
start_time = time.time()

# We use the low-level SOCKSProxy directly
with httpcore.SOCKSProxy(
proxy_url=f"socks5://127.0.0.1:{proxy_port}"
) as pool:
try:
# We assume httpcore 1.0+ style request
pool.request(
"GET",
"http://example.com",
extensions={'timeout': {'connect': TIMEOUT, 'read': TIMEOUT}}
)
except httpcore.TimeoutException:
print("[SUCCESS] Caught timeout correctly!")
except Exception as e:
print(f"[ERROR] {e}")
finally:
duration = time.time() - start_time
print(f"[*] Duration: {duration:.2f}s")
stop_event.set()
t.join()

if __name__ == "__main__":
run_test()
69 changes: 69 additions & 0 deletions reproduce_httpcore_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import httpcore
import socket
import threading
import time
import asyncio

# --- Server Setup (Same as before) ---
HANG_TIME = 20
TIMEOUT = 2.0

def get_free_port():
with socket.socket() as s:
s.bind(('', 0))
return s.getsockname()[1]

def blackhole_proxy_server(port, stop_event):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', port))
server.listen(1)
server.settimeout(1.0)

while not stop_event.is_set():
try:
client, _ = server.accept()
# print("[Server] Accepted connection, sleeping...")
time.sleep(HANG_TIME)
client.close()
except socket.timeout: continue
except: break
server.close()

# --- Async Test ---
async def run_async_test(port):
print(f"[*] Testing ASYNC httpcore SOCKSProxy with {TIMEOUT}s timeout...")
start_time = time.time()

async with httpcore.AsyncSOCKSProxy(
proxy_url=f"socks5://127.0.0.1:{port}"
) as pool:
try:
await pool.request(
"GET",
"http://example.com",
extensions={'timeout': {'connect': TIMEOUT, 'read': TIMEOUT}}
)
except httpcore.TimeoutException:
print("[SUCCESS] Caught timeout correctly!")
except Exception as e:
print(f"[ERROR] {type(e).__name__}: {e}")
finally:
duration = time.time() - start_time
print(f"[*] Duration: {duration:.2f}s")

def main():
port = get_free_port()
stop_event = threading.Event()
t = threading.Thread(target=blackhole_proxy_server, args=(port, stop_event))
t.start()
time.sleep(0.5)

try:
asyncio.run(run_async_test(port))
finally:
stop_event.set()
t.join()

if __name__ == "__main__":
main()