diff --git a/apps/api/plane/bgtasks/work_item_link_task.py b/apps/api/plane/bgtasks/work_item_link_task.py index b14dc75bcb1..442396c7f02 100644 --- a/apps/api/plane/bgtasks/work_item_link_task.py +++ b/apps/api/plane/bgtasks/work_item_link_task.py @@ -4,6 +4,7 @@ # Python imports import logging +import socket # Third party imports from celery import shared_task @@ -26,7 +27,7 @@ def validate_url_ip(url: str) -> None: """ Validate that a URL doesn't point to a private/internal IP address. - Only checks if the hostname is a direct IP address. + Resolves hostnames to IPs before checking. Args: url: The URL to validate @@ -38,17 +39,31 @@ def validate_url_ip(url: str) -> None: hostname = parsed.hostname if not hostname: - return + raise ValueError("Invalid URL: No hostname found") + + # Only allow HTTP and HTTPS to prevent file://, gopher://, etc. + if parsed.scheme not in ("http", "https"): + raise ValueError("Invalid URL scheme. Only HTTP and HTTPS are allowed") + + # Resolve hostname to IP addresses — this catches domain names that + # point to internal IPs (e.g. attacker.com -> 169.254.169.254) try: - ip = ipaddress.ip_address(hostname) - except ValueError: - # Not an IP address (it's a domain name), nothing to check here - return + addr_info = socket.getaddrinfo(hostname, None) + except socket.gaierror: + raise ValueError("Hostname could not be resolved") + + if not addr_info: + raise ValueError("No IP addresses found for the hostname") - # It IS an IP address - check if it's private/internal - if ip.is_private or ip.is_loopback or ip.is_reserved: - raise ValueError("Access to private/internal networks is not allowed") + # Check every resolved IP against blocked ranges to prevent SSRF + for addr in addr_info: + ip = ipaddress.ip_address(addr[4][0]) + if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local: + raise ValueError("Access to private/internal networks is not allowed") + + +MAX_REDIRECTS = 5 def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]: @@ -74,11 +89,23 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]: validate_url_ip(final_url) try: - response = requests.get(final_url, headers=headers, timeout=1) - final_url = response.url # Get the final URL after any redirects - - # check for redirected url also - validate_url_ip(final_url) + # Manually follow redirects to validate each URL before requesting + redirect_count = 0 + response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False) + + while response.is_redirect and redirect_count < MAX_REDIRECTS: + redirect_url = response.headers.get("Location") + if not redirect_url: + break + # Resolve relative redirects against current URL + final_url = urljoin(final_url, redirect_url) + # Validate the redirect target BEFORE making the request + validate_url_ip(final_url) + redirect_count += 1 + response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False) + + if redirect_count >= MAX_REDIRECTS: + logger.warning(f"Too many redirects for URL: {url}") soup = BeautifulSoup(response.content, "html.parser") title_tag = soup.find("title") @@ -134,7 +161,9 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[s for selector in favicon_selectors: favicon_tag = soup.select_one(selector) if favicon_tag and favicon_tag.get("href"): - return urljoin(base_url, favicon_tag["href"]) + favicon_href = urljoin(base_url, favicon_tag["href"]) + validate_url_ip(favicon_href) + return favicon_href # Fallback to /favicon.ico parsed_url = urlparse(base_url) @@ -142,7 +171,9 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[s # Check if fallback exists try: - response = requests.head(fallback_url, timeout=2) + validate_url_ip(fallback_url) + response = requests.head(fallback_url, timeout=2, allow_redirects=False) + if response.status_code == 200: return fallback_url except requests.RequestException as e: @@ -173,6 +204,8 @@ def fetch_and_encode_favicon( "favicon_base64": f"data:image/svg+xml;base64,{DEFAULT_FAVICON}", } + validate_url_ip(favicon_url) + response = requests.get(favicon_url, headers=headers, timeout=1) # Get content type