diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9277f45..219fe8a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -373,6 +373,15 @@ jobs: print('WARNING: Could not find assert_ method call in assert_done') sys.exit(1) PYEOF + + - name: Phase 0 regression safety net (unit) + shell: bash + run: | + pytest tests/unit/test_agent_runtime_phase0.py -v + + - name: Run full test suite + shell: bash + run: | pytest tests/ -v env: CI: true diff --git a/README.md b/README.md index 4a38d47..66e1a67 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,37 @@ async def main(): asyncio.run(main()) ``` +### Failure Artifact Buffer (Phase 1) + +Capture a short ring buffer of screenshots and persist them when a required assertion fails. + +```python +from sentience.failure_artifacts import FailureArtifactsOptions + +await runtime.enable_failure_artifacts( + FailureArtifactsOptions(buffer_seconds=15, capture_on_action=True, fps=0.0) +) + +# After each action, record it (best-effort). +await runtime.record_action("CLICK") +``` + +### Redaction callback (Phase 3) + +Provide a user-defined callback to redact snapshots and decide whether to persist frames. The SDK does not implement image/video redaction. + +```python +from sentience.failure_artifacts import FailureArtifactsOptions, RedactionContext, RedactionResult + +def redact(ctx: RedactionContext) -> RedactionResult: + # Example: drop frames entirely, keep JSON only. 
# Failure-artifact helpers (Phase 1) added to sentience/agent_runtime.py.
# NOTE(review): every function below takes ``self`` and belongs in the runtime
# class body; indent one level when placing them there.


async def enable_failure_artifacts(
    self,
    options: FailureArtifactsOptions | None = None,
) -> None:
    """
    Enable failure artifact buffer (Phase 1).

    A previously enabled buffer is shut down first so repeated calls do not
    leak its background capture task or its temporary frame directory.

    Args:
        options: Buffer configuration. Defaults to ``FailureArtifactsOptions()``.
    """
    self.disable_failure_artifacts()
    opts = options or FailureArtifactsOptions()
    self._artifact_buffer = FailureArtifactBuffer(
        run_id=self.tracer.run_id,
        options=opts,
    )
    # Timed capture is opt-in: only start the loop for a positive frame rate.
    if opts.fps > 0:
        self._artifact_timer_task = asyncio.create_task(self._artifact_timer_loop())


def disable_failure_artifacts(self) -> None:
    """
    Disable failure artifact buffer and stop background capture.

    Cancels the timer task (if any), removes the buffer's temporary files and
    drops the buffer, so later ``record_action`` calls become no-ops instead
    of writing into a directory that no longer exists.
    """
    task = self._artifact_timer_task
    self._artifact_timer_task = None
    if task is not None:
        task.cancel()

    buffer = self._artifact_buffer
    self._artifact_buffer = None
    if buffer is not None:
        buffer.cleanup()


async def record_action(
    self,
    action: str,
    *,
    url: str | None = None,
) -> None:
    """
    Record an action in the artifact timeline and capture a frame if enabled.

    Does nothing when no artifact buffer is enabled.

    Args:
        action: Action label stored in the step timeline (e.g. ``"CLICK"``).
        url: Optional URL stored alongside the action.
    """
    buffer = self._artifact_buffer
    if not buffer:
        return
    buffer.record_step(
        action=action,
        step_id=self.step_id,
        step_index=self.step_index,
        url=url,
    )
    if buffer.options.capture_on_action:
        await self._capture_artifact_frame()


async def _capture_artifact_frame(self) -> None:
    """Take one screenshot and add it to the buffer (best-effort, never raises I/O errors)."""
    if not self._artifact_buffer:
        return
    try:
        image_bytes = await self.backend.screenshot_png()
    except Exception:
        # Best-effort: a failed screenshot must never break the agent run.
        return

    # Re-read the buffer: it may have been disabled while the screenshot
    # was in flight.
    buffer = self._artifact_buffer
    if buffer is None:
        return
    try:
        buffer.add_frame(image_bytes, fmt="png")
    except OSError:
        # The frame directory can disappear after a persist/cleanup cycle;
        # losing a frame is preferable to failing the caller's action.
        return


