# Demo script: pushes a set of edge-inference presets and detector assignments
# to a Groundlight Edge Endpoint through ExperimentalApi.configure_edge().

from groundlight import ExperimentalApi

# Named inference presets; detectors refer to these by key.
INFERENCE_PRESETS = {
    "tims_config": {
        "enabled": True,
        "always_return_edge_prediction": True,
        "disable_cloud_escalation": False,
        "min_time_between_escalations": 30.0,
    },
    "another_config": {
        "enabled": True,
        "always_return_edge_prediction": True,
        "disable_cloud_escalation": True,
        "min_time_between_escalations": 20.0,
    },
}

# Detector -> preset assignments sent with the request.
DETECTOR_ASSIGNMENTS = [
    {"detector_id": "det_31WjVpvBiOmxUBzVqiOa3G2UgMV", "edge_inference_config": "tims_config"},
    {"detector_id": "det_346zU49rFpY9I7f1PiuzArYdTdJ", "edge_inference_config": "tims_config"},
    # Entries below are commented out and therefore not sent:
    # {"detector_id": "det_398G23k3Dn6sRPVHbySHeyGLmai", "edge_inference_config": "tims_config"},
    # {"detector_id": "det_39oO6BR1IJjmLOjmWF3F4R4of4o", "edge_inference_config": "tims_config"},
    # {"detector_id": "det_2mWrXjFFSGcn2PYKakS7EfYfF3l", "edge_inference_config": "another_config"},
]

gl = ExperimentalApi()

