diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 419f2fd..3ccdbdf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,6 +32,24 @@ jobs: - name: Install dependencies run: | pip install -e ".[dev]" + pip install pre-commit mypy types-requests + + - name: Lint with pre-commit + continue-on-error: true + run: | + pre-commit run --all-files + + - name: Type check with mypy + continue-on-error: true + run: | + mypy sentience --ignore-missing-imports --no-strict-optional + + - name: Check code style + continue-on-error: true + run: | + black --check sentience tests --line-length=100 + isort --check-only --profile black sentience tests + flake8 sentience tests --max-line-length=100 --extend-ignore=E203,W503,E501 --max-complexity=15 - name: Build extension (if needed) if: runner.os != 'Windows' diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7649ba7..7a4f356 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -50,20 +50,19 @@ repos: - '--max-complexity=15' exclude: ^(venv/|\.venv/|build/|dist/|tests/fixtures/) - # Type checking with mypy (disabled for now - too strict) - # Uncomment to enable strict type checking - # - repo: https://github.com/pre-commit/mirrors-mypy - # rev: v1.8.0 - # hooks: - # - id: mypy - # additional_dependencies: - # - pydantic>=2.0 - # - types-requests - # args: - # - '--ignore-missing-imports' - # - '--no-strict-optional' - # - '--warn-unused-ignores' - # exclude: ^(tests/|examples/|venv/|\.venv/|build/|dist/) + # Type checking with mypy + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.8.0 + hooks: + - id: mypy + additional_dependencies: + - pydantic>=2.0 + - types-requests + args: + - '--ignore-missing-imports' + - '--no-strict-optional' + - '--warn-unused-ignores' + exclude: ^(tests/|examples/|venv/|\.venv/|build/|dist/) # Security checks - repo: https://github.com/PyCQA/bandit diff --git a/sentience/__init__.py b/sentience/__init__.py index 
20c337c..76458db 100644 --- a/sentience/__init__.py +++ b/sentience/__init__.py @@ -14,9 +14,6 @@ from .cloud_tracing import CloudTraceSink, SentienceLogger from .conversational_agent import ConversationalAgent from .expect import expect - -# Formatting (v0.12.0+) -from .formatting import format_snapshot_for_llm from .generator import ScriptGenerator, generate from .inspector import Inspector, inspect from .llm_provider import ( @@ -55,12 +52,14 @@ from .read import read from .recorder import Recorder, Trace, TraceStep, record from .screenshot import screenshot +from .sentience_methods import AgentAction, SentienceMethod from .snapshot import snapshot from .text_search import find_text_rect from .tracer_factory import SENTIENCE_API_URL, create_tracer from .tracing import JsonlTraceSink, TraceEvent, Tracer, TraceSink # Utilities (v0.12.0+) +# Import from utils package (re-exports from submodules for backward compatibility) from .utils import ( canonical_snapshot_loose, canonical_snapshot_strict, @@ -68,6 +67,9 @@ save_storage_state, sha256_digest, ) + +# Formatting (v0.12.0+) +from .utils.formatting import format_snapshot_for_llm from .wait import wait_for __version__ = "0.91.1" @@ -150,4 +152,7 @@ "format_snapshot_for_llm", # Agent Config (v0.12.0+) "AgentConfig", + # Enums + "SentienceMethod", + "AgentAction", ] diff --git a/sentience/action_executor.py b/sentience/action_executor.py new file mode 100644 index 0000000..c95f29b --- /dev/null +++ b/sentience/action_executor.py @@ -0,0 +1,215 @@ +""" +Action Executor for Sentience Agent. + +Handles parsing and execution of action commands (CLICK, TYPE, PRESS, FINISH). +This separates action execution concerns from LLM interaction. 
class ActionExecutor:
    """
    Parse LLM action command strings and execute them against a browser.

    Supported commands (case-insensitive, whitespace-tolerant):
    - CLICK(id)
    - TYPE(id, "text")
    - PRESS("key")
    - FINISH()

    Parsing lives in one place (`_parse_action`) and is shared by the sync and
    async entry points, so adding a new action type means touching the parser,
    the two dispatchers, and nothing else.
    """

    # Patterns are compiled once at class-creation time instead of on every call.
    _CLICK_RE = re.compile(r"CLICK\s*\(\s*(\d+)\s*\)", re.IGNORECASE)
    # The closing quote must be the same character as the opening quote (\2),
    # which lets the typed text contain the *other* quote character, e.g.
    # TYPE(5, "it's"). DOTALL keeps multi-line text accepted, as the previous
    # negated character class did.
    _TYPE_RE = re.compile(
        r'TYPE\s*\(\s*(\d+)\s*,\s*(["\'])(.*?)\2\s*\)',
        re.IGNORECASE | re.DOTALL,
    )
    _PRESS_RE = re.compile(r'PRESS\s*\(\s*["\']([^"\']+)["\']\s*\)', re.IGNORECASE)
    _FINISH_RE = re.compile(r"FINISH\s*\(\s*\)", re.IGNORECASE)

    def __init__(
        self,
        browser: SentienceBrowser | AsyncSentienceBrowser | BrowserProtocol | AsyncBrowserProtocol,
    ):
        """
        Initialize action executor.

        Args:
            browser: SentienceBrowser, AsyncSentienceBrowser, or protocol-compatible instance
                (for testing, can use mock objects that implement BrowserProtocol)
        """
        self.browser = browser
        # Concrete types are checked first because they are unambiguous.
        if isinstance(browser, AsyncSentienceBrowser):
            self._is_async = True
        elif isinstance(browser, SentienceBrowser):
            self._is_async = False
        else:
            # Protocol-style / mock browsers: treat the object as async only when
            # its ``start`` attribute is a coroutine function; anything else
            # (including objects without ``start``) is treated as sync.
            import inspect

            start_method = getattr(browser, "start", None)
            self._is_async = bool(start_method and inspect.iscoroutinefunction(start_method))

    @classmethod
    def _parse_action(cls, action_str: str) -> tuple[str, dict[str, Any]]:
        """
        Parse an action command string into an action name and its parameters.

        Args:
            action_str: Action string from LLM (e.g., "CLICK(42)", "TYPE(15, \"text\")")

        Returns:
            Tuple of (action, params) where action is one of "click", "type",
            "press", "finish" and params holds the parsed arguments
            (element_id / text / key) in result-dictionary order.

        Raises:
            ValueError: If action format is unknown
        """
        if match := cls._CLICK_RE.match(action_str):
            return "click", {"element_id": int(match.group(1))}
        if match := cls._TYPE_RE.match(action_str):
            return "type", {"element_id": int(match.group(1)), "text": match.group(3)}
        if match := cls._PRESS_RE.match(action_str):
            return "press", {"key": match.group(1)}
        if cls._FINISH_RE.match(action_str):
            return "finish", {}
        raise ValueError(
            f"Unknown action format: {action_str}\n"
            f'Expected: CLICK(id), TYPE(id, "text"), PRESS("key"), or FINISH()'
        )

    @staticmethod
    def _build_result(action: str, params: dict[str, Any], result: Any) -> dict[str, Any]:
        """
        Build the execution result dictionary for a parsed action.

        Args:
            action: Parsed action name ("click", "type", "press", "finish")
            params: Parsed action parameters from `_parse_action`
            result: Object returned by the SDK call (unused for "finish")

        Returns:
            Execution result dictionary (see `execute` for the key list)
        """
        if action == "finish":
            # FINISH never touches the browser, so there is no SDK result to report.
            return {
                "success": True,
                "action": "finish",
                "message": "Task marked as complete",
            }
        payload: dict[str, Any] = {
            "success": result.success,
            "action": action,
            **params,
            "outcome": result.outcome,
        }
        if action == "click":
            payload["url_changed"] = result.url_changed
        return payload

    def execute(self, action_str: str, snap: Snapshot) -> dict[str, Any]:
        """
        Parse action string and execute SDK call (synchronous).

        Args:
            action_str: Action string from LLM (e.g., "CLICK(42)", "TYPE(15, \"text\")")
            snap: Current snapshot (for context, currently unused but kept for API consistency)

        Returns:
            Execution result dictionary with keys:
            - success: bool
            - action: str (e.g., "click", "type", "press", "finish")
            - element_id: Optional[int] (for click/type actions)
            - text: Optional[str] (for type actions)
            - key: Optional[str] (for press actions)
            - outcome: Optional[str] (action outcome)
            - url_changed: Optional[bool] (for click actions)
            - message: Optional[str] (for finish action)

        Raises:
            ValueError: If action format is unknown
            RuntimeError: If called on async browser (use execute_async instead)
        """
        if self._is_async:
            raise RuntimeError(
                "ActionExecutor.execute() called on async browser. Use execute_async() instead."
            )

        action, params = self._parse_action(action_str)

        result = None
        if action == "click":
            result = click(self.browser, params["element_id"])  # type: ignore
        elif action == "type":
            result = type_text(self.browser, params["element_id"], params["text"])  # type: ignore
        elif action == "press":
            result = press(self.browser, params["key"])  # type: ignore

        return self._build_result(action, params, result)

    async def execute_async(self, action_str: str, snap: Snapshot) -> dict[str, Any]:
        """
        Parse action string and execute SDK call (asynchronous).

        Args:
            action_str: Action string from LLM (e.g., "CLICK(42)", "TYPE(15, \"text\")")
            snap: Current snapshot (for context, currently unused but kept for API consistency)

        Returns:
            Execution result dictionary (same format as execute())

        Raises:
            ValueError: If action format is unknown
            RuntimeError: If called on sync browser (use execute() instead)
        """
        if not self._is_async:
            raise RuntimeError(
                "ActionExecutor.execute_async() called on sync browser. Use execute() instead."
            )

        action, params = self._parse_action(action_str)

        result = None
        if action == "click":
            result = await click_async(self.browser, params["element_id"])  # type: ignore
        elif action == "type":
            result = await type_text_async(
                self.browser, params["element_id"], params["text"]
            )  # type: ignore
        elif action == "press":
            result = await press_async(self.browser, params["key"])  # type: ignore

        return self._build_result(action, params, result)
