Skip to content

Commit 9be0cab

Browse files
committed
fix tests
1 parent dd4bcff commit 9be0cab

File tree

1 file changed

+223
-0
lines changed

1 file changed

+223
-0
lines changed

sentience/cloud_tracing.py

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,229 @@ def _complete_trace(self) -> None:
386386
if self.logger:
387387
self.logger.warning(f"Error reporting trace completion: {e}")
388388

389+
def store_screenshot(
390+
self,
391+
sequence: int,
392+
screenshot_data: str,
393+
format: str,
394+
step_id: str | None = None,
395+
) -> None:
396+
"""
397+
Store screenshot locally during execution.
398+
399+
Called by agent when screenshot is captured.
400+
Fast, non-blocking operation (~1-5ms per screenshot).
401+
402+
Args:
403+
sequence: Screenshot sequence number (1, 2, 3, ...)
404+
screenshot_data: Base64-encoded data URL from snapshot
405+
format: Image format ("jpeg" or "png")
406+
step_id: Optional step ID for trace event association
407+
"""
408+
if self._closed:
409+
raise RuntimeError("CloudTraceSink is closed")
410+
411+
try:
412+
# Extract base64 string from data URL
413+
# Format: "data:image/jpeg;base64,{base64_string}"
414+
if "," in screenshot_data:
415+
base64_string = screenshot_data.split(",", 1)[1]
416+
else:
417+
base64_string = screenshot_data # Already base64, no prefix
418+
419+
# Decode base64 to image bytes
420+
image_bytes = base64.b64decode(base64_string)
421+
image_size = len(image_bytes)
422+
423+
# Save to file
424+
filename = f"step_{sequence:04d}.{format}"
425+
filepath = self._screenshot_dir / filename
426+
427+
with open(filepath, "wb") as f:
428+
f.write(image_bytes)
429+
430+
# Track metadata using concrete type
431+
metadata = ScreenshotMetadata(
432+
sequence=sequence,
433+
format=format, # type: ignore[arg-type]
434+
size_bytes=image_size,
435+
step_id=step_id,
436+
filepath=str(filepath),
437+
)
438+
self._screenshot_metadata[sequence] = metadata
439+
440+
# Update total size
441+
self.screenshot_total_size_bytes += image_size
442+
443+
if self.logger:
444+
self.logger.info(
445+
f"Screenshot {sequence} stored: {image_size / 1024:.1f} KB ({format})"
446+
)
447+
448+
except Exception as e:
449+
# Log error but don't crash agent
450+
if self.logger:
451+
self.logger.error(f"Failed to store screenshot {sequence}: {e}")
452+
else:
453+
print(f"⚠️ [Sentience] Failed to store screenshot {sequence}: {e}")
454+
455+
def _request_screenshot_urls(self, sequences: list[int]) -> dict[int, str]:
456+
"""
457+
Request pre-signed upload URLs for screenshots from gateway.
458+
459+
Args:
460+
sequences: List of screenshot sequence numbers
461+
462+
Returns:
463+
dict mapping sequence number to upload URL
464+
"""
465+
if not self.api_key or not sequences:
466+
return {}
467+
468+
try:
469+
response = requests.post(
470+
f"{self.api_url}/v1/screenshots/init",
471+
headers={"Authorization": f"Bearer {self.api_key}"},
472+
json={
473+
"run_id": self.run_id,
474+
"sequences": sequences,
475+
},
476+
timeout=10,
477+
)
478+
479+
if response.status_code == 200:
480+
data = response.json()
481+
# Gateway returns sequences as strings in JSON, convert to int keys
482+
upload_urls = data.get("upload_urls", {})
483+
return {int(k): v for k, v in upload_urls.items()}
484+
else:
485+
if self.logger:
486+
self.logger.warning(
487+
f"Failed to get screenshot URLs: HTTP {response.status_code}"
488+
)
489+
return {}
490+
except Exception as e:
491+
if self.logger:
492+
self.logger.warning(f"Error requesting screenshot URLs: {e}")
493+
return {}
494+
495+
def _upload_screenshots(
496+
self,
497+
on_progress: Callable[[int, int], None] | None = None,
498+
) -> None:
499+
"""
500+
Upload all screenshots to cloud via pre-signed URLs.
501+
502+
Steps:
503+
1. Request pre-signed URLs from gateway (/v1/screenshots/init)
504+
2. Upload screenshots in parallel (10 concurrent workers)
505+
3. Track upload progress
506+
4. Handle failures gracefully
507+
508+
Args:
509+
on_progress: Optional callback(uploaded_count, total_count)
510+
"""
511+
if not self._screenshot_metadata:
512+
return # No screenshots to upload
513+
514+
# 1. Request pre-signed URLs from gateway
515+
sequences = sorted(self._screenshot_metadata.keys())
516+
upload_urls = self._request_screenshot_urls(sequences)
517+
518+
if not upload_urls:
519+
print("⚠️ [Sentience] No screenshot upload URLs received, skipping upload")
520+
return
521+
522+
# 2. Upload screenshots in parallel
523+
uploaded_count = 0
524+
total_count = len(upload_urls)
525+
failed_sequences: list[int] = []
526+
527+
def upload_one(seq: int, url: str) -> bool:
528+
"""Upload a single screenshot. Returns True if successful."""
529+
try:
530+
metadata = self._screenshot_metadata[seq]
531+
filepath = Path(metadata.filepath)
532+
533+
# Read image bytes from file
534+
with open(filepath, "rb") as f:
535+
image_bytes = f.read()
536+
537+
# Upload to pre-signed URL
538+
response = requests.put(
539+
url,
540+
data=image_bytes, # Binary image data
541+
headers={
542+
"Content-Type": f"image/{metadata.format}",
543+
},
544+
timeout=30, # 30 second timeout per screenshot
545+
)
546+
547+
if response.status_code == 200:
548+
return True
549+
else:
550+
if self.logger:
551+
self.logger.warning(
552+
f"Screenshot {seq} upload failed: HTTP {response.status_code}"
553+
)
554+
return False
555+
except Exception as e:
556+
if self.logger:
557+
self.logger.warning(f"Screenshot {seq} upload error: {e}")
558+
return False
559+
560+
# Upload in parallel (max 10 concurrent)
561+
with ThreadPoolExecutor(max_workers=10) as executor:
562+
futures = {
563+
executor.submit(upload_one, seq, url): seq
564+
for seq, url in upload_urls.items()
565+
}
566+
567+
for future in as_completed(futures):
568+
seq = futures[future]
569+
if future.result():
570+
uploaded_count += 1
571+
if on_progress:
572+
on_progress(uploaded_count, total_count)
573+
else:
574+
failed_sequences.append(seq)
575+
576+
# 3. Report results
577+
if uploaded_count == total_count:
578+
print(f"✅ [Sentience] All {total_count} screenshots uploaded successfully")
579+
else:
580+
print(f"⚠️ [Sentience] Uploaded {uploaded_count}/{total_count} screenshots")
581+
if failed_sequences:
582+
print(f" Failed sequences: {failed_sequences}")
583+
print(f" Failed screenshots remain at: {self._screenshot_dir}")
584+
585+
def _cleanup_files(self) -> None:
586+
"""Delete local files after successful upload."""
587+
# Delete trace file
588+
if os.path.exists(self._path):
589+
try:
590+
os.remove(self._path)
591+
except Exception:
592+
pass # Ignore cleanup errors
593+
594+
# Delete screenshot files and directory
595+
if self._screenshot_dir.exists() and self._upload_successful:
596+
try:
597+
# Delete all screenshot files
598+
for filepath in self._screenshot_dir.glob("step_*.jpeg"):
599+
filepath.unlink()
600+
for filepath in self._screenshot_dir.glob("step_*.png"):
601+
filepath.unlink()
602+
603+
# Delete directory if empty
604+
try:
605+
self._screenshot_dir.rmdir()
606+
except OSError:
607+
pass # Directory not empty (some uploads failed)
608+
except Exception as e:
609+
if self.logger:
610+
self.logger.warning(f"Failed to cleanup screenshots: {e}")
611+
389612
def __enter__(self):
390613
"""Context manager support."""
391614
return self

0 commit comments

Comments
 (0)