diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py index 8b3231b..4284404 100644 --- a/benchmarks/benchmark.py +++ b/benchmarks/benchmark.py @@ -13,8 +13,10 @@ import argparse import asyncio +import platform import sys from pathlib import Path +from typing import Optional # Add project root to path project_root = Path(__file__).parent.parent @@ -22,10 +24,22 @@ # Import after path setup from tests.test_benchmarks import TestRPCBenchmarks # noqa: E402 +from example.shared import ExampleExtensionBase # Corrected import path + +# Try to import tabulate globally +try: + from tabulate import tabulate + TABULATE_AVAILABLE = True +except ImportError: + TABULATE_AVAILABLE = False + def tabulate(*args, **kwargs): + return "[tabulate not available]" + +# pyright: reportMissingImports=false async def run_benchmarks( - quick: bool = False, no_torch: bool = False, no_gpu: bool = False, torch_mode: str = "both" + quick: bool = False, no_torch: bool = False, no_gpu: bool = False, torch_mode: str = "both", ): """Run all benchmarks with the specified options.""" @@ -55,10 +69,26 @@ async def run_benchmarks( import numpy as np from shared import ExampleExtension, DatabaseSingleton from pyisolate import local_execution - +from pyisolate._internal.gpu_utils import ( + maybe_serialize_tensor, + maybe_deserialize_tensor, + maybe_to_dlpack, + maybe_from_dlpack, +) +import os try: import torch TORCH_AVAILABLE = True + # Set CUDA or XPU device if specified in environment + device_idx = os.environ.get("PYISOLATE_CUDA_DEVICE") + if device_idx is not None: + try: + if hasattr(torch, "xpu") and torch.xpu.is_available(): + torch.xpu.set_device(int(device_idx)) + elif torch.cuda.is_available(): + torch.cuda.set_device(int(device_idx)) + except Exception as e: + print(f"[Extension] Failed to set device: {e}") except ImportError: TORCH_AVAILABLE = False @@ -191,8 +221,8 @@ def example_entrypoint(): test_instance.extensions = await test_instance.load_extensions(extensions_to_create) # Assign extension references based on what was created - test_instance.benchmark_ext = None - test_instance.benchmark_ext_shared = None + test_instance.benchmark_ext: Optional[ExampleExtensionBase] = None + test_instance.benchmark_ext_shared: Optional[ExampleExtensionBase] = None for i, ext_config in enumerate(extensions_to_create): if ext_config["name"] == "benchmark_ext": @@ -245,7 +275,6 @@ def example_entrypoint(): ("image_8k", (3, 8192, 8192)), # 201M elements, ~800MB (8K RGB image) ] - # Create CPU tensors and add to test data for name, size in tensor_specs: try: print(f" Creating {name} tensor {size}...") @@ -275,6 +304,22 @@ def example_entrypoint(): except RuntimeError as gpu_e: print(f" GPU tensor failed: {gpu_e}") + # --- XPU support: create XPU tensors if available --- + if not no_gpu and hasattr(torch, "xpu") and torch.xpu.is_available(): + try: + if name == "image_8k": + print(f" Creating XPU version of {name} (may use significant VRAM)...") + with torch.inference_mode(): + xpu_tensor = tensor.to("xpu") + test_data.append((f"{name}_xpu", xpu_tensor)) + print(" XPU tensor created successfully") + else: + with torch.inference_mode(): + xpu_tensor = tensor.to("xpu") + test_data.append((f"{name}_xpu", xpu_tensor)) + print(" XPU tensor created successfully") + except RuntimeError as xpu_e: + print(f" XPU tensor failed: {xpu_e}") except RuntimeError as e: print(f" Skipping {name}: {e}") @@ -320,6 +365,16 @@ def example_entrypoint(): print(" GPU tensor created successfully") except RuntimeError as gpu_e: print(f" GPU tensor failed: {gpu_e}") + # --- XPU support: create XPU tensor for 6GB model if available --- + if not no_gpu and hasattr(torch, "xpu") and torch.xpu.is_available(): + try: + print(" Creating XPU version of model_6gb (may use significant VRAM)...") + with torch.inference_mode(): + xpu_tensor = model_6gb_tensor.to("xpu") + test_data.append(("model_6gb_xpu", xpu_tensor)) + print(" XPU tensor created successfully") + except RuntimeError as xpu_e: + print(f" XPU tensor failed: {xpu_e}") except RuntimeError as e: print(f" Skipping model_6gb: {e}") @@ -363,8 +418,6 @@ async def benchmark_func(data=data): # Stop the extension to clean up the stuck process try: test_instance.manager.stop_extension("benchmark_ext") - print(" Extension stopped successfully") - # Mark as None so we don't try to use it again test_instance.benchmark_ext = None except Exception as stop_e: print(f" Failed to stop extension: {stop_e}") @@ -407,8 +460,6 @@ async def benchmark_func_shared(data=data): # Stop the extension to clean up the stuck process try: test_instance.manager.stop_extension("benchmark_ext_shared") - print(" Extension stopped successfully") - # Mark as None so we don't try to use it again test_instance.benchmark_ext_shared = None except Exception as stop_e: print(f" Failed to stop extension: {stop_e}") @@ -429,37 +480,47 @@ async def benchmark_func_shared(data=data): # Print successful results if results: - from tabulate import tabulate - - print("\nSuccessful Benchmarks:") - headers = ["Test", "Mean (ms)", "Std Dev (ms)", "Min (ms)", "Max (ms)"] - table_data = [] - - for name, result in results.items(): - table_data.append( - [ - name, - f"{result.mean * 1000:.2f}", - f"{result.stdev * 1000:.2f}", - f"{result.min_time * 1000:.2f}", - f"{result.max_time * 1000:.2f}", - ] - ) - - print(tabulate(table_data, headers=headers, tablefmt="grid")) - - # Show fastest result for reference - baseline = min(r.mean for r in results.values()) - print(f"\nFastest result: {baseline * 1000:.2f}ms") - else: - print("\nNo successful benchmark results!") + if TABULATE_AVAILABLE: + print("\nSuccessful Benchmarks:") + headers = ["Test", "Mean (ms)", "Std Dev (ms)", "Min (ms)", "Max (ms)"] + table_data = [] + + for name, result in results.items(): + table_data.append( + [ + name, + f"{result.mean * 1000:.2f}", + f"{result.stdev * 1000:.2f}", + f"{result.min_time * 1000:.2f}", + f"{result.max_time * 1000:.2f}", + ] + ) + + print(tabulate(table_data, headers=headers, tablefmt="grid")) + + # Show fastest result for reference + baseline = min(r.mean for r in results.values()) + print(f"\nFastest result: {baseline * 1000:.2f}ms") + else: + print("\nSuccessful Benchmarks:") + for name, result in results.items(): + print( + f" {name}: Mean={result.mean_time * 1000:.2f}ms, " + f"Std={result.stdev * 1000:.2f}ms, Min={result.min_time * 1000:.2f}ms, " + f"Max={result.max_time * 1000:.2f}ms" + ) # Print failed tests if failed_tests: print("\nFailed Tests:") failed_headers = ["Test", "Error"] failed_data = [[name, error] for name, error in failed_tests.items()] - print(tabulate(failed_data, headers=failed_headers, tablefmt="grid")) + if TABULATE_AVAILABLE: + print(tabulate(failed_data, headers=failed_headers, tablefmt="grid")) + elif failed_data: + print("[tabulate not available] Failed tests:") + for row in failed_data: + print(row) # Print skipped tests if skipped_tests: @@ -520,24 +581,146 @@ def main(): help="Which torch mode to test: both, standard (no share_torch), or shared (share_torch only)", ) + parser.add_argument( + "--backend", + choices=["auto", "cuda", "xpu"], + default="auto", + help="Device backend to use: auto (default), cuda (NVIDIA/AMD ROCm), or xpu (Intel oneAPI)", + ) + + parser.add_argument( + "--device", + type=str, + default=None, + help="Device index (int) or 'cpu' to force CPU mode", + ) + args = parser.parse_args() # Check dependencies try: import numpy # noqa: F401 import psutil # noqa: F401 - import tabulate # noqa: F401 + # import tabulate # noqa: F401 # This line is now handled globally except ImportError as e: print(f"Missing required dependency: {e}") print("Please install benchmark dependencies with:") print(" pip install -e .[bench]") return 1 + # Set device and backend + backend = args.backend + device_arg = args.device + if device_arg is not None and str(device_arg).lower() == "cpu": + backend = "cpu" + device_idx = None + print("[PyIsolate] Forcing CPU mode due to --device=cpu") + args.no_gpu = True + elif device_arg is not None: + try: + device_idx = int(device_arg) + except ValueError: + print(f"Invalid --device value: {device_arg}. Must be integer or 'cpu'.") + sys.exit(1) + else: + device_idx = None + + device_str = "cpu" + device_name = "cpu" + backend_used = "cpu" + try: + import torch + cuda_available = torch.cuda.is_available() + xpu_available = hasattr(torch, "xpu") and torch.xpu.is_available() + # Auto backend selection + if backend == "auto": + if cuda_available: + backend = "cuda" + elif xpu_available: + backend = "xpu" + else: + backend = "cpu" + + if backend == "cuda": + if not cuda_available: + print("[PyIsolate] CUDA backend requested but not available. Exiting.") + sys.exit(1) + # Only check for ROCm on Linux + if platform.system() == "Linux": + torch_version = getattr(torch, 'version', None) + hip_version = getattr(torch_version, 'hip', None) if torch_version else None + if hip_version is not None: + print("[PyIsolate] ROCm (AMD) backend detected on Linux.") + # On Windows, just use CUDA if available + if device_idx is not None: + torch.cuda.set_device(device_idx) + device_str = f"cuda{device_idx}" + device_name = torch.cuda.get_device_name(device_idx) + else: + device_idx = torch.cuda.current_device() + device_str = f"cuda{device_idx}" + device_name = torch.cuda.get_device_name(device_idx) + backend_used = "cuda" + print(f"[PyIsolate] Using CUDA device {device_idx}: {device_name}") + + elif backend == "xpu": + if not xpu_available: + print("[PyIsolate] XPU backend requested but not available. Exiting.") + sys.exit(1) + if device_idx is not None: + torch.xpu.set_device(device_idx) + device_str = f"xpu{device_idx}" + device_name = ( + torch.xpu.get_device_name(device_idx) + if hasattr(torch.xpu, "get_device_name") + else "Intel XPU" + ) + else: + device_idx = torch.xpu.current_device() + device_str = f"xpu{device_idx}" + device_name = ( + torch.xpu.get_device_name(device_idx) + if hasattr(torch.xpu, "get_device_name") + else "Intel XPU" + ) + backend_used = "xpu" + print(f"[PyIsolate] Using Intel XPU device {device_idx}: {device_name}") + + elif backend == "cpu": + # CPU fallback + device_str = "cpu" + device_name = "cpu" + backend_used = "cpu" + print(f"[PyIsolate] Using CPU backend.") + else: + print("[PyIsolate] No supported GPU backend available, exiting.") + sys.exit(1) + + except Exception as e: + print(f"[PyIsolate] Error setting device/backend: {e}") + + # Generate results filename with backend and device info + import datetime + import socket + computer = socket.gethostname() + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + device_tag = f"{backend_used}{device_idx if device_idx is not None else 0}" + if device_str != "cpu": + safe_device_name = device_name.replace(" ", "").replace("/", "-") + device_tag = f"{backend_used}{device_idx if device_idx is not None else 0}-{safe_device_name}" + results_filename = f"benchmark_results_{computer}_{device_tag}_{timestamp}.txt" + print(f"\n[PyIsolate] Results will be saved to: {results_filename}") + + # In main(), after parsing args and determining device_idx: + if device_idx is not None: + import os + os.environ["PYISOLATE_CUDA_DEVICE"] = str(device_idx) + # Run benchmarks try: return asyncio.run( run_benchmarks( - quick=args.quick, no_torch=args.no_torch, no_gpu=args.no_gpu, torch_mode=args.torch_mode + quick=args.quick, no_torch=args.no_torch, no_gpu=args.no_gpu, torch_mode=args.torch_mode, ) ) except KeyboardInterrupt: diff --git a/benchmarks/memory_benchmark.py b/benchmarks/memory_benchmark.py index f831f4c..578b84e 100644 --- a/benchmarks/memory_benchmark.py +++ b/benchmarks/memory_benchmark.py @@ -52,6 +52,27 @@ from tests.test_integration import IntegrationTestBase +# 1. Device detection helpers (add after imports) +def detect_available_backends(): + import torch + backends = ["cpu"] + cuda_available = torch.cuda.is_available() + xpu_available = hasattr(torch, "xpu") and torch.xpu.is_available() + rocm_available = False + if cuda_available: + torch_version = getattr(torch, 'version', None) + hip_version = getattr(torch_version, 'hip', None) if torch_version else None + if hip_version is not None: + rocm_available = True + if cuda_available and not rocm_available: + backends.append("cuda") + if rocm_available: + backends.append("rocm") + if xpu_available: + backends.append("xpu") + return backends + + class MemoryTracker: """Tracks memory usage across host and child processes.""" @@ -69,8 +90,8 @@ def __init__(self): # Get the first GPU self.gpu_handle = nvml.nvmlDeviceGetHandleByIndex(0) # Store baseline GPU memory usage - mem_info = nvml.nvmlDeviceGetMemoryInfo(self.gpu_handle) - self.baseline_gpu_memory_mb = mem_info.used / 1024 / 1024 + mem_info = nvml.nvmlDeviceGetMemoryInfo(self.gpu_handle) if nvml is not None else None + self.baseline_gpu_memory_mb = (mem_info.used / 1024 / 1024) if mem_info is not None else 0 print( f"NVML initialized on {self.platform}. " f"Initial GPU memory: {self.baseline_gpu_memory_mb:.1f} MB" @@ -119,7 +140,7 @@ def _get_gpu_memory_windows_fallback(self, memory_info: dict[str, float]) -> dic # Calculate delta from baseline vram_delta = current_used - self.baseline_gpu_memory_mb - memory_info["host_vram_mb"] = max(0, vram_delta) + memory_info["host_vram_mb"] = max(0.0, vram_delta) # Try to get total GPU memory try: @@ -155,52 +176,54 @@ def get_process_tree_pids(self) -> list[int]: def get_memory_usage(self) -> dict[str, float]: """Get current memory usage for host and all child processes.""" memory_info = { - "host_ram_mb": 0, - "children_ram_mb": 0, - "total_ram_mb": 0, - "host_vram_mb": 0, - "total_vram_mb": 0, - "gpu_used_mb": 0, - "gpu_total_mb": 0, - "num_processes": 1, + "host_ram_mb": 0.0, + "children_ram_mb": 0.0, + "total_ram_mb": 0.0, + "host_vram_mb": 0.0, + "total_vram_mb": 0.0, + "gpu_used_mb": 0.0, + "gpu_total_mb": 0.0, + "num_processes": 1.0, # float for type consistency } # Get RAM usage try: # Host process host_info = self.process.memory_info() - memory_info["host_ram_mb"] = host_info.rss / 1024 / 1024 + memory_info["host_ram_mb"] = (host_info.rss or 0) / 1024 / 1024 # Child processes children = self.process.children(recursive=True) - memory_info["num_processes"] = 1 + len(children) + memory_info["num_processes"] = 1.0 + len(children) for child in children: try: child_info = child.memory_info() - memory_info["children_ram_mb"] += child_info.rss / 1024 / 1024 + memory_info["children_ram_mb"] += (child_info.rss or 0) / 1024 / 1024 except psutil.NoSuchProcess: pass - memory_info["total_ram_mb"] = memory_info["host_ram_mb"] + memory_info["children_ram_mb"] + memory_info["total_ram_mb"] = ( + (memory_info["host_ram_mb"] or 0) + + (memory_info["children_ram_mb"] or 0) + ) except Exception as e: print(f"Error getting RAM usage: {e}") # Get GPU memory usage - use total system VRAM since extensions run in separate processes - if self.nvml_initialized and self.gpu_handle: + if self.nvml_initialized and self.gpu_handle and nvml is not None: try: # Get total GPU memory info mem_info = nvml.nvmlDeviceGetMemoryInfo(self.gpu_handle) - current_used_mb = mem_info.used / 1024 / 1024 + current_used_mb = float(mem_info.used or 0) / 1024 / 1024 if mem_info else 0 memory_info["gpu_used_mb"] = current_used_mb - memory_info["gpu_total_mb"] = mem_info.total / 1024 / 1024 + memory_info["gpu_total_mb"] = float(mem_info.total or 0) / 1024 / 1024 if mem_info else 0 memory_info["total_vram_mb"] = current_used_mb # Calculate VRAM usage relative to baseline (captures all processes) - # This is more reliable than per-process tracking, especially on Windows - vram_delta = current_used_mb - self.baseline_gpu_memory_mb - memory_info["host_vram_mb"] = max(0, vram_delta) + vram_delta = current_used_mb - (self.baseline_gpu_memory_mb or 0) + memory_info["host_vram_mb"] = max(0.0, vram_delta) except Exception as e: print(f"Error getting GPU memory usage via NVML: {e}") if self.platform == "Windows": @@ -235,11 +258,11 @@ def get_memory_usage(self) -> dict[str, float]: def reset_baseline(self): """Reset the baseline GPU memory measurement.""" - if self.nvml_initialized and self.gpu_handle: + if self.nvml_initialized and self.gpu_handle and nvml is not None: try: mem_info = nvml.nvmlDeviceGetMemoryInfo(self.gpu_handle) old_baseline = self.baseline_gpu_memory_mb - self.baseline_gpu_memory_mb = mem_info.used / 1024 / 1024 + self.baseline_gpu_memory_mb = float(mem_info.used or 0) / 1024 / 1024 if mem_info else 0 print( f"[DEBUG {self.platform}] Reset baseline from {old_baseline:.1f} MB " f"to {self.baseline_gpu_memory_mb:.1f} MB", @@ -250,7 +273,7 @@ def reset_baseline(self): def __del__(self): """Cleanup NVML on deletion.""" - if self.nvml_initialized: + if self.nvml_initialized and nvml is not None: with contextlib.suppress(Exception): nvml.nvmlShutdown() @@ -333,6 +356,11 @@ class MemoryBenchmarkRunner: def __init__(self, test_base: IntegrationTestBase): self.test_base = test_base + if self.test_base.test_root is None: + raise RuntimeError( + "test_root is not set on test_base. " + "Did you await setup_test_environment() successfully?" + ) self.memory_tracker = MemoryTracker() self.results = [] @@ -348,12 +376,25 @@ async def run_baseline_memory_test(self) -> dict[str, float]: baseline = self.memory_tracker.get_memory_usage() print(f"Baseline: {baseline['total_ram_mb']:.1f} MB RAM, {baseline['total_vram_mb']:.1f} MB VRAM") - if baseline["gpu_total_mb"] > 0: - gpu_pct = (baseline["gpu_used_mb"] / baseline["gpu_total_mb"]) * 100 + # Print GPU memory usage if available + gpu_total = baseline.get("gpu_total_mb", 0.0) + gpu_used = baseline.get("gpu_used_mb", 0.0) + try: + gpu_total = float(gpu_total) if gpu_total not in (None, 0) else 0.0 + except Exception: + gpu_total = 0.0 + try: + gpu_used = float(gpu_used) if gpu_used is not None else 0.0 + except Exception: + gpu_used = 0.0 + if gpu_total > 0: + gpu_pct = (gpu_used / gpu_total) * 100 if gpu_total else 0 print( - f"GPU Memory: {baseline['gpu_used_mb']:.1f} / " - f"{baseline['gpu_total_mb']:.1f} MB ({gpu_pct:.1f}% used)" + f"GPU Memory: {gpu_used:.1f} / " + f"{gpu_total:.1f} MB ({gpu_pct:.1f}% used)" ) + else: + print("GPU Memory: N/A") return baseline async def run_scaling_test( @@ -361,22 +402,28 @@ async def run_scaling_test( num_extensions_list: list[int], share_torch: bool = True, test_tensor_size: tuple[int, ...] = (512, 512), - use_cuda: bool = False, + device: str = "cpu", ) -> list[dict]: """Test memory scaling with different numbers of extensions.""" + import torch results = [] extension_code = await create_memory_benchmark_extension() for num_extensions in num_extensions_list: print(f"\n{'=' * 60}") - print(f"Testing with {num_extensions} extensions (share_torch={share_torch})") + print(f"Testing with {num_extensions} extensions (share_torch={share_torch}, device={device})") print("=" * 60) # Create extensions extensions = [] + extension_venv_root = self.test_base.test_root + if extension_venv_root is not None: + extension_venv_root = extension_venv_root / "extension-venvs" + else: + extension_venv_root = "extension-venvs" manager = ExtensionManager( MemoryBenchmarkExtensionBase, - ExtensionManagerConfig(venv_root_path=str(self.test_base.test_root / "extension-venvs")), + ExtensionManagerConfig(venv_root_path=str(extension_venv_root)), ) # Clean up and reset baseline before measuring @@ -410,7 +457,7 @@ async def run_scaling_test( config = ExtensionConfig( name=ext_name, - module_path=str(self.test_base.test_root / "extensions" / ext_name), + module_path=str((self.test_base.test_root or Path(".")) / "extensions" / ext_name), isolated=True, dependencies=["torch>=2.0.0"] if TORCH_AVAILABLE else [], apis=[], @@ -427,11 +474,13 @@ async def run_scaling_test( after_load_memory = self.memory_tracker.get_memory_usage() # Create test tensor - print(f"Creating test tensor {test_tensor_size}...") + print(f"Creating test tensor {test_tensor_size} on {device}...") with torch.inference_mode(): - if use_cuda and CUDA_AVAILABLE: + if device == "cuda" or device == "rocm": test_tensor = torch.randn(*test_tensor_size, device="cuda") - torch.cuda.synchronize() # Ensure tensor creation completes + torch.cuda.synchronize() + elif device == "xpu": + test_tensor = torch.randn(*test_tensor_size, device="xpu") else: test_tensor = torch.randn(*test_tensor_size) @@ -439,7 +488,7 @@ async def run_scaling_test( print(f"Tensor size: {tensor_size_mb:.1f} MB on {test_tensor.device}") # Check memory after tensor creation - if use_cuda and CUDA_AVAILABLE: + if device in ("cuda", "rocm"): post_tensor_memory = self.memory_tracker.get_memory_usage() print( f"GPU memory after tensor creation: {post_tensor_memory.get('gpu_used_mb', 0):.1f} MB " @@ -456,7 +505,7 @@ async def run_scaling_test( if i == 0: print(f" First extension stored: {info}") # Force GPU sync after each send for accurate memory tracking - if use_cuda and CUDA_AVAILABLE: + if device in ("cuda", "rocm"): torch.cuda.synchronize() except Exception as e: print(f" Failed to send to {ext_name}: {e}") @@ -465,7 +514,7 @@ async def run_scaling_test( print(f"Send completed in {send_time:.2f}s") # Force final sync before measuring - if use_cuda and CUDA_AVAILABLE: + if device in ("cuda", "rocm"): torch.cuda.synchronize() # Wait for memory to settle @@ -498,8 +547,10 @@ async def run_scaling_test( "after_send_ram_mb": after_send_memory["total_ram_mb"], "load_ram_delta_mb": after_load_memory["total_ram_mb"] - before_memory["total_ram_mb"], "send_ram_delta_mb": after_send_memory["total_ram_mb"] - after_load_memory["total_ram_mb"], - "ram_per_extension_mb": (after_load_memory["total_ram_mb"] - before_memory["total_ram_mb"]) - / num_extensions, + "ram_per_extension_mb": ( + float(after_load_memory["total_ram_mb"] or 0) + - float(before_memory["total_ram_mb"] or 0) + ) / num_extensions if num_extensions else 0, "before_vram_mb": before_memory["total_vram_mb"], "after_load_vram_mb": after_load_memory["total_vram_mb"], "after_send_vram_mb": after_send_memory["total_vram_mb"], @@ -531,7 +582,7 @@ async def run_scaling_test( print(f" Baseline: {self.memory_tracker.baseline_gpu_memory_mb:.1f} MB") # Show GPU memory if this is a GPU test - if use_cuda and result["load_gpu_delta_mb"] > 0: + if device in ("cuda", "rocm") and result["load_gpu_delta_mb"] > 0: print(f" GPU memory for tensor creation: {result['load_gpu_delta_mb']:.1f} MB") print(f" GPU memory for tensor transfer: {result['send_gpu_delta_mb']:.1f} MB") else: @@ -545,7 +596,7 @@ async def run_scaling_test( manager.stop_all_extensions() del test_tensor gc.collect() - if CUDA_AVAILABLE: + if device in ("cuda", "rocm"): torch.cuda.empty_cache() torch.cuda.synchronize() @@ -555,141 +606,164 @@ async def run_scaling_test( return results async def run_large_tensor_sharing_test( - self, num_extensions: int = 50, tensor_gb: float = 2.0, test_both_modes: bool = False + self, + num_extensions: int = 50, + tensor_gb: float = 2.0, + test_both_modes: bool = False, + device: str = "cpu", ) -> dict: """Test memory sharing with a large tensor across multiple extensions.""" + import torch print(f"\n{'=' * 60}") - print(f"Large Tensor Sharing Test ({tensor_gb}GB tensor, {num_extensions} extensions)") + print( + f"Large Tensor Sharing Test ({tensor_gb}GB tensor, " + f"{num_extensions} extensions, device={device})" + ) print("=" * 60) extension_code = await create_memory_benchmark_extension() results = {} # Test both CPU and GPU tensors - for use_cuda in [False, True]: - if use_cuda and not CUDA_AVAILABLE: - continue - - device_name = "GPU" if use_cuda else "CPU" - print(f"\n{'=' * 50}") - print(f"Testing {device_name} Tensors") - print("=" * 50) - - results[device_name.lower()] = {} - - # Test only with share_torch=True by default - share_torch_modes = [False, True] if test_both_modes else [True] - for share_torch in share_torch_modes: - print(f"\n--- Testing {device_name} with share_torch={share_torch} ---") - - # Create extensions - extensions = [] - manager = ExtensionManager( - MemoryBenchmarkExtensionBase, - ExtensionManagerConfig(venv_root_path=str(self.test_base.test_root / "extension-venvs")), + device_name = device.upper() + results[device_name.lower()] = {} + share_torch_modes = [False, True] if test_both_modes else [True] + for share_torch in share_torch_modes: + print(f"\n--- Testing {device_name} with share_torch={share_torch} ---") + + # Create extensions + extensions = [] + extension_venv_root = self.test_base.test_root + if extension_venv_root is not None: + extension_venv_root = extension_venv_root / "extension-venvs" + else: + extension_venv_root = "extension-venvs" + manager = ExtensionManager( + MemoryBenchmarkExtensionBase, + ExtensionManagerConfig(venv_root_path=str(extension_venv_root)), + ) + + # Measure baseline + gc.collect() + if CUDA_AVAILABLE: + torch.cuda.empty_cache() + baseline = self.memory_tracker.get_memory_usage() + + # Create extensions + for i in range(num_extensions): + ext_name = f"large_test_ext_{device_name.lower()}_{i}" + self.test_base.create_extension( + ext_name, + dependencies=["torch>=2.0.0"], + share_torch=share_torch, + extension_code=extension_code, ) - # Measure baseline - gc.collect() - if CUDA_AVAILABLE: - torch.cuda.empty_cache() - baseline = self.memory_tracker.get_memory_usage() - - # Create extensions - for i in range(num_extensions): - ext_name = f"large_test_ext_{device_name.lower()}_{i}" - self.test_base.create_extension( - ext_name, - dependencies=["torch>=2.0.0"], - share_torch=share_torch, - extension_code=extension_code, - ) + config = ExtensionConfig( + name=ext_name, + module_path=str((self.test_base.test_root or Path(".")) / "extensions" / ext_name), + isolated=True, + dependencies=["torch>=2.0.0"], + apis=[], + share_torch=share_torch, + ) - config = ExtensionConfig( - name=ext_name, - module_path=str(self.test_base.test_root / "extensions" / ext_name), - isolated=True, - dependencies=["torch>=2.0.0"], - apis=[], - share_torch=share_torch, - ) + ext = manager.load_extension(config) + extensions.append((ext_name, ext)) - ext = manager.load_extension(config) - extensions.append((ext_name, ext)) + # Wait for extensions to initialize + await asyncio.sleep(2) - # Wait for extensions to initialize - await asyncio.sleep(2) + # Create large tensor + # Calculate size for desired GB (float32 = 4 bytes per element) + num_elements = int(tensor_gb * 1024 * 1024 * 1024 / 4) + # Make it a square-ish tensor + side = int(num_elements**0.5) - # Create large tensor - # Calculate size for desired GB (float32 = 4 bytes per element) - num_elements = int(tensor_gb * 1024 * 1024 * 1024 / 4) - # Make it a square-ish tensor - side = int(num_elements**0.5) + print(f"Creating {tensor_gb}GB tensor ({side}x{side}) on {device_name}...") + with torch.inference_mode(): + if device == "cuda" or device == "rocm": + large_tensor = torch.randn(side, side, device="cuda") + torch.cuda.synchronize() + elif device == "xpu": + large_tensor = torch.randn(side, side, device="xpu") + else: + large_tensor = torch.randn(side, side) + actual_size_mb = large_tensor.element_size() * large_tensor.numel() / (1024 * 1024) + print(f"Actual tensor size: {actual_size_mb:.1f} MB on {large_tensor.device}") - print(f"Creating {tensor_gb}GB tensor ({side}x{side}) on {device_name}...") - with torch.inference_mode(): - large_tensor = ( - torch.randn(side, side, device="cuda") if use_cuda else torch.randn(side, side) - ) - actual_size_mb = large_tensor.element_size() * large_tensor.numel() / (1024 * 1024) - print(f"Actual tensor size: {actual_size_mb:.1f} MB on {large_tensor.device}") + # Measure after tensor creation + after_create = self.memory_tracker.get_memory_usage() - # Measure after tensor creation - after_create = self.memory_tracker.get_memory_usage() + # Send to all extensions + print(f"Sending large {device_name} tensor to {num_extensions} extensions...") + send_start = time.time() - # Send to all extensions - print(f"Sending large {device_name} tensor to {num_extensions} extensions...") - send_start = time.time() + for _i, (ext_name, ext) in enumerate(extensions): + try: + await ext.store_tensor("large_tensor", large_tensor) + print(f" Sent to {ext_name}") + except Exception as e: + print(f" Failed to send to {ext_name}: {e}") - for _i, (ext_name, ext) in enumerate(extensions): - try: - await ext.store_tensor("large_tensor", large_tensor) - print(f" Sent to {ext_name}") - except Exception as e: - print(f" Failed to send to {ext_name}: {e}") - - send_time = time.time() - send_start - - # Measure after sending - await asyncio.sleep(2) - after_send = self.memory_tracker.get_memory_usage() - - # Store results - results[device_name.lower()][f"share_torch_{share_torch}"] = { - "baseline_ram_mb": baseline["total_ram_mb"], - "after_create_ram_mb": after_create["total_ram_mb"], - "after_send_ram_mb": after_send["total_ram_mb"], - "baseline_vram_mb": baseline["total_vram_mb"], - "after_create_vram_mb": after_create["total_vram_mb"], - "after_send_vram_mb": after_send["total_vram_mb"], - "tensor_size_mb": actual_size_mb, - "tensor_device": str(large_tensor.device), - "ram_for_tensor_creation_mb": after_create["total_ram_mb"] - baseline["total_ram_mb"], - "ram_for_distribution_mb": after_send["total_ram_mb"] - after_create["total_ram_mb"], - "ram_per_extension_copy_mb": (after_send["total_ram_mb"] - after_create["total_ram_mb"]) - / num_extensions - if num_extensions > 0 - else 0, - "vram_for_tensor_creation_mb": after_create["total_vram_mb"] - baseline["total_vram_mb"], - "vram_for_distribution_mb": after_send["total_vram_mb"] - after_create["total_vram_mb"], - # Add GPU total memory tracking - "baseline_gpu_mb": baseline.get("gpu_used_mb", 0), - "after_create_gpu_mb": after_create.get("gpu_used_mb", 0), - "after_send_gpu_mb": after_send.get("gpu_used_mb", 0), - "gpu_for_tensor_creation_mb": after_create.get("gpu_used_mb", 0) - - baseline.get("gpu_used_mb", 0), - "gpu_for_distribution_mb": after_send.get("gpu_used_mb", 0) - - after_create.get("gpu_used_mb", 0), - "send_time_s": send_time, - } - - # Cleanup - manager.stop_all_extensions() - del large_tensor - gc.collect() - if CUDA_AVAILABLE: - torch.cuda.empty_cache() - await asyncio.sleep(2) + send_time = time.time() - send_start + + # Measure after sending + await asyncio.sleep(2) + after_send = self.memory_tracker.get_memory_usage() + + # Store results + results[device_name.lower()][f"share_torch_{share_torch}"] = { + "baseline_ram_mb": baseline["total_ram_mb"], + "after_create_ram_mb": after_create["total_ram_mb"], + "after_send_ram_mb": after_send["total_ram_mb"], + "baseline_vram_mb": baseline["total_vram_mb"], + "after_create_vram_mb": after_create["total_vram_mb"], + "after_send_vram_mb": after_send["total_vram_mb"], + "tensor_size_mb": actual_size_mb, + "tensor_device": str(large_tensor.device), + "ram_for_tensor_creation_mb": ( + float(after_create["total_ram_mb"] or 0) + - float(baseline["total_ram_mb"] or 0) + ) / num_extensions if num_extensions else 0, + "ram_for_distribution_mb": ( + float(after_send["total_ram_mb"] or 0) + - float(after_create["total_ram_mb"] or 0) + ) / num_extensions if num_extensions else 0, + "ram_per_extension_copy_mb": ( + float(after_send["total_ram_mb"] or 0) + - float(after_create["total_ram_mb"] or 0) + ) / num_extensions if num_extensions else 0, + "vram_for_tensor_creation_mb": ( + float(after_create["total_vram_mb"] or 0) + - float(baseline["total_vram_mb"] or 0) + ) / num_extensions if num_extensions else 0, + "vram_for_distribution_mb": ( + float(after_send["total_vram_mb"] or 0) + - float(after_create["total_vram_mb"] or 0) + ) / num_extensions if num_extensions else 0, + # Add GPU total memory tracking + "baseline_gpu_mb": baseline.get("gpu_used_mb", 0), + "after_create_gpu_mb": after_create.get("gpu_used_mb", 0), + "after_send_gpu_mb": after_send.get("gpu_used_mb", 0), + "gpu_for_tensor_creation_mb": ( + float(after_create.get("gpu_used_mb", 0) or 0) + - float(baseline.get("gpu_used_mb", 0) or 0) + ) / num_extensions if num_extensions else 0, + "gpu_for_distribution_mb": ( + float(after_send.get("gpu_used_mb", 0) or 0) + - float(after_create.get("gpu_used_mb", 0) or 0) + ) / num_extensions if num_extensions else 0, + "send_time_s": send_time, + } + + # Cleanup + manager.stop_all_extensions() + del large_tensor + gc.collect() + if device in ("cuda", "rocm"): + torch.cuda.empty_cache() + await asyncio.sleep(2) return results @@ -700,6 +774,7 @@ async def run_memory_benchmarks( test_large_tensor: bool = True, max_extensions_for_large: int = 50, test_both_modes: bool = False, + backend: str = "auto", ): """Run the full memory benchmark suite.""" test_base = IntegrationTestBase() @@ -713,54 +788,54 @@ async def run_memory_benchmarks( baseline = await runner.run_baseline_memory_test() all_results["baseline"] = baseline + if backend == "auto": + available_backends = detect_available_backends() + if "cpu" not in available_backends: + available_backends = ["cpu"] + available_backends + else: + # Ensure cpu is first + available_backends = [b for b in ["cpu"] + available_backends if b != "cpu"] + available_backends = ["cpu"] + available_backends + else: + available_backends = [backend] + if test_small_tensor: - # Small tensor tests with multiple extension counts print("\n" + "=" * 80) print("SMALL TENSOR SCALING TESTS") print("=" * 80) - # CPU tensor tests small_tensor_size = (512, 512) # ~1MB tensor - if test_both_modes: - # Test both modes - print("\n--- CPU Tensor Tests (share_torch=False) ---") - cpu_results_no_share = await runner.run_scaling_test( - extension_counts, share_torch=False, test_tensor_size=small_tensor_size, use_cuda=False - ) - all_results["cpu_no_share"] = cpu_results_no_share - - print("\n--- CPU Tensor Tests (share_torch=True) ---") - cpu_results_share = await runner.run_scaling_test( - extension_counts, share_torch=True, test_tensor_size=small_tensor_size, use_cuda=False - ) - all_results["cpu_share"] = cpu_results_share - - # GPU tensor tests if available - if CUDA_AVAILABLE: + for backend_used in available_backends: if test_both_modes: - print("\n--- GPU Tensor Tests (share_torch=False) ---") - gpu_results_no_share = await runner.run_scaling_test( - extension_counts, share_torch=False, test_tensor_size=small_tensor_size, use_cuda=True + print(f"\n--- {backend_used.upper()} Tensor Tests (share_torch=False) ---") + results_no_share = await runner.run_scaling_test( + extension_counts, + share_torch=False, + test_tensor_size=small_tensor_size, + device=backend_used, ) - all_results["gpu_no_share"] = gpu_results_no_share - - print("\n--- GPU Tensor Tests (share_torch=True) ---") - gpu_results_share = await runner.run_scaling_test( - extension_counts, share_torch=True, test_tensor_size=small_tensor_size, use_cuda=True + all_results[f"{backend_used}_no_share"] = results_no_share + + print(f"\n--- {backend_used.upper()} Tensor Tests (share_torch=True) ---") + results_share = await runner.run_scaling_test( + extension_counts, + share_torch=True, + test_tensor_size=small_tensor_size, + device=backend_used, ) - all_results["gpu_share"] = gpu_results_share + all_results[f"{backend_used}_share"] = results_share if test_large_tensor: - # Large tensor sharing test - large_results = await runner.run_large_tensor_sharing_test( - num_extensions=min(max_extensions_for_large, max(extension_counts)), - tensor_gb=2.0, - test_both_modes=test_both_modes, - ) - all_results["large_tensor_sharing"] = large_results + for backend_used in available_backends: + large_results = await runner.run_large_tensor_sharing_test( + num_extensions=min(max_extensions_for_large, max(extension_counts)), + tensor_gb=2.0, + test_both_modes=test_both_modes, + device=backend_used, + ) + all_results[f"{backend_used}_large"] = large_results - # Print final summary print_memory_benchmark_summary(all_results) finally: @@ -779,48 +854,55 @@ def print_memory_benchmark_summary(results: dict): print("\nBaseline Memory Usage:") print(f" RAM: {baseline['total_ram_mb']:.1f} MB") print(f" VRAM: {baseline['total_vram_mb']:.1f} MB") - if baseline.get("gpu_total_mb", 0) > 0: - gpu_pct = (baseline["gpu_used_mb"] / baseline["gpu_total_mb"]) * 100 - print( - f" GPU Total: {baseline['gpu_used_mb']:.1f} / " - f"{baseline['gpu_total_mb']:.1f} MB ({gpu_pct:.1f}% used)" - ) - - # Scaling results - for test_type in ["cpu_no_share", "cpu_share", "gpu_no_share", "gpu_share"]: + gpu_total = baseline.get("gpu_total_mb", 0.0) + gpu_used = baseline.get("gpu_used_mb", 0.0) + try: + gpu_total = float(gpu_total) if gpu_total not in (None, 0) else 0.0 + except Exception: + gpu_total = 0.0 + try: + gpu_used = float(gpu_used) if gpu_used is not None else 0.0 + except Exception: + gpu_used = 0.0 + if gpu_total > 0: + gpu_pct = (gpu_used / gpu_total) * 100 if gpu_total else 0 + print(f" GPU Total: {gpu_used:.1f} / {gpu_total:.1f} MB ({gpu_pct:.1f}% used)") + else: + print(" GPU Total: N/A") + + # Dynamically print all *_share and *_no_share results + share_types = [k for k in results if k.endswith(("_share", "_no_share"))] + for test_type in share_types: if test_type in results: - print(f"\n{test_type.upper().replace('_', ' ')} Results:") - + backend = test_type.replace("_share", "").replace("_no_share", "").upper() + share_mode = "SHARE_TORCH=TRUE" if test_type.endswith("_share") else "SHARE_TORCH=FALSE" + print(f"\n{backend} {share_mode} Results:") headers = ["Extensions", "RAM/Ext (MB)", "Tensor RAM (MB)", "GPU (MB)", "Shared"] table_data = [] - for result in results[test_type]: - # Use GPU memory delta if available, otherwise fall back to VRAM gpu_memory = result.get("send_gpu_delta_mb", result.get("send_vram_delta_mb", 0)) - table_data.append( - [ - result["num_extensions"], - f"{result['ram_per_extension_mb']:.1f}", - f"{result['send_ram_delta_mb']:.1f}", - f"{gpu_memory:.1f}", + table_data.append([ + result["num_extensions"], + f"{result['ram_per_extension_mb']:.1f}", + f"{result['send_ram_delta_mb']:.1f}", + f"{gpu_memory:.1f}", + ( "Yes" if result.get("shared_memory") - else "No" - if result.get("shared_memory") is False - else "N/A", - ] - ) - - print(tabulate(table_data, headers=headers, tablefmt="grid")) - - # Large tensor sharing results - if "large_tensor_sharing" in results: - print("\n2GB TENSOR SHARING TEST:") - large_results = results["large_tensor_sharing"] + else "No" if result.get("shared_memory") is False else "N/A" + ), + ]) + if table_data: + print(tabulate(table_data, headers=headers, tablefmt="grid")) - # Process CPU results - if "cpu" in large_results: - print("\nCPU Tensor Results:") + # Large tensor sharing results for all backends + large_keys = [k for k in results if k.endswith("_large")] + for large_key in large_keys: + backend = large_key.replace("_large", "").upper() + print(f"\n2GB TENSOR SHARING TEST: {backend}") + large_results = results[large_key] + for dev in large_results: + print(f"\n{dev.upper()} Tensor Results:") headers = [ "Config", "Tensor Size (MB)", @@ -829,95 +911,31 @@ def print_memory_benchmark_summary(results: dict): "Send Time (s)", ] table_data = [] - for share_torch in [False, True]: key = f"share_torch_{share_torch}" - if key in large_results["cpu"]: - r = large_results["cpu"][key] - table_data.append( - [ - f"share_torch={share_torch}", - f"{r['tensor_size_mb']:.1f}", - f"{r['ram_for_distribution_mb']:.1f}", - f"{r['ram_per_extension_copy_mb']:.1f}", - f"{r['send_time_s']:.2f}", - ] - ) - + if key in large_results[dev]: + r = large_results[dev][key] + table_data.append([ + f"share_torch={share_torch}", + f"{r['tensor_size_mb']:.1f}", + f"{r['ram_for_distribution_mb']:.1f}", + f"{r['ram_per_extension_copy_mb']:.1f}", + f"{r['send_time_s']:.2f}", + ]) if table_data: print(tabulate(table_data, headers=headers, tablefmt="grid")) - - # Analysis for CPU - if "share_torch_False" in large_results["cpu"] and "share_torch_True" in large_results["cpu"]: - no_share = large_results["cpu"]["share_torch_False"] - share = large_results["cpu"]["share_torch_True"] - + # Analysis for this backend + if "share_torch_False" in large_results[dev] and "share_torch_True" in large_results[dev]: + no_share = large_results[dev]["share_torch_False"] + share = large_results[dev]["share_torch_True"] savings = no_share["ram_for_distribution_mb"] - share["ram_for_distribution_mb"] savings_pct = ( (savings / no_share["ram_for_distribution_mb"] * 100) - if no_share["ram_for_distribution_mb"] > 0 - else 0 + if no_share["ram_for_distribution_mb"] else 0 ) - - print("\nCPU Memory Sharing Analysis:") + print(f"\n{dev.upper()} Memory Sharing Analysis:") print(f" Memory saved with share_torch: {savings:.1f} MB ({savings_pct:.1f}%)") - # Process GPU results - if "gpu" in large_results: - print("\nGPU Tensor Results:") - headers = [ - "Config", - "Tensor Size (MB)", - "RAM Dist (MB)", - "GPU Created (MB)", - "GPU Dist (MB)", - "Send Time (s)", - ] - table_data = [] - - for share_torch in [False, True]: - key = f"share_torch_{share_torch}" - if key in large_results["gpu"]: - r = large_results["gpu"][key] - table_data.append( - [ - f"share_torch={share_torch}", - f"{r['tensor_size_mb']:.1f}", - f"{r['ram_for_distribution_mb']:.1f}", - f"{r['gpu_for_tensor_creation_mb']:.1f}", - f"{r['gpu_for_distribution_mb']:.1f}", - f"{r['send_time_s']:.2f}", - ] - ) - - if table_data: - print(tabulate(table_data, headers=headers, tablefmt="grid")) - - # Analysis for GPU - if "share_torch_False" in large_results["gpu"] and "share_torch_True" in large_results["gpu"]: - no_share = large_results["gpu"]["share_torch_False"] - share = large_results["gpu"]["share_torch_True"] - - ram_savings = no_share["ram_for_distribution_mb"] - share["ram_for_distribution_mb"] - ram_savings_pct = ( - (ram_savings / no_share["ram_for_distribution_mb"] * 100) - if no_share["ram_for_distribution_mb"] > 0 - else 0 - ) - - print("\nGPU Memory Sharing Analysis:") - print(f" RAM saved with share_torch: {ram_savings:.1f} MB ({ram_savings_pct:.1f}%)") - - gpu_savings = no_share["gpu_for_distribution_mb"] - share["gpu_for_distribution_mb"] - if no_share["gpu_for_distribution_mb"] > 0: - gpu_savings_pct = gpu_savings / no_share["gpu_for_distribution_mb"] * 100 - print( - f" GPU memory saved with share_torch: {gpu_savings:.1f} MB " - f"({gpu_savings_pct:.1f}%)" - ) - elif gpu_savings != 0: - print(f" GPU memory difference: {gpu_savings:.1f} MB") - def main(): """Main entry point.""" @@ -951,37 +969,156 @@ def main(): help="Test both share_torch=True and share_torch=False (default: only share_torch=True)", ) + parser.add_argument( + "--backend", + choices=["auto", "cuda", "xpu", "rocm", "cpu"], + default="auto", + help=( + "Device backend to use: auto (default), cuda (NVIDIA/AMD ROCm), " + "xpu (Intel oneAPI), rocm (AMD ROCm), or cpu" + ), + ) + + parser.add_argument( + "--device", + type=str, + default=None, + help="(Legacy) Device index (int) or 'cpu' to force CPU mode. Use --backend for multi-backend tests.", + ) + args = parser.parse_args() - # Determine extension counts + device_idx = None # Ensure device_idx is always defined + + # Determine extension counts (move this up before device/backend logic) if args.counts: extension_counts = [int(x.strip()) for x in args.counts.split(",")] else: - # Default progression: 1, 2, 5, 10, 20, 50, 100 + # Default progression: 1, 2, 5, 10, 20 extension_counts = [1, 2, 5, 10, 20] if args.max_extensions >= 50: extension_counts.append(50) if args.max_extensions >= 100: extension_counts.append(100) - # Filter based on max extension_counts = [c for c in extension_counts if c <= args.max_extensions] - # Check dependencies - if not TORCH_AVAILABLE: - print("PyTorch not available. Install with: pip install torch") - return 1 - - print(f"Running on: {platform.system()} {platform.release()}") + # Set device and backend + backend = args.backend + device_str = "cpu" + device_name = "cpu" + backend_used = "cpu" + try: + import torch # type: ignore + cuda_available = torch.cuda.is_available() + xpu_available = hasattr(torch, "xpu") and torch.xpu.is_available() + # Auto backend selection + if backend == "auto": + if cuda_available: + backend = "cuda" + elif xpu_available: + backend = "xpu" + else: + backend = "cpu" + + if backend == "cuda": + if not cuda_available: + print("[PyIsolate] CUDA backend requested but not available. Exiting.") + sys.exit(1) + # Only check for ROCm on Linux + if platform.system() == "Linux": + torch_version = getattr(torch, 'version', None) + hip_version = getattr(torch_version, 'hip', None) if torch_version else None + if hip_version is not None: + print("[PyIsolate] ROCm (AMD) backend detected on Linux.") + # On Windows, just use CUDA if available + if args.device is not None: + if str(args.device).lower() == "cpu": + backend = "cpu" + device_str = "cpu" + device_name = "cpu" + print("[PyIsolate] Forcing CPU mode due to --device=cpu") + else: + try: + device_idx = int(args.device) + torch.cuda.set_device(device_idx) + device_str = f"cuda{device_idx}" + device_name = torch.cuda.get_device_name(device_idx) + except ValueError: + print(f"Invalid --device value: {args.device}. Must be integer or 'cpu'.") + sys.exit(1) + else: + device_idx = torch.cuda.current_device() + device_str = f"cuda{device_idx}" + device_name = torch.cuda.get_device_name(device_idx) + backend_used = "cuda" + print(f"[PyIsolate] Using CUDA device {device_idx}: {device_name}") + + elif backend == "xpu": + if not xpu_available: + print("[PyIsolate] XPU backend requested but not available. Exiting.") + sys.exit(1) + if args.device is not None: + if str(args.device).lower() == "cpu": + backend = "cpu" + device_str = "cpu" + device_name = "cpu" + print("[PyIsolate] Forcing CPU mode due to --device=cpu") + else: + try: + device_idx = int(args.device) + torch.xpu.set_device(device_idx) + device_str = f"xpu{device_idx}" + device_name = ( + torch.xpu.get_device_name(device_idx) + if hasattr(torch.xpu, "get_device_name") + else "Intel XPU" + ) + except ValueError: + print(f"Invalid --device value: {args.device}. Must be integer or 'cpu'.") + sys.exit(1) + else: + device_idx = torch.xpu.current_device() + device_str = f"xpu{device_idx}" + device_name = ( + torch.xpu.get_device_name(device_idx) + if hasattr(torch.xpu, "get_device_name") + else "Intel XPU" + ) + backend_used = "xpu" + print(f"[PyIsolate] Using Intel XPU device {device_idx}: {device_name}") - if not CUDA_AVAILABLE: - print("CUDA not available. GPU memory tests will be skipped.") + else: + print("[PyIsolate] No supported GPU backend available, exiting.") + sys.exit(1) - if not NVML_AVAILABLE: - print("nvidia-ml-py3 not installed. Install with: pip install nvidia-ml-py3") - print("VRAM tracking will not be available.") - else: - print("NVML available for GPU memory tracking") + except Exception as e: + print(f"[PyIsolate] Error setting device/backend: {e}") + + # Generate results filename with backend and device info + import datetime + import socket + computer = socket.gethostname() + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + device_tag = f"{backend_used}{device_idx if device_idx is not None else 0}" + if device_str != "cpu": + safe_device_name = device_name.replace(" ", "").replace("/", "-") + device_tag = f"{backend_used}{device_idx if device_idx is not None else 0}-{safe_device_name}" + results_filename = f"memory_benchmark_results_{computer}_{device_tag}_{timestamp}.txt" + print(f"\n[PyIsolate] Results will be saved to: {results_filename}") + + # For memory tracking, print backend-specific info + if backend_used == "cuda": + print("[PyIsolate] Using NVML for CUDA/ROCm memory tracking.") + elif backend_used == "xpu": + try: + import torch # type: ignore + mem_alloc = torch.xpu.memory_allocated() if hasattr(torch, "xpu") else 0 + print(f"[PyIsolate] Intel XPU memory allocated: {mem_alloc / 1024 / 1024:.1f} MB") + except Exception as e: + print(f"[PyIsolate] Could not get Intel XPU memory info: {e}") + # For AMD ROCm, optionally try rocm-smi if available (not implemented here, + # but can be added with subprocess) # Determine what to test test_small = not args.large_only @@ -995,6 +1132,7 @@ def main(): test_small_tensor=test_small, test_large_tensor=test_large, test_both_modes=args.test_both_modes, + backend=args.backend, ) ) return 0 diff --git a/cleanup_pyisolate.ps1 b/cleanup_pyisolate.ps1 new file mode 100644 index 0000000..4fdb692 --- /dev/null +++ b/cleanup_pyisolate.ps1 @@ -0,0 +1,32 @@ +Write-Host "Cleaning up all extension venvs, .test_temps, and Python bytecode caches..." -ForegroundColor Cyan + +# Remove extension venvs and test temp directories +$dirsToRemove = @( + ".test_temps", + ".benchmark_venv", + "pyisolate\__pycache__", + "pyisolate\_internal\__pycache__", + "benchmarks\__pycache__", + "example\__pycache__" +) + +foreach ($dir in $dirsToRemove) { + if (Test-Path $dir) { + Write-Host "Removing $dir ..." + Remove-Item -Recurse -Force $dir + } +} + +# Remove all __pycache__ directories recursively +Get-ChildItem -Recurse -Directory -Filter "__pycache__" | ForEach-Object { + Write-Host "Removing $($_.FullName) ..." + Remove-Item -Recurse -Force $_.FullName +} + +# Remove all .pyc files recursively +Get-ChildItem -Recurse -Include *.pyc | ForEach-Object { + Write-Host "Removing $($_.FullName) ..." + Remove-Item -Force $_.FullName +} + +Write-Host "Cleanup complete!" -ForegroundColor Green \ No newline at end of file diff --git a/pyisolate/_internal/client.py b/pyisolate/_internal/client.py index 35c1c9c..3ce89ae 100644 --- a/pyisolate/_internal/client.py +++ b/pyisolate/_internal/client.py @@ -1,9 +1,7 @@ import asyncio import importlib.util import logging -import os.path import sys -import sysconfig from contextlib import nullcontext from ..config import ExtensionConfig @@ -26,7 +24,18 @@ async def async_entrypoint( logger.debug("Loading extension with Python executable: %s", sys.executable) logger.debug("Loading extension from: %s", module_path) - sys.path.insert(0, sysconfig.get_path("platlib")) + # Robustly ensure only the venv's site-packages are present in sys.path + import os + import site + + venv_prefix = sys.prefix + venv_site_packages = [p for p in site.getsitepackages() if p.startswith(venv_prefix)] + # Remove all site-packages not in the current venv + sys.path = [p for p in sys.path if not (("site-packages" in p) and (not p.startswith(venv_prefix)))] + # Prepend all venv site-packages to sys.path (in order) + for p in reversed(venv_site_packages): + if p not in sys.path: + sys.path.insert(0, p) rpc = AsyncRPC(recv_queue=to_extension, send_queue=from_extension) extension = extension_type() diff --git a/pyisolate/_internal/gpu_utils.py b/pyisolate/_internal/gpu_utils.py new file mode 100644 index 0000000..86014d7 --- /dev/null +++ b/pyisolate/_internal/gpu_utils.py @@ -0,0 +1,67 @@ +""" +pyisolate._internal.gpu_utils + +GPU/XPU/torch-specific utilities for tensor serialization, DLPack conversion, and device handling. +These functions require torch (and sometimes numpy) to be installed. +""" + +def maybe_to_dlpack(obj): + """Convert XPU tensor to DLPack capsule if needed (requires torch).""" + try: + import torch + from torch.utils import dlpack as _dlpack # type: ignore[attr-defined] + except ImportError as e: + raise ImportError("pyisolate: 'torch' is required for maybe_to_dlpack but is not installed.") from e + if isinstance(obj, torch.Tensor) and hasattr(obj, "device") and obj.device.type == "xpu": + # If the input is a NumPy array and not writable, make it writable before converting + if hasattr(obj, "numpy"): + arr = obj.numpy() + if not arr.flags.writeable: + arr = arr.copy() + return torch.from_numpy(arr).to("xpu") + return _dlpack.to_dlpack(obj) # type: ignore[attr-defined] + return obj + +def maybe_from_dlpack(obj): + """Convert DLPack capsule to XPU tensor if needed (requires torch).""" + try: + import torch + from torch.utils import dlpack as _dlpack # type: ignore[attr-defined] + except ImportError as e: + raise ImportError("pyisolate: 'torch' is required for maybe_from_dlpack but is not installed.") from e + # DLPack capsules are PyCapsule, not torch.Tensor + if not isinstance(obj, torch.Tensor) and hasattr(obj, "__dlpack__"): + return _dlpack.from_dlpack(obj) # type: ignore[attr-defined] + # For raw PyCapsule (older PyTorch), try fallback + if type(obj).__name__ == "PyCapsule": + return _dlpack.from_dlpack(obj) # type: ignore[attr-defined] + return obj + +def maybe_serialize_tensor(obj): + """Serialize XPU tensor for transport (requires torch).""" + try: + import torch + except ImportError as e: + raise ImportError( + "pyisolate: 'torch' is required for maybe_serialize_tensor but is not installed." + ) from e + if isinstance(obj, torch.Tensor) and hasattr(obj, "device") and obj.device.type == "xpu": + # Fallback: send as CPU buffer + metadata + arr = obj.cpu().numpy() + return ("xpu_tensor", arr.tobytes(), arr.shape, str(arr.dtype)) + return obj + +def maybe_deserialize_tensor(obj): + """Deserialize XPU tensor from transport (requires torch and numpy).""" + try: + import numpy as np + import torch + except ImportError as e: + raise ImportError( + "pyisolate: 'torch' and 'numpy' are required for maybe_deserialize_tensor but are not installed." + ) from e + if isinstance(obj, tuple) and len(obj) == 4 and obj[0] == "xpu_tensor": + _, buf, shape, dtype = obj + arr = np.frombuffer(buf, dtype=dtype).reshape(shape) + return torch.from_numpy(arr).to("xpu") + return obj diff --git a/pyisolate/_internal/host.py b/pyisolate/_internal/host.py index 11ae767..8db5d13 100644 --- a/pyisolate/_internal/host.py +++ b/pyisolate/_internal/host.py @@ -270,7 +270,7 @@ def __launch(self): self._install_dependencies() # Set the Python executable from the virtual environment - executable = sys._base_executable if os.name == "nt" else str(self.venv_path / "bin" / "python") + executable = sys._base_executable if os.name == "nt" else str(self.venv_path / "bin" / "python") # type: ignore logger.debug(f"Launching extension {self.name} with Python executable: {executable}") self.mp.set_executable(executable) context = nullcontext() @@ -309,7 +309,9 @@ def _create_extension_venv(self): raise RuntimeError("uv command not found in PATH") # Use the resolved, validated path - subprocess.check_call([uv_path, "venv", str(self.venv_path)]) # noqa: S603 + import sys + py_version = f"python{sys.version_info.major}.{sys.version_info.minor}" + subprocess.check_call([uv_path, "venv", str(self.venv_path), "--python", py_version]) # noqa: S603 # TODO(Optimization): Only do this when we update a extension to reduce startup time? def _install_dependencies(self): @@ -340,21 +342,37 @@ def _install_dependencies(self): cache_dir.mkdir(exist_ok=True) uv_common_args.extend(["--cache-dir", str(cache_dir)]) - # Install the same version of torch as the current process + # Detect Intel/XPU backend for special index URL + use_xpu_backend = False + backend_env = os.environ.get("PYISOLATE_BACKEND", "auto").lower() + if backend_env == "xpu" or self.config.get("backend") == "xpu": + use_xpu_backend = True + # Also check for Intel GPU in device name if available + if not use_xpu_backend and "intel" in str(self.config.get("device_name", "")).lower(): + use_xpu_backend = True + + # Install the same version of torch as the current process, if needed + torch_requirement = None + torch_index_args = [] if self.config["share_torch"]: import torch torch_version = torch.__version__ - if torch_version.endswith("+cpu"): - # On Windows, the '+cpu' is not included in the version string - torch_version = torch_version[:-4] # Remove the '+cpu' suffix - cuda_version = torch.version.cuda # type: ignore - if cuda_version: - uv_common_args += [ + # Remove any '+cpu', '+xpu', or other local version tags + if "+" in torch_version: + torch_version = torch_version.split("+")[0] + cuda_version = getattr(torch.version, "cuda", None) # type: ignore + if use_xpu_backend: + torch_requirement = "torch>=2.7.0" + torch_index_args = ["--index-url", "https://download.pytorch.org/whl/xpu"] + elif cuda_version: + torch_requirement = f"torch=={torch_version}" + torch_index_args = [ "--extra-index-url", f"https://download.pytorch.org/whl/cu{cuda_version.replace('.', '')}", ] - uv_args.append(f"torch=={torch_version}") + else: + torch_requirement = f"torch=={torch_version}" # Install extension dependencies from config if self.config["dependencies"] or self.config["share_torch"]: @@ -362,10 +380,21 @@ def _install_dependencies(self): # Re-validate dependencies before passing to subprocess (defense in depth) safe_dependencies = [] + torch_in_deps = False for dep in self.config["dependencies"]: validate_dependency(dep) + # Remove any '+xpu' or '+cpu' from torch dependencies for Intel/XPU + if use_xpu_backend and dep.startswith("torch"): + dep = "torch>=2.7.0" + if dep.startswith("torch"): + torch_in_deps = True safe_dependencies.append(dep) + # Only add torch requirement if not already present + if torch_requirement and not torch_in_deps: + safe_dependencies.insert(0, torch_requirement) + uv_args += torch_index_args + # In normal mode, suppress output unless there are actual changes always_output = logger.isEnabledFor(logging.DEBUG) try: diff --git a/pyisolate/_internal/shared.py b/pyisolate/_internal/shared.py index 5750000..844fcd6 100644 --- a/pyisolate/_internal/shared.py +++ b/pyisolate/_internal/shared.py @@ -22,11 +22,23 @@ # We only import this to get type hinting working. It can also be a torch.multiprocessing if TYPE_CHECKING: import multiprocessing as typehint_mp + else: import multiprocessing - typehint_mp = multiprocessing +# GPU-specific utilities have moved to pyisolate._internal.gpu_utils +if TYPE_CHECKING: + from .gpu_utils import ( + maybe_deserialize_tensor, + maybe_serialize_tensor, + ) +else: + from .gpu_utils import ( + maybe_deserialize_tensor, + maybe_serialize_tensor, + ) + logger = logging.getLogger(__name__) # TODO - Remove me @@ -343,6 +355,7 @@ def _recv_thread(self): self.default_loop.call_soon_threadsafe(self.blocking_future.set_result, None) break + # Device-aware deserialization for args/kwargs/result if item["kind"] == "response": debugprint("Got response: ", item) call_id = item["call_id"] @@ -360,10 +373,9 @@ def _recv_thread(self): else: debugprint("Got result: ", item["result"]) set_result = pending_request["future"].set_result - result = item["result"] + result = maybe_deserialize_tensor(item["result"]) pending_request["calling_loop"].call_soon_threadsafe(set_result, result) else: - # If we don"t have a pending request, I guess we just continue on continue elif item["kind"] == "call": request = cast(RPCRequest, item) @@ -371,26 +383,29 @@ def _recv_thread(self): request_parent = request.get("parent_call_id", None) call_id = request["call_id"] + # Device-aware deserialization for args/kwargs + args = tuple(maybe_deserialize_tensor(arg) for arg in request["args"]) + kwargs = {k: maybe_deserialize_tensor(v) for k, v in request["kwargs"].items()} + request_mod = dict(request) + request_mod["args"] = args + request_mod["kwargs"] = kwargs + call_on_loop = self.default_loop if request_parent is not None: - # Get pending request without holding the lock for long pending_request = None with self.lock: pending_request = self.pending.get(request_parent, None) if pending_request: call_on_loop = pending_request["calling_loop"] - async def call_with_context(captured_request: RPCRequest): - # Set the context variable directly when the coroutine actually runs + async def call_with_context(captured_request): token = self.handling_call_id.set(captured_request["call_id"]) try: - # Run the dispatch directly return await self.dispatch_request(captured_request) finally: - # Reset the context variable when done self.handling_call_id.reset(token) - asyncio.run_coroutine_threadsafe(coro=call_with_context(request), loop=call_on_loop) + asyncio.run_coroutine_threadsafe(coro=call_with_context(request_mod), loop=call_on_loop) else: raise ValueError(f"Unknown item type: {type(item)}") @@ -407,14 +422,17 @@ def _send_thread(self): id_gen += 1 with self.lock: self.pending[call_id] = item + # Device-aware serialization for args/kwargs + args = tuple(maybe_serialize_tensor(arg) for arg in item["args"]) + kwargs = {k: maybe_serialize_tensor(v) for k, v in item["kwargs"].items()} request = RPCRequest( kind="call", object_id=item["object_id"], call_id=call_id, parent_call_id=item["parent_call_id"], method=item["method"], - args=item["args"], - kwargs=item["kwargs"], + args=args, + kwargs=kwargs, ) try: self.send_queue.put(request) @@ -422,32 +440,31 @@ def _send_thread(self): error_msg = str(e) if "CUDA error: out of memory" in error_msg or "out of memory" in error_msg.lower(): print(f"CUDA OOM error while sending RPC request for {item['method']}: {error_msg}") - # Set exception on the future to notify the caller - with self.lock: - pending = self.pending.pop(call_id, None) - if pending: - pending["calling_loop"].call_soon_threadsafe( - pending["future"].set_exception, - RuntimeError(f"CUDA out of memory during request transmission: {error_msg}"), + try: + simple_response = RPCRequest( + kind="call", + object_id=item["object_id"], + call_id=call_id, + parent_call_id=item["parent_call_id"], + method=item["method"], + args=(), + kwargs={}, ) + self.send_queue.put(simple_response) + except Exception: + print("Failed to send even a simple error request - process may be stuck") else: print(f"Error sending RPC request: {error_msg}") - # Set exception on the future - with self.lock: - pending = self.pending.pop(call_id, None) - if pending: - pending["calling_loop"].call_soon_threadsafe(pending["future"].set_exception, e) - elif item["kind"] == "response": - try: - self.send_queue.put(item) - except Exception as e: - error_msg = str(e) - if "CUDA error: out of memory" in error_msg or "out of memory" in error_msg.lower(): - print(f"CUDA OOM error while sending RPC response: {error_msg}") - else: - print(f"Error sending RPC response: {error_msg}") + raise else: - raise ValueError(f"Unknown item type: {type(item)}") + # For responses, patch result for device-aware serialization + response = item + if "result" in response: + response_mod = dict(response) + response_mod["result"] = maybe_serialize_tensor(response["result"]) + self.send_queue.put(cast(RPCResponse, response_mod)) + else: + self.send_queue.put(response) class SingletonMetaclass(type): diff --git a/pyisolate/shared.py b/pyisolate/shared.py index d39c6f5..44422ef 100644 --- a/pyisolate/shared.py +++ b/pyisolate/shared.py @@ -144,6 +144,12 @@ class ExtensionBase(ExtensionLocal): ... async def process_data(self, data: list) -> float: ... # Extension method callable from host ... import numpy as np + ... from pyisolate._internal.gpu_utils import ( + ... maybe_to_dlpack, + ... maybe_from_dlpack, + ... maybe_serialize_tensor, + ... maybe_deserialize_tensor, + ... ) ... return np.array(data).mean() Attributes: diff --git a/pyproject.toml b/pyproject.toml index 2c955ba..6f3741f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,6 @@ test = [ "pytest-asyncio>=0.21.0", # Required for async test fixtures "pytest>=7.0", # Required by benchmark scripts that import from tests "pyyaml>=5.4.0", # For test manifest creation - "tabulate>=0.9.0", # For nice output formatting "torch>=2.0.0", # For testing share_torch functionality ] bench = [ diff --git a/run_benchmarks_windows.ps1 b/run_benchmarks_windows.ps1 index 41e1edf..8c1cded 100644 --- a/run_benchmarks_windows.ps1 +++ b/run_benchmarks_windows.ps1 @@ -23,8 +23,16 @@ Write-Host "PyIsolate Benchmark Runner for Windows (PowerShell)" -ForegroundColo Write-Host "================================================================" -ForegroundColor Cyan Write-Host "" +# Prompt for CUDA device index +$device = Read-Host "Enter CUDA device index to use (leave blank for default GPU/CPU)" +if ($device -ne "") { + # Always pass --device to both benchmark.py and memory_benchmark.py + $device_args = @("--device", "$device") +} else { + $device_args = @() +} + # Set up paths and filenames -$ScriptDir = Split-Path -Parent $MyInvocation.MyCommand.Path $Timestamp = Get-Date -Format "yyyyMMdd_HHmmss" $OutputFile = "benchmark_results_${env:COMPUTERNAME}_${Timestamp}.txt" $VenvDir = ".benchmark_venv" @@ -96,20 +104,34 @@ Write-Host "" Write-Host "Step 4: Detecting GPU and installing PyTorch..." Write-Host "" -$cudaAvailable = $false +# Detect OS +$IsWindows = $env:OS -eq "Windows_NT" + +# Detect GPU vendor +$gpuInfo = (Get-WmiObject Win32_VideoController | Select-Object -ExpandProperty Name) -join ", " +$gpuVendor = "cpu" +if ($gpuInfo -match "NVIDIA") { + $gpuVendor = "nvidia" +} elseif ($gpuInfo -match "AMD" -or $gpuInfo -match "Radeon") { + $gpuVendor = "amd" +} elseif ($gpuInfo -match "Intel") { + $gpuVendor = "intel" +} +Write-Host "Detected GPU(s): $gpuInfo" +Write-Host "GPU Vendor: $gpuVendor" + +# Set PyTorch index URL and backend argument $torchIndex = "https://download.pytorch.org/whl/cpu" +$backend_arg = @("--backend", "auto") # Default -# Check for CUDA +if ($gpuVendor -eq "nvidia") { + # CUDA version logic as before $nvidiaSmi = Get-Command nvidia-smi -ErrorAction SilentlyContinue if ($nvidiaSmi) { Write-Host "NVIDIA GPU detected. Checking CUDA version..." - $cudaInfo = & nvidia-smi --query-gpu=driver_version --format=csv,noheader 2>$null - if ($LASTEXITCODE -eq 0) { $cudaVersion = (& nvidia-smi | Select-String "CUDA Version" | ForEach-Object { $_ -match "CUDA Version:\s*(\d+\.\d+)" | Out-Null; $matches[1] }) if ($cudaVersion) { Write-Host "Detected CUDA version: $cudaVersion" -ForegroundColor Green - "[$(Get-Date)] CUDA detected: $cudaVersion" | Add-Content $OutputFile - $cudaMajor = [int]($cudaVersion.Split('.')[0]) if ($cudaMajor -ge 12) { $torchIndex = "https://download.pytorch.org/whl/cu121" @@ -120,9 +142,21 @@ if ($nvidiaSmi) { } } } + $backend_arg = @("--backend", "cuda") +} elseif ($gpuVendor -eq "amd") { + if ($IsWindows) { + Write-Host "AMD GPU detected, but ROCm is not supported on Windows. Falling back to CPU." + $torchIndex = "https://download.pytorch.org/whl/cpu" + $backend_arg = @("--backend", "auto") } else { - Write-Host "No NVIDIA GPU detected. Installing CPU-only PyTorch..." - "[$(Get-Date)] No CUDA detected, using CPU PyTorch" | Add-Content $OutputFile + $torchIndex = "https://download.pytorch.org/whl/rocm5.4.2" + $backend_arg = @("--backend", "cuda") # PyTorch uses 'cuda' for ROCm + Write-Host "AMD GPU detected. ROCm is only supported on Linux. Will attempt ROCm PyTorch." + } +} elseif ($gpuVendor -eq "intel") { + Write-Host "Intel GPU detected. Attempting to use PyTorch XPU backend (requires PyTorch 2.7+ and latest Intel drivers)." + $torchIndex = "https://download.pytorch.org/whl/xpu" + $backend_arg = @("--backend", "xpu") } Write-Host "" @@ -146,8 +180,8 @@ if ($LASTEXITCODE -ne 0) { Write-Host "" Write-Host "Step 5: Installing remaining dependencies..." -$ErrorActionPreference = "SilentlyContinue" -$output = & uv pip install numpy psutil tabulate nvidia-ml-py3 pytest pytest-asyncio pyyaml 2>&1 +# Always install typing_extensions as part of dependencies +$output = & uv pip install numpy psutil tabulate nvidia-ml-py3 pytest pytest-asyncio pyyaml typing_extensions 2>&1 $ErrorActionPreference = "Continue" $output | Out-String | Tee-Object -Append $OutputFile @@ -198,7 +232,7 @@ Write-Host "Output is being saved to the results file..." # Run benchmark - PowerShell handles subprocess differently $env:PYTHONUNBUFFERED = "1" -$output = & python benchmark.py --quick 2>&1 | Out-String +$output = & python benchmark.py --quick @device_args @backend_arg 2>&1 | Out-String $benchmarkResult = $LASTEXITCODE $output | Tee-Object -Append "..\$OutputFile" @@ -220,6 +254,9 @@ Write-Host "Step 8: Running memory benchmarks..." "================================================================" | Add-Content "..\$OutputFile" "" | Add-Content "..\$OutputFile" +# Before running the memory benchmark, always set backend_arg to --backend auto +$memory_backend_arg = @("--backend", "auto") + Write-Host "Running memory_benchmark.py (this may take several minutes)..." Write-Host "NOTE: This test intentionally pushes VRAM limits to find maximum capacity" @@ -228,7 +265,7 @@ Write-Host "NOTE: If nothing has changed after 90 minutes, press Ctrl+C" -Foregr Write-Host "The test intentionally pushes VRAM limits and may appear frozen when it hits limits." # Run memory benchmark -$output = & python memory_benchmark.py --counts 1,2,5,10,25,50,100 2>&1 | Out-String +$output = & python memory_benchmark.py --counts 1,2,5,10,25,50,100 @device_args @memory_backend_arg 2>&1 | Out-String $memoryResult = $LASTEXITCODE $output | Tee-Object -Append "..\$OutputFile" diff --git a/tests/test_benchmarks.py b/tests/test_benchmarks.py index 8bb4b25..0a4fd56 100644 --- a/tests/test_benchmarks.py +++ b/tests/test_benchmarks.py @@ -13,9 +13,10 @@ import asyncio import gc +import os import statistics import time -from typing import Optional +from typing import Any, Optional import numpy as np import psutil @@ -27,11 +28,35 @@ TORCH_AVAILABLE = True CUDA_AVAILABLE = torch.cuda.is_available() + XPU_AVAILABLE = hasattr(torch, "xpu") and torch.xpu.is_available() + ROCM_AVAILABLE = False + if CUDA_AVAILABLE and hasattr(torch.version, "hip") and torch.version.hip is not None: + ROCM_AVAILABLE = True + + # Set CUDA/ROCm device from environment variable if specified + cuda_env = os.environ.get("PYISOLATE_CUDA_DEVICE") + if CUDA_AVAILABLE and cuda_env is not None: + torch.cuda.set_device(int(cuda_env)) + print(f"[PyIsolate] Using CUDA device {cuda_env}: {torch.cuda.get_device_name(int(cuda_env))}") + elif CUDA_AVAILABLE: + print( + f"[PyIsolate] Using default CUDA device {torch.cuda.current_device()}: " + f"{torch.cuda.get_device_name(torch.cuda.current_device())}" + ) + elif ROCM_AVAILABLE: + print("[PyIsolate] Using AMD ROCm backend.") + elif XPU_AVAILABLE: + print("[PyIsolate] Using Intel XPU backend.") + else: + print("[PyIsolate] No supported GPU backend available, exiting.") + import sys + sys.exit(1) except ImportError: TORCH_AVAILABLE = False CUDA_AVAILABLE = False from .test_integration import IntegrationTestBase +from shared import ExampleExtensionBase # Add this import for type annotations class BenchmarkResults: @@ -191,128 +216,24 @@ def print_summary(self): class TestRPCBenchmarks(IntegrationTestBase): """Benchmark tests for RPC call overhead.""" + benchmark_ext_shared: Optional[ExampleExtensionBase] = None + runner: Optional[BenchmarkRunner] = None + @pytest.fixture(autouse=True) async def setup_benchmark_environment(self): """Set up the benchmark environment once for all tests.""" await self.setup_test_environment("benchmark") # Create benchmark extension with all required dependencies - benchmark_extension_code = ''' -import asyncio -import numpy as np -from shared import ExampleExtension, DatabaseSingleton -from pyisolate import local_execution -try: - import torch - TORCH_AVAILABLE = True -except ImportError: - TORCH_AVAILABLE = False - -class BenchmarkExtension(ExampleExtension): - """Extension with methods for benchmarking RPC overhead.""" - - async def initialize(self): - """Initialize the benchmark extension.""" - pass - - async def prepare_shutdown(self): - """Clean shutdown of benchmark extension.""" - pass - - async def do_stuff(self, value): - """Required abstract method from ExampleExtension.""" - return f"Processed: {value}" - - # ======================================== - # Small Data Benchmarks - # ======================================== - - async def echo_int(self, value: int) -> int: - """Echo an integer value.""" - return value - - async def echo_string(self, value: str) -> str: - """Echo a string value.""" - return value - - @local_execution - def echo_int_local(self, value: int) -> int: - """Local execution baseline for integer echo.""" - return value - - @local_execution - def echo_string_local(self, value: str) -> str: - """Local execution baseline for string echo.""" - return value - - # ======================================== - # Large Data Benchmarks - # ======================================== - - async def process_large_array(self, array: np.ndarray) -> int: - """Process a large numpy array and return its size.""" - return array.size - - async def echo_large_bytes(self, data: bytes) -> int: - """Echo large byte data and return its length.""" - return len(data) - - @local_execution - def process_large_array_local(self, array: np.ndarray) -> int: - """Local execution baseline for large array processing.""" - return array.size - - # ======================================== - # Torch Tensor Benchmarks - # ======================================== - - async def process_small_tensor(self, tensor) -> tuple: - """Process a small torch tensor.""" - if not TORCH_AVAILABLE: - return (0, "cpu") - return (tensor.numel(), str(tensor.device)) - - async def process_large_tensor(self, tensor) -> tuple: - """Process a large torch tensor.""" - if not TORCH_AVAILABLE: - return (0, "cpu") - return (tensor.numel(), str(tensor.device)) - - @local_execution - def process_small_tensor_local(self, tensor) -> tuple: - """Local execution baseline for small tensor processing.""" - if not TORCH_AVAILABLE: - return (0, "cpu") - return (tensor.numel(), str(tensor.device)) - - # ======================================== - # Recursive/Complex Call Patterns - # ======================================== - - async def recursive_host_call(self, depth: int) -> int: - """Make recursive calls through host singleton.""" - if depth <= 0: - return 0 - - db = DatabaseSingleton() - await db.set_value(f"depth_{depth}", depth) - value = await db.get_value(f"depth_{depth}") - return value + await self.recursive_host_call(depth - 1) - -def example_entrypoint(): - """Entry point for the benchmark extension.""" - return BenchmarkExtension() -''' - - self.create_extension( - "benchmark_ext", - benchmark_extension_code, - dependencies=["numpy>=1.26.0", "torch>=2.0.0"] if TORCH_AVAILABLE else ["numpy>=1.26.0"], - ) + # self.create_extension( + # "benchmark_ext", + # benchmark_extension_code, + # dependencies=["numpy>=1.26.0", "torch>=2.0.0"] if TORCH_AVAILABLE else ["numpy>=1.26.0"], + # ) # Load extensions - extensions_config = [{"name": "benchmark_ext"}] + extensions_config: list[dict[str, Any]] = [{"name": "benchmark_ext"}] # Add share_torch config if available if TORCH_AVAILABLE: @@ -320,6 +241,11 @@ def example_entrypoint(): self.extensions = await self.load_extensions(extensions_config[:1]) # Load one for now self.benchmark_ext = self.extensions[0] + self.benchmark_ext_shared = None + if TORCH_AVAILABLE and len(extensions_config) > 1: + shared_exts = await self.load_extensions([extensions_config[1]]) + if shared_exts: + self.benchmark_ext_shared = shared_exts[0] # Initialize benchmark runner self.runner = BenchmarkRunner(warmup_runs=3, benchmark_runs=15) @@ -336,6 +262,7 @@ async def test_small_data_benchmarks(self): print("SMALL DATA BENCHMARKS") print("=" * 60) + assert self.runner is not None # Integer benchmarks test_int = 42 await self.runner.run_benchmark( @@ -361,6 +288,7 @@ async def test_large_data_benchmarks(self): print("LARGE DATA BENCHMARKS") print("=" * 60) + assert self.runner is not None # Large numpy array (10MB) large_array = np.random.random((1024, 1024)) # ~8MB float64 @@ -393,6 +321,7 @@ async def test_torch_tensor_benchmarks(self): print("TORCH TENSOR BENCHMARKS") print("=" * 60) + assert self.runner is not None # Small tensor (CPU) with torch.inference_mode(): small_tensor_cpu = torch.randn(100, 100) # ~40KB @@ -440,6 +369,7 @@ async def test_complex_call_patterns(self): print("COMPLEX CALL PATTERN BENCHMARKS") print("=" * 60) + assert self.runner is not None # Recursive calls through host singleton await self.runner.run_benchmark( "Recursive Host Calls (depth=3)", lambda: self.benchmark_ext.recursive_host_call(3) @@ -455,10 +385,12 @@ async def test_print_final_summary(self): # Small delay to ensure this runs last await asyncio.sleep(0.1) + assert self.runner is not None self.runner.print_summary() # Basic assertions to ensure benchmarks ran assert len(self.runner.results) > 0, "No benchmark results found" + assert self.runner is not None # Verify we have both local and RPC results for comparison local_results = [r for r in self.runner.results if "local" in r.name.lower()] diff --git a/tests/test_torch_tensor_integration.py b/tests/test_torch_tensor_integration.py index 1b78d47..0140439 100644 --- a/tests/test_torch_tensor_integration.py +++ b/tests/test_torch_tensor_integration.py @@ -1,3 +1,4 @@ +# pyright: reportMissingImports=false """ Integration tests for passing torch.Tensor objects between host and extensions. @@ -20,7 +21,7 @@ # Import shared components from example sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "example")) -from shared import DatabaseSingleton, ExampleExtensionBase +from shared import DatabaseSingleton, ExampleExtensionBase # type: ignore # Check torch availability try: @@ -28,6 +29,19 @@ HAS_TORCH = True HAS_CUDA = torch.cuda.is_available() + # Set CUDA device from environment variable if specified + cuda_env = os.environ.get("PYISOLATE_CUDA_DEVICE") + if HAS_CUDA and cuda_env is not None: + torch.cuda.set_device(int(cuda_env)) + print(f"[PyIsolate] Using CUDA device {cuda_env}: {torch.cuda.get_device_name(int(cuda_env))}") + elif HAS_CUDA: + print( + f"[PyIsolate] Using default CUDA device " + f"{torch.cuda.current_device()}: " + f"{torch.cuda.get_device_name(torch.cuda.current_device())}" + ) + else: + print("[PyIsolate] CUDA not available, using CPU only.") except ImportError: torch = None HAS_TORCH = False