print('Configuring Edge Endpoint...')
gl.configure_edge(
    edge_inference_configs=INFERENCE_PRESETS,
    detectors=DETECTOR_ASSIGNMENTS,
)
print("Configured Edge Endpoint successfully!")
+ +## Decisions + +| Question | Decision | +|---|---| +| Config schema | Freeform JSON for now; model it later | +| Runtime effect | Apply immediately: update in-memory config + persist to shared PVC file + write DB records for model updater | +| Method placement | `ExperimentalApi` | +| Auth | Yes, require API token | +| Naming | `configure_edge` with kwargs: `global_config`, `edge_inference_configs`, `detectors`, `wait` | +| OpenAPI spec | Hand-coded HTTP request (like `create_note`); **not** in `public-api.yaml` | +| Waiting strategy | SDK polls `GET /status/metrics.json` for detector readiness | +| Config persistence | Runtime config written to YAML file on shared PVC; read on startup by all containers | + +--- + +## Implemented + +### Edge Endpoint: `POST /device-api/v1/edge/configure` + +**File**: `app/api/routes/edge_config.py`, registered in `app/api/api.py` + +Currently supports **merge mode only**: incoming config is merged into the existing config. + +On receiving a config POST, the handler: +1. Merges incoming JSON into the existing `RootEdgeConfig` (global_config, edge_inference_configs, detectors) +2. Validates the merged config via Pydantic (returns 400 on invalid config) +3. Updates `AppState.edge_config` and re-resolves detector inference configs on the `EdgeInferenceManager` +4. Sets up inference client URLs and escalation tracking for new detectors +5. Writes database records for new detectors so the model updater (separate container) + discovers them and creates Kubernetes inference deployments +6. Persists the merged config to `runtime-edge-config.yaml` on the shared PVC + +### Config Persistence (PVC file) + +**Problem**: The edge endpoint runs multiple uvicorn workers, plus a separate status-monitor +and model-updater container. An in-memory-only config update only affects the worker that +handles the POST. The status page and other processes see stale config from the original +ConfigMap. 
+ +**Solution**: After every successful config update, the merged `RootEdgeConfig` is written +to `runtime-edge-config.yaml` on the shared PVC (`edge-endpoint-pvc`). On startup, +`load_edge_config()` checks for this file before falling back to the ConfigMap. + +**File locations** (`app/core/file_paths.py`): +- `/opt/groundlight/edge/sqlite/runtime-edge-config.yaml` (sqlite PVC mount) +- `/opt/groundlight/edge/serving/model-repo/runtime-edge-config.yaml` (model-repo PVC mount) + +The loader checks all known mount points so any container can find the file regardless +of which PVC subdirectory it has mounted. + +**Load priority** (`app/core/edge_config_loader.py`): +1. `EDGE_CONFIG` environment variable (inline YAML) -- highest priority +2. Runtime config file on shared PVC +3. Default ConfigMap file at `/etc/groundlight/edge-config/edge-config.yaml` + +### Python SDK: `ExperimentalApi.configure_edge()` + +**File**: `src/groundlight/experimental_api.py` + +- Keyword-only args: `global_config`, `edge_inference_configs`, `detectors`, `wait` +- `wait` defaults to 600 seconds (10 minutes) +- POSTs to `{endpoint}/v1/edge/configure`; 404 -> `GroundlightClientError` +- If `wait > 0` and `detectors` provided, polls `GET {base_url}/status/metrics.json` + every 5 seconds until all configured detectors have `status: "ready"`, or raises on timeout +- Direct `requests.post()` call, not OpenAPI-generated + +### End-to-End Flow (merge mode) + +1. SDK POSTs config to edge endpoint +2. Edge endpoint merges incoming config with existing config +3. Edge endpoint updates in-memory `AppState` (affects image query behavior immediately on that worker) +4. Edge endpoint persists merged config to `runtime-edge-config.yaml` on shared PVC +5. Edge endpoint writes DB records for new detectors +6. Model updater (separate container) discovers new detectors from DB on next refresh cycle +7. Model updater fetches models and creates Kubernetes inference deployments +8. Pods roll out (~70s) +9. 
SDK polls `/status/metrics.json` -- status monitor reads runtime config from PVC and reports pod readiness via Kubernetes API +10. All detectors show `status: "ready"` -> SDK returns + +### Cloud vs. Edge Detection + +| Scenario | What happens | +|---|---| +| SDK -> cloud (`api.groundlight.ai`) | Cloud returns 404 -> SDK raises error | +| SDK -> edge endpoint (with new route) | FastAPI handles it -> returns 200 | +| SDK -> old edge endpoint (without new route) | FastAPI returns 404 -> nginx falls back to cloud -> 404 -> SDK raises error | + +--- + +## Planned: Replace Mode + +### Problem + +The current merge mode can only **add or update** detectors. It cannot remove detectors +that are no longer desired. If a user wants to go from 5 detectors to 3, the old 2 +detectors' inference pods keep running and wasting resources. + +### Proposed SDK Change + +Add a `replace` parameter to `configure_edge()`: + +- `replace=False` (default): Current merge behavior. Incoming config is merged into existing. +- `replace=True`: Incoming config fully replaces the existing config. Detectors not in the + new config are removed (their inference pods are deleted). + +### Required Edge Endpoint Changes + +The edge endpoint currently has **no code to delete** Kubernetes Deployments, Services, +database records, or model files for detectors. All of this needs to be added. + +**Existing infrastructure we can use:** +- `get_edge_inference_deployment_name(detector_id)` and `get_edge_inference_service_name(detector_id)` + in `app/core/edge_inference.py` map detector IDs to K8s resource names. +- The service account (`edge-endpoint-service-account`) already has RBAC permissions to + `delete` both `deployments` and `services`. These permissions are currently unused. +- `InferenceDeploymentManager` in `app/core/kubernetes_management.py` already has a K8s + client and namespace context. Adding a `delete_inference_deployment()` method here is natural. 
+- `get_detector_models_dir(repository_root, detector_id)` returns the model directory + (`{MODEL_REPOSITORY_PATH}/{detector_id}/`). Deleting this directory removes all model + files (primary + oodd + all versions). + +**What needs to be built:** + +1. **`InferenceDeploymentManager.delete_inference_deployment(detector_id, is_oodd)`** + (`app/core/kubernetes_management.py`) + - Call `delete_namespaced_deployment()` to remove the inference Deployment + - Call `delete_namespaced_service()` to remove the inference Service + - The naming functions already exist to map detector ID -> resource names + +2. **`DatabaseManager.delete_inference_deployment_record(model_name)`** + (`app/core/database.py`) + - Delete the DB record so the model updater doesn't recreate the deployment on its + next refresh cycle + +3. **Model file cleanup** + - Delete `{MODEL_REPOSITORY_PATH}/{detector_id}/` (the entire detector directory, + which contains `primary/` and `oodd/` subdirs with versioned model files) + - Can use `shutil.rmtree()` (already used by `delete_model_version()`) + +4. **Replace logic in `edge_config.py` handler** + - Accept a `replace` flag in the POST body + - If `replace=True`: use the incoming config as-is instead of merging + - Diff old vs new detector sets: `removed = old_detector_ids - new_detector_ids` + - **Deletion must complete before new pods roll out.** The edge endpoint must wait + for removed detector pods to fully terminate (not just in `Terminating` state) + before writing DB records for new detectors. This prevents OOM from old and new + pods competing for the same finite GPU/memory resources. + - For each removed detector: + a. Call `InferenceDeploymentManager.delete_inference_deployment()` (primary + oodd) + b. Poll until the pods are fully gone (not just Terminating) + c. Call `DatabaseManager.delete_inference_deployment_record()` (primary + oodd) + d. Delete model files from disk + e. 
Clean up `EdgeInferenceManager` state (inference_client_urls, oodd URLs, + escalation tracking) + - After all deletions complete: proceed with new/retained detectors (same flow as + current merge mode) + - `configure_edge(detectors=[], replace=True)` is valid and removes all detector pods. + +5. **SDK changes** + - Add `replace: bool = False` parameter to `configure_edge()` + - Pass `replace` flag in POST body + - When `replace=True` and `wait > 0`: wait for removed pods to terminate AND for + new/retained pods to become ready + +### Ordering Guarantee + +When `replace=True`, the edge endpoint enforces this sequence: + +``` +1. Delete removed detector deployments/services +2. Wait for removed pods to fully terminate +3. Clean up DB records + model files for removed detectors +4. Write DB records for new detectors +5. Model updater picks up new detectors and creates deployments +``` + +This ensures old pods release their resources before new pods are scheduled, +preventing resource exhaustion on memory/GPU-constrained edge devices. + +### Decisions + +- **Async**: The POST handler returns immediately. Deletion and re-creation happen + in a FastAPI background task. The SDK polls `/status/metrics.json` for completion. +- **Termination time**: Inference pods use the K8s default of 30s + `terminationGracePeriodSeconds`. No custom value is set. +- **Partial failure**: Error out. If deletion of any detector fails, the background + task logs the error and stops. The config is left in a partially cleaned state; + the user can retry. + +### Implementation Details + +**Edge endpoint needs an `InferenceDeploymentManager` on `AppState`.** +Currently only the model-updater container creates one. The edge-endpoint container +has the K8s service account and RBAC permissions but doesn't use them. We add +an `InferenceDeploymentManager` to `AppState`, guarded by the existing +`DEPLOY_DETECTOR_LEVEL_INFERENCE` env var (only set in K8s, not Docker tests). 
+ +**Background task flow** (runs after POST returns): +1. Delete K8s Deployments + Services for each removed detector +2. Poll until pods are fully terminated (not just Terminating) +3. Delete DB records for removed detectors +4. Delete model files from PVC (`shutil.rmtree({MODEL_REPOSITORY_PATH}/{detector_id}/)`) +5. Write DB records for new detectors (model updater picks these up) + +**SDK polling**: When `replace=True` and `wait > 0`, the SDK waits until: +- Removed detector IDs no longer appear in `/status/metrics.json` +- New/retained detector IDs all show `status: "ready"` + +--- + +## Known Limitations + +- **Multiprocess in-memory state**: Edge endpoint runs multiple uvicorn workers. The in-memory + config update only applies to the worker that handles the POST. Other workers retain stale + in-memory config until they restart. However, on restart they pick up the persisted runtime + config from the PVC file. +- **Model updater refresh**: The model updater checks for new detectors every `refresh_rate` + seconds (default 60s), so there can be a delay before pod creation starts. +- **File write race**: If two concurrent config POSTs hit different workers, the last write wins. + This is acceptable for now; atomic file writes or a lock file can be added later if needed. 
# NOTE: the definitions below are methods of ``ExperimentalApi``
# (src/groundlight/experimental_api.py). They use names the surrounding module
# already references: ``json``, ``time``, ``requests``, ``urlsplit``/``urlunsplit``,
# ``logger`` and ``GroundlightClientError``.


