diff --git a/examples/browser_use_integration.py b/examples/browser_use_integration.py new file mode 100644 index 0000000..d24468f --- /dev/null +++ b/examples/browser_use_integration.py @@ -0,0 +1,206 @@ +""" +Example: Using Sentience with browser-use for element grounding. + +This example demonstrates how to integrate Sentience's semantic element +detection with browser-use, enabling accurate click/type/scroll operations +using Sentience's snapshot-based grounding instead of coordinate estimation. + +Requirements: + pip install browser-use sentienceapi + +Usage: + python examples/browser_use_integration.py +""" + +import asyncio + +# Sentience imports +from sentience import find, get_extension_dir, query +from sentience.backends import ( + BrowserUseAdapter, + CachedSnapshot, + ExtensionNotLoadedError, + click, + scroll, + snapshot, + type_text, +) + +# browser-use imports (install via: pip install browser-use) +# from browser_use import BrowserSession, BrowserProfile + + +async def main() -> None: + """ + Demo: Search on Google using Sentience grounding with browser-use. + + This example shows the full workflow: + 1. Launch browser-use with Sentience extension loaded + 2. Create a Sentience backend adapter + 3. Take snapshots and interact with elements using semantic queries + """ + + # ========================================================================= + # STEP 1: Setup browser-use with Sentience extension + # ========================================================================= + # + # The Sentience extension must be loaded for element grounding to work. + # Use get_extension_dir() to get the path to the bundled extension. 
+ # + # Uncomment the following when running with browser-use installed: + + # extension_path = get_extension_dir() + # print(f"Loading Sentience extension from: {extension_path}") + # + # profile = BrowserProfile( + # args=[ + # f"--load-extension={extension_path}", + # "--disable-extensions-except=" + extension_path, + # ], + # ) + # session = BrowserSession(browser_profile=profile) + # await session.start() + + # ========================================================================= + # STEP 2: Create Sentience backend adapter + # ========================================================================= + # + # The adapter bridges browser-use's CDP client to Sentience's backend protocol. + # + # adapter = BrowserUseAdapter(session) + # backend = await adapter.create_backend() + + # ========================================================================= + # STEP 3: Navigate and take snapshots + # ========================================================================= + # + # await session.navigate("https://www.google.com") + # + # # Take a snapshot - this uses the Sentience extension's element detection + # try: + # snap = await snapshot(backend) + # print(f"Found {len(snap.elements)} elements") + # except ExtensionNotLoadedError as e: + # print(f"Extension not loaded: {e}") + # print("Make sure the browser was launched with --load-extension flag") + # return + + # ========================================================================= + # STEP 4: Find and interact with elements using semantic queries + # ========================================================================= + # + # Sentience provides powerful element selectors: + # - Role-based: 'role=textbox', 'role=button' + # - Name-based: 'role=button[name="Submit"]' + # - Text-based: 'text=Search' + # + # # Find the search input + # search_input = find(snap, 'role=textbox[name*="Search"]') + # if search_input: + # # Click on the search input (uses center of bounding box) + # await click(backend, 
search_input.bbox) + # + # # Type search query + # await type_text(backend, "Sentience AI browser automation") + # print("Typed search query") + + # ========================================================================= + # STEP 5: Using cached snapshots for efficiency + # ========================================================================= + # + # Taking snapshots has overhead. Use CachedSnapshot to reuse recent snapshots: + # + # cache = CachedSnapshot(backend, max_age_ms=2000) + # + # # First call takes fresh snapshot + # snap1 = await cache.get() + # + # # Second call returns cached version if less than 2 seconds old + # snap2 = await cache.get() + # + # # After actions that modify DOM, invalidate the cache + # await click(backend, some_element.bbox) + # cache.invalidate() # Next get() will take fresh snapshot + + # ========================================================================= + # STEP 6: Scrolling to elements + # ========================================================================= + # + # # Scroll down by 500 pixels + # await scroll(backend, delta_y=500) + # + # # Scroll at a specific position (useful for scrollable containers) + # await scroll(backend, delta_y=300, target=(400, 500)) + + # ========================================================================= + # STEP 7: Advanced element queries + # ========================================================================= + # + # # Find all buttons + # buttons = query(snap, 'role=button') + # print(f"Found {len(buttons)} buttons") + # + # # Find by partial text match + # links = query(snap, 'role=link[name*="Learn"]') + # + # # Find by exact text + # submit_btn = find(snap, 'role=button[name="Submit"]') + + # ========================================================================= + # STEP 8: Error handling + # ========================================================================= + # + # Sentience provides specific exceptions for common errors: + # + # from sentience.backends 
async def full_example() -> None:
    """
    Print the steps for turning this file into a runnable browser-use demo.

    This coroutine is a placeholder: it only writes four instruction lines
    to stdout and does not start a browser or touch browser-use.
    """
    # Import browser-use (uncomment when installed)
    # from browser_use import BrowserSession, BrowserProfile

    # Emitted one per line, in this order.
    instructions = (
        "To run the full example:",
        "1. Install browser-use: pip install browser-use",
        "2. Uncomment the imports in this function",
        "3. Run: python examples/browser_use_integration.py",
    )
    for line in instructions:
        print(line)
browser-use integration) + "BrowserBackendV0", + "CDPTransport", + "CDPBackendV0", + "PlaywrightBackend", + "BrowserUseAdapter", + "BrowserUseCDPTransport", + "ViewportInfo", + "LayoutMetrics", + "backend_snapshot", + "CachedSnapshot", + # Backend-agnostic actions (prefixed to avoid conflicts) + "backend_click", + "backend_type_text", + "backend_scroll", + "backend_scroll_to_element", + "backend_wait_for_stable", # Core SDK "SentienceBrowser", "Snapshot", diff --git a/sentience/_extension_loader.py b/sentience/_extension_loader.py index d969ec3..3c8c74a 100644 --- a/sentience/_extension_loader.py +++ b/sentience/_extension_loader.py @@ -1,8 +1,19 @@ """ -Shared extension loading logic for sync and async implementations +Shared extension loading logic for sync and async implementations. + +Provides: +- get_extension_dir(): Returns path to bundled extension (for browser-use integration) +- verify_extension_injected(): Verifies window.sentience API is available +- get_extension_version(): Gets extension version from manifest +- verify_extension_version(): Checks SDK-extension version compatibility """ +import json from pathlib import Path +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from .protocols import AsyncPageProtocol, PageProtocol def find_extension_path() -> Path: @@ -38,3 +49,147 @@ def find_extension_path() -> Path: f"2. {dev_ext_path}\n" "Make sure the extension is built and 'sentience/extension' directory exists." ) + + +def get_extension_dir() -> str: + """ + Get path to the bundled Sentience extension directory. 
def get_extension_version() -> str:
    """
    Get the version of the bundled extension from manifest.json.

    The manifest is located via find_extension_path() and parsed as JSON.

    Returns:
        Value of the manifest's "version" key (e.g., "2.2.0"), or the
        literal string "unknown" when the manifest has no such key.

    Raises:
        FileNotFoundError: If extension or manifest not found
    """
    manifest_path = find_extension_path() / "manifest.json"
    # Read explicitly as UTF-8: without `encoding`, open() falls back to the
    # platform locale encoding, which can mis-decode or reject the manifest
    # on systems whose default is not UTF-8.
    with open(manifest_path, encoding="utf-8") as f:
        manifest = json.load(f)
    return manifest.get("version", "unknown")
+ + Call this after navigating to a page to confirm the extension is working: + + await browser.goto("https://example.com") + if not await verify_extension_injected_async(browser.page): + raise RuntimeError("Extension not injected") + + Args: + page: Playwright Page object (async) + + Returns: + True if window.sentience.snapshot is available, False otherwise + """ + try: + result = await page.evaluate( + "(() => !!(window.sentience && typeof window.sentience.snapshot === 'function'))()" + ) + return bool(result) + except Exception: + return False + + +def verify_extension_version(page: "PageProtocol", expected: str | None = None) -> str | None: + """ + Check extension version exposed in page (sync). + + The extension sets window.__SENTIENCE_EXTENSION_VERSION__ when injected. + + Args: + page: Playwright Page object (sync) + expected: If provided, raises RuntimeError on mismatch + + Returns: + Version string if found, None if not set (page may not have injected yet) + + Raises: + RuntimeError: If expected version provided and doesn't match + """ + try: + got = page.evaluate("window.__SENTIENCE_EXTENSION_VERSION__ || null") + except Exception: + got = None + + if expected and got and got != expected: + raise RuntimeError(f"Sentience extension version mismatch: expected {expected}, got {got}") + return got + + +async def verify_extension_version_async( + page: "AsyncPageProtocol", expected: str | None = None +) -> str | None: + """ + Check extension version exposed in page (async). + + The extension sets window.__SENTIENCE_EXTENSION_VERSION__ when injected. 
+ + Args: + page: Playwright Page object (async) + expected: If provided, raises RuntimeError on mismatch + + Returns: + Version string if found, None if not set (page may not have injected yet) + + Raises: + RuntimeError: If expected version provided and doesn't match + """ + try: + got = await page.evaluate("window.__SENTIENCE_EXTENSION_VERSION__ || null") + except Exception: + got = None + + if expected and got and got != expected: + raise RuntimeError(f"Sentience extension version mismatch: expected {expected}, got {got}") + return got diff --git a/sentience/agent_runtime.py b/sentience/agent_runtime.py index 83f37d6..168659e 100644 --- a/sentience/agent_runtime.py +++ b/sentience/agent_runtime.py @@ -233,7 +233,7 @@ def assert_done( Returns: True if task is complete (assertion passed), False otherwise """ - ok = self.assert_(predicate, label=label, required=True) + ok = self.assertTrue(predicate, label=label, required=True) if ok: self._task_done = True diff --git a/sentience/backends/__init__.py b/sentience/backends/__init__.py new file mode 100644 index 0000000..97601c6 --- /dev/null +++ b/sentience/backends/__init__.py @@ -0,0 +1,132 @@ +""" +Browser backend abstractions for Sentience SDK. + +This module provides backend protocols and implementations that allow +Sentience actions (click, type, scroll) to work with different browser +automation frameworks. + +Supported Backends +------------------ + +**PlaywrightBackend** + Wraps Playwright Page objects. Use this when integrating with existing + SentienceBrowser or Playwright-based code. + +**CDPBackendV0** + Low-level CDP (Chrome DevTools Protocol) backend. Use this when you have + direct access to a CDP client and session. + +**BrowserUseAdapter** + High-level adapter for browser-use framework. Automatically creates a + CDPBackendV0 from a BrowserSession. + +Quick Start with browser-use +---------------------------- + +.. 
code-block:: python + + from browser_use import BrowserSession, BrowserProfile + from sentience import get_extension_dir, find + from sentience.backends import BrowserUseAdapter, snapshot, click, type_text + + # Setup browser-use with Sentience extension + profile = BrowserProfile(args=[f"--load-extension={get_extension_dir()}"]) + session = BrowserSession(browser_profile=profile) + await session.start() + + # Create adapter and backend + adapter = BrowserUseAdapter(session) + backend = await adapter.create_backend() + + # Take snapshot and interact with elements + snap = await snapshot(backend) + search_box = find(snap, 'role=textbox[name*="Search"]') + await click(backend, search_box.bbox) + await type_text(backend, "Sentience AI") + +Snapshot Caching +---------------- + +Use CachedSnapshot to reduce redundant snapshot calls in action loops: + +.. code-block:: python + + from sentience.backends import CachedSnapshot + + cache = CachedSnapshot(backend, max_age_ms=2000) + + snap1 = await cache.get() # Takes fresh snapshot + snap2 = await cache.get() # Returns cached if < 2s old + + await click(backend, element.bbox) + cache.invalidate() # Force refresh on next get() + +Error Handling +-------------- + +The module provides specific exceptions for common failure modes: + +- ``ExtensionNotLoadedError``: Extension not loaded in browser launch args +- ``SnapshotError``: window.sentience.snapshot() failed +- ``ActionError``: Click/type/scroll operation failed + +All exceptions inherit from ``SentienceBackendError`` and include helpful +fix suggestions in their error messages. + +.. 
code-block:: python + + from sentience.backends import ExtensionNotLoadedError, snapshot + + try: + snap = await snapshot(backend) + except ExtensionNotLoadedError as e: + print(f"Fix suggestion: {e}") +""" + +from .actions import click, scroll, scroll_to_element, type_text, wait_for_stable +from .browser_use_adapter import BrowserUseAdapter, BrowserUseCDPTransport +from .cdp_backend import CDPBackendV0, CDPTransport +from .exceptions import ( + ActionError, + BackendEvalError, + ExtensionDiagnostics, + ExtensionInjectionError, + ExtensionNotLoadedError, + SentienceBackendError, + SnapshotError, +) +from .playwright_backend import PlaywrightBackend +from .protocol_v0 import BrowserBackendV0, LayoutMetrics, ViewportInfo +from .snapshot import CachedSnapshot, snapshot + +__all__ = [ + # Protocol + "BrowserBackendV0", + # Models + "ViewportInfo", + "LayoutMetrics", + # CDP Backend + "CDPTransport", + "CDPBackendV0", + # Playwright Backend + "PlaywrightBackend", + # browser-use adapter + "BrowserUseAdapter", + "BrowserUseCDPTransport", + # Backend-agnostic functions + "snapshot", + "CachedSnapshot", + "click", + "type_text", + "scroll", + "scroll_to_element", + "wait_for_stable", + # Exceptions + "SentienceBackendError", + "ExtensionNotLoadedError", + "ExtensionInjectionError", + "ExtensionDiagnostics", + "BackendEvalError", + "SnapshotError", + "ActionError", +] diff --git a/sentience/backends/actions.py b/sentience/backends/actions.py new file mode 100644 index 0000000..67ec479 --- /dev/null +++ b/sentience/backends/actions.py @@ -0,0 +1,343 @@ +""" +Backend-agnostic actions for browser-use integration. + +These actions work with any BrowserBackendV0 implementation, +enabling Sentience grounding with browser-use or other frameworks. 
async def click(
    backend: "BrowserBackendV0",
    target: BBox | dict[str, float] | tuple[float, float],
    button: Literal["left", "right", "middle"] = "left",
    click_count: int = 1,
    move_first: bool = True,
) -> ActionResult:
    """
    Click at coordinates using the backend.

    Args:
        backend: BrowserBackendV0 implementation
        target: Click target - BBox (clicks center), dict with x/y, or (x, y) tuple
        button: Mouse button to click
        click_count: Number of clicks (1=single, 2=double)
        move_first: Whether to move mouse to position before clicking

    Returns:
        ActionResult with success status. Every failure - including a target
        that cannot be resolved to coordinates - is reported as
        ``success=False`` with error code ``"click_failed"`` instead of
        raising, the same way type_text() and scroll() report theirs.

    Example:
        # Click at coordinates
        await click(backend, (100, 200))

        # Click element bbox center
        await click(backend, element.bbox)

        # Double-click
        await click(backend, element.bbox, click_count=2)
    """
    start_time = time.time()

    try:
        # Resolve inside the try block so an invalid target yields a failed
        # ActionResult rather than an exception escaping to the caller.
        x, y = _resolve_coordinates(target)

        # Optional mouse move for hover effects
        if move_first:
            await backend.mouse_move(x, y)
            await asyncio.sleep(0.02)  # Brief pause for hover

        # Perform click
        await backend.mouse_click(x, y, button=button, click_count=click_count)

        duration_ms = int((time.time() - start_time) * 1000)
        return ActionResult(
            success=True,
            duration_ms=duration_ms,
            outcome="dom_updated",
        )
    except Exception as e:
        duration_ms = int((time.time() - start_time) * 1000)
        return ActionResult(
            success=False,
            duration_ms=duration_ms,
            outcome="error",
            error={"code": "click_failed", "reason": str(e)},
        )
