|
15 | 15 |
|
16 | 16 | from __future__ import annotations |
17 | 17 |
|
18 | | -import ipaddress |
19 | 18 | import socket |
20 | 19 | from collections.abc import Iterator |
21 | 20 | from typing import TypeVar, Generic |
@@ -53,18 +52,40 @@ def _normalize_ip_key(key: str) -> tuple[str, bool]: |
53 | 52 | if not key: |
54 | 53 | raise InvalidIPError("Key cannot be empty") |
55 | 54 |
|
56 | | - addr = key.rsplit("/", 1)[0] if "/" in key else key |
57 | | - |
| 55 | + if "/" in key: |
| 56 | + addr_str, sep, prefix_str = key.rpartition("/") |
| 57 | + try: |
| 58 | + prefix_len = int(prefix_str) |
| 59 | + except ValueError: |
| 60 | + raise InvalidIPError(f"Invalid IP address or network: {key!r}") |
| 61 | + else: |
| 62 | + addr_str = key |
| 63 | + prefix_len = -1 |
| 64 | + |
| 65 | + # Try IPv4 |
58 | 66 | try: |
59 | | - socket.inet_pton(socket.AF_INET, addr) |
60 | | - return key, False |
61 | | - except socket.error: |
| 67 | + packed = socket.inet_pton(socket.AF_INET, addr_str) |
| 68 | + if prefix_len >= 0: |
| 69 | + if prefix_len > 32: |
| 70 | + raise InvalidIPError(f"Invalid IP address or network: {key!r}") |
| 71 | + mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF |
| 72 | + masked = (int.from_bytes(packed, "big") & mask).to_bytes(4, "big") |
| 73 | + return f"{socket.inet_ntop(socket.AF_INET, masked)}/{prefix_len}", False |
| 74 | + return addr_str, False |
| 75 | + except OSError: |
62 | 76 | pass |
63 | 77 |
|
| 78 | + # Try IPv6 |
64 | 79 | try: |
65 | | - socket.inet_pton(socket.AF_INET6, addr) |
66 | | - return key, True |
67 | | - except socket.error: |
| 80 | + packed = socket.inet_pton(socket.AF_INET6, addr_str) |
| 81 | + if prefix_len >= 0: |
| 82 | + if prefix_len > 128: |
| 83 | + raise InvalidIPError(f"Invalid IP address or network: {key!r}") |
| 84 | + mask = ((1 << 128) - 1) << (128 - prefix_len) |
| 85 | + masked = (int.from_bytes(packed, "big") & mask).to_bytes(16, "big") |
| 86 | + return f"{socket.inet_ntop(socket.AF_INET6, masked)}/{prefix_len}", True |
| 87 | + return socket.inet_ntop(socket.AF_INET6, packed), True |
| 88 | + except OSError: |
68 | 89 | raise InvalidIPError(f"Invalid IP address or network: {key!r}") |
69 | 90 |
|
70 | 91 |
|
|
0 commit comments