def _get_edge_base_url(self) -> str:
    """Derive the base URL (scheme + host) from ``self.endpoint`` by stripping the path."""
    parts = urlsplit(self.endpoint)
    return urlunsplit((parts.scheme, parts.netloc, "", "", ""))


def _get_edge_detector_statuses(self) -> dict:
    """Fetch detector statuses from the edge endpoint's status monitor.

    Issues ``GET {base_url}/status/metrics.json`` (10 second timeout) and extracts the
    ``detector_details`` entry.

    :returns: A dict mapping detector IDs to their detail dicts (including ``status``).
        Empty when the metrics document has no ``detector_details`` entry.
    :raises requests.RequestException: If the request fails, times out, or returns an
        HTTP error status.
    """
    url = f"{self._get_edge_base_url()}/status/metrics.json"
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    raw_details = response.json().get("detector_details")
    if raw_details is None:
        return {}
    # ``detector_details`` may be delivered as a JSON-encoded string rather than an object.
    if isinstance(raw_details, str):
        return json.loads(raw_details)
    return raw_details


def configure_edge(  # noqa: PLR0913
    self,
    *,
    global_config: Optional[dict] = None,
    edge_inference_configs: Optional[dict] = None,
    detectors: Optional[list] = None,
    replace: bool = False,
    wait: float = 600,
) -> None:
    """
    Sends configuration to a Groundlight Edge Endpoint. This method only works when the SDK
    is connected to an Edge Endpoint -- calling it against the cloud API will raise an error.

    By default, the incoming configuration is **merged** into the existing config (adding or
    updating detectors, presets, and global settings). Set ``replace=True`` to **replace** the
    entire config, which will remove any detectors not included in the new config.

    If ``wait`` is set and ``detectors`` are provided (or detectors are being removed),
    the method will poll the edge endpoint's status monitor until all configured detectors
    report ``status: "ready"`` and all removed detectors are gone, or until the timeout
    expires.

    .. note::
        This method makes a direct HTTP request rather than using the OpenAPI-generated client,
        since this endpoint is edge-only and not part of the cloud API spec.

    **Example usage**::

        from groundlight import ExperimentalApi

        gl = ExperimentalApi(endpoint="http://localhost:30101")

        # Merge: add/update detectors (existing detectors are preserved)
        gl.configure_edge(
            detectors=[
                {"detector_id": "det_abc123", "edge_inference_config": "no_cloud"},
            ],
            wait=120,
        )

        # Replace: only these detectors will remain; all others are removed
        gl.configure_edge(
            edge_inference_configs={
                "default": {"enabled": True},
            },
            detectors=[
                {"detector_id": "det_abc123", "edge_inference_config": "default"},
            ],
            replace=True,
        )

    :param global_config: Global edge endpoint settings (e.g., ``refresh_rate``,
        ``confident_audit_rate``).
    :param edge_inference_configs: Named inference config presets. Each key is a preset name,
        and the value is a dict of settings (e.g., ``enabled``,
        ``always_return_edge_prediction``, ``disable_cloud_escalation``).
    :param detectors: A list of detector configuration dicts, each containing a
        ``detector_id`` and an ``edge_inference_config`` preset name.
    :param replace: If True, the incoming config fully replaces the existing config.
        Detectors not in the new config will have their inference pods deleted.
        Defaults to False (merge), so a plain call never removes detectors.
    :param wait: Maximum time in seconds to wait for all configured detectors to become
        ready on the edge endpoint. Defaults to 600 (10 minutes). Set to 0 to return
        immediately after posting the configuration.

    :raises GroundlightClientError: If the endpoint is not an Edge Endpoint, if the endpoint
        rejects the configuration, or if the wait timeout expires before all detectors are
        ready.
    :raises requests.RequestException: If the configuration request cannot be delivered or
        the endpoint does not answer within the request timeout.
    """
    # ``requests`` never times out unless told to; bound the POST so an unresponsive
    # endpoint cannot hang the caller indefinitely.
    post_timeout_sec = 30

    payload: dict = {}
    if global_config is not None:
        payload["global_config"] = global_config
    if edge_inference_configs is not None:
        payload["edge_inference_configs"] = edge_inference_configs
    if detectors is not None:
        payload["detectors"] = detectors
    if replace:
        payload["replace"] = True

    url = f"{self.endpoint}/v1/edge/configure"
    headers = {"x-api-token": self.configuration.api_key["ApiToken"]}

    response = requests.post(url, headers=headers, json=payload, timeout=post_timeout_sec)
    if response.status_code == 404:
        raise GroundlightClientError(
            "This endpoint does not support edge configuration. Are you connected to a Groundlight Edge Endpoint?"
        )
    if not response.ok:
        try:
            detail = response.json().get("detail", response.text)
        except (ValueError, AttributeError):
            # Body is not JSON, or is JSON but not an object: fall back to the raw text.
            detail = response.text
        raise GroundlightClientError(f"Edge configuration failed (HTTP {response.status_code}): {detail}")

    # The configuration has been accepted at this point. A missing or non-JSON body only
    # means the endpoint reported no removed detectors; it must not turn success into a crash.
    try:
        result = response.json()
    except ValueError:
        result = {}
    removed = result.get("removed") if isinstance(result, dict) else None
    removed_ids = set(removed or [])

    if wait <= 0 or not (detectors or removed_ids):
        return

    expected_ids = {d["detector_id"] for d in detectors} if detectors else set()
    self._wait_for_edge_config_applied(expected_ids, removed_ids, timeout=wait)


