Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 93 additions & 1 deletion sentience/cloud_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ def _do_upload(self, on_progress: Callable[[int, int], None] | None = None) -> N
if on_progress:
on_progress(compressed_size, compressed_size)

# Call /v1/traces/complete to report file sizes (NEW)
# Upload trace index file
self._upload_index()

# Call /v1/traces/complete to report file sizes
self._complete_trace()

# Delete file only on successful upload
Expand Down Expand Up @@ -244,6 +247,95 @@ def _generate_index(self) -> None:
# Non-fatal: log but don't crash
print(f"⚠️ Failed to generate trace index: {e}")

def _upload_index(self) -> None:
"""
Upload trace index file to cloud storage.

Called after successful trace upload to provide fast timeline rendering.
The index file enables O(1) step lookups without parsing the entire trace.
"""
# Construct index file path (same as trace file with .index.json extension)
index_path = Path(str(self._path).replace(".jsonl", ".index.json"))

if not index_path.exists():
if self.logger:
self.logger.warning("Index file not found, skipping index upload")
return

try:
# Request index upload URL from API
if not self.api_key:
# No API key - skip index upload
if self.logger:
self.logger.info("No API key provided, skipping index upload")
return

response = requests.post(
f"{self.api_url}/v1/traces/index_upload",
headers={"Authorization": f"Bearer {self.api_key}"},
json={"run_id": self.run_id},
timeout=10,
)

if response.status_code != 200:
if self.logger:
self.logger.warning(
f"Failed to get index upload URL: HTTP {response.status_code}"
)
return

upload_data = response.json()
index_upload_url = upload_data.get("upload_url")

if not index_upload_url:
if self.logger:
self.logger.warning("No upload URL in index upload response")
return

# Read and compress index file
with open(index_path, "rb") as f:
index_data = f.read()

compressed_index = gzip.compress(index_data)
index_size = len(compressed_index)

if self.logger:
self.logger.info(f"Index file size: {index_size / 1024:.2f} KB")

print(f"📤 [Sentience] Uploading trace index ({index_size} bytes)...")

# Upload index to cloud storage
index_response = requests.put(
index_upload_url,
data=compressed_index,
headers={
"Content-Type": "application/json",
"Content-Encoding": "gzip",
},
timeout=30,
)

if index_response.status_code == 200:
print("✅ [Sentience] Trace index uploaded successfully")

# Delete local index file after successful upload
try:
os.remove(index_path)
except Exception:
pass # Ignore cleanup errors
else:
if self.logger:
self.logger.warning(
f"Index upload failed: HTTP {index_response.status_code}"
)
print(f"⚠️ [Sentience] Index upload failed: HTTP {index_response.status_code}")

except Exception as e:
# Non-fatal: log but don't crash
if self.logger:
self.logger.warning(f"Error uploading trace index: {e}")
print(f"⚠️ [Sentience] Error uploading trace index: {e}")