async def _artifact_timer_loop(self) -> None:
    """Capture frames at ``options.fps`` until cancelled or the buffer is replaced."""
    buffer = self._artifact_buffer
    if not buffer:
        return
    # Clamp the rate so a tiny fps cannot produce a division by zero.
    interval = 1.0 / max(0.001, buffer.options.fps)
    try:
        while self._artifact_buffer is buffer:
            await self._capture_artifact_frame()
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        return


def finalize_run(self, *, success: bool) -> None:
    """
    Finalize artifact buffer at end of run.

    On success, artifacts are persisted only when ``persist_mode`` is
    ``"always"``. On failure they are persisted with reason
    ``"finalize_failure"``. In both cases the buffer is shut down afterwards
    so the background timer does not outlive the run.

    Args:
        success: Whether the run finished successfully.
    """
    buffer = self._artifact_buffer
    if not buffer:
        return
    try:
        if not success:
            self._persist_failure_artifacts(reason="finalize_failure")
        elif buffer.options.persist_mode == "always":
            buffer.persist(
                reason="success",
                status="success",
                snapshot=self.last_snapshot,
                diagnostics=getattr(self.last_snapshot, "diagnostics", None),
                metadata=self._artifact_metadata(),
            )
    finally:
        # Always release the temp dir and timer, even if persisting raised.
        self.disable_failure_artifacts()


def _persist_failure_artifacts(self, *, reason: str) -> None:
    """
    Persist the buffered artifacts with status ``"failure"``.

    Args:
        reason: Short machine-readable cause stored in the manifest.
    """
    buffer = self._artifact_buffer
    if not buffer:
        return
    buffer.persist(
        reason=reason,
        status="failure",
        snapshot=self.last_snapshot,
        diagnostics=getattr(self.last_snapshot, "diagnostics", None),
        metadata=self._artifact_metadata(),
    )
    buffer.cleanup()
    # In "onFail" mode the first failure ends artifact collection entirely.
    if buffer.options.persist_mode == "onFail":
        self.disable_failure_artifacts()


def _artifact_metadata(self) -> dict[str, Any]:
    """Return the backend class name and best-known URL for the manifest."""
    url = None
    if self.last_snapshot is not None:
        url = self.last_snapshot.url
    elif self._cached_url:
        url = self._cached_url
    return {
        "backend": self.backend.__class__.__name__,
        "url": url,
    }
from __future__ import annotations

import json
import shutil
import tempfile
import time
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal


@dataclass
class FailureArtifactsOptions:
    """Configuration for the failure artifact buffer."""

    # Frames older than this many seconds are pruned from the buffer.
    buffer_seconds: float = 15.0
    # Capture a frame each time an action is recorded.
    capture_on_action: bool = True
    # Background capture rate; the timer loop only starts when this is > 0.
    fps: float = 0.0
    # "onFail" persists failures only; "always" also persists successful runs.
    persist_mode: Literal["onFail", "always"] = "onFail"
    # Parent directory that receives one sub-directory per persisted run.
    output_dir: str = ".sentience/artifacts"
    # Optional user hook invoked right before artifacts are written to disk.
    on_before_persist: Callable[[RedactionContext], RedactionResult] | None = None
    # Blank the values of password/email/tel inputs in the persisted snapshot.
    redact_snapshot_values: bool = True


@dataclass
class RedactionContext:
    """Everything handed to ``on_before_persist`` for inspection."""

    run_id: str
    reason: str | None
    status: Literal["failure", "success"]
    snapshot: Any | None
    diagnostics: Any | None
    frame_paths: list[str]
    metadata: dict[str, Any]


@dataclass
class RedactionResult:
    """Overrides returned by ``on_before_persist``; ``None`` fields keep the original."""

    snapshot: Any | None = None
    diagnostics: Any | None = None
    frame_paths: list[str] | None = None
    # When true, no frame images are copied to the output directory.
    drop_frames: bool = False


@dataclass
class _FrameRecord:
    """One buffered frame: capture time, file name and temp-dir path."""

    ts: float
    file_name: str
    path: Path


