Skip to content

Commit 1d51a87

Browse files
authored
Merge pull request #84 from SentienceAPI/upload_indexing
Upload includes trace indexing
2 parents 3f836e8 + 6ffadab commit 1d51a87

File tree

2 files changed

+276
-1
lines changed

2 files changed

+276
-1
lines changed

sentience/cloud_tracing.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,10 @@ def _do_upload(self, on_progress: Callable[[int, int], None] | None = None) -> N
213213
if on_progress:
214214
on_progress(compressed_size, compressed_size)
215215

216-
# Call /v1/traces/complete to report file sizes (NEW)
216+
# Upload trace index file
217+
self._upload_index()
218+
219+
# Call /v1/traces/complete to report file sizes
217220
self._complete_trace()
218221

219222
# Delete file only on successful upload
@@ -244,6 +247,95 @@ def _generate_index(self) -> None:
244247
# Non-fatal: log but don't crash
245248
print(f"⚠️ Failed to generate trace index: {e}")
246249

250+
def _upload_index(self) -> None:
251+
"""
252+
Upload trace index file to cloud storage.
253+
254+
Called after successful trace upload to provide fast timeline rendering.
255+
The index file enables O(1) step lookups without parsing the entire trace.
256+
"""
257+
# Construct index file path (same as trace file with .index.json extension)
258+
index_path = Path(str(self._path).replace(".jsonl", ".index.json"))
259+
260+
if not index_path.exists():
261+
if self.logger:
262+
self.logger.warning("Index file not found, skipping index upload")
263+
return
264+
265+
try:
266+
# Request index upload URL from API
267+
if not self.api_key:
268+
# No API key - skip index upload
269+
if self.logger:
270+
self.logger.info("No API key provided, skipping index upload")
271+
return
272+
273+
response = requests.post(
274+
f"{self.api_url}/v1/traces/index_upload",
275+
headers={"Authorization": f"Bearer {self.api_key}"},
276+
json={"run_id": self.run_id},
277+
timeout=10,
278+
)
279+
280+
if response.status_code != 200:
281+
if self.logger:
282+
self.logger.warning(
283+
f"Failed to get index upload URL: HTTP {response.status_code}"
284+
)
285+
return
286+
287+
upload_data = response.json()
288+
index_upload_url = upload_data.get("upload_url")
289+
290+
if not index_upload_url:
291+
if self.logger:
292+
self.logger.warning("No upload URL in index upload response")
293+
return
294+
295+
# Read and compress index file
296+
with open(index_path, "rb") as f:
297+
index_data = f.read()
298+
299+
compressed_index = gzip.compress(index_data)
300+
index_size = len(compressed_index)
301+
302+
if self.logger:
303+
self.logger.info(f"Index file size: {index_size / 1024:.2f} KB")
304+
305+
print(f"📤 [Sentience] Uploading trace index ({index_size} bytes)...")
306+
307+
# Upload index to cloud storage
308+
index_response = requests.put(
309+
index_upload_url,
310+
data=compressed_index,
311+
headers={
312+
"Content-Type": "application/json",
313+
"Content-Encoding": "gzip",
314+
},
315+
timeout=30,
316+
)
317+
318+
if index_response.status_code == 200:
319+
print("✅ [Sentience] Trace index uploaded successfully")
320+
321+
# Delete local index file after successful upload
322+
try:
323+
os.remove(index_path)
324+
except Exception:
325+
pass # Ignore cleanup errors
326+
else:
327+
if self.logger:
328+
self.logger.warning(
329+
f"Index upload failed: HTTP {index_response.status_code}"
330+
)
331+
print(f"⚠️ [Sentience] Index upload failed: HTTP {index_response.status_code}")
332+
333+
except Exception as e:
334+
# Non-fatal: log but don't crash
335+
if self.logger:
336+
self.logger.warning(f"Error uploading trace index: {e}")
337+
print(f"⚠️ [Sentience] Error uploading trace index: {e}")
338+
247339
def _complete_trace(self) -> None:
248340
"""
249341
Call /v1/traces/complete to report file sizes to gateway.

tests/test_cloud_tracing.py

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,3 +538,186 @@ def test_tracer_api_unchanged(self):
538538
# Verify all events written
539539
lines = trace_path.read_text().strip().split("\n")
540540
assert len(lines) == 5
541+
542+
def test_cloud_trace_sink_index_upload_success(self):
    """With an API key set, closing the sink uploads the trace and then the index."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-index-upload"

    with patch("sentience.cloud_tracing.requests.put") as mock_put, \
            patch("sentience.cloud_tracing.requests.post") as mock_post:
        # PUT responses, in call order: trace upload, then index upload.
        trace_response = Mock(status_code=200)
        index_upload_response = Mock(status_code=200)

        # POST responses, in call order: index upload URL request,
        # then /v1/traces/complete.
        index_url_response = Mock(status_code=200)
        index_url_response.json.return_value = {
            "upload_url": "https://sentience.nyc3.digitaloceanspaces.com/traces/test.index.json.gz"
        }
        complete_response = Mock(status_code=200)

        mock_put.side_effect = [trace_response, index_upload_response]
        mock_post.side_effect = [index_url_response, complete_response]

        # Emit a minimal run, then close to trigger the upload.
        sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
        events = [
            {"v": 1, "type": "run_start", "seq": 1, "data": {"agent": "TestAgent"}},
            {"v": 1, "type": "step_start", "seq": 2, "data": {"step": 1}},
            {"v": 1, "type": "snapshot", "seq": 3, "data": {"url": "https://example.com"}},
            {"v": 1, "type": "run_end", "seq": 4, "data": {"steps": 1}},
        ]
        for event in events:
            sink.emit(event)
        sink.close()

        # One PUT for the trace and one for the index.
        assert mock_put.call_count == 2

        # One POST for index_upload and one for complete.
        assert mock_post.called
        assert mock_post.call_count == 2

        # The first POST must target the index upload endpoint with the run id.
        post_args, post_kwargs = mock_post.call_args_list[0]
        assert "/v1/traces/index_upload" in post_args[0]
        assert post_kwargs["json"] == {"run_id": run_id}

        # The second PUT must go to the index URL with gzip'd JSON headers.
        put_args, put_kwargs = mock_put.call_args_list[1]
        assert "index.json.gz" in put_args[0]
        sent_headers = put_kwargs["headers"]
        assert sent_headers["Content-Type"] == "application/json"
        assert sent_headers["Content-Encoding"] == "gzip"

        # Remove any index file left in the pending-trace cache.
        pending_dir = Path.home() / ".sentience" / "traces" / "pending"
        leftover_index = pending_dir / f"{run_id}.index.json"
        if leftover_index.exists():
            os.remove(leftover_index)