def _complete_trace(self) -> None:
"""
Call /v1/traces/complete to report file sizes to gateway.
Expand Down
183 changes: 183 additions & 0 deletions tests/test_cloud_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,3 +538,186 @@ def test_tracer_api_unchanged(self):
# Verify all events written
lines = trace_path.read_text().strip().split("\n")
assert len(lines) == 5

def test_cloud_trace_sink_index_upload_success(self):
"""Test CloudTraceSink uploads index file after trace upload."""
upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
run_id = "test-index-upload"

with patch("sentience.cloud_tracing.requests.put") as mock_put, \
patch("sentience.cloud_tracing.requests.post") as mock_post:
# Mock successful trace upload
trace_response = Mock()
trace_response.status_code = 200

# Mock successful index upload URL request
index_url_response = Mock()
index_url_response.status_code = 200
index_url_response.json.return_value = {
"upload_url": "https://sentience.nyc3.digitaloceanspaces.com/traces/test.index.json.gz"
}

# Mock successful /v1/traces/complete response
complete_response = Mock()
complete_response.status_code = 200

# Mock successful index upload
index_upload_response = Mock()
index_upload_response.status_code = 200

mock_put.side_effect = [trace_response, index_upload_response]
# POST is called twice: once for index_upload, once for complete
mock_post.side_effect = [index_url_response, complete_response]

# Create sink and emit events
sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
sink.emit({"v": 1, "type": "run_start", "seq": 1, "data": {"agent": "TestAgent"}})
sink.emit({"v": 1, "type": "step_start", "seq": 2, "data": {"step": 1}})
sink.emit({"v": 1, "type": "snapshot", "seq": 3, "data": {"url": "https://example.com"}})
sink.emit({"v": 1, "type": "run_end", "seq": 4, "data": {"steps": 1}})

# Close triggers upload
sink.close()

# Verify trace upload
assert mock_put.call_count == 2 # Once for trace, once for index

# Verify index upload URL request (first POST call)
assert mock_post.called
assert mock_post.call_count == 2 # index_upload + complete

# Check first POST call (index_upload)
first_post_call = mock_post.call_args_list[0]
assert "/v1/traces/index_upload" in first_post_call[0][0]
assert first_post_call[1]["json"] == {"run_id": run_id}

# Verify index file upload
index_call = mock_put.call_args_list[1]
assert "index.json.gz" in index_call[0][0]
assert index_call[1]["headers"]["Content-Type"] == "application/json"
assert index_call[1]["headers"]["Content-Encoding"] == "gzip"

# Cleanup
cache_dir = Path.home() / ".sentience" / "traces" / "pending"
index_path = cache_dir / f"{run_id}.index.json"
if index_path.exists():
os.remove(index_path)

def test_cloud_trace_sink_index_upload_no_api_key(self):
"""Test CloudTraceSink skips index upload when no API key provided."""
upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
run_id = "test-no-api-key"

with patch("sentience.cloud_tracing.requests.put") as mock_put, \
patch("sentience.cloud_tracing.requests.post") as mock_post:
# Mock successful trace upload
mock_put.return_value = Mock(status_code=200)

# Create sink WITHOUT api_key
sink = CloudTraceSink(upload_url, run_id=run_id)
sink.emit({"v": 1, "type": "run_start", "seq": 1})

sink.close()

# Verify trace upload happened
assert mock_put.called

# Verify index upload was NOT attempted (no API key)
assert not mock_post.called

# Cleanup
cache_dir = Path.home() / ".sentience" / "traces" / "pending"
trace_path = cache_dir / f"{run_id}.jsonl"
index_path = cache_dir / f"{run_id}.index.json"
if trace_path.exists():
os.remove(trace_path)
if index_path.exists():
os.remove(index_path)

def test_cloud_trace_sink_index_upload_failure_non_fatal(self, capsys):
"""Test CloudTraceSink continues gracefully if index upload fails."""
upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
run_id = "test-index-fail"

with patch("sentience.cloud_tracing.requests.put") as mock_put, \
patch("sentience.cloud_tracing.requests.post") as mock_post:
# Mock successful trace upload
trace_response = Mock()
trace_response.status_code = 200

# Mock failed index upload URL request
index_url_response = Mock()
index_url_response.status_code = 500

mock_put.return_value = trace_response
mock_post.return_value = index_url_response

# Create sink
sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
sink.emit({"v": 1, "type": "run_start", "seq": 1})

# Close should succeed even if index upload fails
sink.close()

# Verify trace upload succeeded
assert mock_put.called

# Verify warning was printed
captured = capsys.readouterr()
# Index upload failure is non-fatal, so main upload should succeed
assert "✅" in captured.out # Trace upload success

# Cleanup
cache_dir = Path.home() / ".sentience" / "traces" / "pending"
trace_path = cache_dir / f"{run_id}.jsonl"
index_path = cache_dir / f"{run_id}.index.json"
if trace_path.exists():
os.remove(trace_path)
if index_path.exists():
os.remove(index_path)

def test_cloud_trace_sink_index_file_missing(self, capsys):
"""Test CloudTraceSink handles missing index file gracefully."""
upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
run_id = "test-missing-index"

with patch("sentience.cloud_tracing.requests.put") as mock_put, \
patch("sentience.cloud_tracing.requests.post") as mock_post, \
patch("sentience.trace_indexing.write_trace_index") as mock_write_index:
# Mock index generation to fail (simulating missing index)
mock_write_index.side_effect = Exception("Index generation failed")

# Mock successful trace upload
mock_put.return_value = Mock(status_code=200)

# Mock /v1/traces/complete response (this will still be called)
complete_response = Mock()
complete_response.status_code = 200
mock_post.return_value = complete_response

# Create sink
sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
sink.emit({"v": 1, "type": "run_start", "seq": 1})

# Close should succeed even if index generation fails
sink.close()

# Verify trace upload succeeded
assert mock_put.called

# POST is called once for /v1/traces/complete, but NOT for /v1/traces/index_upload
# (because index file is missing)
assert mock_post.call_count == 1
# Verify it was the complete call, not index_upload
assert "/v1/traces/complete" in mock_post.call_args[0][0]

# Verify warning was printed
captured = capsys.readouterr()
assert "⚠️" in captured.out
assert "Failed to generate trace index" in captured.out

# Cleanup
cache_dir = Path.home() / ".sentience" / "traces" / "pending"
trace_path = cache_dir / f"{run_id}.jsonl"
if trace_path.exists():
os.remove(trace_path)