def _wait_for_edge_config_applied(
    self,
    expected_ids: set[str],
    removed_ids: set[str],
    timeout: float,
) -> None:
    """Poll the status monitor until the posted configuration has taken effect.

    The configuration counts as applied once every ID in ``expected_ids`` reports
    ``status == "ready"`` and no ID in ``removed_ids`` is listed any more.

    :param expected_ids: Detector IDs that must become ready.
    :param removed_ids: Detector IDs that must disappear from the status report.
    :param timeout: Maximum number of seconds to keep polling.

    :raises GroundlightClientError: If the conditions are not met before ``timeout`` elapses.
    """
    POLL_INTERVAL = 5  # seconds between status polls
    # Monotonic clock: the deadline must not move if the wall clock is adjusted mid-wait.
    deadline = time.monotonic() + timeout

    while True:
        try:
            statuses = self._get_edge_detector_statuses()
        except Exception as e:  # pylint: disable=broad-except
            logger.warning(f"Failed to fetch edge detector statuses: {e}")
            statuses = None

        issues = []
        if statuses is None:
            # A failed fetch tells us nothing. Treating it as an empty report would make
            # every removed detector look "gone" and report success prematurely.
            issues.append(("status_monitor", "unreachable"))
        else:
            for det_id in expected_ids:
                det_status = (statuses.get(det_id) or {}).get("status")
                if det_status != "ready":
                    issues.append((det_id, det_status or "unknown"))

            for det_id in removed_ids:
                if det_id in statuses:
                    issues.append((det_id, "still_present"))

        if not issues:
            logger.info(
                f"Edge config applied: {len(expected_ids)} detector(s) ready, "
                f"{len(removed_ids)} detector(s) removed."
            )
            return

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            summary = ", ".join(f"{did}={st}" for did, st in issues)
            raise GroundlightClientError(
                f"Timed out after {timeout}s waiting for edge config to be applied. Issues: {summary}"
            )

        logger.debug(f"Waiting for edge config: {len(issues)} issue(s) remaining...")
        # Never sleep past the deadline, so short timeouts are honoured.
        time.sleep(min(POLL_INTERVAL, remaining))