async def scroll_to_element(
    backend: "BrowserBackendV0",
    element_id: int,
    behavior: Literal["smooth", "instant", "auto"] = "instant",
    block: Literal["start", "center", "end", "nearest"] = "center",
) -> ActionResult:
    """
    Scroll element into view using JavaScript scrollIntoView.

    The element is looked up as ``window.sentience_registry[element_id]`` in
    the page; if it exists and has ``scrollIntoView``, that method is called
    with the given options and ``inline: 'nearest'``.

    Args:
        backend: BrowserBackendV0 implementation
        element_id: Element ID from snapshot (requires sentience_registry)
        behavior: Scroll behavior
        block: Vertical alignment

    Returns:
        ActionResult with success status. ``success=False`` with code
        ``"scroll_failed"`` is returned both when the element is missing
        from the registry and when the evaluation raises.
    """
    import json  # local import: only needed to build the script below

    start_time = time.time()

    try:
        # Emit the arguments as JSON literals (valid JavaScript) instead of
        # pasting them raw into the source: the Literal/int annotations are
        # not enforced at runtime, so a stray quote or non-integer id could
        # otherwise change the meaning of the evaluated script.
        js_element_id = json.dumps(element_id)
        js_options = json.dumps({"behavior": behavior, "block": block, "inline": "nearest"})

        scrolled = await backend.eval(
            f"""
            (() => {{
                const el = window.sentience_registry && window.sentience_registry[{js_element_id}];
                if (el && el.scrollIntoView) {{
                    el.scrollIntoView({js_options});
                    return true;
                }}
                return false;
            }})()
            """
        )

        # Wait for scroll animation
        wait_time = 0.3 if behavior == "smooth" else 0.05
        await asyncio.sleep(wait_time)

        duration_ms = int((time.time() - start_time) * 1000)

        if scrolled:
            return ActionResult(
                success=True,
                duration_ms=duration_ms,
                outcome="dom_updated",
            )
        return ActionResult(
            success=False,
            duration_ms=duration_ms,
            outcome="error",
            error={"code": "scroll_failed", "reason": "Element not found in registry"},
        )
    except Exception as e:
        duration_ms = int((time.time() - start_time) * 1000)
        return ActionResult(
            success=False,
            duration_ms=duration_ms,
            outcome="error",
            error={"code": "scroll_failed", "reason": str(e)},
        )
+ + Args: + backend: BrowserBackendV0 implementation + state: Target document.readyState + timeout_ms: Maximum wait time + + Returns: + ActionResult with success status + """ + start_time = time.time() + + try: + await backend.wait_ready_state(state=state, timeout_ms=timeout_ms) + + duration_ms = int((time.time() - start_time) * 1000) + return ActionResult( + success=True, + duration_ms=duration_ms, + outcome="dom_updated", + ) + except TimeoutError as e: + duration_ms = int((time.time() - start_time) * 1000) + return ActionResult( + success=False, + duration_ms=duration_ms, + outcome="error", + error={"code": "timeout", "reason": str(e)}, + ) + except Exception as e: + duration_ms = int((time.time() - start_time) * 1000) + return ActionResult( + success=False, + duration_ms=duration_ms, + outcome="error", + error={"code": "wait_failed", "reason": str(e)}, + ) + + +def _resolve_coordinates( + target: BBox | dict[str, float] | tuple[float, float], +) -> tuple[float, float]: + """ + Resolve target to (x, y) coordinates. + + - BBox: Returns center point + - dict: Returns x, y keys (or center if width/height present) + - tuple: Returns as-is + """ + if isinstance(target, BBox): + return (target.x + target.width / 2, target.y + target.height / 2) + elif isinstance(target, tuple): + return target + elif isinstance(target, dict): + # If has width/height, compute center + if "width" in target and "height" in target: + x = target.get("x", 0) + target["width"] / 2 + y = target.get("y", 0) + target["height"] / 2 + return (x, y) + # Otherwise use x/y directly + return (target.get("x", 0), target.get("y", 0)) + else: + raise ValueError(f"Invalid target type: {type(target)}") diff --git a/sentience/backends/browser_use_adapter.py b/sentience/backends/browser_use_adapter.py new file mode 100644 index 0000000..c932cd3 --- /dev/null +++ b/sentience/backends/browser_use_adapter.py @@ -0,0 +1,241 @@ +""" +Browser-use adapter for Sentience SDK. 
class BrowserUseCDPTransport(CDPTransport):
    """
    CDPTransport implementation backed by a browser-use CDP client.

    A dotted CDP method name is resolved attribute by attribute on
    ``cdp_client.send`` and invoked as
    ``cdp_client.send.Domain.method(params={...}, session_id=...)``
    (the cdp-use calling pattern).
    """

    def __init__(self, cdp_client: Any, session_id: str) -> None:
        """
        Store the client and session used for every command.

        Args:
            cdp_client: browser-use's CDP client (from cdp_session.cdp_client)
            session_id: CDP session ID (from cdp_session.session_id)
        """
        self._client = cdp_client
        self._session_id = session_id

    async def send(self, method: str, params: dict | None = None) -> dict:
        """
        Dispatch one CDP command through the wrapped client.

        Args:
            method: CDP method name in "Domain.method" form,
                e.g., "Runtime.evaluate"
            params: Method parameters; an empty dict is sent when omitted

        Returns:
            The client's response, or an empty dict when it returns None

        Raises:
            ValueError: If ``method`` contains no "." separator, or the
                domain or method attribute cannot be found on the client
        """
        # "Runtime.evaluate" -> ("Runtime", ".", "evaluate"); only the first
        # dot separates domain from method.
        domain_name, separator, method_name = method.partition(".")
        if not separator:
            raise ValueError(f"Invalid CDP method format: {method}")

        # Look up the domain object on cdp_client.send
        domain = getattr(self._client.send, domain_name, None)
        if domain is None:
            raise ValueError(f"Unknown CDP domain: {domain_name}")

        # Look up the callable for this method on the domain
        method_func = getattr(domain, method_name, None)
        if method_func is None:
            raise ValueError(f"Unknown CDP method: {method}")

        response = await method_func(
            params=params or {},
            session_id=self._session_id,
        )

        # Normalise a None response to an empty dict
        if response is None:
            return {}
        return response
@property
def page(self) -> Any:
    """
    Get the current Playwright page from browser-use (synchronous access).

    The wrapped session is probed for a ``page`` attribute first and a
    ``_page`` attribute second; the first one found is returned as-is.

    Returns:
        Whatever object the session stores under ``page`` / ``_page``.

    Raises:
        RuntimeError: If neither attribute exists. When the session exposes
            ``get_current_page`` the message points the caller at
            ``get_page_async()``, because that accessor is awaited there and
            cannot be used from a synchronous property.
    """
    session = self._session

    # Access pattern may vary by browser-use version, so try both spellings.
    for attr_name in ("page", "_page"):
        if hasattr(session, attr_name):
            return getattr(session, attr_name)

    if hasattr(session, "get_current_page"):
        reason = "Use await adapter.get_page_async() to get the page"
    else:
        reason = "Could not find page in browser-use session"
    raise RuntimeError(reason)
async def get_transport(self) -> BrowserUseCDPTransport:
    """
    Return the CDP transport, building the backend first when necessary.

    The transport is created as a by-product of ``create_backend()``, so a
    missing transport is resolved by awaiting that method once.

    Returns:
        BrowserUseCDPTransport instance
    """
    transport = self._transport
    if transport is None:
        await self.create_backend()
        transport = self._transport
    # Invariant check / type narrowing: create_backend() assigns the transport.
    assert transport is not None
    return transport
async def _get_execution_context(self) -> int:
    """
    Get or create execution context ID for Runtime.callFunctionOn.

    The ID is resolved once and memoised on the instance. Resolution sends
    ``Runtime.enable`` (failures ignored) followed by a trivial
    ``Runtime.evaluate`` and reads ``executionContextId`` from the reply,
    defaulting to ``1`` when the key is absent.

    Returns:
        The cached or freshly resolved execution context ID.
    """
    context_id = self._execution_context_id
    if context_id is None:
        try:
            await self._transport.send("Runtime.enable")
        except Exception:
            # Best effort: the domain may already be enabled.
            pass

        probe = await self._transport.send(
            "Runtime.evaluate",
            {
                "expression": "1",
                "returnByValue": True,
            },
        )

        # NOTE(review): the evaluate reply may never carry a top-level
        # "executionContextId"; if so the fallback of 1 is what is always
        # used — confirm against the CDP transport in use.
        context_id = probe.get("executionContextId", 1)
        self._execution_context_id = context_id

    return context_id
width=result.get("width", 0), + height=result.get("height", 0), + scroll_x=result.get("scroll_x", 0), + scroll_y=result.get("scroll_y", 0), + content_width=result.get("content_width"), + content_height=result.get("content_height"), + ) + return self._cached_viewport + + async def eval(self, expression: str) -> Any: + """Evaluate JavaScript expression using Runtime.evaluate.""" + result = await self._transport.send( + "Runtime.evaluate", + { + "expression": expression, + "returnByValue": True, + "awaitPromise": True, + }, + ) + + # Check for exceptions + if "exceptionDetails" in result: + exc = result["exceptionDetails"] + text = exc.get("text", "Unknown error") + raise RuntimeError(f"JavaScript evaluation failed: {text}") + + # Extract value from result + if "result" in result: + res = result["result"] + if res.get("type") == "undefined": + return None + return res.get("value") + + return None + + async def call( + self, + function_declaration: str, + args: list[Any] | None = None, + ) -> Any: + """Call JavaScript function using Runtime.callFunctionOn.""" + # Build call arguments + call_args = [] + if args: + for arg in args: + if arg is None: + call_args.append({"value": None}) + elif isinstance(arg, bool): + call_args.append({"value": arg}) + elif isinstance(arg, (int, float)): + call_args.append({"value": arg}) + elif isinstance(arg, str): + call_args.append({"value": arg}) + elif isinstance(arg, dict): + call_args.append({"value": arg}) + elif isinstance(arg, list): + call_args.append({"value": arg}) + else: + # Serialize complex objects to JSON + call_args.append({"value": str(arg)}) + + # We need an object ID to call function on + # Use globalThis (window) as the target + global_result = await self._transport.send( + "Runtime.evaluate", + { + "expression": "globalThis", + "returnByValue": False, + }, + ) + + object_id = global_result.get("result", {}).get("objectId") + if not object_id: + # Fallback: evaluate the function directly + if args: + args_json = ", 
".join(repr(a) if isinstance(a, str) else str(a) for a in args) + expression = f"({function_declaration})({args_json})" + else: + expression = f"({function_declaration})()" + return await self.eval(expression) + + result = await self._transport.send( + "Runtime.callFunctionOn", + { + "functionDeclaration": function_declaration, + "objectId": object_id, + "arguments": call_args, + "returnByValue": True, + "awaitPromise": True, + }, + ) + + # Check for exceptions + if "exceptionDetails" in result: + exc = result["exceptionDetails"] + text = exc.get("text", "Unknown error") + raise RuntimeError(f"JavaScript call failed: {text}") + + # Extract value from result + if "result" in result: + res = result["result"] + if res.get("type") == "undefined": + return None + return res.get("value") + + return None + + async def get_layout_metrics(self) -> LayoutMetrics: + """Get page layout metrics using Page.getLayoutMetrics.""" + result = await self._transport.send("Page.getLayoutMetrics") + + # Extract metrics from result + layout_viewport = result.get("layoutViewport", {}) + content_size = result.get("contentSize", {}) + visual_viewport = result.get("visualViewport", {}) + + return LayoutMetrics( + viewport_x=visual_viewport.get("pageX", 0), + viewport_y=visual_viewport.get("pageY", 0), + viewport_width=visual_viewport.get( + "clientWidth", layout_viewport.get("clientWidth", 0) + ), + viewport_height=visual_viewport.get( + "clientHeight", layout_viewport.get("clientHeight", 0) + ), + content_width=content_size.get("width", 0), + content_height=content_size.get("height", 0), + device_scale_factor=visual_viewport.get("scale", 1.0), + ) + + async def screenshot_png(self) -> bytes: + """Capture viewport screenshot as PNG bytes.""" + result = await self._transport.send( + "Page.captureScreenshot", + { + "format": "png", + "captureBeyondViewport": False, + }, + ) + + data = result.get("data", "") + return base64.b64decode(data) + + async def mouse_move(self, x: float, y: float) -> 
async def type_text(self, text: str) -> None:
    """
    Type text using keyboard input, one character at a time.

    Each character is sent as an ``Input.dispatchKeyEvent`` ``keyDown``
    carrying the character as ``text``, followed by a ``keyUp``. In CDP a
    ``keyDown`` with ``text`` already yields the character input (only
    ``rawKeyDown`` does not), so no separate ``char`` event is dispatched;
    sending one as well would enter every character twice.

    Args:
        text: Text to type. An empty string sends nothing.
    """
    for char in text:
        # Key down (also produces the character because "text" is set)
        await self._transport.send(
            "Input.dispatchKeyEvent",
            {
                "type": "keyDown",
                "text": char,
            },
        )

        # Key up
        await self._transport.send(
            "Input.dispatchKeyEvent",
            {
                "type": "keyUp",
                "text": char,
            },
        )

        # Small delay between characters
        await asyncio.sleep(0.01)
wait_ready_state( + self, + state: Literal["interactive", "complete"] = "interactive", + timeout_ms: int = 15000, + ) -> None: + """Wait for document.readyState using polling.""" + start = time.monotonic() + timeout_sec = timeout_ms / 1000.0 + + # Map state to acceptable states + acceptable_states = {"complete"} if state == "complete" else {"interactive", "complete"} + + while True: + elapsed = time.monotonic() - start + if elapsed >= timeout_sec: + raise TimeoutError( + f"Timed out waiting for document.readyState='{state}' " f"after {timeout_ms}ms" + ) + + current_state = await self.eval("document.readyState") + if current_state in acceptable_states: + return + + # Poll every 100ms + await asyncio.sleep(0.1) diff --git a/sentience/backends/exceptions.py b/sentience/backends/exceptions.py new file mode 100644 index 0000000..a1d176c --- /dev/null +++ b/sentience/backends/exceptions.py @@ -0,0 +1,211 @@ +""" +Custom exceptions for Sentience backends. + +These exceptions provide clear, actionable error messages when things go wrong +during browser-use integration or backend operations. 
+""" + +from dataclasses import dataclass +from typing import Any + + +class SentienceBackendError(Exception): + """Base exception for all Sentience backend errors.""" + + pass + + +@dataclass +class ExtensionDiagnostics: + """Diagnostics collected when extension loading fails.""" + + sentience_defined: bool = False + sentience_snapshot: bool = False + url: str = "" + error: str | None = None + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "ExtensionDiagnostics": + """Create from diagnostic dict returned by browser eval.""" + return cls( + sentience_defined=data.get("sentience_defined", False), + sentience_snapshot=data.get("sentience_snapshot", False), + url=data.get("url", ""), + error=data.get("error"), + ) + + def to_dict(self) -> dict[str, Any]: + """Convert to dict for serialization.""" + return { + "sentience_defined": self.sentience_defined, + "sentience_snapshot": self.sentience_snapshot, + "url": self.url, + "error": self.error, + } + + +class ExtensionNotLoadedError(SentienceBackendError): + """ + Raised when the Sentience extension is not loaded in the browser. + + This typically means: + 1. Browser was launched without --load-extension flag + 2. Extension path is incorrect + 3. 
Extension failed to initialize + + Example fix for browser-use: + from sentience import get_extension_dir + from browser_use import BrowserSession, BrowserProfile + + profile = BrowserProfile( + args=[f"--load-extension={get_extension_dir()}"], + ) + session = BrowserSession(browser_profile=profile) + """ + + def __init__( + self, + message: str, + timeout_ms: int | None = None, + diagnostics: ExtensionDiagnostics | None = None, + ) -> None: + self.timeout_ms = timeout_ms + self.diagnostics = diagnostics + super().__init__(message) + + @classmethod + def from_timeout( + cls, + timeout_ms: int, + diagnostics: ExtensionDiagnostics | None = None, + ) -> "ExtensionNotLoadedError": + """Create error from timeout during extension wait.""" + diag_info = "" + if diagnostics: + if diagnostics.error: + diag_info = f"\n Error: {diagnostics.error}" + else: + diag_info = ( + f"\n window.sentience defined: {diagnostics.sentience_defined}" + f"\n window.sentience.snapshot available: {diagnostics.sentience_snapshot}" + f"\n Page URL: {diagnostics.url}" + ) + + message = ( + f"Sentience extension not loaded after {timeout_ms}ms.{diag_info}\n\n" + "To fix this, ensure the extension is loaded when launching the browser:\n\n" + " from sentience import get_extension_dir\n" + " from browser_use import BrowserSession, BrowserProfile\n\n" + " profile = BrowserProfile(\n" + f' args=[f"--load-extension={{get_extension_dir()}}"],\n' + " )\n" + " session = BrowserSession(browser_profile=profile)\n" + ) + return cls(message, timeout_ms=timeout_ms, diagnostics=diagnostics) + + +class ExtensionInjectionError(SentienceBackendError): + """ + Raised when window.sentience API is not available on the page. + + This can happen when: + 1. Page loaded before extension could inject + 2. Page has Content Security Policy blocking extension + 3. Extension crashed or was disabled + + Call snapshot() with a longer timeout or wait for page load. 
class BackendEvalError(SentienceBackendError):
    """
    Raised when JavaScript evaluation fails in the browser.

    This wraps underlying CDP or Playwright errors with context.

    Attributes set by the constructor:
        expression: The JavaScript that was being evaluated, if supplied.
        original_error: The underlying exception, if supplied.
    """

    def __init__(
        self,
        message: str,
        expression: str | None = None,
        original_error: Exception | None = None,
    ) -> None:
        super().__init__(message)
        self.original_error = original_error
        self.expression = expression
