Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 219 additions & 36 deletions benchmarks/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,33 @@

import argparse
import asyncio
import platform
import sys
from pathlib import Path
from typing import Optional

# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

# Import after path setup
from tests.test_benchmarks import TestRPCBenchmarks # noqa: E402
from example.shared import ExampleExtensionBase # Corrected import path

# Optional dependency: 'tabulate' renders benchmark tables as grids.
# If it is missing, install a stub so the module still imports and callers
# can branch on TABULATE_AVAILABLE instead of catching ImportError everywhere.
try:
    from tabulate import tabulate
except ImportError:
    TABULATE_AVAILABLE = False

    def tabulate(*args, **kwargs):
        """Stand-in for tabulate.tabulate when the package is not installed."""
        return "[tabulate not available]"
else:
    TABULATE_AVAILABLE = True

# pyright: reportMissingImports=false


async def run_benchmarks(
quick: bool = False, no_torch: bool = False, no_gpu: bool = False, torch_mode: str = "both"
quick: bool = False, no_torch: bool = False, no_gpu: bool = False, torch_mode: str = "both",
):
"""Run all benchmarks with the specified options."""

Expand Down Expand Up @@ -55,10 +69,26 @@ async def run_benchmarks(
import numpy as np
from shared import ExampleExtension, DatabaseSingleton
from pyisolate import local_execution

from pyisolate._internal.gpu_utils import (
maybe_serialize_tensor,
maybe_deserialize_tensor,
maybe_to_dlpack,
maybe_from_dlpack,
)
import os
try:
import torch
TORCH_AVAILABLE = True
# Set CUDA or XPU device if specified in environment
device_idx = os.environ.get("PYISOLATE_CUDA_DEVICE")
if device_idx is not None:
try:
if hasattr(torch, "xpu") and torch.xpu.is_available():
torch.xpu.set_device(int(device_idx))
elif torch.cuda.is_available():
torch.cuda.set_device(int(device_idx))
except Exception as e:
print(f"[Extension] Failed to set device: {e}")
except ImportError:
TORCH_AVAILABLE = False

Expand Down Expand Up @@ -191,8 +221,8 @@ def example_entrypoint():
test_instance.extensions = await test_instance.load_extensions(extensions_to_create)

# Assign extension references based on what was created
test_instance.benchmark_ext = None
test_instance.benchmark_ext_shared = None
test_instance.benchmark_ext: Optional[ExampleExtensionBase] = None
test_instance.benchmark_ext_shared: Optional[ExampleExtensionBase] = None

for i, ext_config in enumerate(extensions_to_create):
if ext_config["name"] == "benchmark_ext":
Expand Down Expand Up @@ -245,7 +275,6 @@ def example_entrypoint():
("image_8k", (3, 8192, 8192)), # 201M elements, ~800MB (8K RGB image)
]

# Create CPU tensors and add to test data
for name, size in tensor_specs:
try:
print(f" Creating {name} tensor {size}...")
Expand Down Expand Up @@ -275,6 +304,22 @@ def example_entrypoint():
except RuntimeError as gpu_e:
print(f" GPU tensor failed: {gpu_e}")

# --- XPU support: create XPU tensors if available ---
if not no_gpu and hasattr(torch, "xpu") and torch.xpu.is_available():
try:
if name == "image_8k":
print(f" Creating XPU version of {name} (may use significant VRAM)...")
with torch.inference_mode():
xpu_tensor = tensor.to("xpu")
test_data.append((f"{name}_xpu", xpu_tensor))
print(" XPU tensor created successfully")
else:
with torch.inference_mode():
xpu_tensor = tensor.to("xpu")
test_data.append((f"{name}_xpu", xpu_tensor))
print(" XPU tensor created successfully")
except RuntimeError as xpu_e:
print(f" XPU tensor failed: {xpu_e}")
except RuntimeError as e:
print(f" Skipping {name}: {e}")

