From f3d9fb2dc614730a4574407d47aec2896ac1dde4 Mon Sep 17 00:00:00 2001 From: Julius Schwartzenberg Date: Wed, 23 Jul 2025 22:06:26 +0200 Subject: [PATCH 1/4] SOCKS5: support looking up names remotely --- kafka/conn.py | 8 ++++++-- kafka/socks5_wrapper.py | 38 ++++++++++++++++++++++++++++---------- 2 files changed, 34 insertions(+), 12 deletions(-) mode change 100644 => 100755 kafka/conn.py mode change 100644 => 100755 kafka/socks5_wrapper.py diff --git a/kafka/conn.py b/kafka/conn.py old mode 100644 new mode 100755 index 64445fab0..469e19c1e --- a/kafka/conn.py +++ b/kafka/conn.py @@ -326,6 +326,8 @@ def _dns_lookup(self): return True def _next_afi_sockaddr(self): + if self._socks5_proxy.use_remote_lookup(): + return (socket.AF_UNSPEC, (self.host, self.port)) if not self._gai: if not self._dns_lookup(): return @@ -366,6 +368,9 @@ def connect_blocking(self, timeout=float('inf')): def connect(self): """Attempt to connect and return ConnectionState""" + if self.config["socks5_proxy"] is not None and self._socks5_proxy is None: + self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi) + if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out(): self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() @@ -379,7 +384,6 @@ def connect(self): self._sock_afi, self._sock_addr = next_lookup try: if self.config["socks5_proxy"] is not None: - self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi) self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM) else: self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) @@ -862,7 +866,7 @@ def connection_delay(self): large number to handle slow/stalled connections. """ if self.disconnected() or self.connecting(): - if len(self._gai) > 0: + if len(self._gai) > 0 or (self._socks5_proxy is not None and self._socks5_proxy.use_remote_lookup()): return 0 else: time_waited = time.time() - self.last_attempt diff --git a/kafka/socks5_wrapper.py b/kafka/socks5_wrapper.py old mode 100644 new mode 100755 index 18bea7c8d..cdd600b7a --- a/kafka/socks5_wrapper.py +++ b/kafka/socks5_wrapper.py @@ -64,6 +64,9 @@ def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC): log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex) return [] + def use_remote_lookup(self): + return self._proxy_url.scheme == 'socks5h' + def socket(self, family, sock_type): """Open and record a socket. @@ -187,7 +190,10 @@ def connect_ex(self, addr): return errno.ECONNREFUSED if self._state == ProxyConnectionStates.REQUEST_SUBMIT: - if self._target_afi == socket.AF_INET: + if self.use_remote_lookup(): + addr_type = 3 + addr_len = len(addr[0]) + elif self._target_afi == socket.AF_INET: addr_type = 1 addr_len = 4 elif self._target_afi == socket.AF_INET6: @@ -199,15 +205,27 @@ def connect_ex(self, addr): self._sock.close() return errno.ECONNREFUSED - self._buffer_out = struct.pack( - "!bbbb{}sh".format(addr_len), - 5, # version - 1, # command: connect - 0, # reserved - addr_type, # 1 for ipv4, 4 for ipv6 address - socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address - addr[1], # port - ) + if self.use_remote_lookup(): + self._buffer_out = struct.pack( + "!bbbbb{}sh".format(addr_len), + 5, # version + 1, # command: connect + 0, # reserved + addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name + addr_len, + addr[0].encode('ascii'), # host + addr[1], # port + ) + else: + self._buffer_out = struct.pack( + "!bbbb{}sh".format(addr_len), + 5, # version + 1, # command: connect + 0, # reserved + addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name + socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address + addr[1], # port + ) self._state = ProxyConnectionStates.REQUESTING if self._state == ProxyConnectionStates.REQUESTING: From b3b9a8645e724622de37b5664aba718009b89e10 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 18 Nov 2025 10:57:05 -0800 Subject: [PATCH 2/4] Fix _next_afi_sockaddr crash; revert Socks5Wrapper change; reset _socks5_proxy in close() --- kafka/conn.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 469e19c1e..dd1417120 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -326,8 +326,9 @@ def _dns_lookup(self): return True def _next_afi_sockaddr(self): - if self._socks5_proxy.use_remote_lookup(): + if self._socks5_proxy and self._socks5_proxy.use_remote_lookup(): return (socket.AF_UNSPEC, (self.host, self.port)) + if not self._gai: if not self._dns_lookup(): return @@ -368,9 +369,6 @@ def connect_blocking(self, timeout=float('inf')): def connect(self): """Attempt to connect and return ConnectionState""" - if self.config["socks5_proxy"] is not None and self._socks5_proxy is None: - self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi) - if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out(): self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() @@ -384,6 +382,7 @@ def connect(self): self._sock_afi, self._sock_addr = next_lookup try: if self.config["socks5_proxy"] is not None: + self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi) self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM) else: self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) @@ -866,7 +865,9 @@ def connection_delay(self): large number to handle slow/stalled connections. """ if self.disconnected() or self.connecting(): - if len(self._gai) > 0 or (self._socks5_proxy is not None and self._socks5_proxy.use_remote_lookup()): + if len(self._gai) > 0: + return 0 + elif self._socks5_proxy and self._socks5_proxy.use_remote_lookup(): return 0 else: time_waited = time.time() - self.last_attempt @@ -968,6 +969,7 @@ def close(self, error=None): # the socket fd from selectors cleanly. sock = self._sock self._sock = None + self._socks5_proxy = None # drop lock before state change callback and processing futures self.config['state_change_callback'](self.node_id, sock, self) From fb9bebcc97e16a4b8536d87328232ee6407cff25 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 18 Nov 2025 12:20:10 -0800 Subject: [PATCH 3/4] use_remote_lookup(proxy_url) classmethod; add debug statement for proxy init; consolidate buffer_out; fix port H encoding --- kafka/conn.py | 5 +++-- kafka/socks5_wrapper.py | 44 +++++++++++++++++++++++------------------ 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index dd1417120..1ead4af97 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -326,7 +326,7 @@ def _dns_lookup(self): return True def _next_afi_sockaddr(self): - if self._socks5_proxy and self._socks5_proxy.use_remote_lookup(): + if self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]): return (socket.AF_UNSPEC, (self.host, self.port)) if not self._gai: @@ -382,6 +382,7 @@ def connect(self): self._sock_afi, self._sock_addr = next_lookup try: if self.config["socks5_proxy"] is not None: + log.debug('%s: initializing Socks5 proxy at %s', self, self.config["socks5_proxy"]) self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi) self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM) else: @@ -867,7 +868,7 @@ def connection_delay(self): if self.disconnected() or self.connecting(): if len(self._gai) > 0: return 0 - elif self._socks5_proxy and self._socks5_proxy.use_remote_lookup(): + elif Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]): return 0 else: time_waited = time.time() - self.last_attempt diff --git a/kafka/socks5_wrapper.py b/kafka/socks5_wrapper.py index cdd600b7a..9ed6fb98a 100755 --- a/kafka/socks5_wrapper.py +++ b/kafka/socks5_wrapper.py @@ -64,7 +64,11 @@ def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC): log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex) return [] - def use_remote_lookup(self): + @classmethod + def use_remote_lookup(cls, proxy_url): + return urlparse(proxy_url).scheme == 'socks5h' + + def _use_remote_lookup(self): return self._proxy_url.scheme == 'socks5h' def socket(self, family, sock_type): @@ -190,7 +194,7 @@ def connect_ex(self, addr): return errno.ECONNREFUSED if self._state == ProxyConnectionStates.REQUEST_SUBMIT: - if self.use_remote_lookup(): + if self._use_remote_lookup(): addr_type = 3 addr_len = len(addr[0]) elif self._target_afi == socket.AF_INET: @@ -205,27 +209,29 @@ def connect_ex(self, addr): self._sock.close() return errno.ECONNREFUSED - if self.use_remote_lookup(): - self._buffer_out = struct.pack( - "!bbbbb{}sh".format(addr_len), - 5, # version - 1, # command: connect - 0, # reserved - addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name + self._buffer_out = struct.pack( + "!bbbb", + 5, # version + 1, # command: connect + 0, # reserved + addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name + ) + # Addr format depends on type + if addr_type == 3: + # len + domain name (no null terminator) + self._buffer_out += struct.pack( + "!b{}s".format(addr_len), addr_len, - addr[0].encode('ascii'), # host - addr[1], # port + addr[0].encode('ascii'), ) else: - self._buffer_out = struct.pack( - "!bbbb{}sh".format(addr_len), - 5, # version - 1, # command: connect - 0, # reserved - addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name - socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address - addr[1], # port + # either 4 (type 1) or 16 (type 4) bytes of actual address + self._buffer_out += struct.pack( + "!{}s".format(addr_len), + socket.inet_pton(self._target_afi, addr[0]), ) + self._buffer_out += struct.pack("!H", addr[1]) # port + self._state = ProxyConnectionStates.REQUESTING if self._state == ProxyConnectionStates.REQUESTING: From 2fcedef773854f0908250af12696ae6118bb722c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 20 Nov 2025 12:26:56 -0800 Subject: [PATCH 4/4] Check for non-null proxy_url --- kafka/conn.py | 2 +- kafka/socks5_wrapper.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1ead4af97..4a0d64a28 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -868,7 +868,7 @@ def connection_delay(self): if self.disconnected() or self.connecting(): if len(self._gai) > 0: return 0 - elif Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]): + elif self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]): return 0 else: time_waited = time.time() - self.last_attempt diff --git a/kafka/socks5_wrapper.py b/kafka/socks5_wrapper.py index 9ed6fb98a..6715f2093 100755 --- a/kafka/socks5_wrapper.py +++ b/kafka/socks5_wrapper.py @@ -66,6 +66,8 @@ def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC): @classmethod def use_remote_lookup(cls, proxy_url): + if proxy_url is None: + return False return urlparse(proxy_url).scheme == 'socks5h' def _use_remote_lookup(self):