class ActionError(SentienceBackendError):
    """
    Raised when a browser action (click, type, scroll) fails.

    The exception text is "<action> failed: <message>"; the action name,
    optional coordinates and optional underlying error are kept as attributes.
    """

    def __init__(
        self,
        action: str,
        message: str,
        coordinates: tuple[float, float] | None = None,
        original_error: Exception | None = None,
    ) -> None:
        description = f"{action} failed: {message}"
        super().__init__(description)
        self.original_error = original_error
        self.coordinates = coordinates
        self.action = action
+ """ + + def __init__(self, page: "AsyncPage") -> None: + """ + Initialize Playwright backend. + + Args: + page: Playwright async Page object + """ + self._page = page + self._cached_viewport: ViewportInfo | None = None + + @property + def page(self) -> "AsyncPage": + """Access the underlying Playwright page.""" + return self._page + + async def refresh_page_info(self) -> ViewportInfo: + """Cache viewport + scroll offsets; cheap & safe to call often.""" + result = await self._page.evaluate( + """ + (() => ({ + width: window.innerWidth, + height: window.innerHeight, + scroll_x: window.scrollX, + scroll_y: window.scrollY, + content_width: document.documentElement.scrollWidth, + content_height: document.documentElement.scrollHeight + }))() + """ + ) + + self._cached_viewport = ViewportInfo( + width=result.get("width", 0), + height=result.get("height", 0), + scroll_x=result.get("scroll_x", 0), + scroll_y=result.get("scroll_y", 0), + content_width=result.get("content_width"), + content_height=result.get("content_height"), + ) + return self._cached_viewport + + async def eval(self, expression: str) -> Any: + """Evaluate JavaScript expression in page context.""" + return await self._page.evaluate(expression) + + async def call( + self, + function_declaration: str, + args: list[Any] | None = None, + ) -> Any: + """Call JavaScript function with arguments.""" + if args: + return await self._page.evaluate(function_declaration, *args) + return await self._page.evaluate(f"({function_declaration})()") + + async def get_layout_metrics(self) -> LayoutMetrics: + """Get page layout metrics.""" + # Playwright doesn't expose CDP directly in the same way, + # so we approximate using JavaScript + result = await self._page.evaluate( + """ + (() => ({ + viewport_x: window.scrollX, + viewport_y: window.scrollY, + viewport_width: window.innerWidth, + viewport_height: window.innerHeight, + content_width: document.documentElement.scrollWidth, + content_height: 
document.documentElement.scrollHeight, + device_scale_factor: window.devicePixelRatio || 1 + }))() + """ + ) + + return LayoutMetrics( + viewport_x=result.get("viewport_x", 0), + viewport_y=result.get("viewport_y", 0), + viewport_width=result.get("viewport_width", 0), + viewport_height=result.get("viewport_height", 0), + content_width=result.get("content_width", 0), + content_height=result.get("content_height", 0), + device_scale_factor=result.get("device_scale_factor", 1.0), + ) + + async def screenshot_png(self) -> bytes: + """Capture viewport screenshot as PNG bytes.""" + return await self._page.screenshot(type="png") + + async def mouse_move(self, x: float, y: float) -> None: + """Move mouse to viewport coordinates.""" + await self._page.mouse.move(x, y) + + async def mouse_click( + self, + x: float, + y: float, + button: Literal["left", "right", "middle"] = "left", + click_count: int = 1, + ) -> None: + """Click at viewport coordinates.""" + await self._page.mouse.click(x, y, button=button, click_count=click_count) + + async def wheel( + self, + delta_y: float, + x: float | None = None, + y: float | None = None, + ) -> None: + """Scroll using mouse wheel.""" + # Get viewport center if coordinates not provided + if x is None or y is None: + if self._cached_viewport is None: + await self.refresh_page_info() + assert self._cached_viewport is not None + x = x if x is not None else self._cached_viewport.width / 2 + y = y if y is not None else self._cached_viewport.height / 2 + + await self._page.mouse.wheel(0, delta_y) + + async def type_text(self, text: str) -> None: + """Type text using keyboard input.""" + await self._page.keyboard.type(text) + + async def wait_ready_state( + self, + state: Literal["interactive", "complete"] = "interactive", + timeout_ms: int = 15000, + ) -> None: + """Wait for document.readyState to reach target state.""" + acceptable_states = {"complete"} if state == "complete" else {"interactive", "complete"} + + start = time.monotonic() + 
timeout_sec = timeout_ms / 1000.0 + + while True: + elapsed = time.monotonic() - start + if elapsed >= timeout_sec: + raise TimeoutError( + f"Timed out waiting for document.readyState='{state}' " f"after {timeout_ms}ms" + ) + + current_state = await self._page.evaluate("document.readyState") + if current_state in acceptable_states: + return + + await asyncio.sleep(0.1) + + +# Verify protocol compliance at import time +assert isinstance(PlaywrightBackend.__new__(PlaywrightBackend), BrowserBackendV0) diff --git a/sentience/backends/protocol_v0.py b/sentience/backends/protocol_v0.py new file mode 100644 index 0000000..2ac86cc --- /dev/null +++ b/sentience/backends/protocol_v0.py @@ -0,0 +1,207 @@ +""" +v0 BrowserBackend Protocol - Minimal interface for browser-use integration. + +This protocol defines the minimal interface required to: +- Take Sentience snapshots (DOM/geometry via extension) +- Compute viewport-coord clicks +- Scroll + re-snapshot + click +- Stabilize after action + +No navigation API required (browser-use already handles navigation). + +Design principle: Keep it so small that nothing can break. +""" + +from typing import Any, Literal, Protocol, runtime_checkable + +from pydantic import BaseModel + + +class ViewportInfo(BaseModel): + """Viewport and scroll position information.""" + + width: int + height: int + scroll_x: float = 0.0 + scroll_y: float = 0.0 + content_width: float | None = None + content_height: float | None = None + + +class LayoutMetrics(BaseModel): + """Page layout metrics from CDP Page.getLayoutMetrics.""" + + # Viewport dimensions + viewport_x: float = 0.0 + viewport_y: float = 0.0 + viewport_width: float = 0.0 + viewport_height: float = 0.0 + + # Content dimensions (scrollable area) + content_width: float = 0.0 + content_height: float = 0.0 + + # Device scale factor + device_scale_factor: float = 1.0 + + +@runtime_checkable +class BrowserBackendV0(Protocol): + """ + Minimal backend protocol for v0 proof-of-concept. 
+ + This is enough to: + - Take Sentience snapshots (DOM/geometry via extension) + - Execute JavaScript for element interaction + - Perform mouse operations (move, click, scroll) + - Wait for page stability + + Implementers: + - CDPBackendV0: For browser-use integration via CDP + - PlaywrightBackend: Wrapper around existing SentienceBrowser (future) + """ + + async def refresh_page_info(self) -> ViewportInfo: + """ + Cache viewport + scroll offsets + url; cheap & safe to call often. + + Returns: + ViewportInfo with current viewport state + """ + ... + + async def eval(self, expression: str) -> Any: + """ + Evaluate JavaScript expression in page context. + + Uses CDP Runtime.evaluate with returnByValue=True. + + Args: + expression: JavaScript expression to evaluate + + Returns: + Result value (JSON-serializable) + """ + ... + + async def call( + self, + function_declaration: str, + args: list[Any] | None = None, + ) -> Any: + """ + Call a JavaScript function with arguments. + + Uses CDP Runtime.callFunctionOn for safe argument passing. + Safer than eval() for passing complex arguments. + + Args: + function_declaration: JavaScript function body, e.g., "(x, y) => x + y" + args: Arguments to pass to the function + + Returns: + Result value (JSON-serializable) + """ + ... + + async def get_layout_metrics(self) -> LayoutMetrics: + """ + Get page layout metrics. + + Uses CDP Page.getLayoutMetrics to get viewport and content dimensions. + + Returns: + LayoutMetrics with viewport and content size info + """ + ... + + async def screenshot_png(self) -> bytes: + """ + Capture viewport screenshot as PNG bytes. + + Uses CDP Page.captureScreenshot. + + Returns: + PNG image bytes + """ + ... + + async def mouse_move(self, x: float, y: float) -> None: + """ + Move mouse to viewport coordinates. + + Uses CDP Input.dispatchMouseEvent with type="mouseMoved". + + Args: + x: X coordinate in viewport + y: Y coordinate in viewport + """ + ... 
+ + async def mouse_click( + self, + x: float, + y: float, + button: Literal["left", "right", "middle"] = "left", + click_count: int = 1, + ) -> None: + """ + Click at viewport coordinates. + + Uses CDP Input.dispatchMouseEvent with mousePressed + mouseReleased. + + Args: + x: X coordinate in viewport + y: Y coordinate in viewport + button: Mouse button to click + click_count: Number of clicks (1 for single, 2 for double) + """ + ... + + async def wheel( + self, + delta_y: float, + x: float | None = None, + y: float | None = None, + ) -> None: + """ + Scroll using mouse wheel. + + Uses CDP Input.dispatchMouseEvent with type="mouseWheel". + + Args: + delta_y: Scroll amount (positive = down, negative = up) + x: X coordinate for scroll (default: viewport center) + y: Y coordinate for scroll (default: viewport center) + """ + ... + + async def type_text(self, text: str) -> None: + """ + Type text using keyboard input. + + Uses CDP Input.dispatchKeyEvent for each character. + + Args: + text: Text to type + """ + ... + + async def wait_ready_state( + self, + state: Literal["interactive", "complete"] = "interactive", + timeout_ms: int = 15000, + ) -> None: + """ + Wait for document.readyState to reach target state. + + Uses polling instead of CDP events (no leak from unregistered listeners). + + Args: + state: Target state ("interactive" or "complete") + timeout_ms: Maximum time to wait in milliseconds + + Raises: + TimeoutError: If state not reached within timeout + """ + ... diff --git a/sentience/backends/snapshot.py b/sentience/backends/snapshot.py new file mode 100644 index 0000000..2a1ff7d --- /dev/null +++ b/sentience/backends/snapshot.py @@ -0,0 +1,426 @@ +""" +Backend-agnostic snapshot for browser-use integration. + +Takes Sentience snapshots using BrowserBackendV0 protocol, +enabling element grounding with browser-use or other frameworks. 
class CachedSnapshot:
    """
    Snapshot cache with staleness detection.

    Caches snapshots and returns cached version if still fresh.
    Useful for reducing redundant snapshot calls in action loops.

    Age is measured with ``time.monotonic()`` so that wall-clock adjustments
    (NTP corrections, manual clock changes) can neither keep an old snapshot
    "fresh" nor expire a new one early.

    Usage:
        cache = CachedSnapshot(backend, max_age_ms=2000)

        # First call takes fresh snapshot
        snap1 = await cache.get()

        # Second call returns cached if < 2s old
        snap2 = await cache.get()

        # Invalidate after actions that change DOM
        await click(backend, element.bbox)
        cache.invalidate()

        # Next get() will take fresh snapshot
        snap3 = await cache.get()
    """

    def __init__(
        self,
        backend: "BrowserBackendV0",
        max_age_ms: int = 2000,
        options: SnapshotOptions | None = None,
    ) -> None:
        """
        Initialize cached snapshot.

        Args:
            backend: BrowserBackendV0 implementation
            max_age_ms: Maximum cache age in milliseconds (default: 2000)
            options: Default snapshot options
        """
        self._backend = backend
        self._max_age_ms = max_age_ms
        self._options = options
        self._cached: Snapshot | None = None
        # time.monotonic() reading taken when the snapshot was stored.
        self._cached_at: float = 0.0
        self._cached_url: str | None = None

    async def get(
        self,
        options: SnapshotOptions | None = None,
        force_refresh: bool = False,
    ) -> Snapshot:
        """
        Get snapshot, using cache if fresh.

        Args:
            options: Override default options for this call (only consulted
                when a fresh snapshot is actually taken)
            force_refresh: If True, always take fresh snapshot

        Returns:
            Snapshot (cached or fresh)
        """
        if force_refresh or self._is_stale():
            self._cached = await snapshot(
                self._backend,
                options or self._options,
            )
            self._cached_at = time.monotonic()
            self._cached_url = self._cached.url

        assert self._cached is not None
        return self._cached

    def invalidate(self) -> None:
        """
        Invalidate cache, forcing refresh on next get().

        Call this after actions that modify the DOM.
        """
        self._cached = None
        self._cached_at = 0.0
        self._cached_url = None

    def _is_stale(self) -> bool:
        """Check if cache is stale (empty or older than ``max_age_ms``)."""
        # age_ms is infinite for an empty cache, so one comparison covers both.
        return self.age_ms > self._max_age_ms

    @property
    def is_cached(self) -> bool:
        """Check if a cached snapshot exists."""
        return self._cached is not None

    @property
    def age_ms(self) -> float:
        """Get age of cached snapshot in milliseconds (``inf`` when empty)."""
        if self._cached is None:
            return float("inf")
        return (time.monotonic() - self._cached_at) * 1000