Expand Down Expand Up @@ -320,6 +365,16 @@ def example_entrypoint():
print(" GPU tensor created successfully")
except RuntimeError as gpu_e:
print(f" GPU tensor failed: {gpu_e}")
# --- XPU support: create XPU tensor for 6GB model if available ---
if not no_gpu and hasattr(torch, "xpu") and torch.xpu.is_available():
try:
print(" Creating XPU version of model_6gb (may use significant VRAM)...")
with torch.inference_mode():
xpu_tensor = model_6gb_tensor.to("xpu")
test_data.append(("model_6gb_xpu", xpu_tensor))
print(" XPU tensor created successfully")
except RuntimeError as xpu_e:
print(f" XPU tensor failed: {xpu_e}")
except RuntimeError as e:
print(f" Skipping model_6gb: {e}")

Expand Down Expand Up @@ -363,8 +418,6 @@ async def benchmark_func(data=data):
# Stop the extension to clean up the stuck process
try:
test_instance.manager.stop_extension("benchmark_ext")
print(" Extension stopped successfully")
# Mark as None so we don't try to use it again
test_instance.benchmark_ext = None
except Exception as stop_e:
print(f" Failed to stop extension: {stop_e}")
Expand Down Expand Up @@ -407,8 +460,6 @@ async def benchmark_func_shared(data=data):
# Stop the extension to clean up the stuck process
try:
test_instance.manager.stop_extension("benchmark_ext_shared")
print(" Extension stopped successfully")
# Mark as None so we don't try to use it again
test_instance.benchmark_ext_shared = None
except Exception as stop_e:
print(f" Failed to stop extension: {stop_e}")
Expand All @@ -429,37 +480,47 @@ async def benchmark_func_shared(data=data):

# Print successful results
if results:
from tabulate import tabulate

print("\nSuccessful Benchmarks:")
headers = ["Test", "Mean (ms)", "Std Dev (ms)", "Min (ms)", "Max (ms)"]
table_data = []

for name, result in results.items():
table_data.append(
[
name,
f"{result.mean * 1000:.2f}",
f"{result.stdev * 1000:.2f}",
f"{result.min_time * 1000:.2f}",
f"{result.max_time * 1000:.2f}",
]
)

print(tabulate(table_data, headers=headers, tablefmt="grid"))

# Show fastest result for reference
baseline = min(r.mean for r in results.values())
print(f"\nFastest result: {baseline * 1000:.2f}ms")
else:
print("\nNo successful benchmark results!")
if TABULATE_AVAILABLE:
print("\nSuccessful Benchmarks:")
headers = ["Test", "Mean (ms)", "Std Dev (ms)", "Min (ms)", "Max (ms)"]
table_data = []

for name, result in results.items():
table_data.append(
[
name,
f"{result.mean * 1000:.2f}",
f"{result.stdev * 1000:.2f}",
f"{result.min_time * 1000:.2f}",
f"{result.max_time * 1000:.2f}",
]
)

print(tabulate(table_data, headers=headers, tablefmt="grid"))

# Show fastest result for reference
baseline = min(r.mean for r in results.values())
print(f"\nFastest result: {baseline * 1000:.2f}ms")
else:
print("\nSuccessful Benchmarks:")
for name, result in results.items():
print(
f" {name}: Mean={result.mean_time * 1000:.2f}ms, "
f"Std={result.stdev * 1000:.2f}ms, Min={result.min_time * 1000:.2f}ms, "
f"Max={result.max_time * 1000:.2f}ms"
)

# Print failed tests
if failed_tests:
print("\nFailed Tests:")
failed_headers = ["Test", "Error"]
failed_data = [[name, error] for name, error in failed_tests.items()]
print(tabulate(failed_data, headers=failed_headers, tablefmt="grid"))
if TABULATE_AVAILABLE:
print(tabulate(failed_data, headers=failed_headers, tablefmt="grid"))
elif failed_data:
print("[tabulate not available] Failed tests:")
for row in failed_data:
print(row)

# Print skipped tests
if skipped_tests:
Expand Down Expand Up @@ -520,24 +581,146 @@ def main():
help="Which torch mode to test: both, standard (no share_torch), or shared (share_torch only)",
)

parser.add_argument(
"--backend",
choices=["auto", "cuda", "xpu"],
default="auto",
help="Device backend to use: auto (default), cuda (NVIDIA/AMD ROCm), or xpu (Intel oneAPI)",
)

parser.add_argument(
"--device",
type=str,
default=None,
help="Device index (int) or 'cpu' to force CPU mode",
)

args = parser.parse_args()

