urllib3_proxy_manager.py — 153 lines (130 loc) · 5.07 KB
import http, sys
from http.client import _read_headers
from urllib3.connection import HTTPSConnection
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
from urllib3.poolmanager import ProxyManager
from urllib3.util.request import make_headers
from urllib3.util.url import parse_url
if sys.version_info < (3, 12, 0):
    #####################################
    ### copied from python3.12 source ###
    #####################################
    # Pre-3.12 interpreters do not ship http.client._parse_header_lines, so
    # the helper (and the HTTPMessage class it instantiates) is backported
    # verbatim from the CPython 3.12 source tree. Kept byte-for-byte on
    # purpose — do not "improve" this copy.
    import email.parser
    import email.message

    class HTTPMessage(email.message.Message):
        # XXX The only usage of this method is in
        # http.server.CGIHTTPRequestHandler. Maybe move the code there so
        # that it doesn't need to be part of the public API. The API has
        # never been defined so this could cause backwards compatibility
        # issues.
        def getallmatchingheaders(self, name):
            """Find all header lines matching a given header name.

            Look through the list of headers and find all lines matching a given
            header name (and their continuation lines). A list of the lines is
            returned, without interpretation. If the header does not occur, an
            empty list is returned. If the header occurs multiple times, all
            occurrences are returned. Case is not important in the header name.
            """
            name = name.lower() + ':'
            n = len(name)
            lst = []
            # `hit` tracks whether we are inside a matching header, so that
            # indented continuation lines are collected too.
            hit = 0
            for line in self.keys():
                if line[:n].lower() == name:
                    hit = 1
                elif not line[:1].isspace():
                    hit = 0
                if hit:
                    lst.append(line)
            return lst

    def _parse_header_lines(header_lines, _class=HTTPMessage):
        """
        Parses only RFC2822 headers from header lines.

        email Parser wants to see strings rather than bytes.
        But a TextIOWrapper around self.rfile would buffer too many bytes
        from the stream, bytes which we later need to read as bytes.
        So we read the correct bytes here, as bytes, for email Parser
        to parse.
        """
        # iso-8859-1 maps every byte 1:1 onto a code point, so this decode
        # can never fail and the original bytes stay recoverable.
        hstring = b''.join(header_lines).decode('iso-8859-1')
        return email.parser.Parser(_class=_class).parsestr(hstring)
else:
    # 3.12+ ships the helper in the stdlib; use it directly.
    from http.client import _parse_header_lines
class HTTPSProxyConnection(HTTPSConnection):
    """urllib3 HTTPSConnection that records the proxy's CONNECT reply headers.

    On Python < 3.12 the stdlib ``_tunnel`` discards the headers the proxy
    sends back in response to CONNECT, so the 3.12 implementation (which
    stores them on ``self._raw_proxy_headers``) is backported below.  On
    3.12+ the inherited ``http.client.HTTPConnection._tunnel`` already
    records them.
    """

    # Fix: guarantee the attribute exists even when no CONNECT has been sent
    # yet, or when _tunnel failed before storing it. The original raised
    # AttributeError from get_proxy_response_headers() in those cases instead
    # of returning None as its docstring promises. (CPython 3.12 initializes
    # this in HTTPConnection.__init__; the pre-3.12 path here never did.)
    _raw_proxy_headers = None

    if sys.version_info < (3, 12, 0):
        #####################################
        ### copied from python3.12 source ###
        #####################################
        def _wrap_ipv6(self, ip):
            # A literal IPv6 address in a host field must be bracketed,
            # e.g. b"[::1]" (RFC 3986).
            if b':' in ip and ip[0] != b'['[0]:
                return b"[" + ip + b"]"
            return ip

        def _tunnel(self):
            # Send the CONNECT request, capture the proxy's reply headers
            # into self._raw_proxy_headers, and raise OSError unless the
            # proxy answered 200 OK.
            connect = b"CONNECT %s:%d %s\r\n" % (
                self._wrap_ipv6(self._tunnel_host.encode("idna")),
                self._tunnel_port,
                self._http_vsn_str.encode("ascii"))
            headers = [connect]
            for header, value in self._tunnel_headers.items():
                headers.append(f"{header}: {value}\r\n".encode("latin-1"))
            headers.append(b"\r\n")
            # Making a single send() call instead of one per line encourages
            # the host OS to use a more optimal packet size instead of
            # potentially emitting a series of small packets.
            self.send(b"".join(headers))
            del headers

            response = self.response_class(self.sock, method=self._method)
            try:
                (version, code, message) = response._read_status()
                self._raw_proxy_headers = _read_headers(response.fp)

                if self.debuglevel > 0:
                    for header in self._raw_proxy_headers:
                        print('header:', header.decode())

                if code != http.HTTPStatus.OK:
                    self.close()
                    raise OSError(f"Tunnel connection failed: {code} {message.strip()}")
            finally:
                response.close()

    def get_proxy_response_headers(self):
        """
        Returns a dictionary with the headers of the response
        received from the proxy server to the CONNECT request
        sent to set the tunnel.

        If the CONNECT request was not sent, the method returns None.
        """
        return (
            _parse_header_lines(self._raw_proxy_headers)
            if self._raw_proxy_headers is not None
            else None
        )