async def snapshot(
    backend: "BrowserBackendV0",
    options: SnapshotOptions | None = None,
) -> Snapshot:
    """
    Take a Sentience snapshot through the backend protocol.

    Routes the request to one of two implementations:
    - Server-side API (Pro/Enterprise tier) when the API is wanted and
      `sentience_api_key` is set
    - Local extension (Free tier) in every other case, including
      `use_api=True` without an API key

    "Wanted" means `use_api` when it is set explicitly; when `use_api` is
    None it defaults to "an API key was supplied".

    Requires:
        - Sentience extension loaded in browser (via --load-extension)
        - Extension injected window.sentience API

    Args:
        backend: BrowserBackendV0 implementation (CDPBackendV0, PlaywrightBackend, etc.)
        options: Snapshot options (limit, filter, screenshot, use_api, sentience_api_key, etc.)
            A default SnapshotOptions() is used when omitted.

    Returns:
        Snapshot with elements, viewport, and optional screenshot

    Example:
        from sentience.backends import BrowserUseAdapter
        from sentience.backends.snapshot import snapshot
        from sentience.models import SnapshotOptions

        adapter = BrowserUseAdapter(session)
        backend = await adapter.create_backend()

        # Basic snapshot (uses local extension)
        snap = await snapshot(backend)

        # With server-side API (Pro/Enterprise tier)
        snap = await snapshot(backend, SnapshotOptions(
            use_api=True,
            sentience_api_key="sk_pro_xxxxx",
            limit=100,
            screenshot=True
        ))

        # Force local extension (Free tier)
        snap = await snapshot(backend, SnapshotOptions(
            use_api=False
        ))
    """
    opts = SnapshotOptions() if options is None else options
    api_key = opts.sentience_api_key

    # Same decision rule as the main snapshot() in sentience/snapshot.py
    if opts.use_api is None:
        wants_api = api_key is not None
    else:
        wants_api = opts.use_api

    if not (wants_api and api_key):
        # Local extension (Free tier)
        return await _snapshot_via_extension(backend, opts)

    # Server-side API (Pro/Enterprise tier)
    return await _snapshot_via_api(backend, opts)
