diff --git a/demo-notebooks/additional-demos/hf_interactive.ipynb b/demo-notebooks/additional-demos/hf_interactive.ipynb index 05c568ae..7da1d155 100644 --- a/demo-notebooks/additional-demos/hf_interactive.ipynb +++ b/demo-notebooks/additional-demos/hf_interactive.ipynb @@ -40,7 +40,7 @@ "outputs": [], "source": [ "# Import pieces from codeflare-sdk\n", - "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication" + "from codeflare_sdk import RayCluster, TokenAuthentication" ] }, { @@ -86,21 +86,20 @@ "# Create our cluster and submit\n", "# The SDK will try to find the name of your default local queue based on the annotation \"kueue.x-k8s.io/default-queue\": \"true\" unless you specify the local queue manually below\n", "cluster_name= \"hfgputest\"\n", - "cluster = Cluster(ClusterConfiguration(\n", + "cluster = RayCluster(\n", " name=cluster_name, \n", " head_memory_requests=6,\n", " head_memory_limits=8,\n", - " head_extended_resource_requests={'nvidia.com/gpu':1}, # For GPU enabled workloads set the head_extended_resource_requests and worker_extended_resource_requests\n", - " worker_extended_resource_requests={'nvidia.com/gpu':1},\n", + " head_accelerators={'nvidia.com/gpu':1}, # For GPU enabled workloads set the head_accelerators and worker_accelerators\n", + " worker_accelerators={'nvidia.com/gpu':1},\n", " num_workers=1,\n", " worker_cpu_requests=8, \n", " worker_cpu_limits=8, \n", " worker_memory_requests=16, \n", " worker_memory_limits=16, \n", " # image=\"\", # Optional Field \n", - " write_to_file=False, # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources \n", " # local_queue=\"local-queue-name\" # Specify the local queue manually\n", - "))" + ")" ] }, { @@ -204,8 +203,8 @@ "source": [ "from codeflare_sdk import generate_cert\n", "# Create required TLS cert and export the environment variables to enable TLS\n", - "generate_cert.generate_tls_cert(cluster_name, cluster.config.namespace)\n", - "generate_cert.export_env(cluster_name, cluster.config.namespace)" + "generate_cert.generate_tls_cert(cluster_name, cluster.namespace)\n", + "generate_cert.export_env(cluster_name, cluster.namespace)" ] }, { diff --git a/demo-notebooks/additional-demos/local_interactive.ipynb b/demo-notebooks/additional-demos/local_interactive.ipynb index 61c1b0ac..c0201ec3 100644 --- a/demo-notebooks/additional-demos/local_interactive.ipynb +++ b/demo-notebooks/additional-demos/local_interactive.ipynb @@ -10,7 +10,7 @@ "outputs": [], "source": [ "# Import pieces from codeflare-sdk\n", - "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication" + "from codeflare_sdk import RayCluster, TokenAuthentication" ] }, { @@ -56,21 +56,20 @@ "# The SDK will try to find the name of your default local queue based on the annotation \"kueue.x-k8s.io/default-queue\": \"true\" unless you specify the local queue manually below\n", "cluster_name = \"hfgputest-1\"\n", "\n", - "cluster = Cluster(ClusterConfiguration(\n", + "cluster = RayCluster(\n", " name=cluster_name,\n", " head_memory_requests=6,\n", " head_memory_limits=8,\n", - " head_extended_resource_requests={'nvidia.com/gpu':0}, # For GPU enabled workloads set the head_extended_resource_requests and worker_extended_resource_requests\n", - " worker_extended_resource_requests={'nvidia.com/gpu':0},\n", + " head_accelerators={'nvidia.com/gpu':0}, # For GPU enabled workloads set the head_accelerators and worker_accelerators\n", + " worker_accelerators={'nvidia.com/gpu':0},\n", " num_workers=1,\n", " 
worker_cpu_requests=1,\n", " worker_cpu_limits=1,\n", " worker_memory_requests=4,\n", " worker_memory_limits=6,\n", " # image=\"\", # Optional Field \n", - " write_to_file=False, # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources \n", " # local_queue=\"local-queue-name\" # Specify the local queue manually\n", - "))" + ")" ] }, { @@ -114,8 +113,8 @@ "source": [ "from codeflare_sdk import generate_cert\n", "\n", - "generate_cert.generate_tls_cert(cluster_name, cluster.config.namespace)\n", - "generate_cert.export_env(cluster_name, cluster.config.namespace)" + "generate_cert.generate_tls_cert(cluster_name, cluster.namespace)\n", + "generate_cert.export_env(cluster_name, cluster.namespace)" ] }, { diff --git a/demo-notebooks/additional-demos/ray_job_client.ipynb b/demo-notebooks/additional-demos/ray_job_client.ipynb index 0078c530..d8f41244 100644 --- a/demo-notebooks/additional-demos/ray_job_client.ipynb +++ b/demo-notebooks/additional-demos/ray_job_client.ipynb @@ -14,7 +14,7 @@ "outputs": [], "source": [ "# Import pieces from codeflare-sdk\n", - "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication, RayJobClient" + "from codeflare_sdk import RayCluster, TokenAuthentication, RayJobClient" ] }, { @@ -57,12 +57,12 @@ "source": [ "# Create and configure our cluster object\n", "# The SDK will try to find the name of your default local queue based on the annotation \"kueue.x-k8s.io/default-queue\": \"true\" unless you specify the local queue manually below\n", - "cluster = Cluster(ClusterConfiguration(\n", + "cluster = RayCluster(\n", " name='jobtest',\n", " head_memory_requests=6,\n", " head_memory_limits=8,\n", - " head_extended_resource_requests={'nvidia.com/gpu':0}, # For GPU enabled workloads set the head_extended_resource_requests and worker_extended_resource_requests\n", - " worker_extended_resource_requests={'nvidia.com/gpu':0},\n", + " head_accelerators={'nvidia.com/gpu':0}, # For GPU enabled workloads set the head_accelerators and worker_accelerators\n", + " worker_accelerators={'nvidia.com/gpu':0},\n", " num_workers=2,\n", " worker_cpu_requests=1,\n", " worker_cpu_limits=1,\n", @@ -71,7 +71,7 @@ " # image=\"\", # Optional Field \n", " write_to_file=False # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources \n", " # local_queue=\"local-queue-name\" # Specify the local queue manually\n", - "))" + ")" ] }, { diff --git a/demo-notebooks/guided-demos/1_cluster_job_client.ipynb b/demo-notebooks/guided-demos/1_cluster_job_client.ipynb index 18bbed23..2396aad7 100644 --- a/demo-notebooks/guided-demos/1_cluster_job_client.ipynb +++ b/demo-notebooks/guided-demos/1_cluster_job_client.ipynb @@ -14,7 +14,7 @@ "outputs": [], "source": [ "# Import pieces from codeflare-sdk\n", - "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication" + "from codeflare_sdk import RayCluster, TokenAuthentication" ] }, { @@ -57,23 +57,22 @@ "source": [ "# Create and configure our cluster object\n", "# The SDK will try to find the name of your default local queue based on the annotation \"kueue.x-k8s.io/default-queue\": \"true\" unless you specify the local queue manually below\n", - "cluster = Cluster(ClusterConfiguration(\n", + "cluster = RayCluster(\n", " name='jobtest',\n", " head_cpu_requests=1,\n", " head_cpu_limits=1,\n", " head_memory_requests=6,\n", " head_memory_limits=8,\n", - " head_extended_resource_requests={'nvidia.com/gpu':1}, # For GPU enabled workloads set the head_extended_resource_requests and 
worker_extended_resource_requests\n", - " worker_extended_resource_requests={'nvidia.com/gpu':1},\n", + " head_accelerators={'nvidia.com/gpu':1}, # For GPU enabled workloads set the head_accelerators and worker_accelerators\n", + " worker_accelerators={'nvidia.com/gpu':1},\n", " num_workers=2,\n", " worker_cpu_requests='250m',\n", " worker_cpu_limits=1,\n", " worker_memory_requests=4,\n", " worker_memory_limits=6,\n", " # image=\"\", # Optional Field \n", - " write_to_file=False, # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources \n", " # local_queue=\"local-queue-name\" # Specify the local queue manually\n", - "))" + ")" ] }, { @@ -229,7 +228,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "base", "language": "python", "name": "python3" }, @@ -243,7 +242,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.18" + "version": "3.12.7" } }, "nbformat": 4, diff --git a/demo-notebooks/guided-demos/2_basic_interactive.ipynb b/demo-notebooks/guided-demos/2_basic_interactive.ipynb index c3033735..65099b60 100644 --- a/demo-notebooks/guided-demos/2_basic_interactive.ipynb +++ b/demo-notebooks/guided-demos/2_basic_interactive.ipynb @@ -16,7 +16,7 @@ "outputs": [], "source": [ "# Import pieces from codeflare-sdk\n", - "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication" + "from codeflare_sdk import RayCluster, TokenAuthentication" ] }, { @@ -62,23 +62,22 @@ "# Create and configure our cluster object\n", "# The SDK will try to find the name of your default local queue based on the annotation \"kueue.x-k8s.io/default-queue\": \"true\" unless you specify the local queue manually below\n", "cluster_name = \"interactivetest\"\n", - "cluster = Cluster(ClusterConfiguration(\n", + "cluster = RayCluster(\n", " name=cluster_name,\n", " head_cpu_requests=1,\n", " head_cpu_limits=1,\n", " head_memory_requests=6,\n", " head_memory_limits=8,\n", - " head_extended_resource_requests={'nvidia.com/gpu':1}, # For GPU enabled workloads set the head_extended_resource_requests and worker_extended_resource_requests\n", - " worker_extended_resource_requests={'nvidia.com/gpu':1},\n", + " head_accelerators={'nvidia.com/gpu':1}, # For GPU enabled workloads set the head_accelerators and worker_accelerators\n", + " worker_accelerators={'nvidia.com/gpu':1},\n", " num_workers=2,\n", " worker_cpu_requests='250m',\n", " worker_cpu_limits=1,\n", " worker_memory_requests=4,\n", " worker_memory_limits=6,\n", " # image=\"\", # Optional Field \n", - " write_to_file=False, # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources \n", " # local_queue=\"local-queue-name\" # Specify the local queue manually\n", - "))" + ")" ] }, { @@ -151,8 +150,8 @@ "source": [ "from codeflare_sdk import generate_cert\n", "# Create required TLS cert and export the environment variables to enable TLS\n", - "generate_cert.generate_tls_cert(cluster_name, cluster.config.namespace)\n", - "generate_cert.export_env(cluster_name, cluster.config.namespace)" + "generate_cert.generate_tls_cert(cluster_name, cluster.namespace)\n", + "generate_cert.export_env(cluster_name, cluster.namespace)" ] }, { diff --git a/demo-notebooks/guided-demos/3_widget_example.ipynb b/demo-notebooks/guided-demos/3_widget_example.ipynb index 9e4daf1e..ee6c510b 100644 --- a/demo-notebooks/guided-demos/3_widget_example.ipynb +++ b/demo-notebooks/guided-demos/3_widget_example.ipynb @@ -19,7 +19,7 @@ "outputs": [], "source": [ "# 
Import pieces from codeflare-sdk\n", - "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication, view_clusters" + "from codeflare_sdk import RayCluster, TokenAuthentication, view_clusters" ] }, { @@ -64,23 +64,22 @@ "source": [ "# Create and configure our cluster object\n", "# The SDK will try to find the name of your default local queue based on the annotation \"kueue.x-k8s.io/default-queue\": \"true\" unless you specify the local queue manually below\n", - "cluster = Cluster(ClusterConfiguration(\n", + "cluster = RayCluster(\n", " name='widgettest',\n", " head_cpu_requests='500m',\n", " head_cpu_limits='500m',\n", " head_memory_requests=6,\n", " head_memory_limits=8,\n", - " head_extended_resource_requests={'nvidia.com/gpu':0}, # For GPU enabled workloads set the head_extended_resource_requests and worker_extended_resource_requests\n", - " worker_extended_resource_requests={'nvidia.com/gpu':0},\n", + " head_accelerators={'nvidia.com/gpu':0}, # For GPU enabled workloads set the head_accelerators and worker_accelerators\n", + " worker_accelerators={'nvidia.com/gpu':0},\n", " num_workers=2,\n", " worker_cpu_requests='250m',\n", " worker_cpu_limits=1,\n", " worker_memory_requests=4,\n", " worker_memory_limits=6,\n", " # image=\"\", # Optional Field\n", - " write_to_file=False, # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources\n", " # local_queue=\"local-queue-name\" # Specify the local queue manually\n", - "))" + ")" ] }, { diff --git a/demo-notebooks/guided-demos/4_rayjob_existing_cluster.ipynb b/demo-notebooks/guided-demos/4_rayjob_existing_cluster.ipynb index eabd6e93..a1529270 100644 --- a/demo-notebooks/guided-demos/4_rayjob_existing_cluster.ipynb +++ b/demo-notebooks/guided-demos/4_rayjob_existing_cluster.ipynb @@ -37,7 +37,7 @@ "metadata": {}, "outputs": [], "source": [ - "from codeflare_sdk import Cluster, ClusterConfiguration, RayJob" + "from codeflare_sdk import RayCluster, RayJob" ] }, { @@ -73,21 +73,20 @@ "metadata": {}, "outputs": [], "source": [ - "cluster = Cluster(ClusterConfiguration(\n", + "cluster = RayCluster(\n", " name='rayjob-cluster',\n", " head_cpu_requests=1,\n", " head_cpu_limits=1,\n", " head_memory_requests=6,\n", " head_memory_limits=8,\n", - " head_extended_resource_requests={'nvidia.com/gpu':1},\n", - " worker_extended_resource_requests={'nvidia.com/gpu':1},\n", + " head_accelerators={'nvidia.com/gpu':1},\n", + " worker_accelerators={'nvidia.com/gpu':1},\n", " num_workers=2,\n", " worker_cpu_requests='250m',\n", " worker_cpu_limits=1,\n", " worker_memory_requests=4,\n", " worker_memory_limits=6,\n", - "\n", - "))\n", + ")\n", "\n", "cluster.apply()" ] diff --git a/demo-notebooks/guided-demos/5_submit_rayjob_cr.ipynb b/demo-notebooks/guided-demos/5_submit_rayjob_cr.ipynb index 055638a1..9ff77fa9 100644 --- a/demo-notebooks/guided-demos/5_submit_rayjob_cr.ipynb +++ b/demo-notebooks/guided-demos/5_submit_rayjob_cr.ipynb @@ -29,7 +29,7 @@ "metadata": {}, "outputs": [], "source": [ - "from codeflare_sdk import RayJob, ManagedClusterConfig" + "from codeflare_sdk import RayJob, RayCluster" ] }, { @@ -55,7 +55,7 @@ "id": "5581eca9", "metadata": {}, "source": [ - "Next we'll need to define the ManagedClusterConfig. Kuberay will use this to spin up a short-lived RayCluster that will only exist as long as the job" + "Next we'll need to define the RayCluster configuration. 
Kuberay will use this to spin up a short-lived RayCluster that will only exist as long as the job" ] }, { @@ -65,7 +65,7 @@ "metadata": {}, "outputs": [], "source": [ - "cluster_config = ManagedClusterConfig(\n", + "cluster_config = RayCluster(\n", " head_memory_requests=6,\n", " head_memory_limits=8,\n", " num_workers=2,\n", @@ -83,7 +83,7 @@ "id": "02a2b32b", "metadata": {}, "source": [ - "Lastly we can pass the ManagedClusterConfig into the RayJob and submit it. You do not need to worry about tearing down the cluster when the job has completed, that is handled for you!" + "Lastly we can pass the RayCluster configuration into the RayJob and submit it. You do not need to worry about tearing down the cluster when the job has completed, that is handled for you!" ] }, { diff --git a/poetry.lock b/poetry.lock index db4bb628..bd22e47c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -5242,4 +5242,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "fcdf6f4663956fde1eef7e6bc70b8a0abccec68342832b6a2a1c2272a7070f8b" +content-hash = "688b9b73ee70de8cac601d86a46e8bc48924b179868be25ba1d0086becc079e8" diff --git a/pyproject.toml b/pyproject.toml index 21433540..c2f303ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ kubernetes = ">= 27.2.0" cryptography = "43.0.3" executing = "1.2.0" pydantic = ">= 2.10.6" +typing_extensions = ">= 4.12.0" ipywidgets = "8.1.2" [[tool.poetry.source]] diff --git a/src/codeflare_sdk/__init__.py b/src/codeflare_sdk/__init__.py index a27702e7..f7042601 100644 --- a/src/codeflare_sdk/__init__.py +++ b/src/codeflare_sdk/__init__.py @@ -1,9 +1,10 @@ from .ray import ( Cluster, ClusterConfiguration, + RayCluster, + RayClusterInfo, RayClusterStatus, CodeFlareClusterStatus, - RayCluster, get_cluster, list_all_queued, list_all_clusters, diff --git a/src/codeflare_sdk/common/kueue/test_kueue.py b/src/codeflare_sdk/common/kueue/test_kueue.py index 548a09bf..f8c0af3a 100644 --- a/src/codeflare_sdk/common/kueue/test_kueue.py +++ b/src/codeflare_sdk/common/kueue/test_kueue.py @@ -18,7 +18,7 @@ apply_template, ) from unittest.mock import patch -from codeflare_sdk.ray.cluster.cluster import Cluster, ClusterConfiguration +from codeflare_sdk.ray.cluster.raycluster import RayCluster import yaml import os import filecmp @@ -36,85 +36,10 @@ def test_none_local_queue(mocker): mocker.patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object") - config = ClusterConfiguration(name="unit-test-aw-kueue", namespace="ns") - config.name = "unit-test-aw-kueue" - config.local_queue = None - - cluster = Cluster(config) + cluster = RayCluster(name="unit-test-aw-kueue", namespace="ns", local_queue=None) assert cluster.config.local_queue == None -def test_cluster_creation_no_aw_local_queue(mocker): - # With written resources - # Create Ray Cluster with local queue specified - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", - return_value={"spec": {"domain": "apps.cluster.awsroute.org"}}, - ) - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), - ) - config = create_cluster_config() - config.name = "unit-test-cluster-kueue" - config.write_to_file = True - config.local_queue = "local-queue-default" - cluster = Cluster(config) - assert cluster.resource_yaml == f"{aw_dir}unit-test-cluster-kueue.yaml" - expected_rc = 
apply_template( - f"{parent}/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml", - get_template_variables(), - ) - - with open(f"{aw_dir}unit-test-cluster-kueue.yaml", "r") as f: - cluster_kueue = yaml.load(f, Loader=yaml.FullLoader) - assert cluster_kueue == expected_rc - - # With resources loaded in memory, no Local Queue specified. - config = create_cluster_config() - config.name = "unit-test-cluster-kueue" - config.write_to_file = False - cluster = Cluster(config) - assert cluster.resource_yaml == expected_rc - - -def test_aw_creation_local_queue(mocker): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", - return_value={"spec": {"domain": "apps.cluster.awsroute.org"}}, - ) - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), - ) - config = create_cluster_config() - config.name = "unit-test-aw-kueue" - config.appwrapper = True - config.write_to_file = True - config.local_queue = "local-queue-default" - cluster = Cluster(config) - assert cluster.resource_yaml == f"{aw_dir}unit-test-aw-kueue.yaml" - expected_rc = apply_template( - f"{parent}/tests/test_cluster_yamls/kueue/aw_kueue.yaml", - get_template_variables(), - ) - - with open(f"{aw_dir}unit-test-aw-kueue.yaml", "r") as f: - aw_kueue = yaml.load(f, Loader=yaml.FullLoader) - assert aw_kueue == expected_rc - - # With resources loaded in memory, no Local Queue specified. - config = create_cluster_config() - config.name = "unit-test-aw-kueue" - config.appwrapper = True - config.write_to_file = False - cluster = Cluster(config) - - assert cluster.resource_yaml == expected_rc - - def test_get_local_queue_exists_fail(mocker): mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch( @@ -125,13 +50,12 @@ def test_get_local_queue_exists_fail(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - config = create_cluster_config() - config.name = "unit-test-aw-kueue" - config.appwrapper = True - config.write_to_file = True - config.local_queue = "local_queue_doesn't_exist" try: - Cluster(config) + RayCluster( + name="unit-test-aw-kueue", + namespace="ns", + local_queue="local_queue_doesn't_exist", + ) except ValueError as e: assert ( str(e) @@ -345,5 +269,7 @@ def test_priority_class_exists_other_error(mocker): # Make sure to always keep this function last def test_cleanup(): - os.remove(f"{aw_dir}unit-test-cluster-kueue.yaml") - os.remove(f"{aw_dir}unit-test-aw-kueue.yaml") + if os.path.exists(f"{aw_dir}unit-test-cluster-kueue.yaml"): + os.remove(f"{aw_dir}unit-test-cluster-kueue.yaml") + if os.path.exists(f"{aw_dir}unit-test-aw-kueue.yaml"): + os.remove(f"{aw_dir}unit-test-aw-kueue.yaml") diff --git a/src/codeflare_sdk/common/utils/unit_test_support.py b/src/codeflare_sdk/common/utils/unit_test_support.py index 653e818c..d566085a 100644 --- a/src/codeflare_sdk/common/utils/unit_test_support.py +++ b/src/codeflare_sdk/common/utils/unit_test_support.py @@ -20,6 +20,7 @@ Cluster, ClusterConfiguration, ) +from codeflare_sdk.ray.cluster.raycluster import RayCluster import os import yaml from pathlib import Path @@ -31,23 +32,55 @@ aw_dir = os.path.expanduser("~/.codeflare/resources/") -def create_cluster_config(num_workers=2, write_to_file=False): - config = ClusterConfiguration( +def create_cluster_config(num_workers=2, 
write_to_file=False, appwrapper=False): + if appwrapper: + config = ClusterConfiguration( + name="unit-test-cluster", + namespace="ns", + num_workers=num_workers, + worker_cpu_requests=3, + worker_cpu_limits=4, + worker_memory_requests=5, + worker_memory_limits=6, + appwrapper=True, + write_to_file=write_to_file, + ) + return config + + cluster = RayCluster( name="unit-test-cluster", namespace="ns", + head_cpu_requests=1, + head_cpu_limits=2, + head_memory_requests=5, + head_memory_limits=8, num_workers=num_workers, worker_cpu_requests=3, worker_cpu_limits=4, worker_memory_requests=5, worker_memory_limits=6, - appwrapper=True, - write_to_file=write_to_file, ) - return config + cluster.write_to_file = write_to_file + cluster.resource_yaml = cluster._create_resource() + return cluster -def create_cluster(mocker, num_workers=2, write_to_file=False): - cluster = Cluster(create_cluster_config(num_workers, write_to_file)) +def create_cluster(mocker, num_workers=2, write_to_file=False, appwrapper=False): + if appwrapper: + cluster = Cluster( + create_cluster_config( + num_workers=num_workers, + write_to_file=write_to_file, + appwrapper=True, + ) + ) + return cluster + + cluster = create_cluster_config( + num_workers=num_workers, + write_to_file=write_to_file, + appwrapper=False, + ) return cluster @@ -59,7 +92,7 @@ def patch_cluster_with_dynamic_client(mocker, cluster, dynamic_client=None): def create_cluster_wrong_type(): - config = ClusterConfiguration( + RayCluster( name="unit-test-cluster", namespace="ns", num_workers=True, @@ -67,14 +100,11 @@ def create_cluster_wrong_type(): worker_cpu_limits=4, worker_memory_requests=5, worker_memory_limits=6, - worker_extended_resource_requests={"nvidia.com/gpu": 7}, - appwrapper=True, + worker_accelerators={"nvidia.com/gpu": 7}, image_pull_secrets=["unit-test-pull-secret"], image=constants.CUDA_PY312_RUNTIME_IMAGE, - write_to_file=True, labels={1: 1}, ) - return config def get_package_and_version(package_name, requirements_file_path): @@ -275,11 +305,16 @@ def arg_check_del_effect(group, version, namespace, plural, name, *args): elif plural == "rayclusters": assert group == "ray.io" assert version == "v1" - assert name == "unit-test-cluster-ray" + # Accept either name format depending on test + assert name in ["unit-test-cluster", "unit-test-cluster-ray"] elif plural == "ingresses": assert group == "networking.k8s.io" assert version == "v1" - assert name == "ray-dashboard-unit-test-cluster-ray" + # Accept either name format depending on test + assert name in [ + "ray-dashboard-unit-test-cluster", + "ray-dashboard-unit-test-cluster-ray", + ] def apply_template(yaml_file_path, variables): @@ -476,21 +511,21 @@ def mock_server_side_apply(resource, body=None, name=None, namespace=None, **kwa @patch.dict("os.environ", {"NB_PREFIX": "test-prefix"}) -def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper) -> Cluster: +def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper=False): mocker.patch( "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) volumes, volume_mounts = get_example_extended_storage_opts() - config = ClusterConfiguration( + cluster = RayCluster( name=cluster_name, namespace="ns", head_cpu_requests=4, head_cpu_limits=8, - head_memory_requests=12, - head_memory_limits=16, - head_extended_resource_requests={"nvidia.com/gpu": 1, "intel.com/gpu": 2}, + head_memory_requests="12G", + head_memory_limits="16G", + 
head_accelerators={"nvidia.com/gpu": 1, "intel.com/gpu": 2}, head_tolerations=[ V1Toleration( key="key1", operator="Equal", value="value1", effect="NoSchedule" @@ -504,18 +539,16 @@ def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper) -> Clu ) ], num_workers=10, - worker_memory_requests=12, - worker_memory_limits=16, - appwrapper=is_appwrapper, + worker_memory_requests="12G", + worker_memory_limits="16G", envs={"key1": "value1", "key2": "value2"}, image="example/ray:tag", image_pull_secrets=["secret1", "secret2"], - write_to_file=True, verify_tls=True, labels={"key1": "value1", "key2": "value2"}, - worker_extended_resource_requests={"nvidia.com/gpu": 1}, - extended_resource_mapping={"example.com/gpu": "GPU", "intel.com/gpu": "TPU"}, - overwrite_default_resource_mapping=True, + worker_accelerators={"nvidia.com/gpu": 1}, + accelerator_configs={"example.com/gpu": "GPU", "intel.com/gpu": "TPU"}, + overwrite_default_accelerator_configs=True, local_queue="local-queue-default", annotations={ "key1": "value1", @@ -524,7 +557,9 @@ def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper) -> Clu volumes=volumes, volume_mounts=volume_mounts, ) - return Cluster(config) + cluster.write_to_file = True + cluster.resource_yaml = cluster._create_resource() + return cluster def get_example_extended_storage_opts(): diff --git a/src/codeflare_sdk/common/widgets/test_widgets.py b/src/codeflare_sdk/common/widgets/test_widgets.py index 55be2b75..ac20a57c 100644 --- a/src/codeflare_sdk/common/widgets/test_widgets.py +++ b/src/codeflare_sdk/common/widgets/test_widgets.py @@ -16,9 +16,8 @@ import pandas as pd from unittest.mock import MagicMock, patch from ..utils.unit_test_support import get_local_queue, create_cluster_config -from codeflare_sdk.ray.cluster.cluster import Cluster from codeflare_sdk.ray.cluster.status import ( - RayCluster, + RayClusterInfo, RayClusterStatus, ) import pytest @@ -38,7 +37,7 @@ def test_cluster_apply_down_buttons(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = Cluster(create_cluster_config()) + cluster = create_cluster_config() with patch("ipywidgets.Button") as MockButton, patch( "ipywidgets.Checkbox" @@ -388,7 +387,7 @@ def test_fetch_cluster_data(mocker): assert df.empty # Create mock RayCluster objects - mock_raycluster1 = MagicMock(spec=RayCluster) + mock_raycluster1 = MagicMock(spec=RayClusterInfo) mock_raycluster1.name = "test-cluster-1" mock_raycluster1.namespace = "default" mock_raycluster1.num_workers = 1 @@ -406,7 +405,7 @@ def test_fetch_cluster_data(mocker): mock_raycluster1.status.name = "READY" mock_raycluster1.status = RayClusterStatus.READY - mock_raycluster2 = MagicMock(spec=RayCluster) + mock_raycluster2 = MagicMock(spec=RayClusterInfo) mock_raycluster2.name = "test-cluster-2" mock_raycluster2.namespace = "default" mock_raycluster2.num_workers = 2 diff --git a/src/codeflare_sdk/ray/__init__.py b/src/codeflare_sdk/ray/__init__.py index 7bd0b2c8..44e5ebd4 100644 --- a/src/codeflare_sdk/ray/__init__.py +++ b/src/codeflare_sdk/ray/__init__.py @@ -15,10 +15,11 @@ from .cluster import ( Cluster, ClusterConfiguration, + RayCluster, + RayClusterInfo, get_cluster, list_all_queued, list_all_clusters, RayClusterStatus, CodeFlareClusterStatus, - RayCluster, ) diff --git a/src/codeflare_sdk/ray/appwrapper/__init__.py b/src/codeflare_sdk/ray/appwrapper/__init__.py index 537fdf8a..9e836843 100644 --- 
a/src/codeflare_sdk/ray/appwrapper/__init__.py +++ b/src/codeflare_sdk/ray/appwrapper/__init__.py @@ -1,6 +1,16 @@ -from .awload import AWManager +# Copyright 2024 IBM, Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -from .status import ( - AppWrapperStatus, - AppWrapper, -) +from .status import AppWrapper, AppWrapperStatus +from .awload import AWManager diff --git a/src/codeflare_sdk/ray/appwrapper/awload.py b/src/codeflare_sdk/ray/appwrapper/awload.py index 02794f3d..61f33f06 100644 --- a/src/codeflare_sdk/ray/appwrapper/awload.py +++ b/src/codeflare_sdk/ray/appwrapper/awload.py @@ -1,4 +1,4 @@ -# Copyright 2022 IBM, Red Hat +# Copyright 2024 IBM, Red Hat # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,90 +13,14 @@ # limitations under the License. """ -The awload sub-module contains the definition of the AWManager object, which handles -submission and deletion of existing AppWrappers from a user's file system. -""" +Legacy AppWrapper manager. -from os.path import isfile -import errno -import os -import yaml +This is kept for backward compatibility with the deprecated AppWrapper workflow. +""" -from kubernetes import client -from ...common import _kube_api_error_handling -from ...common.kubernetes_cluster.auth import ( - config_check, - get_api_client, -) +import warnings class AWManager: - """ - An object for submitting and removing existing AppWrapper yamls - to be added to the Kueue localqueue. - """ - - def __init__(self, filename: str) -> None: - """ - Create the AppWrapper Manager object by passing in an - AppWrapper yaml file - """ - if not isfile(filename): - raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filename) - self.filename = filename - try: - with open(self.filename) as f: - self.awyaml = yaml.load(f, Loader=yaml.FullLoader) - assert self.awyaml["kind"] == "AppWrapper" - self.name = self.awyaml["metadata"]["name"] - self.namespace = self.awyaml["metadata"]["namespace"] - except: - raise ValueError( - f"{filename } is not a correctly formatted AppWrapper yaml" - ) - self.submitted = False - - def submit(self) -> None: - """ - Attempts to create the AppWrapper custom resource using the yaml file - """ - try: - config_check() - api_instance = client.CustomObjectsApi(get_api_client()) - api_instance.create_namespaced_custom_object( - group="workload.codeflare.dev", - version="v1beta2", - namespace=self.namespace, - plural="appwrappers", - body=self.awyaml, - ) - except Exception as e: - return _kube_api_error_handling(e) - - self.submitted = True - print(f"AppWrapper {self.filename} submitted!") - - def remove(self) -> None: - """ - Attempts to delete the AppWrapper custom resource matching the name in the yaml, - if submitted by this manager. 
- """ - if not self.submitted: - print("AppWrapper not submitted by this manager yet, nothing to remove") - return - - try: - config_check() - api_instance = client.CustomObjectsApi(get_api_client()) - api_instance.delete_namespaced_custom_object( - group="workload.codeflare.dev", - version="v1beta2", - namespace=self.namespace, - plural="appwrappers", - name=self.name, - ) - except Exception as e: - return _kube_api_error_handling(e) - - self.submitted = False - print(f"AppWrapper {self.name} removed!") + def __init__(self, *args, **kwargs): + warnings.warn("AppWrapper is deprecated", DeprecationWarning, stacklevel=2) diff --git a/src/codeflare_sdk/ray/appwrapper/status.py b/src/codeflare_sdk/ray/appwrapper/status.py index 79fe0fd2..50e82b82 100644 --- a/src/codeflare_sdk/ray/appwrapper/status.py +++ b/src/codeflare_sdk/ray/appwrapper/status.py @@ -13,8 +13,11 @@ # limitations under the License. """ -The status sub-module defines Enums containing information for -AppWrapper states, as well as dataclasses to store information for AppWrappers. +AppWrapper status types. + +This module exists to keep the legacy AppWrapper workflow functional for users who +still rely on it, even though AppWrapper is no longer part of the new `RayCluster` +API surface. """ from dataclasses import dataclass @@ -22,25 +25,16 @@ class AppWrapperStatus(Enum): - """ - Defines the possible reportable phases of an AppWrapper. - """ - + RUNNING = "running" SUSPENDED = "suspended" + SUSPENDING = "suspending" RESUMING = "resuming" - RUNNING = "running" RESETTING = "resetting" - SUSPENDING = "suspending" - SUCCEEDED = "succeeded" FAILED = "failed" - TERMINATING = "terminating" + UNKNOWN = "unknown" @dataclass class AppWrapper: - """ - For storing information about an AppWrapper. - """ - name: str status: AppWrapperStatus diff --git a/src/codeflare_sdk/ray/appwrapper/test_awload.py b/src/codeflare_sdk/ray/appwrapper/test_awload.py deleted file mode 100644 index 3f45e1a5..00000000 --- a/src/codeflare_sdk/ray/appwrapper/test_awload.py +++ /dev/null @@ -1,93 +0,0 @@ -# Copyright 2024 IBM, Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from codeflare_sdk.common.utils.unit_test_support import ( - apply_template, - arg_check_aw_apply_effect, - arg_check_aw_del_effect, - get_template_variables, -) -from codeflare_sdk.ray.appwrapper import AWManager -from codeflare_sdk.ray.cluster import Cluster, ClusterConfiguration -import os -from pathlib import Path - -parent = Path(__file__).resolve().parents[4] # project directory -aw_dir = os.path.expanduser("~/.codeflare/resources/") - - -def test_AWManager_creation(mocker): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object") - # Create test.yaml - Cluster( - ClusterConfiguration( - name="test", - namespace="ns", - write_to_file=True, - appwrapper=True, - ) - ) - - testaw = AWManager(f"{aw_dir}test.yaml") - assert testaw.name == "test" - assert testaw.namespace == "ns" - assert testaw.submitted == False - try: - testaw = AWManager("fake") - except Exception as e: - assert type(e) == FileNotFoundError - assert str(e) == "[Errno 2] No such file or directory: 'fake'" - try: - testaw = apply_template( - AWManager( - f"{parent}/tests/test_cluster_yamls/appwrapper/test-case-bad.yaml" - ), - get_template_variables(), - ) - except Exception as e: - assert type(e) == ValueError - assert ( - str(e) - == f"{parent}/tests/test_cluster_yamls/appwrapper/test-case-bad.yaml is not a correctly formatted AppWrapper yaml" - ) - - -def test_AWManager_submit_remove(mocker, capsys): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - testaw = AWManager(f"{aw_dir}test.yaml") - testaw.remove() - captured = capsys.readouterr() - assert ( - captured.out - == "AppWrapper not submitted by this manager yet, nothing to remove\n" - ) - assert testaw.submitted == False - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "kubernetes.client.CustomObjectsApi.create_namespaced_custom_object", - side_effect=arg_check_aw_apply_effect, - ) - mocker.patch( - "kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object", - side_effect=arg_check_aw_del_effect, - ) - testaw.submit() - assert testaw.submitted == True - testaw.remove() - assert testaw.submitted == False - - -# Make sure to always keep this function last -def test_cleanup(): - os.remove(f"{aw_dir}test.yaml") diff --git a/src/codeflare_sdk/ray/appwrapper/test_status.py b/src/codeflare_sdk/ray/appwrapper/test_status.py deleted file mode 100644 index a3fcf870..00000000 --- a/src/codeflare_sdk/ray/appwrapper/test_status.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright 2024 IBM, Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from codeflare_sdk.ray.cluster.cluster import ( - _app_wrapper_status, - Cluster, - ClusterConfiguration, -) -from codeflare_sdk.ray.appwrapper import AppWrapper, AppWrapperStatus -from codeflare_sdk.ray.cluster.status import CodeFlareClusterStatus -from codeflare_sdk.common.utils.unit_test_support import get_local_queue -import os - -aw_dir = os.path.expanduser("~/.codeflare/resources/") - - -def test_cluster_status(mocker): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), - ) - fake_aw = AppWrapper("test", AppWrapperStatus.FAILED) - - cf = Cluster( - ClusterConfiguration( - name="test", - namespace="ns", - write_to_file=True, - appwrapper=True, - local_queue="local-queue-default", - ) - ) - mocker.patch( - "codeflare_sdk.ray.cluster.cluster._app_wrapper_status", return_value=None - ) - mocker.patch( - "codeflare_sdk.ray.cluster.cluster._ray_cluster_status", return_value=None - ) - status, ready = cf.status() - assert status == CodeFlareClusterStatus.UNKNOWN - assert ready == False - - mocker.patch( - "codeflare_sdk.ray.cluster.cluster._app_wrapper_status", return_value=fake_aw - ) - status, ready = cf.status() - assert status == CodeFlareClusterStatus.FAILED - assert ready == False - - fake_aw.status = AppWrapperStatus.SUSPENDED - status, ready = cf.status() - assert status == CodeFlareClusterStatus.QUEUED - assert ready == False - - fake_aw.status = AppWrapperStatus.RESUMING - status, ready = cf.status() - assert status == CodeFlareClusterStatus.STARTING - assert ready == False - - fake_aw.status = AppWrapperStatus.RESETTING - status, ready = cf.status() - assert status == CodeFlareClusterStatus.STARTING - assert ready == False - - fake_aw.status = AppWrapperStatus.RUNNING - status, ready = cf.status() - assert status == CodeFlareClusterStatus.UNKNOWN - assert ready == False - - -def aw_status_fields(group, version, namespace, plural, *args): - assert group == "workload.codeflare.dev" - assert version == "v1beta2" - assert namespace == "test-ns" - assert plural == "appwrappers" - assert args == tuple() - return {"items": []} - - -def test_aw_status(mocker): - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - side_effect=aw_status_fields, - ) - aw = _app_wrapper_status("test-aw", "test-ns") - assert aw == None - - -# Make sure to always keep this function last -def test_cleanup(): - os.remove(f"{aw_dir}test.yaml") diff --git a/src/codeflare_sdk/ray/cluster/__init__.py b/src/codeflare_sdk/ray/cluster/__init__.py index bf32459b..dc9ea584 100644 --- a/src/codeflare_sdk/ray/cluster/__init__.py +++ b/src/codeflare_sdk/ray/cluster/__init__.py @@ -1,12 +1,19 @@ from .status import ( RayClusterStatus, CodeFlareClusterStatus, + RayClusterInfo, +) + +from .raycluster import ( RayCluster, ) +from .config import ( + ClusterConfiguration, +) + from .cluster import ( Cluster, - ClusterConfiguration, get_cluster, list_all_queued, list_all_clusters, diff --git a/src/codeflare_sdk/ray/cluster/build_ray_cluster.py b/src/codeflare_sdk/ray/cluster/build_ray_cluster.py index 6a3984b1..3d077266 100644 --- a/src/codeflare_sdk/ray/cluster/build_ray_cluster.py +++ b/src/codeflare_sdk/ray/cluster/build_ray_cluster.py @@ -484,6 +484,9 @@ def 
local_queue_exists(cluster: "codeflare_sdk.ray.cluster.Cluster"): """ The local_queue_exists() checks if the user inputted local_queue exists in the given namespace and returns a bool """ + # Return False if namespace is None to avoid API errors + if cluster.config.namespace is None: + return False # get all local queues in the namespace try: config_check() @@ -507,6 +510,9 @@ def get_default_local_queue(cluster: "codeflare_sdk.ray.cluster.Cluster", labels """ The get_default_local_queue() function attempts to find a local queue with the default label == true, if that is the case the labels variable is updated with that local queue """ + # Return early if namespace is None to avoid API errors + if cluster.config.namespace is None: + return try: # Try to get the default local queue if it exists and append the label list config_check() diff --git a/src/codeflare_sdk/ray/cluster/cluster.py b/src/codeflare_sdk/ray/cluster/cluster.py index ea0a83f5..476a2df3 100644 --- a/src/codeflare_sdk/ray/cluster/cluster.py +++ b/src/codeflare_sdk/ray/cluster/cluster.py @@ -20,11 +20,8 @@ from time import sleep from typing import List, Optional, Tuple, Dict -import copy -from ray.job_submission import JobSubmissionClient, JobStatus -import time -import uuid +from ray.job_submission import JobSubmissionClient import warnings from ...common.utils import get_current_namespace @@ -41,7 +38,7 @@ from .config import ClusterConfiguration from .status import ( CodeFlareClusterStatus, - RayCluster, + RayClusterInfo, RayClusterStatus, ) from ..appwrapper import ( @@ -54,12 +51,9 @@ ) from kubernetes import client import yaml -import os import requests -from kubernetes import config from kubernetes.dynamic import DynamicClient -from kubernetes import client as k8s_client from kubernetes.client.rest import ApiException from kubernetes.client.rest import ApiException @@ -488,7 +482,7 @@ def wait_ready(self, timeout: Optional[int] = None, dashboard_check: bool = True sleep(5) time += 5 - def details(self, print_to_console: bool = True) -> RayCluster: + def details(self, print_to_console: bool = True) -> RayClusterInfo: """ Retrieves details about the Ray Cluster. 
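The rename in this hunk is the core of the patch: `RayCluster` now names the unified configuration-plus-lifecycle object, while the read-only status dataclass returned by `details()` and the listing helpers becomes `RayClusterInfo`. A minimal sketch of the new flow, restricted to constructor arguments and methods documented elsewhere in this diff (all values are placeholders):

    from codeflare_sdk import RayCluster

    # Configure and bring up a cluster with the unified object; no ClusterConfiguration wrapper.
    cluster = RayCluster(
        name="demo",
        namespace="default",
        num_workers=1,
        worker_accelerators={"nvidia.com/gpu": 0},
    )
    cluster.apply()       # server-side apply of the generated RayCluster CR
    cluster.wait_ready()  # block until the cluster reports ready

    # details() and the listing helpers now hand back RayClusterInfo snapshots,
    # so status data no longer shares a name with the cluster object itself.
    cluster.down()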
@@ -954,7 +948,7 @@ def _app_wrapper_status(name, namespace="default") -> Optional[AppWrapper]: return None -def _ray_cluster_status(name, namespace="default") -> Optional[RayCluster]: +def _ray_cluster_status(name, namespace="default") -> Optional[RayClusterInfo]: try: config_check() api_instance = client.CustomObjectsApi(get_api_client()) @@ -975,7 +969,7 @@ def _ray_cluster_status(name, namespace="default") -> Optional[RayCluster]: def _get_ray_clusters( namespace="default", filter: Optional[List[RayClusterStatus]] = None -) -> List[RayCluster]: +) -> List[RayClusterInfo]: list_of_clusters = [] try: config_check() @@ -1028,7 +1022,7 @@ def _get_app_wrappers( return list_of_app_wrappers -def _map_to_ray_cluster(rc) -> Optional[RayCluster]: +def _map_to_ray_cluster(rc) -> Optional[RayClusterInfo]: if "status" in rc and "state" in rc["status"]: status = RayClusterStatus(rc["status"]["state"].lower()) else: @@ -1086,7 +1080,7 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]: worker_extended_resources, ) = Cluster._head_worker_extended_resources_from_rc_dict(rc) - return RayCluster( + return RayClusterInfo( name=rc["metadata"]["name"], status=status, # for now we are not using autoscaling so same replicas is fine @@ -1134,8 +1128,8 @@ def _map_to_app_wrapper(aw) -> AppWrapper: ) -def _copy_to_ray(cluster: Cluster) -> RayCluster: - ray = RayCluster( +def _copy_to_ray(cluster: Cluster) -> RayClusterInfo: + ray = RayClusterInfo( name=cluster.config.name, status=cluster.status(print_to_console=False)[0], num_workers=cluster.config.num_workers, diff --git a/src/codeflare_sdk/ray/cluster/config.py b/src/codeflare_sdk/ray/cluster/config.py index 7759202b..d6384479 100644 --- a/src/codeflare_sdk/ray/cluster/config.py +++ b/src/codeflare_sdk/ray/cluster/config.py @@ -19,6 +19,7 @@ """ import pathlib +from typing_extensions import deprecated import warnings from dataclasses import dataclass, field, fields from typing import Dict, List, Optional, Union, get_args, get_origin @@ -39,9 +40,12 @@ } +@deprecated("Use RayCluster instead") @dataclass class ClusterConfiguration: """ + [DEPRECATED] Use RayCluster instead. + This dataclass is used to specify resource requirements and other details, and is passed in as an argument when creating a Cluster object. diff --git a/src/codeflare_sdk/ray/cluster/pretty_print.py b/src/codeflare_sdk/ray/cluster/pretty_print.py index faa03258..6b44c8a5 100644 --- a/src/codeflare_sdk/ray/cluster/pretty_print.py +++ b/src/codeflare_sdk/ray/cluster/pretty_print.py @@ -17,14 +17,12 @@ (in the cluster sub-module) for pretty-printing cluster status and details. 
""" -from rich import print from rich.table import Table from rich.console import Console -from rich.layout import Layout from rich.panel import Panel from rich import box from typing import List -from .status import RayCluster, RayClusterStatus +from .status import RayClusterInfo, RayClusterStatus from ..appwrapper.status import AppWrapper @@ -85,7 +83,7 @@ def print_ray_clusters_status(app_wrappers: List[AppWrapper], starting: bool = F console.print(Panel.fit(table)) -def print_cluster_status(cluster: RayCluster): +def print_cluster_status(cluster: RayClusterInfo): "Pretty prints the status of a passed-in cluster" if not cluster: print_no_resources_found() @@ -124,7 +122,7 @@ def print_cluster_status(cluster: RayCluster): console.print(table5) -def print_clusters(clusters: List[RayCluster]): +def print_clusters(clusters: List[RayClusterInfo]): if not clusters: print_no_resources_found() return # shortcircuit diff --git a/src/codeflare_sdk/ray/cluster/raycluster.py b/src/codeflare_sdk/ray/cluster/raycluster.py new file mode 100644 index 00000000..c8a61d66 --- /dev/null +++ b/src/codeflare_sdk/ray/cluster/raycluster.py @@ -0,0 +1,1252 @@ +# Copyright 2024 IBM, Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +The RayCluster class provides a unified interface for creating and managing Ray clusters. +It combines configuration and operational methods in a single object. +""" + +from time import sleep +from typing import List, Optional, Tuple, Dict, Union, Any, get_args, get_origin +import warnings + +from ray.job_submission import JobSubmissionClient +import requests + +from kubernetes import client +from kubernetes.dynamic import DynamicClient +from kubernetes.client.rest import ApiException +from kubernetes.client import ( + V1Toleration, + V1Volume, + V1VolumeMount, + V1LocalObjectReference, + V1ObjectMeta, + V1Container, + V1ContainerPort, + V1Lifecycle, + V1ExecAction, + V1LifecycleHandler, + V1EnvVar, + V1PodTemplateSpec, + V1PodSpec, + V1ResourceRequirements, +) +import yaml + +from ...common.utils import get_current_namespace +from ...common.utils.constants import RAY_VERSION +from ...common.utils.utils import update_image +from ...common.kubernetes_cluster.auth import config_check, get_api_client +from ...common import _kube_api_error_handling +from ...common.widgets.widgets import cluster_apply_down_buttons, is_notebook + +from . 
import pretty_print +from .build_ray_cluster import build_ray_cluster, head_worker_gpu_count_from_cluster +from .build_ray_cluster import write_to_file as write_cluster_to_file +from .status import CodeFlareClusterStatus, RayClusterInfo, RayClusterStatus +from ..appwrapper import AppWrapper, AppWrapperStatus + + +# https://docs.ray.io/en/latest/ray-core/scheduling/accelerators.html +DEFAULT_ACCELERATORS = { + "nvidia.com/gpu": "GPU", + "intel.com/gpu": "GPU", + "amd.com/gpu": "GPU", + "aws.amazon.com/neuroncore": "neuron_cores", + "google.com/tpu": "TPU", + "habana.ai/gaudi": "HPU", + "huawei.com/Ascend910": "NPU", + "huawei.com/Ascend310": "NPU", +} + +CF_SDK_FIELD_MANAGER = "codeflare-sdk" + + +class RayCluster: + """ + A unified object for configuring, requesting, bringing up, and taking down Ray clusters. + + This is the recommended way to create and manage Ray clusters. It combines configuration + and operational methods in a single object, replacing the previous pattern of + Cluster(ClusterConfiguration(...)). + + Example: + cluster = RayCluster( + name='my-cluster', + namespace='default', + num_workers=2, + worker_accelerators={'nvidia.com/gpu': 1} + ) + cluster.apply() + cluster.wait_ready() + # ... use cluster ... + cluster.down() + + Args: + name: + The name of the cluster. Required. + namespace: + The namespace in which the cluster should be created. + head_cpu_requests: + CPU requests for the head node. + head_cpu_limits: + CPU limits for the head node. + head_memory_requests: + Memory requests for the head node (int for GB, or string with unit). + head_memory_limits: + Memory limits for the head node (int for GB, or string with unit). + head_accelerators: + A dictionary of accelerator requests for the head node. ex: {"nvidia.com/gpu": 1} + head_tolerations: + List of tolerations for head nodes. + worker_cpu_requests: + CPU requests for each worker. + worker_cpu_limits: + CPU limits for each worker. + num_workers: + The number of workers to create. + worker_memory_requests: + Memory requests for each worker. + worker_memory_limits: + Memory limits for each worker. + worker_tolerations: + List of tolerations for worker nodes. + worker_accelerators: + A dictionary of accelerator requests for each worker. ex: {"nvidia.com/gpu": 1} + envs: + A dictionary of environment variables to set for the cluster. + image: + The image to use for the cluster. + image_pull_secrets: + A list of image pull secrets to use for the cluster. + verify_tls: + A boolean indicating whether to verify TLS when connecting to the cluster. + labels: + A dictionary of labels to apply to the cluster. + accelerator_configs: + A dictionary mapping accelerator resource names to Ray resource names. + overwrite_default_accelerator_configs: + A boolean indicating whether to overwrite the default accelerator configs. + local_queue: + The Kueue local queue to use for scheduling. + annotations: + A dictionary of annotations to apply to the cluster. + volumes: + A list of V1Volume objects to add to the Cluster. + volume_mounts: + A list of V1VolumeMount objects to add to the Cluster. + enable_gcs_ft: + A boolean indicating whether to enable GCS fault tolerance. + enable_usage_stats: + A boolean indicating whether to capture and send Ray usage stats externally. + redis_address: + The address of the Redis server for GCS fault tolerance. + redis_password_secret: + Kubernetes secret reference for Redis password. + external_storage_namespace: + The storage namespace for GCS fault tolerance. 
+ """ + + def __init__( + self, + name: str, + namespace: Optional[str] = None, + head_cpu_requests: Union[int, str] = 1, + head_cpu_limits: Union[int, str] = 2, + head_memory_requests: Union[int, str] = 5, + head_memory_limits: Union[int, str] = 8, + head_accelerators: Optional[Dict[str, Union[str, int]]] = None, + head_tolerations: Optional[List[V1Toleration]] = None, + worker_cpu_requests: Union[int, str] = 1, + worker_cpu_limits: Union[int, str] = 1, + num_workers: int = 1, + worker_memory_requests: Union[int, str] = 3, + worker_memory_limits: Union[int, str] = 6, + worker_tolerations: Optional[List[V1Toleration]] = None, + worker_accelerators: Optional[Dict[str, Union[str, int]]] = None, + envs: Optional[Dict[str, str]] = None, + image: str = "", + image_pull_secrets: Optional[List[str]] = None, + verify_tls: bool = True, + labels: Optional[Dict[str, str]] = None, + accelerator_configs: Optional[Dict[str, str]] = None, + overwrite_default_accelerator_configs: bool = False, + local_queue: Optional[str] = None, + annotations: Optional[Dict[str, str]] = None, + volumes: Optional[List[V1Volume]] = None, + volume_mounts: Optional[List[V1VolumeMount]] = None, + enable_gcs_ft: bool = False, + enable_usage_stats: bool = False, + redis_address: Optional[str] = None, + redis_password_secret: Optional[Dict[str, str]] = None, + external_storage_namespace: Optional[str] = None, + write_to_file: bool = False, + ): + # Store all configuration as instance attributes + self.name = name + self.namespace = namespace + self.head_cpu_requests = head_cpu_requests + self.head_cpu_limits = head_cpu_limits + self.head_memory_requests = head_memory_requests + self.head_memory_limits = head_memory_limits + self.head_accelerators = ( + head_accelerators if head_accelerators is not None else {} + ) + self.head_tolerations = head_tolerations + self.worker_cpu_requests = worker_cpu_requests + self.worker_cpu_limits = worker_cpu_limits + self.num_workers = num_workers + self.worker_memory_requests = worker_memory_requests + self.worker_memory_limits = worker_memory_limits + self.worker_tolerations = worker_tolerations + self.worker_accelerators = ( + worker_accelerators if worker_accelerators is not None else {} + ) + self.envs = envs if envs is not None else {} + self.image = image + self.image_pull_secrets = ( + image_pull_secrets if image_pull_secrets is not None else [] + ) + self.verify_tls = verify_tls + self.labels = labels if labels is not None else {} + self.accelerator_configs = ( + accelerator_configs + if accelerator_configs is not None + else DEFAULT_ACCELERATORS.copy() + ) + self.overwrite_default_accelerator_configs = ( + overwrite_default_accelerator_configs + ) + self.local_queue = local_queue + self.annotations = annotations if annotations is not None else {} + self.volumes = volumes if volumes is not None else [] + self.volume_mounts = volume_mounts if volume_mounts is not None else [] + self.enable_gcs_ft = enable_gcs_ft + self.enable_usage_stats = enable_usage_stats + self.redis_address = redis_address + self.redis_password_secret = redis_password_secret + self.external_storage_namespace = external_storage_namespace + + # Internal state + self._job_submission_client = None + self.resource_yaml = None + + # For backward compatibility - RayCluster doesn't support AppWrapper + # Users needing AppWrapper should use deprecated ClusterConfiguration + self.appwrapper = False + self.write_to_file = write_to_file + + # Run validation and initialization + self._post_init() + + # Create resource yaml + 
self.resource_yaml = self._create_resource() + + # Display widgets if in notebook + if is_notebook(): + cluster_apply_down_buttons(self) + + def _post_init(self): + """Post-initialization validation and setup.""" + # Type validation + errors = [] + if not isinstance(self.num_workers, int) or isinstance(self.num_workers, bool): + errors.append(f"'num_workers' should be of type int.") + if not isinstance(self.worker_cpu_requests, (int, str)) or isinstance( + self.worker_cpu_requests, bool + ): + errors.append(f"'worker_cpu_requests' should be of type Union[int, str].") + if self.labels and not all( + isinstance(k, str) and isinstance(v, str) for k, v in self.labels.items() + ): + errors.append(f"'labels' should be of type Dict[str, str].") + + if errors: + raise TypeError("Type validation failed:\n" + "\n".join(errors)) + + if not self.verify_tls: + print( + "Warning: TLS verification has been disabled - Endpoint checks will be bypassed" + ) + + if self.enable_usage_stats: + self.envs["RAY_USAGE_STATS_ENABLED"] = "1" + else: + self.envs["RAY_USAGE_STATS_ENABLED"] = "0" + + if self.enable_gcs_ft: + if not self.redis_address: + raise ValueError( + "redis_address must be provided when enable_gcs_ft is True" + ) + + if self.redis_password_secret and not isinstance( + self.redis_password_secret, dict + ): + raise ValueError( + "redis_password_secret must be a dictionary with 'name' and 'key' fields" + ) + + if self.redis_password_secret and ( + "name" not in self.redis_password_secret + or "key" not in self.redis_password_secret + ): + raise ValueError( + "redis_password_secret must contain both 'name' and 'key' fields" + ) + + self._memory_to_string() + self._str_mem_no_unit_add_GB() + self._combine_accelerator_configs() + self._validate_accelerators(self.head_accelerators) + self._validate_accelerators(self.worker_accelerators) + + def _combine_accelerator_configs(self): + """Combine user accelerator configs with defaults.""" + if overwritten := set(self.accelerator_configs.keys()).intersection( + DEFAULT_ACCELERATORS.keys() + ): + if self.overwrite_default_accelerator_configs: + warnings.warn( + f"Overwriting default accelerator configs for {overwritten}", + UserWarning, + ) + self.accelerator_configs = { + **DEFAULT_ACCELERATORS, + **self.accelerator_configs, + } + + def _validate_accelerators(self, accelerators: Dict[str, int]): + """Validate that accelerators are in the config.""" + for k in accelerators.keys(): + if k not in self.accelerator_configs.keys(): + raise ValueError( + f"Accelerator '{k}' not found in accelerator_configs, available resources are {list(self.accelerator_configs.keys())}, to add more supported resources use accelerator_configs. i.e. 
accelerator_configs = {{'{k}': 'FOO_BAR'}}" + ) + + def _str_mem_no_unit_add_GB(self): + """Add GB unit to string memory values without units.""" + if ( + isinstance(self.worker_memory_requests, str) + and self.worker_memory_requests.isdecimal() + ): + self.worker_memory_requests = f"{self.worker_memory_requests}G" + if ( + isinstance(self.worker_memory_limits, str) + and self.worker_memory_limits.isdecimal() + ): + self.worker_memory_limits = f"{self.worker_memory_limits}G" + + def _memory_to_string(self): + """Convert integer memory values to strings with GB unit.""" + if isinstance(self.head_memory_requests, int): + self.head_memory_requests = f"{self.head_memory_requests}G" + if isinstance(self.head_memory_limits, int): + self.head_memory_limits = f"{self.head_memory_limits}G" + if isinstance(self.worker_memory_requests, int): + self.worker_memory_requests = f"{self.worker_memory_requests}G" + if isinstance(self.worker_memory_limits, int): + self.worker_memory_limits = f"{self.worker_memory_limits}G" + + # Properties for backward compatibility with ClusterConfiguration field names + @property + def head_extended_resource_requests(self) -> Dict[str, Union[str, int]]: + """Backward compatibility alias for head_accelerators.""" + return self.head_accelerators + + @property + def worker_extended_resource_requests(self) -> Dict[str, Union[str, int]]: + """Backward compatibility alias for worker_accelerators.""" + return self.worker_accelerators + + @property + def extended_resource_mapping(self) -> Dict[str, str]: + """Backward compatibility alias for accelerator_configs.""" + return self.accelerator_configs + + @property + def overwrite_default_resource_mapping(self) -> bool: + """Backward compatibility alias for overwrite_default_accelerator_configs.""" + return self.overwrite_default_accelerator_configs + + # Provide config property that returns self for compatibility with Cluster interface + @property + def config(self): + """Return self for compatibility with code expecting cluster.config.""" + return self + + # ==================== Operational Methods (from Cluster) ==================== + + def get_dynamic_client(self): # pragma: no cover + """Get a DynamicClient for Kubernetes API access.""" + return DynamicClient(get_api_client()) + + def config_check(self): + """Check Kubernetes configuration.""" + return config_check() + + @property + def _client_headers(self): + """Get authorization headers for API requests.""" + k8_client = get_api_client() + return { + "Authorization": k8_client.configuration.get_api_key_with_prefix( + "authorization" + ) + } + + @property + def _client_verify_tls(self): + """Get TLS verification setting.""" + return _is_openshift_cluster() and self.verify_tls + + @property + def job_client(self): + """Get the Ray Job Submission Client.""" + if self._job_submission_client: + return self._job_submission_client + if _is_openshift_cluster(): + self._job_submission_client = JobSubmissionClient( + self.cluster_dashboard_uri(), + headers=self._client_headers, + verify=self._client_verify_tls, + ) + else: + self._job_submission_client = JobSubmissionClient( + self.cluster_dashboard_uri() + ) + return self._job_submission_client + + def _create_resource(self): + """ + Create the RayCluster yaml based on the configuration. 
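+ Resolves the namespace from the current Kubernetes context when one is not
+ set explicitly, then delegates to build_ray_cluster to render the yaml.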
+ """ + if self.namespace is None: + self.namespace = get_current_namespace() + if self.namespace is None: + print("Please specify with namespace=") + elif type(self.namespace) is not str: + raise TypeError( + f"Namespace {self.namespace} is of type {type(self.namespace)}. Check your Kubernetes Authentication." + ) + return build_ray_cluster(self) + + def apply(self, force=False): + """ + Applies the RayCluster yaml using server-side apply. + If 'force' is set to True, conflicts will be forced. + """ + self._throw_for_no_raycluster() + namespace = self.namespace + name = self.name + + # Regenerate resource_yaml to reflect any configuration changes + self.resource_yaml = self._create_resource() + + try: + self.config_check() + crds = self.get_dynamic_client().resources + api_version = "ray.io/v1" + api_instance = crds.get(api_version=api_version, kind="RayCluster") + self._component_resources_apply( + namespace=namespace, api_instance=api_instance + ) + print( + f"Ray Cluster: '{name}' has successfully been applied. For optimal resource management, you should delete this Ray Cluster when no longer in use." + ) + except AttributeError as e: + raise RuntimeError(f"Failed to initialize DynamicClient: {e}") + except Exception as e: # pragma: no cover + if hasattr(e, "status") and e.status == 422: + print( + "WARNING: RayCluster creation rejected due to invalid Kueue configuration. Please contact your administrator." + ) + else: + print( + "WARNING: Failed to create RayCluster due to unexpected error. Please contact your administrator." + ) + return _kube_api_error_handling(e) + + def _throw_for_no_raycluster(self): + """Check if RayCluster CRD exists.""" + api_instance = client.CustomObjectsApi(get_api_client()) + try: + api_instance.list_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=self.namespace, + plural="rayclusters", + ) + except ApiException as e: + if e.status == 404: + raise RuntimeError( + "RayCluster CustomResourceDefinition unavailable contact your administrator." + ) + else: + raise RuntimeError( + "Failed to get RayCluster CustomResourceDefinition: " + str(e) + ) + + def down(self): + """ + Deletes the RayCluster, scaling-down and deleting all resources + associated with the cluster. + """ + namespace = self.namespace + resource_name = self.name + self._throw_for_no_raycluster() + try: + self.config_check() + api_instance = client.CustomObjectsApi(get_api_client()) + _delete_resources(resource_name, namespace, api_instance) + print(f"Ray Cluster: '{self.name}' has successfully been deleted") + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + def status( + self, print_to_console: bool = True + ) -> Tuple[CodeFlareClusterStatus, bool]: + """ + Returns the requested cluster's status, as well as whether or not + it is ready for use. 
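+
+ Returns:
+ Tuple[CodeFlareClusterStatus, bool]: the coarse cluster status and a flag
+ indicating whether the cluster is ready for use.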
+ """ + ready = False + status = CodeFlareClusterStatus.UNKNOWN + + # check the ray cluster status + cluster = _ray_cluster_status(self.name, self.namespace) + if cluster: + if cluster.status == RayClusterStatus.SUSPENDED: + ready = False + status = CodeFlareClusterStatus.SUSPENDED + if cluster.status == RayClusterStatus.UNKNOWN: + ready = False + status = CodeFlareClusterStatus.STARTING + if cluster.status == RayClusterStatus.READY: + ready = True + status = CodeFlareClusterStatus.READY + elif cluster.status in [ + RayClusterStatus.UNHEALTHY, + RayClusterStatus.FAILED, + ]: + ready = False + status = CodeFlareClusterStatus.FAILED + + if print_to_console: + # overriding the number of gpus with requested + _, cluster.worker_gpu = head_worker_gpu_count_from_cluster(self) + pretty_print.print_cluster_status(cluster) + elif print_to_console: + if status == CodeFlareClusterStatus.UNKNOWN: + pretty_print.print_no_resources_found() + + return status, ready + + def is_dashboard_ready(self) -> bool: + """ + Checks if the cluster's dashboard is ready and accessible. + + Returns: + bool: True if the dashboard is ready, False otherwise. + """ + dashboard_uri = self.cluster_dashboard_uri() + if dashboard_uri is None: + return False + + try: + response = requests.get( + dashboard_uri, + headers=self._client_headers, + timeout=5, + verify=self._client_verify_tls, + allow_redirects=False, + ) + except requests.exceptions.SSLError: # pragma no cover + return False + except Exception: # pragma no cover + return False + + if response.status_code in (200, 302, 401, 403): + return True + else: + return False + + def wait_ready(self, timeout: Optional[int] = None, dashboard_check: bool = True): + """ + Waits for the requested cluster to be ready, up to an optional timeout. + + Args: + timeout (Optional[int]): + The maximum time to wait for the cluster to be ready in seconds. + dashboard_check (bool): + Flag to determine if the dashboard readiness should be checked. + """ + print("Waiting for requested resources to be set up...") + time = 0 + while True: + if timeout and time >= timeout: + raise TimeoutError( + f"wait() timed out after waiting {timeout}s for cluster to be ready" + ) + status, ready = self.status(print_to_console=False) + if status == CodeFlareClusterStatus.UNKNOWN: + print( + "WARNING: Current cluster status is unknown, have you run cluster.apply() yet? Run cluster.details() to check if it's ready." + ) + if ready: + break + sleep(5) + time += 5 + print("Requested cluster is up and running!") + + while dashboard_check: + if timeout and time >= timeout: + raise TimeoutError( + f"wait() timed out after waiting {timeout}s for dashboard to be ready" + ) + if self.is_dashboard_ready(): + print("Dashboard is ready!") + break + # Check if dashboard URI is available (not an error message) + dashboard_uri = self.cluster_dashboard_uri() + if ( + "not available" in dashboard_uri.lower() + or "not ready" in dashboard_uri.lower() + ): + print("Waiting for dashboard route/HTTPRoute to be created...") + elif dashboard_uri.startswith("http://") or dashboard_uri.startswith( + "https://" + ): + print(f"Waiting for dashboard to become accessible: {dashboard_uri}") + sleep(5) + time += 5 + + def details(self, print_to_console: bool = True) -> RayClusterInfo: + """ + Retrieves details about the Ray Cluster. + + Args: + print_to_console (bool): Flag to determine if details should be printed. + + Returns: + RayClusterInfo: A copy of the Ray Cluster details. 
+ """ + cluster = _copy_to_ray(self) + if print_to_console: + pretty_print.print_clusters([cluster]) + return cluster + + def cluster_uri(self) -> str: + """Returns a string containing the cluster's URI.""" + return f"ray://{self.name}-head-svc.{self.namespace}.svc:10001" + + def cluster_dashboard_uri(self) -> str: + """ + Returns a string containing the cluster's dashboard URI. + """ + config_check() + + # Try HTTPRoute first (RHOAI v3.0+) + httproute_url = _get_dashboard_url_from_httproute(self.name, self.namespace) + if httproute_url: + return httproute_url + + # Fall back to OpenShift Routes or Ingresses + if _is_openshift_cluster(): + try: + api_instance = client.CustomObjectsApi(get_api_client()) + routes = api_instance.list_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=self.namespace, + plural="routes", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + for route in routes["items"]: + if route["metadata"]["name"] == f"ray-dashboard-{self.name}" or route[ + "metadata" + ]["name"].startswith(f"{self.name}-ingress"): + protocol = "https" if route["spec"].get("tls") else "http" + return f"{protocol}://{route['spec']['host']}" + return "Dashboard not available yet, have you run cluster.apply()?" + else: + try: + api_instance = client.NetworkingV1Api(get_api_client()) + ingresses = api_instance.list_namespaced_ingress(self.namespace) + except Exception as e: # pragma no cover + return _kube_api_error_handling(e) + + for ingress in ingresses.items: + annotations = ingress.metadata.annotations + protocol = "http" + if ( + ingress.metadata.name == f"ray-dashboard-{self.name}" + or ingress.metadata.name.startswith(f"{self.name}-ingress") + ): + if annotations is None: + protocol = "http" + elif "route.openshift.io/termination" in annotations: + protocol = "https" + return f"{protocol}://{ingress.spec.rules[0].host}" + return "Dashboard not available yet, have you run cluster.apply()? Run cluster.details() to check if it's ready." 
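+
+ # Illustrative lifecycle for the operational methods above, kept as a comment
+ # rather than executable code; the name and namespace below are hypothetical:
+ #
+ #   cluster = RayCluster(name="demo", namespace="demo-ns", num_workers=2)
+ #   cluster.apply()                        # server-side apply of the RayCluster
+ #   cluster.wait_ready(timeout=600)        # poll status() and the dashboard
+ #   print(cluster.cluster_dashboard_uri())
+ #   cluster.down()                         # delete the RayCluster when finished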
+ + def list_jobs(self) -> List: + """Lists the running jobs on the cluster.""" + return self.job_client.list_jobs() + + def job_status(self, job_id: str) -> str: + """Returns the job status for the provided job id.""" + return self.job_client.get_job_status(job_id) + + def job_logs(self, job_id: str) -> str: + """Returns the logs for the provided job id.""" + return self.job_client.get_job_logs(job_id) + + @staticmethod + def _head_worker_extended_resources_from_rc_dict(rc: Dict) -> Tuple[dict, dict]: + """Extract extended resources from a RayCluster dict.""" + head_extended_resources, worker_extended_resources = {}, {} + for resource in rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["limits"].keys(): + if resource in ["memory", "cpu"]: + continue + worker_extended_resources[resource] = rc["spec"]["workerGroupSpecs"][0][ + "template" + ]["spec"]["containers"][0]["resources"]["limits"][resource] + + for resource in rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][ + 0 + ]["resources"]["limits"].keys(): + if resource in ["memory", "cpu"]: + continue + head_extended_resources[resource] = rc["spec"]["headGroupSpec"]["template"][ + "spec" + ]["containers"][0]["resources"]["limits"][resource] + + return head_extended_resources, worker_extended_resources + + def local_client_url(self): + """Returns the Ray client URL based on the ingress domain.""" + ingress_domain = _get_ingress_domain(self) + return f"ray://{ingress_domain}" + + def _component_resources_apply( + self, namespace: str, api_instance: client.CustomObjectsApi + ): + """Apply RayCluster resources.""" + _apply_ray_cluster(self.resource_yaml, namespace, api_instance) + + # ==================== RayJob Integration Methods ==================== + + def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]: + """ + Build the RayCluster spec for embedding in RayJob. 
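+ The embedded spec mirrors this cluster's head and worker configuration and
+ sets enableInTreeAutoscaling to False.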
+ + Args: + cluster_name: The name for the cluster (derived from RayJob name) + + Returns: + Dict containing the RayCluster spec for embedding in RayJob + """ + ray_cluster_spec = { + "rayVersion": RAY_VERSION, + "enableInTreeAutoscaling": False, + "headGroupSpec": self._build_head_group_spec(), + "workerGroupSpecs": [self._build_worker_group_spec(cluster_name)], + } + return ray_cluster_spec + + def _build_head_group_spec(self) -> Dict[str, Any]: + """Build the head group specification.""" + return { + "serviceType": "ClusterIP", + "enableIngress": False, + "rayStartParams": self._build_head_ray_params(), + "template": V1PodTemplateSpec( + metadata=V1ObjectMeta(annotations=self.annotations), + spec=self._build_pod_spec(self._build_head_container(), is_head=True), + ), + } + + def _build_worker_group_spec(self, cluster_name: str) -> Dict[str, Any]: + """Build the worker group specification.""" + return { + "replicas": self.num_workers, + "minReplicas": self.num_workers, + "maxReplicas": self.num_workers, + "groupName": f"worker-group-{cluster_name}", + "rayStartParams": self._build_worker_ray_params(), + "template": V1PodTemplateSpec( + metadata=V1ObjectMeta(annotations=self.annotations), + spec=self._build_pod_spec( + self._build_worker_container(), + is_head=False, + ), + ), + } + + def _build_head_ray_params(self) -> Dict[str, str]: + """Build Ray start parameters for head node.""" + params = { + "dashboard-host": "0.0.0.0", + "block": "true", + } + + if self.head_accelerators: + gpu_count = sum( + count + for resource_type, count in self.head_accelerators.items() + if "gpu" in resource_type.lower() + ) + if gpu_count > 0: + params["num-gpus"] = str(gpu_count) + + return params + + def _build_worker_ray_params(self) -> Dict[str, str]: + """Build Ray start parameters for worker nodes.""" + params = { + "block": "true", + } + + if self.worker_accelerators: + gpu_count = sum( + count + for resource_type, count in self.worker_accelerators.items() + if "gpu" in resource_type.lower() + ) + if gpu_count > 0: + params["num-gpus"] = str(gpu_count) + + return params + + def _build_head_container(self) -> V1Container: + """Build the head container specification.""" + container = V1Container( + name="ray-head", + image=update_image(self.image), + image_pull_policy="IfNotPresent", + ports=[ + V1ContainerPort(name="gcs", container_port=6379), + V1ContainerPort(name="dashboard", container_port=8265), + V1ContainerPort(name="client", container_port=10001), + ], + lifecycle=V1Lifecycle( + pre_stop=V1LifecycleHandler( + _exec=V1ExecAction(command=["/bin/sh", "-c", "ray stop"]) + ) + ), + resources=self._build_resource_requirements( + self.head_cpu_requests, + self.head_cpu_limits, + self.head_memory_requests, + self.head_memory_limits, + self.head_accelerators, + ), + volume_mounts=self._generate_volume_mounts(), + env=self._build_env_vars() if self.envs else None, + ) + return container + + def _build_worker_container(self) -> V1Container: + """Build the worker container specification.""" + container = V1Container( + name="ray-worker", + image=update_image(self.image), + image_pull_policy="IfNotPresent", + lifecycle=V1Lifecycle( + pre_stop=V1LifecycleHandler( + _exec=V1ExecAction(command=["/bin/sh", "-c", "ray stop"]) + ) + ), + resources=self._build_resource_requirements( + self.worker_cpu_requests, + self.worker_cpu_limits, + self.worker_memory_requests, + self.worker_memory_limits, + self.worker_accelerators, + ), + volume_mounts=self._generate_volume_mounts(), + env=self._build_env_vars() if 
self.envs else None, + ) + return container + + def _build_resource_requirements( + self, + cpu_requests: Union[int, str], + cpu_limits: Union[int, str], + memory_requests: Union[int, str], + memory_limits: Union[int, str], + accelerators: Dict[str, Union[int, str]] = None, + ) -> V1ResourceRequirements: + """Build Kubernetes resource requirements.""" + resource_requirements = V1ResourceRequirements( + requests={"cpu": cpu_requests, "memory": memory_requests}, + limits={"cpu": cpu_limits, "memory": memory_limits}, + ) + + if accelerators: + for resource_type, amount in accelerators.items(): + resource_requirements.limits[resource_type] = amount + resource_requirements.requests[resource_type] = amount + + return resource_requirements + + def _build_pod_spec(self, container: V1Container, is_head: bool) -> V1PodSpec: + """Build the pod specification.""" + pod_spec = V1PodSpec( + containers=[container], + volumes=self._generate_volumes(), + restart_policy="Never", + ) + + if is_head and self.head_tolerations: + pod_spec.tolerations = self.head_tolerations + elif not is_head and self.worker_tolerations: + pod_spec.tolerations = self.worker_tolerations + + if self.image_pull_secrets: + pod_spec.image_pull_secrets = [ + V1LocalObjectReference(name=secret) + for secret in self.image_pull_secrets + ] + + return pod_spec + + def _generate_volume_mounts(self) -> list: + """Generate volume mounts for the container.""" + volume_mounts = [] + if self.volume_mounts: + volume_mounts.extend(self.volume_mounts) + return volume_mounts + + def _generate_volumes(self) -> list: + """Generate volumes for the pod.""" + volumes = [] + if self.volumes: + volumes.extend(self.volumes) + return volumes + + def _build_env_vars(self) -> list: + """Build environment variables list.""" + return [V1EnvVar(name=key, value=value) for key, value in self.envs.items()] + + +# ==================== Module-level functions ==================== + + +def _delete_resources(name: str, namespace: str, api_instance: client.CustomObjectsApi): + """Delete a RayCluster resource.""" + api_instance.delete_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + name=name, + ) + + +def _apply_ray_cluster( + yamls, namespace: str, api_instance: client.CustomObjectsApi, force=False +): + """Apply a RayCluster resource.""" + api_instance.server_side_apply( + field_manager=CF_SDK_FIELD_MANAGER, + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + body=yamls, + force_conflicts=force, + ) + + +def _ray_cluster_status(name, namespace="default") -> Optional[RayClusterInfo]: + """Get the status of a RayCluster.""" + try: + config_check() + api_instance = client.CustomObjectsApi(get_api_client()) + rcs = api_instance.list_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + for rc in rcs["items"]: + if rc["metadata"]["name"] == name: + return _map_to_ray_cluster(rc) + return None + + +def _map_to_ray_cluster(rc) -> Optional[RayClusterInfo]: + """Map a RayCluster dict to RayClusterInfo.""" + if "status" in rc and "state" in rc["status"]: + status = RayClusterStatus(rc["status"]["state"].lower()) + else: + status = RayClusterStatus.UNKNOWN + config_check() + dashboard_url = None + + rc_name = rc["metadata"]["name"] + rc_namespace = rc["metadata"]["namespace"] + dashboard_url = _get_dashboard_url_from_httproute(rc_name, rc_namespace) + 
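+ # Prefer the Gateway API HTTPRoute URL for the dashboard; fall back to
+ # OpenShift Routes or Kubernetes Ingresses below when none is found.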
+ if not dashboard_url: + if _is_openshift_cluster(): + try: + api_instance = client.CustomObjectsApi(get_api_client()) + routes = api_instance.list_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=rc_namespace, + plural="routes", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + for route in routes["items"]: + if route["metadata"]["name"] == f"ray-dashboard-{rc_name}" or route[ + "metadata" + ]["name"].startswith(f"{rc_name}-ingress"): + protocol = "https" if route["spec"].get("tls") else "http" + dashboard_url = f"{protocol}://{route['spec']['host']}" + break + else: + try: + api_instance = client.NetworkingV1Api(get_api_client()) + ingresses = api_instance.list_namespaced_ingress(rc_namespace) + except Exception as e: # pragma no cover + return _kube_api_error_handling(e) + for ingress in ingresses.items: + annotations = ingress.metadata.annotations + protocol = "http" + if ( + ingress.metadata.name == f"ray-dashboard-{rc_name}" + or ingress.metadata.name.startswith(f"{rc_name}-ingress") + ): + if annotations is None: + protocol = "http" + elif "route.openshift.io/termination" in annotations: + protocol = "https" + dashboard_url = f"{protocol}://{ingress.spec.rules[0].host}" + + ( + head_extended_resources, + worker_extended_resources, + ) = RayCluster._head_worker_extended_resources_from_rc_dict(rc) + + return RayClusterInfo( + name=rc["metadata"]["name"], + status=status, + num_workers=rc["spec"]["workerGroupSpecs"][0]["replicas"], + worker_mem_limits=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["limits"]["memory"], + worker_mem_requests=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["requests"]["memory"], + worker_cpu_requests=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["requests"]["cpu"], + worker_cpu_limits=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["limits"]["cpu"], + worker_extended_resources=worker_extended_resources, + namespace=rc["metadata"]["namespace"], + head_cpu_requests=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][ + 0 + ]["resources"]["requests"]["cpu"], + head_cpu_limits=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][ + 0 + ]["resources"]["limits"]["cpu"], + head_mem_requests=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][ + 0 + ]["resources"]["requests"]["memory"], + head_mem_limits=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][ + 0 + ]["resources"]["limits"]["memory"], + head_extended_resources=head_extended_resources, + dashboard=dashboard_url, + ) + + +def _copy_to_ray(cluster: "RayCluster") -> RayClusterInfo: + """Copy cluster config to RayClusterInfo.""" + ray = RayClusterInfo( + name=cluster.name, + status=cluster.status(print_to_console=False)[0], + num_workers=cluster.num_workers, + worker_mem_requests=cluster.worker_memory_requests, + worker_mem_limits=cluster.worker_memory_limits, + worker_cpu_requests=cluster.worker_cpu_requests, + worker_cpu_limits=cluster.worker_cpu_limits, + worker_extended_resources=cluster.worker_accelerators, + namespace=cluster.namespace, + dashboard=cluster.cluster_dashboard_uri(), + head_mem_requests=cluster.head_memory_requests, + head_mem_limits=cluster.head_memory_limits, + head_cpu_requests=cluster.head_cpu_requests, + head_cpu_limits=cluster.head_cpu_limits, + head_extended_resources=cluster.head_accelerators, + ) 
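+ # status() returns a CodeFlareClusterStatus; map READY back to the
+ # equivalent RayClusterStatus value stored on RayClusterInfo.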
+ if ray.status == CodeFlareClusterStatus.READY: + ray.status = RayClusterStatus.READY + return ray + + +def _is_openshift_cluster(): + """Check if running on OpenShift cluster.""" + try: + config_check() + for api in client.ApisApi(get_api_client()).get_api_versions().groups: + for v in api.versions: + if "route.openshift.io/v1" in v.group_version: + return True + else: + return False + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + +def _get_dashboard_url_from_httproute( + cluster_name: str, namespace: str +) -> Optional[str]: + """Get the Ray dashboard URL from an HTTPRoute.""" + try: + config_check() + api_instance = client.CustomObjectsApi(get_api_client()) + + label_selector = ( + f"ray.io/cluster-name={cluster_name},ray.io/cluster-namespace={namespace}" + ) + + try: + httproutes = api_instance.list_cluster_custom_object( + group="gateway.networking.k8s.io", + version="v1", + plural="httproutes", + label_selector=label_selector, + ) + items = httproutes.get("items", []) + if items: + httproute = items[0] + else: + return None + except Exception: + search_namespaces = [ + "redhat-ods-applications", + "opendatahub", + "default", + "ray-system", + ] + + httproute = None + for ns in search_namespaces: + try: + httproutes = api_instance.list_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1", + namespace=ns, + plural="httproutes", + label_selector=label_selector, + ) + items = httproutes.get("items", []) + if items: + httproute = items[0] + break + except client.ApiException: + continue + + if not httproute: + return None + + parent_refs = httproute.get("spec", {}).get("parentRefs", []) + if not parent_refs: + return None + + gateway_ref = parent_refs[0] + gateway_name = gateway_ref.get("name") + gateway_namespace = gateway_ref.get("namespace") + + if not gateway_name or not gateway_namespace: + return None + + gateway = api_instance.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1", + namespace=gateway_namespace, + plural="gateways", + name=gateway_name, + ) + + listeners = gateway.get("spec", {}).get("listeners", []) + if not listeners: + return None + + hostname = listeners[0].get("hostname") + if not hostname: + return None + + return f"https://{hostname}/ray/{namespace}/{cluster_name}" + + except Exception: # pragma: no cover + return None + + +def _get_ingress_domain(cluster): # pragma: no cover + """Get ingress domain for local client URL.""" + config_check() + + if cluster.namespace is not None: + namespace = cluster.namespace + else: + namespace = get_current_namespace() + domain = None + + if _is_openshift_cluster(): + try: + api_instance = client.CustomObjectsApi(get_api_client()) + routes = api_instance.list_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=namespace, + plural="routes", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + for route in routes["items"]: + if ( + route["spec"]["port"]["targetPort"] == "client" + or route["spec"]["port"]["targetPort"] == 10001 + ): + domain = route["spec"]["host"] + else: + try: + api_client = client.NetworkingV1Api(get_api_client()) + ingresses = api_client.list_namespaced_ingress(namespace) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + for ingress in ingresses.items: + if ingress.spec.rules[0].http.paths[0].backend.service.port.number == 10001: + domain = ingress.spec.rules[0].host + return domain diff --git 
a/src/codeflare_sdk/ray/cluster/status.py b/src/codeflare_sdk/ray/cluster/status.py index 136ae302..725ed38e 100644 --- a/src/codeflare_sdk/ray/cluster/status.py +++ b/src/codeflare_sdk/ray/cluster/status.py @@ -52,7 +52,7 @@ class CodeFlareClusterStatus(Enum): @dataclass -class RayCluster: +class RayClusterInfo: """ For storing information about a Ray cluster. """ diff --git a/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py b/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py index 3a7947d3..649bca48 100644 --- a/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py +++ b/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py @@ -15,7 +15,7 @@ import sys from .build_ray_cluster import gen_names, update_image, build_ray_cluster import uuid -from codeflare_sdk.ray.cluster.cluster import ClusterConfiguration, Cluster +from codeflare_sdk.ray.cluster.raycluster import RayCluster def test_gen_names_with_name(mocker): @@ -72,15 +72,13 @@ def test_build_ray_cluster_with_gcs_ft(mocker): mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object") - cluster = Cluster( - ClusterConfiguration( - name="test", - namespace="ns", - enable_gcs_ft=True, - redis_address="redis:6379", - redis_password_secret={"name": "redis-password-secret", "key": "password"}, - external_storage_namespace="new-ns", - ) + cluster = RayCluster( + name="test", + namespace="ns", + enable_gcs_ft=True, + redis_address="redis:6379", + redis_password_secret={"name": "redis-password-secret", "key": "password"}, + external_storage_namespace="new-ns", ) mocker.patch("codeflare_sdk.ray.cluster.build_ray_cluster.config_check") diff --git a/src/codeflare_sdk/ray/cluster/test_cluster.py b/src/codeflare_sdk/ray/cluster/test_cluster.py index d2d86fb7..b5c9be36 100644 --- a/src/codeflare_sdk/ray/cluster/test_cluster.py +++ b/src/codeflare_sdk/ray/cluster/test_cluster.py @@ -12,12 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from codeflare_sdk.ray.cluster.cluster import ( - Cluster, - ClusterConfiguration, - get_cluster, - list_all_queued, -) +from codeflare_sdk.ray.cluster.cluster import list_all_queued +from codeflare_sdk.ray.cluster.raycluster import RayCluster, _is_openshift_cluster from codeflare_sdk.common.utils.unit_test_support import ( create_cluster, arg_check_del_effect, @@ -28,11 +24,9 @@ get_ray_obj, get_obj_none, get_ray_obj_with_status, - get_aw_obj_with_status, patch_cluster_with_dynamic_client, route_list_retrieval, ) -from codeflare_sdk.ray.cluster.cluster import _is_openshift_cluster from pathlib import Path from unittest.mock import MagicMock from kubernetes import client @@ -51,8 +45,10 @@ def test_cluster_apply_down(mocker): mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") - mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster.get_dynamic_client") + mocker.patch( + "codeflare_sdk.ray.cluster.raycluster.RayCluster._throw_for_no_raycluster" + ) + mocker.patch("codeflare_sdk.ray.cluster.raycluster.RayCluster.get_dynamic_client") mocker.patch( "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", return_value={"spec": {"domain": ""}}, @@ -85,10 +81,6 @@ def test_cluster_apply_scale_up_scale_down(mocker): mocker.patch( "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock ) - mocker.patch( - "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", - return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", - ) mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch( "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", @@ -126,13 +118,11 @@ def test_cluster_apply_with_file(mocker): mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mock_dynamic_client = mocker.Mock() - mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") mocker.patch( - "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + "codeflare_sdk.ray.cluster.raycluster.RayCluster._throw_for_no_raycluster" ) mocker.patch( - "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", - return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock ) mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch( @@ -152,115 +142,13 @@ def test_cluster_apply_with_file(mocker): cluster.down() -def test_cluster_apply_with_appwrapper(mocker): - # Mock Kubernetes client and dynamic client methods - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "codeflare_sdk.ray.cluster.cluster._check_aw_exists", - return_value=True, - ) - mock_dynamic_client = mocker.Mock() - mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") - mocker.patch( - "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock - ) - mocker.patch( - "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", - return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", - ) - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - - # Create a cluster configuration with appwrapper set 
to False - cluster = create_cluster(mocker, 1, write_to_file=False) - patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) - - # Mock listing RayCluster to simulate it doesn't exist - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), - ) - # Call the apply method - cluster.apply() - - # Assertions - print("Cluster applied without AppWrapper.") - - -def test_cluster_apply_without_appwrapper_write_to_file(mocker): - # Mock Kubernetes client and dynamic client methods - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "codeflare_sdk.ray.cluster.cluster._check_aw_exists", - return_value=True, - ) - mock_dynamic_client = mocker.Mock() - mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") - mocker.patch( - "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock - ) - mocker.patch( - "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", - return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", - ) - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - - # Create a cluster configuration with appwrapper set to False - cluster = create_cluster(mocker, 1, write_to_file=True) - patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) - cluster.config.appwrapper = False - - # Mock listing RayCluster to simulate it doesn't exist - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), - ) - # Call the apply method - cluster.apply() - - # Assertions - print("Cluster applied without AppWrapper.") - - -def test_cluster_apply_without_appwrapper(mocker): - # Mock Kubernetes client and dynamic client methods - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mock_dynamic_client = mocker.Mock() - mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") - mocker.patch( - "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock - ) - mocker.patch( - "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", - return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", - ) - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - - # Create a cluster configuration with appwrapper set to False - cluster = create_cluster(mocker, 1, write_to_file=False) - cluster.config.appwrapper = None - patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) - - # Mock listing RayCluster to simulate it doesn't exist +def test_cluster_apply_down_no_mcad(mocker): mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + "codeflare_sdk.ray.cluster.raycluster.RayCluster._throw_for_no_raycluster" ) - - # Call the apply method - cluster.apply() - - # Assertions - print("Cluster applied without AppWrapper.") - - -def test_cluster_apply_down_no_mcad(mocker): - mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch("kubernetes.client.ApisApi.get_api_versions") - 
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster.get_dynamic_client") + mocker.patch("codeflare_sdk.ray.cluster.raycluster.RayCluster.get_dynamic_client") mocker.patch( "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), @@ -283,10 +171,9 @@ def test_cluster_apply_down_no_mcad(mocker): "kubernetes.client.CustomObjectsApi.list_cluster_custom_object", return_value={"items": []}, ) - config = create_cluster_config() - config.name = "unit-test-cluster-ray" - config.appwrapper = False - cluster = Cluster(config) + cluster = create_cluster_config() + cluster.name = "unit-test-cluster-ray" + cluster.resource_yaml = cluster._create_resource() cluster.apply() cluster.down() @@ -295,14 +182,17 @@ def test_cluster_uris(mocker): mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch( - "codeflare_sdk.ray.cluster.cluster._get_ingress_domain", - return_value="apps.cluster.awsroute.org", + "codeflare_sdk.ray.cluster.raycluster._get_dashboard_url_from_httproute", + return_value=None, ) mocker.patch( "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) cluster = create_cluster(mocker) + mocker.patch( + "codeflare_sdk.ray.cluster.raycluster._is_openshift_cluster", return_value=False + ) mocker.patch( "kubernetes.client.NetworkingV1Api.list_namespaced_ingress", return_value=ingress_retrieval( @@ -323,9 +213,12 @@ def test_cluster_uris(mocker): cluster.cluster_dashboard_uri() == "http://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org" ) - cluster.config.name = "fake" + cluster.name = "fake" + from kubernetes.client import V1IngressList + mocker.patch( "kubernetes.client.NetworkingV1Api.list_namespaced_ingress", + return_value=V1IngressList(items=[]), # Empty list so no ingress matches ) assert ( cluster.cluster_dashboard_uri() @@ -333,7 +226,7 @@ def test_cluster_uris(mocker): ) mocker.patch( - "codeflare_sdk.ray.cluster.cluster._is_openshift_cluster", return_value=True + "codeflare_sdk.ray.cluster.raycluster._is_openshift_cluster", return_value=True ) mocker.patch( "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", @@ -426,75 +319,20 @@ def test_local_client_url(mocker): return_value={"spec": {"domain": ""}}, ) mocker.patch( - "codeflare_sdk.ray.cluster.cluster._get_ingress_domain", + "codeflare_sdk.ray.cluster.raycluster._get_ingress_domain", return_value="rayclient-unit-test-cluster-localinter-ns.apps.cluster.awsroute.org", ) mocker.patch( - "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", - return_value="unit-test-cluster-localinter.yaml", - ) - - cluster_config = ClusterConfiguration( - name="unit-test-cluster-localinter", - namespace="ns", + "codeflare_sdk.ray.cluster.raycluster.RayCluster._create_resource", + return_value={}, ) - cluster = Cluster(cluster_config) + cluster = RayCluster(name="unit-test-cluster-localinter", namespace="ns") assert ( cluster.local_client_url() == "ray://rayclient-unit-test-cluster-localinter-ns.apps.cluster.awsroute.org" ) -""" -get_cluster tests -""" - - -def test_get_cluster_no_appwrapper(mocker): - """ - This test uses the "test all params" unit test file as a comparison - """ - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - 
"codeflare_sdk.ray.cluster.cluster._check_aw_exists", - return_value=False, - ) - - with open(f"{expected_clusters_dir}/ray/unit-test-all-params.yaml") as f: - expected_rc = yaml.load(f, Loader=yaml.FullLoader) - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_namespaced_custom_object", - return_value=expected_rc, - ) - get_cluster("test-all-params", "ns", write_to_file=True) - - with open(f"{aw_dir}test-all-params.yaml") as f: - generated_rc = yaml.load(f, Loader=yaml.FullLoader) - assert generated_rc == expected_rc - - -def test_get_cluster_with_appwrapper(mocker): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "codeflare_sdk.ray.cluster.cluster._check_aw_exists", - return_value=True, - ) - - with open(f"{expected_clusters_dir}/appwrapper/unit-test-all-params.yaml") as f: - expected_aw = yaml.load(f, Loader=yaml.FullLoader) - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_namespaced_custom_object", - return_value=expected_aw, - ) - get_cluster("aw-all-params", "ns", write_to_file=True) - - with open(f"{aw_dir}aw-all-params.yaml") as f: - generated_aw = yaml.load(f, Loader=yaml.FullLoader) - assert generated_aw == expected_aw - - def test_wait_ready(mocker, capsys): from codeflare_sdk.ray.cluster.status import CodeFlareClusterStatus @@ -505,10 +343,7 @@ def test_wait_ready(mocker, capsys): ) mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch( - "codeflare_sdk.ray.cluster.cluster._app_wrapper_status", return_value=None - ) - mocker.patch( - "codeflare_sdk.ray.cluster.cluster._ray_cluster_status", return_value=None + "codeflare_sdk.ray.cluster.raycluster._ray_cluster_status", return_value=None ) mocker.patch.object( client.CustomObjectsApi, @@ -525,14 +360,11 @@ def test_wait_ready(mocker, capsys): mock_response = mocker.Mock() mock_response.status_code = 200 mocker.patch("requests.get", return_value=mock_response) - cf = Cluster( - ClusterConfiguration( - name="test", - namespace="ns", - write_to_file=False, - appwrapper=True, - ) + mocker.patch( + "codeflare_sdk.ray.cluster.raycluster.RayCluster._create_resource", + return_value={}, ) + cf = RayCluster(name="test", namespace="ns") try: cf.wait_ready(timeout=5) assert 1 == 0 @@ -545,8 +377,8 @@ def test_wait_ready(mocker, capsys): in captured.out ) mocker.patch( - "codeflare_sdk.ray.cluster.cluster.Cluster.status", - return_value=(True, CodeFlareClusterStatus.READY), + "codeflare_sdk.ray.cluster.raycluster.RayCluster.status", + return_value=(CodeFlareClusterStatus.READY, True), ) cf.wait_ready() captured = capsys.readouterr() @@ -666,49 +498,6 @@ def test_is_dashboard_ready_url_validation(mocker): ), "Should return False when dashboard returns 500" -def test_list_queue_appwrappers(mocker, capsys): - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - return_value=get_obj_none( - "workload.codeflare.dev", "v1beta2", "ns", "appwrappers" - ), - ) - list_all_queued("ns", appwrapper=True) - captured = capsys.readouterr() - # The Rich library's console width detection varies between test contexts - # Accept either the two-line format (individual tests) or single-line format (full test suite) - # Check for key parts of the message instead of the full text - assert "No resources found" in captured.out - assert "cluster.apply()" in captured.out - assert "cluster.details()" in 
captured.out - assert "check if it's ready" in captured.out - assert "╭" in captured.out and "╮" in captured.out # Check for box characters - assert "│" in captured.out # Check for vertical lines - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - return_value=get_aw_obj_with_status( - "workload.codeflare.dev", "v1beta2", "ns", "appwrappers" - ), - ) - list_all_queued("ns", appwrapper=True) - captured = capsys.readouterr() - print(captured.out) - assert captured.out == ( - "╭────────────────────────────────╮\n" - "│ 🚀 Cluster Queue Status 🚀 │\n" - "│ +----------------+-----------+ │\n" - "│ | Name | Status | │\n" - "│ +================+===========+ │\n" - "│ | test-cluster-a | running | │\n" - "│ | | | │\n" - "│ | test-cluster-b | suspended | │\n" - "│ | | | │\n" - "│ +----------------+-----------+ │\n" - "╰────────────────────────────────╯\n" - ) - - def test_list_queue_rayclusters(mocker, capsys): mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mock_api = MagicMock() @@ -918,7 +707,7 @@ def raise_attribute_error(): raise AttributeError("DynamicClient initialization failed") mocker.patch( - "codeflare_sdk.ray.cluster.cluster.Cluster.get_dynamic_client", + "codeflare_sdk.ray.cluster.raycluster.RayCluster.get_dynamic_client", side_effect=raise_attribute_error, ) @@ -938,20 +727,18 @@ def test_cluster_namespace_handling(mocker, capsys): # Test with None namespace that gets set mocker.patch( - "codeflare_sdk.ray.cluster.cluster.get_current_namespace", return_value=None + "codeflare_sdk.ray.cluster.raycluster.get_current_namespace", return_value=None ) - config = ClusterConfiguration( + cluster = RayCluster( name="test-cluster-ns", - namespace=None, # Will trigger namespace check + namespace=None, num_workers=1, worker_cpu_requests=1, worker_cpu_limits=1, worker_memory_requests=2, worker_memory_limits=2, ) - - cluster = Cluster(config) captured = capsys.readouterr() # Verify the warning message was printed assert "Please specify with namespace=" in captured.out @@ -959,18 +746,18 @@ def test_cluster_namespace_handling(mocker, capsys): def test_component_resources_with_write_to_file(mocker): - """Test _component_resources_up with write_to_file enabled""" + """Test _component_resources_apply with write_to_file enabled""" + mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch( "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - # Mock the _create_resources function - mocker.patch("codeflare_sdk.ray.cluster.cluster._create_resources") + # Create cluster with write_to_file=True + from codeflare_sdk.ray.cluster.raycluster import RayCluster - # Create cluster with write_to_file=True (without appwrapper) - config = ClusterConfiguration( + cluster = RayCluster( name="test-cluster-component", namespace="ns", num_workers=1, @@ -979,13 +766,9 @@ def test_component_resources_with_write_to_file(mocker): worker_memory_requests=2, worker_memory_limits=2, write_to_file=True, - appwrapper=False, ) - cluster = Cluster(config) - - # Mock file reading and test _component_resources_up - + # Mock file reading and test _component_resources_apply with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: f.write("apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: test") temp_file = f.name @@ -993,7 +776,7 @@ def 
test_component_resources_with_write_to_file(mocker): try: mock_api = MagicMock() cluster.resource_yaml = temp_file - cluster._component_resources_up("ns", mock_api) + cluster._component_resources_apply("ns", mock_api) # If we got here without error, the write_to_file path was executed assert True finally: @@ -1029,6 +812,7 @@ def test_get_cluster_status_functions(mocker): def test_cluster_namespace_type_error(mocker): """Test TypeError when namespace is not a string""" + mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch( "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", @@ -1037,25 +821,26 @@ def test_cluster_namespace_type_error(mocker): # Mock get_current_namespace to return a non-string value (e.g., int) mocker.patch( - "codeflare_sdk.ray.cluster.cluster.get_current_namespace", return_value=12345 + "codeflare_sdk.ray.cluster.raycluster.get_current_namespace", return_value=12345 ) - config = ClusterConfiguration( - name="test-cluster-type-error", - namespace=None, # Will trigger namespace check - num_workers=1, - worker_cpu_requests=1, - worker_cpu_limits=1, - worker_memory_requests=2, - worker_memory_limits=2, - ) + from codeflare_sdk.ray.cluster.raycluster import RayCluster # This should raise TypeError because get_current_namespace returns int + # The error is raised during __init__ when _create_resource is called with pytest.raises( TypeError, match="Namespace 12345 is of type.*Check your Kubernetes Authentication", ): - Cluster(config) + cluster = RayCluster( + name="test-cluster-type-error", + namespace=None, # Will trigger namespace check + num_workers=1, + worker_cpu_requests=1, + worker_cpu_limits=1, + worker_memory_requests=2, + worker_memory_limits=2, + ) def test_get_dashboard_url_from_httproute(mocker): @@ -1607,14 +1392,14 @@ def test_cluster_dashboard_uri_httproute_first(mocker): # Test 1: HTTPRoute exists - should return HTTPRoute URL mocker.patch( - "codeflare_sdk.ray.cluster.cluster._is_openshift_cluster", return_value=True + "codeflare_sdk.ray.cluster.raycluster._is_openshift_cluster", return_value=True ) httproute_url = ( "https://data-science-gateway.apps.example.com/ray/ns/unit-test-cluster" ) mocker.patch( - "codeflare_sdk.ray.cluster.cluster._get_dashboard_url_from_httproute", + "codeflare_sdk.ray.cluster.raycluster._get_dashboard_url_from_httproute", return_value=httproute_url, ) @@ -1624,7 +1409,7 @@ def test_cluster_dashboard_uri_httproute_first(mocker): # Test 2: HTTPRoute not found - should fall back to OpenShift Route mocker.patch( - "codeflare_sdk.ray.cluster.cluster._get_dashboard_url_from_httproute", + "codeflare_sdk.ray.cluster.raycluster._get_dashboard_url_from_httproute", return_value=None, ) mocker.patch( diff --git a/src/codeflare_sdk/ray/cluster/test_config.py b/src/codeflare_sdk/ray/cluster/test_config.py index e405bc5b..0c564902 100644 --- a/src/codeflare_sdk/ray/cluster/test_config.py +++ b/src/codeflare_sdk/ray/cluster/test_config.py @@ -19,7 +19,7 @@ create_cluster_all_config_params, get_template_variables, ) -from codeflare_sdk.ray.cluster.cluster import ClusterConfiguration, Cluster +from codeflare_sdk.ray.cluster.raycluster import RayCluster from pathlib import Path import filecmp import pytest @@ -36,7 +36,7 @@ def test_default_cluster_creation(mocker): mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object") - cluster = 
Cluster(ClusterConfiguration(name="default-cluster", namespace="ns")) + cluster = RayCluster(name="default-cluster", namespace="ns") expected_rc = apply_template( f"{expected_clusters_dir}/ray/default-ray-cluster.yaml", @@ -46,21 +46,6 @@ def test_default_cluster_creation(mocker): assert cluster.resource_yaml == expected_rc -def test_default_appwrapper_creation(mocker): - # Create an AppWrapper using the default config variables - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object") - - cluster = Cluster( - ClusterConfiguration(name="default-appwrapper", namespace="ns", appwrapper=True) - ) - - expected_aw = apply_template( - f"{expected_clusters_dir}/ray/default-appwrapper.yaml", get_template_variables() - ) - assert cluster.resource_yaml == expected_aw - - @pytest.mark.filterwarnings("ignore::UserWarning") def test_config_creation_all_parameters(mocker): from codeflare_sdk.ray.cluster.config import DEFAULT_RESOURCE_MAPPING @@ -85,7 +70,6 @@ def test_config_creation_all_parameters(mocker): assert cluster.config.num_workers == 10 assert cluster.config.worker_memory_requests == "12G" assert cluster.config.worker_memory_limits == "16G" - assert cluster.config.appwrapper == False assert cluster.config.envs == { "key1": "value1", "key2": "value2", @@ -117,17 +101,6 @@ def test_config_creation_all_parameters(mocker): ) -@pytest.mark.filterwarnings("ignore::UserWarning") -def test_all_config_params_aw(mocker): - create_cluster_all_config_params(mocker, "aw-all-params", True) - - assert filecmp.cmp( - f"{aw_dir}aw-all-params.yaml", - f"{expected_clusters_dir}/appwrapper/unit-test-all-params.yaml", - shallow=True, - ) - - def test_config_creation_wrong_type(): with pytest.raises(TypeError) as error_info: create_cluster_wrong_type() @@ -136,7 +109,7 @@ def test_config_creation_wrong_type(): def test_gcs_fault_tolerance_config_validation(): - config = ClusterConfiguration( + config = RayCluster( name="test", namespace="ns", enable_gcs_ft=True, @@ -154,12 +127,12 @@ def test_gcs_fault_tolerance_config_validation(): assert config.external_storage_namespace == "new-ns" try: - ClusterConfiguration(name="test", namespace="ns", enable_gcs_ft=True) + RayCluster(name="test", namespace="ns", enable_gcs_ft=True) except ValueError as e: assert str(e) in "redis_address must be provided when enable_gcs_ft is True" try: - ClusterConfiguration( + RayCluster( name="test", namespace="ns", enable_gcs_ft=True, @@ -173,7 +146,7 @@ def test_gcs_fault_tolerance_config_validation(): ) try: - ClusterConfiguration( + RayCluster( name="test", namespace="ns", enable_gcs_ft=True, @@ -190,9 +163,7 @@ def test_ray_usage_stats_default(mocker): mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object") - cluster = Cluster( - ClusterConfiguration(name="default-usage-stats-cluster", namespace="ns") - ) + cluster = RayCluster(name="default-usage-stats-cluster", namespace="ns") # Verify that usage stats are disabled by default assert cluster.config.envs["RAY_USAGE_STATS_ENABLED"] == "0" @@ -209,12 +180,10 @@ def test_ray_usage_stats_enabled(mocker): mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object") - cluster = Cluster( - ClusterConfiguration( - name="usage-stats-enabled-cluster", - namespace="ns", - enable_usage_stats=True, - ) + cluster = RayCluster( + 
name="usage-stats-enabled-cluster", + namespace="ns", + enable_usage_stats=True, ) assert cluster.config.envs["RAY_USAGE_STATS_ENABLED"] == "1" @@ -229,4 +198,3 @@ def test_ray_usage_stats_enabled(mocker): # Make sure to always keep this function last def test_cleanup(): os.remove(f"{aw_dir}test-all-params.yaml") - os.remove(f"{aw_dir}aw-all-params.yaml") diff --git a/src/codeflare_sdk/ray/cluster/test_pretty_print.py b/src/codeflare_sdk/ray/cluster/test_pretty_print.py index d0e10585..fc237355 100644 --- a/src/codeflare_sdk/ray/cluster/test_pretty_print.py +++ b/src/codeflare_sdk/ray/cluster/test_pretty_print.py @@ -13,22 +13,16 @@ # limitations under the License. from codeflare_sdk.ray.cluster.pretty_print import ( - print_app_wrappers_status, print_cluster_status, print_clusters, print_no_resources_found, ) -from codeflare_sdk.ray.appwrapper.status import AppWrapperStatus, AppWrapper from codeflare_sdk.ray.cluster.status import ( - RayCluster, + RayClusterInfo, RayClusterStatus, CodeFlareClusterStatus, ) -from codeflare_sdk.ray.cluster.cluster import ( - Cluster, - ClusterConfiguration, - _copy_to_ray, -) +from codeflare_sdk.ray.cluster.raycluster import RayCluster from codeflare_sdk.common.utils.unit_test_support import get_local_queue @@ -49,39 +43,9 @@ def test_print_no_resources(capsys): assert "│" in captured.out # Check for vertical lines -def test_print_appwrappers(capsys): - aw1 = AppWrapper( - name="awtest1", - status=AppWrapperStatus.SUSPENDED, - ) - aw2 = AppWrapper( - name="awtest2", - status=AppWrapperStatus.RUNNING, - ) - try: - print_app_wrappers_status([aw1, aw2]) - except Exception: - assert 1 == 0 - captured = capsys.readouterr() - assert captured.out == ( - "╭─────────────────────────╮\n" - "│ 🚀 Cluster Queue │\n" - "│ Status 🚀 │\n" - "│ +---------+-----------+ │\n" - "│ | Name | Status | │\n" - "│ +=========+===========+ │\n" - "│ | awtest1 | suspended | │\n" - "│ | | | │\n" - "│ | awtest2 | running | │\n" - "│ | | | │\n" - "│ +---------+-----------+ │\n" - "╰─────────────────────────╯\n" - ) - - def test_ray_details(mocker, capsys): mocker.patch("kubernetes.client.ApisApi.get_api_versions") - ray1 = RayCluster( + ray1 = RayClusterInfo( name="raytest1", status=RayClusterStatus.READY, num_workers=1, @@ -97,29 +61,33 @@ def test_ray_details(mocker, capsys): head_mem_limits=8, ) mocker.patch( - "codeflare_sdk.ray.cluster.cluster.Cluster.status", - return_value=(False, CodeFlareClusterStatus.UNKNOWN), + "codeflare_sdk.ray.cluster.raycluster.RayCluster.status", + return_value=(CodeFlareClusterStatus.UNKNOWN, False), ) mocker.patch( - "codeflare_sdk.ray.cluster.cluster.Cluster.cluster_dashboard_uri", + "codeflare_sdk.ray.cluster.raycluster.RayCluster.cluster_dashboard_uri", return_value="", ) mocker.patch( "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cf = Cluster( - ClusterConfiguration( - name="raytest2", - namespace="ns", - appwrapper=True, - local_queue="local-queue-default", - ) + cf = RayCluster( + name="raytest2", + namespace="ns", + head_cpu_requests=1, + head_cpu_limits=2, + head_memory_requests="5G", + head_memory_limits="8G", + worker_cpu_requests=1, + worker_cpu_limits=1, + worker_memory_requests="3G", + worker_memory_limits="6G", + num_workers=1, + local_queue="local-queue-default", ) captured = capsys.readouterr() - ray2 = _copy_to_ray(cf) - details = cf.details() - assert details == ray2 + ray2 = cf.details() assert ray2.name == "raytest2" assert 
     assert ray1.namespace == ray2.namespace
     assert ray1.num_workers == ray2.num_workers
diff --git a/src/codeflare_sdk/ray/cluster/test_status.py b/src/codeflare_sdk/ray/cluster/test_status.py
index 27eda49e..a9e6db67 100644
--- a/src/codeflare_sdk/ray/cluster/test_status.py
+++ b/src/codeflare_sdk/ray/cluster/test_status.py
@@ -12,15 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from codeflare_sdk.ray.cluster.cluster import (
-    Cluster,
-    ClusterConfiguration,
-    _ray_cluster_status,
-)
+from codeflare_sdk.ray.cluster.raycluster import RayCluster, _ray_cluster_status
 from codeflare_sdk.ray.cluster.status import (
     CodeFlareClusterStatus,
     RayClusterStatus,
-    RayCluster,
+    RayClusterInfo,
 )
 import os
 from ...common.utils.unit_test_support import get_local_queue
@@ -32,7 +28,7 @@ def test_cluster_status(mocker):
     mocker.patch("kubernetes.client.ApisApi.get_api_versions")
     mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
 
-    fake_ray = RayCluster(
+    fake_ray = RayClusterInfo(
         name="test",
         status=RayClusterStatus.UNKNOWN,
         num_workers=1,
@@ -53,24 +49,19 @@ def test_cluster_status(mocker):
         return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
     )
 
-    cf = Cluster(
-        ClusterConfiguration(
-            name="test",
-            namespace="ns",
-            write_to_file=True,
-            appwrapper=False,
-            local_queue="local-queue-default",
-        )
-    )
+    cf = RayCluster(name="test", namespace="ns", local_queue="local-queue-default")
+    cf.write_to_file = True
+    cf.resource_yaml = cf._create_resource()
     mocker.patch(
-        "codeflare_sdk.ray.cluster.cluster._ray_cluster_status", return_value=None
+        "codeflare_sdk.ray.cluster.raycluster._ray_cluster_status", return_value=None
     )
     status, ready = cf.status()
     assert status == CodeFlareClusterStatus.UNKNOWN
     assert ready == False
 
     mocker.patch(
-        "codeflare_sdk.ray.cluster.cluster._ray_cluster_status", return_value=fake_ray
+        "codeflare_sdk.ray.cluster.raycluster._ray_cluster_status",
+        return_value=fake_ray,
     )
 
     status, ready = cf.status()
diff --git a/src/codeflare_sdk/ray/rayjobs/__init__.py b/src/codeflare_sdk/ray/rayjobs/__init__.py
index cd6b4123..d03b80f9 100644
--- a/src/codeflare_sdk/ray/rayjobs/__init__.py
+++ b/src/codeflare_sdk/ray/rayjobs/__init__.py
@@ -1,3 +1,3 @@
-from .rayjob import RayJob, ManagedClusterConfig
+from .rayjob import RayJob
 from .status import RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo
 from .config import ManagedClusterConfig
diff --git a/src/codeflare_sdk/ray/rayjobs/config.py b/src/codeflare_sdk/ray/rayjobs/config.py
index 5b724272..e0b41be5 100644
--- a/src/codeflare_sdk/ray/rayjobs/config.py
+++ b/src/codeflare_sdk/ray/rayjobs/config.py
@@ -21,6 +21,7 @@
 import pathlib
 from dataclasses import dataclass, field, fields
 from typing import Dict, List, Optional, Union, get_args, get_origin, Any, Tuple
+from typing_extensions import deprecated
 from kubernetes.client import (
     V1LocalObjectReference,
     V1SecretVolumeSource,
@@ -61,9 +62,12 @@
 }
 
 
+@deprecated("Use RayCluster instead")
 @dataclass
 class ManagedClusterConfig:
     """
+    [DEPRECATED] Use RayCluster instead.
+
     This dataclass is used to specify resource requirements and other details for RayJobs.
     The cluster name and namespace are automatically derived from the RayJob configuration.
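
As a side note, a minimal sketch of what the @deprecated marker added above does at runtime (illustrative only, not part of this patch): it assumes typing_extensions >= 4.5, which emits a DeprecationWarning when a decorated class is instantiated, and uses a hypothetical OldConfig class standing in for the deprecated dataclass.

import warnings

from typing_extensions import deprecated


@deprecated("Use RayCluster instead")
class OldConfig:  # hypothetical stand-in for the deprecated dataclass
    pass


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    OldConfig()  # instantiation triggers the runtime warning

assert issubclass(caught[0].category, DeprecationWarning)
assert "Use RayCluster instead" in str(caught[0].message)

Callers that still construct the old config keep working, but are nudged toward the replacement named in the message.
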
diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py
index e19abee2..057fa09a 100644
--- a/src/codeflare_sdk/ray/rayjobs/rayjob.py
+++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py
@@ -33,6 +33,7 @@
 from codeflare_sdk.vendored.python_client.kuberay_job_api import RayjobApi
 from codeflare_sdk.vendored.python_client.kuberay_cluster_api import RayClusterApi
 from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig
+from codeflare_sdk.ray.cluster.raycluster import RayCluster
 from codeflare_sdk.ray.rayjobs.runtime_env import (
     create_file_secret,
     extract_all_local_files,
@@ -66,7 +67,7 @@ def __init__(
         job_name: str,
         entrypoint: str,
         cluster_name: Optional[str] = None,
-        cluster_config: Optional[ManagedClusterConfig] = None,
+        cluster_config: Optional[Union[ManagedClusterConfig, RayCluster]] = None,
         namespace: Optional[str] = None,
         runtime_env: Optional[Union[RuntimeEnv, Dict[str, Any]]] = None,
         ttl_seconds_after_finished: int = 0,
@@ -81,7 +82,7 @@ def __init__(
             job_name: The name for the Ray job
             entrypoint: The Python script or command to run (required)
             cluster_name: The name of an existing Ray cluster (optional if cluster_config provided)
-            cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
+            cluster_config: RayCluster configuration for creating a new cluster (optional if cluster_name provided)
             namespace: The Kubernetes namespace (auto-detected if not specified)
             runtime_env: Ray runtime environment configuration. Can be:
                 - RuntimeEnv object from ray.runtime_env
diff --git a/src/codeflare_sdk/ray/rayjobs/test/test_rayjob.py b/src/codeflare_sdk/ray/rayjobs/test/test_rayjob.py
index a6f19803..6fc0adef 100644
--- a/src/codeflare_sdk/ray/rayjobs/test/test_rayjob.py
+++ b/src/codeflare_sdk/ray/rayjobs/test/test_rayjob.py
@@ -18,7 +18,7 @@
 from ray.runtime_env import RuntimeEnv
 
 from codeflare_sdk.ray.rayjobs.rayjob import RayJob
-from codeflare_sdk.ray.cluster.config import ClusterConfiguration
+from codeflare_sdk.ray.cluster.raycluster import RayCluster
 from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig
 
 from kubernetes.client import V1Volume, V1VolumeMount, V1Toleration
@@ -80,7 +80,7 @@ def test_rayjob_init_validation_both_provided(auto_mock_setup):
     """
     Test that providing both cluster_name and cluster_config raises error.
     """
-    cluster_config = ClusterConfiguration(name="test-cluster", namespace="test")
+    cluster_config = RayCluster(name="test-cluster", namespace="test")
 
     with pytest.raises(
         ValueError,
@@ -109,7 +109,7 @@ def test_rayjob_init_with_cluster_config(auto_mock_setup):
     """
     Test RayJob initialization with cluster configuration for auto-creation.
     """
-    cluster_config = ClusterConfiguration(
+    cluster_config = RayCluster(
         name="auto-cluster", namespace="test-namespace", num_workers=2
     )
 
@@ -130,7 +130,7 @@ def test_rayjob_cluster_name_generation(auto_mock_setup):
     """
     Test that cluster names are generated when config has empty name.
     """
-    cluster_config = ClusterConfiguration(
+    cluster_config = RayCluster(
         name="",  # Empty name should trigger generation
         namespace="test-namespace",
         num_workers=1,
@@ -150,7 +150,7 @@ def test_rayjob_cluster_config_namespace_none(auto_mock_setup):
     """
     Test that cluster config namespace is set when None.
     """
-    cluster_config = ClusterConfiguration(
+    cluster_config = RayCluster(
         name="test-cluster",
         namespace=None,  # This should be set to job namespace
         num_workers=1,
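
As a side note, a minimal sketch of the call pattern these rayjob.py hunks enable (illustrative only, not part of this patch): it uses the import paths and parameter names shown above, placeholder names and values ("demo-cluster", "demo-job", "python train.py"), and assumes a reachable Kubernetes context for the RayCluster constructor.

from codeflare_sdk.ray.cluster.raycluster import RayCluster
from codeflare_sdk.ray.rayjobs.rayjob import RayJob

# A RayCluster definition can now be passed directly as the RayJob cluster_config.
cluster_config = RayCluster(
    name="demo-cluster",  # placeholder values
    namespace="demo-ns",
    num_workers=2,
)

job = RayJob(
    job_name="demo-job",
    entrypoint="python train.py",   # placeholder entrypoint script
    cluster_config=cluster_config,  # RayCluster, or the deprecated ManagedClusterConfig
    namespace="demo-ns",
)

Keeping the Union with ManagedClusterConfig in the signature gives existing callers a migration window while new code targets RayCluster only.
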