diff --git a/browser_use/integrations/sentience/__init__.py b/browser_use/integrations/sentience/__init__.py
index fbf80822ca..c47482b797 100644
--- a/browser_use/integrations/sentience/__init__.py
+++ b/browser_use/integrations/sentience/__init__.py
@@ -6,9 +6,11 @@
SentienceAgentSettings,
VisionFallbackConfig,
)
+from browser_use.integrations.sentience.multi_step_agent import MultiStepSentienceAgent
__all__ = [
"SentienceAgent",
+ "MultiStepSentienceAgent",
"SentienceAgentConfig",
"SentienceAgentSettings",
"VisionFallbackConfig",
diff --git a/browser_use/integrations/sentience/agent.py b/browser_use/integrations/sentience/agent.py
index aad6685d01..8b54e02ed6 100644
--- a/browser_use/integrations/sentience/agent.py
+++ b/browser_use/integrations/sentience/agent.py
@@ -1308,9 +1308,74 @@ async def run(self) -> Any:
return result
+    async def _get_sentience_browser(self) -> Any | None:
+        """
+        Return the cached AsyncSentienceBrowser, creating it on first use.
+
+        A Playwright client is attached to the CDP endpoint of
+        ``self.browser_session`` and wrapped with
+        ``AsyncSentienceBrowser.from_page()`` so Sentience SDK actions can be
+        executed directly. On success the wrapper is cached on
+        ``self._sentience_browser`` and the Playwright handle on
+        ``self._playwright``.
+
+        If anything fails after Playwright has been started, the driver is
+        stopped again. Nothing is cached on failure, so the next call retries;
+        without the cleanup every failed attempt would leave a driver running.
+
+        Returns:
+            The browser wrapper, or None when the Sentience SDK / Playwright
+            cannot be imported, no CDP URL is available, or connecting fails.
+        """
+        cached = getattr(self, '_sentience_browser', None)
+        if cached is not None:
+            return cached
+
+        playwright = None
+        created = False
+        try:
+            from sentience.browser import AsyncSentienceBrowser
+            from playwright.async_api import async_playwright
+
+            cdp_url = self.browser_session.cdp_url
+            if not cdp_url:
+                logger.warning(" ⚠️ No CDP URL available, cannot connect Playwright for Sentience SDK actions")
+                return None
+
+            logger.debug(f" š Connecting Playwright to CDP: {cdp_url[:50]}...")
+
+            # Attach a second client to the browser that browser-use already drives.
+            playwright = await async_playwright().start()
+            browser = await playwright.chromium.connect_over_cdp(cdp_url)
+
+            # Reuse the first existing page; create context/page only if missing.
+            contexts = browser.contexts
+            if contexts and contexts[0].pages:
+                page = contexts[0].pages[0]
+            elif contexts:
+                page = await contexts[0].new_page()
+            else:
+                context = await browser.new_context()
+                page = await context.new_page()
+
+            sentience_browser = await AsyncSentienceBrowser.from_page(
+                page=page,
+                api_key=self.settings.sentience_config.sentience_api_key,
+            )
+
+            # Keep a reference to Playwright so it stays alive with the wrapper.
+            self._playwright = playwright
+            self._sentience_browser = sentience_browser
+            created = True
+
+            logger.debug(" ✅ Created AsyncSentienceBrowser from Playwright page using from_page()")
+            return sentience_browser
+        except ImportError as e:
+            logger.debug(f" ⚠️ Sentience SDK not available: {e}")
+            return None
+        except Exception as e:
+            logger.warning(f" ⚠️ Could not create SentienceBrowser wrapper: {e}")
+            import traceback
+            logger.debug(f" š Traceback: {traceback.format_exc()}")
+            return None
+        finally:
+            # Do not leak the Playwright driver when setup did not complete.
+            if playwright is not None and not created:
+                try:
+                    await playwright.stop()
+                except Exception as stop_error:
+                    logger.debug(f" ⚠️ Could not stop Playwright after failed setup: {stop_error}")
+
async def _execute_actions(self, actions: list[Any]) -> list[Any]:
"""
Execute a list of actions.
+
+ Strategy:
+ - If we have a Sentience snapshot and element_id, use Sentience SDK direct actions
+ (avoids element ID mismatch by using window.sentience_registry[element_id])
+ - Otherwise, fall back to browser-use's action system
Args:
actions: List of ActionModel instances
@@ -1322,10 +1387,12 @@ async def _execute_actions(self, actions: list[Any]) -> list[Any]:
from browser_use.browser.events import BrowserStateRequestEvent
results: list[ActionResult] = []
- total_actions = len(actions)
+
+ # Try to get SentienceBrowser for direct action execution
+ sentience_browser = await self._get_sentience_browser()
+ use_sentience_actions = sentience_browser is not None and self._current_sentience_state is not None
- # Ensure selector_map is built before executing actions
- # This is needed because Sentience uses backend_node_ids that must exist in selector_map
+ # Ensure selector_map is built before executing actions (for fallback)
selector_map = await self.browser_session.get_selector_map()
if not selector_map:
logger.info(" š Selector map is empty, triggering DOM build...")
@@ -1339,6 +1406,10 @@ async def _execute_actions(self, actions: list[Any]) -> list[Any]:
logger.info(f" ā
Selector map built: {len(selector_map)} elements available")
for i, action in enumerate(actions):
+ # Skip None actions (marked as processed, e.g., send_keys handled by type_text)
+ if action is None:
+ continue
+
# Wait between actions (except first)
if i > 0:
wait_time = getattr(
@@ -1486,20 +1557,128 @@ async def _execute_actions(self, actions: list[Any]) -> list[Any]:
# Warn about multiple scroll actions (potential jittery behavior)
if action_name == "scroll" and i > 0:
- prev_action_data = actions[i - 1].model_dump(exclude_unset=True)
- prev_action_name = next(iter(prev_action_data.keys())) if prev_action_data else "unknown"
- if prev_action_name == "scroll":
- logger.info(f" ā ļø Multiple scroll actions detected - may cause jittery behavior")
-
- # Execute action
- result = await self.tools.act(
- action=action,
- browser_session=self.browser_session,
- file_system=self.file_system,
- page_extraction_llm=self.llm, # Use the same LLM for extraction
- sensitive_data=None, # TODO: Add sensitive data support
- available_file_paths=None, # TODO: Add file paths support
+ prev_action = actions[i - 1]
+ if prev_action is not None:
+ prev_action_data = prev_action.model_dump(exclude_unset=True)
+ prev_action_name = next(iter(prev_action_data.keys())) if prev_action_data else "unknown"
+ if prev_action_name == "scroll":
+ logger.info(f" ā ļø Multiple scroll actions detected - may cause jittery behavior")
+
+ # Try to use Sentience SDK direct actions if available (avoids element ID mismatch)
+ # action_index is already defined above from action_params.get('index')
+ use_sentience_direct = (
+ use_sentience_actions
+ and action_index is not None
+ and action_name in ('click', 'input', 'input_text')
+ and self._current_sentience_state is not None
)
+
+ if use_sentience_direct and sentience_browser is not None:
+ # Use Sentience SDK direct actions (uses window.sentience_registry[element_id])
+ try:
+ from sentience.actions import click_async, type_text_async, press_async
+
+ logger.info(f" šÆ Using Sentience SDK direct action for {action_name} (element_id={action_index})")
+
+ if action_name == 'click':
+ logger.info(f" š§ Calling Sentience SDK click_async(element_id={action_index})...")
+ try:
+ sentience_result = await click_async(
+ sentience_browser, # type: ignore[arg-type]
+ element_id=action_index,
+ use_mouse=True,
+ take_snapshot=False,
+ )
+ logger.info(
+ f" ā
Sentience SDK click completed: success={sentience_result.success}, "
+ f"outcome={sentience_result.outcome}, url_changed={sentience_result.url_changed}"
+ )
+ if sentience_result.error:
+ logger.warning(f" ā ļø Sentience SDK click had error: {sentience_result.error}")
+
+ # ActionResult validation: success=True only allowed when is_done=True
+ # For regular successful actions, leave success as None
+ result = ActionResult(
+ extracted_content=f"Clicked element {action_index}",
+ long_term_memory=f"Clicked element {action_index}",
+ success=None if sentience_result.success else False,
+ error=sentience_result.error.get('reason') if sentience_result.error else None,
+ )
+ logger.info(f" ā
Created ActionResult for Sentience SDK click")
+ except Exception as click_error:
+ logger.warning(f" ā ļø Sentience SDK click_async raised exception: {click_error}")
+ logger.warning(f" š Exception type: {type(click_error).__name__}")
+ import traceback
+ logger.debug(f" š Traceback: {traceback.format_exc()}")
+ # Fall through to browser-use fallback
+ raise # Re-raise to trigger fallback
+ elif action_name in ('input', 'input_text'):
+ text = action_params.get('text', '')
+ sentience_result = await type_text_async(
+ sentience_browser, # type: ignore[arg-type]
+ element_id=action_index,
+ text=text,
+ take_snapshot=False,
+ delay_ms=0,
+ )
+ # ActionResult validation: success=True only allowed when is_done=True
+ # For regular successful actions, leave success as None
+ result = ActionResult(
+ extracted_content=f"Typed '{text}' into element {action_index}",
+ long_term_memory=f"Typed '{text}' into element {action_index}",
+ success=None if sentience_result.success else False,
+ error=sentience_result.error.get('reason') if sentience_result.error else None,
+ )
+
+ # If there's a send_keys action next for Enter, handle it
+ if i + 1 < len(actions):
+ next_action = actions[i + 1]
+ if next_action is not None:
+ next_action_data = next_action.model_dump(exclude_unset=True)
+ next_action_name = next(iter(next_action_data.keys())) if next_action_data else None
+ if next_action_name == 'send_keys':
+ next_params = next_action_data.get('send_keys', {})
+ keys = next_params.get('keys', '')
+ if keys == 'Enter':
+ logger.info(" āØļø Pressing Enter after typing")
+ await press_async(
+ sentience_browser, # type: ignore[arg-type]
+ key='Enter',
+ take_snapshot=False,
+ )
+ # Skip the next send_keys action since we handled it
+ actions[i + 1] = None # Mark as processed
+ else:
+ # Fall back to browser-use for other actions
+ result = await self.tools.act(
+ action=action,
+ browser_session=self.browser_session,
+ file_system=self.file_system,
+ page_extraction_llm=self.llm,
+ sensitive_data=None,
+ available_file_paths=None,
+ )
+ except Exception as e:
+ logger.warning(f" ā ļø Sentience SDK direct action failed: {e}, falling back to browser-use")
+ # Fall back to browser-use action system
+ result = await self.tools.act(
+ action=action,
+ browser_session=self.browser_session,
+ file_system=self.file_system,
+ page_extraction_llm=self.llm,
+ sensitive_data=None,
+ available_file_paths=None,
+ )
+ else:
+ # Use browser-use action system (original behavior)
+ result = await self.tools.act(
+ action=action,
+ browser_session=self.browser_session,
+ file_system=self.file_system,
+ page_extraction_llm=self.llm, # Use the same LLM for extraction
+ sensitive_data=None, # TODO: Add sensitive data support
+ available_file_paths=None, # TODO: Add file paths support
+ )
results.append(result)
@@ -1543,6 +1722,10 @@ def _get_system_message(self) -> SystemMessage:
is_anthropic=False, # Will be auto-detected if needed
is_browser_use_model=False, # Will be auto-detected if needed
extend_system_message=(
+ "\n\n"
+ "CRITICAL: Your response MUST be valid JSON only. No explanations, no reasoning, no markdown, no code blocks.\n"
+ "Start with { and end with }. Output ONLY the JSON object matching the required schema.\n"
+ "\n"
"\n\n"
"CRITICAL: When browser_state contains elements in Sentience format, "
"the first column is labeled 'ID' but browser-use actions use a parameter called 'index'.\n"
diff --git a/browser_use/integrations/sentience/multi_step_agent.py b/browser_use/integrations/sentience/multi_step_agent.py
new file mode 100644
index 0000000000..8a926a44e5
--- /dev/null
+++ b/browser_use/integrations/sentience/multi_step_agent.py
@@ -0,0 +1,568 @@
+"""
+Multi-Step SentienceAgent: Uses SentienceAgentAsync from Sentience SDK for multi-step task execution with per-step verification.
+
+This agent provides:
+- Multi-step task execution with step-by-step verification
+- AgentRuntime integration for declarative assertions
+- Tracer support for execution tracking
+- Local LLM support (Qwen 2.5 3B via LocalLLMProvider)
+
+Example:
+    >>> from browser_use.integrations.sentience import MultiStepSentienceAgent
+    >>> from sentience.async_api import AsyncSentienceBrowser
+    >>> from sentience.llm_provider import LocalLLMProvider
+    >>>
+    >>> async with AsyncSentienceBrowser() as browser:
+    ...     llm = LocalLLMProvider(model_name="Qwen/Qwen2.5-3B-Instruct")
+    ...     agent = MultiStepSentienceAgent(
+    ...         browser=browser,
+    ...         llm=llm,
+    ...     )
+    ...
+    ...     task_steps = [
+    ...         {"goal": "Step 1", "task": "Do something"},
+    ...         {"goal": "Step 2", "task": "Do something else"},
+    ...     ]
+    ...
+    ...     results = await agent.run_multi_step(task_steps)
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import time
+from datetime import datetime
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Callable
+
+if TYPE_CHECKING:
+ from sentience.agent import SentienceAgentAsync
+ from sentience.agent_config import AgentConfig
+ from sentience.agent_runtime import AgentRuntime
+ from sentience.async_api import AsyncSentienceBrowser
+ from sentience.llm_provider import LLMProvider
+ from sentience.tracing import Tracer
+
+logger = logging.getLogger(__name__)
+
+
+class MultiStepSentienceAgent:
+ """
+ Multi-step agent using SentienceAgentAsync from Sentience SDK.
+
+ Features:
+ - Multi-step task execution
+ - AgentRuntime integration for verification
+ - Tracer support for execution tracking
+ - Step-by-step assertions using expect() DSL
+ - Local LLM support (Qwen 2.5 3B)
+ """
+
+    def __init__(
+        self,
+        browser: AsyncSentienceBrowser,
+        llm: LLMProvider,
+        runtime: AgentRuntime | None = None,
+        tracer: Tracer | None = None,
+        trace_dir: str | Path = "traces",
+        sentience_api_key: str | None = None,
+        agent_config: AgentConfig | None = None,
+        default_snapshot_limit: int = 50,
+        verbose: bool = True,
+        **agent_kwargs: Any,
+    ):
+        """
+        Store the collaborators and settings used by ``run_multi_step``.
+
+        Args:
+            browser: AsyncSentienceBrowser instance from the Sentience SDK.
+            llm: LLMProvider instance (e.g. LocalLLMProvider).
+            runtime: Optional AgentRuntime; created lazily when omitted.
+            tracer: Optional Tracer; created lazily when omitted.
+            trace_dir: Directory for trace files.
+            sentience_api_key: Optional Sentience API key. When falsy, the
+                ``SENTIENCE_API_KEY`` environment variable is used instead.
+            agent_config: Optional AgentConfig for SentienceAgentAsync.
+            default_snapshot_limit: Default snapshot limit for the agent.
+            verbose: Whether the underlying agent prints execution logs.
+            **agent_kwargs: Extra keyword arguments for SentienceAgentAsync.
+        """
+        # Verification state: nothing is created until it is first needed.
+        self._verification_initialized = False
+        self._tracer: Tracer | None = tracer
+        self._runtime: AgentRuntime | None = runtime
+
+        # Explicit key wins; otherwise fall back to the environment.
+        if sentience_api_key:
+            self.sentience_api_key = sentience_api_key
+        else:
+            self.sentience_api_key = os.environ.get("SENTIENCE_API_KEY")
+        self.trace_dir = Path(trace_dir)
+
+        # Agent construction settings.
+        self.agent_kwargs = agent_kwargs
+        self.verbose = verbose
+        self.default_snapshot_limit = default_snapshot_limit
+        self.agent_config = agent_config
+
+        # Collaborators.
+        self.llm = llm
+        self.browser = browser
+
+    async def _initialize_verification(self) -> None:
+        """
+        Create the Tracer and AgentRuntime used for verification, if missing.
+
+        Does nothing once initialization has succeeded. A tracer or runtime
+        passed to the constructor is kept as-is. Failures are logged and leave
+        ``_verification_initialized`` False, so a later call tries again; they
+        are never raised to the caller.
+        """
+        if self._verification_initialized:
+            return
+
+        try:
+            from sentience.agent_runtime import AgentRuntime
+            from sentience.tracing import JsonlTraceSink, Tracer
+
+            if self._tracer is None:
+                # parents=True: trace_dir may be nested (e.g. "out/traces").
+                self.trace_dir.mkdir(parents=True, exist_ok=True)
+                run_id = f"multi-step-agent-{int(time.time())}"
+                trace_path = self.trace_dir / f"{run_id}.jsonl"
+                self._tracer = Tracer(run_id=run_id, sink=JsonlTraceSink(str(trace_path)))
+                logger.info(f"š Created tracer: {trace_path}")
+
+            if self._runtime is None:
+                # AgentRuntime needs a backend; build it from the browser's page.
+                page = self.browser.page
+                if page is None:
+                    logger.warning("⚠️ No page available for AgentRuntime")
+                    raise ValueError("AsyncSentienceBrowser must have a page. Call browser.goto() or browser.new_page() first.")
+
+                # Create backend directly to avoid legacy path issues
+                from sentience.backends.playwright_backend import PlaywrightBackend
+
+                self._runtime = AgentRuntime(
+                    backend=PlaywrightBackend(page),
+                    tracer=self._tracer,
+                    sentience_api_key=self.sentience_api_key,
+                )
+                logger.info("✅ Created AgentRuntime for verification")
+
+            self._verification_initialized = True
+
+        except ImportError as e:
+            # The flag is still False here; verification stays disabled.
+            logger.warning(
+                f"⚠️ Verification requested but Sentience SDK not fully installed: {e}. "
+                "Install with: pip install sentienceapi"
+            )
+        except Exception as e:
+            logger.warning(f"⚠️ Could not initialize verification: {e}")
+            import traceback
+            logger.debug(f" š Traceback: {traceback.format_exc()}")
+
+    @property
+    def runtime(self) -> AgentRuntime | None:
+        """AgentRuntime used for verification, or None if none is set yet."""
+        return self._runtime
+
+    @property
+    def tracer(self) -> Tracer | None:
+        """Tracer used for execution tracking, or None if none is set yet."""
+        return self._tracer
+
+    async def run_multi_step(
+        self,
+        task_steps: list[dict[str, str]],
+        verification_callbacks: dict[int, Callable[[Any, int, Any], bool]] | None = None,
+        max_retries: int = 2,
+        expected_final_text: str = "Show HN",
+    ) -> list[Any]:
+        """
+        Run the given steps in order, using one SentienceAgentAsync per step.
+
+        For every step this logs a compact view of a pre-agent snapshot, runs
+        the agent and, when an AgentRuntime is available, takes a verification
+        snapshot and invokes the step's verification callback.
+
+        Args:
+            task_steps: Step dictionaries with 'goal' and 'task' keys. A missing
+                'goal' defaults to "Step <n>"; a missing 'task' defaults to the goal.
+            verification_callbacks: Optional mapping of 1-based step index to a
+                callable ``(runtime, step_idx, snapshot) -> bool``. ``snapshot``
+                is None when no verification snapshot could be taken.
+            max_retries: Maximum retries per step, forwarded to ``agent.act``.
+            expected_final_text: Text the element chosen in the final step is
+                expected to contain. Only affects log output.
+
+        Returns:
+            The value returned by ``agent.act`` for each step, in order.
+
+        Example:
+            >>> task_steps = [
+            ...     {"goal": "Search Google", "task": "Search for 'python'"},
+            ...     {"goal": "Click first result", "task": "Click the first search result"},
+            ... ]
+            >>> results = await agent.run_multi_step(task_steps)
+        """
+        await self._initialize_verification()
+
+        results: list[Any] = []
+        callbacks = verification_callbacks or {}
+        total_steps = len(task_steps)
+        divider = "=" * 80
+
+        for step_idx, step_info in enumerate(task_steps, start=1):
+            goal = step_info.get("goal", f"Step {step_idx}")
+            task = step_info.get("task", goal)
+            is_final_step = step_idx == total_steps
+
+            step_start_time = time.time()
+            step_start_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+            logger.info(f"\n{divider}")
+            logger.info(f"š Step {step_idx}: {goal}")
+            logger.info(f"⏰ Started at: {step_start_timestamp}")
+            logger.info(divider)
+
+            if self._runtime:
+                # The runtime counts steps from zero.
+                self._runtime.begin_step(goal, step_index=step_idx - 1)
+                logger.info(f"✅ Began verification step {step_idx}")
+
+            # The final step gets a larger snapshot budget (at least 100).
+            snapshot_limit = self.default_snapshot_limit
+            if is_final_step:
+                snapshot_limit = max(self.default_snapshot_limit, 100)
+                logger.info(f"š Using increased snapshot limit ({snapshot_limit}) for final step")
+
+            agent = self._create_step_agent(snapshot_limit, is_final_step)
+
+            pre_agent_snapshot = await self._take_pre_agent_snapshot(goal, snapshot_limit, step_idx)
+            if pre_agent_snapshot is not None:
+                self._log_compact_snapshot(pre_agent_snapshot, snapshot_limit, step_idx)
+
+            logger.info(f"š¤ Running agent for step {step_idx}...")
+            result = await agent.act(task, max_retries=max_retries)
+            results.append(result)
+
+            if result.success:
+                logger.info(f"✅ Agent completed step {step_idx}: {result.action} on element {result.element_id}")
+                if is_final_step and result.element_id is not None:
+                    self._log_final_selection(result.element_id, pre_agent_snapshot, expected_final_text)
+            else:
+                logger.warning(f"⚠️ Agent step {step_idx} had issues: {result.error or 'Unknown error'}")
+
+            if self._runtime:
+                snapshot = await self._take_verification_snapshot()
+                if step_idx in callbacks:
+                    self._run_verification_callback(callbacks[step_idx], step_idx, snapshot)
+
+            step_end_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            step_duration = time.time() - step_start_time
+
+            logger.info(divider)
+            logger.info(f"⏰ Step {step_idx} completed at: {step_end_timestamp}")
+            logger.info(f"⏱️ Step {step_idx} duration: {step_duration:.2f} seconds")
+            logger.info(f"{divider}\n")
+
+        return results
+
+    def _create_step_agent(self, snapshot_limit: int, is_final_step: bool) -> Any:
+        """Build the SentienceAgentAsync for one step without touching the caller's config."""
+        import copy
+
+        from sentience.agent import SentienceAgentAsync
+        from sentience.agent_config import AgentConfig
+
+        if self.agent_config is None:
+            step_config = AgentConfig()
+        else:
+            # Copy first: the snapshot_limit override below must not leak into
+            # the AgentConfig object the caller passed to the constructor.
+            step_config = copy.copy(self.agent_config)
+        if is_final_step:
+            step_config.snapshot_limit = snapshot_limit
+
+        return SentienceAgentAsync(
+            browser=self.browser,
+            llm=self.llm,
+            default_snapshot_limit=snapshot_limit,
+            verbose=self.verbose,
+            tracer=self._tracer,
+            config=step_config,
+            **self.agent_kwargs,
+        )
+
+    async def _take_pre_agent_snapshot(self, goal: str, snapshot_limit: int, step_idx: int) -> Any | None:
+        """Take the snapshot that is logged before the agent runs; None on any failure."""
+        logger.info(f"šø Taking snapshot for step {step_idx}...")
+        from sentience.snapshot import snapshot_async
+        from sentience.models import SnapshotOptions
+
+        snap_opts = SnapshotOptions(limit=snapshot_limit, goal=goal)
+        if self.agent_config and self.agent_config.show_overlay:
+            snap_opts.show_overlay = True
+
+        try:
+            snapshot = await snapshot_async(self.browser, snap_opts)
+        except Exception as snapshot_error:
+            logger.warning(f"⚠️ Snapshot failed with exception: {snapshot_error}")
+            logger.warning(" This may be due to extension injection timeout. Continuing without snapshot logging...")
+            error_msg = str(snapshot_error) or "Unknown error"
+        else:
+            if snapshot.status == "success":
+                return snapshot
+            error_msg = snapshot.error or "Unknown error"
+
+        logger.warning(f"⚠️ Snapshot failed: {error_msg}")
+        logger.warning(" Continuing without snapshot logging - agent will still run")
+        return None
+
+    def _log_compact_snapshot(self, snapshot: Any, snapshot_limit: int, step_idx: int) -> None:
+        """Log ID statistics and a compact pipe-separated listing of the snapshot's interactive elements."""
+        interactive_roles = {
+            "button", "link", "textbox", "searchbox", "combobox", "checkbox",
+            "radio", "slider", "tab", "menuitem", "option", "switch", "cell",
+            "a", "input", "select", "textarea",
+        }
+
+        elements = snapshot.elements
+        all_ids = [el.id for el in elements]
+        logger.info(
+            f"š Snapshot stats: {len(elements)} total elements, "
+            f"IDs range: {min(all_ids, default=0)}-{max(all_ids, default=0)}"
+        )
+
+        interactive = [el for el in elements if (el.role or "").lower() in interactive_roles]
+        if interactive:
+            interactive_ids = [el.id for el in interactive]
+            logger.info(
+                f"š Interactive elements: {len(interactive)} elements, "
+                f"IDs range: {min(interactive_ids)}-{max(interactive_ids)}"
+            )
+        else:
+            logger.warning("⚠️ No interactive elements found in snapshot!")
+
+        dominant_key = snapshot.dominant_group_key
+        rank_in_group = self._rank_dominant_group(interactive, dominant_key)
+
+        # Format: ID|role|text|importance|is_primary|docYq|ord|DG|href
+        compact_lines: list[str] = []
+        shown_ids: list[int] = []
+        for el in interactive[:snapshot_limit]:
+            if getattr(el, "diff_status", None) == "REMOVED":
+                continue
+            compact_lines.append(self._format_compact_line(el, rank_in_group, dominant_key))
+            shown_ids.append(el.id)
+
+        # Report exactly the IDs that were emitted above.
+        if shown_ids:
+            logger.info(f"š Showing {len(shown_ids)} elements to LLM, IDs: {min(shown_ids)}-{max(shown_ids)}")
+        else:
+            logger.warning("⚠️ No elements shown to LLM!")
+
+        divider = "=" * 80
+        logger.info(f"\n{divider}")
+        logger.info(f"š Compact Snapshot Prompt for Step {step_idx}:")
+        logger.info(divider)
+        logger.info("\n".join(compact_lines))
+        logger.info(f"{divider}\n")
+
+    @staticmethod
+    def _rank_dominant_group(elements: list[Any], dominant_group_key: Any) -> dict[int, int]:
+        """Map element id to its rank within the dominant group, ordered top-to-bottom."""
+        members = [el for el in elements if el.in_dominant_group is True]
+        if not members and dominant_group_key:
+            # No explicit flags: fall back to matching the snapshot's group key.
+            members = [el for el in elements if el.group_key == dominant_group_key]
+
+        def sort_key(el: Any) -> tuple[float, float, float, float]:
+            # Missing positions sort last; higher importance breaks ties first.
+            doc_y = el.doc_y if el.doc_y is not None else float("inf")
+            bbox_y = el.bbox.y if el.bbox else float("inf")
+            bbox_x = el.bbox.x if el.bbox else float("inf")
+            return (doc_y, bbox_y, bbox_x, -(el.importance or 0))
+
+        members.sort(key=sort_key)
+        return {el.id: rank for rank, el in enumerate(members)}
+
+    @staticmethod
+    def _compress_href(href: str) -> str:
+        """Shorten an href to its last non-empty path segment, capped at 30 characters."""
+        if not href:
+            return ""
+        if "/" in href:
+            parts = href.split("/")
+            # A trailing slash leaves an empty last part; use the one before it.
+            href = parts[-1] or parts[-2]
+        if len(href) > 30:
+            href = href[:27] + "..."
+        return href
+
+    @classmethod
+    def _format_compact_line(cls, el: Any, rank_in_group: dict[int, int], dominant_group_key: Any) -> str:
+        """Render one element as ID|role|text|importance|is_primary|docYq|ord|DG|href."""
+        import re
+
+        # Anything with an href is presented as a link.
+        role = "link" if el.href else (el.role or "element")
+
+        name = re.sub(r"\s+", " ", (el.text or "").strip())
+        if len(name) > 30:
+            name = name[:27] + "..."
+
+        importance = el.importance or 0
+        doc_y = el.doc_y or 0
+        is_primary_flag = "1" if (el.visual_cues and el.visual_cues.is_primary) else "0"
+        # docYq: doc_y expressed in 200px buckets.
+        doc_yq = int(round(doc_y / 200)) if doc_y else 0
+
+        in_dg = el.in_dominant_group
+        if in_dg is None and dominant_group_key:
+            in_dg = el.group_key == dominant_group_key
+
+        ord_val = rank_in_group[el.id] if in_dg and el.id in rank_in_group else "-"
+        dg_flag = "1" if in_dg else "0"
+        href = cls._compress_href(el.href or "")
+
+        return f"{el.id}|{role}|{name}|{importance}|{is_primary_flag}|{doc_yq}|{ord_val}|{dg_flag}|{href}"
+
+    def _log_final_selection(self, element_id: Any, snapshot: Any | None, expected_text: str) -> None:
+        """Log whether the element chosen in the final step exists in the snapshot and contains the expected text."""
+        if snapshot is None or snapshot.status != "success":
+            # Without a snapshot the choice cannot be checked either way.
+            logger.warning(f"⚠️ No pre-agent snapshot available - cannot validate element {element_id}")
+            return
+
+        match = next((el for el in snapshot.elements if el.id == element_id), None)
+        if match is None:
+            all_ids = [el.id for el in snapshot.elements]
+            logger.warning(f"⚠️ Element ID {element_id} not found in snapshot!")
+            if all_ids:
+                logger.warning(f" Available element IDs range: {min(all_ids)}-{max(all_ids)}")
+            logger.warning(f" Total elements in snapshot: {len(all_ids)}")
+            logger.error(f"❌ Element {element_id} does not exist in snapshot - LLM selected invalid element ID!")
+            return
+
+        element_text = match.text or ""
+        logger.info(f"š Found element {element_id}: role={match.role}, text={element_text[:100] if element_text else 'N/A'}...")
+        if not element_text:
+            return
+        if expected_text in element_text:
+            logger.info(f"✅ Validation passed: Element text contains '{expected_text}'")
+        else:
+            logger.warning(f"⚠️ Validation failed: Element text does not contain '{expected_text}'")
+            logger.warning(f" Element text: {element_text[:200]}")
+
+    async def _take_verification_snapshot(self) -> Any | None:
+        """Snapshot via the AgentRuntime, falling back to the browser; None if both fail."""
+        logger.info("šø Taking snapshot for verification...")
+        try:
+            snapshot = await self._runtime.snapshot()
+            logger.info(f"✅ Snapshot taken: {len(snapshot.elements)} elements found")
+            return snapshot
+        except Exception as e:
+            logger.warning(f"⚠️ AgentRuntime.snapshot() failed: {e}")
+            logger.info(" Attempting fallback snapshot via AsyncSentienceBrowser...")
+
+        try:
+            from sentience.snapshot import snapshot_async
+            from sentience.models import SnapshotOptions
+
+            fallback_snap_opts = SnapshotOptions(limit=50, goal="verification")
+            snapshot = await snapshot_async(self.browser, fallback_snap_opts)
+            if snapshot.status == "success":
+                logger.info(f"✅ Fallback snapshot taken: {len(snapshot.elements)} elements found")
+                return snapshot
+            logger.warning(f"⚠️ Fallback snapshot failed: {snapshot.error}")
+        except Exception as fallback_error:
+            logger.warning(f"⚠️ Fallback snapshot also failed: {fallback_error}")
+        return None
+
+    def _run_verification_callback(self, callback: Callable[[Any, int, Any], bool], step_idx: int, snapshot: Any | None) -> None:
+        """Invoke a step's verification callback and log its verdict."""
+        logger.info(f"š Running custom verification for step {step_idx}...")
+        if snapshot:
+            # Exceptions raised here propagate to the caller of run_multi_step.
+            passed = callback(self._runtime, step_idx, snapshot)
+            logger.info(f" {'✅' if passed else '❌'} Custom verification: {'PASSED' if passed else 'FAILED'}")
+            return
+
+        logger.warning("⚠️ Skipping verification callback - no snapshot available")
+        # The callback is still invoked, with None in place of the snapshot;
+        # in this case its exceptions are logged instead of raised.
+        try:
+            passed = callback(self._runtime, step_idx, None)
+            logger.info(f" {'✅' if passed else '❌'} Custom verification: {'PASSED' if passed else 'FAILED'}")
+        except Exception as callback_error:
+            logger.warning(f"⚠️ Verification callback failed: {callback_error}")
+
+ async def assert_done(
+ self,
+ predicate: Any,
+ label: str = "task_complete",
+ ) -> bool:
+ """
+ Evaluate a completion predicate through the AgentRuntime and log the outcome.
+
+ Args:
+ predicate: Predicate from sentience.asserts (e.g., expect(...).to_exist())
+ label: Label passed to the runtime together with the predicate
+
+ Returns:
+ The value returned by the runtime's assert_done(); False when no
+ AgentRuntime is available.
+
+ Example:
+ >>> from sentience.asserts import expect, E, in_dominant_list
+ >>>
+ >>> task_complete = await agent.assert_done(
+ >>> expect(in_dominant_list().nth(0)).to_have_text_contains("Show HN"),
+ >>> label="top_post_found",
+ >>> )
+ """
+ # Without a runtime there is nothing to evaluate the predicate against.
+ if not self._runtime:
+ logger.warning("ā ļø AgentRuntime not initialized, cannot assert_done")
+ return False
+
+ logger.info("š Verifying task completion...")
+ # The runtime call is made synchronously (it is not awaited).
+ result = self._runtime.assert_done(predicate, label=label)
+
+ if result:
+ logger.info("ā
Task completion verification passed")
+ else:
+ logger.info("ā Task completion verification failed")
+
+ return result
+
+    async def get_verification_summary(self) -> dict[str, Any]:
+        """
+        Summarize the assertion state held by the AgentRuntime.
+
+        Returns:
+            A dictionary with ``runtime_available`` plus the runtime's
+            ``all_assertions_passed`` / ``required_assertions_passed`` results
+            (both None when there is no runtime). When a runtime exists the
+            dictionary also has ``trace_file``: the path
+            ``<trace_dir>/<run_id>.jsonl`` if a tracer is set, else None.
+        """
+        runtime = self._runtime
+        if not runtime:
+            summary: dict[str, Any] = {"runtime_available": False}
+            summary["all_assertions_passed"] = None
+            summary["required_assertions_passed"] = None
+            return summary
+
+        summary = {"runtime_available": True}
+        summary["all_assertions_passed"] = runtime.all_assertions_passed()
+        summary["required_assertions_passed"] = runtime.required_assertions_passed()
+        if self._tracer:
+            summary["trace_file"] = str(self.trace_dir / f"{self._tracer.run_id}.jsonl")
+        else:
+            summary["trace_file"] = None
+        return summary
diff --git a/browser_use/llm/huggingface/chat.py b/browser_use/llm/huggingface/chat.py
index a59bc0d686..3f5291cd8d 100644
--- a/browser_use/llm/huggingface/chat.py
+++ b/browser_use/llm/huggingface/chat.py
@@ -392,9 +392,10 @@ def _generate_structured(self, messages: list[BaseMessage], schema: dict[str, An
example_json = "{\n" + ",\n".join(example_fields) + "\n}"
- # Build minimal instruction (optimized for small local LLMs)
- # Keep it very short to avoid confusing the model
- schema_instruction = f"\n\nJSON only:\n{example_json}"
+ # Build explicit instruction for small local LLMs
+ # Must be very clear: ONLY JSON, no explanations, no reasoning, no extra text
+ # Use imperative language to be more direct - match system message style
+ schema_instruction = f"\n\nCRITICAL: Output ONLY this JSON format. No explanations, no reasoning, no markdown, no code blocks. Start with {{ and end with }}:\n{example_json}"
# Create modified messages
modified_messages = list(messages)
@@ -411,6 +412,9 @@ def _generate_structured(self, messages: list[BaseMessage], schema: dict[str, An
# Try to extract JSON from response
completion = completion.strip()
+ # Remove any leading/trailing whitespace or newlines
+ completion = completion.strip()
+
# Try to find JSON in the response (in case model adds extra text)
if completion.startswith('```json'):
# Extract from code block
@@ -418,10 +422,31 @@ def _generate_structured(self, messages: list[BaseMessage], schema: dict[str, An
elif completion.startswith('```'):
completion = completion.replace('```', '').strip()
+ # Extract the first complete JSON object from the response.
+ # raw_decode() parses from the first "{" and stops at its matching "}", so
+ # braces inside string values and any text after the object are handled.
+ # (A greedy r'\{.*\}' search alone runs to the LAST "}" and keeps trailing
+ # text whenever the model adds a second brace pair after the JSON.)
+ import re
+ brace_start = completion.find('{')
+ if brace_start >= 0:
+     try:
+         _, json_end = json.JSONDecoder().raw_decode(completion, brace_start)
+     except json.JSONDecodeError:
+         # Nothing parseable starts at the first "{": keep the widest "{...}"
+         # span so the validation below can log what the model produced.
+         json_match = re.search(r'\{.*\}', completion, re.DOTALL)
+         if json_match:
+             completion = json_match.group(0)
+     else:
+         completion = completion[brace_start:json_end]
+
# Try to parse to validate JSON
try:
json.loads(completion)
- except json.JSONDecodeError:
- logger.warning(f"Generated text is not valid JSON: {completion[:200]}")
+ except json.JSONDecodeError as e:
+ logger.warning(f"Generated text is not valid JSON: {completion[:200]}... Error: {e}")
return completion, usage
diff --git a/examples/integrations/sentience_multi_step_agent.py b/examples/integrations/sentience_multi_step_agent.py
new file mode 100644
index 0000000000..c9923ca8f4
--- /dev/null
+++ b/examples/integrations/sentience_multi_step_agent.py
@@ -0,0 +1,359 @@
+"""
+Example: MultiStepSentienceAgent with Local LLM and AgentRuntime verification.
+
+This example demonstrates how to use MultiStepSentienceAgent with:
+- Primary: Local LLM (Qwen 2.5 3B) via LocalLLMProvider from Sentience SDK
+- Multi-step task execution with step-by-step verification via AgentRuntime
+- Declarative task completion verification using expect() DSL
+
+Requirements:
+1. Install transformers: pip install transformers torch accelerate
+2. Optional: pip install bitsandbytes (for 4-bit/8-bit quantization)
+3. Sentience SDK installed: pip install sentienceapi
+
+Note: Local models will be downloaded from Hugging Face on first use.
+Note: `accelerate` is required when using `device_map="auto"`.
+"""
+
+import asyncio
+import logging
+import os
+import traceback
+from pathlib import Path
+
+from dotenv import load_dotenv
+
+# Import Sentience SDK components
+from sentience.async_api import AsyncSentienceBrowser
+from sentience.llm_provider import LocalLLMProvider
+from sentience.agent_config import AgentConfig
+from sentience.verification import url_contains
+from sentience.asserts import E, expect, in_dominant_list
+
+# Import MultiStepSentienceAgent from browser-use integration
+from browser_use.integrations.sentience import MultiStepSentienceAgent
+
+# Load variables from a .env file into the process environment; main() reads
+# SENTIENCE_API_KEY through os.getenv().
+load_dotenv()
+
+# Enable debug logging for the browser_use Sentience integration logger
+logging.getLogger("browser_use.integrations.sentience").setLevel(logging.DEBUG)
+
+
+def log(msg: str) -> None:
+    """Write ``msg`` to stdout and flush at once so progress shows up immediately."""
+    print(msg, flush=True)
+
+
+async def main():
+ """Example: Multi-step task with step-by-step verification using MultiStepSentienceAgent.
+
+ Starts an AsyncSentienceBrowser on google.com, creates a LocalLLMProvider and
+ attempts one warm-up generation, runs the five task steps through
+ MultiStepSentienceAgent.run_multi_step() with one verification callback per
+ step, then calls assert_done() and prints the verification summary.
+
+ Any exception is logged with its traceback instead of being re-raised, and
+ the browser is closed in the ``finally`` block whenever it was created.
+ """
+ # Bound before the try so the finally block can tell whether a browser exists.
+ browser = None
+ try:
+ # ========================================================================
+ # INITIALIZE SENTIENCE BROWSER
+ # ========================================================================
+ log("\n" + "=" * 80)
+ log("š Initializing AsyncSentienceBrowser")
+ log("=" * 80)
+
+ # Create AsyncSentienceBrowser from Sentience SDK
+ browser = AsyncSentienceBrowser(
+ headless=False,
+ api_key=os.getenv("SENTIENCE_API_KEY"),
+ )
+ await browser.start()
+ log("ā
AsyncSentienceBrowser started")
+
+ # Navigate to the first URL immediately so extension can inject properly
+ # The extension needs to be on an actual page, not about:blank
+ first_url = "https://google.com"
+ log(f"š Navigating to first URL: {first_url}")
+ await browser.goto(first_url)
+ log("ā
Navigated to first URL - extension should now be injected")
+
+ # ========================================================================
+ # INITIALIZE LOCAL LLM
+ # ========================================================================
+ log("\n" + "=" * 80)
+ log("š¤ Initializing Local LLM (Qwen 2.5 3B)")
+ log("=" * 80)
+
+ log("š¦ Creating LocalLLMProvider instance...")
+ log(" Model: Qwen/Qwen2.5-3B-Instruct")
+ log(" ā ļø IMPORTANT: Model download happens on FIRST LLM call")
+ llm = LocalLLMProvider(
+ model_name="Qwen/Qwen2.5-3B-Instruct",
+ device="auto",
+ load_in_4bit=False, # Set to True to save memory
+ torch_dtype="auto",
+ )
+ log("ā
LocalLLMProvider instance created (model not loaded yet)")
+
+ # OPTIONAL: Pre-load the model now
+ log("\nš Pre-loading model (this will download if not cached)...")
+ log(" ā ļø This is where the download happens - watch for progress!")
+ try:
+ log(" š Calling model to trigger download/loading...")
+ log(" ā³ This may take 5-15 minutes on first run (~6GB download)")
+ # Called without await, i.e. as a plain synchronous call.
+ response = llm.generate(
+ system_prompt="You are a helpful assistant.",
+ user_prompt="Say 'ready'",
+ max_new_tokens=50,
+ )
+ log(f" ā
Model loaded successfully! Response: {response.content[:50]}...")
+ except Exception as e:
+ # Best-effort warm-up: report the failure and keep going.
+ log(f" ā Model loading failed: {e}")
+ log(" Continuing anyway - model will load on first agent call")
+ traceback.print_exc()
+
+ log(f"ā
Using local LLM: {llm.model_name}")
+
+ # ========================================================================
+ # CREATE MULTI-STEP AGENT
+ # ========================================================================
+ log("\n" + "=" * 80)
+ log("š Creating MultiStepSentienceAgent")
+ log("=" * 80)
+
+ # Create AgentConfig for SentienceAgentAsync
+ agent_config = AgentConfig(
+ snapshot_limit=50,
+ temperature=0.0,
+ max_retries=3,
+ verify=True,
+ capture_screenshots=True,
+ screenshot_format="jpeg",
+ screenshot_quality=80,
+ show_overlay=True,
+ )
+
+ # Create multi-step agent
+ agent = MultiStepSentienceAgent(
+ browser=browser,
+ llm=llm,
+ trace_dir="traces",
+ sentience_api_key=os.getenv("SENTIENCE_API_KEY"),
+ agent_config=agent_config,
+ default_snapshot_limit=50,
+ verbose=True,
+ )
+ log("ā
MultiStepSentienceAgent created")
+
+ # ========================================================================
+ # DEFINE MULTI-STEP TASK
+ # ========================================================================
+ log("\n" + "=" * 80)
+ log("š Defining Multi-Step Task")
+ log("=" * 80)
+
+ # Each step is a dict with a short "goal" and a detailed "task" prompt.
+ task_steps = [
+ {
+ "goal": "Verify on Google search page",
+ "task": "You are on google.com. Verify you see the Google search interface with a search box.",
+ },
+ {
+ "goal": "Type 'Hacker News Show' in the search box",
+ "task": """Type "Hacker News Show" into the Google search box.
+
+ IMPORTANT:
+ 1. Find the search input (role="combobox" or "searchbox" with "Search" text)
+ 2. Use type_text action with its element ID to type "Hacker News Show"
+ 3. After typing, a dropdown with suggested search terms may appear - DO NOT click on any suggestions
+ 4. Wait a moment for the dropdown to appear, then proceed to click the "Google Search" button
+ 5. Do NOT press Enter key - click the search button instead
+ 6. Do NOT click on any autocomplete suggestions in the dropdown""",
+ },
+ {
+ "goal": "Click the Google Search button",
+ "task": """Click the "Google Search" button to submit the search.
+
+ IMPORTANT:
+ 1. Find the button (role="button" with "Google Search" text)
+ 2. Make sure you click the actual search BUTTON, not any autocomplete suggestions
+ 3. The button should be below or next to the search input box
+ 4. Use click action with the button's element ID
+ 5. Do NOT press Enter key
+ 6. Do NOT click on any dropdown suggestions""",
+ },
+ {
+ "goal": "Click 'Show | Hacker News' link",
+ "task": """Click the link with exact title "Show | Hacker News" in search results.
+
+ Find link element (role="link") with text "Show | Hacker News" (with pipe |). Use click action with its element ID. Only click this exact link, not others.""",
+ },
+ {
+ "goal": "Find the top 1 Show HN post",
+ "task": """On Hacker News Show page, identify the element ID of the first post in the list.
+
+ CRITICAL: This is an IDENTIFICATION task only. Do NOT click anything.
+
+ Find the first post element (role="link") in the list. The post should have "Show HN" in its title text.
+ Output the element ID using CLICK(id) format, but this is for identification only - the click will be prevented.
+ Example: If the first post has ID 631, output CLICK(631) but understand this is just to report the ID.""",
+ },
+ ]
+
+ log(f"ā
Defined {len(task_steps)} task steps")
+
+ # ========================================================================
+ # DEFINE VERIFICATION CALLBACKS
+ # ========================================================================
+ log("\n" + "=" * 80)
+ log("š Defining Verification Callbacks")
+ log("=" * 80)
+
+ # Every callback takes (runtime, step_idx, snapshot) and returns a pass/fail
+ # value. None of them reads step_idx, and only verify_step_5 reads snapshot.
+ def verify_step_1(runtime, step_idx, snapshot):
+ """Verify step 1: On Google search page."""
+ log(" Verifying: URL contains google.com")
+ passed = runtime.assert_(
+ url_contains("google.com"),
+ label="on_google",
+ required=True,
+ )
+ log(f" {'ā
' if passed else 'ā'} URL contains google.com: {passed}")
+ return passed
+
+ def verify_step_2(runtime, step_idx, snapshot):
+ """Verify step 2: Text typed in search box."""
+ # Verify we're still on Google
+ # Only the URL is asserted here, and unlike step 1 it is not marked required=True.
+ log(" Verifying: Still on google.com")
+ passed1 = runtime.assert_(
+ url_contains("google.com"),
+ label="still_on_google",
+ )
+ log(f" {'ā
' if passed1 else 'ā'} Still on google.com: {passed1}")
+ return passed1
+
+ def verify_step_3(runtime, step_idx, snapshot):
+ """Verify step 3: Search results page loaded."""
+ log(" Verifying: Search results contain 'Show | Hacker News'")
+ passed1 = runtime.assert_(
+ expect(E(text_contains="Show")).to_exist(),
+ label="search_results_contain_show",
+ )
+ log(f" {'ā
' if passed1 else 'ā'} Search results contain 'Show': {passed1}")
+
+ passed2 = runtime.assert_(
+ expect.text_present("Hacker News"),
+ label="hacker_news_text_present",
+ )
+ log(f" {'ā
' if passed2 else 'ā'} 'Hacker News' text present: {passed2}")
+
+ # Both assertions are always evaluated and logged before combining.
+ return passed1 and passed2
+
+ def verify_step_4(runtime, step_idx, snapshot):
+ """Verify step 4: On Show HN page."""
+ log(" Verifying: URL contains news.ycombinator.com/show")
+ passed1 = runtime.assert_(
+ url_contains("news.ycombinator.com/show"),
+ label="on_show_hn_page",
+ required=True,
+ )
+ log(f" {'ā
' if passed1 else 'ā'} URL contains news.ycombinator.com/show: {passed1}")
+
+ passed2 = runtime.assert_(
+ expect(E(text_contains="Show HN")).to_exist(),
+ label="show_hn_posts_visible",
+ )
+ log(f" {'ā
' if passed2 else 'ā'} Show HN posts visible: {passed2}")
+
+ return passed1 and passed2
+
+ def verify_step_5(runtime, step_idx, snapshot):
+ """Verify step 5: Top post found.
+
+ Note: The agent may have clicked the post (navigating away from Show HN page),
+ so we only verify that we're on a Hacker News page (either Show HN list or post detail).
+ The actual element text validation is done in multi_step_agent.py using the pre-agent snapshot.
+ """
+ if snapshot is None:
+ log(" ā ļø No snapshot available for verification - skipping")
+ return True # Don't fail verification if snapshot is unavailable
+
+ log(" Verifying: On Hacker News (either Show HN list or post detail page)")
+ # After clicking, we might be on the post detail page, so just check we're on HN
+ try:
+ passed = runtime.assert_(
+ url_contains("news.ycombinator.com"),
+ label="on_hackernews",
+ required=True,
+ )
+ log(f" {'ā
' if passed else 'ā'} On Hacker News page: {passed}")
+ except Exception as e:
+ # An exception raised by the assertion is reported as a failed check.
+ log(f" ā ļø Verification assertion failed: {e}")
+ passed = False
+
+ # Note: We don't check for "Show HN" text or dominant list because:
+ # 1. If the agent clicked the post, we're on the detail page (no Show HN text)
+ # 2. The element text validation was already done in multi_step_agent.py using pre-agent snapshot
+ # 3. The task is to identify the element, not necessarily stay on the Show HN page
+
+ return passed
+
+ # Keys 1-5 pair each callback with a task step; presumably 1-based step
+ # numbers in task_steps order - confirm against run_multi_step().
+ verification_callbacks = {
+ 1: verify_step_1,
+ 2: verify_step_2,
+ 3: verify_step_3,
+ 4: verify_step_4,
+ 5: verify_step_5,
+ }
+
+ log(f"ā
Defined {len(verification_callbacks)} verification callbacks")
+
+ # ========================================================================
+ # RUN MULTI-STEP TASK
+ # ========================================================================
+ log("\n" + "=" * 80)
+ log("š Running Multi-Step Task")
+ log("=" * 80)
+
+ results = await agent.run_multi_step(
+ task_steps=task_steps,
+ verification_callbacks=verification_callbacks,
+ max_retries=2,
+ )
+
+ log(f"\nā
Completed {len(results)} steps")
+
+ # ========================================================================
+ # FINAL VERIFICATION
+ # ========================================================================
+ log("\n" + "=" * 80)
+ log("š Final Task Verification")
+ log("=" * 80)
+
+ task_complete = await agent.assert_done(
+ expect(in_dominant_list().nth(0)).to_have_text_contains("Show HN"),
+ label="top_post_found",
+ )
+
+ if task_complete:
+ log("ā
Task completed successfully!")
+ else:
+ log("ā ļø Task may not be complete - check verification results")
+
+ # ========================================================================
+ # SUMMARY
+ # ========================================================================
+ log("\n" + "=" * 80)
+ log("š Verification Summary")
+ log("=" * 80)
+
+ summary = await agent.get_verification_summary()
+ log(f"Runtime available: {summary['runtime_available']}")
+ log(f"All assertions passed: {summary['all_assertions_passed']}")
+ log(f"Required assertions passed: {summary['required_assertions_passed']}")
+ # "trace_file" is read with .get(), so a summary without that key is tolerated.
+ if summary.get("trace_file"):
+ log(f"Trace file: {summary['trace_file']}")
+
+ except Exception as e:
+ # Example-level boundary: report the error and traceback, then fall through to cleanup.
+ log(f"\nā Error: {e}")
+ traceback.print_exc()
+ finally:
+ if browser:
+ log("\nš Closing browser...")
+ await browser.close()
+ log("ā
Browser closed")
+
+
+# Script entry point: run the async example to completion when executed directly.
+if __name__ == "__main__":
+ asyncio.run(main())