From ff04da5aca8e74c44ae74a9d56a4c04154a8a239 Mon Sep 17 00:00:00 2001
From: NinaCai
Date: Thu, 11 Dec 2025 17:38:05 +0000
Subject: [PATCH] add gke node labeling with predictor signal

---
 gpudirect-tcpxo/predictor-scheduler/README.md |  77 ++++++++
 .../label-nodes-cronjob.yaml                  |  32 ++++
 .../predictor-scheduler/label-nodes.py        | 165 ++++++++++++++++++
 .../predictor-scheduler/service-account.yaml  |  34 ++++
 4 files changed, 308 insertions(+)
 create mode 100644 gpudirect-tcpxo/predictor-scheduler/README.md
 create mode 100644 gpudirect-tcpxo/predictor-scheduler/label-nodes-cronjob.yaml
 create mode 100644 gpudirect-tcpxo/predictor-scheduler/label-nodes.py
 create mode 100644 gpudirect-tcpxo/predictor-scheduler/service-account.yaml

diff --git a/gpudirect-tcpxo/predictor-scheduler/README.md b/gpudirect-tcpxo/predictor-scheduler/README.md
new file mode 100644
index 00000000..c39d5295
--- /dev/null
+++ b/gpudirect-tcpxo/predictor-scheduler/README.md
@@ -0,0 +1,77 @@
## Overview

This document gives instructions on how to enable the GPU failure predictor signal in GKE clusters.

The general outline for this to be successful is:
- We add predictor labels to GPU nodes in the cluster with a CronJob that periodically reads the failure prediction signal from Cloud Monitoring.

## Prerequisites

For predictor awareness to be enabled, `compute.googleapis.com/instance/gpu/failure_prediction_status`
[ref](https://monitoring.corp.google.com/explorer?duration=3600&utc_end=0&refresh=90&legend=bottom&mash=Fetch(Precomputed(%27cloud-cluster-vm%27,%20%27compute_module%27,%20%27compute.googleapis.com%2Finstance%2Fgpu%2Ffailure_prediction_status%27))%0A%7C%20Window(Align(%2710m%27))&q_namespaces=cloud_prod)
should be present in CloudMonarch for each GPU node in the cluster.

## Configuration

To initialize Kubernetes authentication for the scripts:

```
gcloud container clusters get-credentials [cluster name] --zone [cluster zone] --project [project id]
```

Grant the Kubernetes service account (KSA) the Cloud Monitoring Viewer and Compute Viewer roles:

```
gcloud projects add-iam-policy-binding [project_id] \
    --role="roles/monitoring.viewer" \
    --member=principal://iam.googleapis.com/projects/[project_id]/locations/global/workloadIdentityPools/[project_id].svc.id.goog/subject/ns/kube-system/sa/predictor-scheduler \
    --condition=None


gcloud projects add-iam-policy-binding [project_id] \
    --role="roles/compute.viewer" \
    --member=principal://iam.googleapis.com/projects/[project_id]/locations/global/workloadIdentityPools/[project_id].svc.id.goog/subject/ns/kube-system/sa/predictor-scheduler \
    --condition=None
```

## Usage

First, copy this folder locally.

Next, create a ConfigMap with the script required by the CronJob pods:

- Run `kubectl create configmap predictor-scheduler-scripts --namespace kube-system
  --from-file=label-nodes.py=label-nodes.py`

Next, apply the service account config to the cluster:

- Apply `service-account.yaml` to the cluster by running `kubectl apply
  -f service-account.yaml`.

Finally, apply `label-nodes-cronjob.yaml` to the cluster. This creates the CronJob,
which is scheduled to run every 10 minutes:

- Apply `label-nodes-cronjob.yaml` to the cluster by running
  `kubectl apply -f label-nodes-cronjob.yaml`.
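If you do not want to wait for the next scheduled run, you can trigger the CronJob once by hand to exercise the labeling end to end; the Job name `label-nodes-manual-run` below is just an example:

```
kubectl create job label-nodes-manual-run --from=cronjob/label-nodes-cronjob -n kube-system
```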
You can check the status of the CronJob and its runs using:

```
# Check the CronJob definition
kubectl get cronjob label-nodes-cronjob -n kube-system

# Check the jobs created by the CronJob
kubectl get jobs -n kube-system | grep label-nodes-cronjob

# Check the pods created by the most recent job
kubectl get pods -n kube-system | grep label-nodes-cronjob

# View logs from a specific pod (replace [pod name])
kubectl logs [pod name] -n kube-system
```

## Verification

You can also check the labels on your GPU nodes (selected with the same `cloud.google.com/gke-gpu=true` label the labeling script uses):

```
kubectl get nodes -l cloud.google.com/gke-gpu=true -o custom-columns=NAME:.metadata.name,LABEL:.metadata.labels."gke\.io/recommended-to-run-large-training-workload"
```
diff --git a/gpudirect-tcpxo/predictor-scheduler/label-nodes-cronjob.yaml b/gpudirect-tcpxo/predictor-scheduler/label-nodes-cronjob.yaml
new file mode 100644
index 00000000..314df4ad
--- /dev/null
+++ b/gpudirect-tcpxo/predictor-scheduler/label-nodes-cronjob.yaml
@@ -0,0 +1,32 @@
apiVersion: batch/v1
kind: CronJob
metadata:
  name: label-nodes-cronjob
  namespace: kube-system
spec:
  schedule: "*/10 * * * *" # Run every 10 minutes
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: label-nodes
            image: python:3.9
            command:
            - bash
            - -c
            - |
              pip install --no-cache-dir kubernetes google-cloud-monitoring google-auth requests google-api-python-client
              python -u /scripts/label-nodes.py
            volumeMounts:
            - name: scripts-volume
              mountPath: /scripts
          restartPolicy: OnFailure
          volumes:
          - name: scripts-volume
            configMap:
              name: predictor-scheduler-scripts # Must match the ConfigMap created in the Usage steps
          serviceAccountName: predictor-scheduler
  concurrencyPolicy: Forbid # Prevents starting a new job if the previous one hasn't finished
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 1
diff --git a/gpudirect-tcpxo/predictor-scheduler/label-nodes.py b/gpudirect-tcpxo/predictor-scheduler/label-nodes.py
new file mode 100644
index 00000000..48c79599
--- /dev/null
+++ b/gpudirect-tcpxo/predictor-scheduler/label-nodes.py
@@ -0,0 +1,165 @@
import time
import os
from kubernetes import client, config
from google.cloud import monitoring_v3
import google.auth
from googleapiclient.discovery import build

def query_cloud_monitoring_for_status(project_id, instance_id, monitoring_client):
    """Queries Cloud Monitoring API for the GPU failure prediction status from metric labels."""
    try:
        project_name = f"projects/{project_id}"
        now = time.time()
        # Look back 15 minutes to ensure we catch a recent point
        start_time = int(now - 900)
        end_time = int(now)
        interval = monitoring_v3.TimeInterval(
            start_time={"seconds": start_time},
            end_time={"seconds": end_time}
        )
        filter_str = (
            f'metric.type="compute.googleapis.com/instance/gpu/failure_prediction_status" '
            f'AND resource.type="gce_instance" '
            f'AND resource.labels.instance_id="{instance_id}"'
        )

        # HEADERS view returns time series metadata (including metric labels) without point data
        results = monitoring_client.list_time_series(
            request={
                "name": project_name,
                "filter": filter_str,
                "interval": interval,
                "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.HEADERS,
            }
        )
        print(f"Results: {results}")
        found_status = None
        for series in results:
            metric_labels = series.metric.labels
            prediction_value = metric_labels.get("Value")

            if prediction_value:
                print(f"  Instance {instance_id}: Found metric label 'Value': {prediction_value}")
                if prediction_value in ["NO_DEGRADATION_PREDICTED", "POSSIBLE_DEGRADATION_PREDICTED"]:
                    found_status = "True"
                elif prediction_value == 
"DEGRADATION_PREDICTED": + found_status = "False" + else: + print(f" WARNING: Unknown prediction value: {prediction_value}") + found_status = "UNKNOWN" + return found_status + else: + print(f" WARNING: Metric label 'Value' not found in series for instance {instance_id}") + + if not found_status: + print(f" No series with 'Value' metric label found for instance {instance_id} in the interval.") + return None + + except Exception as e: + print(f"Error querying Cloud Monitoring for instance {instance_id}: {e}") + return None + +def get_instance_id(compute_service, project_id, zone, instance_name): + """Fetches the numeric GCE instance ID.""" + try: + result = compute_service.instances().get( + project=project_id, + zone=zone, + instance=instance_name).execute() + return result.get('id') + except Exception as e: + print(f"Error getting instance details for {instance_name} in {zone}: {e}") + return None + +def update_all_node_labels(kube, monitoring_client, compute_service, project_id): + """Fetches status and updates labels for all relevant nodes.""" + print("Listing nodes with GPUs...") + try: + nodes = kube.list_node(label_selector="cloud.google.com/gke-gpu=true") + except client.exceptions.ApiException as e: + print(f"Error listing nodes: {e}") + return + + print(f"Found {len(nodes.items)} GPU nodes to process.") + for node in nodes.items: + node_name = node.metadata.name + provider_id = node.spec.provider_id + zone = node.metadata.labels.get("topology.kubernetes.io/zone") + + if not provider_id or not provider_id.startswith("gce://"): + print(f"Node {node_name} has non-GCE or missing providerID: {provider_id}. Skipping.") + continue + + try: + instance_name = provider_id.split('/')[-1] + except Exception as e: + print(f"Could not parse instance name from providerID {provider_id} for node {node_name}: {e}") + continue + + if not zone: + print(f"Node {node_name} is missing 'topology.kubernetes.io/zone' label. Skipping.") + continue + + print(f"Processing Node: {node_name}, Instance Name: {instance_name}, Zone: {zone}") + + instance_id = get_instance_id(compute_service, project_id, zone, instance_name) + + if not instance_id: + print(f" Failed to get GCE Instance ID for node {node_name}. Skipping.") + continue + + print(f" GCE Instance ID: {instance_id}") + + status = query_cloud_monitoring_for_status(project_id, instance_id, monitoring_client) + + if status is not None: + if status == "UNKNOWN": + print(f" Label not updated for {node_name} due to UNKNOWN prediction value.") + else: + label_value = status + node_labels = { + "gke.io/recommended-to-run-large-training-workload": label_value + } + try: + kube.patch_node(node_name, {"metadata": {"labels": node_labels}}) + print(f" Successfully updated labels on node {node_name}: {node_labels}") + except client.exceptions.ApiException as e: + print(f" Error patching node {node_name}: {e}") + else: + print(f" Could not retrieve status for node {node_name} (Instance ID: {instance_id}). Labels not updated.") + +if __name__ == "__main__": + print("Starting label-nodes cronjob...") + + credentials, detected_project_id = google.auth.default(scopes=[ + 'https://www.googleapis.com/auth/monitoring.read', + 'https://www.googleapis.com/auth/compute.readonly' + ]) + project_id = detected_project_id + + if not project_id: + print("ERROR: Project ID is unknown. 
Exiting.") + exit(1) + + print(f"Using Project ID: {project_id}") + + try: + config.load_incluster_config() + kube = client.CoreV1Api() + print("Kubernetes client initialized.") + + # Get credentials for GCP APIs + credentials, _ = google.auth.default(scopes=[ + 'https://www.googleapis.com/auth/monitoring.read', + 'https://www.googleapis.com/auth/compute.readonly' + ]) + monitoring_client = monitoring_v3.MetricServiceClient(credentials=credentials) + compute_service = build('compute', 'v1', credentials=credentials) + print("GCP clients initialized.") + + except Exception as e: + print(f"Failed to initialize clients: {e}") + exit(1) + + update_all_node_labels(kube, monitoring_client, compute_service, project_id) + + print("label-nodes cronjob finished.") diff --git a/gpudirect-tcpxo/predictor-scheduler/service-account.yaml b/gpudirect-tcpxo/predictor-scheduler/service-account.yaml new file mode 100644 index 00000000..e7006258 --- /dev/null +++ b/gpudirect-tcpxo/predictor-scheduler/service-account.yaml @@ -0,0 +1,34 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: predictor-scheduler + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: predictor-scheduler +rules: +- apiGroups: [""] + resources: ["pods"] + verbs: ["get", "watch", "list", "update", "patch"] +- apiGroups: [""] + resources: ["namespaces"] + verbs: ["get", "watch", "list"] +- apiGroups: [""] + resources: ["nodes"] + verbs: ["get", "list", "watch", "update", "patch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: predictor-scheduler +subjects: +- kind: ServiceAccount + name: predictor-scheduler + namespace: kube-system +roleRef: + kind: ClusterRole + name: predictor-scheduler + apiGroup: rbac.authorization.k8s.io +
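# The "nodes" rule above is what lets label-nodes.py list GPU nodes and patch the
# gke.io/recommended-to-run-large-training-workload label onto them; the "pods" and
# "namespaces" rules are not exercised by label-nodes.py itself.
# Illustrative only: a training workload can then prefer recommended nodes with node
# affinity on that label, e.g.
#   affinity:
#     nodeAffinity:
#       preferredDuringSchedulingIgnoredDuringExecution:
#       - weight: 100
#         preference:
#           matchExpressions:
#           - key: gke.io/recommended-to-run-large-training-workload
#             operator: In
#             values: ["True"]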