@@ -15287,6 +15287,267 @@ def _try_flush_buffer():
1528715287 except Exception:
1528815288 pass
1528915289
15290+ def _udp_raw_send(fileobj, host, port, **kwargs):
15291+ logger = _logger_from_kwargs(kwargs)
15292+ addr = (host or "127.0.0.1", int(port))
15293+
15294+ # ---- normalize bool-ish flags (URL query values may be strings) ----
15295+ handshake = _kw_bool(kwargs.get("handshake", True), True)
15296+ raw_ack = _kw_bool(kwargs.get("raw_ack", False), False)
15297+ raw_meta = _kw_bool(kwargs.get("raw_meta", True), True)
15298+ raw_sha = _kw_bool(kwargs.get("raw_sha", False), False)
15299+ wait = _kw_bool(kwargs.get("wait", True), True) or _kw_bool(kwargs.get("connect_wait", False), False)
15300+
15301+ verbose = _kw_bool(kwargs.get("verbose", False), False)
15302+
15303+ def _log(msg):
15304+ _net_log(verbose, msg, logger=logger)
15305+
15306+ # ---- numeric params ----
15307+ try:
15308+ chunk = int(kwargs.get("chunk", 1200))
15309+ except Exception:
15310+ chunk = 1200
15311+ if chunk < 256:
15312+ chunk = 256
15313+
15314+ try:
15315+ wt = kwargs.get("wait_timeout", None)
15316+ wt = float(wt) if wt is not None else None
15317+ except Exception:
15318+ wt = None
15319+
15320+ try:
15321+ hello_iv = float(kwargs.get("hello_interval", 0.1) or 0.1)
15322+ except Exception:
15323+ hello_iv = 0.1
15324+ if hello_iv <= 0:
15325+ hello_iv = 0.1
15326+
15327+ # ---- compute total remaining length (for META and/or HASH) ----
15328+ total_len = None
15329+ pos = None
15330+ if raw_meta or raw_sha:
15331+ try:
15332+ pos = fileobj.tell()
15333+ fileobj.seek(0, os.SEEK_END)
15334+ end = fileobj.tell()
15335+ fileobj.seek(pos, os.SEEK_SET)
15336+ total_len = int(end - pos)
15337+ if total_len < 0:
15338+ total_len = None
15339+ except Exception:
15340+ total_len = None
15341+ try:
15342+ if pos is not None:
15343+ fileobj.seek(pos, os.SEEK_SET)
15344+ except Exception:
15345+ pass
15346+
15347+ # ---- precompute expected hash (optional) ----
15348+ expected_hex = None
15349+ raw_hash = (kwargs.get("raw_hash", "sha256") or "sha256").lower()
15350+ if raw_sha and total_len is not None:
15351+ try:
15352+ h = hashlib.sha256() if raw_hash != "md5" else hashlib.md5()
15353+ cur = fileobj.tell()
15354+ while True:
15355+ b = fileobj.read(65536)
15356+ if not b:
15357+ break
15358+ h.update(_to_bytes(b))
15359+ expected_hex = h.hexdigest()
15360+ fileobj.seek(cur, os.SEEK_SET)
15361+ except Exception:
15362+ expected_hex = None
15363+ try:
15364+ if pos is not None:
15365+ fileobj.seek(pos, os.SEEK_SET)
15366+ except Exception:
15367+ pass
15368+
15369+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
15370+
15371+ try:
15372+ # ---- handshake / wait-for-receiver ----
15373+ tok = kwargs.get("token")
15374+ tok = _hs_token() if tok is None else _to_bytes(tok)
15375+
15376+ if wait:
15377+ start_t = time.time()
15378+ while True:
15379+ if wt is not None and wt >= 0 and (time.time() - start_t) >= wt:
15380+ _log("UDP raw: wait_timeout reached; no receiver READY")
15381+ try:
15382+ sock.close()
15383+ except Exception:
15384+ pass
15385+ return False
15386+
15387+ # announce
15388+ if handshake:
15389+ try:
15390+ sock.sendto(b"HELLO " + tok + b"\n", addr)
15391+ except Exception:
15392+ pass
15393+
15394+ if raw_meta and total_len is not None:
15395+ try:
15396+ sock.sendto(b"META " + str(total_len).encode("ascii") + b"\n", addr)
15397+ except Exception:
15398+ pass
15399+
15400+ if raw_sha and expected_hex:
15401+ try:
15402+ sock.sendto(b"HASH " + raw_hash.encode("ascii") + b" " + expected_hex.encode("ascii") + b"\n", addr)
15403+ except Exception:
15404+ pass
15405+
15406+ # wait briefly for READY
15407+ try:
15408+ sock.settimeout(hello_iv)
15409+ except Exception:
15410+ pass
15411+
15412+ try:
15413+ pkt, _a = sock.recvfrom(1024)
15414+ if pkt.startswith(b"READY"):
15415+ # READY or READY <token>
15416+ if b" " in pkt:
15417+ rt = pkt.split(None, 1)[1].strip()
15418+ if rt and rt != tok:
15419+ continue
15420+ _log("UDP raw: received READY from receiver")
15421+ break
15422+ except socket.timeout:
15423+ continue
15424+ except Exception:
15425+ continue
15426+ else:
15427+ # if not waiting, still send META/HASH once up front
15428+ if handshake:
15429+ try:
15430+ sock.sendto(b"HELLO " + tok + b"\n", addr)
15431+ except Exception:
15432+ pass
15433+ if raw_meta and total_len is not None:
15434+ try:
15435+ sock.sendto(b"META " + str(total_len).encode("ascii") + b"\n", addr)
15436+ except Exception:
15437+ pass
15438+ if raw_sha and expected_hex:
15439+ try:
15440+ sock.sendto(b"HASH " + raw_hash.encode("ascii") + b" " + expected_hex.encode("ascii") + b"\n", addr)
15441+ except Exception:
15442+ pass
15443+
15444+ # ---- send data ----
15445+ if raw_ack:
15446+ # sliding window retransmit
15447+ try:
15448+ ack_to = float(kwargs.get("raw_ack_timeout", 0.5) or 0.5)
15449+ except Exception:
15450+ ack_to = 0.5
15451+ try:
15452+ retries_max = int(kwargs.get("raw_ack_retries", 40) or 40)
15453+ except Exception:
15454+ retries_max = 40
15455+ try:
15456+ win = int(kwargs.get("raw_ack_window", 1) or 1)
15457+ except Exception:
15458+ win = 1
15459+ if win < 1:
15460+ win = 1
15461+
15462+ try:
15463+ sock.settimeout(ack_to)
15464+ except Exception:
15465+ pass
15466+
15467+ def _make_pkt(seq, data):
15468+ return b"PKT " + str(seq).encode("ascii") + b" " + _to_bytes(data)
15469+
15470+ base_seq = 0
15471+ next_seq = 0
15472+ pkts = {}
15473+ eof = False
15474+ timeout_tries = 0
15475+
15476+ while True:
15477+ # fill window
15478+ while (not eof) and next_seq < base_seq + win:
15479+ data = fileobj.read(chunk)
15480+ if not data:
15481+ eof = True
15482+ break
15483+ pkt = _make_pkt(next_seq, data)
15484+ pkts[next_seq] = pkt
15485+ try:
15486+ sock.sendto(pkt, addr)
15487+ except Exception:
15488+ pass
15489+ next_seq += 1
15490+
15491+ if eof and base_seq == next_seq:
15492+ break
15493+
15494+ try:
15495+ apkt, _a = sock.recvfrom(1024)
15496+ if apkt.startswith(b"ACK "):
15497+ try:
15498+ aseq = int(apkt.split()[1])
15499+ except Exception:
15500+ aseq = -1
15501+ new_base = aseq + 1
15502+ if new_base > base_seq:
15503+ for s in list(pkts.keys()):
15504+ if s < new_base:
15505+ pkts.pop(s, None)
15506+ base_seq = new_base
15507+ timeout_tries = 0
15508+ except socket.timeout:
15509+ timeout_tries += 1
15510+ if retries_max >= 0 and timeout_tries >= retries_max:
15511+ _log("UDP raw: too many ACK timeouts, giving up")
15512+ return False
15513+ # retransmit all in-flight
15514+ for s in range(base_seq, next_seq):
15515+ pkt = pkts.get(s)
15516+ if pkt is None:
15517+ continue
15518+ try:
15519+ sock.sendto(pkt, addr)
15520+ except Exception:
15521+ pass
15522+ except Exception:
15523+ # treat as timeout-ish
15524+ timeout_tries += 1
15525+
15526+ else:
15527+ # legacy raw: just send datagrams
15528+ while True:
15529+ data = fileobj.read(chunk)
15530+ if not data:
15531+ break
15532+ try:
15533+ sock.sendto(_to_bytes(data), addr)
15534+ except Exception:
15535+ pass
15536+
15537+ # ---- finish ----
15538+ try:
15539+ sock.sendto(b"DONE", addr)
15540+ except Exception:
15541+ pass
15542+
15543+ return True
15544+
15545+ finally:
15546+ try:
15547+ sock.close()
15548+ except Exception:
15549+ pass
15550+
1529015551def _udp_seq_send(fileobj, host, port, resume=False, path_text=None, **kwargs):
1529115552 addr = (host, int(port))
1529215553 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
0 commit comments