From ce935ed9edb5a50fab4588844768880ce8ecb08a Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Mon, 23 Feb 2026 12:09:32 -0800 Subject: [PATCH 1/5] got it working --- src/groundlight/experimental_api.py | 142 ++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) diff --git a/src/groundlight/experimental_api.py b/src/groundlight/experimental_api.py index 90ef7470..45b982f9 100644 --- a/src/groundlight/experimental_api.py +++ b/src/groundlight/experimental_api.py @@ -8,9 +8,11 @@ """ import json +import time from io import BufferedReader, BytesIO from pathlib import Path from typing import Any, Dict, List, Optional, Union +from urllib.parse import urlsplit, urlunsplit import requests from groundlight_openapi_client.api.actions_api import ActionsApi @@ -882,3 +884,143 @@ def make_generic_api_request( # noqa: PLR0913 # pylint: disable=too-many-argume auth_settings=["ApiToken"], _preload_content=False, # This returns the urllib3 response rather than trying any type of processing ) + + def _get_edge_base_url(self) -> str: + """Derives the base URL (scheme + host) from self.endpoint by stripping the path.""" + parts = urlsplit(self.endpoint) + return urlunsplit((parts.scheme, parts.netloc, "", "", "")) + + def _get_edge_detector_statuses(self) -> dict: + """Fetches detector statuses from the edge endpoint's status monitor. + + :returns: A dict mapping detector IDs to their detail dicts (including ``status``). 
+ """ + url = f"{self._get_edge_base_url()}/status/metrics.json" + response = requests.get(url, timeout=10) + response.raise_for_status() + metrics = response.json() + raw_details = metrics.get("detector_details") + if raw_details is None: + return {} + if isinstance(raw_details, str): + return json.loads(raw_details) + return raw_details + + def configure_edge( # noqa: PLR0913 + self, + *, + global_config: Optional[dict] = None, + edge_inference_configs: Optional[dict] = None, + detectors: Optional[list] = None, + wait: float = 600, + ) -> None: + """ + Sends configuration to a Groundlight Edge Endpoint. This method only works when the SDK + is connected to an Edge Endpoint -- calling it against the cloud API will raise an error. + + All parameters are optional. Only the sections you provide will be updated. + + Currently, the Edge Endpoint logs the received configuration. Future versions will apply + the configuration at runtime. + + If ``wait`` is set and ``detectors`` are provided, the method will poll the edge + endpoint's status monitor until all configured detectors report ``status: "ready"``, + or until the timeout expires. + + .. note:: + This method makes a direct HTTP request rather than using the OpenAPI-generated client, + since this endpoint is edge-only and not part of the cloud API spec. 
+ + **Example usage**:: + + from groundlight import ExperimentalApi + + gl = ExperimentalApi(endpoint="http://localhost:30101") + + # Configure and wait up to 120s for pods to be ready + gl.configure_edge( + edge_inference_configs={ + "no_cloud": { + "enabled": True, + "always_return_edge_prediction": True, + "disable_cloud_escalation": True, + }, + }, + detectors=[ + {"detector_id": "det_abc123", "edge_inference_config": "no_cloud"}, + ], + wait=120, + ) + + # Or just update global config (no waiting needed) + gl.configure_edge(global_config={"refresh_rate": 10}) + + :param global_config: Global edge endpoint settings (e.g., ``refresh_rate``, + ``confident_audit_rate``). + :param edge_inference_configs: Named inference config presets. Each key is a preset name, + and the value is a dict of settings (e.g., ``enabled``, + ``always_return_edge_prediction``, ``disable_cloud_escalation``). + :param detectors: A list of detector configuration dicts, each containing a + ``detector_id`` and an ``edge_inference_config`` preset name. + :param wait: Maximum time in seconds to wait for all configured detectors to become + ready on the edge endpoint. Defaults to 600 (10 minutes). Set to 0 to return + immediately after posting the configuration. + + :raises GroundlightClientError: If the endpoint is not an Edge Endpoint, or if the + wait timeout expires before all detectors are ready. + """ + payload = {} + if global_config is not None: + payload["global_config"] = global_config + if edge_inference_configs is not None: + payload["edge_inference_configs"] = edge_inference_configs + if detectors is not None: + payload["detectors"] = detectors + + url = f"{self.endpoint}/v1/edge/configure" + headers = {"x-api-token": self.configuration.api_key["ApiToken"]} + + response = requests.post(url, headers=headers, json=payload) + if response.status_code == 404: + raise GroundlightClientError( + "This endpoint does not support edge configuration. 
" + "Are you connected to a Groundlight Edge Endpoint?" + ) + response.raise_for_status() + + if wait > 0 and detectors: + self._wait_for_edge_detectors_ready(detectors, timeout=wait) + + def _wait_for_edge_detectors_ready(self, detectors: list, timeout: float) -> None: + POLL_INTERVAL = 5 + detector_ids = {d["detector_id"] for d in detectors} + deadline = time.time() + timeout + + while True: + try: + statuses = self._get_edge_detector_statuses() + except Exception as e: + logger.warning(f"Failed to fetch edge detector statuses: {e}") + statuses = {} + + not_ready = [] + for det_id in detector_ids: + det_status = statuses.get(det_id, {}).get("status") + if det_status != "ready": + not_ready.append((det_id, det_status)) + + if not not_ready: + logger.info(f"All {len(detector_ids)} edge detector(s) are ready.") + return + + if time.time() >= deadline: + summary = ", ".join(f"{did}={st or 'unknown'}" for did, st in not_ready) + raise GroundlightClientError( + f"Timed out after {timeout}s waiting for edge detectors to be ready. " + f"Not ready: {summary}" + ) + + logger.debug( + f"Waiting for {len(not_ready)}/{len(detector_ids)} edge detector(s) to be ready..." + ) + time.sleep(POLL_INTERVAL) From 0d5fcaca5bdcc1b464dfecc19b5a51a9ddd1b07e Mon Sep 17 00:00:00 2001 From: Auto-format Bot Date: Mon, 23 Feb 2026 20:10:17 +0000 Subject: [PATCH 2/5] Automatically reformatting code --- src/groundlight/experimental_api.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/groundlight/experimental_api.py b/src/groundlight/experimental_api.py index 45b982f9..0b13d775 100644 --- a/src/groundlight/experimental_api.py +++ b/src/groundlight/experimental_api.py @@ -983,8 +983,7 @@ def configure_edge( # noqa: PLR0913 response = requests.post(url, headers=headers, json=payload) if response.status_code == 404: raise GroundlightClientError( - "This endpoint does not support edge configuration. " - "Are you connected to a Groundlight Edge Endpoint?" 
+ "This endpoint does not support edge configuration. Are you connected to a Groundlight Edge Endpoint?" ) response.raise_for_status() @@ -1016,11 +1015,8 @@ def _wait_for_edge_detectors_ready(self, detectors: list, timeout: float) -> Non if time.time() >= deadline: summary = ", ".join(f"{did}={st or 'unknown'}" for did, st in not_ready) raise GroundlightClientError( - f"Timed out after {timeout}s waiting for edge detectors to be ready. " - f"Not ready: {summary}" + f"Timed out after {timeout}s waiting for edge detectors to be ready. Not ready: {summary}" ) - logger.debug( - f"Waiting for {len(not_ready)}/{len(detector_ids)} edge detector(s) to be ready..." - ) + logger.debug(f"Waiting for {len(not_ready)}/{len(detector_ids)} edge detector(s) to be ready...") time.sleep(POLL_INTERVAL) From 305e1b6e40d99aa7031d5397657247e21993e059 Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Mon, 23 Feb 2026 12:10:33 -0800 Subject: [PATCH 3/5] checking in the spec --- edge-config-plan.md | 109 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 edge-config-plan.md diff --git a/edge-config-plan.md b/edge-config-plan.md new file mode 100644 index 00000000..9e43119a --- /dev/null +++ b/edge-config-plan.md @@ -0,0 +1,109 @@ +# Plan: Edge Configuration via SDK + +## Goal + +Add a method to the Python SDK that can send configuration to a Groundlight Edge Endpoint, +and optionally wait until the configuration is fully applied (inference pods are ready). +When the SDK is pointed at the cloud API, calling this method should raise an error. 
+ +## Decisions + +| Question | Decision | +|---|---| +| Config schema | Freeform JSON for now; model it later | +| Runtime effect | Apply immediately: update in-memory config + persist to shared PVC file + write DB records for model updater | +| Method placement | `ExperimentalApi` | +| Auth | Yes, require API token | +| Naming | `configure_edge` with kwargs: `global_config`, `edge_inference_configs`, `detectors`, `wait` | +| OpenAPI spec | Hand-coded HTTP request (like `create_note`); **not** in `public-api.yaml` | +| Waiting strategy | SDK polls `GET /status/metrics.json` for detector readiness | +| Config persistence | Runtime config written to YAML file on shared PVC; read on startup by all containers | + +## Implementation + +### Edge Endpoint: `POST /device-api/v1/edge/configure` + +**File**: `app/api/routes/edge_config.py`, registered in `app/api/api.py` + +On receiving a config POST, the handler: +1. Merges incoming JSON into the existing `RootEdgeConfig` (global_config, edge_inference_configs, detectors) +2. Validates the merged config via Pydantic (returns 400 on invalid config) +3. Updates `AppState.edge_config` and re-resolves detector inference configs on the `EdgeInferenceManager` +4. Sets up inference client URLs and escalation tracking for new detectors +5. Writes database records for new detectors so the model updater (separate container) + discovers them and creates Kubernetes inference deployments +6. Persists the merged config to `runtime-edge-config.yaml` on the shared PVC + +### Config Persistence (PVC file) + +**Problem**: The edge endpoint runs multiple uvicorn workers, plus a separate status-monitor +and model-updater container. An in-memory-only config update only affects the worker that +handles the POST. The status page and other processes see stale config from the original +ConfigMap. 
+ +**Solution**: After every successful config update, the merged `RootEdgeConfig` is written +to `runtime-edge-config.yaml` on the shared PVC (`edge-endpoint-pvc`). On startup, +`load_edge_config()` checks for this file before falling back to the ConfigMap. + +**File locations** (`app/core/file_paths.py`): +- `/opt/groundlight/edge/sqlite/runtime-edge-config.yaml` (sqlite PVC mount) +- `/opt/groundlight/edge/serving/model-repo/runtime-edge-config.yaml` (model-repo PVC mount) + +The loader checks all known mount points so any container can find the file regardless +of which PVC subdirectory it has mounted. + +**Load priority** (`app/core/edge_config_loader.py`): +1. `EDGE_CONFIG` environment variable (inline YAML) -- highest priority +2. Runtime config file on shared PVC +3. Default ConfigMap file at `/etc/groundlight/edge-config/edge-config.yaml` + +### Python SDK: `ExperimentalApi.configure_edge()` + +**File**: `src/groundlight/experimental_api.py` + +- Keyword-only args: `global_config`, `edge_inference_configs`, `detectors`, `wait` +- `wait` defaults to 600 seconds (10 minutes) +- POSTs to `{endpoint}/v1/edge/configure`; 404 -> `GroundlightClientError` +- If `wait > 0` and `detectors` provided, polls `GET {base_url}/status/metrics.json` + every 5 seconds until all configured detectors have `status: "ready"`, or raises on timeout +- Direct `requests.post()` call, not OpenAPI-generated + +### End-to-End Flow + +1. SDK POSTs config to edge endpoint +2. Edge endpoint merges incoming config with existing config +3. Edge endpoint updates in-memory `AppState` (affects image query behavior immediately on that worker) +4. Edge endpoint persists merged config to `runtime-edge-config.yaml` on shared PVC +5. Edge endpoint writes DB records for new detectors +6. Model updater (separate container) discovers new detectors from DB on next refresh cycle +7. Model updater fetches models and creates Kubernetes inference deployments +8. Pods roll out (~70s) +9. 
SDK polls `/status/metrics.json` -- status monitor reads runtime config from PVC and reports pod readiness via Kubernetes API +10. All detectors show `status: "ready"` -> SDK returns + +### Known Limitations + +- **Multiprocess in-memory state**: Edge endpoint runs multiple uvicorn workers. The in-memory + config update only applies to the worker that handles the POST. Other workers retain stale + in-memory config until they restart. However, on restart they pick up the persisted runtime + config from the PVC file. +- **Model updater refresh**: The model updater checks for new detectors every `refresh_rate` + seconds (default 60s), so there can be a delay before pod creation starts. +- **File write race**: If two concurrent config POSTs hit different workers, the last write wins. + This is acceptable for now; atomic file writes or a lock file can be added later if needed. + +### Cloud vs. Edge Detection + +| Scenario | What happens | +|---|---| +| SDK -> cloud (`api.groundlight.ai`) | Cloud returns 404 -> SDK raises error | +| SDK -> edge endpoint (with new route) | FastAPI handles it -> returns 200 | +| SDK -> old edge endpoint (without new route) | FastAPI returns 404 -> nginx falls back to cloud -> 404 -> SDK raises error | + +## Future Work + +- Define proper Pydantic models in the SDK for config validation +- Consider adding a dedicated `GET /device-api/v1/edge/configure/status` endpoint + to avoid URL construction gymnastics when polling +- Support reading current config via `GET /device-api/v1/edge/configure` +- Atomic file writes or lock file for concurrent config updates From 68d4227dfd9c9473bfe81e040227b6e8ebab3f5a Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Mon, 23 Feb 2026 12:57:27 -0800 Subject: [PATCH 4/5] adding demo script --- configure_edge_demo.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 configure_edge_demo.py diff --git a/configure_edge_demo.py b/configure_edge_demo.py new file mode 100644 index 
00000000..c060edab --- /dev/null +++ b/configure_edge_demo.py @@ -0,0 +1,23 @@ +from groundlight import ExperimentalApi + +gl = ExperimentalApi() + +print('Configuring Edge Endpoint...') +gl.configure_edge( + edge_inference_configs={ + "tims_config": { + "enabled": True, + "always_return_edge_prediction": True, + "disable_cloud_escalation": False, + "min_time_between_escalations": 30.0, + }, + }, + detectors=[ + {"detector_id": "det_31WjVpvBiOmxUBzVqiOa3G2UgMV", "edge_inference_config": "tims_config"}, + # {"detector_id": "det_346zU49rFpY9I7f1PiuzArYdTdJ", "edge_inference_config": "tims_config"}, + # {"detector_id": "det_398G23k3Dn6sRPVHbySHeyGLmai", "edge_inference_config": "tims_config"}, + # {"detector_id": "det_39oO6BR1IJjmLOjmWF3F4R4of4o", "edge_inference_config": "tims_config"}, + # {"detector_id": "det_2mWrXjFFSGcn2PYKakS7EfYfF3l", "edge_inference_config": "tims_config"}, + ], +) +print("Configured Edge Endpoint successfully!") From 08333511f86eebc282e90aa88c8657ab32c0e9ae Mon Sep 17 00:00:00 2001 From: Tim Huff Date: Mon, 2 Mar 2026 11:57:17 -0800 Subject: [PATCH 5/5] minor changes --- configure_edge_demo.py | 11 +- edge-config-plan.md | 149 ++++++++++++++++++++++++++-- src/groundlight/experimental_api.py | 90 +++++++++++------ 3 files changed, 205 insertions(+), 45 deletions(-) diff --git a/configure_edge_demo.py b/configure_edge_demo.py index c060edab..30bdc642 100644 --- a/configure_edge_demo.py +++ b/configure_edge_demo.py @@ -11,13 +11,18 @@ "disable_cloud_escalation": False, "min_time_between_escalations": 30.0, }, - }, + "another_config": { + "enabled": True, + "always_return_edge_prediction": True, + "disable_cloud_escalation": True, + "min_time_between_escalations": 20.0, + }, }, detectors=[ {"detector_id": "det_31WjVpvBiOmxUBzVqiOa3G2UgMV", "edge_inference_config": "tims_config"}, - # {"detector_id": "det_346zU49rFpY9I7f1PiuzArYdTdJ", "edge_inference_config": "tims_config"}, + {"detector_id": "det_346zU49rFpY9I7f1PiuzArYdTdJ", 
"edge_inference_config": "tims_config"}, # {"detector_id": "det_398G23k3Dn6sRPVHbySHeyGLmai", "edge_inference_config": "tims_config"}, # {"detector_id": "det_39oO6BR1IJjmLOjmWF3F4R4of4o", "edge_inference_config": "tims_config"}, - # {"detector_id": "det_2mWrXjFFSGcn2PYKakS7EfYfF3l", "edge_inference_config": "tims_config"}, + # {"detector_id": "det_2mWrXjFFSGcn2PYKakS7EfYfF3l", "edge_inference_config": "another_config"}, ], ) print("Configured Edge Endpoint successfully!") diff --git a/edge-config-plan.md b/edge-config-plan.md index 9e43119a..96697574 100644 --- a/edge-config-plan.md +++ b/edge-config-plan.md @@ -19,12 +19,16 @@ When the SDK is pointed at the cloud API, calling this method should raise an er | Waiting strategy | SDK polls `GET /status/metrics.json` for detector readiness | | Config persistence | Runtime config written to YAML file on shared PVC; read on startup by all containers | -## Implementation +--- + +## Implemented ### Edge Endpoint: `POST /device-api/v1/edge/configure` **File**: `app/api/routes/edge_config.py`, registered in `app/api/api.py` +Currently supports **merge mode only**: incoming config is merged into the existing config. + On receiving a config POST, the handler: 1. Merges incoming JSON into the existing `RootEdgeConfig` (global_config, edge_inference_configs, detectors) 2. Validates the merged config via Pydantic (returns 400 on invalid config) @@ -68,7 +72,7 @@ of which PVC subdirectory it has mounted. every 5 seconds until all configured detectors have `status: "ready"`, or raises on timeout - Direct `requests.post()` call, not OpenAPI-generated -### End-to-End Flow +### End-to-End Flow (merge mode) 1. SDK POSTs config to edge endpoint 2. Edge endpoint merges incoming config with existing config @@ -81,7 +85,138 @@ of which PVC subdirectory it has mounted. 9. SDK polls `/status/metrics.json` -- status monitor reads runtime config from PVC and reports pod readiness via Kubernetes API 10. 
All detectors show `status: "ready"` -> SDK returns -### Known Limitations +### Cloud vs. Edge Detection + +| Scenario | What happens | +|---|---| +| SDK -> cloud (`api.groundlight.ai`) | Cloud returns 404 -> SDK raises error | +| SDK -> edge endpoint (with new route) | FastAPI handles it -> returns 200 | +| SDK -> old edge endpoint (without new route) | FastAPI returns 404 -> nginx falls back to cloud -> 404 -> SDK raises error | + +--- + +## Planned: Replace Mode + +### Problem + +The current merge mode can only **add or update** detectors. It cannot remove detectors +that are no longer desired. If a user wants to go from 5 detectors to 3, the old 2 +detectors' inference pods keep running and wasting resources. + +### Proposed SDK Change + +Add a `replace` parameter to `configure_edge()`: + +- `replace=False` (default): Current merge behavior. Incoming config is merged into existing. +- `replace=True`: Incoming config fully replaces the existing config. Detectors not in the + new config are removed (their inference pods are deleted). + +### Required Edge Endpoint Changes + +The edge endpoint currently has **no code to delete** Kubernetes Deployments, Services, +database records, or model files for detectors. All of this needs to be added. + +**Existing infrastructure we can use:** +- `get_edge_inference_deployment_name(detector_id)` and `get_edge_inference_service_name(detector_id)` + in `app/core/edge_inference.py` map detector IDs to K8s resource names. +- The service account (`edge-endpoint-service-account`) already has RBAC permissions to + `delete` both `deployments` and `services`. These permissions are currently unused. +- `InferenceDeploymentManager` in `app/core/kubernetes_management.py` already has a K8s + client and namespace context. Adding a `delete_inference_deployment()` method here is natural. +- `get_detector_models_dir(repository_root, detector_id)` returns the model directory + (`{MODEL_REPOSITORY_PATH}/{detector_id}/`). 
Deleting this directory removes all model + files (primary + oodd + all versions). + +**What needs to be built:** + +1. **`InferenceDeploymentManager.delete_inference_deployment(detector_id, is_oodd)`** + (`app/core/kubernetes_management.py`) + - Call `delete_namespaced_deployment()` to remove the inference Deployment + - Call `delete_namespaced_service()` to remove the inference Service + - The naming functions already exist to map detector ID -> resource names + +2. **`DatabaseManager.delete_inference_deployment_record(model_name)`** + (`app/core/database.py`) + - Delete the DB record so the model updater doesn't recreate the deployment on its + next refresh cycle + +3. **Model file cleanup** + - Delete `{MODEL_REPOSITORY_PATH}/{detector_id}/` (the entire detector directory, + which contains `primary/` and `oodd/` subdirs with versioned model files) + - Can use `shutil.rmtree()` (already used by `delete_model_version()`) + +4. **Replace logic in `edge_config.py` handler** + - Accept a `replace` flag in the POST body + - If `replace=True`: use the incoming config as-is instead of merging + - Diff old vs new detector sets: `removed = old_detector_ids - new_detector_ids` + - **Deletion must complete before new pods roll out.** The edge endpoint must wait + for removed detector pods to fully terminate (not just in `Terminating` state) + before writing DB records for new detectors. This prevents OOM from old and new + pods competing for the same finite GPU/memory resources. + - For each removed detector: + a. Call `InferenceDeploymentManager.delete_inference_deployment()` (primary + oodd) + b. Poll until the pods are fully gone (not just Terminating) + c. Call `DatabaseManager.delete_inference_deployment_record()` (primary + oodd) + d. Delete model files from disk + e. 
Clean up `EdgeInferenceManager` state (inference_client_urls, oodd URLs, + escalation tracking) + - After all deletions complete: proceed with new/retained detectors (same flow as + current merge mode) + - `configure_edge(detectors=[], replace=True)` is valid and removes all detector pods. + +5. **SDK changes** + - Add `replace: bool = False` parameter to `configure_edge()` + - Pass `replace` flag in POST body + - When `replace=True` and `wait > 0`: wait for removed pods to terminate AND for + new/retained pods to become ready + +### Ordering Guarantee + +When `replace=True`, the edge endpoint enforces this sequence: + +``` +1. Delete removed detector deployments/services +2. Wait for removed pods to fully terminate +3. Clean up DB records + model files for removed detectors +4. Write DB records for new detectors +5. Model updater picks up new detectors and creates deployments +``` + +This ensures old pods release their resources before new pods are scheduled, +preventing resource exhaustion on memory/GPU-constrained edge devices. + +### Decisions + +- **Async**: The POST handler returns immediately. Deletion and re-creation happen + in a FastAPI background task. The SDK polls `/status/metrics.json` for completion. +- **Termination time**: Inference pods use the K8s default of 30s + `terminationGracePeriodSeconds`. No custom value is set. +- **Partial failure**: Error out. If deletion of any detector fails, the background + task logs the error and stops. The config is left in a partially cleaned state; + the user can retry. + +### Implementation Details + +**Edge endpoint needs an `InferenceDeploymentManager` on `AppState`.** +Currently only the model-updater container creates one. The edge-endpoint container +has the K8s service account and RBAC permissions but doesn't use them. We add +an `InferenceDeploymentManager` to `AppState`, guarded by the existing +`DEPLOY_DETECTOR_LEVEL_INFERENCE` env var (only set in K8s, not Docker tests). 
+ +**Background task flow** (runs after POST returns): +1. Delete K8s Deployments + Services for each removed detector +2. Poll until pods are fully terminated (not just Terminating) +3. Delete DB records for removed detectors +4. Delete model files from PVC (`shutil.rmtree({MODEL_REPOSITORY_PATH}/{detector_id}/`) +5. Write DB records for new detectors (model updater picks these up) + +**SDK polling**: When `replace=True` and `wait > 0`, the SDK waits until: +- Removed detector IDs no longer appear in `/status/metrics.json` +- New/retained detector IDs all show `status: "ready"` + +--- + +## Known Limitations - **Multiprocess in-memory state**: Edge endpoint runs multiple uvicorn workers. The in-memory config update only applies to the worker that handles the POST. Other workers retain stale @@ -92,14 +227,6 @@ of which PVC subdirectory it has mounted. This is acceptable for now; atomic file writes or a lock file can be added later if needed. -### Cloud vs. Edge Detection - -| Scenario | What happens | -|---|---| -| SDK -> cloud (`api.groundlight.ai`) | Cloud returns 404 -> SDK raises error | -| SDK -> edge endpoint (with new route) | FastAPI handles it -> returns 200 | -| SDK -> old edge endpoint (without new route) | FastAPI returns 404 -> nginx falls back to cloud -> 404 -> SDK raises error | - ## Future Work - Define proper Pydantic models in the SDK for config validation diff --git a/src/groundlight/experimental_api.py b/src/groundlight/experimental_api.py index 0b13d775..21fedfee 100644 --- a/src/groundlight/experimental_api.py +++ b/src/groundlight/experimental_api.py @@ -912,20 +912,21 @@ def configure_edge( # noqa: PLR0913 global_config: Optional[dict] = None, edge_inference_configs: Optional[dict] = None, detectors: Optional[list] = None, + replace: bool = False, wait: float = 600, ) -> None: """ Sends configuration to a Groundlight Edge Endpoint. 
This method only works when the SDK is connected to an Edge Endpoint -- calling it against the cloud API will raise an error. - All parameters are optional. Only the sections you provide will be updated. + By default, the incoming configuration is **merged** into the existing config (adding or + updating detectors, presets, and global settings). Set ``replace=True`` to **replace** the + entire config, which will remove any detectors not included in the new config. - Currently, the Edge Endpoint logs the received configuration. Future versions will apply - the configuration at runtime. - - If ``wait`` is set and ``detectors`` are provided, the method will poll the edge - endpoint's status monitor until all configured detectors report ``status: "ready"``, - or until the timeout expires. + If ``wait`` is set and ``detectors`` are provided (or detectors are being removed), + the method will poll the edge endpoint's status monitor until all configured detectors + report ``status: "ready"`` and all removed detectors are gone, or until the timeout + expires. .. 
note:: This method makes a direct HTTP request rather than using the OpenAPI-generated client, @@ -937,23 +938,24 @@ def configure_edge( # noqa: PLR0913 gl = ExperimentalApi(endpoint="http://localhost:30101") - # Configure and wait up to 120s for pods to be ready + # Merge: add/update detectors (existing detectors are preserved) gl.configure_edge( - edge_inference_configs={ - "no_cloud": { - "enabled": True, - "always_return_edge_prediction": True, - "disable_cloud_escalation": True, - }, - }, detectors=[ {"detector_id": "det_abc123", "edge_inference_config": "no_cloud"}, ], wait=120, ) - # Or just update global config (no waiting needed) - gl.configure_edge(global_config={"refresh_rate": 10}) + # Replace: only these detectors will remain; all others are removed + gl.configure_edge( + edge_inference_configs={ + "default": {"enabled": True}, + }, + detectors=[ + {"detector_id": "det_abc123", "edge_inference_config": "default"}, + ], + replace=True, + ) :param global_config: Global edge endpoint settings (e.g., ``refresh_rate``, ``confident_audit_rate``). @@ -962,6 +964,8 @@ def configure_edge( # noqa: PLR0913 ``always_return_edge_prediction``, ``disable_cloud_escalation``). :param detectors: A list of detector configuration dicts, each containing a ``detector_id`` and an ``edge_inference_config`` preset name. + :param replace: If True, the incoming config fully replaces the existing config. + Detectors not in the new config will have their inference pods deleted. :param wait: Maximum time in seconds to wait for all configured detectors to become ready on the edge endpoint. Defaults to 600 (10 minutes). Set to 0 to return immediately after posting the configuration. @@ -969,13 +973,15 @@ def configure_edge( # noqa: PLR0913 :raises GroundlightClientError: If the endpoint is not an Edge Endpoint, or if the wait timeout expires before all detectors are ready. 
""" - payload = {} + payload: dict = {} if global_config is not None: payload["global_config"] = global_config if edge_inference_configs is not None: payload["edge_inference_configs"] = edge_inference_configs if detectors is not None: payload["detectors"] = detectors + if replace: + payload["replace"] = True url = f"{self.endpoint}/v1/edge/configure" headers = {"x-api-token": self.configuration.api_key["ApiToken"]} @@ -985,14 +991,28 @@ def configure_edge( # noqa: PLR0913 raise GroundlightClientError( "This endpoint does not support edge configuration. Are you connected to a Groundlight Edge Endpoint?" ) - response.raise_for_status() + if not response.ok: + try: + detail = response.json().get("detail", response.text) + except Exception: + detail = response.text + raise GroundlightClientError(f"Edge configuration failed (HTTP {response.status_code}): {detail}") - if wait > 0 and detectors: - self._wait_for_edge_detectors_ready(detectors, timeout=wait) + result = response.json() + removed_ids = set(result.get("removed", [])) - def _wait_for_edge_detectors_ready(self, detectors: list, timeout: float) -> None: + should_wait = wait > 0 and (detectors or removed_ids) + if should_wait: + expected_ids = {d["detector_id"] for d in detectors} if detectors else set() + self._wait_for_edge_config_applied(expected_ids, removed_ids, timeout=wait) + + def _wait_for_edge_config_applied( + self, + expected_ids: set[str], + removed_ids: set[str], + timeout: float, + ) -> None: POLL_INTERVAL = 5 - detector_ids = {d["detector_id"] for d in detectors} deadline = time.time() + timeout while True: @@ -1002,21 +1022,29 @@ def _wait_for_edge_detectors_ready(self, detectors: list, timeout: float) -> Non logger.warning(f"Failed to fetch edge detector statuses: {e}") statuses = {} - not_ready = [] - for det_id in detector_ids: + issues = [] + + for det_id in expected_ids: det_status = statuses.get(det_id, {}).get("status") if det_status != "ready": - not_ready.append((det_id, det_status)) + 
issues.append((det_id, det_status or "unknown")) - if not not_ready: - logger.info(f"All {len(detector_ids)} edge detector(s) are ready.") + for det_id in removed_ids: + if det_id in statuses: + issues.append((det_id, "still_present")) + + if not issues: + logger.info( + f"Edge config applied: {len(expected_ids)} detector(s) ready, " + f"{len(removed_ids)} detector(s) removed." + ) return if time.time() >= deadline: - summary = ", ".join(f"{did}={st or 'unknown'}" for did, st in not_ready) + summary = ", ".join(f"{did}={st}" for did, st in issues) raise GroundlightClientError( - f"Timed out after {timeout}s waiting for edge detectors to be ready. Not ready: {summary}" + f"Timed out after {timeout}s waiting for edge config to be applied. Issues: {summary}" ) - logger.debug(f"Waiting for {len(not_ready)}/{len(detector_ids)} edge detector(s) to be ready...") + logger.debug(f"Waiting for edge config: {len(issues)} issue(s) remaining...") time.sleep(POLL_INTERVAL)