# Check dependencies
try:
import numpy # noqa: F401
import psutil # noqa: F401
import tabulate # noqa: F401
# import tabulate # noqa: F401 # This line is now handled globally
except ImportError as e:
print(f"Missing required dependency: {e}")
print("Please install benchmark dependencies with:")
print(" pip install -e .[bench]")
return 1

# Set device and backend
backend = args.backend
device_arg = args.device
if device_arg is not None and str(device_arg).lower() == "cpu":
backend = "cpu"
device_idx = None
print("[PyIsolate] Forcing CPU mode due to --device=cpu")
args.no_gpu = True
elif device_arg is not None:
try:
device_idx = int(device_arg)
except ValueError:
print(f"Invalid --device value: {device_arg}. Must be integer or 'cpu'.")
sys.exit(1)
else:
device_idx = None

device_str = "cpu"
device_name = "cpu"
backend_used = "cpu"
try:
import torch
cuda_available = torch.cuda.is_available()
xpu_available = hasattr(torch, "xpu") and torch.xpu.is_available()
# Auto backend selection
if backend == "auto":
if cuda_available:
backend = "cuda"
elif xpu_available:
backend = "xpu"
else:
backend = "cpu"

if backend == "cuda":
if not cuda_available:
print("[PyIsolate] CUDA backend requested but not available. Exiting.")
sys.exit(1)
# Only check for ROCm on Linux
if platform.system() == "Linux":
torch_version = getattr(torch, 'version', None)
hip_version = getattr(torch_version, 'hip', None) if torch_version else None
if hip_version is not None:
print("[PyIsolate] ROCm (AMD) backend detected on Linux.")
# On Windows, just use CUDA if available
if device_idx is not None:
torch.cuda.set_device(device_idx)
device_str = f"cuda{device_idx}"
device_name = torch.cuda.get_device_name(device_idx)
else:
device_idx = torch.cuda.current_device()
device_str = f"cuda{device_idx}"
device_name = torch.cuda.get_device_name(device_idx)
backend_used = "cuda"
print(f"[PyIsolate] Using CUDA device {device_idx}: {device_name}")

elif backend == "xpu":
if not xpu_available:
print("[PyIsolate] XPU backend requested but not available. Exiting.")
sys.exit(1)
if device_idx is not None:
torch.xpu.set_device(device_idx)
device_str = f"xpu{device_idx}"
device_name = (
torch.xpu.get_device_name(device_idx)
if hasattr(torch.xpu, "get_device_name")
else "Intel XPU"
)
else:
device_idx = torch.xpu.current_device()
device_str = f"xpu{device_idx}"
device_name = (
torch.xpu.get_device_name(device_idx)
if hasattr(torch.xpu, "get_device_name")
else "Intel XPU"
)
backend_used = "xpu"
print(f"[PyIsolate] Using Intel XPU device {device_idx}: {device_name}")

elif backend == "cpu":
# CPU fallback
device_str = "cpu"
device_name = "cpu"
backend_used = "cpu"
print(f"[PyIsolate] Using CPU backend.")
else:
print("[PyIsolate] No supported GPU backend available, exiting.")
sys.exit(1)

except Exception as e:
print(f"[PyIsolate] Error setting device/backend: {e}")

# Generate results filename with backend and device info
import datetime
import socket
computer = socket.gethostname()
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
device_tag = f"{backend_used}{device_idx if device_idx is not None else 0}"
if device_str != "cpu":
safe_device_name = device_name.replace(" ", "").replace("/", "-")
device_tag = f"{backend_used}{device_idx if device_idx is not None else 0}-{safe_device_name}"
results_filename = f"benchmark_results_{computer}_{device_tag}_{timestamp}.txt"
print(f"\n[PyIsolate] Results will be saved to: {results_filename}")

# In main(), after parsing args and determining device_idx:
if device_idx is not None:
import os
os.environ["PYISOLATE_CUDA_DEVICE"] = str(device_idx)

# Run benchmarks
try:
return asyncio.run(
run_benchmarks(
quick=args.quick, no_torch=args.no_torch, no_gpu=args.no_gpu, torch_mode=args.torch_mode
quick=args.quick, no_torch=args.no_torch, no_gpu=args.no_gpu, torch_mode=args.torch_mode,
)
)
except KeyboardInterrupt:
Expand Down
Loading