From 2eb247bc8e1a2d67d90e8e2bf0f8faeaf45d146f Mon Sep 17 00:00:00 2001 From: rcholic Date: Thu, 1 Jan 2026 18:44:18 -0800 Subject: [PATCH 1/2] set or infer final trace status --- pyproject.toml | 2 +- sentience/__init__.py | 2 +- sentience/cloud_tracing.py | 172 ++++++++++++++- sentience/schemas/trace_v1.json | 7 +- sentience/tracing.py | 225 +++++++++++++++++++- tests/test_cloud_tracing.py | 125 +++++++++++ tests/test_tracing.py | 361 ++++++++++++++++++++++++++++++++ 7 files changed, 883 insertions(+), 11 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6ed05fe..49c6de8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sentienceapi" -version = "0.90.18" +version = "0.90.19" description = "Python SDK for Sentience AI Agent Browser Automation" readme = "README.md" requires-python = ">=3.11" diff --git a/sentience/__init__.py b/sentience/__init__.py index 754ceae..506607d 100644 --- a/sentience/__init__.py +++ b/sentience/__init__.py @@ -70,7 +70,7 @@ ) from .wait import wait_for -__version__ = "0.90.18" +__version__ = "0.90.19" __all__ = [ # Core SDK diff --git a/sentience/cloud_tracing.py b/sentience/cloud_tracing.py index 5d1d9e0..019a564 100644 --- a/sentience/cloud_tracing.py +++ b/sentience/cloud_tracing.py @@ -109,6 +109,7 @@ def __init__( self.trace_file_size_bytes = 0 self.screenshot_total_size_bytes = 0 self.screenshot_count = 0 # Track number of screenshots extracted + self.index_file_size_bytes = 0 # Track index file size def emit(self, event: dict[str, Any]) -> None: """ @@ -327,6 +328,7 @@ def _upload_index(self) -> None: compressed_index = gzip.compress(index_data) index_size = len(compressed_index) + self.index_file_size_bytes = index_size # Track index file size if self.logger: self.logger.info(f"Index file size: {index_size / 1024:.2f} KB") @@ -361,9 +363,158 @@ def _upload_index(self) -> None: if self.logger: self.logger.warning(f"Error uploading trace index: {e}") + def _infer_final_status_from_trace(self) -> str: + """ + Infer final status from trace events by reading the trace file. + + Returns: + Final status: "success", "failure", "partial", or "unknown" + """ + try: + # Read trace file to analyze events + with open(self._path, encoding="utf-8") as f: + events = [] + for line in f: + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + events.append(event) + except json.JSONDecodeError: + continue + + if not events: + return "unknown" + + # Check for run_end event with status + for event in reversed(events): + if event.get("type") == "run_end": + status = event.get("data", {}).get("status") + if status in ("success", "failure", "partial", "unknown"): + return status + + # Infer from error events + has_errors = any(e.get("type") == "error" for e in events) + if has_errors: + # Check if there are successful steps too (partial success) + step_ends = [e for e in events if e.get("type") == "step_end"] + if step_ends: + return "partial" + return "failure" + + # If we have step_end events and no errors, likely success + step_ends = [e for e in events if e.get("type") == "step_end"] + if step_ends: + return "success" + + return "unknown" + + except Exception: + # If we can't read the trace, default to unknown + return "unknown" + + def _extract_stats_from_trace(self) -> dict[str, Any]: + """ + Extract execution statistics from trace file. 
+ + Returns: + Dictionary with stats fields for /v1/traces/complete + """ + try: + # Read trace file to extract stats + with open(self._path, encoding="utf-8") as f: + events = [] + for line in f: + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + events.append(event) + except json.JSONDecodeError: + continue + + if not events: + return { + "total_steps": 0, + "total_events": 0, + "duration_ms": None, + "final_status": "unknown", + "started_at": None, + "ended_at": None, + } + + # Find run_start and run_end events + run_start = next((e for e in events if e.get("type") == "run_start"), None) + run_end = next((e for e in events if e.get("type") == "run_end"), None) + + # Extract timestamps + started_at: str | None = None + ended_at: str | None = None + if run_start: + started_at = run_start.get("ts") + if run_end: + ended_at = run_end.get("ts") + + # Calculate duration + duration_ms: int | None = None + if started_at and ended_at: + try: + from datetime import datetime + + start_dt = datetime.fromisoformat(started_at.replace("Z", "+00:00")) + end_dt = datetime.fromisoformat(ended_at.replace("Z", "+00:00")) + delta = end_dt - start_dt + duration_ms = int(delta.total_seconds() * 1000) + except Exception: + pass + + # Count steps (from step_start events, only first attempt) + step_indices = set() + for event in events: + if event.get("type") == "step_start": + step_index = event.get("data", {}).get("step_index") + if step_index is not None: + step_indices.add(step_index) + total_steps = len(step_indices) if step_indices else 0 + + # If run_end has steps count, use that (more accurate) + if run_end: + steps_from_end = run_end.get("data", {}).get("steps") + if steps_from_end is not None: + total_steps = max(total_steps, steps_from_end) + + # Count total events + total_events = len(events) + + # Infer final status + final_status = self._infer_final_status_from_trace() + + return { + "total_steps": total_steps, + "total_events": total_events, + "duration_ms": duration_ms, + "final_status": final_status, + "started_at": started_at, + "ended_at": ended_at, + } + + except Exception as e: + if self.logger: + self.logger.warning(f"Error extracting stats from trace: {e}") + return { + "total_steps": 0, + "total_events": 0, + "duration_ms": None, + "final_status": "unknown", + "started_at": None, + "ended_at": None, + } + def _complete_trace(self) -> None: """ - Call /v1/traces/complete to report file sizes to gateway. + Call /v1/traces/complete to report file sizes and stats to gateway. This is a best-effort call - failures are logged but don't affect upload success. 
""" @@ -372,16 +523,25 @@ def _complete_trace(self) -> None: return try: + # Extract stats from trace file + stats = self._extract_stats_from_trace() + + # Add file size fields + stats.update( + { + "trace_file_size_bytes": self.trace_file_size_bytes, + "screenshot_total_size_bytes": self.screenshot_total_size_bytes, + "screenshot_count": self.screenshot_count, + "index_file_size_bytes": self.index_file_size_bytes, + } + ) + response = requests.post( f"{self.api_url}/v1/traces/complete", headers={"Authorization": f"Bearer {self.api_key}"}, json={ "run_id": self.run_id, - "stats": { - "trace_file_size_bytes": self.trace_file_size_bytes, - "screenshot_total_size_bytes": self.screenshot_total_size_bytes, - "screenshot_count": self.screenshot_count, - }, + "stats": stats, }, timeout=10, ) diff --git a/sentience/schemas/trace_v1.json b/sentience/schemas/trace_v1.json index 5cec1de..7935c64 100644 --- a/sentience/schemas/trace_v1.json +++ b/sentience/schemas/trace_v1.json @@ -198,7 +198,12 @@ "description": "run_end data", "required": ["steps"], "properties": { - "steps": {"type": "integer"} + "steps": {"type": "integer"}, + "status": { + "type": "string", + "enum": ["success", "failure", "partial", "unknown"], + "description": "Final execution status" + } } }, { diff --git a/sentience/tracing.py b/sentience/tracing.py index d15b12d..4c0a5ba 100644 --- a/sentience/tracing.py +++ b/sentience/tracing.py @@ -8,6 +8,7 @@ import time from abc import ABC, abstractmethod from dataclasses import dataclass, field +from datetime import datetime from pathlib import Path from typing import Any @@ -110,6 +111,120 @@ def close(self) -> None: # Generate index after closing file self._generate_index() + def get_stats(self) -> dict[str, Any]: + """ + Extract execution statistics from trace file (for local traces). 
+ + Returns: + Dictionary with stats fields (same format as Tracer.get_stats()) + """ + try: + # Read trace file to extract stats + with open(self.path, encoding="utf-8") as f: + events = [] + for line in f: + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + events.append(event) + except json.JSONDecodeError: + continue + + if not events: + return { + "total_steps": 0, + "total_events": 0, + "duration_ms": None, + "final_status": "unknown", + "started_at": None, + "ended_at": None, + } + + # Find run_start and run_end events + run_start = next((e for e in events if e.get("type") == "run_start"), None) + run_end = next((e for e in events if e.get("type") == "run_end"), None) + + # Extract timestamps + started_at: str | None = None + ended_at: str | None = None + if run_start: + started_at = run_start.get("ts") + if run_end: + ended_at = run_end.get("ts") + + # Calculate duration + duration_ms: int | None = None + if started_at and ended_at: + try: + from datetime import datetime + + start_dt = datetime.fromisoformat(started_at.replace("Z", "+00:00")) + end_dt = datetime.fromisoformat(ended_at.replace("Z", "+00:00")) + delta = end_dt - start_dt + duration_ms = int(delta.total_seconds() * 1000) + except Exception: + pass + + # Count steps (from step_start events, only first attempt) + step_indices = set() + for event in events: + if event.get("type") == "step_start": + step_index = event.get("data", {}).get("step_index") + if step_index is not None: + step_indices.add(step_index) + total_steps = len(step_indices) if step_indices else 0 + + # If run_end has steps count, use that (more accurate) + if run_end: + steps_from_end = run_end.get("data", {}).get("steps") + if steps_from_end is not None: + total_steps = max(total_steps, steps_from_end) + + # Count total events + total_events = len(events) + + # Infer final status + final_status = "unknown" + # Check for run_end event with status + if run_end: + status = run_end.get("data", {}).get("status") + if status in ("success", "failure", "partial", "unknown"): + final_status = status + else: + # Infer from error events + has_errors = any(e.get("type") == "error" for e in events) + if has_errors: + step_ends = [e for e in events if e.get("type") == "step_end"] + if step_ends: + final_status = "partial" + else: + final_status = "failure" + else: + step_ends = [e for e in events if e.get("type") == "step_end"] + if step_ends: + final_status = "success" + + return { + "total_steps": total_steps, + "total_events": total_events, + "duration_ms": duration_ms, + "final_status": final_status, + "started_at": started_at, + "ended_at": ended_at, + } + + except Exception: + return { + "total_steps": 0, + "total_events": 0, + "duration_ms": None, + "final_status": "unknown", + "started_at": None, + "ended_at": None, + } + def _generate_index(self) -> None: """Generate trace index file (automatic on close).""" try: @@ -136,11 +251,22 @@ class Tracer: Trace event builder and emitter. Manages sequence numbers and provides convenient methods for emitting events. + Tracks execution statistics and final status for trace completion. 
""" run_id: str sink: TraceSink seq: int = field(default=0, init=False) + # Stats tracking + total_steps: int = field(default=0, init=False) + total_events: int = field(default=0, init=False) + started_at: datetime | None = field(default=None, init=False) + ended_at: datetime | None = field(default=None, init=False) + final_status: str = field(default="unknown", init=False) + # Track step outcomes for automatic status inference + _step_successes: int = field(default=0, init=False) + _step_failures: int = field(default=0, init=False) + _has_errors: bool = field(default=False, init=False) def emit( self, @@ -157,6 +283,7 @@ def emit( step_id: Step UUID (if step-scoped event) """ self.seq += 1 + self.total_events += 1 # Generate timestamps ts_ms = int(time.time() * 1000) @@ -175,6 +302,16 @@ def emit( self.sink.emit(event.to_dict()) + # Track step outcomes for automatic status inference + if event_type == "step_end": + success = data.get("success", False) + if success: + self._step_successes += 1 + else: + self._step_failures += 1 + elif event_type == "error": + self._has_errors = True + def emit_run_start( self, agent: str, @@ -189,6 +326,9 @@ def emit_run_start( llm_model: LLM model name config: Agent configuration """ + # Track start time + self.started_at = datetime.utcnow() + data: dict[str, Any] = {"agent": agent} if llm_model is not None: data["llm_model"] = llm_model @@ -215,6 +355,10 @@ def emit_step_start( attempt: Attempt number (0-indexed) pre_url: URL before step """ + # Track step count (only count first attempt of each step) + if attempt == 0: + self.total_steps = max(self.total_steps, step_index) + data = { "step_id": step_id, "step_index": step_index, @@ -226,14 +370,29 @@ def emit_step_start( self.emit("step_start", data, step_id=step_id) - def emit_run_end(self, steps: int) -> None: + def emit_run_end(self, steps: int, status: str | None = None) -> None: """ Emit run_end event. Args: steps: Total number of steps executed + status: Optional final status ("success", "failure", "partial", "unknown") + If not provided, infers from tracked outcomes or uses self.final_status """ - self.emit("run_end", {"steps": steps}) + # Track end time + self.ended_at = datetime.utcnow() + + # Auto-infer status if not provided and not explicitly set + if status is None and self.final_status == "unknown": + self._infer_final_status() + + # Use provided status or fallback to self.final_status + final_status = status if status is not None else self.final_status + + # Ensure total_steps is at least the provided steps value + self.total_steps = max(self.total_steps, steps) + + self.emit("run_end", {"steps": steps, "status": final_status}) def emit_error( self, @@ -256,6 +415,62 @@ def emit_error( } self.emit("error", data, step_id=step_id) + def set_final_status(self, status: str) -> None: + """ + Set the final status of the trace run. + + Args: + status: Final status ("success", "failure", "partial", "unknown") + """ + if status not in ("success", "failure", "partial", "unknown"): + raise ValueError( + f"Invalid status: {status}. Must be one of: success, failure, partial, unknown" + ) + self.final_status = status + + def get_stats(self) -> dict[str, Any]: + """ + Get execution statistics for trace completion. 
+ + Returns: + Dictionary with stats fields for /v1/traces/complete + """ + duration_ms: int | None = None + if self.started_at and self.ended_at: + delta = self.ended_at - self.started_at + duration_ms = int(delta.total_seconds() * 1000) + + return { + "total_steps": self.total_steps, + "total_events": self.total_events, + "duration_ms": duration_ms, + "final_status": self.final_status, + "started_at": self.started_at.isoformat() + "Z" if self.started_at else None, + "ended_at": self.ended_at.isoformat() + "Z" if self.ended_at else None, + } + + def _infer_final_status(self) -> None: + """ + Automatically infer final_status from tracked step outcomes if not explicitly set. + + This is called automatically in close() if final_status is still "unknown". + """ + if self.final_status != "unknown": + # Status already set explicitly, don't override + return + + # Infer from tracked outcomes + if self._has_errors: + # Has errors - check if there were successful steps too + if self._step_successes > 0: + self.final_status = "partial" + else: + self.final_status = "failure" + elif self._step_successes > 0: + # Has successful steps and no errors + self.final_status = "success" + # Otherwise stays "unknown" (no steps executed or no clear outcome) + def close(self, **kwargs) -> None: """ Close the underlying sink. @@ -263,6 +478,12 @@ def close(self, **kwargs) -> None: Args: **kwargs: Passed through to sink.close() (e.g., blocking=True for CloudTraceSink) """ + # Auto-infer final_status if not explicitly set and we have step outcomes + if self.final_status == "unknown" and ( + self._step_successes > 0 or self._step_failures > 0 or self._has_errors + ): + self._infer_final_status() + # Check if sink.close() accepts kwargs (CloudTraceSink does, JsonlTraceSink doesn't) import inspect diff --git a/tests/test_cloud_tracing.py b/tests/test_cloud_tracing.py index 88dfd63..be424c8 100644 --- a/tests/test_cloud_tracing.py +++ b/tests/test_cloud_tracing.py @@ -328,6 +328,26 @@ def post_side_effect(*args, **kwargs): assert trace_upload_call is not None, "Trace upload should have been called" + # Verify completion request includes all required stats fields + complete_calls = [call for call in mock_post.call_args_list if "complete" in call[0][0]] + assert len(complete_calls) > 0, "Completion request should have been called" + + complete_call = complete_calls[0] + complete_data = complete_call[1].get("json", {}) + stats = complete_data.get("stats", {}) + + # Verify all required fields are present + assert "trace_file_size_bytes" in stats + assert "screenshot_total_size_bytes" in stats + assert "screenshot_count" in stats + assert "index_file_size_bytes" in stats + assert "total_steps" in stats + assert "total_events" in stats + assert "duration_ms" in stats + assert "final_status" in stats + assert "started_at" in stats + assert "ended_at" in stats + # Decompress and verify screenshot_base64 is removed compressed_data = trace_upload_call[1]["data"] decompressed_data = gzip.decompress(compressed_data) @@ -857,3 +877,108 @@ def test_cloud_trace_sink_index_file_missing(self, capsys): trace_path = cache_dir / f"{run_id}.jsonl" if trace_path.exists(): os.remove(trace_path) + + def test_cloud_trace_sink_completion_includes_all_stats(self): + """Test that _complete_trace() includes all required stats fields.""" + upload_url = "https://sentience.nyc3.digitaloceanspaces.com/user123/run456/trace.jsonl.gz" + run_id = "test-complete-stats" + api_key = "sk_test_123" + + sink = CloudTraceSink(upload_url, run_id=run_id, 
api_key=api_key) + + # Emit events with timestamps + from datetime import datetime + + start_time = datetime.utcnow() + sink.emit( + { + "v": 1, + "type": "run_start", + "ts": start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z"), + "run_id": run_id, + "seq": 1, + "data": {"agent": "TestAgent"}, + } + ) + + sink.emit( + { + "v": 1, + "type": "step_start", + "ts": start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z"), + "run_id": run_id, + "seq": 2, + "step_id": "step-1", + "data": {"step_id": "step-1", "step_index": 1, "goal": "Test", "attempt": 0}, + } + ) + + end_time = datetime.utcnow() + sink.emit( + { + "v": 1, + "type": "run_end", + "ts": end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z"), + "run_id": run_id, + "seq": 3, + "data": {"steps": 1, "status": "success"}, + } + ) + + with ( + patch("sentience.cloud_tracing.requests.put") as mock_put, + patch("sentience.cloud_tracing.requests.post") as mock_post, + ): + # Mock successful trace upload + mock_put.return_value = Mock(status_code=200) + + # Mock index upload (optional) + mock_index_response = Mock() + mock_index_response.status_code = 200 + mock_index_response.json.return_value = {"upload_url": "https://example.com/index"} + + # Mock completion response + mock_complete_response = Mock() + mock_complete_response.status_code = 200 + + def post_side_effect(*args, **kwargs): + url = args[0] if args else kwargs.get("url", "") + if "index_upload" in url: + return mock_index_response + return mock_complete_response + + mock_post.side_effect = post_side_effect + + sink.close() + + # Verify completion was called + complete_calls = [call for call in mock_post.call_args_list if "complete" in call[0][0]] + assert len(complete_calls) > 0, "Completion request should have been called" + + complete_call = complete_calls[0] + complete_data = complete_call[1].get("json", {}) + stats = complete_data.get("stats", {}) + + # Verify all required fields are present + assert "trace_file_size_bytes" in stats + assert "screenshot_total_size_bytes" in stats + assert "screenshot_count" in stats + assert "index_file_size_bytes" in stats + assert "total_steps" in stats + assert stats["total_steps"] == 1 + assert "total_events" in stats + assert stats["total_events"] == 3 + assert "duration_ms" in stats + assert stats["duration_ms"] is not None + assert "final_status" in stats + assert stats["final_status"] == "success" + assert "started_at" in stats + assert stats["started_at"] is not None + assert "ended_at" in stats + assert stats["ended_at"] is not None + + # Cleanup + cache_dir = Path.home() / ".sentience" / "traces" / "pending" + trace_path = cache_dir / f"{run_id}.jsonl" + if trace_path.exists(): + os.remove(trace_path) diff --git a/tests/test_tracing.py b/tests/test_tracing.py index 68b7d25..bc99603 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -174,6 +174,367 @@ def test_tracer_emit_run_end(): assert event["type"] == "run_end" assert event["data"]["steps"] == 5 + assert event["data"]["status"] == "unknown" # Default status + + +def test_tracer_emit_run_end_with_status(): + """Test Tracer.emit_run_end() with status parameter.""" + with tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + tracer.emit_run_end(steps=5, status="success") + + lines = trace_path.read_text().strip().split("\n") + event = json.loads(lines[0]) + + assert event["type"] == "run_end" + assert event["data"]["steps"] == 5 + assert event["data"]["status"] == 
"success" + + +def test_tracer_stats_tracking(): + """Test Tracer tracks execution statistics.""" + with tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + + # Emit run_start (should track started_at) + tracer.emit_run_start("TestAgent", "gpt-4") + assert tracer.started_at is not None + assert tracer.total_events == 1 + + # Emit step_start (should track total_steps) + tracer.emit_step_start("step-1", 1, "Goal 1", attempt=0) + assert tracer.total_steps == 1 + assert tracer.total_events == 2 + + tracer.emit_step_start("step-2", 2, "Goal 2", attempt=0) + assert tracer.total_steps == 2 + assert tracer.total_events == 3 + + # Emit run_end (should track ended_at) + tracer.emit_run_end(steps=2) + assert tracer.ended_at is not None + assert tracer.total_events == 4 + + # Get stats + stats = tracer.get_stats() + assert stats["total_steps"] == 2 + assert stats["total_events"] == 4 + assert stats["final_status"] == "unknown" + assert stats["started_at"] is not None + assert stats["ended_at"] is not None + assert stats["duration_ms"] is not None + assert stats["duration_ms"] >= 0 + + +def test_tracer_set_final_status(): + """Test Tracer.set_final_status().""" + with tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + + # Default status is "unknown" + assert tracer.final_status == "unknown" + + # Set status + tracer.set_final_status("success") + assert tracer.final_status == "success" + + # Status should be included in run_end + tracer.emit_run_end(steps=1) + + lines = trace_path.read_text().strip().split("\n") + event = json.loads(lines[0]) + assert event["data"]["status"] == "success" + + +def test_tracer_set_final_status_invalid(): + """Test Tracer.set_final_status() with invalid status.""" + with tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + + # Invalid status should raise ValueError + try: + tracer.set_final_status("invalid") + assert False, "Should have raised ValueError" + except ValueError as e: + assert "Invalid status" in str(e) + + +def test_jsonl_trace_sink_get_stats(): + """Test JsonlTraceSink.get_stats() extracts stats from trace file.""" + with tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + tracer.emit_run_start("TestAgent", "gpt-4") + tracer.emit_step_start("step-1", 1, "Goal 1", attempt=0) + tracer.emit_step_start("step-2", 2, "Goal 2", attempt=0) + tracer.emit_run_end(steps=2, status="success") + + # Get stats from sink + stats = sink.get_stats() + assert stats["total_steps"] == 2 + assert stats["total_events"] == 4 + assert stats["final_status"] == "success" + assert stats["started_at"] is not None + assert stats["ended_at"] is not None + assert stats["duration_ms"] is not None + + +def test_tracer_auto_infers_final_status(): + """Test that Tracer automatically infers final_status from step outcomes.""" + with tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + tracer.emit_run_start("TestAgent", "gpt-4") + + # Emit 
successful step + tracer.emit_step_start("step-1", 1, "Goal 1", attempt=0) + tracer.emit("step_end", {"success": True, "action": "click"}, step_id="step-1") + + # Emit another successful step + tracer.emit_step_start("step-2", 2, "Goal 2", attempt=0) + tracer.emit("step_end", {"success": True, "action": "type"}, step_id="step-2") + + # Close without explicitly setting status or calling emit_run_end + # Status should be auto-inferred as "success" + tracer.close() + + # Verify status was auto-inferred + assert tracer.final_status == "success" + + # Verify stats reflect the inferred status + stats = tracer.get_stats() + assert stats["final_status"] == "success" + assert stats["total_steps"] == 2 + + +def test_tracer_auto_infers_final_status_with_errors(): + """Test that Tracer automatically infers 'partial' status when there are both successes and errors.""" + with tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + tracer.emit_run_start("TestAgent", "gpt-4") + + # Emit successful step + tracer.emit_step_start("step-1", 1, "Goal 1", attempt=0) + tracer.emit("step_end", {"success": True, "action": "click"}, step_id="step-1") + + # Emit error + tracer.emit_error("step-2", "Element not found", attempt=0) + + # Close without explicitly setting status + tracer.close() + + # Verify status was auto-inferred as "partial" (has both successes and errors) + assert tracer.final_status == "partial" + + # Verify stats reflect the inferred status + stats = tracer.get_stats() + assert stats["final_status"] == "partial" + + +def test_tracer_auto_infers_final_status_failure(): + """Test that Tracer automatically infers 'failure' status when there are only errors.""" + with tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + tracer.emit_run_start("TestAgent", "gpt-4") + + # Emit error without any successful steps + tracer.emit_error("step-1", "Element not found", attempt=0) + + # Close without explicitly setting status + tracer.close() + + # Verify status was auto-inferred as "failure" (only errors, no successes) + assert tracer.final_status == "failure" + + # Verify stats reflect the inferred status + stats = tracer.get_stats() + assert stats["final_status"] == "failure" + + +def test_tracer_auto_infer_does_not_override_explicit_status(): + """Test that auto-inference doesn't override explicitly set status.""" + with tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + tracer.emit_run_start("TestAgent", "gpt-4") + + # Emit successful step + tracer.emit_step_start("step-1", 1, "Goal 1", attempt=0) + tracer.emit("step_end", {"success": True, "action": "click"}, step_id="step-1") + + # Explicitly set status to "partial" (even though we have success) + tracer.set_final_status("partial") + + # Close - should not override explicit status + tracer.close() + + # Verify explicit status was preserved + assert tracer.final_status == "partial" + + # Verify stats reflect the explicit status + stats = tracer.get_stats() + assert stats["final_status"] == "partial" + + +def test_tracer_close_sets_final_status_automatically(): + """Test that tracer.close() automatically sets final_status if not explicitly set.""" + with 
tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + tracer.emit_run_start("TestAgent", "gpt-4") + + # Emit successful steps + tracer.emit_step_start("step-1", 1, "Goal 1", attempt=0) + tracer.emit("step_end", {"success": True, "action": "click"}, step_id="step-1") + + tracer.emit_step_start("step-2", 2, "Goal 2", attempt=0) + tracer.emit("step_end", {"success": True, "action": "type"}, step_id="step-2") + + # Verify status is still "unknown" before close + assert tracer.final_status == "unknown" + + # Close should auto-infer status + tracer.close() + + # Verify status was auto-inferred after close + assert tracer.final_status == "success" + + # Verify stats reflect the inferred status + stats = tracer.get_stats() + assert stats["final_status"] == "success" + assert stats["total_steps"] == 2 + + +def test_tracer_close_sets_final_status_in_run_end_event(): + """Test that tracer.close() sets final_status, and it's included in run_end if emit_run_end is called before close.""" + with tempfile.TemporaryDirectory() as tmpdir: + trace_path = Path(tmpdir) / "trace.jsonl" + + with JsonlTraceSink(trace_path) as sink: + tracer = Tracer(run_id="test-run-123", sink=sink) + tracer.emit_run_start("TestAgent", "gpt-4") + + # Emit successful step + tracer.emit_step_start("step-1", 1, "Goal 1", attempt=0) + tracer.emit("step_end", {"success": True, "action": "click"}, step_id="step-1") + + # Verify status is still "unknown" before emit_run_end + assert tracer.final_status == "unknown" + + # emit_run_end should auto-infer status if not provided + tracer.emit_run_end(steps=1) + + # Verify status was auto-inferred + assert tracer.final_status == "success" + + # Close the tracer + tracer.close() + + # Read trace file and verify run_end event has the inferred status + lines = trace_path.read_text().strip().split("\n") + run_end_events = [ + json.loads(line) for line in lines if json.loads(line).get("type") == "run_end" + ] + + assert len(run_end_events) > 0 + # The run_end event should have the auto-inferred status + last_run_end = run_end_events[-1] + assert last_run_end["data"]["status"] == "success" + + +def test_tracer_close_with_cloud_sink_includes_final_status_in_completion(): + """Test that CloudTraceSink includes auto-inferred final_status in completion request.""" + from unittest.mock import Mock, patch + + from sentience.cloud_tracing import CloudTraceSink + + upload_url = "https://sentience.nyc3.digitaloceanspaces.com/user123/run456/trace.jsonl.gz" + run_id = "test-close-status" + api_key = "sk_test_123" + + sink = CloudTraceSink(upload_url, run_id=run_id, api_key=api_key) + tracer = Tracer(run_id=run_id, sink=sink) + + tracer.emit_run_start("TestAgent", "gpt-4") + + # Emit successful step + tracer.emit_step_start("step-1", 1, "Goal 1", attempt=0) + tracer.emit("step_end", {"success": True, "action": "click"}, step_id="step-1") + + # Verify status is still "unknown" before close + assert tracer.final_status == "unknown" + + with ( + patch("sentience.cloud_tracing.requests.put") as mock_put, + patch("sentience.cloud_tracing.requests.post") as mock_post, + ): + # Mock successful trace upload + mock_put.return_value = Mock(status_code=200) + + # Mock index upload (optional) + mock_index_response = Mock() + mock_index_response.status_code = 200 + mock_index_response.json.return_value = {"upload_url": "https://example.com/index"} + + # Mock completion response + 
mock_complete_response = Mock() + mock_complete_response.status_code = 200 + + def post_side_effect(*args, **kwargs): + url = args[0] if args else kwargs.get("url", "") + if "index_upload" in url: + return mock_index_response + return mock_complete_response + + mock_post.side_effect = post_side_effect + + # Close should auto-infer status and include it in completion request + tracer.close() + + # Verify status was auto-inferred + assert tracer.final_status == "success" + + # Verify completion request includes the inferred status + complete_calls = [call for call in mock_post.call_args_list if "complete" in call[0][0]] + assert len(complete_calls) > 0, "Completion request should have been called" + + complete_call = complete_calls[0] + complete_data = complete_call[1].get("json", {}) + stats = complete_data.get("stats", {}) + + assert stats["final_status"] == "success" + + # Cleanup + cache_dir = Path.home() / ".sentience" / "traces" / "pending" + trace_path = cache_dir / f"{run_id}.jsonl" + if trace_path.exists(): + os.remove(trace_path) def test_tracer_emit_error(): From f5b559b45ee47971b453b5ad925a8cc77d64bcca Mon Sep 17 00:00:00 2001 From: rcholic Date: Thu, 1 Jan 2026 19:19:20 -0800 Subject: [PATCH 2/2] handle 409 --- sentience/tracer_factory.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sentience/tracer_factory.py b/sentience/tracer_factory.py index 86c3b01..d1b0472 100644 --- a/sentience/tracer_factory.py +++ b/sentience/tracer_factory.py @@ -208,6 +208,16 @@ def _recover_orphaned_traces(api_key: str, api_url: str = SENTIENCE_API_URL) -> ) if response.status_code != 200: + # HTTP 409 means trace already exists (already uploaded) + # Treat as success and delete local file + if response.status_code == 409: + print(f"✅ Trace {run_id} already exists in cloud (skipping re-upload)") + # Delete local file since it's already in cloud + try: + os.remove(trace_file) + except Exception: + pass # Ignore cleanup errors + continue # HTTP 422 typically means invalid run_id (e.g., test files) # Skip silently for 422, but log other errors if response.status_code == 422:
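
Usage sketch (reviewer note, not part of the patch): the snippet below mirrors the
patterns in tests/test_tracing.py and shows the two ways the final status added in
PATCH 1/2 can be set -- explicitly via set_final_status(), or inferred from tracked
step_end/error events. The import path is an assumption based on the modules touched
here; all other names appear verbatim in the diff.

    from pathlib import Path

    # Assumed import path; JsonlTraceSink and Tracer are defined in sentience/tracing.py.
    from sentience.tracing import JsonlTraceSink, Tracer

    with JsonlTraceSink(Path("trace.jsonl")) as sink:
        tracer = Tracer(run_id="run-123", sink=sink)
        tracer.emit_run_start("MyAgent", "gpt-4")

        # Step outcomes are tracked automatically from step_end/error events.
        tracer.emit_step_start("step-1", 1, "Open page", attempt=0)
        tracer.emit("step_end", {"success": True, "action": "goto"}, step_id="step-1")

        # Option 1: let emit_run_end() infer the status -- "success" here,
        # since there is one successful step and no error events.
        tracer.emit_run_end(steps=1)

        # Option 2 (instead): set it explicitly before emit_run_end(); an
        # explicit value wins over inference and must be one of
        # "success" / "failure" / "partial" / "unknown".
        # tracer.set_final_status("partial")

        # close() also auto-infers if the status is still "unknown" and
        # step outcomes were recorded.
        tracer.close()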