Commit 0833351

committed: minor changes
1 parent 68d4227

3 files changed: +205 −45 lines

configure_edge_demo.py

Lines changed: 8 additions & 3 deletions
```diff
@@ -11,13 +11,18 @@
         "disable_cloud_escalation": False,
         "min_time_between_escalations": 30.0,
     },
-},
+    "another_config": {
+        "enabled": True,
+        "always_return_edge_prediction": True,
+        "disable_cloud_escalation": True,
+        "min_time_between_escalations": 20.0,
+    }, },
 detectors=[
     {"detector_id": "det_31WjVpvBiOmxUBzVqiOa3G2UgMV", "edge_inference_config": "tims_config"},
-    # {"detector_id": "det_346zU49rFpY9I7f1PiuzArYdTdJ", "edge_inference_config": "tims_config"},
+    {"detector_id": "det_346zU49rFpY9I7f1PiuzArYdTdJ", "edge_inference_config": "tims_config"},
     # {"detector_id": "det_398G23k3Dn6sRPVHbySHeyGLmai", "edge_inference_config": "tims_config"},
     # {"detector_id": "det_39oO6BR1IJjmLOjmWF3F4R4of4o", "edge_inference_config": "tims_config"},
-    # {"detector_id": "det_2mWrXjFFSGcn2PYKakS7EfYfF3l", "edge_inference_config": "tims_config"},
+    # {"detector_id": "det_2mWrXjFFSGcn2PYKakS7EfYfF3l", "edge_inference_config": "another_config"},
 ],
 )
 print("Configured Edge Endpoint successfully!")
```
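The diff above only shows a fragment of the demo's config. As a sketch, the preset dictionary the demo ends up with after this commit plausibly looks like the following (the contents of `tims_config` are inferred from the surrounding context lines and may not match the full file; the detector IDs are taken verbatim from the diff):

```python
# Hypothetical reconstruction of the demo's presets after this commit:
# the pre-existing "tims_config" preset plus the newly added "another_config".
edge_inference_configs = {
    "tims_config": {
        "enabled": True,
        "always_return_edge_prediction": True,
        "disable_cloud_escalation": False,
        "min_time_between_escalations": 30.0,
    },
    "another_config": {
        "enabled": True,
        "always_return_edge_prediction": True,
        "disable_cloud_escalation": True,
        "min_time_between_escalations": 20.0,
    },
}

# The two detectors left uncommented in the demo after this commit.
detectors = [
    {"detector_id": "det_31WjVpvBiOmxUBzVqiOa3G2UgMV", "edge_inference_config": "tims_config"},
    {"detector_id": "det_346zU49rFpY9I7f1PiuzArYdTdJ", "edge_inference_config": "tims_config"},
]

# Sanity check: every detector must reference a preset that actually exists.
assert all(d["edge_inference_config"] in edge_inference_configs for d in detectors)
```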

edge-config-plan.md

Lines changed: 138 additions & 11 deletions
```diff
@@ -19,12 +19,16 @@ When the SDK is pointed at the cloud API, calling this method should raise an error
 | Waiting strategy | SDK polls `GET /status/metrics.json` for detector readiness |
 | Config persistence | Runtime config written to YAML file on shared PVC; read on startup by all containers |
 
-## Implementation
+---
+
+## Implemented
 
 ### Edge Endpoint: `POST /device-api/v1/edge/configure`
 
 **File**: `app/api/routes/edge_config.py`, registered in `app/api/api.py`
 
+Currently supports **merge mode only**: incoming config is merged into the existing config.
+
 On receiving a config POST, the handler:
 1. Merges incoming JSON into the existing `RootEdgeConfig` (global_config, edge_inference_configs, detectors)
 2. Validates the merged config via Pydantic (returns 400 on invalid config)
```
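The merge step described in the handler hunk above (incoming JSON merged section-wise into the existing config) can be sketched as a small standalone helper. This is a hypothetical illustration, not the actual `edge_config.py` code; the section names match the plan's `RootEdgeConfig` fields:

```python
def merge_config(existing: dict, incoming: dict) -> dict:
    """Sketch of a section-wise config merge (merge mode: add/update, never remove).

    Dict sections (global_config, edge_inference_configs) are merged key-wise;
    the detectors list is merged by detector_id.
    """
    merged = {**existing}
    for section in ("global_config", "edge_inference_configs"):
        if section in incoming:
            merged[section] = {**existing.get(section, {}), **incoming[section]}
    if "detectors" in incoming:
        by_id = {d["detector_id"]: d for d in existing.get("detectors", [])}
        for det in incoming["detectors"]:
            by_id[det["detector_id"]] = det  # add or update; merge mode never removes
        merged["detectors"] = list(by_id.values())
    return merged


existing = {
    "edge_inference_configs": {"tims_config": {"enabled": True}},
    "detectors": [{"detector_id": "det_A", "edge_inference_config": "tims_config"}],
}
incoming = {
    "detectors": [{"detector_id": "det_B", "edge_inference_config": "tims_config"}],
}
merged = merge_config(existing, incoming)
assert len(merged["detectors"]) == 2  # det_A is preserved, det_B is added
```

This sketch also makes the merge-mode limitation discussed later concrete: there is no code path that drops a detector from the existing config.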
````diff
@@ -68,7 +72,7 @@ of which PVC subdirectory it has mounted.
   every 5 seconds until all configured detectors have `status: "ready"`, or raises on timeout
 - Direct `requests.post()` call, not OpenAPI-generated
 
-### End-to-End Flow
+### End-to-End Flow (merge mode)
 
 1. SDK POSTs config to edge endpoint
 2. Edge endpoint merges incoming config with existing config
@@ -81,7 +85,138 @@ of which PVC subdirectory it has mounted.
 9. SDK polls `/status/metrics.json` -- status monitor reads runtime config from PVC and reports pod readiness via Kubernetes API
 10. All detectors show `status: "ready"` -> SDK returns
 
-### Known Limitations
+### Cloud vs. Edge Detection
+
+| Scenario | What happens |
+|---|---|
+| SDK -> cloud (`api.groundlight.ai`) | Cloud returns 404 -> SDK raises error |
+| SDK -> edge endpoint (with new route) | FastAPI handles it -> returns 200 |
+| SDK -> old edge endpoint (without new route) | FastAPI returns 404 -> nginx falls back to cloud -> 404 -> SDK raises error |
+
+---
+
+## Planned: Replace Mode
+
+### Problem
+
+The current merge mode can only **add or update** detectors. It cannot remove detectors
+that are no longer desired. If a user wants to go from 5 detectors to 3, the 2 old
+detectors' inference pods keep running and wasting resources.
+
+### Proposed SDK Change
+
+Add a `replace` parameter to `configure_edge()`:
+
+- `replace=False` (default): Current merge behavior. Incoming config is merged into existing.
+- `replace=True`: Incoming config fully replaces the existing config. Detectors not in the
+  new config are removed (their inference pods are deleted).
+
+### Required Edge Endpoint Changes
+
+The edge endpoint currently has **no code to delete** Kubernetes Deployments, Services,
+database records, or model files for detectors. All of this needs to be added.
+
+**Existing infrastructure we can use:**
+- `get_edge_inference_deployment_name(detector_id)` and `get_edge_inference_service_name(detector_id)`
+  in `app/core/edge_inference.py` map detector IDs to K8s resource names.
+- The service account (`edge-endpoint-service-account`) already has RBAC permissions to
+  `delete` both `deployments` and `services`. These permissions are currently unused.
+- `InferenceDeploymentManager` in `app/core/kubernetes_management.py` already has a K8s
+  client and namespace context. Adding a `delete_inference_deployment()` method here is natural.
+- `get_detector_models_dir(repository_root, detector_id)` returns the model directory
+  (`{MODEL_REPOSITORY_PATH}/{detector_id}/`). Deleting this directory removes all model
+  files (primary + oodd + all versions).
+
+**What needs to be built:**
+
+1. **`InferenceDeploymentManager.delete_inference_deployment(detector_id, is_oodd)`**
+   (`app/core/kubernetes_management.py`)
+   - Call `delete_namespaced_deployment()` to remove the inference Deployment
+   - Call `delete_namespaced_service()` to remove the inference Service
+   - The naming functions already exist to map detector ID -> resource names
+
+2. **`DatabaseManager.delete_inference_deployment_record(model_name)`**
+   (`app/core/database.py`)
+   - Delete the DB record so the model updater doesn't recreate the deployment on its
+     next refresh cycle
+
+3. **Model file cleanup**
+   - Delete `{MODEL_REPOSITORY_PATH}/{detector_id}/` (the entire detector directory,
+     which contains `primary/` and `oodd/` subdirs with versioned model files)
+   - Can use `shutil.rmtree()` (already used by `delete_model_version()`)
+
+4. **Replace logic in `edge_config.py` handler**
+   - Accept a `replace` flag in the POST body
+   - If `replace=True`: use the incoming config as-is instead of merging
+   - Diff old vs new detector sets: `removed = old_detector_ids - new_detector_ids`
+   - **Deletion must complete before new pods roll out.** The edge endpoint must wait
+     for removed detector pods to fully terminate (not just reach the `Terminating` state)
+     before writing DB records for new detectors. This prevents OOM from old and new
+     pods competing for the same finite GPU/memory resources.
+   - For each removed detector:
+     a. Call `InferenceDeploymentManager.delete_inference_deployment()` (primary + oodd)
+     b. Poll until the pods are fully gone (not just Terminating)
+     c. Call `DatabaseManager.delete_inference_deployment_record()` (primary + oodd)
+     d. Delete model files from disk
+     e. Clean up `EdgeInferenceManager` state (inference_client_urls, oodd URLs,
+        escalation tracking)
+   - After all deletions complete: proceed with new/retained detectors (same flow as
+     current merge mode)
+   - `configure_edge(detectors=[], replace=True)` is valid and removes all detector pods.
+
+5. **SDK changes**
+   - Add a `replace: bool = False` parameter to `configure_edge()`
+   - Pass the `replace` flag in the POST body
+   - When `replace=True` and `wait > 0`: wait for removed pods to terminate AND for
+     new/retained pods to become ready
+
+### Ordering Guarantee
+
+When `replace=True`, the edge endpoint enforces this sequence:
+
+```
+1. Delete removed detector deployments/services
+2. Wait for removed pods to fully terminate
+3. Clean up DB records + model files for removed detectors
+4. Write DB records for new detectors
+5. Model updater picks up new detectors and creates deployments
+```
+
+This ensures old pods release their resources before new pods are scheduled,
+preventing resource exhaustion on memory/GPU-constrained edge devices.
+
+### Decisions
+
+- **Async**: The POST handler returns immediately. Deletion and re-creation happen
+  in a FastAPI background task. The SDK polls `/status/metrics.json` for completion.
+- **Termination time**: Inference pods use the K8s default of 30s
+  `terminationGracePeriodSeconds`. No custom value is set.
+- **Partial failure**: Error out. If deletion of any detector fails, the background
+  task logs the error and stops. The config is left in a partially cleaned state;
+  the user can retry.
+
+### Implementation Details
+
+**Edge endpoint needs an `InferenceDeploymentManager` on `AppState`.**
+Currently only the model-updater container creates one. The edge-endpoint container
+has the K8s service account and RBAC permissions but doesn't use them. We add
+an `InferenceDeploymentManager` to `AppState`, guarded by the existing
+`DEPLOY_DETECTOR_LEVEL_INFERENCE` env var (only set in K8s, not in Docker tests).
+
+**Background task flow** (runs after POST returns):
+1. Delete K8s Deployments + Services for each removed detector
+2. Poll until pods are fully terminated (not just Terminating)
+3. Delete DB records for removed detectors
+4. Delete model files from PVC (`shutil.rmtree({MODEL_REPOSITORY_PATH}/{detector_id}/)`)
+5. Write DB records for new detectors (model updater picks these up)
+
+**SDK polling**: When `replace=True` and `wait > 0`, the SDK waits until:
+- Removed detector IDs no longer appear in `/status/metrics.json`
+- New/retained detector IDs all show `status: "ready"`
+
+---
+
+## Known Limitations
````
```diff
@@ -92,14 +227,6 @@ of which PVC subdirectory it has mounted.
 - **File write race**: If two concurrent config POSTs hit different workers, the last write wins.
   This is acceptable for now; atomic file writes or a lock file can be added later if needed.
 
-### Cloud vs. Edge Detection
-
-| Scenario | What happens |
-|---|---|
-| SDK -> cloud (`api.groundlight.ai`) | Cloud returns 404 -> SDK raises error |
-| SDK -> edge endpoint (with new route) | FastAPI handles it -> returns 200 |
-| SDK -> old edge endpoint (without new route) | FastAPI returns 404 -> nginx falls back to cloud -> 404 -> SDK raises error |
-
 ## Future Work
 
 - Define proper Pydantic models in the SDK for config validation
```

src/groundlight/experimental_api.py

Lines changed: 59 additions & 31 deletions
```diff
@@ -912,20 +912,21 @@ def configure_edge( # noqa: PLR0913
         global_config: Optional[dict] = None,
         edge_inference_configs: Optional[dict] = None,
         detectors: Optional[list] = None,
+        replace: bool = False,
         wait: float = 600,
     ) -> None:
         """
         Sends configuration to a Groundlight Edge Endpoint. This method only works when the SDK
         is connected to an Edge Endpoint -- calling it against the cloud API will raise an error.
 
-        All parameters are optional. Only the sections you provide will be updated.
+        By default, the incoming configuration is **merged** into the existing config (adding or
+        updating detectors, presets, and global settings). Set ``replace=True`` to **replace** the
+        entire config, which will remove any detectors not included in the new config.
 
-        Currently, the Edge Endpoint logs the received configuration. Future versions will apply
-        the configuration at runtime.
-
-        If ``wait`` is set and ``detectors`` are provided, the method will poll the edge
-        endpoint's status monitor until all configured detectors report ``status: "ready"``,
-        or until the timeout expires.
+        If ``wait`` is set and ``detectors`` are provided (or detectors are being removed),
+        the method will poll the edge endpoint's status monitor until all configured detectors
+        report ``status: "ready"`` and all removed detectors are gone, or until the timeout
+        expires.
 
         .. note::
             This method makes a direct HTTP request rather than using the OpenAPI-generated client,
@@ -937,23 +938,24 @@ def configure_edge( # noqa: PLR0913
 
             gl = ExperimentalApi(endpoint="http://localhost:30101")
 
-            # Configure and wait up to 120s for pods to be ready
+            # Merge: add/update detectors (existing detectors are preserved)
             gl.configure_edge(
-                edge_inference_configs={
-                    "no_cloud": {
-                        "enabled": True,
-                        "always_return_edge_prediction": True,
-                        "disable_cloud_escalation": True,
-                    },
-                },
                 detectors=[
                     {"detector_id": "det_abc123", "edge_inference_config": "no_cloud"},
                 ],
                 wait=120,
             )
 
-            # Or just update global config (no waiting needed)
-            gl.configure_edge(global_config={"refresh_rate": 10})
+            # Replace: only these detectors will remain; all others are removed
+            gl.configure_edge(
+                edge_inference_configs={
+                    "default": {"enabled": True},
+                },
+                detectors=[
+                    {"detector_id": "det_abc123", "edge_inference_config": "default"},
+                ],
+                replace=True,
+            )
 
         :param global_config: Global edge endpoint settings (e.g., ``refresh_rate``,
             ``confident_audit_rate``).
@@ -962,20 +964,24 @@ def configure_edge( # noqa: PLR0913
            ``always_return_edge_prediction``, ``disable_cloud_escalation``).
         :param detectors: A list of detector configuration dicts, each containing a
             ``detector_id`` and an ``edge_inference_config`` preset name.
+        :param replace: If True, the incoming config fully replaces the existing config.
+            Detectors not in the new config will have their inference pods deleted.
         :param wait: Maximum time in seconds to wait for all configured detectors to become
             ready on the edge endpoint. Defaults to 600 (10 minutes). Set to 0 to return
             immediately after posting the configuration.
 
         :raises GroundlightClientError: If the endpoint is not an Edge Endpoint, or if the
             wait timeout expires before all detectors are ready.
         """
-        payload = {}
+        payload: dict = {}
         if global_config is not None:
            payload["global_config"] = global_config
         if edge_inference_configs is not None:
            payload["edge_inference_configs"] = edge_inference_configs
         if detectors is not None:
            payload["detectors"] = detectors
+        if replace:
+            payload["replace"] = True
 
         url = f"{self.endpoint}/v1/edge/configure"
         headers = {"x-api-token": self.configuration.api_key["ApiToken"]}
@@ -985,14 +991,28 @@ def configure_edge( # noqa: PLR0913
             raise GroundlightClientError(
                 "This endpoint does not support edge configuration. Are you connected to a Groundlight Edge Endpoint?"
             )
-        response.raise_for_status()
+        if not response.ok:
+            try:
+                detail = response.json().get("detail", response.text)
+            except Exception:
+                detail = response.text
+            raise GroundlightClientError(f"Edge configuration failed (HTTP {response.status_code}): {detail}")
 
-        if wait > 0 and detectors:
-            self._wait_for_edge_detectors_ready(detectors, timeout=wait)
+        result = response.json()
+        removed_ids = set(result.get("removed", []))
 
-    def _wait_for_edge_detectors_ready(self, detectors: list, timeout: float) -> None:
+        should_wait = wait > 0 and (detectors or removed_ids)
+        if should_wait:
+            expected_ids = {d["detector_id"] for d in detectors} if detectors else set()
+            self._wait_for_edge_config_applied(expected_ids, removed_ids, timeout=wait)
+
+    def _wait_for_edge_config_applied(
+        self,
+        expected_ids: set[str],
+        removed_ids: set[str],
+        timeout: float,
+    ) -> None:
         POLL_INTERVAL = 5
-        detector_ids = {d["detector_id"] for d in detectors}
         deadline = time.time() + timeout
 
         while True:
@@ -1002,21 +1022,29 @@ def _wait_for_edge_detectors_ready(self, detectors: list, timeout: float) -> None
                 logger.warning(f"Failed to fetch edge detector statuses: {e}")
                 statuses = {}
 
-            not_ready = []
-            for det_id in detector_ids:
+            issues = []
+
+            for det_id in expected_ids:
                 det_status = statuses.get(det_id, {}).get("status")
                 if det_status != "ready":
-                    not_ready.append((det_id, det_status))
+                    issues.append((det_id, det_status or "unknown"))
 
-            if not not_ready:
-                logger.info(f"All {len(detector_ids)} edge detector(s) are ready.")
+            for det_id in removed_ids:
+                if det_id in statuses:
+                    issues.append((det_id, "still_present"))
+
+            if not issues:
+                logger.info(
+                    f"Edge config applied: {len(expected_ids)} detector(s) ready, "
+                    f"{len(removed_ids)} detector(s) removed."
+                )
                 return
 
             if time.time() >= deadline:
-                summary = ", ".join(f"{did}={st or 'unknown'}" for did, st in not_ready)
+                summary = ", ".join(f"{did}={st}" for did, st in issues)
                 raise GroundlightClientError(
-                    f"Timed out after {timeout}s waiting for edge detectors to be ready. Not ready: {summary}"
+                    f"Timed out after {timeout}s waiting for edge config to be applied. Issues: {summary}"
                )
 
-            logger.debug(f"Waiting for {len(not_ready)}/{len(detector_ids)} edge detector(s) to be ready...")
+            logger.debug(f"Waiting for edge config: {len(issues)} issue(s) remaining...")
             time.sleep(POLL_INTERVAL)
```

Note: the committed signature had `replace: bool = True`, which contradicts the docstring ("by default, the incoming configuration is merged"), the `:param replace:` description, and the plan document's `replace: bool = False`; the default is corrected to `False` above.