class FailureArtifactBuffer:
    """
    Ring buffer of screenshots with minimal persistence on failure.

    Frames are written to a private temporary directory as they arrive and
    pruned once they are older than ``options.buffer_seconds``. ``persist``
    copies the surviving frames plus JSON metadata into ``options.output_dir``.
    """

    def __init__(
        self,
        *,
        run_id: str,
        options: FailureArtifactsOptions,
        time_fn: Callable[[], float] = time.time,
    ) -> None:
        """
        Args:
            run_id: Identifier used in the output directory name and manifest.
            options: Buffer configuration.
            time_fn: Clock returning seconds; injectable for tests.
        """
        self.run_id = run_id
        self.options = options
        self._time_fn = time_fn
        self._temp_dir = Path(tempfile.mkdtemp(prefix="sentience-artifacts-"))
        self._frames_dir = self._temp_dir / "frames"
        self._frames_dir.mkdir(parents=True, exist_ok=True)
        self._frames: list[_FrameRecord] = []
        self._steps: list[dict] = []
        self._persisted = False

    @property
    def temp_dir(self) -> Path:
        """Temporary directory holding the not-yet-persisted frames."""
        return self._temp_dir

    def record_step(
        self,
        *,
        action: str,
        step_id: str | None,
        step_index: int | None,
        url: str | None,
    ) -> None:
        """Append a timestamped action entry to the step timeline."""
        self._steps.append(
            {
                "ts": self._time_fn(),
                "action": action,
                "step_id": step_id,
                "step_index": step_index,
                "url": url,
            }
        )

    def add_frame(self, image_bytes: bytes, *, fmt: str = "png") -> None:
        """
        Store one frame in the temp directory and prune expired frames.

        Args:
            image_bytes: Encoded image data, written as-is.
            fmt: File extension used for the frame file.
        """
        ts = self._time_fn()
        # cleanup() may have removed the directory; recreate it so capturing
        # after a persist/cleanup cycle does not raise.
        self._frames_dir.mkdir(parents=True, exist_ok=True)

        stem = f"frame_{int(ts * 1000)}"
        path = self._frames_dir / f"{stem}.{fmt}"
        # Two frames within the same millisecond must not overwrite each other.
        collision = 0
        while path.exists():
            collision += 1
            path = self._frames_dir / f"{stem}_{collision}.{fmt}"

        path.write_bytes(image_bytes)
        self._frames.append(_FrameRecord(ts=ts, file_name=path.name, path=path))
        self._prune()

    def frame_count(self) -> int:
        """Return the number of frames currently buffered."""
        return len(self._frames)

    def _prune(self) -> None:
        """Delete frames older than ``buffer_seconds`` from memory and disk."""
        cutoff = self._time_fn() - max(0.0, self.options.buffer_seconds)
        keep: list[_FrameRecord] = []
        for frame in self._frames:
            if frame.ts >= cutoff:
                keep.append(frame)
                continue
            try:
                frame.path.unlink(missing_ok=True)
            except OSError:
                # Best-effort: a file we cannot delete is removed with the
                # temp dir in cleanup().
                pass
        self._frames = keep

    def _write_json_atomic(self, path: Path, data: Any) -> None:
        """Write ``data`` as JSON via a temp file and rename, so readers never see a partial file."""
        tmp_path = path.with_suffix(path.suffix + ".tmp")
        tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
        tmp_path.replace(path)

    def _redact_snapshot_defaults(self, payload: Any) -> Any:
        """
        Return a copy of ``payload`` with sensitive input values blanked.

        Elements whose ``input_type`` is password/email/tel get ``value=None``
        and ``value_redacted=True``. Payloads that are not a dict with an
        ``elements`` list are returned unchanged.
        """
        if not isinstance(payload, dict):
            return payload
        elements = payload.get("elements")
        if not isinstance(elements, list):
            return payload
        redacted = []
        for el in elements:
            if not isinstance(el, dict):
                redacted.append(el)
                continue
            input_type = (el.get("input_type") or "").lower()
            if input_type in {"password", "email", "tel"} and "value" in el:
                el = dict(el)  # copy so the caller's snapshot is not mutated
                el["value"] = None
                el["value_redacted"] = True
            redacted.append(el)
        payload = dict(payload)
        payload["elements"] = redacted
        return payload

    @staticmethod
    def _dump_model(obj: Any) -> Any:
        """Return ``obj.model_dump()`` when available, otherwise ``obj`` itself."""
        if obj is not None and hasattr(obj, "model_dump"):
            return obj.model_dump()
        return obj

    def persist(
        self,
        *,
        reason: str | None,
        status: Literal["failure", "success"],
        snapshot: Any | None = None,
        diagnostics: Any | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> Path | None:
        """
        Write frames, steps, snapshot, diagnostics and a manifest to disk.

        Args:
            reason: Short cause recorded in the manifest.
            status: ``"failure"`` or ``"success"``.
            snapshot: Optional snapshot (object with ``model_dump`` or plain data).
            diagnostics: Optional diagnostics (same conventions as ``snapshot``).
            metadata: Extra key/values copied into the manifest.

        Returns:
            The run directory, or ``None`` if this buffer was already persisted.
        """
        if self._persisted:
            return None

        ts = int(self._time_fn() * 1000)
        run_dir = Path(self.options.output_dir) / f"{self.run_id}-{ts}"
        frames_out = run_dir / "frames"
        frames_out.mkdir(parents=True, exist_ok=True)
        metadata = metadata or {}

        snapshot_payload = self._dump_model(snapshot)
        if self.options.redact_snapshot_values:
            snapshot_payload = self._redact_snapshot_defaults(snapshot_payload)
        diagnostics_payload = self._dump_model(diagnostics)

        # Remember capture times so the manifest can report them per frame.
        ts_by_path = {str(frame.path): frame.ts for frame in self._frames}
        frame_paths = [str(frame.path) for frame in self._frames]
        drop_frames = False

        if self.options.on_before_persist is not None:
            try:
                result = self.options.on_before_persist(
                    RedactionContext(
                        run_id=self.run_id,
                        reason=reason,
                        status=status,
                        snapshot=snapshot_payload,
                        diagnostics=diagnostics_payload,
                        frame_paths=frame_paths,
                        metadata=metadata,
                    )
                )
                if result.snapshot is not None:
                    snapshot_payload = result.snapshot
                if result.diagnostics is not None:
                    diagnostics_payload = result.diagnostics
                if result.frame_paths is not None:
                    frame_paths = result.frame_paths
                drop_frames = result.drop_frames
            except Exception:
                # Fail closed: if the user's redactor breaks, do not persist
                # frames it never got the chance to vet.
                drop_frames = True

        copied: list[Path] = []
        if not drop_frames:
            for frame_path in frame_paths:
                src = Path(frame_path)
                if not src.exists():
                    continue
                shutil.copy2(src, frames_out / src.name)
                copied.append(src)

        self._write_json_atomic(run_dir / "steps.json", self._steps)
        if snapshot_payload is not None:
            self._write_json_atomic(run_dir / "snapshot.json", snapshot_payload)
        if diagnostics_payload is not None:
            self._write_json_atomic(run_dir / "diagnostics.json", diagnostics_payload)

        manifest = {
            "run_id": self.run_id,
            "created_at_ms": ts,
            "status": status,
            "reason": reason,
            "buffer_seconds": self.options.buffer_seconds,
            # Report only frames that were actually copied so the manifest
            # always matches the files on disk.
            "frame_count": len(copied),
            "frames": [
                {"file": src.name, "ts": ts_by_path.get(str(src))} for src in copied
            ],
            "snapshot": "snapshot.json" if snapshot_payload is not None else None,
            "diagnostics": "diagnostics.json" if diagnostics_payload is not None else None,
            "metadata": metadata,
            "frames_redacted": not drop_frames and self.options.on_before_persist is not None,
            "frames_dropped": drop_frames,
        }
        self._write_json_atomic(run_dir / "manifest.json", manifest)

        self._persisted = True
        return run_dir

    def cleanup(self) -> None:
        """Remove the temporary directory and forget the buffered frames."""
        if self._temp_dir.exists():
            shutil.rmtree(self._temp_dir, ignore_errors=True)
        # The files are gone, so the records must not outlive them.
        self._frames = []
def _ensure_playwright_stubs() -> None:
    """
    Provide minimal `playwright.*` stubs so the SDK can be imported in environments
    where Playwright isn't installed (e.g., constrained CI/sandbox).

    This is only intended to support pure unit/contract tests that don't actually
    launch browsers. Modules already present in ``sys.modules`` are reused, not
    replaced.
    """

    def _ensure_module(name: str) -> types.ModuleType:
        """Return the module registered under ``name``, creating an empty one if absent."""
        if name in sys.modules:
            return sys.modules[name]
        mod = types.ModuleType(name)
        sys.modules[name] = mod
        return mod

    # Create top-level playwright module and submodules
    playwright_mod = _ensure_module("playwright")
    async_api_mod = _ensure_module("playwright.async_api")
    sync_api_mod = _ensure_module("playwright.sync_api")

    class _Dummy:
        """Placeholder type used for Playwright classes in unit tests."""

    # Minimal symbols imported by `sentience.browser`
    for api_mod in (async_api_mod, sync_api_mod):
        for symbol in (
            "BrowserContext",
            "Browser",
            "Page",
            "Playwright",
            "PlaywrightContextManager",
        ):
            setattr(api_mod, symbol, _Dummy)

    # A plain function, not ``async def``: the entry point is called without
    # ``await``, so a coroutine function would hand back an un-awaited
    # coroutine instead of raising the intended error.
    def _async_playwright():
        raise RuntimeError("Playwright is not available in this unit-test environment.")

    def _sync_playwright():
        raise RuntimeError("Playwright is not available in this unit-test environment.")

    async_api_mod.async_playwright = _async_playwright
    sync_api_mod.sync_playwright = _sync_playwright

    # Expose submodules on the top-level module for completeness
    playwright_mod.async_api = async_api_mod
    playwright_mod.sync_api = sync_api_mod


# Only load stubs if Playwright is NOT available
# This prevents overwriting real Playwright when it IS installed
try:
    import playwright  # noqa: F401
except ImportError:
    _ensure_playwright_stubs()
def test_assert_state_predicates_use_snapshot_context() -> None:
    """State-aware predicates should run against snapshot context."""
    shared_cues = VisualCues(is_primary=False, background_color_name=None, is_clickable=True)

    def make_element(**fields) -> Element:
        # Every element in this test shares the same visual cues.
        return Element(visual_cues=shared_cues, **fields)

    submit_button = make_element(
        id=1,
        role="button",
        text="Submit",
        importance=10,
        bbox=BBox(x=0, y=0, width=100, height=40),
        disabled=False,
    )
    text_input = make_element(
        id=2,
        role="textbox",
        text=None,
        importance=5,
        bbox=BBox(x=0, y=50, width=200, height=40),
        value="hello",
        input_type="text",
        disabled=False,
    )
    disabled_button = make_element(
        id=3,
        role="button",
        text="Disabled",
        importance=4,
        bbox=BBox(x=0, y=100, width=120, height=40),
        disabled=True,
    )

    runtime = AgentRuntime(backend=MockBackend(), tracer=MockTracer())
    runtime.begin_step(goal="Test")
    runtime.last_snapshot = MagicMock(
        url="https://example.com",
        elements=[submit_button, text_input, disabled_button],
    )

    # Each predicate is evaluated against the snapshot assigned above.
    expectations = (
        (is_enabled("text~'Submit'"), "enabled"),
        (is_disabled("text~'Disabled'"), "disabled"),
        (value_equals("role=textbox", "hello"), "value"),
    )
    for predicate, label in expectations:
        assert runtime.assert_(predicate, label=label) is True
    assert len(runtime._assertions_this_step) == 3
def test_buffer_prunes_by_time(tmp_path) -> None:
    """Frames older than ``buffer_seconds`` are dropped when a new frame arrives."""
    now = {"t": 0.0}

    def time_fn() -> float:
        return now["t"]

    opts = FailureArtifactsOptions(buffer_seconds=1.0, output_dir=str(tmp_path))
    buf = FailureArtifactBuffer(run_id="run-1", options=opts, time_fn=time_fn)
    try:
        buf.add_frame(b"first")
        assert buf.frame_count() == 1

        now["t"] = 2.0
        buf.add_frame(b"second")
        assert buf.frame_count() == 1
        # The pruned frame must be removed from disk, not just from memory.
        assert len(list((buf.temp_dir / "frames").iterdir())) == 1
    finally:
        # The buffer owns a mkdtemp() directory; release it so tests do not leak.
        buf.cleanup()


def test_persist_writes_manifest_and_steps(tmp_path) -> None:
    """persist() writes manifest, steps, snapshot and diagnostics with default redaction."""
    now = {"t": 10.0}

    def time_fn() -> float:
        return now["t"]

    opts = FailureArtifactsOptions(output_dir=str(tmp_path))
    buf = FailureArtifactBuffer(run_id="run-2", options=opts, time_fn=time_fn)
    try:
        buf.record_step(action="CLICK", step_id="s1", step_index=1, url="https://example.com")
        buf.add_frame(b"frame")

        snapshot = {
            "status": "success",
            "url": "https://example.com",
            "elements": [
                {"id": 1, "input_type": "password", "value": "secret"},
                {"id": 2, "input_type": "email", "value": "user@example.com"},
            ],
        }
        diagnostics = {"confidence": 0.9, "reasons": ["ok"], "metrics": {"quiet_ms": 42}}
        run_dir = buf.persist(
            reason="assert_failed",
            status="failure",
            snapshot=snapshot,
            diagnostics=diagnostics,
            metadata={"backend": "MockBackend", "url": "https://example.com"},
        )
        assert run_dir is not None
        manifest = json.loads((run_dir / "manifest.json").read_text())
        steps = json.loads((run_dir / "steps.json").read_text())
        snap_json = json.loads((run_dir / "snapshot.json").read_text())
        diag_json = json.loads((run_dir / "diagnostics.json").read_text())

        assert manifest["run_id"] == "run-2"
        assert manifest["frame_count"] == 1
        # The manifest count must match what was actually copied.
        assert len(list((run_dir / "frames").iterdir())) == 1
        assert manifest["snapshot"] == "snapshot.json"
        assert manifest["diagnostics"] == "diagnostics.json"
        assert manifest["metadata"]["backend"] == "MockBackend"
        assert len(steps) == 1
        assert snap_json["url"] == "https://example.com"
        assert diag_json["confidence"] == 0.9
        assert snap_json["elements"][0]["value"] is None
        assert snap_json["elements"][0]["value_redacted"] is True
        assert snap_json["elements"][1]["value"] is None
        assert snap_json["elements"][1]["value_redacted"] is True
        # Redaction works on copies; the caller's snapshot stays intact.
        assert snapshot["elements"][0]["value"] == "secret"
    finally:
        buf.cleanup()


def test_redaction_callback_can_drop_frames(tmp_path) -> None:
    """A redaction callback returning ``drop_frames=True`` keeps all images off disk."""
    opts = FailureArtifactsOptions(output_dir=str(tmp_path))

    def redactor(ctx: RedactionContext) -> RedactionResult:
        return RedactionResult(drop_frames=True)

    opts.on_before_persist = redactor
    buf = FailureArtifactBuffer(run_id="run-3", options=opts)
    try:
        buf.add_frame(b"frame")

        run_dir = buf.persist(reason="fail", status="failure", snapshot={"status": "success"})
        assert run_dir is not None
        manifest = json.loads((run_dir / "manifest.json").read_text())
        assert manifest["frame_count"] == 0
        assert manifest["frames"] == []
        assert manifest["frames_dropped"] is True
        # No image may reach the output directory when frames are dropped.
        assert list((run_dir / "frames").iterdir()) == []
    finally:
        buf.cleanup()