def _upload_index(self) -> None:
    """
    Upload the trace index file to cloud storage.

    Called after the trace itself has been uploaded. The index is a
    companion file stored next to the trace (same name, ``.index.json``
    suffix); it is gzip-compressed and PUT to a pre-signed URL obtained
    from ``POST {api_url}/v1/traces/index_upload``.

    The upload is best-effort: a missing index file, a missing API key, a
    non-200 response, or any exception is logged (upload failures are also
    printed) and the method returns normally. Nothing is raised to the
    caller.

    The local index file is removed only after a successful (HTTP 200)
    upload.
    """
    try:
        # Swap only the final suffix. A plain str.replace(".jsonl", ...)
        # would also rewrite ".jsonl" inside directory names, and would
        # leave the path unchanged (i.e. pointing at the trace itself) when
        # the trace does not end in ".jsonl".
        # NOTE(review): must stay in sync with the file name written by
        # _generate_index - identical for ordinary "*.jsonl" paths.
        index_path = Path(self._path).with_suffix(".index.json")

        if not index_path.exists():
            if self.logger:
                self.logger.warning("Index file not found, skipping index upload")
            return

        if not self.api_key:
            # The upload URL is requested with a bearer token, so there is
            # nothing to do without a key.
            if self.logger:
                self.logger.info("No API key provided, skipping index upload")
            return

        # Request index upload URL from API
        response = requests.post(
            f"{self.api_url}/v1/traces/index_upload",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"run_id": self.run_id},
            timeout=10,
        )

        if response.status_code != 200:
            if self.logger:
                self.logger.warning(
                    f"Failed to get index upload URL: HTTP {response.status_code}"
                )
            return

        index_upload_url = response.json().get("upload_url")
        if not index_upload_url:
            if self.logger:
                self.logger.warning("No upload URL in index upload response")
            return

        # Read and compress index file
        compressed_index = gzip.compress(index_path.read_bytes())
        index_size = len(compressed_index)

        if self.logger:
            self.logger.info(f"Index file size: {index_size / 1024:.2f} KB")

        print(f"📤 [Sentience] Uploading trace index ({index_size} bytes)...")

        # Upload index to cloud storage
        index_response = requests.put(
            index_upload_url,
            data=compressed_index,
            headers={
                "Content-Type": "application/json",
                "Content-Encoding": "gzip",
            },
            timeout=30,
        )

        if index_response.status_code != 200:
            if self.logger:
                self.logger.warning(
                    f"Index upload failed: HTTP {index_response.status_code}"
                )
            print(f"⚠️  [Sentience] Index upload failed: HTTP {index_response.status_code}")
            return

        print("✅ [Sentience] Trace index uploaded successfully")

        # Delete local index file after successful upload. A failed delete
        # must not turn a successful upload into an error, but it should
        # not vanish silently either.
        try:
            index_path.unlink(missing_ok=True)
        except OSError as cleanup_error:
            if self.logger:
                self.logger.warning(
                    f"Could not remove local index file {index_path}: {cleanup_error}"
                )

    except Exception as e:
        # Non-fatal: log but don't crash
        if self.logger:
            self.logger.warning(f"Error uploading trace index: {e}")
        print(f"⚠️  [Sentience] Error uploading trace index: {e}")