605+
606+
def test_cloud_trace_sink_index_upload_no_api_key(self):
    """Without an API key the trace is still uploaded but no POST request is made."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-no-api-key"

    with patch("sentience.cloud_tracing.requests.put") as mock_put, \
            patch("sentience.cloud_tracing.requests.post") as mock_post:
        # Every PUT succeeds.
        ok_response = Mock(status_code=200)
        mock_put.return_value = ok_response

        # Deliberately constructed without api_key.
        sink = CloudTraceSink(upload_url, run_id=run_id)
        sink.emit({"v": 1, "type": "run_start", "seq": 1})
        sink.close()

        # The trace itself was uploaded.
        assert mock_put.called

        # No POST at all, so the index upload URL was never requested.
        assert not mock_post.called

        # Remove whatever is left in the pending-trace cache.
        pending_dir = Path.home() / ".sentience" / "traces" / "pending"
        for suffix in (".jsonl", ".index.json"):
            leftover = pending_dir / f"{run_id}{suffix}"
            if leftover.exists():
                os.remove(leftover)
636+
637+
def test_cloud_trace_sink_index_upload_failure_non_fatal(self, capsys):
    """A failing index upload URL request must not stop the trace upload from succeeding."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-index-fail"

    with patch("sentience.cloud_tracing.requests.put") as mock_put, \
            patch("sentience.cloud_tracing.requests.post") as mock_post:
        # Every PUT succeeds; every POST answers HTTP 500.
        mock_put.return_value = Mock(status_code=200)
        mock_post.return_value = Mock(status_code=500)

        sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
        sink.emit({"v": 1, "type": "run_start", "seq": 1})

        # close() has to return normally despite the failing POST.
        sink.close()

        # The trace upload was attempted.
        assert mock_put.called

        # The success marker for the trace upload was still printed,
        # i.e. the index failure was treated as non-fatal.
        captured = capsys.readouterr()
        assert "✅" in captured.out

        # Remove whatever is left in the pending-trace cache.
        pending_dir = Path.home() / ".sentience" / "traces" / "pending"
        for suffix in (".jsonl", ".index.json"):
            leftover = pending_dir / f"{run_id}{suffix}"
            if leftover.exists():
                os.remove(leftover)
678+
679+
def test_cloud_trace_sink_index_file_missing(self, capsys):
    """When index generation fails, the index upload is skipped and the trace still completes."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-missing-index"

    with patch("sentience.cloud_tracing.requests.put") as mock_put, \
            patch("sentience.cloud_tracing.requests.post") as mock_post, \
            patch("sentience.trace_indexing.write_trace_index") as mock_write_index:
        # Make index generation raise so no index file is produced.
        mock_write_index.side_effect = Exception("Index generation failed")

        # Trace PUT succeeds, and so does the /v1/traces/complete POST.
        mock_put.return_value = Mock(status_code=200)
        mock_post.return_value = Mock(status_code=200)

        sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
        sink.emit({"v": 1, "type": "run_start", "seq": 1})

        # close() has to return normally despite the failed index generation.
        sink.close()

        # The trace upload was attempted.
        assert mock_put.called

        # Exactly one POST happened, and it was the complete call rather
        # than the index_upload call.
        assert mock_post.call_count == 1
        last_post_args, _ = mock_post.call_args
        assert "/v1/traces/complete" in last_post_args[0]

        # The generation failure was reported on stdout.
        printed = capsys.readouterr().out
        assert "⚠️" in printed
        assert "Failed to generate trace index" in printed

        # Remove the trace file if it is still in the pending cache.
        pending_dir = Path.home() / ".sentience" / "traces" / "pending"
        leftover_trace = pending_dir / f"{run_id}.jsonl"
        if leftover_trace.exists():
            os.remove(leftover_trace)

0 commit comments

Comments
 (0)