|
38 | 38 |
|
39 | 39 | import argparse |
40 | 40 | import base64 |
| 41 | +import copy |
41 | 42 | import json |
42 | 43 | import os |
43 | 44 | import socket |
@@ -314,7 +315,6 @@ def _sanitize_stub(stub: dict, label: str) -> dict: |
314 | 315 | In strict mode (OS4CSAPI_STRICT_BOOTSTRAP=1) a RuntimeError is raised |
315 | 316 | instead — useful for CI to catch callers that should use sml_body instead. |
316 | 317 | """ |
317 | | - import copy |
318 | 318 | if not isinstance(stub, dict): |
319 | 319 | return stub |
320 | 320 | props = stub.get("properties", stub) |
@@ -343,6 +343,105 @@ def _sanitize_stub(stub: dict, label: str) -> dict: |
343 | 343 | _warn_if_sml_fields_in_stub = _sanitize_stub |
344 | 344 |
|
345 | 345 |
|
| 346 | +_SAFE_PROPS = frozenset({"uid", "featureType", "name", "description", "validTime", |
| 347 | + "platform@link", "deployedSystem@link"}) |
| 348 | +_SAFE_DS_FIELDS = frozenset({"outputName", "name", "description", "schema", "obsTypes"}) |
| 349 | + |
| 350 | +# Top-level datastream body fields rejected by csapi-go-v2 |
| 351 | +_DS_STRIP_FIELDS = frozenset({"uid", "documentation", "links"}) |
| 352 | + |
| 353 | +# SWE Common field-level attributes rejected by csapi-go-v2 |
| 354 | +_SWE_FIELD_STRIP_ATTRS = frozenset({"referenceTime"}) |
| 355 | + |
| 356 | + |
| 357 | +def _sanitize_swe_fields(fields: list) -> list: |
| 358 | + """Recursively strip unknown SWE field attributes (e.g. referenceTime).""" |
| 359 | + result = [] |
| 360 | + for field in fields: |
| 361 | + if not isinstance(field, dict): |
| 362 | + result.append(field) |
| 363 | + continue |
| 364 | + f = {k: v for k, v in field.items() if k not in _SWE_FIELD_STRIP_ATTRS} |
| 365 | + if "fields" in f: |
| 366 | + f = dict(f) |
| 367 | + f["fields"] = _sanitize_swe_fields(f["fields"]) |
| 368 | + result.append(f) |
| 369 | + return result |
| 370 | + |
| 371 | + |
| 372 | +def _sanitize_datastream_body(body: dict, label: str = "") -> dict: |
| 373 | + """Strip fields from a datastream POST body that strict CSAPI servers reject. |
| 374 | +
|
| 375 | + Removes top-level extension fields (uid, documentation, links) and unknown |
| 376 | + SWE field attributes (referenceTime) within schema.resultSchema.fields. |
| 377 | + Returns a modified copy; the original is not mutated. |
| 378 | + """ |
| 379 | + body = copy.deepcopy(body) |
| 380 | + stripped = [f for f in _DS_STRIP_FIELDS if f in body] |
| 381 | + if stripped: |
| 382 | + for f in stripped: |
| 383 | + body.pop(f) |
| 384 | + print(f" [WARN] Stripped datastream field(s) {stripped} before POST" |
| 385 | + f"{' for ' + label if label else ''}") |
| 386 | + try: |
| 387 | + fields = body["schema"]["resultSchema"]["fields"] |
| 388 | + sanitized = _sanitize_swe_fields(fields) |
| 389 | + if sanitized != fields: |
| 390 | + body["schema"] = dict(body["schema"]) |
| 391 | + body["schema"]["resultSchema"] = dict(body["schema"]["resultSchema"]) |
| 392 | + body["schema"]["resultSchema"]["fields"] = sanitized |
| 393 | + except (KeyError, TypeError): |
| 394 | + pass |
| 395 | + return body |
| 396 | + |
| 397 | + |
| 398 | +def _post_with_fallback(base_url: str, collection: str, stub: dict, auth: str, |
| 399 | + label: str = "") -> dict | None: |
| 400 | + """POST stub to collection, retrying with a minimal body on field-rejection 400s. |
| 401 | +
|
| 402 | + Strict CSAPI servers (e.g. csapi-go-v2) reject unknown fields with HTTP 400. |
| 403 | + Errors may say 'unknown field X' or the more generic 'Invalid request body'. |
| 404 | + On either, we rebuild the body keeping only the known-safe fields and retry |
| 405 | + once, so the resource gets created without the unsupported metadata. |
| 406 | + """ |
| 407 | + try: |
| 408 | + return api_post(base_url, collection, stub, auth) |
| 409 | + except RuntimeError as exc: |
| 410 | + exc_str = str(exc) |
| 411 | + if "400" not in exc_str: |
| 412 | + raise |
| 413 | + if "unknown field" not in exc_str and "Invalid request body" not in exc_str: |
| 414 | + raise |
| 415 | + # Rebuild with only safe fields |
| 416 | + props = stub.get("properties", stub) |
| 417 | + minimal_props = {k: v for k, v in props.items() if k in _SAFE_PROPS} |
| 418 | + minimal = {"type": "Feature", "geometry": stub.get("geometry"), "properties": minimal_props} |
| 419 | + print(f" [WARN] POST {collection} failed ({exc_str:.120}); retrying with minimal stub{' for ' + label if label else ''}") |
| 420 | + return api_post(base_url, collection, minimal, auth) |
| 421 | + |
| 422 | + |
| 423 | +def _post_datastream_with_fallback(base_url: str, path: str, body: dict, auth: str, |
| 424 | + label: str = "") -> dict | None: |
| 425 | + """POST datastream body, retrying with safe fields only on 'unknown field' 400.""" |
| 426 | + try: |
| 427 | + return api_post(base_url, path, body, auth) |
| 428 | + except RuntimeError as exc: |
| 429 | + if "400" not in str(exc) or "unknown field" not in str(exc): |
| 430 | + raise |
| 431 | + minimal = {k: v for k, v in body.items() if k in _SAFE_DS_FIELDS} |
| 432 | + # Preserve sanitized schema fields in minimal body |
| 433 | + if "schema" in minimal: |
| 434 | + try: |
| 435 | + fields = minimal["schema"]["resultSchema"]["fields"] |
| 436 | + minimal["schema"] = dict(minimal["schema"]) |
| 437 | + minimal["schema"]["resultSchema"] = dict(minimal["schema"]["resultSchema"]) |
| 438 | + minimal["schema"]["resultSchema"]["fields"] = _sanitize_swe_fields(fields) |
| 439 | + except (KeyError, TypeError): |
| 440 | + pass |
| 441 | + print(f" [WARN] POST {path} failed ({exc!s:.120}); retrying with minimal datastream body{' for ' + label if label else ''}") |
| 442 | + return api_post(base_url, path, minimal, auth) |
| 443 | + |
| 444 | + |
346 | 445 | # ═══════════════════════════════════════════════════════════════════════════ |
347 | 446 | # Idempotent resource creation |
348 | 447 | # ═══════════════════════════════════════════════════════════════════════════ |
@@ -400,8 +499,8 @@ def ensure_procedure(base_url: str, auth: str, uid: str, stub_body: dict, |
400 | 499 | print(f" [DRY] Would create procedure: {uid}") |
401 | 500 | return None |
402 | 501 |
|
403 | | - # Step 1: POST geo+json stub |
404 | | - result = api_post(base_url, "procedures", stub_body, auth) |
| 502 | + # Step 1: POST geo+json stub (with unknown-field fallback for strict servers) |
| 503 | + result = _post_with_fallback(base_url, "procedures", stub_body, auth, uid) |
405 | 504 | new_id = result.get("id") if result else None |
406 | 505 |
|
407 | 506 | # Step 2: PUT SensorML if provided |
@@ -458,8 +557,8 @@ def ensure_system(base_url: str, auth: str, uid: str, stub_body: dict, |
458 | 557 | print(f" [DRY] Would create system: {uid}") |
459 | 558 | return None |
460 | 559 |
|
461 | | - # Step 1: POST geo+json stub |
462 | | - result = api_post(base_url, "systems", stub_body, auth) |
| 560 | + # Step 1: POST geo+json stub (with unknown-field fallback for strict servers) |
| 561 | + result = _post_with_fallback(base_url, "systems", stub_body, auth, uid) |
463 | 562 | new_id = result.get("id") if result else None |
464 | 563 |
|
465 | 564 | # Step 2: PUT SensorML if provided |
@@ -496,7 +595,9 @@ def ensure_datastream(base_url: str, auth: str, system_id: str, |
496 | 595 | print(f" [DRY] Would create datastream '{output_name}' on system {system_id}") |
497 | 596 | return None |
498 | 597 |
|
499 | | - result = api_post(base_url, f"systems/{system_id}/datastreams", schema_body, auth) |
| 598 | + schema_body = _sanitize_datastream_body(schema_body, output_name) |
| 599 | + result = _post_datastream_with_fallback(base_url, f"systems/{system_id}/datastreams", |
| 600 | + schema_body, auth, output_name) |
500 | 601 | new_id = result.get("id") if result else None |
501 | 602 | print(f" [OK] Created datastream '{output_name}' → id={new_id}") |
502 | 603 | if stats: |
@@ -573,7 +674,7 @@ def ensure_deployment(base_url: str, auth: str, uid: str, stub_body: dict, |
573 | 674 | if parent_id: |
574 | 675 | path = f"deployments/{parent_id}/subdeployments" |
575 | 676 |
|
576 | | - result = api_post(base_url, path, stub_body, auth) |
| 677 | + result = _post_with_fallback(base_url, path, stub_body, auth, uid) |
577 | 678 | new_id = result.get("id") if result else None |
578 | 679 |
|
579 | 680 | # Step 2: PUT SensorML against the canonical /deployments/{id} path |
|
0 commit comments