def test_cloud_trace_sink_index_upload_success(self):
    """Test CloudTraceSink uploads index file after trace upload."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-index-upload"

    try:
        with patch("sentience.cloud_tracing.requests.put") as mock_put, \
             patch("sentience.cloud_tracing.requests.post") as mock_post:
            # Mock successful trace upload
            trace_response = Mock()
            trace_response.status_code = 200

            # Mock successful index upload URL request
            index_url_response = Mock()
            index_url_response.status_code = 200
            index_url_response.json.return_value = {
                "upload_url": "https://sentience.nyc3.digitaloceanspaces.com/traces/test.index.json.gz"
            }

            # Mock successful /v1/traces/complete response
            complete_response = Mock()
            complete_response.status_code = 200

            # Mock successful index upload
            index_upload_response = Mock()
            index_upload_response.status_code = 200

            mock_put.side_effect = [trace_response, index_upload_response]
            # POST is called twice: once for index_upload, once for complete
            mock_post.side_effect = [index_url_response, complete_response]

            # Create sink and emit events
            sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
            sink.emit({"v": 1, "type": "run_start", "seq": 1, "data": {"agent": "TestAgent"}})
            sink.emit({"v": 1, "type": "step_start", "seq": 2, "data": {"step": 1}})
            sink.emit({"v": 1, "type": "snapshot", "seq": 3, "data": {"url": "https://example.com"}})
            sink.emit({"v": 1, "type": "run_end", "seq": 4, "data": {"steps": 1}})

            # Close triggers upload
            sink.close()

            # Verify both uploads happened: once for trace, once for index
            assert mock_put.call_count == 2

            # Verify both API calls happened: index_upload + complete
            assert mock_post.call_count == 2

            # Check first POST call (index_upload)
            first_post_call = mock_post.call_args_list[0]
            assert "/v1/traces/index_upload" in first_post_call[0][0]
            assert first_post_call[1]["json"] == {"run_id": run_id}

            # Verify index file upload
            index_call = mock_put.call_args_list[1]
            assert "index.json.gz" in index_call[0][0]
            assert index_call[1]["headers"]["Content-Type"] == "application/json"
            assert index_call[1]["headers"]["Content-Encoding"] == "gzip"
    finally:
        # Always clean up, even when an assertion above fails, so a red test
        # does not leave files behind in the user's home directory.
        pending_dir = Path.home() / ".sentience" / "traces" / "pending"
        for suffix in (".jsonl", ".index.json"):
            (pending_dir / f"{run_id}{suffix}").unlink(missing_ok=True)


def test_cloud_trace_sink_index_upload_no_api_key(self):
    """Test CloudTraceSink skips index upload when no API key provided."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-no-api-key"

    try:
        with patch("sentience.cloud_tracing.requests.put") as mock_put, \
             patch("sentience.cloud_tracing.requests.post") as mock_post:
            # Mock successful trace upload
            mock_put.return_value = Mock(status_code=200)

            # Create sink WITHOUT api_key
            sink = CloudTraceSink(upload_url, run_id=run_id)
            sink.emit({"v": 1, "type": "run_start", "seq": 1})

            sink.close()

            # Verify trace upload happened
            assert mock_put.called

            # Verify index upload was NOT attempted (no API key)
            assert not mock_post.called
    finally:
        # Always clean up, even when an assertion above fails.
        pending_dir = Path.home() / ".sentience" / "traces" / "pending"
        for suffix in (".jsonl", ".index.json"):
            (pending_dir / f"{run_id}{suffix}").unlink(missing_ok=True)


def test_cloud_trace_sink_index_upload_failure_non_fatal(self, capsys):
    """Test CloudTraceSink continues gracefully if index upload fails."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-index-fail"

    try:
        with patch("sentience.cloud_tracing.requests.put") as mock_put, \
             patch("sentience.cloud_tracing.requests.post") as mock_post:
            # Mock successful trace upload
            trace_response = Mock()
            trace_response.status_code = 200

            # Mock failed index upload URL request
            index_url_response = Mock()
            index_url_response.status_code = 500

            mock_put.return_value = trace_response
            mock_post.return_value = index_url_response

            # Create sink
            sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
            sink.emit({"v": 1, "type": "run_start", "seq": 1})

            # Close should succeed even if index upload fails
            sink.close()

            # Verify trace upload succeeded
            assert mock_put.called

            # The upload-URL request failed, so no index PUT may follow:
            # the trace upload is the only PUT.
            assert mock_put.call_count == 1

            # Index upload failure is non-fatal, so the trace upload must
            # still report success on stdout.
            captured = capsys.readouterr()
            assert "✅" in captured.out  # Trace upload success
    finally:
        # Always clean up, even when an assertion above fails.
        pending_dir = Path.home() / ".sentience" / "traces" / "pending"
        for suffix in (".jsonl", ".index.json"):
            (pending_dir / f"{run_id}{suffix}").unlink(missing_ok=True)


def test_cloud_trace_sink_index_file_missing(self, capsys):
    """Test CloudTraceSink handles missing index file gracefully."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-missing-index"

    try:
        with patch("sentience.cloud_tracing.requests.put") as mock_put, \
             patch("sentience.cloud_tracing.requests.post") as mock_post, \
             patch("sentience.trace_indexing.write_trace_index") as mock_write_index:
            # Mock index generation to fail (simulating missing index)
            mock_write_index.side_effect = Exception("Index generation failed")

            # Mock successful trace upload
            mock_put.return_value = Mock(status_code=200)

            # Mock /v1/traces/complete response (this will still be called)
            complete_response = Mock()
            complete_response.status_code = 200
            mock_post.return_value = complete_response

            # Create sink
            sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
            sink.emit({"v": 1, "type": "run_start", "seq": 1})

            # Close should succeed even if index generation fails
            sink.close()

            # Verify trace upload succeeded
            assert mock_put.called

            # POST is called once for /v1/traces/complete, but NOT for
            # /v1/traces/index_upload (because index file is missing)
            assert mock_post.call_count == 1
            # Verify it was the complete call, not index_upload
            assert "/v1/traces/complete" in mock_post.call_args[0][0]

            # Verify warning was printed
            captured = capsys.readouterr()
            assert "⚠️" in captured.out
            assert "Failed to generate trace index" in captured.out
    finally:
        # Always clean up, even when an assertion above fails.
        pending_dir = Path.home() / ".sentience" / "traces" / "pending"
        for suffix in (".jsonl", ".index.json"):
            (pending_dir / f"{run_id}{suffix}").unlink(missing_ok=True)