+ + Args: + backend: BrowserBackendV0 implementation + timeout_ms: Maximum wait time + + Raises: + RuntimeError: If extension not injected within timeout + """ + import asyncio + import logging + + logger = logging.getLogger("sentience.backends.snapshot") + + start = time.monotonic() + timeout_sec = timeout_ms / 1000.0 + poll_count = 0 + + logger.debug(f"Waiting for extension injection (timeout={timeout_ms}ms)...") + + while True: + elapsed = time.monotonic() - start + poll_count += 1 + + if poll_count % 10 == 0: # Log every 10 polls (~1 second) + logger.debug(f"Extension poll #{poll_count}, elapsed={elapsed*1000:.0f}ms") + + if elapsed >= timeout_sec: + # Gather diagnostics + try: + diag_dict = await backend.eval( + """ + (() => ({ + sentience_defined: typeof window.sentience !== 'undefined', + sentience_snapshot: typeof window.sentience?.snapshot === 'function', + url: window.location.href, + extension_id: document.documentElement.dataset.sentienceExtensionId || null, + has_content_script: !!document.documentElement.dataset.sentienceExtensionId + }))() + """ + ) + diagnostics = ExtensionDiagnostics.from_dict(diag_dict) + logger.debug(f"Extension diagnostics: {diag_dict}") + except Exception as e: + diagnostics = ExtensionDiagnostics(error=f"Could not gather diagnostics: {e}") + + raise ExtensionNotLoadedError.from_timeout( + timeout_ms=timeout_ms, + diagnostics=diagnostics, + ) + + # Check if extension is ready + try: + ready = await backend.eval( + "typeof window.sentience !== 'undefined' && " + "typeof window.sentience.snapshot === 'function'" + ) + if ready: + return + except Exception: + pass # Keep polling + + await asyncio.sleep(0.1) + + +async def _snapshot_via_extension( + backend: "BrowserBackendV0", + options: SnapshotOptions, +) -> Snapshot: + """Take snapshot using local extension (Free tier)""" + # Wait for extension injection + await _wait_for_extension(backend, timeout_ms=5000) + + # Build options dict for extension API + ext_options = 
async def _snapshot_via_extension(
    backend: "BrowserBackendV0",
    options: SnapshotOptions,
) -> Snapshot:
    """
    Take snapshot using local extension (Free tier).

    Waits for the extension, calls ``window.sentience.snapshot`` in the page
    with the options derived from ``options``, optionally draws the overlay
    for the returned ``raw_elements``, and wraps the result in a Snapshot.

    Raises:
        SnapshotError: If the extension call returns None (built via
            ``SnapshotError.from_null_result`` with the page URL when it
            could be read).
    """
    # The extension must have injected window.sentience before we call it
    await _wait_for_extension(backend, timeout_ms=5000)

    options_json = _json_serialize(_build_extension_options(options))
    snapshot_script = f"""
        (() => {{
            const options = {options_json};
            return window.sentience.snapshot(options);
        }})()
    """
    result = await backend.eval(snapshot_script)

    if result is None:
        # Best effort: include the URL in the error when it can be read
        try:
            page_url = await backend.eval("window.location.href")
        except Exception:
            page_url = None
        raise SnapshotError.from_null_result(url=page_url)

    # Draw the overlay only when requested and there is something to draw
    overlay_elements = result.get("raw_elements", []) if options.show_overlay else None
    if overlay_elements:
        overlay_script = f"""
            (() => {{
                if (window.sentience && window.sentience.showOverlay) {{
                    window.sentience.showOverlay({_json_serialize(overlay_elements)}, null);
                }}
            }})()
        """
        await backend.eval(overlay_script)

    return Snapshot(**result)
options.sentience_api_key, api_url + ) + + # Merge API result with local data (screenshot, etc.) + snapshot_data = _merge_api_result_with_local(api_result, raw_result) + + # Show visual overlay if requested (use API-ranked elements) + if options.show_overlay: + elements = api_result.get("elements", []) + if elements: + await backend.eval( + f""" + (() => {{ + if (window.sentience && window.sentience.showOverlay) {{ + window.sentience.showOverlay({_json_serialize(elements)}, null); + }} + }})() + """ + ) + + return Snapshot(**snapshot_data) + except (RuntimeError, ValueError): + # Re-raise validation errors as-is + raise + except Exception as e: + # Fallback to local extension on API error + # This matches the behavior of the main snapshot function + raise RuntimeError( + f"Server-side snapshot API failed: {e}. " + "Try using use_api=False to use local extension instead." + ) from e + + +def _build_extension_options(options: SnapshotOptions) -> dict[str, Any]: + """Build options dict for extension API call.""" + ext_options: dict[str, Any] = {} + + # Screenshot config + if options.screenshot is not False: + if hasattr(options.screenshot, "model_dump"): + ext_options["screenshot"] = options.screenshot.model_dump() + else: + ext_options["screenshot"] = options.screenshot + + # Limit (only if not default) + if options.limit != 50: + ext_options["limit"] = options.limit + + # Filter + if options.filter is not None: + if hasattr(options.filter, "model_dump"): + ext_options["filter"] = options.filter.model_dump() + else: + ext_options["filter"] = options.filter + + return ext_options + + +def _json_serialize(obj: Any) -> str: + """Serialize object to JSON string for embedding in JS.""" + import json + + return json.dumps(obj) diff --git a/sentience/browser.py b/sentience/browser.py index 58d0134..1c610ae 100644 --- a/sentience/browser.py +++ b/sentience/browser.py @@ -3,7 +3,9 @@ """ import asyncio +import logging import os +import platform import shutil import tempfile 
import time @@ -20,6 +22,8 @@ from sentience._extension_loader import find_extension_path from sentience.models import ProxyConfig, StorageState, Viewport +logger = logging.getLogger(__name__) + # Import stealth for bot evasion (optional - graceful fallback if not available) try: from playwright_stealth import stealth_async, stealth_sync @@ -145,14 +149,16 @@ def _parse_proxy(self, proxy_string: str) -> ProxyConfig | None: # Validate scheme if parsed.scheme not in ("http", "https", "socks5"): - print(f"⚠️ [Sentience] Unsupported proxy scheme: {parsed.scheme}") - print(" Supported: http, https, socks5") + logger.warning( + f"Unsupported proxy scheme: {parsed.scheme}. Supported: http, https, socks5" + ) return None # Validate host and port if not parsed.hostname or not parsed.port: - print("⚠️ [Sentience] Proxy URL must include hostname and port") - print(" Expected format: http://username:password@host:port") + logger.warning( + "Proxy URL must include hostname and port. Expected format: http://username:password@host:port" + ) return None # Build server URL @@ -166,8 +172,9 @@ def _parse_proxy(self, proxy_string: str) -> ProxyConfig | None: ) except Exception as e: - print(f"⚠️ [Sentience] Invalid proxy configuration: {e}") - print(" Expected format: http://username:password@host:port") + logger.warning( + f"Invalid proxy configuration: {e}. 
Expected format: http://username:password@host:port" + ) return None def start(self) -> None: @@ -187,13 +194,41 @@ def start(self) -> None: f"--disable-extensions-except={self._extension_path}", f"--load-extension={self._extension_path}", "--disable-blink-features=AutomationControlled", # Hides 'navigator.webdriver' - "--no-sandbox", "--disable-infobars", # WebRTC leak protection (prevents real IP exposure when using proxies/VPNs) "--disable-features=WebRtcHideLocalIpsWithMdns", "--force-webrtc-ip-handling-policy=disable_non_proxied_udp", ] + # Only add --no-sandbox on Linux (causes crashes on macOS) + # macOS sandboxing works fine and the flag actually causes crashes + if platform.system() == "Linux": + args.append("--no-sandbox") + + # Add GPU-disabling flags for macOS to prevent Chrome for Testing crash-on-exit + # These flags help avoid EXC_BAD_ACCESS crashes during browser shutdown + if platform.system() == "Darwin": # macOS + args.extend( + [ + "--disable-gpu", + "--disable-software-rasterizer", + "--disable-dev-shm-usage", + "--disable-breakpad", # Disable crash reporter to prevent macOS crash dialogs + "--disable-crash-reporter", # Disable crash reporter UI + "--disable-crash-handler", # Disable crash handler completely + "--disable-in-process-stack-traces", # Disable stack trace collection + "--disable-hang-monitor", # Disable hang detection + "--disable-background-networking", # Disable background networking + "--disable-background-timer-throttling", # Disable background throttling + "--disable-backgrounding-occluded-windows", # Disable backgrounding + "--disable-renderer-backgrounding", # Disable renderer backgrounding + "--disable-features=TranslateUI", # Disable translate UI + "--disable-ipc-flooding-protection", # Disable IPC flooding protection + "--disable-logging", # Disable logging to reduce stderr noise + "--log-level=3", # Set log level to fatal only (suppresses warnings) + ] + ) + # Handle headless mode correctly for extensions # 
'headless=True' DOES NOT support extensions in standard Chrome # We must use 'headless="new"' (Chrome 112+) or run visible @@ -219,6 +254,8 @@ def start(self) -> None: "viewport": {"width": self.viewport.width, "height": self.viewport.height}, # Remove "HeadlessChrome" from User Agent automatically "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + # Note: Don't set "channel" - let Playwright use its default managed Chromium + # Setting channel=None doesn't force bundled Chromium and can still pick Chrome for Testing } # Add device scale factor if configured @@ -230,7 +267,7 @@ def start(self) -> None: launch_params["proxy"] = proxy_config.to_playwright_dict() # Ignore HTTPS errors when using proxy (many residential proxies use self-signed certs) launch_params["ignore_https_errors"] = True - print(f"🌐 [Sentience] Using proxy: {proxy_config.server}") + logger.info(f"Using proxy: {proxy_config.server}") # Add video recording if configured if self.record_video_dir: @@ -238,9 +275,8 @@ def start(self) -> None: video_dir.mkdir(parents=True, exist_ok=True) launch_params["record_video_dir"] = str(video_dir) launch_params["record_video_size"] = self.record_video_size - print(f"🎥 [Sentience] Recording video to: {video_dir}") - print( - f" Resolution: {self.record_video_size['width']}x{self.record_video_size['height']}" + logger.info( + f"Recording video to: {video_dir} (Resolution: {self.record_video_size['width']}x{self.record_video_size['height']})" ) # Launch persistent context (required for extensions) @@ -346,7 +382,7 @@ def _inject_storage_state( playwright_cookies.append(playwright_cookie) self.context.add_cookies(playwright_cookies) - print(f"✅ [Sentience] Injected {len(state.cookies)} cookie(s)") + logger.debug(f"Injected {len(state.cookies)} cookie(s)") # Inject LocalStorage (requires navigation to each domain) if state.origins: @@ -373,11 +409,11 @@ def _inject_storage_state( 
}""", localStorage_dict, ) - print( - f"✅ [Sentience] Injected {len(origin_data.localStorage)} localStorage item(s) for {origin}" + logger.debug( + f"Injected {len(origin_data.localStorage)} localStorage item(s) for {origin}" ) except Exception as e: - print(f"⚠️ [Sentience] Failed to inject localStorage for {origin}: {e}") + logger.warning(f"Failed to inject localStorage for {origin}: {e}") def _wait_for_extension(self, timeout_sec: float = 5.0) -> bool: """Poll for window.sentience to be available""" @@ -438,30 +474,15 @@ def close(self, output_path: str | Path | None = None) -> str | None: Note: Video files are saved automatically by Playwright when context closes. If multiple pages exist, returns the path to the first page's video. """ - temp_video_path = None - - # Get video path before closing (if recording was enabled) - # Note: Playwright saves videos when pages/context close, but we can get the - # expected path before closing. The actual file will be available after close. - if self.record_video_dir: - try: - # Try to get video path from the first page - if self.page and self.page.video: - temp_video_path = self.page.video.path() - # If that fails, check all pages in the context - elif self.context: - for page in self.context.pages: - if page.video: - temp_video_path = page.video.path() - break - except Exception: - # Video path might not be available until after close - # In that case, we'll return None and user can check the directory - pass + # CRITICAL: Don't access page.video.path() BEFORE closing context + # This can poke the video subsystem at an awkward time and cause crashes on macOS + # Instead, we'll locate the video file after context closes # Close context (this triggers video file finalization) if self.context: self.context.close() + # Small grace period to ensure video file is fully flushed to disk + time.sleep(0.5) # Close playwright if self.playwright: @@ -471,8 +492,24 @@ def close(self, output_path: str | Path | None = None) -> str | 
None: if self._extension_path and os.path.exists(self._extension_path): shutil.rmtree(self._extension_path) + # NOW resolve video path after context is closed and video is finalized + temp_video_path = None + if self.record_video_dir: + try: + # Locate the newest .webm file in record_video_dir + # This avoids touching page.video during teardown + video_dir = Path(self.record_video_dir) + if video_dir.exists(): + webm_files = list(video_dir.glob("*.webm")) + if webm_files: + # Get the most recently modified file + temp_video_path = max(webm_files, key=lambda p: p.stat().st_mtime) + logger.debug(f"Found video file: {temp_video_path}") + except Exception as e: + logger.warning(f"Could not locate video file: {e}") + # Rename/move video if output_path is specified - final_path = temp_video_path + final_path = str(temp_video_path) if temp_video_path else None if temp_video_path and output_path and os.path.exists(temp_video_path): try: output_path = str(output_path) @@ -485,7 +522,7 @@ def close(self, output_path: str | Path | None = None) -> str | None: warnings.warn(f"Failed to rename video file: {e}") # Return original path if rename fails - final_path = temp_video_path + final_path = str(temp_video_path) return final_path @@ -605,6 +642,7 @@ def __init__( record_video_size: dict[str, int] | None = None, viewport: Viewport | dict[str, int] | None = None, device_scale_factor: float | None = None, + executable_path: str | None = None, ): """ Initialize Async Sentience browser @@ -629,6 +667,10 @@ def __init__( 2.0 (Retina/high-DPI, like MacBook Pro) 3.0 (very high DPI) If None, defaults to 1.0 (standard DPI). + executable_path: Optional path to Chromium executable. If provided, forces use of + this specific browser binary instead of Playwright's managed browser. + Useful to guarantee Chromium (not Chrome for Testing) on macOS. 
+ Example: "/path/to/playwright/chromium-1234/chrome-mac/Chromium.app/Contents/MacOS/Chromium" """ self.api_key = api_key # Only set api_url if api_key is provided, otherwise None (free tier) @@ -666,6 +708,9 @@ def __init__( # Device scale factor for high-DPI emulation self.device_scale_factor = device_scale_factor + # Executable path override (for forcing specific Chromium binary) + self.executable_path = executable_path + self.playwright: AsyncPlaywright | None = None self.context: AsyncBrowserContext | None = None self.page: AsyncPage | None = None @@ -689,14 +734,16 @@ def _parse_proxy(self, proxy_string: str) -> ProxyConfig | None: # Validate scheme if parsed.scheme not in ("http", "https", "socks5"): - print(f"⚠️ [Sentience] Unsupported proxy scheme: {parsed.scheme}") - print(" Supported: http, https, socks5") + logger.warning( + f"Unsupported proxy scheme: {parsed.scheme}. Supported: http, https, socks5" + ) return None # Validate host and port if not parsed.hostname or not parsed.port: - print("⚠️ [Sentience] Proxy URL must include hostname and port") - print(" Expected format: http://username:password@host:port") + logger.warning( + "Proxy URL must include hostname and port. Expected format: http://username:password@host:port" + ) return None # Build server URL @@ -710,8 +757,9 @@ def _parse_proxy(self, proxy_string: str) -> ProxyConfig | None: ) except Exception as e: - print(f"⚠️ [Sentience] Invalid proxy configuration: {e}") - print(" Expected format: http://username:password@host:port") + logger.warning( + f"Invalid proxy configuration: {e}. 
Expected format: http://username:password@host:port" + ) return None async def start(self) -> None: @@ -730,12 +778,40 @@ async def start(self) -> None: f"--disable-extensions-except={self._extension_path}", f"--load-extension={self._extension_path}", "--disable-blink-features=AutomationControlled", - "--no-sandbox", "--disable-infobars", "--disable-features=WebRtcHideLocalIpsWithMdns", "--force-webrtc-ip-handling-policy=disable_non_proxied_udp", ] + # Only add --no-sandbox on Linux (causes crashes on macOS) + # macOS sandboxing works fine and the flag actually causes crashes + if platform.system() == "Linux": + args.append("--no-sandbox") + + # Add GPU-disabling flags for macOS to prevent Chrome for Testing crash-on-exit + # These flags help avoid EXC_BAD_ACCESS crashes during browser shutdown + if platform.system() == "Darwin": # macOS + args.extend( + [ + "--disable-gpu", + "--disable-software-rasterizer", + "--disable-dev-shm-usage", + "--disable-breakpad", # Disable crash reporter to prevent macOS crash dialogs + "--disable-crash-reporter", # Disable crash reporter UI + "--disable-crash-handler", # Disable crash handler completely + "--disable-in-process-stack-traces", # Disable stack trace collection + "--disable-hang-monitor", # Disable hang detection + "--disable-background-networking", # Disable background networking + "--disable-background-timer-throttling", # Disable background throttling + "--disable-backgrounding-occluded-windows", # Disable backgrounding + "--disable-renderer-backgrounding", # Disable renderer backgrounding + "--disable-features=TranslateUI", # Disable translate UI + "--disable-ipc-flooding-protection", # Disable IPC flooding protection + "--disable-logging", # Disable logging to reduce stderr noise + "--log-level=3", # Set log level to fatal only (suppresses warnings) + ] + ) + if self.headless: args.append("--headless=new") @@ -756,8 +832,16 @@ async def start(self) -> None: "args": args, "viewport": {"width": self.viewport.width, 
"height": self.viewport.height}, "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + # Note: Don't set "channel" - let Playwright use its default managed Chromium + # Setting channel=None doesn't force bundled Chromium and can still pick Chrome for Testing } + # If executable_path is provided, use it to force specific Chromium binary + # This guarantees we use Chromium (not Chrome for Testing) on macOS + if self.executable_path: + launch_params["executable_path"] = self.executable_path + logger.info(f"Using explicit executable: {self.executable_path}") + # Add device scale factor if configured if self.device_scale_factor is not None: launch_params["device_scale_factor"] = self.device_scale_factor @@ -766,7 +850,7 @@ async def start(self) -> None: if proxy_config: launch_params["proxy"] = proxy_config.to_playwright_dict() launch_params["ignore_https_errors"] = True - print(f"🌐 [Sentience] Using proxy: {proxy_config.server}") + logger.info(f"Using proxy: {proxy_config.server}") # Add video recording if configured if self.record_video_dir: @@ -774,9 +858,8 @@ async def start(self) -> None: video_dir.mkdir(parents=True, exist_ok=True) launch_params["record_video_dir"] = str(video_dir) launch_params["record_video_size"] = self.record_video_size - print(f"🎥 [Sentience] Recording video to: {video_dir}") - print( - f" Resolution: {self.record_video_size['width']}x{self.record_video_size['height']}" + logger.info( + f"Recording video to: {video_dir} (Resolution: {self.record_video_size['width']}x{self.record_video_size['height']})" ) # Launch persistent context @@ -867,7 +950,7 @@ async def _inject_storage_state(self, storage_state: str | Path | StorageState | playwright_cookies.append(playwright_cookie) await self.context.add_cookies(playwright_cookies) - print(f"✅ [Sentience] Injected {len(state.cookies)} cookie(s)") + logger.debug(f"Injected {len(state.cookies)} cookie(s)") # Inject 
LocalStorage if state.origins: @@ -891,11 +974,11 @@ async def _inject_storage_state(self, storage_state: str | Path | StorageState | }""", localStorage_dict, ) - print( - f"✅ [Sentience] Injected {len(origin_data.localStorage)} localStorage item(s) for {origin}" + logger.debug( + f"Injected {len(origin_data.localStorage)} localStorage item(s) for {origin}" ) except Exception as e: - print(f"⚠️ [Sentience] Failed to inject localStorage for {origin}: {e}") + logger.warning(f"Failed to inject localStorage for {origin}: {e}") async def _wait_for_extension(self, timeout_sec: float = 5.0) -> bool: """Poll for window.sentience to be available (async)""" @@ -933,7 +1016,7 @@ async def _wait_for_extension(self, timeout_sec: float = 5.0) -> bool: return False - async def close(self, output_path: str | Path | None = None) -> str | None: + async def close(self, output_path: str | Path | None = None) -> tuple[str | None, bool]: """ Close browser and cleanup (async) @@ -941,29 +1024,88 @@ async def close(self, output_path: str | Path | None = None) -> str | None: output_path: Optional path to rename the video file to Returns: - Path to video file if recording was enabled, None otherwise + Tuple of (video_path, shutdown_clean) + - video_path: Path to video file if recording was enabled, None otherwise + - shutdown_clean: True if shutdown completed without errors, False if there were issues + + Note: Video path is resolved AFTER context close to avoid touching video + subsystem during teardown, which can cause crashes on macOS. 
""" - temp_video_path = None + # CRITICAL: Don't access page.video.path() BEFORE closing context + # This can poke the video subsystem at an awkward time and cause crashes + # Instead, we'll locate the video file after context closes + + # CRITICAL: Wait before closing to ensure all operations are complete + # This is especially important for video recording - we need to ensure + # all frames are written and the encoder is ready to finalize + if platform.system() == "Darwin": # macOS + # On macOS, give extra time for video encoder to finish writing frames + # 4K video recording needs more time to flush buffers + logger.debug("Waiting for video recording to stabilize before closing (macOS)...") + await asyncio.sleep(2.0) + else: + await asyncio.sleep(1.0) - if self.record_video_dir: + # Graceful shutdown: close context first, then playwright + # Use longer timeouts on macOS where video finalization can take longer + context_close_success = True + if self.context: + try: + # Give context time to close gracefully (especially for video finalization) + # Increased timeout for macOS where 4K video finalization can take longer + await asyncio.wait_for(self.context.close(), timeout=30.0) + logger.debug("Context closed successfully") + except TimeoutError: + logger.warning("Context close timed out, continuing with cleanup...") + context_close_success = False + except Exception as e: + logger.warning(f"Error closing context: {e}") + context_close_success = False + finally: + self.context = None + + # Give Chrome a moment to fully flush video + release resources + # This avoids stopping the driver while the browser is still finishing the .webm write/encoder shutdown + # Increased grace period on macOS to allow more time for process cleanup + grace_period = 2.0 if platform.system() == "Darwin" else 1.0 + await asyncio.sleep(grace_period) + + playwright_stop_success = True + if self.playwright: try: - if self.page and self.page.video: - temp_video_path = await 
self.page.video.path() - elif self.context: - for page in self.context.pages: - if page.video: - temp_video_path = await page.video.path() - break - except Exception: - pass + # Give playwright time to stop gracefully + # Increased timeout to match context close timeout + await asyncio.wait_for(self.playwright.stop(), timeout=15.0) + logger.debug("Playwright stopped successfully") + except TimeoutError: + logger.warning("Playwright stop timed out, continuing with cleanup...") + playwright_stop_success = False + except Exception as e: + logger.warning(f"Error stopping playwright: {e}") + playwright_stop_success = False + finally: + self.playwright = None - if self.context: - await self.context.close() - self.context = None + # Additional cleanup: On macOS, wait a bit more to ensure all browser processes are terminated + # This helps prevent crash dialogs from appearing + if platform.system() == "Darwin": + await asyncio.sleep(0.5) - if self.playwright: - await self.playwright.stop() - self.playwright = None + # NOW resolve video path after context is closed and video is finalized + temp_video_path = None + if self.record_video_dir: + try: + # Locate the newest .webm file in record_video_dir + # This avoids touching page.video during teardown + video_dir = Path(self.record_video_dir) + if video_dir.exists(): + webm_files = list(video_dir.glob("*.webm")) + if webm_files: + # Get the most recently modified file + temp_video_path = max(webm_files, key=lambda p: p.stat().st_mtime) + logger.debug(f"Found video file: {temp_video_path}") + except Exception as e: + logger.warning(f"Could not locate video file: {e}") if self._extension_path and os.path.exists(self._extension_path): shutil.rmtree(self._extension_path) @@ -984,7 +1126,19 @@ async def close(self, output_path: str | Path | None = None) -> str | None: warnings.warn(f"Failed to rename video file: {e}") final_path = temp_video_path - return final_path + # Log shutdown status (useful for detecting crashes in headless 
mode) + shutdown_clean = context_close_success and playwright_stop_success + if not shutdown_clean: + logger.warning( + f"Browser shutdown had issues - may indicate a crash " + f"(context_close: {context_close_success}, playwright_stop: {playwright_stop_success})" + ) + else: + logger.debug("Browser shutdown completed cleanly") + + # Return tuple: (video_path, shutdown_clean) + # This allows callers to detect crashes even in headless mode + return (final_path, shutdown_clean) async def __aenter__(self): """Async context manager entry""" @@ -993,6 +1147,7 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit""" + # Ignore return value in context manager exit await self.close() @classmethod diff --git a/sentience/extension/background.js b/sentience/extension/background.js index aff49b0..02c0408 100644 --- a/sentience/extension/background.js +++ b/sentience/extension/background.js @@ -1,4 +1,4 @@ -import init, { analyze_page_with_options, analyze_page, prune_for_api } from "../pkg/sentience_core.js"; +import init, { analyze_page_with_options, analyze_page, prune_for_api } from "./pkg/sentience_core.js"; let wasmReady = !1, wasmInitPromise = null; diff --git a/sentience/models.py b/sentience/models.py index 7bf48d3..985a264 100644 --- a/sentience/models.py +++ b/sentience/models.py @@ -118,6 +118,16 @@ class SnapshotOptions(BaseModel): """ Configuration for snapshot calls. 
Matches TypeScript SnapshotOptions interface from sdk-ts/src/snapshot.ts + + For browser-use integration (where you don't have a SentienceBrowser), + you can pass sentience_api_key directly in options: + + from sentience.models import SnapshotOptions + options = SnapshotOptions( + sentience_api_key="sk_pro_xxxxx", + use_api=True, + goal="Find the login button" + ) """ screenshot: bool | ScreenshotConfig = False # Union type: boolean or config @@ -129,6 +139,9 @@ class SnapshotOptions(BaseModel): goal: str | None = None # Optional goal/task description for the snapshot show_overlay: bool = False # Show visual overlay highlighting elements in browser + # API credentials (for browser-use integration without SentienceBrowser) + sentience_api_key: str | None = None # Sentience API key for Pro/Enterprise features + class Config: arbitrary_types_allowed = True diff --git a/sentience/snapshot.py b/sentience/snapshot.py index 6f8e4fd..3366141 100644 --- a/sentience/snapshot.py +++ b/sentience/snapshot.py @@ -19,6 +19,122 @@ MAX_PAYLOAD_BYTES = 10 * 1024 * 1024 +def _build_snapshot_payload( + raw_result: dict[str, Any], + options: SnapshotOptions, +) -> dict[str, Any]: + """ + Build payload dict for gateway snapshot API. + + Shared helper used by both sync and async snapshot implementations. + """ + return { + "raw_elements": raw_result.get("raw_elements", []), + "url": raw_result.get("url", ""), + "viewport": raw_result.get("viewport"), + "goal": options.goal, + "options": { + "limit": options.limit, + "filter": options.filter.model_dump() if options.filter else None, + }, + } + + +def _validate_payload_size(payload_json: str) -> None: + """ + Validate payload size before sending to gateway. + + Raises ValueError if payload exceeds server limit. 
+ """ + payload_size = len(payload_json.encode("utf-8")) + if payload_size > MAX_PAYLOAD_BYTES: + raise ValueError( + f"Payload size ({payload_size / 1024 / 1024:.2f}MB) exceeds server limit " + f"({MAX_PAYLOAD_BYTES / 1024 / 1024:.0f}MB). " + f"Try reducing the number of elements on the page or filtering elements." + ) + + +def _post_snapshot_to_gateway_sync( + payload: dict[str, Any], + api_key: str, + api_url: str = "https://api.sentienceapi.com", +) -> dict[str, Any]: + """ + Post snapshot payload to gateway (synchronous). + + Used by sync snapshot() function. + """ + payload_json = json.dumps(payload) + _validate_payload_size(payload_json) + + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + } + + response = requests.post( + f"{api_url}/v1/snapshot", + data=payload_json, + headers=headers, + timeout=30, + ) + response.raise_for_status() + return response.json() + + +async def _post_snapshot_to_gateway_async( + payload: dict[str, Any], + api_key: str, + api_url: str = "https://api.sentienceapi.com", +) -> dict[str, Any]: + """ + Post snapshot payload to gateway (asynchronous). + + Used by async backend snapshot() function. + """ + # Lazy import httpx - only needed for async API calls + import httpx + + payload_json = json.dumps(payload) + _validate_payload_size(payload_json) + + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + } + + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post( + f"{api_url}/v1/snapshot", + content=payload_json, + headers=headers, + ) + response.raise_for_status() + return response.json() + + +def _merge_api_result_with_local( + api_result: dict[str, Any], + raw_result: dict[str, Any], +) -> dict[str, Any]: + """ + Merge API result with local data (screenshot, etc.). + + Shared helper used by both sync and async snapshot implementations. 
+ """ + return { + "status": api_result.get("status", "success"), + "timestamp": api_result.get("timestamp"), + "url": api_result.get("url", raw_result.get("url", "")), + "viewport": api_result.get("viewport", raw_result.get("viewport")), + "elements": api_result.get("elements", []), + "screenshot": raw_result.get("screenshot"), # Keep local screenshot + "screenshot_format": raw_result.get("screenshot_format"), + "error": api_result.get("error"), + } + + def _save_trace_to_file(raw_elements: list[dict[str, Any]], trace_path: str | None = None) -> None: """ Save raw_elements to a JSON file for benchmarking/training @@ -72,14 +188,18 @@ def snapshot( if options is None: options = SnapshotOptions() + # Resolve API key: options.sentience_api_key takes precedence, then browser.api_key + # This allows browser-use users to pass api_key via options without SentienceBrowser + effective_api_key = options.sentience_api_key or browser.api_key + # Determine if we should use server-side API should_use_api = ( - options.use_api if options.use_api is not None else (browser.api_key is not None) + options.use_api if options.use_api is not None else (effective_api_key is not None) ) - if should_use_api and browser.api_key: + if should_use_api and effective_api_key: # Use server-side API (Pro/Enterprise tier) - return _snapshot_via_api(browser, options) + return _snapshot_via_api(browser, options, effective_api_key) else: # Use local extension (Free tier) return _snapshot_via_extension(browser, options) @@ -150,16 +270,14 @@ def _snapshot_via_extension( def _snapshot_via_api( browser: SentienceBrowser, options: SnapshotOptions, + api_key: str, ) -> Snapshot: """Take snapshot using server-side API (Pro/Enterprise tier)""" if not browser.page: raise RuntimeError("Browser not started. 
Call browser.start() first.") - if not browser.api_key: - raise ValueError("API key required for server-side processing") - - if not browser.api_url: - raise ValueError("API URL required for server-side processing") + # Use browser.api_url if set, otherwise default + api_url = browser.api_url or "https://api.sentienceapi.com" # CRITICAL: Wait for extension injection to complete (CSP-resistant architecture) # Even for API mode, we need the extension to collect raw data locally @@ -179,54 +297,13 @@ def _snapshot_via_api( # Step 2: Send to server for smart ranking/filtering # Use raw_elements (raw data) instead of elements (processed data) # Server validates API key and applies proprietary ranking logic - payload = { - "raw_elements": raw_result.get("raw_elements", []), # Raw data needed for server processing - "url": raw_result.get("url", ""), - "viewport": raw_result.get("viewport"), - "goal": options.goal, # Optional goal/task description - "options": { - "limit": options.limit, - "filter": options.filter.model_dump() if options.filter else None, - }, - } - - # Check payload size before sending (server has 10MB limit) - payload_json = json.dumps(payload) - payload_size = len(payload_json.encode("utf-8")) - if payload_size > MAX_PAYLOAD_BYTES: - raise ValueError( - f"Payload size ({payload_size / 1024 / 1024:.2f}MB) exceeds server limit " - f"({MAX_PAYLOAD_BYTES / 1024 / 1024:.0f}MB). " - f"Try reducing the number of elements on the page or filtering elements." 
- ) - - headers = { - "Authorization": f"Bearer {browser.api_key}", - "Content-Type": "application/json", - } + payload = _build_snapshot_payload(raw_result, options) try: - response = requests.post( - f"{browser.api_url}/v1/snapshot", - data=payload_json, # Reuse already-serialized JSON - headers=headers, - timeout=30, - ) - response.raise_for_status() - - api_result = response.json() + api_result = _post_snapshot_to_gateway_sync(payload, api_key, api_url) # Merge API result with local data (screenshot, etc.) - snapshot_data = { - "status": api_result.get("status", "success"), - "timestamp": api_result.get("timestamp"), - "url": api_result.get("url", raw_result.get("url", "")), - "viewport": api_result.get("viewport", raw_result.get("viewport")), - "elements": api_result.get("elements", []), - "screenshot": raw_result.get("screenshot"), # Keep local screenshot - "screenshot_format": raw_result.get("screenshot_format"), - "error": api_result.get("error"), - } + snapshot_data = _merge_api_result_with_local(api_result, raw_result) # Show visual overlay if requested (use API-ranked elements) if options.show_overlay: @@ -245,7 +322,7 @@ def _snapshot_via_api( return Snapshot(**snapshot_data) except requests.exceptions.RequestException as e: - raise RuntimeError(f"API request failed: {e}") + raise RuntimeError(f"API request failed: {e}") from e # ========== Async Snapshot Functions ========== @@ -281,14 +358,18 @@ async def snapshot_async( if options is None: options = SnapshotOptions() + # Resolve API key: options.sentience_api_key takes precedence, then browser.api_key + # This allows browser-use users to pass api_key via options without SentienceBrowser + effective_api_key = options.sentience_api_key or browser.api_key + # Determine if we should use server-side API should_use_api = ( - options.use_api if options.use_api is not None else (browser.api_key is not None) + options.use_api if options.use_api is not None else (effective_api_key is not None) ) - if 
should_use_api and browser.api_key: + if should_use_api and effective_api_key: # Use server-side API (Pro/Enterprise tier) - return await _snapshot_via_api_async(browser, options) + return await _snapshot_via_api_async(browser, options, effective_api_key) else: # Use local extension (Free tier) return await _snapshot_via_extension_async(browser, options) @@ -388,16 +469,14 @@ async def _snapshot_via_extension_async( async def _snapshot_via_api_async( browser: AsyncSentienceBrowser, options: SnapshotOptions, + api_key: str, ) -> Snapshot: """Take snapshot using server-side API (Pro/Enterprise tier) - async""" if not browser.page: raise RuntimeError("Browser not started. Call await browser.start() first.") - if not browser.api_key: - raise ValueError("API key required for server-side processing") - - if not browser.api_url: - raise ValueError("API URL required for server-side processing") + # Use browser.api_url if set, otherwise default + api_url = browser.api_url or "https://api.sentienceapi.com" # Wait for extension injection try: @@ -466,7 +545,7 @@ async def _snapshot_via_api_async( ) headers = { - "Authorization": f"Bearer {browser.api_key}", + "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } @@ -476,7 +555,7 @@ async def _snapshot_via_api_async( async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( - f"{browser.api_url}/v1/snapshot", + f"{api_url}/v1/snapshot", content=payload_json, headers=headers, ) diff --git a/sentience/verification.py b/sentience/verification.py index 216f25e..db80850 100644 --- a/sentience/verification.py +++ b/sentience/verification.py @@ -26,8 +26,9 @@ from __future__ import annotations import re +from collections.abc import Callable from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from .models import Snapshot @@ -330,7 +331,10 @@ def _pred(ctx: AssertContext) -> AssertOutcome: 
return AssertOutcome( passed=True, reason="", - details={"sub_predicates": all_details, "matched_at_index": len(all_details) - 1}, + details={ + "sub_predicates": all_details, + "matched_at_index": len(all_details) - 1, + }, ) all_reasons.append(outcome.reason) diff --git a/tests/test_backends.py b/tests/test_backends.py new file mode 100644 index 0000000..00e4325 --- /dev/null +++ b/tests/test_backends.py @@ -0,0 +1,958 @@ +""" +Tests for the backends module. + +These tests verify the CDP backend implementation works correctly +without requiring a real browser (using mocked CDP transport). +""" + +import asyncio +import time +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from sentience.backends import ( + BrowserBackendV0, + BrowserUseAdapter, + BrowserUseCDPTransport, + CachedSnapshot, + CDPBackendV0, + CDPTransport, + LayoutMetrics, + PlaywrightBackend, + ViewportInfo, + click, + scroll, + type_text, + wait_for_stable, +) +from sentience.models import ActionResult, BBox + + +class MockCDPTransport: + """Mock CDP transport for testing.""" + + def __init__(self) -> None: + self.calls: list[tuple[str, dict | None]] = [] + self.responses: dict[str, Any] = {} + + def set_response(self, method: str, response: Any) -> None: + """Set a response for a specific method.""" + self.responses[method] = response + + async def send(self, method: str, params: dict | None = None) -> dict: + """Record the call and return mock response.""" + self.calls.append((method, params)) + if method in self.responses: + response = self.responses[method] + if callable(response): + return response(params) + return response + return {} + + +class TestViewportInfo: + """Tests for ViewportInfo model.""" + + def test_create_viewport_info(self) -> None: + """Test creating ViewportInfo with all fields.""" + info = ViewportInfo( + width=1920, + height=1080, + scroll_x=100.0, + scroll_y=200.0, + content_width=3000.0, + content_height=5000.0, + ) + assert 
info.width == 1920 + assert info.height == 1080 + assert info.scroll_x == 100.0 + assert info.scroll_y == 200.0 + assert info.content_width == 3000.0 + assert info.content_height == 5000.0 + + def test_viewport_info_defaults(self) -> None: + """Test ViewportInfo default values.""" + info = ViewportInfo(width=800, height=600) + assert info.scroll_x == 0.0 + assert info.scroll_y == 0.0 + assert info.content_width is None + assert info.content_height is None + + +class TestLayoutMetrics: + """Tests for LayoutMetrics model.""" + + def test_create_layout_metrics(self) -> None: + """Test creating LayoutMetrics with all fields.""" + metrics = LayoutMetrics( + viewport_x=0.0, + viewport_y=100.0, + viewport_width=1920.0, + viewport_height=1080.0, + content_width=1920.0, + content_height=5000.0, + device_scale_factor=2.0, + ) + assert metrics.viewport_x == 0.0 + assert metrics.viewport_y == 100.0 + assert metrics.viewport_width == 1920.0 + assert metrics.viewport_height == 1080.0 + assert metrics.content_width == 1920.0 + assert metrics.content_height == 5000.0 + assert metrics.device_scale_factor == 2.0 + + def test_layout_metrics_defaults(self) -> None: + """Test LayoutMetrics default values.""" + metrics = LayoutMetrics() + assert metrics.viewport_x == 0.0 + assert metrics.viewport_y == 0.0 + assert metrics.viewport_width == 0.0 + assert metrics.viewport_height == 0.0 + assert metrics.content_width == 0.0 + assert metrics.content_height == 0.0 + assert metrics.device_scale_factor == 1.0 + + +class TestCDPBackendV0: + """Tests for CDPBackendV0 implementation.""" + + @pytest.fixture + def transport(self) -> MockCDPTransport: + """Create mock transport.""" + return MockCDPTransport() + + @pytest.fixture + def backend(self, transport: MockCDPTransport) -> CDPBackendV0: + """Create backend with mock transport.""" + return CDPBackendV0(transport) + + @pytest.mark.asyncio + async def test_refresh_page_info( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: 
+ """Test refresh_page_info returns ViewportInfo.""" + transport.set_response( + "Runtime.evaluate", + { + "result": { + "type": "object", + "value": { + "width": 1920, + "height": 1080, + "scroll_x": 0, + "scroll_y": 100, + "content_width": 1920, + "content_height": 5000, + }, + } + }, + ) + + info = await backend.refresh_page_info() + + assert isinstance(info, ViewportInfo) + assert info.width == 1920 + assert info.height == 1080 + assert info.scroll_y == 100 + + @pytest.mark.asyncio + async def test_eval(self, backend: CDPBackendV0, transport: MockCDPTransport) -> None: + """Test eval executes JavaScript and returns value.""" + transport.set_response( + "Runtime.evaluate", + {"result": {"type": "number", "value": 42}}, + ) + + result = await backend.eval("1 + 1") + + assert result == 42 + assert len(transport.calls) == 1 + assert transport.calls[0][0] == "Runtime.evaluate" + assert transport.calls[0][1]["expression"] == "1 + 1" + + @pytest.mark.asyncio + async def test_eval_exception(self, backend: CDPBackendV0, transport: MockCDPTransport) -> None: + """Test eval raises on JavaScript exception.""" + transport.set_response( + "Runtime.evaluate", + { + "exceptionDetails": { + "text": "ReferenceError: foo is not defined", + } + }, + ) + + with pytest.raises(RuntimeError, match="JavaScript evaluation failed"): + await backend.eval("foo") + + @pytest.mark.asyncio + async def test_get_layout_metrics( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test get_layout_metrics returns LayoutMetrics.""" + transport.set_response( + "Page.getLayoutMetrics", + { + "layoutViewport": {"clientWidth": 1920, "clientHeight": 1080}, + "contentSize": {"width": 1920, "height": 5000}, + "visualViewport": { + "pageX": 0, + "pageY": 100, + "clientWidth": 1920, + "clientHeight": 1080, + "scale": 1.0, + }, + }, + ) + + metrics = await backend.get_layout_metrics() + + assert isinstance(metrics, LayoutMetrics) + assert metrics.viewport_width == 1920 + assert 
metrics.viewport_height == 1080 + assert metrics.content_height == 5000 + + @pytest.mark.asyncio + async def test_screenshot_png(self, backend: CDPBackendV0, transport: MockCDPTransport) -> None: + """Test screenshot_png returns PNG bytes.""" + import base64 + + # Create a minimal PNG (1x1 transparent pixel) + png_data = base64.b64encode(b"\x89PNG\r\n\x1a\n").decode() + transport.set_response( + "Page.captureScreenshot", + {"data": png_data}, + ) + + result = await backend.screenshot_png() + + assert isinstance(result, bytes) + assert result.startswith(b"\x89PNG") + + @pytest.mark.asyncio + async def test_mouse_move(self, backend: CDPBackendV0, transport: MockCDPTransport) -> None: + """Test mouse_move dispatches mouseMoved event.""" + await backend.mouse_move(100, 200) + + assert len(transport.calls) == 1 + method, params = transport.calls[0] + assert method == "Input.dispatchMouseEvent" + assert params["type"] == "mouseMoved" + assert params["x"] == 100 + assert params["y"] == 200 + + @pytest.mark.asyncio + async def test_mouse_click(self, backend: CDPBackendV0, transport: MockCDPTransport) -> None: + """Test mouse_click dispatches press and release events.""" + await backend.mouse_click(100, 200) + + assert len(transport.calls) == 2 + + # Check mousePressed + method, params = transport.calls[0] + assert method == "Input.dispatchMouseEvent" + assert params["type"] == "mousePressed" + assert params["x"] == 100 + assert params["y"] == 200 + assert params["button"] == "left" + + # Check mouseReleased + method, params = transport.calls[1] + assert method == "Input.dispatchMouseEvent" + assert params["type"] == "mouseReleased" + + @pytest.mark.asyncio + async def test_mouse_click_right_button( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test mouse_click with right button.""" + await backend.mouse_click(100, 200, button="right") + + method, params = transport.calls[0] + assert params["button"] == "right" + + @pytest.mark.asyncio + async 
def test_wheel(self, backend: CDPBackendV0, transport: MockCDPTransport) -> None: + """Test wheel dispatches mouseWheel event.""" + # First set up viewport info for default coordinates + transport.set_response( + "Runtime.evaluate", + { + "result": { + "type": "object", + "value": {"width": 1920, "height": 1080}, + } + }, + ) + + await backend.wheel(delta_y=100, x=500, y=300) + + # Find the wheel event (skip the eval call if it happened) + wheel_calls = [c for c in transport.calls if c[0] == "Input.dispatchMouseEvent"] + assert len(wheel_calls) == 1 + + method, params = wheel_calls[0] + assert params["type"] == "mouseWheel" + assert params["deltaY"] == 100 + assert params["x"] == 500 + assert params["y"] == 300 + + @pytest.mark.asyncio + async def test_type_text(self, backend: CDPBackendV0, transport: MockCDPTransport) -> None: + """Test type_text dispatches key events for each character.""" + await backend.type_text("Hi") + + # Each character generates keyDown, char, keyUp = 3 events + # "Hi" = 2 chars = 6 events + key_events = [c for c in transport.calls if c[0] == "Input.dispatchKeyEvent"] + assert len(key_events) == 6 + + # Check first character 'H' + assert key_events[0][1]["type"] == "keyDown" + assert key_events[0][1]["text"] == "H" + assert key_events[1][1]["type"] == "char" + assert key_events[2][1]["type"] == "keyUp" + + @pytest.mark.asyncio + async def test_wait_ready_state_immediate( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test wait_ready_state returns immediately if state is met.""" + transport.set_response( + "Runtime.evaluate", + {"result": {"type": "string", "value": "complete"}}, + ) + + # Should not raise + await backend.wait_ready_state(state="complete", timeout_ms=1000) + + @pytest.mark.asyncio + async def test_wait_ready_state_timeout( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test wait_ready_state raises on timeout.""" + transport.set_response( + "Runtime.evaluate", + 
{"result": {"type": "string", "value": "loading"}}, + ) + + with pytest.raises(TimeoutError, match="Timed out"): + await backend.wait_ready_state(state="complete", timeout_ms=200) + + +class TestCDPBackendProtocol: + """Test that CDPBackendV0 implements BrowserBackendV0 protocol.""" + + def test_implements_protocol(self) -> None: + """Verify CDPBackendV0 is recognized as BrowserBackendV0.""" + transport = MockCDPTransport() + backend = CDPBackendV0(transport) + assert isinstance(backend, BrowserBackendV0) + + +class TestBrowserUseCDPTransport: + """Tests for BrowserUseCDPTransport.""" + + @pytest.mark.asyncio + async def test_send_translates_method(self) -> None: + """Test that send correctly translates method to cdp-use pattern.""" + # Create mock cdp_client with send.Domain.method pattern + mock_method = AsyncMock(return_value={"result": "success"}) + mock_domain = MagicMock() + mock_domain.evaluate = mock_method + + mock_send = MagicMock() + mock_send.Runtime = mock_domain + + mock_client = MagicMock() + mock_client.send = mock_send + + transport = BrowserUseCDPTransport(mock_client, "session-123") + result = await transport.send("Runtime.evaluate", {"expression": "1+1"}) + + # Verify the method was called correctly + mock_method.assert_called_once_with( + params={"expression": "1+1"}, + session_id="session-123", + ) + assert result == {"result": "success"} + + @pytest.mark.asyncio + async def test_send_invalid_method_format(self) -> None: + """Test send raises on invalid method format.""" + mock_client = MagicMock() + transport = BrowserUseCDPTransport(mock_client, "session-123") + + with pytest.raises(ValueError, match="Invalid CDP method format"): + await transport.send("InvalidMethod") + + @pytest.mark.asyncio + async def test_send_unknown_domain(self) -> None: + """Test send raises on unknown domain.""" + mock_send = MagicMock() + mock_send.UnknownDomain = None + + mock_client = MagicMock() + mock_client.send = mock_send + + transport = 
BrowserUseCDPTransport(mock_client, "session-123") + + with pytest.raises(ValueError, match="Unknown CDP domain"): + await transport.send("UnknownDomain.method") + + +class TestBrowserUseAdapter: + """Tests for BrowserUseAdapter.""" + + def test_api_key_returns_none(self) -> None: + """Test api_key property returns None.""" + mock_session = MagicMock() + adapter = BrowserUseAdapter(mock_session) + assert adapter.api_key is None + + def test_api_url_returns_none(self) -> None: + """Test api_url property returns None.""" + mock_session = MagicMock() + adapter = BrowserUseAdapter(mock_session) + assert adapter.api_url is None + + @pytest.mark.asyncio + async def test_create_backend(self) -> None: + """Test create_backend creates CDPBackendV0.""" + # Create mock CDP session + mock_cdp_session = MagicMock() + mock_cdp_session.cdp_client = MagicMock() + mock_cdp_session.session_id = "session-123" + + # Create mock browser session + mock_session = MagicMock() + mock_session.get_or_create_cdp_session = AsyncMock(return_value=mock_cdp_session) + + adapter = BrowserUseAdapter(mock_session) + backend = await adapter.create_backend() + + assert isinstance(backend, CDPBackendV0) + mock_session.get_or_create_cdp_session.assert_called_once() + + @pytest.mark.asyncio + async def test_create_backend_caches_result(self) -> None: + """Test create_backend returns same instance on repeated calls.""" + mock_cdp_session = MagicMock() + mock_cdp_session.cdp_client = MagicMock() + mock_cdp_session.session_id = "session-123" + + mock_session = MagicMock() + mock_session.get_or_create_cdp_session = AsyncMock(return_value=mock_cdp_session) + + adapter = BrowserUseAdapter(mock_session) + + backend1 = await adapter.create_backend() + backend2 = await adapter.create_backend() + + assert backend1 is backend2 + # Should only create once + assert mock_session.get_or_create_cdp_session.call_count == 1 + + @pytest.mark.asyncio + async def test_create_backend_no_cdp_method(self) -> None: + """Test 
create_backend raises if session lacks CDP support.""" + mock_session = MagicMock(spec=[]) # No get_or_create_cdp_session + + adapter = BrowserUseAdapter(mock_session) + + with pytest.raises(RuntimeError, match="does not have get_or_create_cdp_session"): + await adapter.create_backend() + + @pytest.mark.asyncio + async def test_get_page_async(self) -> None: + """Test get_page_async returns page from session.""" + mock_page = MagicMock() + mock_session = MagicMock() + mock_session.get_current_page = AsyncMock(return_value=mock_page) + + adapter = BrowserUseAdapter(mock_session) + page = await adapter.get_page_async() + + assert page is mock_page + + +class TestBackendAgnosticActions: + """Tests for backend-agnostic action functions.""" + + @pytest.fixture + def transport(self) -> MockCDPTransport: + """Create mock transport.""" + return MockCDPTransport() + + @pytest.fixture + def backend(self, transport: MockCDPTransport) -> CDPBackendV0: + """Create backend with mock transport.""" + return CDPBackendV0(transport) + + @pytest.mark.asyncio + async def test_click_with_tuple( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test click with (x, y) tuple.""" + result = await click(backend, (100, 200)) + + assert isinstance(result, ActionResult) + assert result.success is True + + # Should have mouse move + mouse click (press + release) + mouse_events = [c for c in transport.calls if c[0] == "Input.dispatchMouseEvent"] + assert len(mouse_events) == 3 # move, press, release + + @pytest.mark.asyncio + async def test_click_with_bbox( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test click with BBox (clicks center).""" + bbox = BBox(x=100, y=200, width=50, height=30) + result = await click(backend, bbox) + + assert result.success is True + + # Find the click event + press_events = [ + c + for c in transport.calls + if c[0] == "Input.dispatchMouseEvent" and c[1]["type"] == "mousePressed" + ] + assert 
len(press_events) == 1 + # Should click at center: (100 + 25, 200 + 15) = (125, 215) + assert press_events[0][1]["x"] == 125 + assert press_events[0][1]["y"] == 215 + + @pytest.mark.asyncio + async def test_click_with_dict( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test click with dict containing x, y.""" + result = await click(backend, {"x": 150, "y": 250}) + + assert result.success is True + + @pytest.mark.asyncio + async def test_click_double(self, backend: CDPBackendV0, transport: MockCDPTransport) -> None: + """Test double-click.""" + result = await click(backend, (100, 200), click_count=2) + + assert result.success is True + + # Check clickCount parameter + press_events = [ + c + for c in transport.calls + if c[0] == "Input.dispatchMouseEvent" and c[1]["type"] == "mousePressed" + ] + assert press_events[0][1]["clickCount"] == 2 + + @pytest.mark.asyncio + async def test_type_text_simple( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test typing text.""" + result = await type_text(backend, "Hi") + + assert isinstance(result, ActionResult) + assert result.success is True + + # Check key events were dispatched + key_events = [c for c in transport.calls if c[0] == "Input.dispatchKeyEvent"] + assert len(key_events) == 6 # 2 chars * 3 events each + + @pytest.mark.asyncio + async def test_type_text_with_target( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test typing text with click target.""" + result = await type_text(backend, "test", target=(100, 200)) + + assert result.success is True + + # Should have click + key events + mouse_events = [c for c in transport.calls if c[0] == "Input.dispatchMouseEvent"] + key_events = [c for c in transport.calls if c[0] == "Input.dispatchKeyEvent"] + assert len(mouse_events) >= 2 # At least press + release + assert len(key_events) == 12 # 4 chars * 3 events + + @pytest.mark.asyncio + async def test_scroll_down(self, backend: 
CDPBackendV0, transport: MockCDPTransport) -> None: + """Test scrolling down.""" + # Set up viewport for default coordinates + transport.set_response( + "Runtime.evaluate", + { + "result": { + "type": "object", + "value": {"width": 1920, "height": 1080}, + } + }, + ) + + result = await scroll(backend, delta_y=300) + + assert result.success is True + + wheel_events = [ + c + for c in transport.calls + if c[0] == "Input.dispatchMouseEvent" and c[1].get("type") == "mouseWheel" + ] + assert len(wheel_events) == 1 + assert wheel_events[0][1]["deltaY"] == 300 + + @pytest.mark.asyncio + async def test_scroll_at_position( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test scrolling at specific position.""" + result = await scroll(backend, delta_y=200, target=(500, 300)) + + assert result.success is True + + wheel_events = [ + c + for c in transport.calls + if c[0] == "Input.dispatchMouseEvent" and c[1].get("type") == "mouseWheel" + ] + assert wheel_events[0][1]["x"] == 500 + assert wheel_events[0][1]["y"] == 300 + + @pytest.mark.asyncio + async def test_wait_for_stable_success( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test wait_for_stable with immediate success.""" + transport.set_response( + "Runtime.evaluate", + {"result": {"type": "string", "value": "complete"}}, + ) + + result = await wait_for_stable(backend, state="complete", timeout_ms=1000) + + assert result.success is True + + @pytest.mark.asyncio + async def test_wait_for_stable_timeout( + self, backend: CDPBackendV0, transport: MockCDPTransport + ) -> None: + """Test wait_for_stable timeout.""" + transport.set_response( + "Runtime.evaluate", + {"result": {"type": "string", "value": "loading"}}, + ) + + result = await wait_for_stable(backend, state="complete", timeout_ms=200) + + assert result.success is False + assert result.error["code"] == "timeout" + + +class TestPlaywrightBackend: + """Tests for PlaywrightBackend wrapper.""" + + def 
test_implements_protocol(self) -> None: + """Verify PlaywrightBackend implements BrowserBackendV0.""" + mock_page = MagicMock() + backend = PlaywrightBackend(mock_page) + assert isinstance(backend, BrowserBackendV0) + + def test_page_property(self) -> None: + """Test page property returns underlying page.""" + mock_page = MagicMock() + backend = PlaywrightBackend(mock_page) + assert backend.page is mock_page + + @pytest.mark.asyncio + async def test_refresh_page_info(self) -> None: + """Test refresh_page_info calls page.evaluate.""" + mock_page = AsyncMock() + mock_page.evaluate = AsyncMock( + return_value={ + "width": 1920, + "height": 1080, + "scroll_x": 0, + "scroll_y": 100, + "content_width": 1920, + "content_height": 5000, + } + ) + + backend = PlaywrightBackend(mock_page) + info = await backend.refresh_page_info() + + assert isinstance(info, ViewportInfo) + assert info.width == 1920 + assert info.scroll_y == 100 + + @pytest.mark.asyncio + async def test_eval(self) -> None: + """Test eval calls page.evaluate.""" + mock_page = AsyncMock() + mock_page.evaluate = AsyncMock(return_value=42) + + backend = PlaywrightBackend(mock_page) + result = await backend.eval("1 + 1") + + assert result == 42 + + @pytest.mark.asyncio + async def test_mouse_click(self) -> None: + """Test mouse_click calls page.mouse.click.""" + mock_mouse = AsyncMock() + mock_page = MagicMock() + mock_page.mouse = mock_mouse + + backend = PlaywrightBackend(mock_page) + await backend.mouse_click(100, 200, button="left", click_count=1) + + mock_mouse.click.assert_called_once_with(100, 200, button="left", click_count=1) + + @pytest.mark.asyncio + async def test_type_text(self) -> None: + """Test type_text calls page.keyboard.type.""" + mock_keyboard = AsyncMock() + mock_page = MagicMock() + mock_page.keyboard = mock_keyboard + + backend = PlaywrightBackend(mock_page) + await backend.type_text("Hello") + + mock_keyboard.type.assert_called_once_with("Hello") + + @pytest.mark.asyncio + async def 
test_screenshot_png(self) -> None: + """Test screenshot_png calls page.screenshot.""" + mock_page = AsyncMock() + mock_page.screenshot = AsyncMock(return_value=b"\x89PNG\r\n\x1a\n") + + backend = PlaywrightBackend(mock_page) + result = await backend.screenshot_png() + + assert result.startswith(b"\x89PNG") + mock_page.screenshot.assert_called_once_with(type="png") + + +class TestCachedSnapshot: + """Tests for CachedSnapshot caching behavior.""" + + @pytest.fixture + def mock_backend(self) -> MagicMock: + """Create mock backend.""" + backend = MagicMock() + backend.eval = AsyncMock() + return backend + + def test_initial_state(self, mock_backend: MagicMock) -> None: + """Test initial cache state.""" + cache = CachedSnapshot(mock_backend, max_age_ms=2000) + + assert cache.is_cached is False + assert cache.age_ms == float("inf") + + def test_invalidate(self, mock_backend: MagicMock) -> None: + """Test cache invalidation.""" + cache = CachedSnapshot(mock_backend) + cache._cached = MagicMock() # Simulate cached snapshot + cache._cached_at = time.time() + + assert cache.is_cached is True + + cache.invalidate() + + assert cache.is_cached is False + assert cache.age_ms == float("inf") + + def test_staleness_by_age(self, mock_backend: MagicMock) -> None: + """Test cache staleness detection.""" + cache = CachedSnapshot(mock_backend, max_age_ms=100) + + # Simulate old cache + cache._cached = MagicMock() + cache._cached_at = time.time() - 0.2 # 200ms ago + + assert cache._is_stale() is True + + def test_fresh_cache(self, mock_backend: MagicMock) -> None: + """Test fresh cache detection.""" + cache = CachedSnapshot(mock_backend, max_age_ms=2000) + + # Simulate fresh cache + cache._cached = MagicMock() + cache._cached_at = time.time() + + assert cache._is_stale() is False + + +class TestCoordinateResolution: + """Test coordinate resolution in actions.""" + + @pytest.mark.asyncio + async def test_bbox_center_calculation(self) -> None: + """Test BBox center calculation.""" + from 
sentience.backends.actions import _resolve_coordinates + + bbox = BBox(x=100, y=200, width=50, height=30) + x, y = _resolve_coordinates(bbox) + + assert x == 125 # 100 + 50/2 + assert y == 215 # 200 + 30/2 + + @pytest.mark.asyncio + async def test_dict_with_dimensions(self) -> None: + """Test dict with width/height computes center.""" + from sentience.backends.actions import _resolve_coordinates + + target = {"x": 100, "y": 200, "width": 50, "height": 30} + x, y = _resolve_coordinates(target) + + assert x == 125 + assert y == 215 + + @pytest.mark.asyncio + async def test_dict_without_dimensions(self) -> None: + """Test dict without width/height uses x/y directly.""" + from sentience.backends.actions import _resolve_coordinates + + target = {"x": 150, "y": 250} + x, y = _resolve_coordinates(target) + + assert x == 150 + assert y == 250 + + @pytest.mark.asyncio + async def test_tuple_passthrough(self) -> None: + """Test tuple passes through unchanged.""" + from sentience.backends.actions import _resolve_coordinates + + x, y = _resolve_coordinates((300, 400)) + + assert x == 300 + assert y == 400 + + +class TestBackendExceptions: + """Tests for custom backend exceptions.""" + + def test_extension_diagnostics_from_dict(self) -> None: + """Test ExtensionDiagnostics.from_dict.""" + from sentience.backends.exceptions import ExtensionDiagnostics + + data = { + "sentience_defined": True, + "sentience_snapshot": False, + "url": "https://example.com", + } + diag = ExtensionDiagnostics.from_dict(data) + + assert diag.sentience_defined is True + assert diag.sentience_snapshot is False + assert diag.url == "https://example.com" + assert diag.error is None + + def test_extension_diagnostics_to_dict(self) -> None: + """Test ExtensionDiagnostics.to_dict.""" + from sentience.backends.exceptions import ExtensionDiagnostics + + diag = ExtensionDiagnostics( + sentience_defined=True, + sentience_snapshot=True, + url="https://test.com", + error=None, + ) + result = diag.to_dict() + + 
assert result["sentience_defined"] is True + assert result["sentience_snapshot"] is True + assert result["url"] == "https://test.com" + + def test_extension_not_loaded_error_from_timeout(self) -> None: + """Test ExtensionNotLoadedError.from_timeout creates helpful message.""" + from sentience.backends.exceptions import ExtensionDiagnostics, ExtensionNotLoadedError + + diag = ExtensionDiagnostics( + sentience_defined=False, + sentience_snapshot=False, + url="https://example.com", + ) + error = ExtensionNotLoadedError.from_timeout(timeout_ms=5000, diagnostics=diag) + + assert error.timeout_ms == 5000 + assert error.diagnostics is diag + assert "5000ms" in str(error) + assert "window.sentience defined: False" in str(error) + assert "get_extension_dir" in str(error) # Contains fix suggestion + + def test_extension_not_loaded_error_with_eval_error(self) -> None: + """Test ExtensionNotLoadedError when diagnostics collection failed.""" + from sentience.backends.exceptions import ExtensionDiagnostics, ExtensionNotLoadedError + + diag = ExtensionDiagnostics(error="Could not evaluate JavaScript") + error = ExtensionNotLoadedError.from_timeout(timeout_ms=3000, diagnostics=diag) + + assert "Could not evaluate JavaScript" in str(error) + + def test_snapshot_error_from_null_result(self) -> None: + """Test SnapshotError.from_null_result creates helpful message.""" + from sentience.backends.exceptions import SnapshotError + + error = SnapshotError.from_null_result(url="https://example.com/page") + + assert error.url == "https://example.com/page" + assert "returned null" in str(error) + assert "example.com/page" in str(error) + + def test_snapshot_error_from_null_result_no_url(self) -> None: + """Test SnapshotError.from_null_result without URL.""" + from sentience.backends.exceptions import SnapshotError + + error = SnapshotError.from_null_result(url=None) + + assert error.url is None + assert "returned null" in str(error) + + def test_action_error_message_format(self) -> None: + 
"""Test ActionError formats message correctly.""" + from sentience.backends.exceptions import ActionError + + error = ActionError( + action="click", + message="Element not found", + coordinates=(100, 200), + ) + + assert error.action == "click" + assert error.coordinates == (100, 200) + assert "click failed" in str(error) + assert "Element not found" in str(error) + + def test_sentience_backend_error_inheritance(self) -> None: + """Test all exceptions inherit from SentienceBackendError.""" + from sentience.backends.exceptions import ( + ActionError, + BackendEvalError, + ExtensionInjectionError, + ExtensionNotLoadedError, + SentienceBackendError, + SnapshotError, + ) + + assert issubclass(ExtensionNotLoadedError, SentienceBackendError) + assert issubclass(ExtensionInjectionError, SentienceBackendError) + assert issubclass(BackendEvalError, SentienceBackendError) + assert issubclass(SnapshotError, SentienceBackendError) + assert issubclass(ActionError, SentienceBackendError) + + def test_extension_injection_error_from_page(self) -> None: + """Test ExtensionInjectionError.from_page.""" + from sentience.backends.exceptions import ExtensionInjectionError + + error = ExtensionInjectionError.from_page("https://secure-site.com") + + assert error.url == "https://secure-site.com" + assert "secure-site.com" in str(error) + assert "Content Security Policy" in str(error) diff --git a/tests/test_proxy.py b/tests/test_proxy.py index 63e4366..ad3c917 100644 --- a/tests/test_proxy.py +++ b/tests/test_proxy.py @@ -1,8 +1,11 @@ """Tests for proxy support in SentienceBrowser""" +import logging import os from unittest.mock import MagicMock, patch +import pytest + from sentience.browser import SentienceBrowser from sentience.models import ProxyConfig @@ -96,33 +99,33 @@ def test_parse_proxy_socks5(self): assert config.username == "user" assert config.password == "pass" - def test_parse_proxy_invalid_scheme(self, capsys): + def test_parse_proxy_invalid_scheme(self, caplog): """Test 
parsing proxy with invalid scheme""" - browser = SentienceBrowser() - config = browser._parse_proxy("ftp://proxy.example.com:8080") + with caplog.at_level(logging.WARNING): + browser = SentienceBrowser() + config = browser._parse_proxy("ftp://proxy.example.com:8080") - assert config is None - captured = capsys.readouterr() - assert "Unsupported proxy scheme: ftp" in captured.out - assert "Supported: http, https, socks5" in captured.out + assert config is None + assert "Unsupported proxy scheme: ftp" in caplog.text + assert "Supported: http, https, socks5" in caplog.text - def test_parse_proxy_missing_port(self, capsys): + def test_parse_proxy_missing_port(self, caplog): """Test parsing proxy without port""" - browser = SentienceBrowser() - config = browser._parse_proxy("http://proxy.example.com") + with caplog.at_level(logging.WARNING): + browser = SentienceBrowser() + config = browser._parse_proxy("http://proxy.example.com") - assert config is None - captured = capsys.readouterr() - assert "Proxy URL must include hostname and port" in captured.out + assert config is None + assert "Proxy URL must include hostname and port" in caplog.text - def test_parse_proxy_missing_host(self, capsys): + def test_parse_proxy_missing_host(self, caplog): """Test parsing proxy without hostname""" - browser = SentienceBrowser() - config = browser._parse_proxy("http://:8080") + with caplog.at_level(logging.WARNING): + browser = SentienceBrowser() + config = browser._parse_proxy("http://:8080") - assert config is None - captured = capsys.readouterr() - assert "Proxy URL must include hostname and port" in captured.out + assert config is None + assert "Proxy URL must include hostname and port" in caplog.text def test_parse_proxy_empty_string(self): """Test parsing empty proxy string""" @@ -202,7 +205,7 @@ def test_start_without_proxy(self, mock_playwright, mock_copytree): @patch("sentience.browser.shutil.copytree") @patch("sentience.browser.sync_playwright") - def 
test_start_with_proxy(self, mock_playwright, mock_copytree, capsys): + def test_start_with_proxy(self, mock_playwright, mock_copytree, caplog): """Test browser start with proxy""" # Mock Playwright mock_pw_instance = MagicMock() @@ -221,21 +224,21 @@ def test_start_with_proxy(self, mock_playwright, mock_copytree, capsys): mock_path.return_value.parent.parent.parent = MagicMock() mock_path.return_value.parent.parent.parent.__truediv__.return_value = mock_ext_path - browser = SentienceBrowser(proxy="http://user:pass@proxy.example.com:8080") - browser.start() - - # Verify proxy was passed to launch_persistent_context - call_kwargs = mock_pw_instance.chromium.launch_persistent_context.call_args[1] - assert "proxy" in call_kwargs - assert call_kwargs["proxy"] == { - "server": "http://proxy.example.com:8080", - "username": "user", - "password": "pass", - } - - # Verify console message - captured = capsys.readouterr() - assert "Using proxy: http://proxy.example.com:8080" in captured.out + with caplog.at_level(logging.INFO): + browser = SentienceBrowser(proxy="http://user:pass@proxy.example.com:8080") + browser.start() + + # Verify proxy was passed to launch_persistent_context + call_kwargs = mock_pw_instance.chromium.launch_persistent_context.call_args[1] + assert "proxy" in call_kwargs + assert call_kwargs["proxy"] == { + "server": "http://proxy.example.com:8080", + "username": "user", + "password": "pass", + } + + # Verify log message + assert "Using proxy: http://proxy.example.com:8080" in caplog.text @patch("sentience.browser.shutil.copytree") @patch("sentience.browser.sync_playwright") diff --git a/tests/test_video_recording.py b/tests/test_video_recording.py index 7db966b..0b49553 100644 --- a/tests/test_video_recording.py +++ b/tests/test_video_recording.py @@ -22,6 +22,10 @@ def test_video_recording_basic(): try: browser.page.goto("https://example.com") browser.page.wait_for_load_state("domcontentloaded") + + # Small delay to ensure page is fully loaded and video 
recording is stable + import time + time.sleep(0.5) video_path = browser.close() @@ -33,9 +37,29 @@ def test_video_recording_basic(): # Verify file has content file_size = os.path.getsize(video_path) assert file_size > 0 - except Exception: - browser.close() - raise + except Exception as e: + # Ensure browser is closed even on error + # Catch Playwright "Event loop is closed" errors during cleanup + try: + if browser.page: + try: + browser.page.close() + except Exception: + pass # Page might already be closed + if browser.context: + try: + browser.context.close() + except Exception: + pass # Context might already be closed + if browser.playwright: + try: + browser.playwright.stop() + except Exception: + pass # Playwright might already be stopped + except Exception: + pass # Ignore cleanup errors + # Re-raise original exception + raise e def test_video_recording_custom_resolution():