|
2 | 2 | import time |
3 | 3 | import requests |
4 | 4 | import sys |
5 | | -import os |
| 5 | +import pytest |
6 | 6 |
|
7 | 7 | SERVER_PORT = 5000 |
8 | 8 | BASE_URL = f"http://localhost:{SERVER_PORT}" |
9 | 9 |
|
| 10 | + |
10 | 11 | def wait_for_server(): |
11 | 12 | retries = 30 |
12 | 13 | while retries > 0: |
13 | 14 | try: |
14 | 15 | # The root path 404s, but connection refused means it's not up |
15 | | - requests.get(BASE_URL) |
| 16 | + requests.get(BASE_URL, timeout=1) |
16 | 17 | return True |
17 | | - except requests.exceptions.ConnectionError: |
| 18 | + except (requests.exceptions.ConnectionError, requests.exceptions.Timeout): |
18 | 19 | time.sleep(0.5) |
19 | 20 | retries -= 1 |
20 | 21 | return False |
21 | 22 |
|
| 23 | + |
22 | 24 | def test_ssrf(): |
23 | 25 | print("Starting server...") |
24 | | - # Start server in background |
25 | | - # We use sys.executable to ensure we use the same python interpreter |
26 | | - process = subprocess.Popen([sys.executable, "server.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 26 | + # Start server in background with different port |
| 27 | + env = {**subprocess.os.environ, 'PORT': str(SERVER_PORT)} |
| 28 | + process = subprocess.Popen( |
| 29 | + [sys.executable, "-c", f"import server; server.app.run(host='localhost', port={SERVER_PORT})"], |
| 30 | + stdout=subprocess.PIPE, |
| 31 | + stderr=subprocess.PIPE, |
| 32 | + ) |
27 | 33 |
|
28 | 34 | try: |
29 | 35 | if not wait_for_server(): |
30 | | - print("Server failed to start") |
31 | | - sys.exit(1) |
| 36 | + stdout, stderr = process.communicate() |
| 37 | + print(f"Server failed to start. stdout: {stdout.decode()}, stderr: {stderr.decode()}") |
| 38 | + pytest.fail("Server failed to start within timeout") |
32 | 39 |
|
33 | 40 | print("Server started. Running tests...") |
34 | 41 |
|
35 | | - # Test 1: Proxy to google.com (Valid) |
36 | | - # We might not have internet access in some sandboxes, but let's assume we do or handle it. |
37 | | - # If we don't have internet, this might fail with connection error, but the status code won't be 400 from our validation. |
38 | | - print("Testing valid external URL...") |
39 | | - try: |
40 | | - resp = requests.get(f"{BASE_URL}/api/proxy/http://example.com") |
41 | | - print(f"External URL Status: {resp.status_code}") |
42 | | - # It might be 200 or 500 depending on network, but we are looking for behavior. |
43 | | - except Exception as e: |
44 | | - print(f"External URL Request failed: {e}") |
45 | | - |
46 | | - # Test 2: SSRF to localhost (Vulnerability) |
47 | | - # We try to proxy to the server itself. |
48 | | - # Since the server has no root route, accessing http://127.0.0.1:5000/ should return 404. |
49 | | - # If the proxy works, it will return that 404 (or 500 if recursive blowup). |
50 | | - # If blocked, it should return 400 (or whatever we decide). |
| 42 | + # Test 1: SSRF to localhost (should be blocked) |
51 | 43 | print("Testing SSRF to localhost...") |
52 | 44 | target_url = f"http://127.0.0.1:{SERVER_PORT}/" |
53 | 45 | proxy_url = f"{BASE_URL}/api/proxy/{target_url}" |
54 | 46 |
|
55 | 47 | resp = requests.get(proxy_url) |
56 | 48 | print(f"SSRF Status: {resp.status_code}") |
| 49 | + print(f"SSRF Response: {resp.text}") |
57 | 50 |
|
58 | | - # In the vulnerable state, we expect the code to execute the request. |
59 | | - # Since 127.0.0.1:5000/ returns 404, the proxy will return 404. |
60 | | - if resp.status_code == 404: |
61 | | - print("VULNERABILITY CONFIRMED: Proxied request to localhost (received 404 from internal).") |
62 | | - elif resp.status_code == 200: |
63 | | - print("VULNERABILITY CONFIRMED: Proxied request to localhost (received 200).") |
64 | | - elif resp.status_code == 500: |
65 | | - # 500 could mean it tried and failed (e.g. max retries), which still means it tried. |
66 | | - print("VULNERABILITY CONFIRMED: Proxied request to localhost (received 500).") |
67 | | - elif resp.status_code == 400 or resp.status_code == 403: |
68 | | - print("SECURE: Request to localhost blocked.") |
69 | | - else: |
70 | | - print(f"Unexpected status: {resp.status_code}") |
| 51 | + # Should return 403 (Forbidden) for blocked requests |
| 52 | + assert resp.status_code == 403, f"Expected 403 for localhost, got {resp.status_code}" |
| 53 | + assert "forbidden" in resp.text.lower() or "private" in resp.text.lower(), "Expected blocking error message" |
| 54 | + |
| 55 | + # Test 2: SSRF to private IP (should be blocked) |
| 56 | + print("Testing SSRF to private IP...") |
| 57 | + resp = requests.get(f"{BASE_URL}/api/proxy/http://192.168.1.1/") |
| 58 | + print(f"Private IP Status: {resp.status_code}") |
| 59 | + assert resp.status_code == 403, f"Expected 403 for private IP, got {resp.status_code}" |
| 60 | + |
| 61 | + # Test 3: Valid external URL format (will fail to connect, but shouldn't be blocked by SSRF) |
| 62 | + print("Testing valid external URL format...") |
| 63 | + resp = requests.get(f"{BASE_URL}/api/proxy/http://example.com/") |
| 64 | + print(f"External URL Status: {resp.status_code}") |
| 65 | + # Should NOT return 403 - the SSRF check should pass |
| 66 | + assert resp.status_code != 403, "External URL should not be blocked by SSRF check" |
71 | 67 |
|
72 | 68 | finally: |
73 | 69 | print("Stopping server...") |
74 | 70 | process.terminate() |
75 | | - process.wait() |
| 71 | + try: |
| 72 | + process.wait(timeout=5) |
| 73 | + except subprocess.TimeoutExpired: |
| 74 | + process.kill() |
| 75 | + process.wait() |
| 76 | + |
76 | 77 |
|
77 | 78 | if __name__ == "__main__": |
78 | 79 | test_ssrf() |
0 commit comments