class HTTPSProxyConnectionPool(HTTPSConnectionPool):
    """Connection pool that copies the proxy's CONNECT reply headers onto
    every response it returns."""

    ConnectionCls = HTTPSProxyConnection

    # Fix: default so urlopen() is safe even when _prepare_proxy() was never
    # called (e.g. no tunnel needed). The original raised AttributeError in
    # that case, and would raise TypeError on headers.update(None) when the
    # connection had no recorded CONNECT headers.
    _proxy_response_headers = None

    def _prepare_proxy(self, conn):
        # Establish the tunnel first, then capture the proxy's reply headers
        # from the connection that performed the CONNECT.
        super()._prepare_proxy(conn)
        self._proxy_response_headers = conn.get_proxy_response_headers()

    def urlopen(self, *args, **kwargs):
        response = super().urlopen(*args, **kwargs)
        # Only merge when a CONNECT actually happened and produced headers.
        if self._proxy_response_headers is not None:
            response.headers.update(self._proxy_response_headers)
        return response
class ProxyHeaderManager(ProxyManager):
    """ProxyManager that honours ``user:pass`` credentials in the proxy URL.

    urllib3's ProxyManager does not turn auth info embedded in the proxy URL
    into a Proxy-Authorization header; requests does (via
    HTTPAdapter.proxy_headers()). This subclass merges the URL credentials
    into ``proxy_headers`` so a direct ``proxy_from_url()`` authenticates the
    CONNECT the same way requests would. It also routes https traffic through
    HTTPSProxyConnectionPool so proxy CONNECT reply headers are surfaced.
    """

    def __init__(self, *args, **kwargs):
        url = kwargs.get("proxy_url")
        if url is None and args:
            url = args[0]

        supplied = kwargs.get("proxy_headers")
        combined = dict(supplied or {})

        if isinstance(url, str):
            parts = parse_url(url)
            already_authorized = any(
                key.lower() == "proxy-authorization" for key in combined
            )
            # Only synthesize the header when the URL carries credentials and
            # the caller has not already provided one explicitly.
            if parts.auth and not already_authorized:
                combined.update(make_headers(proxy_basic_auth=parts.auth))

        # Avoid touching kwargs unless the merge actually changed something.
        if combined != dict(supplied or {}):
            kwargs["proxy_headers"] = combined

        super().__init__(*args, **kwargs)
        self.pool_classes_by_scheme = {
            "http": HTTPConnectionPool,
            "https": HTTPSProxyConnectionPool,
        }
def proxy_from_url(url, **kwargs):
    """Build a ProxyHeaderManager for *url*.

    Drop-in analogue of ``urllib3.proxy_from_url`` that returns the
    header-aware manager defined in this module.
    """
    manager = ProxyHeaderManager(proxy_url=url, **kwargs)
    return manager