65 changes: 49 additions & 16 deletions apps/api/plane/bgtasks/work_item_link_task.py
```diff
@@ -4,6 +4,7 @@
 
 # Python imports
 import logging
+import socket
 
 # Third party imports
 from celery import shared_task
```
```diff
@@ -26,7 +27,7 @@
 def validate_url_ip(url: str) -> None:
     """
     Validate that a URL doesn't point to a private/internal IP address.
-    Only checks if the hostname is a direct IP address.
+    Resolves hostnames to IPs before checking.
 
     Args:
         url: The URL to validate
```
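The docstring change reflects the real fix: the old check returned early for anything that wasn't a literal IP, so any DNS name bypassed it entirely. A two-line demonstration of that gap (`metadata.internal` is a made-up hostname for illustration):

```python
import ipaddress

# The old code caught this ValueError and returned without checking,
# so a hostname pointing at an internal IP sailed through unvalidated.
try:
    ipaddress.ip_address("metadata.internal")
except ValueError as exc:
    print(exc)  # 'metadata.internal' does not appear to be an IPv4 or IPv6 address
```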
```diff
@@ -38,17 +39,31 @@ def validate_url_ip(url: str) -> None:
     hostname = parsed.hostname
 
     if not hostname:
-        return
+        raise ValueError("Invalid URL: No hostname found")
+
+    # Only allow HTTP and HTTPS to prevent file://, gopher://, etc.
+    if parsed.scheme not in ("http", "https"):
+        raise ValueError("Invalid URL scheme. Only HTTP and HTTPS are allowed")
 
+    # Resolve hostname to IP addresses — this catches domain names that
+    # point to internal IPs (e.g. attacker.com -> 169.254.169.254)
     try:
-        ip = ipaddress.ip_address(hostname)
-    except ValueError:
-        # Not an IP address (it's a domain name), nothing to check here
-        return
+        addr_info = socket.getaddrinfo(hostname, None)
+    except socket.gaierror:
+        raise ValueError("Hostname could not be resolved")
+
+    if not addr_info:
+        raise ValueError("No IP addresses found for the hostname")
 
-    # It IS an IP address - check if it's private/internal
-    if ip.is_private or ip.is_loopback or ip.is_reserved:
-        raise ValueError("Access to private/internal networks is not allowed")
+    # Check every resolved IP against blocked ranges to prevent SSRF
+    for addr in addr_info:
+        ip = ipaddress.ip_address(addr[4][0])
+        if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
+            raise ValueError("Access to private/internal networks is not allowed")
+
+
+MAX_REDIRECTS = 5
 
 
 def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
```
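For reviewers unfamiliar with `getaddrinfo`'s return shape: it yields `(family, type, proto, canonname, sockaddr)` tuples, and `sockaddr[0]` is the textual IP for both IPv4 and IPv6, which is what `addr[4][0]` picks out. A minimal standalone sketch of the same check (the helper name is illustrative, not from this PR):

```python
import ipaddress
import socket

def resolves_to_blocked_ip(hostname: str) -> bool:
    # Same logic as validate_url_ip's loop: check every resolved
    # address, since one name can map to several IPs.
    for addr in socket.getaddrinfo(hostname, None):
        ip = ipaddress.ip_address(addr[4][0])
        if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
            return True
    return False

# "localhost" resolves to 127.0.0.1 and/or ::1, both loopback, so: True
print(resolves_to_blocked_ip("localhost"))
```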
```diff
@@ -74,11 +89,23 @@ def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
     validate_url_ip(final_url)
 
     try:
-        response = requests.get(final_url, headers=headers, timeout=1)
-        final_url = response.url  # Get the final URL after any redirects
-
-        # check for redirected url also
-        validate_url_ip(final_url)
+        # Manually follow redirects to validate each URL before requesting
+        redirect_count = 0
+        response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False)
+
+        while response.is_redirect and redirect_count < MAX_REDIRECTS:
+            redirect_url = response.headers.get("Location")
+            if not redirect_url:
+                break
+            # Resolve relative redirects against current URL
+            final_url = urljoin(final_url, redirect_url)
+            # Validate the redirect target BEFORE making the request
+            validate_url_ip(final_url)
+            redirect_count += 1
+            response = requests.get(final_url, headers=headers, timeout=1, allow_redirects=False)
+
+        if redirect_count >= MAX_REDIRECTS:
+            logger.warning(f"Too many redirects for URL: {url}")
 
         soup = BeautifulSoup(response.content, "html.parser")
         title_tag = soup.find("title")
```
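One subtlety the loop relies on: `requests` sets `response.is_redirect` only when the status code is a redirect and a `Location` header is present, so the `break` covers malformed redirects; and `Location` is often relative, which is why `urljoin` is needed before re-validating. A quick check against a public echo service (httpbin.org is just a convenient example, any redirecting endpoint works):

```python
import requests

# /redirect/1 answers 302 with a relative Location header.
resp = requests.get("https://httpbin.org/redirect/1", allow_redirects=False)
print(resp.status_code)           # 302
print(resp.is_redirect)           # True
print(resp.headers["Location"])   # "/get" -- relative, hence the urljoin above
```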
```diff
@@ -134,15 +161,19 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[str]:
     for selector in favicon_selectors:
         favicon_tag = soup.select_one(selector)
         if favicon_tag and favicon_tag.get("href"):
-            return urljoin(base_url, favicon_tag["href"])
+            favicon_href = urljoin(base_url, favicon_tag["href"])
+            validate_url_ip(favicon_href)
+            return favicon_href
 
     # Fallback to /favicon.ico
     parsed_url = urlparse(base_url)
     fallback_url = f"{parsed_url.scheme}://{parsed_url.netloc}/favicon.ico"
 
     # Check if fallback exists
     try:
-        response = requests.head(fallback_url, timeout=2)
+        validate_url_ip(fallback_url)
+        response = requests.head(fallback_url, timeout=2, allow_redirects=False)
+
         if response.status_code == 200:
             return fallback_url
     except requests.RequestException as e:
```
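Validating `favicon_href` here matters because `urljoin` discards the base when the href is absolute, so a crawled page can point its icon tag at an arbitrary host (the metadata IP below is the same example the PR's own comment uses):

```python
from urllib.parse import urljoin

base = "https://example.com/page"

# A relative href stays on the page's own host.
print(urljoin(base, "/favicon.ico"))
# https://example.com/favicon.ico

# An absolute href replaces the base entirely, so it must be re-validated.
print(urljoin(base, "http://169.254.169.254/latest/meta-data/"))
# http://169.254.169.254/latest/meta-data/
```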
```diff
@@ -173,6 +204,8 @@ def fetch_and_encode_favicon(
             "favicon_base64": f"data:image/svg+xml;base64,{DEFAULT_FAVICON}",
         }
 
+    validate_url_ip(favicon_url)
+
     response = requests.get(favicon_url, headers=headers, timeout=1)
 
     # Get content type
```
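With this change the validate-then-fetch pairing now appears at several call sites; a small wrapper could keep them from drifting apart. A sketch only, importing from the module path shown in this diff (`safe_get` itself is hypothetical, not part of the PR):

```python
import requests

from plane.bgtasks.work_item_link_task import validate_url_ip

def safe_get(url: str, **kwargs) -> requests.Response:
    # Hypothetical helper: every outbound fetch goes through the same
    # SSRF check and never auto-follows redirects.
    validate_url_ip(url)
    kwargs.setdefault("timeout", 1)
    kwargs.setdefault("allow_redirects", False)
    return requests.get(url, **kwargs)
```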