diff --git a/sentience/agent.py b/sentience/agent.py index 81e71cc..deafbd0 100644 --- a/sentience/agent.py +++ b/sentience/agent.py @@ -5,14 +5,15 @@ import asyncio import hashlib -import re import time -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any, Optional, Union -from .actions import click, click_async, press, press_async, type_text, type_text_async +from .action_executor import ActionExecutor from .agent_config import AgentConfig from .base_agent import BaseAgent, BaseAgentAsync from .browser import AsyncSentienceBrowser, SentienceBrowser +from .element_filter import ElementFilter +from .llm_interaction_handler import LLMInteractionHandler from .llm_provider import LLMProvider, LLMResponse from .models import ( ActionHistory, @@ -24,12 +25,45 @@ SnapshotOptions, TokenStats, ) +from .protocols import AsyncBrowserProtocol, BrowserProtocol from .snapshot import snapshot, snapshot_async +from .trace_event_builder import TraceEventBuilder if TYPE_CHECKING: from .tracing import Tracer +def _safe_tracer_call( + tracer: Optional["Tracer"], method_name: str, verbose: bool, *args, **kwargs +) -> None: + """ + Safely call tracer method, catching and logging errors without breaking execution. + + Args: + tracer: Tracer instance or None + method_name: Name of tracer method to call (e.g., "emit", "emit_error") + verbose: Whether to print error messages + *args: Positional arguments for the tracer method + **kwargs: Keyword arguments for the tracer method + """ + if not tracer: + return + try: + method = getattr(tracer, method_name) + if args and kwargs: + method(*args, **kwargs) + elif args: + method(*args) + elif kwargs: + method(**kwargs) + else: + method() + except Exception as tracer_error: + # Tracer errors should not break agent execution + if verbose: + print(f"⚠️ Tracer error (non-fatal): {tracer_error}") + + class SentienceAgent(BaseAgent): """ High-level agent that combines Sentience SDK with any LLM provider. 
@@ -56,7 +90,7 @@ class SentienceAgent(BaseAgent): def __init__( self, - browser: SentienceBrowser, + browser: SentienceBrowser | BrowserProtocol, llm: LLMProvider, default_snapshot_limit: int = 50, verbose: bool = True, @@ -67,7 +101,8 @@ def __init__( Initialize Sentience Agent Args: - browser: SentienceBrowser instance + browser: SentienceBrowser instance or BrowserProtocol-compatible object + (for testing, can use mock objects that implement BrowserProtocol) llm: LLM provider (OpenAIProvider, AnthropicProvider, etc.) default_snapshot_limit: Default maximum elements to include in context (default: 50) verbose: Print execution logs (default: True) @@ -81,6 +116,10 @@ def __init__( self.tracer = tracer self.config = config or AgentConfig() + # Initialize handlers + self.llm_handler = LLMInteractionHandler(llm) + self.action_executor = ActionExecutor(browser) + # Screenshot sequence counter # Execution history self.history: list[dict[str, Any]] = [] @@ -151,7 +190,10 @@ def act( # noqa: C901 # Emit step_start trace event if tracer is enabled if self.tracer: pre_url = self.browser.page.url if self.browser.page else None - self.tracer.emit_step_start( + _safe_tracer_call( + self.tracer, + "emit_step_start", + self.verbose, step_id=step_id, step_index=self._step_count, goal=goal, @@ -198,17 +240,8 @@ def act( # noqa: C901 # Emit snapshot trace event if tracer is enabled if self.tracer: - # Include ALL elements with full data for DOM tree display - # Use snap.elements (all elements) not filtered_elements - elements_data = [el.model_dump() for el in snap.elements] - # Build snapshot event data - snapshot_data = { - "url": snap.url, - "element_count": len(snap.elements), - "timestamp": snap.timestamp, - "elements": elements_data, # Full element data for DOM tree - } + snapshot_data = TraceEventBuilder.build_snapshot_event(snap) # Always include screenshot in trace event for studio viewer compatibility # CloudTraceSink will extract and upload screenshots separately, then 
remove @@ -229,7 +262,10 @@ def act( # noqa: C901 if snap.screenshot_format: snapshot_data["screenshot_format"] = snap.screenshot_format - self.tracer.emit( + _safe_tracer_call( + self.tracer, + "emit", + self.verbose, "snapshot", snapshot_data, step_id=step_id, @@ -248,14 +284,17 @@ def act( # noqa: C901 ) # 2. GROUND: Format elements for LLM context - context = self._build_context(filtered_snap, goal) + context = self.llm_handler.build_context(filtered_snap, goal) # 3. THINK: Query LLM for next action - llm_response = self._query_llm(context, goal) + llm_response = self.llm_handler.query_llm(context, goal) # Emit LLM query trace event if tracer is enabled if self.tracer: - self.tracer.emit( + _safe_tracer_call( + self.tracer, + "emit", + self.verbose, "llm_query", { "prompt_tokens": llm_response.prompt_tokens, @@ -273,10 +312,10 @@ def act( # noqa: C901 self._track_tokens(goal, llm_response) # Parse action from LLM response - action_str = self._extract_action_from_response(llm_response.content) + action_str = self.llm_handler.extract_action(llm_response.content) # 4. 
EXECUTE: Parse and run action - result_dict = self._execute_action(action_str, filtered_snap) + result_dict = self.action_executor.execute(action_str, filtered_snap) duration_ms = int((time.time() - start_time) * 1000) @@ -316,7 +355,10 @@ def act( # noqa: C901 for el in filtered_snap.elements[:50] ] - self.tracer.emit( + _safe_tracer_call( + self.tracer, + "emit", + self.verbose, "action", { "action": result.action, @@ -423,32 +465,41 @@ def act( # noqa: C901 } # Build complete step_end event - step_end_data = { - "v": 1, - "step_id": step_id, - "step_index": self._step_count, - "goal": goal, - "attempt": attempt, - "pre": { - "url": pre_url, - "snapshot_digest": snapshot_digest, - }, - "llm": llm_data, - "exec": exec_data, - "post": { - "url": post_url, - }, - "verify": verify_data, - } + step_end_data = TraceEventBuilder.build_step_end_event( + step_id=step_id, + step_index=self._step_count, + goal=goal, + attempt=attempt, + pre_url=pre_url, + post_url=post_url, + snapshot_digest=snapshot_digest, + llm_data=llm_data, + exec_data=exec_data, + verify_data=verify_data, + ) - self.tracer.emit("step_end", step_end_data, step_id=step_id) + _safe_tracer_call( + self.tracer, + "emit", + self.verbose, + "step_end", + step_end_data, + step_id=step_id, + ) return result except Exception as e: # Emit error trace event if tracer is enabled if self.tracer: - self.tracer.emit_error(step_id=step_id, error=str(e), attempt=attempt) + _safe_tracer_call( + self.tracer, + "emit_error", + self.verbose, + step_id=step_id, + error=str(e), + attempt=attempt, + ) if attempt < max_retries: if self.verbose: @@ -477,187 +528,6 @@ def act( # noqa: C901 ) raise RuntimeError(f"Failed after {max_retries} retries: {e}") - def _build_context(self, snap: Snapshot, goal: str) -> str: - """ - Convert snapshot elements to token-efficient prompt string - - Format: [ID] "text" {cues} @ (x,y) (Imp:score) - - Args: - snap: Snapshot object - goal: User goal (for context) - - Returns: - Formatted element 
context string - """ - lines = [] - # Note: elements are already filtered by filter_elements() in act() - for el in snap.elements: - # Extract visual cues - cues = [] - if el.visual_cues.is_primary: - cues.append("PRIMARY") - if el.visual_cues.is_clickable: - cues.append("CLICKABLE") - if el.visual_cues.background_color_name: - cues.append(f"color:{el.visual_cues.background_color_name}") - - # Format element line - cues_str = f" {{{','.join(cues)}}}" if cues else "" - text_preview = ( - (el.text[:50] + "...") if el.text and len(el.text) > 50 else (el.text or "") - ) - - lines.append( - f'[{el.id}] <{el.role}> "{text_preview}"{cues_str} ' - f"@ ({int(el.bbox.x)},{int(el.bbox.y)}) (Imp:{el.importance})" - ) - - return "\n".join(lines) - - def _extract_action_from_response(self, response: str) -> str: - """ - Extract action command from LLM response, handling cases where - the LLM adds extra explanation despite instructions. - - Args: - response: Raw LLM response text - - Returns: - Cleaned action command string - """ - import re - - # Remove markdown code blocks if present - response = re.sub(r"```[\w]*\n?", "", response) - response = response.strip() - - # Try to find action patterns in the response - # Pattern matches: CLICK(123), TYPE(123, "text"), PRESS("key"), FINISH() - action_pattern = r'(CLICK\s*\(\s*\d+\s*\)|TYPE\s*\(\s*\d+\s*,\s*["\'].*?["\']\s*\)|PRESS\s*\(\s*["\'].*?["\']\s*\)|FINISH\s*\(\s*\))' - - match = re.search(action_pattern, response, re.IGNORECASE) - if match: - return match.group(1) - - # If no pattern match, return the original response (will likely fail parsing) - return response - - def _query_llm(self, dom_context: str, goal: str) -> LLMResponse: - """ - Query LLM with standardized prompt template - - Args: - dom_context: Formatted element context - goal: User goal - - Returns: - LLMResponse from LLM provider - """ - system_prompt = f"""You are an AI web automation agent. 
- -GOAL: {goal} - -VISIBLE ELEMENTS (sorted by importance): -{dom_context} - -VISUAL CUES EXPLAINED: -- {{PRIMARY}}: Main call-to-action element on the page -- {{CLICKABLE}}: Element is clickable -- {{color:X}}: Background color name - -CRITICAL RESPONSE FORMAT: -You MUST respond with ONLY ONE of these exact action formats: -- CLICK(id) - Click element by ID -- TYPE(id, "text") - Type text into element -- PRESS("key") - Press keyboard key (Enter, Escape, Tab, ArrowDown, etc) -- FINISH() - Task complete - -DO NOT include any explanation, reasoning, or natural language. -DO NOT use markdown formatting or code blocks. -DO NOT say "The next step is..." or anything similar. - -CORRECT Examples: -CLICK(42) -TYPE(15, "magic mouse") -PRESS("Enter") -FINISH() - -INCORRECT Examples (DO NOT DO THIS): -"The next step is to click..." -"I will type..." -```CLICK(42)``` -""" - - user_prompt = "Return the single action command:" - - return self.llm.generate(system_prompt, user_prompt, temperature=0.0) - - def _execute_action(self, action_str: str, snap: Snapshot) -> dict[str, Any]: - """ - Parse action string and execute SDK call - - Args: - action_str: Action string from LLM (e.g., "CLICK(42)") - snap: Current snapshot (for context) - - Returns: - Execution result dictionary - """ - # Parse CLICK(42) - if match := re.match(r"CLICK\s*\(\s*(\d+)\s*\)", action_str, re.IGNORECASE): - element_id = int(match.group(1)) - result = click(self.browser, element_id) - return { - "success": result.success, - "action": "click", - "element_id": element_id, - "outcome": result.outcome, - "url_changed": result.url_changed, - } - - # Parse TYPE(42, "hello world") - elif match := re.match( - r'TYPE\s*\(\s*(\d+)\s*,\s*["\']([^"\']*)["\']\s*\)', - action_str, - re.IGNORECASE, - ): - element_id = int(match.group(1)) - text = match.group(2) - result = type_text(self.browser, element_id, text) - return { - "success": result.success, - "action": "type", - "element_id": element_id, - "text": text, - 
"outcome": result.outcome, - } - - # Parse PRESS("Enter") - elif match := re.match(r'PRESS\s*\(\s*["\']([^"\']+)["\']\s*\)', action_str, re.IGNORECASE): - key = match.group(1) - result = press(self.browser, key) - return { - "success": result.success, - "action": "press", - "key": key, - "outcome": result.outcome, - } - - # Parse FINISH() - elif re.match(r"FINISH\s*\(\s*\)", action_str, re.IGNORECASE): - return { - "success": True, - "action": "finish", - "message": "Task marked as complete", - } - - else: - raise ValueError( - f"Unknown action format: {action_str}\n" - f'Expected: CLICK(id), TYPE(id, "text"), PRESS("key"), or FINISH()' - ) - def _track_tokens(self, goal: str, llm_response: LLMResponse): """ Track token usage for analytics @@ -721,8 +591,8 @@ def filter_elements(self, snapshot: Snapshot, goal: str | None = None) -> list[E """ Filter elements from snapshot based on goal context. - This default implementation applies goal-based keyword matching to boost - relevant elements and filters out irrelevant ones. + This implementation uses ElementFilter to apply goal-based keyword matching + to boost relevant elements and filters out irrelevant ones. 
Args: snapshot: Current page snapshot @@ -731,76 +601,7 @@ def filter_elements(self, snapshot: Snapshot, goal: str | None = None) -> list[E Returns: Filtered list of elements """ - elements = snapshot.elements - - # If no goal provided, return all elements (up to limit) - if not goal: - return elements[: self.default_snapshot_limit] - - goal_lower = goal.lower() - - # Extract keywords from goal - keywords = self._extract_keywords(goal_lower) - - # Boost elements matching goal keywords - scored_elements = [] - for el in elements: - score = el.importance - - # Boost if element text matches goal - if el.text and any(kw in el.text.lower() for kw in keywords): - score += 0.3 - - # Boost if role matches goal intent - if "click" in goal_lower and el.visual_cues.is_clickable: - score += 0.2 - if "type" in goal_lower and el.role in ["textbox", "searchbox"]: - score += 0.2 - if "search" in goal_lower: - # Filter out non-interactive elements for search tasks - if el.role in ["link", "img"] and not el.visual_cues.is_primary: - score -= 0.5 - - scored_elements.append((score, el)) - - # Re-sort by boosted score - scored_elements.sort(key=lambda x: x[0], reverse=True) - elements = [el for _, el in scored_elements] - - return elements[: self.default_snapshot_limit] - - def _extract_keywords(self, text: str) -> list[str]: - """ - Extract meaningful keywords from goal text - - Args: - text: Text to extract keywords from - - Returns: - List of keywords - """ - stopwords = { - "the", - "a", - "an", - "and", - "or", - "but", - "in", - "on", - "at", - "to", - "for", - "of", - "with", - "by", - "from", - "as", - "is", - "was", - } - words = text.split() - return [w for w in words if w not in stopwords and len(w) > 2] + return ElementFilter.filter_by_goal(snapshot, goal, self.default_snapshot_limit) class SentienceAgentAsync(BaseAgentAsync): @@ -853,6 +654,10 @@ def __init__( self.tracer = tracer self.config = config or AgentConfig() + # Initialize handlers + self.llm_handler = 
LLMInteractionHandler(llm) + self.action_executor = ActionExecutor(browser) + # Screenshot sequence counter # Execution history self.history: list[dict[str, Any]] = [] @@ -920,7 +725,10 @@ async def act( # noqa: C901 # Emit step_start trace event if tracer is enabled if self.tracer: pre_url = self.browser.page.url if self.browser.page else None - self.tracer.emit_step_start( + _safe_tracer_call( + self.tracer, + "emit_step_start", + self.verbose, step_id=step_id, step_index=self._step_count, goal=goal, @@ -970,17 +778,8 @@ async def act( # noqa: C901 # Emit snapshot trace event if tracer is enabled if self.tracer: - # Include ALL elements with full data for DOM tree display - # Use snap.elements (all elements) not filtered_elements - elements_data = [el.model_dump() for el in snap.elements] - # Build snapshot event data - snapshot_data = { - "url": snap.url, - "element_count": len(snap.elements), - "timestamp": snap.timestamp, - "elements": elements_data, # Full element data for DOM tree - } + snapshot_data = TraceEventBuilder.build_snapshot_event(snap) # Always include screenshot in trace event for studio viewer compatibility # CloudTraceSink will extract and upload screenshots separately, then remove @@ -1001,7 +800,10 @@ async def act( # noqa: C901 if snap.screenshot_format: snapshot_data["screenshot_format"] = snap.screenshot_format - self.tracer.emit( + _safe_tracer_call( + self.tracer, + "emit", + self.verbose, "snapshot", snapshot_data, step_id=step_id, @@ -1020,14 +822,17 @@ async def act( # noqa: C901 ) # 2. GROUND: Format elements for LLM context - context = self._build_context(filtered_snap, goal) + context = self.llm_handler.build_context(filtered_snap, goal) # 3. 
THINK: Query LLM for next action - llm_response = self._query_llm(context, goal) + llm_response = self.llm_handler.query_llm(context, goal) # Emit LLM query trace event if tracer is enabled if self.tracer: - self.tracer.emit( + _safe_tracer_call( + self.tracer, + "emit", + self.verbose, "llm_query", { "prompt_tokens": llm_response.prompt_tokens, @@ -1045,10 +850,10 @@ async def act( # noqa: C901 self._track_tokens(goal, llm_response) # Parse action from LLM response - action_str = self._extract_action_from_response(llm_response.content) + action_str = self.llm_handler.extract_action(llm_response.content) # 4. EXECUTE: Parse and run action - result_dict = await self._execute_action(action_str, filtered_snap) + result_dict = await self.action_executor.execute_async(action_str, filtered_snap) duration_ms = int((time.time() - start_time) * 1000) @@ -1088,7 +893,10 @@ async def act( # noqa: C901 for el in filtered_snap.elements[:50] ] - self.tracer.emit( + _safe_tracer_call( + self.tracer, + "emit", + self.verbose, "action", { "action": result.action, @@ -1195,32 +1003,41 @@ async def act( # noqa: C901 } # Build complete step_end event - step_end_data = { - "v": 1, - "step_id": step_id, - "step_index": self._step_count, - "goal": goal, - "attempt": attempt, - "pre": { - "url": pre_url, - "snapshot_digest": snapshot_digest, - }, - "llm": llm_data, - "exec": exec_data, - "post": { - "url": post_url, - }, - "verify": verify_data, - } + step_end_data = TraceEventBuilder.build_step_end_event( + step_id=step_id, + step_index=self._step_count, + goal=goal, + attempt=attempt, + pre_url=pre_url, + post_url=post_url, + snapshot_digest=snapshot_digest, + llm_data=llm_data, + exec_data=exec_data, + verify_data=verify_data, + ) - self.tracer.emit("step_end", step_end_data, step_id=step_id) + _safe_tracer_call( + self.tracer, + "emit", + self.verbose, + "step_end", + step_end_data, + step_id=step_id, + ) return result except Exception as e: # Emit error trace event if tracer is 
enabled if self.tracer: - self.tracer.emit_error(step_id=step_id, error=str(e), attempt=attempt) + _safe_tracer_call( + self.tracer, + "emit_error", + self.verbose, + step_id=step_id, + error=str(e), + attempt=attempt, + ) if attempt < max_retries: if self.verbose: @@ -1249,156 +1066,6 @@ async def act( # noqa: C901 ) raise RuntimeError(f"Failed after {max_retries} retries: {e}") - def _build_context(self, snap: Snapshot, goal: str) -> str: - """Convert snapshot elements to token-efficient prompt string (same as sync version)""" - lines = [] - # Note: elements are already filtered by filter_elements() in act() - for el in snap.elements: - # Extract visual cues - cues = [] - if el.visual_cues.is_primary: - cues.append("PRIMARY") - if el.visual_cues.is_clickable: - cues.append("CLICKABLE") - if el.visual_cues.background_color_name: - cues.append(f"color:{el.visual_cues.background_color_name}") - - # Format element line - cues_str = f" {{{','.join(cues)}}}" if cues else "" - text_preview = ( - (el.text[:50] + "...") if el.text and len(el.text) > 50 else (el.text or "") - ) - - lines.append( - f'[{el.id}] <{el.role}> "{text_preview}"{cues_str} ' - f"@ ({int(el.bbox.x)},{int(el.bbox.y)}) (Imp:{el.importance})" - ) - - return "\n".join(lines) - - def _extract_action_from_response(self, response: str) -> str: - """Extract action command from LLM response (same as sync version)""" - # Remove markdown code blocks if present - response = re.sub(r"```[\w]*\n?", "", response) - response = response.strip() - - # Try to find action patterns in the response - # Pattern matches: CLICK(123), TYPE(123, "text"), PRESS("key"), FINISH() - action_pattern = r'(CLICK\s*\(\s*\d+\s*\)|TYPE\s*\(\s*\d+\s*,\s*["\'].*?["\']\s*\)|PRESS\s*\(\s*["\'].*?["\']\s*\)|FINISH\s*\(\s*\))' - - match = re.search(action_pattern, response, re.IGNORECASE) - if match: - return match.group(1) - - # If no pattern match, return the original response (will likely fail parsing) - return response - - def 
_query_llm(self, dom_context: str, goal: str) -> LLMResponse: - """Query LLM with standardized prompt template (same as sync version)""" - system_prompt = f"""You are an AI web automation agent. - -GOAL: {goal} - -VISIBLE ELEMENTS (sorted by importance): -{dom_context} - -VISUAL CUES EXPLAINED: -- {{PRIMARY}}: Main call-to-action element on the page -- {{CLICKABLE}}: Element is clickable -- {{color:X}}: Background color name - -CRITICAL RESPONSE FORMAT: -You MUST respond with ONLY ONE of these exact action formats: -- CLICK(id) - Click element by ID -- TYPE(id, "text") - Type text into element -- PRESS("key") - Press keyboard key (Enter, Escape, Tab, ArrowDown, etc) -- FINISH() - Task complete - -DO NOT include any explanation, reasoning, or natural language. -DO NOT use markdown formatting or code blocks. -DO NOT say "The next step is..." or anything similar. - -CORRECT Examples: -CLICK(42) -TYPE(15, "magic mouse") -PRESS("Enter") -FINISH() - -INCORRECT Examples (DO NOT DO THIS): -"The next step is to click..." -"I will type..." 
-```CLICK(42)``` -""" - - user_prompt = "Return the single action command:" - - return self.llm.generate(system_prompt, user_prompt, temperature=0.0) - - async def _execute_action(self, action_str: str, snap: Snapshot) -> dict[str, Any]: - """ - Parse action string and execute SDK call (async) - - Args: - action_str: Action string from LLM (e.g., "CLICK(42)") - snap: Current snapshot (for context) - - Returns: - Execution result dictionary - """ - # Parse CLICK(42) - if match := re.match(r"CLICK\s*\(\s*(\d+)\s*\)", action_str, re.IGNORECASE): - element_id = int(match.group(1)) - result = await click_async(self.browser, element_id) - return { - "success": result.success, - "action": "click", - "element_id": element_id, - "outcome": result.outcome, - "url_changed": result.url_changed, - } - - # Parse TYPE(42, "hello world") - elif match := re.match( - r'TYPE\s*\(\s*(\d+)\s*,\s*["\']([^"\']*)["\']\s*\)', - action_str, - re.IGNORECASE, - ): - element_id = int(match.group(1)) - text = match.group(2) - result = await type_text_async(self.browser, element_id, text) - return { - "success": result.success, - "action": "type", - "element_id": element_id, - "text": text, - "outcome": result.outcome, - } - - # Parse PRESS("Enter") - elif match := re.match(r'PRESS\s*\(\s*["\']([^"\']+)["\']\s*\)', action_str, re.IGNORECASE): - key = match.group(1) - result = await press_async(self.browser, key) - return { - "success": result.success, - "action": "press", - "key": key, - "outcome": result.outcome, - } - - # Parse FINISH() - elif re.match(r"FINISH\s*\(\s*\)", action_str, re.IGNORECASE): - return { - "success": True, - "action": "finish", - "message": "Task marked as complete", - } - - else: - raise ValueError( - f"Unknown action format: {action_str}\n" - f'Expected: CLICK(id), TYPE(id, "text"), PRESS("key"), or FINISH()' - ) - def _track_tokens(self, goal: str, llm_response: LLMResponse): """Track token usage for analytics (same as sync version)""" if llm_response.prompt_tokens: 
@@ -1443,66 +1110,17 @@ def clear_history(self) -> None: } def filter_elements(self, snapshot: Snapshot, goal: str | None = None) -> list[Element]: - """Filter elements from snapshot based on goal context (same as sync version)""" - elements = snapshot.elements - - # If no goal provided, return all elements (up to limit) - if not goal: - return elements[: self.default_snapshot_limit] - - goal_lower = goal.lower() - - # Extract keywords from goal - keywords = self._extract_keywords(goal_lower) - - # Boost elements matching goal keywords - scored_elements = [] - for el in elements: - score = el.importance - - # Boost if element text matches goal - if el.text and any(kw in el.text.lower() for kw in keywords): - score += 0.3 - - # Boost if role matches goal intent - if "click" in goal_lower and el.visual_cues.is_clickable: - score += 0.2 - if "type" in goal_lower and el.role in ["textbox", "searchbox"]: - score += 0.2 - if "search" in goal_lower: - # Filter out non-interactive elements for search tasks - if el.role in ["link", "img"] and not el.visual_cues.is_primary: - score -= 0.5 - - scored_elements.append((score, el)) - - # Re-sort by boosted score - scored_elements.sort(key=lambda x: x[0], reverse=True) - elements = [el for _, el in scored_elements] - - return elements[: self.default_snapshot_limit] - - def _extract_keywords(self, text: str) -> list[str]: - """Extract meaningful keywords from goal text (same as sync version)""" - stopwords = { - "the", - "a", - "an", - "and", - "or", - "but", - "in", - "on", - "at", - "to", - "for", - "of", - "with", - "by", - "from", - "as", - "is", - "was", - } - words = text.split() - return [w for w in words if w not in stopwords and len(w) > 2] + """ + Filter elements from snapshot based on goal context. + + This implementation uses ElementFilter to apply goal-based keyword matching + to boost relevant elements and filters out irrelevant ones. 
+ + Args: + snapshot: Current page snapshot + goal: User's goal (can inform filtering) + + Returns: + Filtered list of elements + """ + return ElementFilter.filter_by_goal(snapshot, goal, self.default_snapshot_limit) diff --git a/sentience/base_agent.py b/sentience/base_agent.py index a7c1e3c..43e00d2 100644 --- a/sentience/base_agent.py +++ b/sentience/base_agent.py @@ -1,3 +1,5 @@ +from typing import Optional + """ BaseAgent: Abstract base class for all Sentience agents Defines the interface that all agent implementations must follow diff --git a/sentience/browser.py b/sentience/browser.py index 4188e1d..a07dbdb 100644 --- a/sentience/browser.py +++ b/sentience/browser.py @@ -8,6 +8,7 @@ import tempfile import time from pathlib import Path +from typing import Optional, Union from urllib.parse import urlparse from playwright.async_api import BrowserContext as AsyncBrowserContext diff --git a/sentience/browser_evaluator.py b/sentience/browser_evaluator.py new file mode 100644 index 0000000..3cae2b4 --- /dev/null +++ b/sentience/browser_evaluator.py @@ -0,0 +1,299 @@ +""" +Browser evaluation helper for common window.sentience API patterns. + +Consolidates repeated patterns for: +- Waiting for extension injection +- Calling window.sentience methods +- Error handling with diagnostics +""" + +from typing import Any, Optional, Union + +from playwright.async_api import Page as AsyncPage +from playwright.sync_api import Page + +from .browser import AsyncSentienceBrowser, SentienceBrowser +from .sentience_methods import SentienceMethod + + +class BrowserEvaluator: + """Helper class for common browser evaluation patterns""" + + @staticmethod + def wait_for_extension( + page: Page | AsyncPage, + timeout_ms: int = 5000, + ) -> None: + """ + Wait for window.sentience API to be available. 
+ + Args: + page: Playwright Page instance (sync or async) + timeout_ms: Timeout in milliseconds (default: 5000) + + Raises: + RuntimeError: If extension fails to inject within timeout + """ + if hasattr(page, "wait_for_function"): + # Sync page + try: + page.wait_for_function( + "typeof window.sentience !== 'undefined'", + timeout=timeout_ms, + ) + except Exception as e: + diag = BrowserEvaluator._gather_diagnostics(page) + raise RuntimeError( + f"Sentience extension failed to inject window.sentience API. " + f"Is the extension loaded? Diagnostics: {diag}" + ) from e + else: + # Async page - should use async version + raise TypeError("Use wait_for_extension_async for async pages") + + @staticmethod + async def wait_for_extension_async( + page: AsyncPage, + timeout_ms: int = 5000, + ) -> None: + """ + Wait for window.sentience API to be available (async). + + Args: + page: Playwright AsyncPage instance + timeout_ms: Timeout in milliseconds (default: 5000) + + Raises: + RuntimeError: If extension fails to inject within timeout + """ + try: + await page.wait_for_function( + "typeof window.sentience !== 'undefined'", + timeout=timeout_ms, + ) + except Exception as e: + diag = await BrowserEvaluator._gather_diagnostics_async(page) + raise RuntimeError( + f"Sentience extension failed to inject window.sentience API. " + f"Is the extension loaded? Diagnostics: {diag}" + ) from e + + @staticmethod + def _gather_diagnostics(page: Page | AsyncPage) -> dict[str, Any]: + """ + Gather diagnostics about extension state. 
+ + Args: + page: Playwright Page instance + + Returns: + Dictionary with diagnostic information + """ + try: + if hasattr(page, "evaluate"): + # Sync page + return page.evaluate( + """() => ({ + sentience_defined: typeof window.sentience !== 'undefined', + extension_id: document.documentElement.dataset.sentienceExtensionId || 'not set', + url: window.location.href + })""" + ) + else: + return {"error": "Could not gather diagnostics - invalid page type"} + except Exception: + return {"error": "Could not gather diagnostics"} + + @staticmethod + async def _gather_diagnostics_async(page: AsyncPage) -> dict[str, Any]: + """ + Gather diagnostics about extension state (async). + + Args: + page: Playwright AsyncPage instance + + Returns: + Dictionary with diagnostic information + """ + try: + return await page.evaluate( + """() => ({ + sentience_defined: typeof window.sentience !== 'undefined', + extension_id: document.documentElement.dataset.sentienceExtensionId || 'not set', + url: window.location.href + })""" + ) + except Exception: + return {"error": "Could not gather diagnostics"} + + @staticmethod + def invoke( + page: Page, + method: SentienceMethod | str, + *args: Any, + **kwargs: Any, + ) -> Any: + """ + Invoke a window.sentience method with error handling (sync). 
+ + Args: + page: Playwright Page instance (sync) + method: SentienceMethod enum value or method name string (e.g., SentienceMethod.SNAPSHOT or "snapshot") + *args: Positional arguments to pass to the method + **kwargs: Keyword arguments to pass to the method + + Returns: + Result from the method call + + Raises: + RuntimeError: If method is not available or call fails + + Example: + ```python + result = BrowserEvaluator.invoke(page, SentienceMethod.SNAPSHOT, limit=50) + success = BrowserEvaluator.invoke(page, SentienceMethod.CLICK, element_id) + ``` + """ + # Convert enum to string if needed + method_name = method.value if isinstance(method, SentienceMethod) else method + + # Build JavaScript call + if args and kwargs: + # Both args and kwargs - use object spread + js_code = f""" + (args, kwargs) => {{ + return window.sentience.{method_name}(...args, kwargs); + }} + """ + result = page.evaluate(js_code, list(args), kwargs) + elif args: + # Only args + js_code = f""" + (args) => {{ + return window.sentience.{method_name}(...args); + }} + """ + result = page.evaluate(js_code, list(args)) + elif kwargs: + # Only kwargs - pass as single object + js_code = f""" + (options) => {{ + return window.sentience.{method_name}(options); + }} + """ + result = page.evaluate(js_code, kwargs) + else: + # No arguments + js_code = f""" + () => {{ + return window.sentience.{method_name}(); + }} + """ + result = page.evaluate(js_code) + + return result + + @staticmethod + async def invoke_async( + page: AsyncPage, + method: SentienceMethod | str, + *args: Any, + **kwargs: Any, + ) -> Any: + """ + Invoke a window.sentience method with error handling (async). 
+ + Args: + page: Playwright AsyncPage instance + method: SentienceMethod enum value or method name string (e.g., SentienceMethod.SNAPSHOT or "snapshot") + *args: Positional arguments to pass to the method + **kwargs: Keyword arguments to pass to the method + + Returns: + Result from the method call + + Raises: + RuntimeError: If method is not available or call fails + + Example: + ```python + result = await BrowserEvaluator.invoke_async(page, SentienceMethod.SNAPSHOT, limit=50) + success = await BrowserEvaluator.invoke_async(page, SentienceMethod.CLICK, element_id) + ``` + """ + # Convert enum to string if needed + method_name = method.value if isinstance(method, SentienceMethod) else method + + # Build JavaScript call + if args and kwargs: + js_code = f""" + (args, kwargs) => {{ + return window.sentience.{method_name}(...args, kwargs); + }} + """ + result = await page.evaluate(js_code, list(args), kwargs) + elif args: + js_code = f""" + (args) => {{ + return window.sentience.{method_name}(...args); + }} + """ + result = await page.evaluate(js_code, list(args)) + elif kwargs: + js_code = f""" + (options) => {{ + return window.sentience.{method_name}(options); + }} + """ + result = await page.evaluate(js_code, kwargs) + else: + js_code = f""" + () => {{ + return window.sentience.{method_name}(); + }} + """ + result = await page.evaluate(js_code) + + return result + + @staticmethod + def verify_method_exists( + page: Page, + method: SentienceMethod | str, + ) -> bool: + """ + Verify that a window.sentience method exists. 
+ + Args: + page: Playwright Page instance (sync) + method: SentienceMethod enum value or method name string + + Returns: + True if method exists, False otherwise + """ + method_name = method.value if isinstance(method, SentienceMethod) else method + try: + return page.evaluate(f"typeof window.sentience.{method_name} !== 'undefined'") + except Exception: + return False + + @staticmethod + async def verify_method_exists_async( + page: AsyncPage, + method: SentienceMethod | str, + ) -> bool: + """ + Verify that a window.sentience method exists (async). + + Args: + page: Playwright AsyncPage instance + method: SentienceMethod enum value or method name string + + Returns: + True if method exists, False otherwise + """ + method_name = method.value if isinstance(method, SentienceMethod) else method + try: + return await page.evaluate(f"typeof window.sentience.{method_name} !== 'undefined'") + except Exception: + return False diff --git a/sentience/cloud_tracing.py b/sentience/cloud_tracing.py index 55871c8..ab2d366 100644 --- a/sentience/cloud_tracing.py +++ b/sentience/cloud_tracing.py @@ -12,10 +12,12 @@ from collections.abc import Callable from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path -from typing import Any, Protocol +from typing import Any, Optional, Protocol, Union import requests +from sentience.models import TraceStats +from sentience.trace_file_manager import TraceFileManager from sentience.tracing import TraceSink @@ -97,6 +99,7 @@ def __init__( # Use persistent cache directory instead of temp file # This ensures traces survive process crashes cache_dir = Path.home() / ".sentience" / "traces" / "pending" + # Create directory if it doesn't exist (ensure_directory is for file paths, not dirs) cache_dir.mkdir(parents=True, exist_ok=True) # Persistent file (survives process crash) @@ -123,9 +126,7 @@ def emit(self, event: dict[str, Any]) -> None: if self._closed: raise RuntimeError("CloudTraceSink is closed") - json_str = 
json.dumps(event, ensure_ascii=False) - self._trace_file.write(json_str + "\n") - self._trace_file.flush() # Ensure written to disk + TraceFileManager.write_event(self._trace_file, event) def close( self, @@ -146,9 +147,25 @@ def close( self._closed = True - # Close file first + # Flush and sync file to disk before closing to ensure all data is written + # This is critical on CI systems where file system operations may be slower + self._trace_file.flush() + try: + # Force OS to write buffered data to disk + os.fsync(self._trace_file.fileno()) + except (OSError, AttributeError): + # Some file handles don't support fsync (e.g., StringIO in tests) + # This is fine - flush() is usually sufficient + pass self._trace_file.close() + # Ensure file exists and has content before proceeding + if not self._path.exists() or self._path.stat().st_size == 0: + # No events were emitted, nothing to upload + if self.logger: + self.logger.warning("No trace events to upload (file is empty or missing)") + return + # Generate index after closing file self._generate_index() @@ -384,7 +401,9 @@ def _upload_index(self) -> None: if self.logger: self.logger.warning(f"Error uploading trace index: {e}") - def _infer_final_status_from_trace(self) -> str: + def _infer_final_status_from_trace( + self, events: list[dict[str, Any]], run_end: dict[str, Any] | None + ) -> str: """ Infer final status from trace events by reading the trace file. @@ -435,103 +454,44 @@ def _infer_final_status_from_trace(self) -> str: # If we can't read the trace, default to unknown return "unknown" - def _extract_stats_from_trace(self) -> dict[str, Any]: + def _extract_stats_from_trace(self) -> TraceStats: """ Extract execution statistics from trace file. 
Returns: - Dictionary with stats fields for /v1/traces/complete + TraceStats with stats fields for /v1/traces/complete """ try: - # Read trace file to extract stats - with open(self._path, encoding="utf-8") as f: - events = [] - for line in f: - line = line.strip() - if not line: - continue - try: - event = json.loads(line) - events.append(event) - except json.JSONDecodeError: - continue - - if not events: - return { - "total_steps": 0, - "total_events": 0, - "duration_ms": None, - "final_status": "unknown", - "started_at": None, - "ended_at": None, - } - - # Find run_start and run_end events - run_start = next((e for e in events if e.get("type") == "run_start"), None) - run_end = next((e for e in events if e.get("type") == "run_end"), None) - - # Extract timestamps - started_at: str | None = None - ended_at: str | None = None - if run_start: - started_at = run_start.get("ts") - if run_end: - ended_at = run_end.get("ts") - - # Calculate duration - duration_ms: int | None = None - if started_at and ended_at: - try: - from datetime import datetime - - start_dt = datetime.fromisoformat(started_at.replace("Z", "+00:00")) - end_dt = datetime.fromisoformat(ended_at.replace("Z", "+00:00")) - delta = end_dt - start_dt - duration_ms = int(delta.total_seconds() * 1000) - except Exception: - pass - - # Count steps (from step_start events, only first attempt) - step_indices = set() - for event in events: - if event.get("type") == "step_start": - step_index = event.get("data", {}).get("step_index") - if step_index is not None: - step_indices.add(step_index) - total_steps = len(step_indices) if step_indices else 0 - - # If run_end has steps count, use that (more accurate) - if run_end: - steps_from_end = run_end.get("data", {}).get("steps") - if steps_from_end is not None: - total_steps = max(total_steps, steps_from_end) - - # Count total events - total_events = len(events) - - # Infer final status - final_status = self._infer_final_status_from_trace() - - return { - 
"total_steps": total_steps, - "total_events": total_events, - "duration_ms": duration_ms, - "final_status": final_status, - "started_at": started_at, - "ended_at": ended_at, - } + # Check if file exists before reading + if not self._path.exists(): + if self.logger: + self.logger.warning(f"Trace file not found: {self._path}") + return TraceStats( + total_steps=0, + total_events=0, + duration_ms=None, + final_status="unknown", + started_at=None, + ended_at=None, + ) + # Read trace file to extract stats + events = TraceFileManager.read_events(self._path) + # Use TraceFileManager to extract stats (with custom status inference) + return TraceFileManager.extract_stats( + events, infer_status_func=self._infer_final_status_from_trace + ) except Exception as e: if self.logger: self.logger.warning(f"Error extracting stats from trace: {e}") - return { - "total_steps": 0, - "total_events": 0, - "duration_ms": None, - "final_status": "unknown", - "started_at": None, - "ended_at": None, - } + return TraceStats( + total_steps=0, + total_events=0, + duration_ms=None, + final_status="unknown", + started_at=None, + ended_at=None, + ) def _complete_trace(self) -> None: """ @@ -547,22 +507,21 @@ def _complete_trace(self) -> None: # Extract stats from trace file stats = self._extract_stats_from_trace() - # Add file size fields - stats.update( - { - "trace_file_size_bytes": self.trace_file_size_bytes, - "screenshot_total_size_bytes": self.screenshot_total_size_bytes, - "screenshot_count": self.screenshot_count, - "index_file_size_bytes": self.index_file_size_bytes, - } - ) + # Build completion payload with stats and file size fields + completion_payload = { + **stats.model_dump(), # Convert TraceStats to dict + "trace_file_size_bytes": self.trace_file_size_bytes, + "screenshot_total_size_bytes": self.screenshot_total_size_bytes, + "screenshot_count": self.screenshot_count, + "index_file_size_bytes": self.index_file_size_bytes, + } response = requests.post( 
f"{self.api_url}/v1/traces/complete", headers={"Authorization": f"Bearer {self.api_key}"}, json={ "run_id": self.run_id, - "stats": stats, + "stats": completion_payload, }, timeout=10, ) @@ -593,28 +552,26 @@ def _extract_screenshots_from_trace(self) -> dict[int, dict[str, Any]]: sequence = 0 try: - with open(self._path, encoding="utf-8") as f: - for line in f: - line = line.strip() - if not line: - continue + # Check if file exists before reading + if not self._path.exists(): + if self.logger: + self.logger.warning(f"Trace file not found: {self._path}") + return screenshots - try: - event = json.loads(line) - # Check if this is a snapshot event with screenshot - if event.get("type") == "snapshot": - data = event.get("data", {}) - screenshot_base64 = data.get("screenshot_base64") - - if screenshot_base64: - sequence += 1 - screenshots[sequence] = { - "base64": screenshot_base64, - "format": data.get("screenshot_format", "jpeg"), - "step_id": event.get("step_id"), - } - except json.JSONDecodeError: - continue + events = TraceFileManager.read_events(self._path) + for event in events: + # Check if this is a snapshot event with screenshot + if event.get("type") == "snapshot": + data = event.get("data", {}) + screenshot_base64 = data.get("screenshot_base64") + + if screenshot_base64: + sequence += 1 + screenshots[sequence] = { + "base64": screenshot_base64, + "format": data.get("screenshot_format", "jpeg"), + "step_id": event.get("step_id"), + } except Exception as e: if self.logger: self.logger.error(f"Error extracting screenshots: {e}") @@ -629,34 +586,32 @@ def _create_cleaned_trace(self, output_path: Path) -> None: output_path: Path to write cleaned trace file """ try: - with ( - open(self._path, encoding="utf-8") as infile, - open(output_path, "w", encoding="utf-8") as outfile, - ): - for line in infile: - line = line.strip() - if not line: - continue + # Check if file exists before reading + if not self._path.exists(): + if self.logger: + 
self.logger.warning(f"Trace file not found: {self._path}") + # Create empty cleaned trace file + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.touch() + return - try: - event = json.loads(line) - # Remove screenshot_base64 from snapshot events - if event.get("type") == "snapshot": - data = event.get("data", {}) - if "screenshot_base64" in data: - # Create copy without screenshot fields - cleaned_data = { - k: v - for k, v in data.items() - if k not in ("screenshot_base64", "screenshot_format") - } - event["data"] = cleaned_data - - # Write cleaned event - outfile.write(json.dumps(event, ensure_ascii=False) + "\n") - except json.JSONDecodeError: - # Skip invalid lines - continue + events = TraceFileManager.read_events(self._path) + with open(output_path, "w", encoding="utf-8") as outfile: + for event in events: + # Remove screenshot_base64 from snapshot events + if event.get("type") == "snapshot": + data = event.get("data", {}) + if "screenshot_base64" in data: + # Create copy without screenshot fields + cleaned_data = { + k: v + for k, v in data.items() + if k not in ("screenshot_base64", "screenshot_format") + } + event["data"] = cleaned_data + + # Write cleaned event + TraceFileManager.write_event(outfile, event) except Exception as e: if self.logger: self.logger.error(f"Error creating cleaned trace: {e}") diff --git a/sentience/conversational_agent.py b/sentience/conversational_agent.py index c207f04..f9f2fc8 100644 --- a/sentience/conversational_agent.py +++ b/sentience/conversational_agent.py @@ -5,12 +5,13 @@ import json import time -from typing import Any +from typing import Any, Union from .agent import SentienceAgent from .browser import SentienceBrowser from .llm_provider import LLMProvider -from .models import Snapshot, SnapshotOptions +from .models import ExtractionResult, Snapshot, SnapshotOptions, StepExecutionResult +from .protocols import BrowserProtocol from .snapshot import snapshot @@ -29,12 +30,18 @@ class 
ConversationalAgent: The top result is from amazon.com selling the Apple Magic Mouse 2 for $79." """ - def __init__(self, browser: SentienceBrowser, llm: LLMProvider, verbose: bool = True): + def __init__( + self, + browser: SentienceBrowser | BrowserProtocol, + llm: LLMProvider, + verbose: bool = True, + ): """ Initialize conversational agent Args: - browser: SentienceBrowser instance + browser: SentienceBrowser instance or BrowserProtocol-compatible object + (for testing, can use mock objects that implement BrowserProtocol) llm: LLM provider (OpenAI, Anthropic, LocalLLM, etc.) verbose: Print step-by-step execution logs (default: True) """ @@ -90,7 +97,7 @@ def execute(self, user_input: str) -> str: step_result = self._execute_step(step) execution_results.append(step_result) - if not step_result.get("success", False): + if not step_result.success: # Early exit on failure if self.verbose: print(f"⚠️ Step failed: {step['description']}") @@ -203,7 +210,7 @@ def _create_plan(self, user_input: str) -> dict[str, Any]: "expected_outcome": "Complete user request", } - def _execute_step(self, step: dict[str, Any]) -> dict[str, Any]: + def _execute_step(self, step: dict[str, Any]) -> StepExecutionResult: """ Execute a single atomic step from the plan @@ -230,46 +237,42 @@ def _execute_step(self, step: dict[str, Any]) -> dict[str, Any]: self.execution_context["current_url"] = url time.sleep(1) # Brief wait for page to settle - return {"success": True, "action": action, "data": {"url": url}} + return StepExecutionResult(success=True, action=action, data={"url": url}) elif action == "FIND_AND_CLICK": element_desc = params["element_description"] # Use technical agent to find and click (returns AgentActionResult) result = self.technical_agent.act(f"Click the {element_desc}") - return { - "success": result.success, # Use attribute access - "action": action, - "data": result.model_dump(), # Convert to dict for flexibility - } + return StepExecutionResult( + success=result.success, 
+ action=action, + data=result.model_dump(), # Convert to dict for flexibility + ) elif action == "FIND_AND_TYPE": element_desc = params["element_description"] text = params["text"] # Use technical agent to find input and type (returns AgentActionResult) result = self.technical_agent.act(f"Type '{text}' into {element_desc}") - return { - "success": result.success, # Use attribute access - "action": action, - "data": {"text": text, "result": result.model_dump()}, - } + return StepExecutionResult( + success=result.success, + action=action, + data={"text": text, "result": result.model_dump()}, + ) elif action == "PRESS_KEY": key = params["key"] result = self.technical_agent.act(f"Press {key} key") - return { - "success": result.success, # Use attribute access - "action": action, - "data": {"key": key, "result": result.model_dump()}, - } + return StepExecutionResult( + success=result.success, + action=action, + data={"key": key, "result": result.model_dump()}, + ) elif action == "WAIT": duration = params.get("duration", 2.0) time.sleep(duration) - return { - "success": True, - "action": action, - "data": {"duration": duration}, - } + return StepExecutionResult(success=True, action=action, data={"duration": duration}) elif action == "EXTRACT_INFO": info_type = params["info_type"] @@ -279,21 +282,28 @@ def _execute_step(self, step: dict[str, Any]) -> dict[str, Any]: # Use LLM to extract specific information extracted = self._extract_information(snap, info_type) - return { - "success": True, - "action": action, - "data": {"extracted": extracted, "info_type": info_type}, - } + return StepExecutionResult( + success=True, + action=action, + data={ + "extracted": ( + extracted.model_dump() + if isinstance(extracted, ExtractionResult) + else extracted + ), + "info_type": info_type, + }, + ) elif action == "VERIFY": condition = params["condition"] # Verify condition using current page state is_verified = self._verify_condition(condition) - return { - "success": is_verified, - 
"action": action, - "data": {"condition": condition, "verified": is_verified}, - } + return StepExecutionResult( + success=is_verified, + action=action, + data={"condition": condition, "verified": is_verified}, + ) else: raise ValueError(f"Unknown action: {action}") @@ -301,9 +311,9 @@ def _execute_step(self, step: dict[str, Any]) -> dict[str, Any]: except Exception as e: if self.verbose: print(f"❌ Step failed: {e}") - return {"success": False, "action": action, "error": str(e)} + return StepExecutionResult(success=False, action=action, error=str(e)) - def _extract_information(self, snap: Snapshot, info_type: str) -> dict[str, Any]: + def _extract_information(self, snap: Snapshot, info_type: str) -> ExtractionResult: """ Extract specific information from snapshot using LLM @@ -403,14 +413,38 @@ def _synthesize_response( Human-readable response string """ # Build summary of what happened - successful_steps = [r for r in execution_results if r.get("success")] - failed_steps = [r for r in execution_results if not r.get("success")] + successful_steps = [ + r + for r in execution_results + if (isinstance(r, StepExecutionResult) and r.success) + or (isinstance(r, dict) and r.get("success", False)) + ] + failed_steps = [ + r + for r in execution_results + if (isinstance(r, StepExecutionResult) and not r.success) + or (isinstance(r, dict) and not r.get("success", False)) + ] # Extract key data extracted_data = [] for result in execution_results: - if result.get("action") == "EXTRACT_INFO": - extracted_data.append(result.get("data", {}).get("extracted", {})) + if isinstance(result, StepExecutionResult): + action = result.action + data = result.data + else: + action = result.get("action") + data = result.get("data", {}) + + if action == "EXTRACT_INFO": + extracted = data.get("extracted", {}) + if isinstance(extracted, dict): + extracted_data.append(extracted) + else: + # If it's an ExtractionResult model, convert to dict + extracted_data.append( + extracted.model_dump() if 
hasattr(extracted, "model_dump") else extracted + ) # Use LLM to create natural response system_prompt = """You are a helpful assistant that summarizes web automation results diff --git a/sentience/element_filter.py b/sentience/element_filter.py new file mode 100644 index 0000000..a6256c7 --- /dev/null +++ b/sentience/element_filter.py @@ -0,0 +1,134 @@ +""" +Element filtering utilities for agent-based element selection. + +This module provides centralized element filtering logic to reduce duplication +across agent implementations. +""" + +from typing import Optional + +from .models import Element, Snapshot + + +class ElementFilter: + """ + Centralized element filtering logic for agent-based element selection. + + Provides static methods for filtering elements based on: + - Importance scores + - Goal-based keyword matching + - Role and visual properties + """ + + # Common stopwords for keyword extraction + STOPWORDS = { + "the", + "a", + "an", + "and", + "or", + "but", + "in", + "on", + "at", + "to", + "for", + "of", + "with", + "by", + "from", + "as", + "is", + "was", + } + + @staticmethod + def filter_by_importance( + snapshot: Snapshot, + max_elements: int = 50, + ) -> list[Element]: + """ + Filter elements by importance score (simple top-N selection). + + Args: + snapshot: Current page snapshot + max_elements: Maximum number of elements to return + + Returns: + Top N elements sorted by importance score + """ + elements = snapshot.elements + # Elements are already sorted by importance in snapshot + return elements[:max_elements] + + @staticmethod + def filter_by_goal( + snapshot: Snapshot, + goal: str | None, + max_elements: int = 100, + ) -> list[Element]: + """ + Filter elements from snapshot based on goal context. + + Applies goal-based keyword matching to boost relevant elements + and filters out irrelevant ones. 
+ + Args: + snapshot: Current page snapshot + goal: User's goal (can inform filtering) + max_elements: Maximum number of elements to return + + Returns: + Filtered list of elements sorted by boosted importance score + """ + elements = snapshot.elements + + # If no goal provided, return all elements (up to limit) + if not goal: + return elements[:max_elements] + + goal_lower = goal.lower() + + # Extract keywords from goal + keywords = ElementFilter._extract_keywords(goal_lower) + + # Boost elements matching goal keywords + scored_elements = [] + for el in elements: + score = el.importance + + # Boost if element text matches goal + if el.text and any(kw in el.text.lower() for kw in keywords): + score += 0.3 + + # Boost if role matches goal intent + if "click" in goal_lower and el.visual_cues.is_clickable: + score += 0.2 + if "type" in goal_lower and el.role in ["textbox", "searchbox"]: + score += 0.2 + if "search" in goal_lower: + # Filter out non-interactive elements for search tasks + if el.role in ["link", "img"] and not el.visual_cues.is_primary: + score -= 0.5 + + scored_elements.append((score, el)) + + # Re-sort by boosted score + scored_elements.sort(key=lambda x: x[0], reverse=True) + elements = [el for _, el in scored_elements] + + return elements[:max_elements] + + @staticmethod + def _extract_keywords(text: str) -> list[str]: + """ + Extract meaningful keywords from goal text. + + Args: + text: Text to extract keywords from + + Returns: + List of keywords (non-stopwords, length > 2) + """ + words = text.split() + return [w for w in words if w not in ElementFilter.STOPWORDS and len(w) > 2] diff --git a/sentience/formatting.py b/sentience/formatting.py index f8961c5..b8dd653 100644 --- a/sentience/formatting.py +++ b/sentience/formatting.py @@ -1,59 +1,15 @@ """ Snapshot formatting utilities for LLM prompts. -Provides functions to convert Sentience snapshots into text format suitable -for LLM consumption. 
-""" - -from typing import List - -from .models import Snapshot - - -def format_snapshot_for_llm(snap: Snapshot, limit: int = 50) -> str: - """ - Convert snapshot elements to text format for LLM consumption. - - This is the canonical way Sentience formats DOM state for LLMs. - The format includes element ID, role, text preview, visual cues, - position, and importance score. +DEPRECATED: This module is maintained for backward compatibility only. +New code should import from sentience.utils.formatting or sentience directly: - Args: - snap: Snapshot object with elements - limit: Maximum number of elements to include (default: 50) - - Returns: - Formatted string with one element per line - - Example: - >>> snap = snapshot(browser) - >>> formatted = format_snapshot_for_llm(snap, limit=10) - >>> print(formatted) - [1]