diff --git a/.python-version b/.python-version deleted file mode 100644 index e4fba21835..0000000000 --- a/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.12 diff --git a/browser_use/agent/message_manager/service.py b/browser_use/agent/message_manager/service.py index d79903e70a..76f0cd25f6 100644 --- a/browser_use/agent/message_manager/service.py +++ b/browser_use/agent/message_manager/service.py @@ -302,6 +302,17 @@ def create_state_messages( ) -> None: """Create single state message with all content""" + # Check if Sentience snapshot was injected BEFORE clearing context messages + # (Sentience message is added to context_messages, so we need to check before clearing) + has_sentience = any( + msg.content and isinstance(msg.content, str) and ( + "Elements (ID|role|text|importance)" in msg.content or + "Elements: ID|role|text|imp|docYq|ord|DG|href" in msg.content or + "Rules: ordinal→DG=1 then ord asc" in msg.content + ) + for msg in self.state.history.context_messages + ) + # Clear contextual messages from previous steps to prevent accumulation self.state.history.context_messages.clear() @@ -343,8 +354,36 @@ def create_state_messages( if include_screenshot and browser_state_summary.screenshot: screenshots.append(browser_state_summary.screenshot) - # Use vision in the user message if screenshots are included - effective_use_vision = len(screenshots) > 0 + # Use vision in the user message if screenshots are included OR if there are other images + # When use_vision=False, exclude ALL images (screenshots, sample_images, read_state_images) + has_other_images = bool(self.sample_images) or bool(self.state.read_state_images) + # Only use vision if: (1) we have screenshots, OR (2) use_vision is not False AND we have other images + effective_use_vision = len(screenshots) > 0 or (use_vision is not False and has_other_images) + + # Debug logging for vision usage + if effective_use_vision: + logger.info( + '⚠️ Vision is ENABLED: use_vision=%s, screenshots=%d, sample_images=%d, 
read_state_images=%d', + use_vision, len(screenshots), len(self.sample_images) if self.sample_images else 0, + len(self.state.read_state_images) if self.state.read_state_images else 0 + ) + else: + logger.info( + '✅ Vision is DISABLED: use_vision=%s, screenshots=%d, sample_images=%d, read_state_images=%d', + use_vision, len(screenshots), len(self.sample_images) if self.sample_images else 0, + len(self.state.read_state_images) if self.state.read_state_images else 0 + ) + + # Use the has_sentience flag we detected before clearing context_messages + # Log Sentience detection for debugging + if has_sentience: + logger.info('✅ Sentience detected - reducing DOM size to 5000 chars') + else: + logger.info('❌ Sentience NOT detected - using full DOM size (40000 chars)') + + # Reduce DOM tree size when Sentience provides semantic geometry + # Default is 40,000 chars, reduce to 5,000 when Sentience is available + max_clickable_elements_length = 5000 if has_sentience else 40000 # Create single state message with all content assert browser_state_summary @@ -357,6 +396,7 @@ def create_state_messages( include_attributes=self.include_attributes, step_info=step_info, page_filtered_actions=page_filtered_actions, + max_clickable_elements_length=max_clickable_elements_length, sensitive_data=self.sensitive_data_description, available_file_paths=available_file_paths, screenshots=screenshots, diff --git a/browser_use/agent/prompts.py b/browser_use/agent/prompts.py index 882f2fac6d..da9538b132 100644 --- a/browser_use/agent/prompts.py +++ b/browser_use/agent/prompts.py @@ -1,4 +1,5 @@ import importlib.resources +import logging from datetime import datetime from typing import TYPE_CHECKING, Literal, Optional @@ -7,6 +8,8 @@ from browser_use.observability import observe_debug from browser_use.utils import is_new_tab_page, sanitize_surrogates +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from browser_use.agent.views import AgentStepInfo from browser_use.browser.views import 
BrowserStateSummary @@ -214,12 +217,21 @@ def _get_browser_state_description(self) -> str: stats_text += '\n' elements_text = self.browser_state.dom_state.llm_representation(include_attributes=self.include_attributes) + + # Log DOM size before truncation + original_dom_size = len(elements_text) + dom_tokens_estimate = original_dom_size // 4 if len(elements_text) > self.max_clickable_elements_length: elements_text = elements_text[: self.max_clickable_elements_length] truncated_text = f' (truncated to {self.max_clickable_elements_length} characters)' + logger.info( + '📊 DOM state: %d chars (~%d tokens) truncated to %d chars (~%d tokens)', + original_dom_size, dom_tokens_estimate, self.max_clickable_elements_length, self.max_clickable_elements_length // 4 + ) else: truncated_text = '' + logger.info('📊 DOM state: %d chars (~%d tokens)', original_dom_size, dom_tokens_estimate) has_content_above = False has_content_below = False @@ -400,10 +412,54 @@ def get_user_message(self, use_vision: bool = True) -> UserMessage: # Sanitize surrogates from all text content state_description = sanitize_surrogates(state_description) - # Check if we have images to include (from read_file action) - has_images = bool(self.read_state_images) + # Log token usage breakdown for debugging + agent_history_len = len(self.agent_history_description) if self.agent_history_description else 0 + browser_state_len = len(self._get_browser_state_description()) + agent_state_len = len(self._get_agent_state_description()) + read_state_len = len(self.read_state_description) if self.read_state_description else 0 + total_len = len(state_description) + + # Rough token estimate (1 token ≈ 4 characters) + logger.info( + '📊 Token breakdown (chars): agent_history=%d (~%d tokens), browser_state=%d (~%d tokens), ' + 'agent_state=%d (~%d tokens), read_state=%d (~%d tokens), total=%d (~%d tokens)', + agent_history_len, agent_history_len // 4, + browser_state_len, browser_state_len // 4, + agent_state_len, 
agent_state_len // 4, + read_state_len, read_state_len // 4, + total_len, total_len // 4 + ) + + # Check if we have images to include + # When use_vision=False, exclude ALL images (screenshots, sample_images, read_state_images) + has_read_state_images = bool(self.read_state_images) + has_sample_images = bool(self.sample_images) + has_screenshots = bool(self.screenshots) + + # Include images only if use_vision is not False and we have images + # When use_vision=False, never use vision (even for read_state_images from read_file) + should_use_vision = ( + use_vision is not False and + (has_screenshots or has_sample_images or has_read_state_images) + ) + + # Debug logging + if should_use_vision: + logger.info( + '⚠️ AgentMessagePrompt: Vision ENABLED - use_vision=%s, screenshots=%d, sample_images=%d, read_state_images=%d', + use_vision, len(self.screenshots) if self.screenshots else 0, + len(self.sample_images) if self.sample_images else 0, + len(self.read_state_images) if self.read_state_images else 0 + ) + else: + logger.info( + '✅ AgentMessagePrompt: Vision DISABLED - use_vision=%s, screenshots=%d, sample_images=%d, read_state_images=%d', + use_vision, len(self.screenshots) if self.screenshots else 0, + len(self.sample_images) if self.sample_images else 0, + len(self.read_state_images) if self.read_state_images else 0 + ) - if (use_vision is True and self.screenshots) or has_images: + if should_use_vision: # Start with text description content_parts: list[ContentPartTextParam | ContentPartImageParam] = [ContentPartTextParam(text=state_description)] @@ -412,28 +468,28 @@ def get_user_message(self, use_vision: bool = True) -> UserMessage: # Add screenshots with labels for i, screenshot in enumerate(self.screenshots): - if i == len(self.screenshots) - 1: - label = 'Current screenshot:' - else: - # Use simple, accurate labeling since we don't have actual step timing info - label = 'Previous screenshot:' - - # Add label as text content - 
content_parts.append(ContentPartTextParam(text=label)) - - # Resize screenshot if llm_screenshot_size is configured - processed_screenshot = self._resize_screenshot(screenshot) - - # Add the screenshot - content_parts.append( - ContentPartImageParam( - image_url=ImageURL( - url=f'data:image/png;base64,{processed_screenshot}', - media_type='image/png', - detail=self.vision_detail_level, - ), + if i == len(self.screenshots) - 1: + label = 'Current screenshot:' + else: + # Use simple, accurate labeling since we don't have actual step timing info + label = 'Previous screenshot:' + + # Add label as text content + content_parts.append(ContentPartTextParam(text=label)) + + # Resize screenshot if llm_screenshot_size is configured + processed_screenshot = self._resize_screenshot(screenshot) + + # Add the screenshot + content_parts.append( + ContentPartImageParam( + image_url=ImageURL( + url=f'data:image/png;base64,{processed_screenshot}', + media_type='image/png', + detail=self.vision_detail_level, + ), + ) ) - ) # Add read_state images (from read_file action) before screenshots for img_data in self.read_state_images: diff --git a/browser_use/agent/service.py b/browser_use/agent/service.py index 448c410bc8..b4d65fa8e0 100644 --- a/browser_use/agent/service.py +++ b/browser_use/agent/service.py @@ -1011,10 +1011,15 @@ async def _prepare_context(self, step_info: AgentStepInfo | None = None) -> Brow assert self.browser_session is not None, 'BrowserSession is not set up' self.logger.debug(f'🌐 Step {self.state.n_steps}: Getting browser state...') - # Always take screenshots for all steps - self.logger.debug('📸 Requesting browser state with include_screenshot=True') + # Only capture screenshots if use_vision is not False + # When use_vision=False, skip screenshot capture entirely to save resources + include_screenshot = self.settings.use_vision is not False + if include_screenshot: + self.logger.debug('📸 Requesting browser state with include_screenshot=True') + else: + 
self.logger.debug('📸 Skipping screenshot capture (use_vision=False)') browser_state_summary = await self.browser_session.get_browser_state_summary( - include_screenshot=True, # always capture even if use_vision=False so that cloud sync is useful (it's fast now anyway) + include_screenshot=include_screenshot, include_recent_events=self.include_recent_events, ) if browser_state_summary.screenshot: @@ -1043,6 +1048,38 @@ async def _prepare_context(self, step_info: AgentStepInfo | None = None) -> Brow if self.skill_service is not None: unavailable_skills_info = await self._get_unavailable_skills_info() + # Inject Sentience semantic geometry inventory (if available) + # This gives the LLM access to semantic element IDs and bbox coordinates + try: + from browser_use.integrations.sentience.state_injector import build_sentience_state + + sent_state = await build_sentience_state(self.browser_session) + if sent_state: + # Add Sentience element inventory to LLM context for this step + self._message_manager._add_context_message(UserMessage(content=sent_state.prompt_block)) + + # Log injection details + element_count = len(sent_state.snapshot.elements) + prompt_size = len(sent_state.prompt_block) + # Show sample of first few elements + lines = sent_state.prompt_block.split("\n") + sample_lines = lines[3:8] if len(lines) > 8 else lines[3:] # Skip header, show 5 elements + sample = "\n".join(sample_lines) if sample_lines else "" + + self.logger.info( + f"🧠 Sentience: Injected {element_count} semantic elements ({prompt_size} chars) into LLM context" + ) + if sample: + self.logger.debug(f" Sample elements:\n{sample}") + else: + self.logger.debug("Sentience: No snapshot available (extension may not be loaded)") + except ImportError: + # Sentience SDK not installed, skip silently + self.logger.debug("Sentience: SDK not installed, skipping") + except Exception as e: + # Extension not loaded or snapshot failed, log at debug level + self.logger.debug(f"Sentience: State injection 
skipped: {e}") + self._message_manager.create_state_messages( browser_state_summary=browser_state_summary, model_output=self.state.last_model_output, diff --git a/browser_use/browser/session.py b/browser_use/browser/session.py index 7977cf5170..ab42f337e5 100644 --- a/browser_use/browser/session.py +++ b/browser_use/browser/session.py @@ -1315,6 +1315,15 @@ async def get_browser_state_summary( if include_screenshot and not self._cached_browser_state_summary.screenshot: self.logger.debug('⚠️ Cached browser state has no screenshot, fetching fresh state with screenshot') # Fall through to fetch fresh state with screenshot + elif not include_screenshot and self._cached_browser_state_summary.screenshot: + # If we don't want a screenshot but cached state has one, create a copy without screenshot + from dataclasses import replace + cached_copy = replace( + self._cached_browser_state_summary, + screenshot=None, # Remove screenshot when not requested + ) + self.logger.debug('🔄 Using pre-cached browser state summary (screenshot removed per request)') + return cached_copy elif selector_map and len(selector_map) > 0: self.logger.debug('🔄 Using pre-cached browser state summary for open tab') return self._cached_browser_state_summary diff --git a/browser_use/integrations/sentience/__init__.py b/browser_use/integrations/sentience/__init__.py new file mode 100644 index 0000000000..f1cc225358 --- /dev/null +++ b/browser_use/integrations/sentience/__init__.py @@ -0,0 +1,5 @@ +"""Sentience integration for browser-use Agent.""" + +from .state_injector import build_sentience_state, format_snapshot_for_llm + +__all__ = ["build_sentience_state", "format_snapshot_for_llm"] diff --git a/browser_use/integrations/sentience/state_injector.py b/browser_use/integrations/sentience/state_injector.py new file mode 100644 index 0000000000..4d966cb1a8 --- /dev/null +++ b/browser_use/integrations/sentience/state_injector.py @@ -0,0 +1,249 @@ +"""Inject Sentience semantic geometry into Agent 
logger = logging.getLogger(__name__)

# ARIA roles and raw tag names treated as interactive. Matching is done on the
# lower-cased role, falling back to the tag name when an element has no role.
_INTERACTIVE_ROLES = frozenset({
    "button", "link", "textbox", "searchbox", "combobox", "checkbox",
    "radio", "slider", "tab", "menuitem", "option", "switch", "cell",
    "a", "input", "select", "textarea",
})

# Sort position given to dominant-group candidates that carry no group_index.
_MISSING_GROUP_INDEX = 999

# Element text longer than this is cut and suffixed with "...".
_MAX_NAME_CHARS = 30


@dataclass
class SentienceState:
    """Sentience state with snapshot and formatted prompt block."""

    url: str  # URL reported by the snapshot ("" when the snapshot has none)
    snapshot: "Snapshot"  # raw snapshot object returned by the Sentience SDK
    prompt_block: str  # header + element inventory, ready to inject as a user message


def _element_role(el) -> str:
    """Return the element's role, falling back to its tag, else ""."""
    return getattr(el, "role", None) or getattr(el, "tag", None) or ""


def _importance(el):
    """Return the element's importance, treating a missing/None value as 0."""
    return getattr(el, "importance", 0) or 0


def _group_index_key(el):
    """Sort key for ordinal ordering; elements without an index sort last."""
    index = getattr(el, "group_index", None)
    return _MISSING_GROUP_INDEX if index is None else index


def _in_dominant_group(el, dominant_group_key: str) -> bool:
    """Tell whether *el* belongs to the snapshot's dominant group.

    Prefers the pre-computed ``in_dominant_group`` attribute when the element
    has one; otherwise falls back to an exact ``group_key`` comparison, which
    is only meaningful when the snapshot actually names a dominant group.
    """
    flag = getattr(el, "in_dominant_group", None)
    if flag is not None:
        return bool(flag)
    if not dominant_group_key:
        return False
    return (getattr(el, "group_key", "") or "") == dominant_group_key


def _short_href(raw_href) -> str:
    """Reduce a URL to a short token: second-to-last host label or last path segment."""
    if not raw_href:
        return ""
    from urllib.parse import urlparse

    try:
        parsed = urlparse(raw_href)
        if parsed.netloc:
            host = parsed.netloc
            return host.split(".")[-2] if "." in host else host[:10]
        if parsed.path:
            return parsed.path.split("/")[-1][:10] or "item"
    except (ValueError, TypeError, AttributeError):
        # Malformed or non-string href: keep a placeholder so the column is not empty.
        return "item"
    return ""


def _compact_name(el) -> str:
    """Return the element's name/text as a single short, delimiter-safe token."""
    raw = getattr(el, "name", None) or getattr(el, "text", None) or ""
    # The inventory is one element per line with "|" separated columns, so
    # newlines and pipes inside the text would corrupt it.
    name = " ".join(raw.replace("|", "/").split())
    if len(name) > _MAX_NAME_CHARS:
        name = name[: _MAX_NAME_CHARS - 3] + "..."
    return name


def format_snapshot_for_llm(
    snapshot: "Snapshot",
    top_by_importance: int = 40,
    top_from_dominant_group: int = 15,
    top_by_position: int = 10,
) -> str:
    """
    Format Sentience snapshot for LLM consumption.

    Builds a compact inventory of interactive elements. The selection is the
    union (deduplicated by element ID, in this order) of:

    1. the most important elements,
    2. the first elements of the dominant group, ordered by ``group_index``,
    3. the elements closest to the top of the document (lowest ``doc_y``).

    Args:
        snapshot: Sentience Snapshot object (reads ``elements`` and ``dominant_group_key``)
        top_by_importance: Number of top elements by importance to include (default: 40)
        top_from_dominant_group: Number of top elements from dominant group to include (default: 15)
        top_by_position: Number of top elements by position (lowest doc_y) to include (default: 10)

    Returns:
        One line per element in the format ``ID|role|text|imp|docYq|ord|DG|href``,
        joined with newlines ("" when nothing qualifies).
    """
    dominant_group_key = getattr(snapshot, "dominant_group_key", None) or ""

    # Keep interactive elements only, most important first (stable for ties).
    interactive = [el for el in snapshot.elements if _element_role(el).lower() in _INTERACTIVE_ROLES]
    interactive.sort(key=_importance, reverse=True)

    selected = []
    seen_ids = set()

    def _take(candidates) -> None:
        """Append candidates whose ID has not been selected yet."""
        for el in candidates:
            el_id = getattr(el, "id", None)
            # Compare against None explicitly: 0 is falsy but may be a real ID.
            if el_id is not None and el_id not in seen_ids:
                seen_ids.add(el_id)
                selected.append(el)

    _take(interactive[:top_by_importance])

    # Use the same membership test as the DG column below so selection and
    # output can never disagree about which elements are in the dominant group.
    dominant = sorted(
        (el for el in interactive if _in_dominant_group(el, dominant_group_key)),
        key=_group_index_key,
    )
    _take(dominant[:top_from_dominant_group])

    # Smaller doc_y means higher on the page; importance breaks ties.
    by_position = sorted(
        interactive,
        key=lambda el: (getattr(el, "doc_y", 0) or 0, _importance(el)),
    )
    _take(by_position[:top_by_position])

    lines = []
    for el in selected:
        doc_y = getattr(el, "doc_y", 0) or 0
        # docYq: vertical position bucketed into steps of 200 to keep numbers small.
        doc_yq = int(round(doc_y / 200)) if doc_y else 0

        in_dg = _in_dominant_group(el, dominant_group_key)
        # ord is only meaningful inside the dominant group; "-" elsewhere.
        ord_val = (getattr(el, "group_index", 0) or 0) if in_dg else "-"
        dg_flag = "1" if in_dg else "0"

        lines.append(
            f"{el.id}|{_element_role(el)}|{_compact_name(el)}|{int(_importance(el))}"
            f"|{doc_yq}|{ord_val}|{dg_flag}|{_short_href(getattr(el, 'href', None))}"
        )

    logger.debug(
        "Formatted %d elements (top %d by importance + top %d from dominant group + top %d by position)",
        len(lines),
        top_by_importance,
        top_from_dominant_group,
        top_by_position,
    )

    return "\n".join(lines)


async def build_sentience_state(
    browser_session: "BrowserSession",
    *,
    goal: Optional[str] = None,
    limit: int = 50,
    show_overlay: bool = True,
) -> Optional[SentienceState]:
    """
    Build Sentience state from browser session.

    Takes a snapshot using the Sentience extension and formats it for LLM consumption.
    Any failure (SDK missing, extension not loaded, timeout, ...) is logged and
    turned into ``None`` so the caller can carry on without Sentience.

    Args:
        browser_session: Browser-use BrowserSession instance
        goal: Optional task description forwarded to the snapshot options; omitted when empty
        limit: Element limit forwarded to the snapshot options
        show_overlay: Forwarded to the snapshot options unchanged

    Returns:
        SentienceState with snapshot and formatted prompt, or None if snapshot failed
    """
    try:
        # Imported lazily so that sentience stays an optional dependency.
        from sentience.backends import BrowserUseAdapter
        from sentience.backends.snapshot import snapshot
        from sentience.models import SnapshotOptions
    except ImportError:
        # Debug level: this path runs whenever the SDK is absent, which is the
        # normal case for installations that do not use Sentience.
        logger.debug("⚠️ Sentience SDK not available, skipping snapshot")
        return None

    import asyncio

    max_attempts = 2

    try:
        adapter = BrowserUseAdapter(browser_session)
        backend = await adapter.create_backend()

        # Give extension a moment to inject (especially after navigation).
        await asyncio.sleep(0.5)

        option_kwargs = {"limit": limit, "show_overlay": show_overlay}
        if goal:
            option_kwargs["goal"] = goal
        api_key = os.getenv("SENTIENCE_API_KEY")
        if api_key:
            option_kwargs.update(use_api=True, sentience_api_key=api_key)
        options = SnapshotOptions(**option_kwargs)

        # Retry once: the extension may still be injecting right after navigation.
        snap = None
        for attempt in range(1, max_attempts + 1):
            try:
                snap = await snapshot(backend, options=options)
                break
            except Exception:
                if attempt == max_attempts:
                    raise
                logger.debug("Sentience snapshot attempt %d failed, retrying...", attempt)
                await asyncio.sleep(1.0)

        url = getattr(snap, "url", "") or ""

        # Up to 60 by importance + 15 from the dominant group + 10 by position.
        formatted = format_snapshot_for_llm(
            snap,
            top_by_importance=60,
            top_from_dominant_group=15,
            top_by_position=10,
        )

        # NOTE: the two header lines are matched verbatim by the agent's message
        # manager to detect an injected Sentience block; keep them byte-identical.
        prompt = (
            "Elements: ID|role|text|imp|docYq|ord|DG|href\n"
            "Rules: ordinal→DG=1 then ord asc; otherwise imp desc. Use click(ID)/input_text(ID,...).\n"
            f"{formatted}"
        )

        logger.info("✅ Sentience snapshot: %d elements, URL: %s", len(snap.elements), url)
        return SentienceState(url=url, snapshot=snap, prompt_block=prompt)
    except Exception as e:
        # Deliberate best-effort: Sentience is an optional enhancement.
        logger.warning("⚠️ Sentience snapshot skipped: %s", e)
        return None
def log(msg: str) -> None:
    """Print with flush for immediate output."""
    print(msg, flush=True)


def _chromium_revision(path: str) -> int:
    """Return the numeric N of the first ``chromium-N`` path component, or -1 if none."""
    for part in Path(path).parts:
        suffix = part[len("chromium-"):]
        if part.startswith("chromium-") and suffix.isdigit():
            return int(suffix)
    return -1


def _find_playwright_chromium(cache_root=None):
    """Locate a Playwright-managed Chromium binary.

    Args:
        cache_root: Directory holding the ``chromium-*`` folders. Defaults to
            ``~/Library/Caches/ms-playwright``, so with the default only a
            macOS-style cache location is searched.

    Returns:
        Path (str) of the highest-revision match, preferring "Google Chrome for
        Testing" over "Chromium"; None when nothing matches.
    """
    root = Path(cache_root) if cache_root is not None else Path.home() / "Library/Caches/ms-playwright"
    patterns = [
        root / "chromium-*/chrome-mac*/Google Chrome for Testing.app/Contents/MacOS/Google Chrome for Testing",
        root / "chromium-*/chrome-mac*/Chromium.app/Contents/MacOS/Chromium",
    ]
    for pattern in patterns:
        matches = glob.glob(str(pattern))
        if matches:
            # Compare revisions numerically: a plain string sort would rank
            # "chromium-999" above "chromium-1000".
            return max(matches, key=lambda p: (_chromium_revision(p), p))
    return None


async def main():
    """Demo: Use Sentience grounding with browser-use to search Google."""
    import traceback

    # Get path to Sentience extension
    sentience_ext_path = get_extension_dir()
    log(f"Loading Sentience extension from: {sentience_ext_path}")

    # Verify extension exists
    if not os.path.exists(sentience_ext_path):
        raise FileNotFoundError(f"Sentience extension not found at: {sentience_ext_path}")
    if not os.path.exists(os.path.join(sentience_ext_path, "manifest.json")):
        raise FileNotFoundError(f"Sentience extension manifest not found at: {sentience_ext_path}/manifest.json")
    log(f"✅ Sentience extension verified at: {sentience_ext_path}")

    # Find Playwright browser to avoid password prompt
    executable_path = _find_playwright_chromium()
    if executable_path:
        log(f"✅ Found Playwright browser: {executable_path}")
    else:
        log("⚠️ Playwright browser not found, browser-use will try to install it")

    # Chrome only uses the LAST --load-extension arg, so every extension has to
    # be passed together in one comma-separated value.
    log("Collecting all extension paths...")
    all_extension_paths = [sentience_ext_path]

    # A throwaway profile is used only to make sure the default extensions
    # exist on disk before we reference them.
    temp_profile = BrowserProfile(enable_default_extensions=True)
    default_ext_paths = temp_profile._ensure_default_extensions_downloaded()

    if default_ext_paths:
        all_extension_paths.extend(default_ext_paths)
        log(f" ✅ Found {len(default_ext_paths)} default extensions")
    else:
        log(" ⚠️ No default extensions found (this is OK, Sentience will still work)")

    log(f"Total extensions to load: {len(all_extension_paths)} (including Sentience)")

    # str() so that path-like entries join as well as plain strings.
    combined_extensions = ",".join(str(path) for path in all_extension_paths)
    log(f"Combined extension paths (first 100 chars): {combined_extensions[:100]}...")

    # Strategy: disable automatic default extensions, load everything manually.
    profile = BrowserProfile(
        headless=False,  # Run with visible browser for demo
        executable_path=executable_path,  # Use Playwright browser if found
        enable_default_extensions=False,  # Disable auto-loading, we'll load manually
        ignore_default_args=[
            "--enable-automation",
            "--disable-extensions",  # Important: don't disable extensions
            "--hide-scrollbars",
        ],
        args=[
            "--enable-extensions",
            "--disable-extensions-file-access-check",  # Allow extension file access
            "--disable-extensions-http-throttling",  # Don't throttle extension HTTP
            "--extensions-on-chrome-urls",  # Allow extensions on chrome:// URLs
            f"--load-extension={combined_extensions}",  # Load ALL extensions together
        ],
    )

    log("Browser profile configured with Sentience extension")

    # Start browser session
    log("Creating BrowserSession...")
    session = BrowserSession(browser_profile=profile)
    log("Starting browser session (this may take a moment)...")
    try:
        await session.start()
        log("✅ Browser session started successfully")
    except Exception as e:
        log(f"❌ Error starting browser session: {e}")
        log(traceback.format_exc())
        return

    try:
        # Navigate to Google
        log("Getting current page...")
        try:
            page = await session.get_current_page()
            log(f"✅ Got page: {page}")
        except Exception as e:
            log(f"❌ Error getting page: {e}")
            log(traceback.format_exc())
            return

        log("Navigating to Google...")
        try:
            await page.goto("https://www.google.com")
            log("✅ Navigated to Google")
        except Exception as e:
            log(f"❌ Error navigating to Google: {e}")
            log(traceback.format_exc())
            return

        # Wait for page to settle
        log("Waiting 2 seconds for page to settle...")
        await asyncio.sleep(2)
        log("Done waiting")

        # Create Sentience adapter and backend
        log("Creating Sentience adapter...")
        adapter = BrowserUseAdapter(session)
        log("Creating backend...")
        backend = await adapter.create_backend()
        log("Created Sentience backend")

        # Give extension more time to initialize after page load
        log("Waiting for extension to initialize...")
        await asyncio.sleep(1)

        # Take a snapshot using Sentience extension
        try:
            log("Taking snapshot (this waits for extension to inject)...")

            # Enhanced diagnostics before snapshot
            log("Checking extension injection status...")
            diag = await backend.eval("""
                (() => {
                    const hasSentience = typeof window.sentience !== 'undefined';
                    const hasSnapshot = hasSentience && typeof window.sentience.snapshot === 'function';
                    const extId = document.documentElement.dataset.sentienceExtensionId || null;
                    return {
                        window_sentience: hasSentience,
                        window_sentience_snapshot: hasSnapshot,
                        extension_id_attr: extId,
                        url: window.location.href,
                        ready_state: document.readyState
                    };
                })()
            """)
            log(f"Extension diagnostics: {diag}")

            # Guard against an empty eval result before reading the flag.
            if not (diag or {}).get("window_sentience"):
                log("⚠️ window.sentience not found - extension may not have injected yet")
                log(" This can happen if:")
                log(" 1. Extension wasn't loaded in browser args")
                log(" 2. Page loaded before extension could inject")
                log(" 3. Content Security Policy is blocking the extension")
                log(" Waiting for extension injection (up to 10 seconds)...")
            else:
                log("✅ window.sentience found!")

            snap = await snapshot(backend)
            log(f"✅ Snapshot taken: {len(snap.elements)} elements found")
        except ExtensionNotLoadedError as e:
            log("❌ Extension not loaded error:")
            log(f" {e}")
            if hasattr(e, 'diagnostics') and e.diagnostics:
                log(f" Diagnostics: {e.diagnostics.to_dict()}")
            log("\nTroubleshooting steps:")
            log("1. Verify extension path exists and contains manifest.json")
            log("2. Check browser console for extension errors")
            log("3. Try increasing timeout in snapshot() call")
            log("4. Ensure --enable-extensions is in browser args")
            return

        # Find the search input using semantic query; try combobox first,
        # then textbox.
        search_input = find(snap, 'role=combobox[name*="Search"]')
        if not search_input:
            search_input = find(snap, 'role=textbox[name*="Search"]')

        if search_input:
            log(f"Found search input: {search_input.role} at {search_input.bbox}")

            # Click on the search input using grounded coordinates
            await click(backend, search_input.bbox)
            log("Clicked on search input")

            # Type a search query
            await type_text(backend, "Sentience AI browser automation")
            log("Typed search query")

            # Take another snapshot after typing
            await asyncio.sleep(1)
            snap2 = await snapshot(backend)
            log(f"After typing: {len(snap2.elements)} elements")

            # Find and click the search button
            search_btn = find(snap2, 'role=button[name*="Search"]')
            if search_btn:
                await click(backend, search_btn.bbox)
                log("Clicked search button")
        else:
            log("Could not find search input")
            # List all textbox/combobox elements for debugging
            textboxes = query(snap, "role=textbox")
            comboboxes = query(snap, "role=combobox")
            log(f"Found {len(textboxes)} textboxes, {len(comboboxes)} comboboxes")

        # Keep browser open for inspection. Read stdin in a worker thread so
        # the event loop keeps running while we wait for the user.
        log("\nPress Enter to close browser...")
        await asyncio.to_thread(input)

    finally:
        await session.close()
Element grounding with BBox coordinates + +Requirements: + pip install browser-use sentienceapi + +Usage: + python playground/sentience_cached_snapshot.py +""" + +import asyncio +import os +import sys + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from dotenv import load_dotenv + +load_dotenv() + +from browser_use.browser import BrowserProfile, BrowserSession + +# Sentience SDK imports +from sentience import find, get_extension_dir, query +from sentience.backends import ( + BrowserUseAdapter, + CachedSnapshot, + ExtensionNotLoadedError, + click, + scroll, + snapshot, + type_text, +) + + +async def main(): + """Demo: CachedSnapshot for efficient element grounding.""" + + extension_path = get_extension_dir() + print(f"Sentience extension: {extension_path}") + + profile = BrowserProfile( + headless=False, + args=[ + f"--load-extension={extension_path}", + f"--disable-extensions-except={extension_path}", + ], + ) + + session = BrowserSession(browser_profile=profile) + await session.start() + + try: + # Navigate to a page with many elements + page = await session.get_current_page() + await page.goto("https://news.ycombinator.com") + print("Navigated to Hacker News") + await asyncio.sleep(2) + + # Create Sentience backend + adapter = BrowserUseAdapter(session) + backend = await adapter.create_backend() + + # Create cached snapshot with 2-second freshness + cache = CachedSnapshot(backend, max_age_ms=2000) + + # Take initial snapshot (cached) + snap1 = await cache.get() + print(f"Initial snapshot: {len(snap1.elements)} elements") + print(f"Cache age: {cache.age_ms:.0f}ms") + + # Second call uses cached version (no extension call) + snap2 = await cache.get() + print(f"Cached snapshot: {len(snap2.elements)} elements") + print(f"Cache age: {cache.age_ms:.0f}ms") + assert snap1 is snap2, "Should be same cached instance" + + # Find all links on the page + links = query(snap1, "role=link") + print(f"Found {len(links)} links on page") + + # 
Find the first story link (links with numeric index have class 'storylink' historically) + story_links = [el for el in links if el.name and len(el.name) > 10] + if story_links: + print(f"\nFirst few story titles:") + for link in story_links[:3]: + print(f" - {link.name[:50]}...") + + # Scroll down the page + print("\nScrolling down...") + await scroll(backend, delta_y=500) + + # After scroll, cache should still be valid (scroll doesn't change DOM) + # But if we want fresh element positions, we force refresh + cache.invalidate() # Manual invalidation + print("Cache invalidated after scroll") + + # Take fresh snapshot to get updated element positions + snap3 = await cache.get() + print(f"Fresh snapshot after scroll: {len(snap3.elements)} elements") + print(f"Cache age: {cache.age_ms:.0f}ms") + + # Demonstrate force_refresh parameter + snap4 = await cache.get(force_refresh=True) + print(f"Force refresh: {len(snap4.elements)} elements") + + # Find the "More" link at bottom + more_link = find(snap4, 'role=link[name="More"]') + if more_link: + print(f"\nFound 'More' link at: {more_link.bbox}") + + # Click to load next page + await click(backend, more_link.bbox) + print("Clicked 'More' link") + + # Invalidate cache after navigation + cache.invalidate() + + # Wait for new content + await asyncio.sleep(2) + + # Take snapshot of new page + snap5 = await cache.get() + print(f"New page snapshot: {len(snap5.elements)} elements") + + # Demo: Print cache statistics + print("\n--- Cache Usage Pattern ---") + print("1. Take initial snapshot: cache.get()") + print("2. Reuse for multiple queries: find(snap, ...), query(snap, ...)") + print("3. After DOM changes: cache.invalidate()") + print("4. Get fresh data: cache.get()") + + print("\nPress Enter to close browser...") + input() + + finally: + await session.close() + + +if __name__ == "__main__": + asyncio.run(main())