diff --git a/CUSTOM_AGENT_DESIGN.md b/CUSTOM_AGENT_DESIGN.md
new file mode 100644
index 0000000000..3d0a9deeec
--- /dev/null
+++ b/CUSTOM_AGENT_DESIGN.md
@@ -0,0 +1,512 @@
+# Custom Agent Design: Full Control Over LLM Prompts
+
+## Executive Summary
+
+This document outlines the design for implementing a custom browser automation agent with full control over prompt construction, enabling:
+1. **Primary**: Sentience SDK snapshot elements as compact, token-efficient prompts
+2. **Fallback**: Vision-based prompts when Sentience snapshots fail
+3. **Token tracking**: Integration with browser-use's built-in token usage utilities
+4. **SDK integration**: Leveraging `SentienceContext` and other SDK backend modules
+
+## Current Architecture Analysis
+
+### Existing Agent Flow
+
+The current `browser_use.Agent` class follows this flow:
+
+```
+Agent.run()
+ └─> _prepare_context()
+ ├─> build_sentience_state() [optional, if Sentience SDK available]
+ │ └─> Injects Sentience prompt block via _add_context_message()
+ └─> _message_manager.create_state_messages()
+ └─> AgentMessagePrompt.get_user_message()
+ ├─> Builds browser state (DOM tree, screenshots)
+ ├─> Combines agent history, state, browser state
+ └─> Returns UserMessage with text + optional images
+ └─> _get_next_action()
+ └─> LLM.ainvoke(messages)
+ └─> TokenCostService tracks usage automatically
+```
+
+### Key Components
+
+1. **Agent** (`browser_use/agent/service.py`):
+ - Orchestrates the agent loop
+ - Manages browser session, tools, and state
+ - Calls `_prepare_context()` before each LLM call
+ - Handles action execution and retries
+
+2. **MessageManager** (`browser_use/agent/message_manager/service.py`):
+ - Manages conversation history
+ - Creates state messages via `create_state_messages()`
+ - Detects Sentience injection and reduces DOM size accordingly
+ - Handles vision mode (screenshots vs. text-only)
+
+3. **AgentMessagePrompt** (`browser_use/agent/prompts.py`):
+ - Builds the complete user message
+ - Combines: agent history, agent state, browser state, read state
+ - Handles vision mode (text + images vs. text-only)
+ - Formats DOM tree and screenshots
+
+4. **TokenCostService** (`browser_use/tokens/service.py`):
+ - Automatically tracks token usage when LLMs are registered
+ - Calculates costs based on model pricing
+ - Provides usage summaries and statistics
+
+5. **SentienceContext** (`sentience/backends/sentience_context.py`):
+ - Provides `build()` method that returns `SentienceContextState`
+ - `SentienceContextState` contains: `url`, `snapshot`, `prompt_block`
+ - Handles extension waiting, snapshot retries, and formatting
+
+## Design Goals
+
+### 1. Primary: Sentience Snapshot as Preferred Prompt
+
+**Requirement**: Use Sentience SDK snapshot elements as the primary, compact prompt format.
+
+**Implementation Strategy**:
+- Use `SentienceContext.build()` to get snapshot and formatted prompt
+- Inject the `prompt_block` as the primary browser state representation
+- Skip or minimize DOM tree extraction when Sentience is available
+- Format: `ID|role|text|imp|is_primary|docYq|ord|DG|href`
+
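+To make this format concrete, here is an illustrative (not verbatim) example of two prompt-block lines; the actual IDs, scores, and field values come from the SDK snapshot:
+
+```
+41|button|Add to cart|0.92|1|0.31|1|1|
+87|link|Show HN: My project|0.88|0|0.12|3|2|https://example.com
+```
+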
+**Benefits**:
+- **Token efficiency**: ~60 elements × ~50 chars ≈ 3K characters (roughly 750 tokens) vs. ~40K characters for a full DOM dump
+- **Semantic accuracy**: Importance scores and dominant group detection
+- **Ordinal support**: Built-in support for "first", "third", etc. via `ord` and `DG` fields
+
+### 2. Fallback: Vision Mode When Snapshot Fails
+
+**Requirement**: Automatically fall back to vision-based prompts if Sentience snapshot fails.
+
+**Failure Scenarios**:
+- Extension not loaded
+- Snapshot timeout
+- Network errors
+- Invalid snapshot response
+
+**Implementation Strategy**:
+- Try `SentienceContext.build()` first
+- If `None` returned, fall back to vision mode:
+ - Enable screenshots (`use_vision=True`)
+ - Use full DOM tree (truncated to ~40K chars)
+ - Include browser state summary
+
+**Decision Logic**:
+```python
+sentience_state = await sentience_context.build(browser_session, goal=task)
+if sentience_state:
+ # Use Sentience prompt block
+ prompt = sentience_state.prompt_block
+ use_vision = False
+else:
+ # Fall back to vision
+ prompt = build_dom_state(browser_session)
+ use_vision = True
+```
+
+### 3. Token Usage Tracking
+
+**Requirement**: Use browser-use's built-in token usage utilities.
+
+**Implementation Strategy**:
+- Initialize `TokenCost` service with `include_cost=True` (surfaced as the agent's `calculate_cost` setting)
+- Register LLM instance: `token_cost_service.register_llm(llm)`
+- Token tracking happens automatically via wrapped `ainvoke()` method
+- Access usage via:
+ - `agent.token_cost_service.get_usage_summary()`
+ - `history.usage` (from `agent.run()`)
+
+**Token Tracking Flow**:
+```
+LLM.ainvoke(messages)
+ └─> [wrapped by TokenCostService]
+ ├─> original_ainvoke(messages)
+ │ └─> Returns result with result.usage
+ └─> token_cost_service.add_usage(model, usage)
+ └─> Tracks in usage_history
+```
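+
+A minimal wiring sketch using only the calls named above (`TokenCost(include_cost=...)`, `register_llm`, `get_usage_summary`); treat it as illustrative rather than the final integration:
+
+```python
+from browser_use.tokens.service import TokenCost
+
+async def invoke_with_tracking(llm, messages):
+    # Register once; ainvoke() is wrapped so usage is recorded automatically
+    token_cost_service = TokenCost(include_cost=True)
+    token_cost_service.register_llm(llm)
+
+    response = await llm.ainvoke(messages)
+
+    # Aggregated tokens (and cost, when enabled) across all tracked calls
+    summary = await token_cost_service.get_usage_summary()
+    return response, summary
+```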
+
+### 4. SDK Integration
+
+**Requirement**: Use `SentienceContext` and other SDK backend modules.
+
+**Available SDK Components**:
+- `SentienceContext` (`sentience/backends/sentience_context.py`):
+ - `build(browser_session, goal=...)` → `SentienceContextState | None`
+ - Handles extension waiting, snapshot, formatting
+- `BrowserUseAdapter` (`sentience/backends/browser_use_adapter.py`):
+ - Adapts browser-use `BrowserSession` to Sentience backend interface
+- `snapshot()` (`sentience/backends/snapshot.py`):
+ - Low-level snapshot function (used by `SentienceContext`)
+
+**Integration Points**:
+- Use `SentienceContext` as the primary interface (recommended)
+- Or use `BrowserUseAdapter` + `snapshot()` directly for more control
+
+## Proposed Architecture
+
+### Custom Agent Class Structure
+
+```python
+class CustomSentienceAgent:
+ """
+ Custom agent with full control over prompt construction.
+
+ Features:
+ - Primary: Sentience snapshot as compact prompt
+ - Fallback: Vision mode when snapshot fails
+ - Token usage tracking
+ - Full control over message construction
+ """
+
+ def __init__(
+ self,
+ task: str,
+ llm: BaseChatModel,
+ browser_session: BrowserSession,
+ tools: Tools,
+ # Sentience configuration
+ sentience_api_key: str | None = None,
+ sentience_use_api: bool | None = None,
+ sentience_max_elements: int = 60,
+ sentience_show_overlay: bool = False,
+ # Vision fallback configuration
+ vision_fallback_enabled: bool = True,
+ vision_detail_level: Literal['auto', 'low', 'high'] = 'auto',
+ # Token tracking
+ calculate_cost: bool = True,
+ # Other agent settings
+ max_steps: int = 100,
+ use_vision: bool = False, # Default: prefer Sentience over vision
+ ...
+ ):
+ self.task = task
+ self.llm = llm
+ self.browser_session = browser_session
+ self.tools = tools
+
+ # Initialize SentienceContext
+ self.sentience_context = SentienceContext(
+ sentience_api_key=sentience_api_key,
+ use_api=sentience_use_api,
+ max_elements=sentience_max_elements,
+ show_overlay=sentience_show_overlay,
+ )
+
+ # Initialize token cost service
+ self.token_cost_service = TokenCost(include_cost=calculate_cost)
+ self.token_cost_service.register_llm(llm)
+
+ # Vision fallback settings
+ self.vision_fallback_enabled = vision_fallback_enabled
+ self.vision_detail_level = vision_detail_level
+ self.use_vision = use_vision # Can be overridden by fallback logic
+
+ # Message manager for conversation history
+ self.message_manager = CustomMessageManager(...)
+
+ async def run(self) -> AgentHistoryList:
+ """Main agent loop with custom prompt construction."""
+ # Similar to Agent.run() but with custom _prepare_context()
+ ...
+
+ async def _prepare_context(self) -> tuple[UserMessage, bool]:
+ """
+ Prepare context with Sentience-first, vision-fallback strategy.
+
+ Returns:
+ (user_message, sentience_used): Tuple of message and whether Sentience was used
+ """
+ # Try Sentience first
+ sentience_state = await self.sentience_context.build(
+ self.browser_session,
+ goal=self.task,
+ )
+
+ if sentience_state:
+ # Use Sentience prompt block
+ user_message = self._build_sentience_message(sentience_state)
+ return user_message, True
+ else:
+ # Fall back to vision
+ if self.vision_fallback_enabled:
+ user_message = await self._build_vision_message()
+ return user_message, False
+ else:
+ # No fallback: return minimal message
+ user_message = self._build_minimal_message()
+ return user_message, False
+
+ def _build_sentience_message(self, sentience_state: SentienceContextState) -> UserMessage:
+ """Build user message using Sentience prompt block."""
+ # Combine agent history + Sentience prompt block
+ content = (
+ f"\n{self.message_manager.get_history_description()}\n\n\n"
+ f"\n{sentience_state.prompt_block}\n\n"
+ )
+ return UserMessage(content=content, cache=True)
+
+ async def _build_vision_message(self) -> UserMessage:
+ """Build user message using vision (screenshots + DOM)."""
+ # Get browser state summary with screenshots
+ browser_state = await self.browser_session.get_browser_state_summary(
+ include_screenshot=True
+ )
+
+ # Build DOM state description
+ dom_state = self._build_dom_state(browser_state)
+
+ # Combine with screenshots
+ content_parts = [
+ ContentPartTextParam(text=dom_state),
+ # Add screenshots...
+ ]
+
+ return UserMessage(content=content_parts, cache=True)
+```
+
+### Message Construction Flow
+
+```
+_prepare_context()
+ ├─> Try: sentience_context.build(browser_session, goal=task)
+ │ ├─> Success: _build_sentience_message()
+ │ │ └─> Returns: UserMessage with Sentience prompt block
+ │ └─> Failure: None returned
+ │
+ └─> Fallback (if sentience_state is None):
+ ├─> vision_fallback_enabled?
+ │ ├─> Yes: _build_vision_message()
+ │ │ └─> Returns: UserMessage with screenshots + DOM
+ │ └─> No: _build_minimal_message()
+ │ └─> Returns: UserMessage with minimal state
+```
+
+### Integration with Existing Components
+
+#### 1. Browser Session
+- **Reuse**: `BrowserSession` from browser-use
+- **No changes needed**: Works with existing browser session
+
+#### 2. Tools
+- **Reuse**: `Tools` registry from browser-use
+- **No changes needed**: Same tool interface
+
+#### 3. Token Cost Service
+- **Reuse**: `TokenCost` from browser-use
+- **Integration**: Register LLM and access usage summaries
+
+#### 4. Message Manager
+- **Custom**: Create `CustomMessageManager` that:
+ - Manages conversation history (similar to existing `MessageManager`)
+ - Does NOT automatically inject Sentience (we handle it explicitly)
+  - Provides history description for prompt construction (interface sketched below)
+
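+A minimal sketch of the interface this design assumes for `CustomMessageManager`, derived from how the agent code in this PR calls it (field and method names may differ in the final implementation):
+
+```python
+from dataclasses import dataclass
+
+@dataclass
+class _ManagerState:
+    read_state_description: str = ""
+
+class CustomMessageManager:
+    def __init__(self, task: str, system_message, max_history_items: int = 4):
+        self.task = task
+        self.system_message = system_message
+        self.max_history_items = max_history_items
+        self.state = _ManagerState()
+        self.agent_history_description: str = ""  # compact per-step summaries
+
+    def get_messages(self, user_message):
+        # System prompt first, then the freshly built state message
+        return [self.system_message, user_message]
+
+    def update_history(self, model_output, result, step_info) -> None:
+        # Append a truncated summary of the step, keeping only the most
+        # recent max_history_items entries in agent_history_description
+        ...
+```
+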
+## Implementation Plan
+
+### Phase 1: Core Custom Agent (Week 1)
+
+**Tasks**:
+1. Create `CustomSentienceAgent` class skeleton
+2. Implement `_prepare_context()` with Sentience-first logic
+3. Implement `_build_sentience_message()` using `SentienceContext`
+4. Implement basic agent loop (`run()` method)
+5. Integrate token cost service
+
+**Deliverables**:
+- `custom_sentience_agent.py` with basic functionality
+- Unit tests for prompt construction logic
+
+### Phase 2: Vision Fallback (Week 1-2)
+
+**Tasks**:
+1. Implement `_build_vision_message()` with screenshots
+2. Implement `_build_dom_state()` for DOM tree extraction
+3. Add fallback decision logic
+4. Test fallback scenarios (extension not loaded, timeout, etc.)
+
+**Deliverables**:
+- Complete fallback implementation
+- Integration tests for fallback scenarios
+
+### Phase 3: Message Manager Integration (Week 2)
+
+**Tasks**:
+1. Create `CustomMessageManager` for history management
+2. Integrate with agent loop
+3. Handle system messages and tool definitions
+4. Test conversation history tracking
+
+**Deliverables**:
+- `custom_message_manager.py`
+- History tracking tests
+
+### Phase 4: Advanced Features (Week 2-3)
+
+**Tasks**:
+1. Add configuration options (max_elements, show_overlay, etc.)
+2. Add logging and observability
+3. Add error handling and retries
+4. Performance optimization
+
+**Deliverables**:
+- Production-ready custom agent
+- Documentation and examples
+
+## Code Structure
+
+```
+browser_use/
+ integrations/
+ sentience/
+ custom_agent.py # CustomSentienceAgent class
+ custom_message_manager.py # CustomMessageManager class
+ prompt_builder.py # Prompt construction utilities
+ examples/
+ custom_agent_example.py # Example usage
+```
+
+## Example Usage
+
+```python
+from browser_use import BrowserSession, Tools, ChatBrowserUse
+from browser_use.integrations.sentience.custom_agent import CustomSentienceAgent
+from sentience import get_extension_dir
+from browser_use import BrowserProfile
+
+async def main():
+ # Setup browser with Sentience extension
+ sentience_ext_path = get_extension_dir()
+ browser_profile = BrowserProfile(
+ args=[f"--load-extension={sentience_ext_path}"]
+ )
+ browser_session = BrowserSession(browser_profile=browser_profile)
+ await browser_session.start()
+
+ # Initialize custom agent
+ llm = ChatBrowserUse()
+ tools = Tools() # Use default tools
+
+ agent = CustomSentienceAgent(
+ task="Find the number 1 post on Show HN",
+ llm=llm,
+ browser_session=browser_session,
+ tools=tools,
+ # Sentience configuration
+ sentience_api_key=os.getenv("SENTIENCE_API_KEY"),
+ sentience_max_elements=60,
+ sentience_show_overlay=True,
+ # Vision fallback
+ vision_fallback_enabled=True,
+ vision_detail_level='auto',
+ # Token tracking
+ calculate_cost=True,
+ # Agent settings
+ max_steps=100,
+ use_vision=False, # Prefer Sentience over vision
+ )
+
+ # Run agent
+ history = await agent.run()
+
+ # Get token usage
+ usage_summary = await agent.token_cost_service.get_usage_summary()
+ print(f"Token usage: {usage_summary}")
+
+ # Check if Sentience was used
+ sentience_used = history.metadata.get('sentience_used', False)
+ print(f"Sentience used: {sentience_used}")
+
+if __name__ == "__main__":
+ asyncio.run(main())
+```
+
+## Benefits of This Design
+
+### 1. Token Efficiency
+- **Sentience mode**: ~3K characters per step (60 elements × ~50 chars, roughly 750 tokens)
+- **Vision mode**: ~40K characters per step (full DOM + screenshots)
+- **Savings**: ~92% reduction in prompt size when Sentience is available
+
+### 2. Reliability
+- **Automatic fallback**: No manual intervention needed
+- **Graceful degradation**: Works even if extension fails
+- **Error handling**: Robust retry logic for snapshots
+
+### 3. Flexibility
+- **Full control**: Customize prompt construction
+- **Configurable**: Adjust Sentience and vision settings
+- **Extensible**: Easy to add new prompt strategies
+
+### 4. Integration
+- **Reuses existing components**: Browser session, tools, token tracking
+- **SDK compatibility**: Uses official Sentience SDK interfaces
+- **Backward compatible**: Can coexist with existing Agent class
+
+## Challenges and Mitigations
+
+### Challenge 1: Extension Loading Timing
+**Issue**: Extension may not be ready when agent starts.
+
+**Mitigation**:
+- `SentienceContext.build()` already handles extension waiting
+- Can increase `wait_for_extension_ms` parameter
+- Fall back to vision if the extension never loads (see the configuration sketch below)
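+
+As a concrete illustration, the timing knobs exposed by the implemented `SentienceAgent` can be raised together (values below are arbitrary examples, not recommendations):
+
+```python
+agent = SentienceAgent(
+    task=task,
+    llm=llm,
+    browser_session=browser_session,
+    sentience_wait_for_extension_ms=10_000,  # wait longer for extension injection
+    sentience_retries=3,                     # retry the snapshot a few times
+    sentience_retry_delay_s=2.0,             # back off between attempts
+    vision_fallback_enabled=True,            # last resort: vision mode
+)
+```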
+
+### Challenge 2: Snapshot Failures
+**Issue**: Snapshot may fail due to network, timeout, or extension issues.
+
+**Mitigation**:
+- Automatic fallback to vision mode
+- Retry logic in `SentienceContext.build()`
+- Configurable retry count and delays
+
+### Challenge 3: Token Tracking Accuracy
+**Issue**: Need to track tokens for both Sentience and vision modes.
+
+**Mitigation**:
+- `TokenCostService` automatically tracks all LLM calls
+- No manual token counting needed
+- Usage summaries include both modes
+
+### Challenge 4: Message Format Consistency
+**Issue**: Sentience and vision messages have different formats.
+
+**Mitigation**:
+- Use consistent message structure (agent_history + browser_state)
+- LLM adapts to different browser_state formats
+- Can add format indicators if needed
+
+## Testing Strategy
+
+### Unit Tests
+- Prompt construction logic
+- Fallback decision logic
+- Message formatting
+
+### Integration Tests
+- Full agent loop with Sentience
+- Full agent loop with vision fallback
+- Token usage tracking
+- Extension loading scenarios
+
+### Performance Tests
+- Token usage comparison (Sentience vs. vision)
+- Latency comparison
+- Memory usage
+
+## Future Enhancements
+
+1. **Hybrid Mode**: Use both Sentience and vision (Sentience for structure, vision for visual confirmation)
+2. **Adaptive Selection**: Automatically choose best mode based on page type
+3. **Caching**: Cache Sentience snapshots to reduce API calls
+4. **Streaming**: Stream snapshot results as they become available
+5. **Multi-page**: Handle multiple pages/tabs with different strategies
+
+## Conclusion
+
+This design provides a clean, flexible architecture for implementing a custom agent with full control over prompt construction. The Sentience-first, vision-fallback strategy maximizes token efficiency while maintaining reliability. Integration with existing browser-use components minimizes code duplication and leverages proven functionality.
+
+The implementation can be done incrementally, starting with core functionality and adding advanced features over time. The modular design allows for easy testing and maintenance.
diff --git a/browser_use/integrations/sentience/__init__.py b/browser_use/integrations/sentience/__init__.py
new file mode 100644
index 0000000000..fbf80822ca
--- /dev/null
+++ b/browser_use/integrations/sentience/__init__.py
@@ -0,0 +1,15 @@
+"""Sentience integration for browser-use."""
+
+from browser_use.integrations.sentience.agent import (
+ SentienceAgent,
+ SentienceAgentConfig,
+ SentienceAgentSettings,
+ VisionFallbackConfig,
+)
+
+__all__ = [
+ "SentienceAgent",
+ "SentienceAgentConfig",
+ "SentienceAgentSettings",
+ "VisionFallbackConfig",
+]
diff --git a/browser_use/integrations/sentience/agent.py b/browser_use/integrations/sentience/agent.py
new file mode 100644
index 0000000000..5857c8cb00
--- /dev/null
+++ b/browser_use/integrations/sentience/agent.py
@@ -0,0 +1,1390 @@
+"""
+SentienceAgent: Custom agent with full control over prompt construction.
+
+This agent uses Sentience SDK snapshots as the primary, compact prompt format,
+with automatic fallback to vision mode when snapshots fail.
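+
+Example (illustrative sketch; `run()` returns a plain dict, see its docstring):
+
+    agent = SentienceAgent(task="Find the top post on Show HN", llm=llm, browser_session=session)
+    result = await agent.run()
+    print(result["sentience_used"], result["usage"])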
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Literal
+
+from pydantic import BaseModel, Field
+
+from browser_use.llm.base import BaseChatModel
+from browser_use.llm.messages import SystemMessage, UserMessage
+from browser_use.tokens.service import TokenCost
+from browser_use.tokens.views import UsageSummary
+
+if TYPE_CHECKING:
+ from browser_use.browser.session import BrowserSession
+ from browser_use.tools.service import Tools
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class SentienceAgentConfig:
+ """Configuration for Sentience snapshot behavior."""
+
+ sentience_api_key: str | None = None
+ """Sentience API key for gateway mode."""
+
+ sentience_use_api: bool | None = None
+ """Force API vs extension mode (auto-detected if None)."""
+
+ sentience_max_elements: int = 60
+ """Maximum elements to fetch from snapshot."""
+
+ sentience_show_overlay: bool = False
+ """Show visual overlay highlighting elements in browser."""
+
+ sentience_wait_for_extension_ms: int = 5000
+ """Maximum time to wait for extension injection (milliseconds)."""
+
+ sentience_retries: int = 2
+ """Number of retry attempts on snapshot failure."""
+
+ sentience_retry_delay_s: float = 1.0
+ """Delay between retries in seconds."""
+
+
+@dataclass
+class VisionFallbackConfig:
+ """Configuration for vision fallback behavior."""
+
+ enabled: bool = True
+ """Whether to fall back to vision mode when Sentience fails."""
+
+ detail_level: Literal['auto', 'low', 'high'] = 'auto'
+ """Vision detail level for screenshots."""
+
+ include_screenshots: bool = True
+ """Whether to include screenshots in vision fallback."""
+
+
+class SentienceAgentSettings(BaseModel):
+ """Settings for SentienceAgent."""
+
+ task: str = Field(..., description="The task for the agent to complete")
+ max_steps: int = Field(default=100, description="Maximum number of steps")
+ max_failures: int = Field(default=3, description="Maximum consecutive failures before stopping")
+ calculate_cost: bool = Field(default=True, description="Track token usage and costs")
+ llm_timeout: int = Field(default=60, description="Timeout for LLM calls in seconds")
+ step_timeout: int = Field(default=120, description="Timeout for each step in seconds")
+
+ # Sentience configuration
+ sentience_config: SentienceAgentConfig = Field(
+ default_factory=SentienceAgentConfig,
+ description="Configuration for Sentience snapshot behavior"
+ )
+
+ # Vision fallback configuration
+ vision_fallback: VisionFallbackConfig = Field(
+ default_factory=VisionFallbackConfig,
+ description="Configuration for vision fallback behavior"
+ )
+
+
+class SentienceAgent:
+ """
+ Custom agent with full control over prompt construction.
+
+ Features:
+ - Primary: Sentience snapshot as compact prompt (~3K tokens)
+ - Fallback: Vision mode when snapshot fails (~40K tokens)
+ - Token usage tracking via browser-use utilities
+ - Clear isolation from built-in vision model
+ """
+
+ def __init__(
+ self,
+ task: str,
+ llm: BaseChatModel,
+ browser_session: BrowserSession,
+ tools: Tools | None = None,
+ *,
+ # Sentience configuration
+ sentience_api_key: str | None = None,
+ sentience_use_api: bool | None = None,
+        sentience_max_elements: int = 60,
+ sentience_show_overlay: bool = False,
+ sentience_wait_for_extension_ms: int = 5000,
+ sentience_retries: int = 2,
+ sentience_retry_delay_s: float = 1.0,
+ # Vision fallback configuration
+ vision_fallback_enabled: bool = True,
+ vision_detail_level: Literal['auto', 'low', 'high'] = 'auto',
+ vision_include_screenshots: bool = True,
+ vision_llm: BaseChatModel | None = None,
+ # Token tracking
+ calculate_cost: bool = True,
+ # Agent settings
+ max_steps: int = 100,
+ max_failures: int = 3,
+ llm_timeout: int = 60,
+ step_timeout: int = 120,
+ **kwargs,
+ ):
+ """
+ Initialize SentienceAgent.
+
+ Args:
+ task: The task for the agent to complete
+ llm: Language model to use (primary model for Sentience snapshots)
+ browser_session: Browser session instance
+ tools: Tools registry (optional)
+ sentience_api_key: Sentience API key for gateway mode
+ sentience_use_api: Force API vs extension mode
+ sentience_max_elements: Maximum elements in snapshot
+ sentience_show_overlay: Show visual overlay
+ sentience_wait_for_extension_ms: Wait time for extension
+ sentience_retries: Number of snapshot retries
+ sentience_retry_delay_s: Delay between retries
+ vision_fallback_enabled: Enable vision fallback
+ vision_detail_level: Vision detail level
+ vision_include_screenshots: Include screenshots in fallback
+ vision_llm: Optional vision-capable LLM for vision fallback mode.
+ If None, uses the primary `llm` for vision fallback too.
+ calculate_cost: Track token usage
+ max_steps: Maximum steps
+ max_failures: Maximum failures
+ llm_timeout: LLM timeout
+ step_timeout: Step timeout
+ """
+ self.task = task
+ self.llm = llm
+ self.vision_llm = vision_llm # Optional vision-capable model for fallback
+ self.browser_session = browser_session
+
+ # Initialize tools if not provided
+ if tools is None:
+ from browser_use.tools.service import Tools
+ self.tools = Tools()
+ else:
+ self.tools = tools
+
+ # Initialize file system for actions that require it (e.g., done action)
+ from browser_use.filesystem.file_system import FileSystem
+ import tempfile
+ self.file_system = FileSystem(base_dir=tempfile.mkdtemp(prefix="sentience_agent_"))
+
+ # Build settings
+ sentience_config = SentienceAgentConfig(
+ sentience_api_key=sentience_api_key,
+ sentience_use_api=sentience_use_api,
+ sentience_max_elements=sentience_max_elements,
+ sentience_show_overlay=sentience_show_overlay,
+ sentience_wait_for_extension_ms=sentience_wait_for_extension_ms,
+ sentience_retries=sentience_retries,
+ sentience_retry_delay_s=sentience_retry_delay_s,
+ )
+ vision_fallback = VisionFallbackConfig(
+ enabled=vision_fallback_enabled,
+ detail_level=vision_detail_level,
+ include_screenshots=vision_include_screenshots,
+ )
+ self.settings = SentienceAgentSettings(
+ task=task,
+ max_steps=max_steps,
+ max_failures=max_failures,
+ calculate_cost=calculate_cost,
+ llm_timeout=llm_timeout,
+ step_timeout=step_timeout,
+ sentience_config=sentience_config,
+ vision_fallback=vision_fallback,
+ )
+
+ # Initialize SentienceContext (lazy import to avoid hard dependency)
+ self._sentience_context: Any | None = None
+
+ # Initialize token cost service
+ self.token_cost_service = TokenCost(include_cost=calculate_cost)
+ self.token_cost_service.register_llm(llm)
+
+ # Initialize message manager for history tracking
+ from browser_use.integrations.sentience.message_manager import CustomMessageManager
+
+ system_message = self._get_system_message()
+ self.message_manager = CustomMessageManager(
+ task=task,
+ system_message=system_message,
+ max_history_items=4, # Keep recent history for context (0 may cause issues with some LLMs)
+ )
+
+ # Track state
+ self._current_step = 0
+ self._consecutive_failures = 0
+ self._sentience_used_in_last_step = False
+ self._current_sentience_state: Any | None = None # Store current Sentience snapshot for element lookup
+
+ logger.info(
+ f"Initialized SentienceAgent: task='{task}', "
+ f"sentience_max_elements={sentience_max_elements}, "
+ f"vision_fallback={'enabled' if vision_fallback_enabled else 'disabled'}"
+ )
+
+ def _get_sentience_context(self) -> Any:
+ """Get or create SentienceContext instance."""
+ if self._sentience_context is None:
+ try:
+ from sentience.backends import SentienceContext
+
+ self._sentience_context = SentienceContext(
+ sentience_api_key=self.settings.sentience_config.sentience_api_key,
+ use_api=self.settings.sentience_config.sentience_use_api,
+ max_elements=self.settings.sentience_config.sentience_max_elements,
+ show_overlay=self.settings.sentience_config.sentience_show_overlay,
+ )
+ except ImportError as e:
+ logger.info(f"Sentience SDK not available: {e}")
+ raise ImportError(
+ "Sentience SDK is required for SentienceAgent. "
+ "Install it with: pip install sentienceapi"
+ ) from e
+ return self._sentience_context
+
+ async def _prepare_context(self) -> tuple[UserMessage, bool]:
+ """
+ Prepare context with Sentience-first, vision-fallback strategy.
+
+ Returns:
+ (user_message, sentience_used): Tuple of message and whether Sentience was used
+ """
+ # Try Sentience first
+ sentience_state = await self._try_sentience_snapshot()
+
+ if sentience_state:
+ # Store current Sentience state for element lookup during action execution
+ self._current_sentience_state = sentience_state
+ # Use Sentience prompt block
+ user_message = await self._build_sentience_message(sentience_state)
+ self._sentience_used_in_last_step = True
+ logger.info("✅ Using Sentience snapshot for prompt")
+ return user_message, True
+ else:
+ # Clear Sentience state if snapshot failed
+ self._current_sentience_state = None
+ # Fall back to vision
+ if self.settings.vision_fallback.enabled:
+ user_message = await self._build_vision_message()
+ self._sentience_used_in_last_step = False
+ logger.info("⚠️ Sentience failed, falling back to vision mode")
+ return user_message, False
+ else:
+ # No fallback: return minimal message
+ user_message = self._build_minimal_message()
+ self._sentience_used_in_last_step = False
+ logger.info("⚠️ Sentience failed and vision fallback disabled, using minimal message")
+ return user_message, False
+
+ async def _try_sentience_snapshot(self) -> Any | None:
+ """
+ Attempt to get Sentience snapshot.
+
+ Returns:
+ SentienceContextState if successful, None otherwise
+ """
+ try:
+ # CRITICAL: Check if we're on about:blank - Sentience extension doesn't inject there
+            # The extension's content scripts only inject on actual URLs (<all_urls> doesn't include about:blank)
+ current_url = await self.browser_session.get_current_page_url()
+            if not current_url or current_url.startswith('about:'):
+ logger.info(
+ f"⚠️ Current page is '{current_url}' - Sentience extension doesn't inject on about:blank. "
+ f"Extracting URL from task or navigating to default page..."
+ )
+
+ # Try to extract URL from task
+ import re
+ url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
+ urls = re.findall(url_pattern, self.task)
+
+ if urls:
+ target_url = urls[0]
+ logger.info(f"📍 Found URL in task: {target_url} - navigating...")
+ else:
+ # Default to a simple page if no URL in task
+ # The agent will navigate to the actual target page in the next step
+ target_url = "https://www.google.com"
+ logger.info(f"📍 No URL in task - navigating to default page: {target_url}")
+
+ # Navigate to a real URL so extension can inject
+ await self.browser_session.navigate_to(target_url)
+
+ # Wait a moment for navigation and extension injection
+ await asyncio.sleep(1.0)
+
+ # Verify we're no longer on about:blank
+ new_url = await self.browser_session.get_current_page_url()
+                if not new_url or new_url.startswith('about:'):
+ logger.warning(f"⚠️ Navigation may have failed, still on: {new_url}")
+ else:
+ logger.info(f"✅ Navigated to: {new_url}")
+
+ sentience_context = self._get_sentience_context()
+ logger.info(
+ f"Attempting Sentience snapshot on URL: {await self.browser_session.get_current_page_url()}, "
+ f"wait_for_extension_ms={self.settings.sentience_config.sentience_wait_for_extension_ms}, "
+ f"retries={self.settings.sentience_config.sentience_retries}, "
+ f"use_api={self.settings.sentience_config.sentience_use_api}"
+ )
+ sentience_state = await sentience_context.build(
+ self.browser_session,
+ goal=self.task,
+ wait_for_extension_ms=self.settings.sentience_config.sentience_wait_for_extension_ms,
+ retries=self.settings.sentience_config.sentience_retries,
+ retry_delay_s=self.settings.sentience_config.sentience_retry_delay_s,
+ )
+ if sentience_state:
+ num_elements = len(sentience_state.snapshot.elements) if hasattr(sentience_state, 'snapshot') else 'unknown'
+ logger.info(f"✅ Sentience snapshot successful: {num_elements} elements")
+
+ # Log overlay status (SDK handles overlay display during snapshot if show_overlay=True)
+ if self.settings.sentience_config.sentience_show_overlay:
+ logger.info(
+ f"🎨 Overlay should be visible in browser (auto-clears after 5 seconds). "
+ f"Elements highlighted: {num_elements}"
+ )
+ else:
+ logger.debug("Overlay disabled (sentience_show_overlay=False)")
+ return sentience_state
+ except Exception as e:
+ error_type = type(e).__name__
+ error_msg = str(e)
+ logger.info(
+ f"❌ Sentience snapshot failed: {error_type}: {error_msg}\n"
+ f" This usually means:\n"
+ f" - Extension not injected (check if extension is loaded in browser)\n"
+ f" - Extension injection timeout (increase wait_for_extension_ms)\n"
+ f" - Snapshot API call failed (check network/API key)\n"
+ f" - Page not ready (wait for page load to complete)"
+ )
+ import traceback
+ logger.debug(f"Sentience snapshot failure traceback:\n{traceback.format_exc()}")
+ return None
+
+ def _find_element_in_snapshot(self, snapshot: Any, element_id: int | None = None, text: str | None = None) -> Any | None:
+ """
+ Find an element in Sentience snapshot using SDK's find() function.
+
+ Args:
+ snapshot: Sentience Snapshot object
+ element_id: Element ID to find (backend_node_id)
+ text: Text to search for (uses SDK's text matching)
+
+ Returns:
+ Element if found, None otherwise
+ """
+ if not hasattr(snapshot, 'elements'):
+ return None
+
+ # If searching by ID, iterate directly (most efficient)
+ if element_id is not None:
+ for el in snapshot.elements:
+ if hasattr(el, 'id') and el.id == element_id:
+ return el
+
+ # If searching by text, use SDK's find() function
+ if text:
+ try:
+ from sentience.query import find
+ # Try exact match first
+ element = find(snapshot, f"text='{text}'")
+ if element:
+ return element
+ # Fallback to contains match (case-insensitive)
+ element = find(snapshot, f"text~'{text[:50]}'") # Limit to 50 chars for contains
+ if element:
+ return element
+ except ImportError:
+ logger.debug("SDK query module not available, using direct iteration for text search")
+ # Fallback: iterate and match text manually
+ text_lower = text.lower()
+ for el in snapshot.elements:
+ if hasattr(el, 'text') and el.text and text_lower in el.text.lower():
+ return el
+
+ return None
+
+ async def _build_sentience_message(self, sentience_state: Any) -> UserMessage:
+ """
+ Build user message using Sentience prompt block.
+
+ Args:
+ sentience_state: SentienceContextState from SDK
+
+ Returns:
+ UserMessage with Sentience prompt block
+ """
+ # Get agent history from message manager
+ history_text = self._get_agent_history_description()
+
+ # Get read_state if available
+ read_state_text = ""
+ if self.message_manager.state.read_state_description:
+            read_state_text = (
+                f"\n\n<read_state>{self.message_manager.state.read_state_description}</read_state>\n\n"
+            )
+
+ # Include task in agent_state (required for LLM to know what to do)
+ agent_state_text = f"\n{self.task}\n"
+
+ # Extract and validate Sentience element IDs against browser-use selector_map
+ available_ids = []
+ if hasattr(sentience_state, 'snapshot') and hasattr(sentience_state.snapshot, 'elements'):
+ available_ids = [el.id for el in sentience_state.snapshot.elements if hasattr(el, 'id')]
+
+ # Get browser-use selector_map to check overlap
+ selector_map = await self.browser_session.get_selector_map()
+ if not selector_map:
+ # Trigger DOM build if selector_map is empty
+ from browser_use.browser.events import BrowserStateRequestEvent
+ event = self.browser_session.event_bus.dispatch(
+ BrowserStateRequestEvent(include_screenshot=False)
+ )
+ await event
+ await event.event_result(raise_if_any=True, raise_if_none=False)
+ selector_map = await self.browser_session.get_selector_map()
+
+ # Check which Sentience IDs exist in selector_map
+ selector_map_keys = set(selector_map.keys()) if selector_map else set()
+ sentience_ids_set = set(available_ids)
+ matching_ids = sentience_ids_set & selector_map_keys
+ missing_ids = sentience_ids_set - selector_map_keys
+
+ logger.info(
+ f"📋 Sentience snapshot: {len(available_ids)} elements, "
+ f"{len(matching_ids)} match selector_map, {len(missing_ids)} missing from selector_map"
+ )
+ if missing_ids:
+ missing_list = sorted(list(missing_ids))[:10]
+ logger.info(
+ f" ⚠️ Sentience IDs not in selector_map (first 10): {missing_list}"
+ f"{'...' if len(missing_ids) > 10 else ''} "
+ f"(These elements may not be interactive by browser-use's criteria)"
+ )
+
+ # Log the FULL Sentience prompt block for debugging
+ logger.info(
+ f"📋 Sentience prompt block ({len(sentience_state.prompt_block)} chars, "
+ f"~{len(sentience_state.prompt_block) // 4} tokens):\n"
+ f"{sentience_state.prompt_block}"
+ )
+
+ # Combine agent history + agent state + Sentience prompt block + read_state
+ # Note: We explicitly avoid screenshots here for clear isolation
+ content = (
+ f"\n{history_text}\n\n\n"
+ f"\n{agent_state_text}\n\n\n"
+ f"\n{sentience_state.prompt_block}\n"
+ f"{read_state_text}"
+ )
+
+ return UserMessage(content=content, cache=True)
+
+ async def _build_vision_message(self) -> UserMessage:
+ """
+ Build user message using vision (screenshots + DOM).
+
+ This is the fallback when Sentience fails. It uses browser-use's
+ built-in browser state summary with screenshots and full DOM tree.
+
+ Returns:
+ UserMessage with screenshots and comprehensive DOM state
+ """
+ # Get browser state summary with screenshots (only in fallback mode)
+ browser_state = await self.browser_session.get_browser_state_summary(
+ include_screenshot=self.settings.vision_fallback.include_screenshots
+ )
+
+ # Build comprehensive DOM state description (Phase 2: full DOM extraction)
+ dom_state = self._build_dom_state(browser_state)
+
+ # Get agent history from message manager
+ history_text = self._get_agent_history_description()
+
+ # Include task in agent_state (required for LLM to know what to do)
+ agent_state_text = f"\n{self.task}\n"
+
+ # Get read_state if available
+ read_state_text = ""
+ if self.message_manager.state.read_state_description:
+            read_state_text = (
+                f"\n\n<read_state>{self.message_manager.state.read_state_description}</read_state>\n\n"
+            )
+
+ # Combine into message
+ content = (
+ f"\n{history_text}\n\n\n"
+ f"\n{agent_state_text}\n\n\n"
+ f"\n{dom_state}\n"
+ f"{read_state_text}"
+ )
+
+ # If screenshots are enabled, add them to the message
+ if (
+ self.settings.vision_fallback.include_screenshots
+ and browser_state.screenshot
+ ):
+ from browser_use.llm.messages import (
+ ContentPartImageParam,
+ ContentPartTextParam,
+ ImageURL,
+ )
+
+ # Resize screenshot if needed (similar to AgentMessagePrompt)
+ screenshot = self._resize_screenshot_if_needed(browser_state.screenshot)
+
+ content_parts = [
+ ContentPartTextParam(text=content),
+ ContentPartTextParam(text="Current screenshot:"),
+ ContentPartImageParam(
+ image_url=ImageURL(
+ url=f"data:image/png;base64,{screenshot}",
+ media_type="image/png",
+ detail=self.settings.vision_fallback.detail_level,
+ )
+ ),
+ ]
+ return UserMessage(content=content_parts, cache=True)
+
+ return UserMessage(content=content, cache=True)
+
+ def _resize_screenshot_if_needed(self, screenshot_b64: str) -> str:
+ """
+ Resize screenshot if it's too large for the LLM.
+
+ Args:
+ screenshot_b64: Base64-encoded screenshot
+
+ Returns:
+ Resized screenshot as base64 string (or original if no resize needed)
+ """
+ # For Phase 2, we'll use a simple approach - return as-is
+ # In future phases, we can add actual resizing logic similar to AgentMessagePrompt
+ # For now, LLMs can handle reasonable screenshot sizes
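+        # A possible future implementation (sketch, not enabled), using Pillow:
+        #   import base64, io
+        #   from PIL import Image
+        #   img = Image.open(io.BytesIO(base64.b64decode(screenshot_b64)))
+        #   img.thumbnail((1280, 1280))  # downscale in place, keeping aspect ratio
+        #   buf = io.BytesIO()
+        #   img.save(buf, format="PNG")
+        #   return base64.b64encode(buf.getvalue()).decode()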
+ return screenshot_b64
+
+ def _build_minimal_message(self) -> UserMessage:
+ """
+ Build minimal message when both Sentience and vision fallback are disabled.
+
+ Returns:
+ UserMessage with minimal state
+ """
+ history_text = self._get_agent_history_description()
+
+ # Include task in agent_state (required for LLM to know what to do)
+ agent_state_text = f"\n{self.task}\n"
+
+ read_state_text = ""
+ if self.message_manager.state.read_state_description:
+            read_state_text = (
+                f"\n\n<read_state>{self.message_manager.state.read_state_description}</read_state>\n\n"
+            )
+ content = (
+ f"\n{history_text}\n\n\n"
+ f"\n{agent_state_text}\n"
+ f"{read_state_text}"
+ )
+ return UserMessage(content=content, cache=True)
+
+ def _get_agent_history_description(self) -> str:
+ """
+ Get agent history description from message manager.
+
+ Returns:
+ History description string
+ """
+ return self.message_manager.agent_history_description
+
+ def _build_dom_state(self, browser_state: Any) -> str:
+ """
+ Build comprehensive DOM state description from browser state.
+
+ This is used in vision fallback mode to provide full DOM context
+ when Sentience snapshot is not available.
+
+ Args:
+ browser_state: BrowserStateSummary
+
+ Returns:
+ Complete DOM state description string with page info, stats, and DOM tree
+ """
+        from browser_use.dom.views import DEFAULT_INCLUDE_ATTRIBUTES
+
+ # Extract page information
+ url = getattr(browser_state, "url", None) or "unknown"
+ title = getattr(browser_state, "title", None) or "unknown"
+ page_info = getattr(browser_state, "page_info", None)
+ dom_state = getattr(browser_state, "dom_state", None)
+
+ # Build page statistics (similar to AgentMessagePrompt._extract_page_statistics)
+ page_stats = self._extract_page_statistics(browser_state)
+
+ # Format statistics for LLM
+ stats_text = ""
+ if page_stats["total_elements"] < 10:
+ stats_text += "Page appears empty (SPA not loaded?) - "
+ stats_text += (
+ f'{page_stats["links"]} links, {page_stats["interactive_elements"]} interactive, '
+ f'{page_stats["iframes"]} iframes, {page_stats["scroll_containers"]} scroll containers'
+ )
+ if page_stats["shadow_open"] > 0 or page_stats["shadow_closed"] > 0:
+ stats_text += (
+ f', {page_stats["shadow_open"]} shadow(open), '
+ f'{page_stats["shadow_closed"]} shadow(closed)'
+ )
+ if page_stats["images"] > 0:
+ stats_text += f', {page_stats["images"]} images'
+ stats_text += f', {page_stats["total_elements"]} total elements'
+ stats_text += "\n"
+
+ # Get DOM tree representation
+ elements_text = ""
+ if dom_state:
+ # Use the same method as AgentMessagePrompt to get LLM representation
+ try:
+ elements_text = dom_state.llm_representation(
+ include_attributes=DEFAULT_INCLUDE_ATTRIBUTES
+ )
+ except Exception as e:
+ logger.info(f"Error getting DOM representation: {e}")
+ elements_text = "Error extracting DOM tree"
+
+ # Truncate DOM if too long (default max for vision fallback: 40000 chars)
+ max_dom_length = 40000
+ if len(elements_text) > max_dom_length:
+ elements_text = elements_text[:max_dom_length]
+ truncated_text = f" (truncated to {max_dom_length} characters)"
+ else:
+ truncated_text = ""
+
+ # Build page info text
+ page_info_text = ""
+ has_content_above = False
+ has_content_below = False
+
+ if page_info:
+ pi = page_info
+ pages_above = pi.pixels_above / pi.viewport_height if pi.viewport_height > 0 else 0
+ pages_below = pi.pixels_below / pi.viewport_height if pi.viewport_height > 0 else 0
+ has_content_above = pages_above > 0
+ has_content_below = pages_below > 0
+ total_pages = pi.page_height / pi.viewport_height if pi.viewport_height > 0 else 0
+
+ page_info_text = ""
+ page_info_text += f"{pages_above:.1f} pages above, "
+ page_info_text += f"{pages_below:.1f} pages below, "
+ page_info_text += f"{total_pages:.1f} total pages"
+ page_info_text += "\n"
+
+ # Format elements text with page position indicators
+ if elements_text:
+ if has_content_above:
+ if page_info:
+ pages_above = (
+ page_info.pixels_above / page_info.viewport_height
+ if page_info.viewport_height > 0
+ else 0
+ )
+ elements_text = f"... {pages_above:.1f} pages above ...\n{elements_text}"
+ else:
+ elements_text = f"[Start of page]\n{elements_text}"
+ if not has_content_below:
+ elements_text = f"{elements_text}\n[End of page]"
+ else:
+ elements_text = "empty page"
+
+ # Build tabs information
+ tabs_text = ""
+ tabs = getattr(browser_state, "tabs", [])
+ if tabs:
+ tabs_text = "\n"
+ for tab in tabs:
+ tab_id = getattr(tab, "target_id", "unknown")
+ tab_url = getattr(tab, "url", "unknown")
+ tab_title = getattr(tab, "title", "unknown")
+ # Use last 4 chars of target_id for display
+ tab_id_short = tab_id[-4:] if isinstance(tab_id, str) and len(tab_id) >= 4 else str(tab_id)
+ tabs_text += f"Tab {tab_id_short}: {tab_url} - {tab_title[:30]}\n"
+ tabs_text += "\n"
+
+ # Combine all parts
+ dom_state_text = (
+ f"URL: {url}\n"
+ f"Title: {title}\n"
+ f"{stats_text}"
+ f"{page_info_text}"
+ f"{tabs_text}"
+ f"\n{elements_text}{truncated_text}\n"
+ )
+
+ return dom_state_text
+
+ def _extract_page_statistics(self, browser_state: Any) -> dict[str, int]:
+ """
+ Extract high-level page statistics from DOM tree.
+
+ Args:
+ browser_state: BrowserStateSummary
+
+ Returns:
+ Dictionary with page statistics
+ """
+ from browser_use.dom.views import NodeType, SimplifiedNode
+
+ stats = {
+ "links": 0,
+ "iframes": 0,
+ "shadow_open": 0,
+ "shadow_closed": 0,
+ "scroll_containers": 0,
+ "images": 0,
+ "interactive_elements": 0,
+ "total_elements": 0,
+ }
+
+ dom_state = getattr(browser_state, "dom_state", None)
+ if not dom_state or not hasattr(dom_state, "_root") or not dom_state._root:
+ return stats
+
+ def traverse_node(node: SimplifiedNode) -> None:
+ """Recursively traverse simplified DOM tree to count elements"""
+ if not node or not hasattr(node, "original_node") or not node.original_node:
+ return
+
+ original = node.original_node
+ stats["total_elements"] += 1
+
+ # Count by node type and tag
+ if original.node_type == NodeType.ELEMENT_NODE:
+ tag = original.tag_name.lower() if hasattr(original, "tag_name") and original.tag_name else ""
+
+ if tag == "a":
+ stats["links"] += 1
+ elif tag in ("iframe", "frame"):
+ stats["iframes"] += 1
+ elif tag == "img":
+ stats["images"] += 1
+
+ # Check if scrollable
+ if hasattr(original, "is_actually_scrollable") and original.is_actually_scrollable:
+ stats["scroll_containers"] += 1
+
+ # Check if interactive
+ if hasattr(node, "is_interactive") and node.is_interactive:
+ stats["interactive_elements"] += 1
+
+ # Check if this element hosts shadow DOM
+ if hasattr(node, "is_shadow_host") and node.is_shadow_host:
+ # Check if any shadow children are closed
+ has_closed_shadow = False
+ if hasattr(node, "children"):
+ for child in node.children:
+ if (
+ hasattr(child, "original_node")
+ and child.original_node
+ and child.original_node.node_type == NodeType.DOCUMENT_FRAGMENT_NODE
+ and hasattr(child.original_node, "shadow_root_type")
+ and child.original_node.shadow_root_type
+ and child.original_node.shadow_root_type.lower() == "closed"
+ ):
+ has_closed_shadow = True
+ break
+ if has_closed_shadow:
+ stats["shadow_closed"] += 1
+ else:
+ stats["shadow_open"] += 1
+
+ # Traverse children
+ if hasattr(node, "children"):
+ for child in node.children:
+ traverse_node(child)
+
+ traverse_node(dom_state._root)
+ return stats
+
+ async def run(self) -> Any:
+ """
+ Run the agent loop with full action execution and history tracking.
+
+ Returns:
+ Dictionary with execution results (will return AgentHistoryList in future phases)
+ """
+ from browser_use.agent.views import AgentOutput, AgentStepInfo, ActionResult
+
+ logger.info(f"Starting SentienceAgent: task='{self.task}'")
+
+ # Initialize browser session if needed (start() is idempotent)
+ await self.browser_session.start()
+
+        # Build the AgentOutput type from the registered actions so the LLM's
+        # structured output schema matches the available tools
+        action_model = self.tools.registry.create_action_model()
+        AgentOutputType = AgentOutput.type_with_custom_actions(action_model)
+
+ # Track execution history
+ execution_history: list[dict[str, Any]] = []
+ sentience_used_in_any_step = False # Track if Sentience was used in ANY step
+
+ # Main agent loop
+ for step in range(self.settings.max_steps):
+ self._current_step = step
+ step_info = AgentStepInfo(step_number=step, max_steps=self.settings.max_steps)
+ logger.info(f"📍 Step {step + 1}/{self.settings.max_steps}")
+
+ # Prepare context
+ try:
+ user_message, sentience_used = await self._prepare_context()
+ # Log token usage breakdown
+ message_content = str(user_message.content)
+ history_text = self.message_manager.agent_history_description
+ logger.info(
+ f"Context prepared: sentience_used={sentience_used}, "
+ f"message_length={len(message_content)} chars (~{len(message_content) // 4} tokens), "
+ f"history_length={len(history_text)} chars (~{len(history_text) // 4} tokens)"
+ )
+
+ # Get messages from message manager
+ messages = self.message_manager.get_messages(user_message=user_message)
+
+ # Select LLM: use vision_llm for vision fallback, primary llm for Sentience
+ active_llm = self.vision_llm if (not sentience_used and self.vision_llm is not None) else self.llm
+ if not sentience_used and self.vision_llm is not None:
+ logger.info("👁️ Using vision LLM for vision fallback mode")
+ elif sentience_used:
+ logger.info("📊 Using primary LLM for Sentience snapshot mode")
+
+ # Call LLM with structured output
+ # NOTE: For Hugging Face models, this is where model loading/downloading happens
+ logger.info("🤖 Calling LLM (this may trigger model download/loading for Hugging Face models)...")
+ kwargs: dict = {"output_format": AgentOutputType, "session_id": self.browser_session.id}
+ response = await asyncio.wait_for(
+ active_llm.ainvoke(messages, **kwargs),
+ timeout=self.settings.llm_timeout,
+ )
+ logger.info("✅ LLM response received")
+
+ # Parse AgentOutput from response
+ # Handle case where LLM returns string instead of structured output
+ if isinstance(response.completion, str):
+ logger.warning(
+ f"⚠️ LLM returned raw text instead of structured output. "
+ f"This may happen with smaller local models. Response: {response.completion[:200]}..."
+ )
+ # Try to parse as JSON manually with improved repair logic
+ try:
+ import json
+ import re
+
+ # Try to extract JSON from response (might be wrapped in markdown or have extra text)
+ json_text = response.completion.strip()
+
+ # Log the full response for debugging (truncated JSON issues)
+ logger.debug(f"Full LLM response ({len(json_text)} chars): {json_text[:1000]}...")
+
+ # Remove markdown code blocks if present
+ if json_text.startswith('```json'):
+ json_text = re.sub(r'^```json\s*', '', json_text, flags=re.MULTILINE)
+ json_text = re.sub(r'```\s*$', '', json_text, flags=re.MULTILINE)
+ elif json_text.startswith('```'):
+ json_text = re.sub(r'^```\s*', '', json_text, flags=re.MULTILINE)
+ json_text = re.sub(r'```\s*$', '', json_text, flags=re.MULTILINE)
+
+                            # Grab everything from the first '{' onward (missing closers are repaired below)
+ json_match = re.search(r'\{.*', json_text, re.DOTALL)
+ if json_match:
+ json_text = json_match.group(0)
+
+ # Try to fix incomplete JSON (common with truncated responses)
+ # Count braces and brackets to see what's missing
+ open_braces = json_text.count('{')
+ close_braces = json_text.count('}')
+ open_brackets = json_text.count('[')
+ close_brackets = json_text.count(']')
+
+ # Find the last complete structure and close everything after it
+ # Strategy: Find the last complete key-value pair or array element, then close everything
+ if open_braces > close_braces or open_brackets > close_brackets:
+ logger.debug(
+ f"JSON appears incomplete: braces {open_braces}/{close_braces}, "
+ f"brackets {open_brackets}/{close_brackets}. Attempting repair..."
+ )
+
+ # Try to find where the JSON was cut off
+ # Look for incomplete strings, incomplete objects, etc.
+
+ # Close missing brackets first (they're usually nested inside objects)
+ if open_brackets > close_brackets:
+ missing_brackets = open_brackets - close_brackets
+ json_text += ']' * missing_brackets
+
+ # Close missing braces
+ if open_braces > close_braces:
+ missing_braces = open_braces - close_braces
+ json_text += '\n' + '}' * missing_braces
+
+ # Try to fix incomplete strings (if JSON was cut off mid-string)
+ # Count unescaped quotes
+                            unescaped_quotes = len(re.findall(r'(?<!\\)"', json_text))
+                            last_quote_pos = json_text.rfind('"')
+                            if unescaped_quotes % 2 != 0 and last_quote_pos > 0 and json_text[last_quote_pos - 1] != '\\':
+ # Check if we're in a string context
+ before_quote = json_text[:last_quote_pos]
+ # If the last quote is opening a string (not closing), add closing quote
+ if before_quote.count('"') % 2 == 0:
+ json_text = json_text[:last_quote_pos + 1] + '"' + json_text[last_quote_pos + 1:]
+
+ logger.debug(f"Repaired JSON ({len(json_text)} chars): {json_text[:500]}...")
+ parsed = json.loads(json_text)
+ model_output = AgentOutputType.model_validate(parsed)
+                            except Exception as e:
+ logger.error(f"Failed to parse LLM response as JSON: {e}")
+ # Log the problematic JSON for debugging
+ logger.error(f"Problematic JSON (first 800 chars): {json_text[:800]}")
+ logger.error(f"Full raw response length: {len(response.completion)} chars")
+
+ # Try one more aggressive repair: if JSON is clearly truncated, try to salvage what we can
+ try:
+ # Find the last complete field and create minimal valid JSON
+ # Look for the last complete key-value pair
+ last_comma = json_text.rfind(',')
+ last_colon = json_text.rfind(':')
+
+ if last_comma > 0 and last_colon > last_comma:
+ # We have at least one complete field
+ # Try to extract up to the last complete field and close it
+ # Find the last complete field by looking for pattern: "key": value,
+ field_pattern = r'"\w+":\s*[^,}]+,'
+ matches = list(re.finditer(field_pattern, json_text))
+ if matches:
+ last_match = matches[-1]
+ # Extract up to and including the last complete field
+ salvage_text = json_text[:last_match.end()]
+ # Close any open structures
+ salvage_text = salvage_text.rstrip(', \n')
+ if salvage_text.count('{') > salvage_text.count('}'):
+ salvage_text += '\n' + '}' * (salvage_text.count('{') - salvage_text.count('}'))
+ if salvage_text.count('[') > salvage_text.count(']'):
+ salvage_text += ']' * (salvage_text.count('[') - salvage_text.count(']'))
+
+ logger.debug(f"Attempting salvage repair on: {salvage_text[:300]}...")
+ parsed = json.loads(salvage_text)
+ model_output = AgentOutputType.model_validate(parsed)
+ logger.info("✅ Successfully salvaged incomplete JSON")
+ else:
+ raise # Re-raise original error
+ else:
+ raise # Re-raise original error
+ except Exception:
+ # Salvage failed, use error fallback
+ logger.debug(f"Raw response (first 500 chars): {response.completion[:500]}")
+ # Create a minimal AgentOutput with error (using required fields only)
+ model_output = AgentOutputType(
+ evaluation_previous_goal="Failed to parse LLM output",
+ memory=f"LLM returned invalid JSON: {str(e)[:100]}",
+ next_goal="Retry with simpler request",
+ action=[], # Empty action list to indicate failure
+ )
+ # Add error to history
+ self.message_manager.update_history(
+ model_output=None,
+ result=[ActionResult(error=f"LLM failed to generate valid structured output: {str(e)[:200]}")],
+ step_info=step_info,
+ )
+ self._consecutive_failures += 1
+ continue
+ else:
+ model_output: AgentOutput = response.completion # type: ignore[assignment]
+
+ logger.info(
+ f"LLM response received: {len(model_output.action) if model_output.action else 0} actions"
+ )
+
+ # Execute actions
+ action_results: list[ActionResult] = []
+ if model_output.action:
+ action_results = await self._execute_actions(model_output.action)
+
+ # Update history with model output and action results
+ self.message_manager.update_history(
+ model_output=model_output,
+ result=action_results,
+ step_info=step_info,
+ )
+
+ # Track Sentience usage across all steps
+ if sentience_used:
+ sentience_used_in_any_step = True
+
+ # Track in execution history
+ execution_history.append(
+ {
+ "step": step + 1,
+ "model_output": model_output,
+ "action_results": action_results,
+ "sentience_used": sentience_used,
+ }
+ )
+
+ # Check if done
+                is_done = any(result.is_done for result in action_results)
+ if is_done:
+ logger.info("✅ Task completed")
+ break
+
+ # Check for errors
+                has_errors = any(result.error for result in action_results)
+ if has_errors:
+ self._consecutive_failures += 1
+ if self._consecutive_failures >= self.settings.max_failures:
+ logger.info("Max failures reached, stopping")
+ break
+ else:
+ self._consecutive_failures = 0 # Reset on success
+
+ except asyncio.TimeoutError:
+ logger.info(f"Step {step + 1} timed out after {self.settings.llm_timeout}s")
+ self._consecutive_failures += 1
+ # Update history with error
+ self.message_manager.update_history(
+ model_output=None,
+ result=None,
+ step_info=step_info,
+ )
+ if self._consecutive_failures >= self.settings.max_failures:
+ logger.info("Max failures reached, stopping")
+ break
+ except Exception as e:
+ logger.info(f"Step {step + 1} failed: {e}", exc_info=True)
+ self._consecutive_failures += 1
+ # Update history with error
+ self.message_manager.update_history(
+ model_output=None,
+ result=None,
+ step_info=step_info,
+ )
+ if self._consecutive_failures >= self.settings.max_failures:
+ logger.info("Max failures reached, stopping")
+ break
+
+ # Return usage summary and execution history
+ usage_summary = await self.token_cost_service.get_usage_summary()
+ logger.info(f"Agent completed: {usage_summary}")
+
+ # Count how many steps used Sentience
+ steps_using_sentience = sum(1 for entry in execution_history if entry.get("sentience_used", False))
+ total_steps = len(execution_history)
+
+ # Return execution summary (will return AgentHistoryList in future phases)
+ return {
+ "steps": self._current_step + 1,
+ "sentience_used": sentience_used_in_any_step,
+ "sentience_usage_stats": {
+ "steps_using_sentience": steps_using_sentience,
+ "total_steps": total_steps,
+ "sentience_percentage": (steps_using_sentience / total_steps * 100) if total_steps > 0 else 0,
+ },
+ "usage": usage_summary,
+ "execution_history": execution_history,
+ }
+
+ async def _execute_actions(self, actions: list[Any]) -> list[Any]:
+ """
+ Execute a list of actions.
+
+ Args:
+ actions: List of ActionModel instances
+
+ Returns:
+ List of ActionResult instances
+ """
+ from browser_use.agent.views import ActionResult
+ from browser_use.browser.events import BrowserStateRequestEvent
+
+ results: list[ActionResult] = []
+ total_actions = len(actions)
+
+ # Ensure selector_map is built before executing actions
+ # This is needed because Sentience uses backend_node_ids that must exist in selector_map
+ selector_map = await self.browser_session.get_selector_map()
+ if not selector_map:
+ logger.info(" 🔄 Selector map is empty, triggering DOM build...")
+ # Trigger browser state request to build DOM and selector_map
+ event = self.browser_session.event_bus.dispatch(
+ BrowserStateRequestEvent(include_screenshot=False)
+ )
+ await event
+ await event.event_result(raise_if_any=True, raise_if_none=False)
+ selector_map = await self.browser_session.get_selector_map()
+ logger.info(f" ✅ Selector map built: {len(selector_map)} elements available")
+
+ for i, action in enumerate(actions):
+ # Wait between actions (except first)
+ if i > 0:
+ wait_time = getattr(
+ self.browser_session.browser_profile, "wait_between_actions", 0.5
+ )
+ await asyncio.sleep(wait_time)
+
+ try:
+ # Get action name for logging
+ action_data = action.model_dump(exclude_unset=True)
+ action_name = next(iter(action_data.keys())) if action_data else "unknown"
+ action_params = action_data.get(action_name, {})
+
+ # Check if action uses an index and validate it exists in selector_map
+ action_index = action_params.get('index')
+ if action_index is not None and action_name in ('click', 'input', 'input_text'):
+ selector_map = await self.browser_session.get_selector_map()
+ if action_index not in selector_map:
+ # Try to find element in Sentience snapshot using SDK's find() function
+ sentience_element = None
+ if self._current_sentience_state and hasattr(self._current_sentience_state, 'snapshot'):
+ snapshot = self._current_sentience_state.snapshot
+
+ # First, try to find by ID
+ sentience_element = self._find_element_in_snapshot(snapshot, element_id=action_index)
+
+ # If not found by ID and this is an input action, try to find by text
+ if not sentience_element and action_name == 'input' and 'text' in action_params:
+ text_to_find = action_params.get('text', '')
+ if text_to_find:
+ sentience_element = self._find_element_in_snapshot(snapshot, text=text_to_find)
+ if sentience_element:
+ logger.info(
+ f" 🔍 Element {action_index} not found by ID, but found by text '{text_to_find[:30]}...' "
+ f"in Sentience snapshot. Using element ID {sentience_element.id}."
+ )
+ # Update action_index to use the found element's ID
+ action_index = sentience_element.id
+ action_params['index'] = action_index
+
+ if sentience_element:
+ logger.info(
+ f" 🔍 Element {action_index} not in selector_map, but found in Sentience snapshot. "
+ f"Validating backend_node_id exists in CDP before adding to selector_map."
+ )
+
+ # Get current target_id for the element - use agent_focus_target_id which is the active tab
+ target_id = self.browser_session.agent_focus_target_id
+ if not target_id:
+ # Fallback: get first available target
+ targets = await self.browser_session.session_manager.get_all_targets()
+ if targets:
+ target_id = list(targets.keys())[0]
+
+ # Validate that the backend_node_id actually exists in CDP before adding to selector_map
+ # This prevents "No node with given id found" errors
+ backend_node_id = action_index
+ node_exists = False
+ try:
+ cdp_session = await self.browser_session.get_or_create_cdp_session(
+ target_id=target_id, focus=False
+ )
+ # Try to resolve the node to verify it exists
+ result = await cdp_session.cdp_client.send.DOM.resolveNode(
+ params={'backendNodeId': backend_node_id},
+ session_id=cdp_session.session_id,
+ )
+ if result.get('object') and result['object'].get('objectId'):
+ node_exists = True
+ logger.info(f" ✅ Validated backend_node_id {backend_node_id} exists in CDP")
+ except Exception as e:
+ logger.warning(
+ f" ⚠️ backend_node_id {backend_node_id} not found in CDP (node may be stale): {e}. "
+ f"Skipping adding to selector_map to avoid fallback typing."
+ )
+
+ if not node_exists:
+ # Node doesn't exist - don't add to selector_map, let the action fail naturally
+ logger.info(
+ f" ⚠️ Cannot add element {action_index} to selector_map - backend_node_id is stale. "
+ f"Action will fail and agent should retry with a fresh snapshot."
+ )
+ else:
+ # Node exists - create minimal EnhancedDOMTreeNode and add to selector_map
+ from browser_use.dom.views import EnhancedDOMTreeNode, NodeType
+
+ # Extract role and other info from Sentience element
+ role = getattr(sentience_element, 'role', 'div') or 'div'
+
+ # For input actions, prefer textbox/searchbox over combobox if the element text suggests it's a search box
+ if action_name == 'input' and role.lower() == 'combobox':
+ element_text = getattr(sentience_element, 'text', '') or ''
+ if any(keyword in element_text.lower() for keyword in ['search', 'query', 'find']):
+ logger.info(f" 🔄 Overriding role from 'combobox' to 'searchbox' based on element text")
+ role = 'searchbox'
+
+ # Map common roles to HTML tag names
+ role_to_tag = {
+ 'textbox': 'input',
+ 'searchbox': 'input',
+ 'button': 'button',
+ 'link': 'a',
+ 'combobox': 'select',
+ }
+ tag_name = role_to_tag.get(role.lower(), 'div')
+
+ # Create minimal EnhancedDOMTreeNode with proper target_id
+ # Don't set session_id - let cdp_client_for_node use target_id strategy (more reliable)
+ minimal_node = EnhancedDOMTreeNode(
+ node_id=0, # Will be resolved when needed via CDP using backend_node_id
+ backend_node_id=backend_node_id, # This is the key - matches Sentience element.id
+ node_type=NodeType.ELEMENT_NODE,
+ node_name=tag_name,
+ node_value='',
+ attributes={'role': role, 'type': 'text'} if role in ('textbox', 'searchbox') else {'role': role} if role else {},
+ is_visible=True, # Sentience elements are visible
+ target_id=target_id or '', # type: ignore
+ session_id=None, # Let cdp_client_for_node use target_id strategy instead
+ frame_id=None,
+ content_document=None,
+ shadow_root_type=None,
+ shadow_roots=None,
+ parent_node=None,
+ children_nodes=None,
+ ax_node=None,
+ snapshot_node=None,
+ is_scrollable=None,
+ absolute_position=None,
+ )
+
+ # Add to selector_map temporarily
+ selector_map[backend_node_id] = minimal_node
+ # Also update cached selector_map
+ self.browser_session.update_cached_selector_map(selector_map)
+ logger.info(f" ✅ Added element {backend_node_id} (role={role}, tag={tag_name}) to selector_map temporarily")
+ else:
+ available_indices = sorted(list(selector_map.keys()))[:20]
+ logger.info(
+ f" ⚠️ Action {action_name} uses index {action_index}, but it's not in selector_map or Sentience snapshot. "
+ f"Available indices: {available_indices}{'...' if len(selector_map) > 20 else ''} "
+ f"(total: {len(selector_map)})"
+ )
+
+ logger.info(f" ▶️ {action_name}: {action_params}")
+
+ # Warn about multiple scroll actions (potential jittery behavior)
+ if action_name == "scroll" and i > 0:
+ prev_action_data = actions[i - 1].model_dump(exclude_unset=True)
+ prev_action_name = next(iter(prev_action_data.keys())) if prev_action_data else "unknown"
+ if prev_action_name == "scroll":
+ logger.info(f" ⚠️ Multiple scroll actions detected - may cause jittery behavior")
+
+ # Execute action
+ result = await self.tools.act(
+ action=action,
+ browser_session=self.browser_session,
+ file_system=self.file_system,
+ page_extraction_llm=self.llm, # Use the same LLM for extraction
+ sensitive_data=None, # TODO: Add sensitive data support
+ available_file_paths=None, # TODO: Add file paths support
+ )
+
+ results.append(result)
+
+ # Log result
+ if result.error:
+ logger.info(f" ❌ Action failed: {result.error}")
+ elif result.is_done:
+ logger.info(f" ✅ Task done: {result.long_term_memory or result.extracted_content}")
+
+ # Stop if done or error (for now, continue on error)
+ if result.is_done:
+ break
+
+ except Exception as e:
+ logger.info(f" ❌ Action execution error: {e}", exc_info=True)
+ # Create error result
+ error_result = ActionResult(
+ error=f"Action execution failed: {str(e)}",
+ is_done=False,
+ )
+ results.append(error_result)
+
+ return results
+
+ def _get_system_message(self) -> SystemMessage:
+ """
+ Get system message for the agent.
+
+ Uses the standard browser-use system prompt to ensure consistency.
+
+ Returns:
+ SystemMessage
+ """
+ from browser_use.agent.prompts import SystemPrompt
+
+ # Use standard system prompt with Sentience-specific extensions
+ system_prompt = SystemPrompt(
+ max_actions_per_step=3, # Default
+ use_thinking=True,
+ flash_mode=False,
+ is_anthropic=False, # Will be auto-detected if needed
+ is_browser_use_model=False, # Will be auto-detected if needed
+ extend_system_message=(
+ "\n\n"
+ "CRITICAL: When browser_state contains elements in Sentience format, "
+ "the first column is labeled 'ID' but browser-use actions use a parameter called 'index'.\n"
+ "You MUST use the ID value (first column) as the 'index' parameter value for ALL interactions.\n"
+ "\n"
+ "Format: ID|role|text|imp|is_primary|docYq|ord|DG|href\n"
+ "- The first column is the ID (e.g., in '21|link|Some text|...', the ID is 21)\n"
+ "- This ID is a backend_node_id from Chrome DevTools Protocol\n"
+ "- Browser-use actions use a parameter called 'index' (not 'id')\n"
+ "- Use the ID value as the index parameter value: ID → index parameter\n"
+ "\n"
+ "Usage Rules:\n"
+ "- For '21|link|Some text|...', use: click with index: 21 (the ID value becomes the index value)\n"
+ "- For '48|textbox|Search...', use: input with index: 48, text: \"your text\"\n"
+ "- The Sentience ID value IS the browser-use index value - use it directly\n"
+ "\n"
+ "Examples:\n"
+ "- Sentience format: '21|link|Click here|100|1|0|1|1|https://...'\n"
+ " → Action: click with index: 21 (use the ID value 21 as the index parameter)\n"
+ "- Sentience format: '48|textbox|Search...|95|0|0|-|0|'\n"
+ " → Action: input with index: 48, text: \"your text\"\n"
+ "\n"
+ "Terminology Note:\n"
+ "- Sentience format column name: 'ID' (first column)\n"
+ "- Browser-use action parameter name: 'index'\n"
+ "- The ID value from Sentience becomes the index value for browser-use actions\n"
+ "\n"
+ "IMPORTANT WARNINGS:\n"
+ "- ONLY use ID values that appear in the Sentience format list\n"
+ "- Some Sentience IDs may not be available if the element is not interactive by browser-use's criteria\n"
+ "- If an action fails with 'Element index X not available', that ID doesn't exist in the selector_map\n"
+ "- In that case, try a different element ID from the Sentience format list\n"
+ "- NEVER use arbitrary index numbers when Sentience format is present\n"
+ "- NEVER ignore the ID from the Sentience format - it is the ONLY valid index to use\n"
+ "\n"
+ ),
+ ).get_system_message()
+
+ return system_prompt
+
+ def _is_done(self, model_output: Any) -> bool:
+ """
+ Check if task is done based on model output.
+
+ Simplified for Phase 1.
+
+ Args:
+ model_output: Model output
+
+ Returns:
+ True if done, False otherwise
+ """
+ # TODO: Parse model output and check for 'done' action
+ return False
diff --git a/browser_use/integrations/sentience/message_manager.py b/browser_use/integrations/sentience/message_manager.py
new file mode 100644
index 0000000000..d449b7806e
--- /dev/null
+++ b/browser_use/integrations/sentience/message_manager.py
@@ -0,0 +1,283 @@
+"""
+CustomMessageManager: Simplified message manager for SentienceAgent.
+
+Manages conversation history, agent history items, and message construction
+without the complexity of the full MessageManager.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from browser_use.agent.message_manager.views import HistoryItem, MessageManagerState
+from browser_use.llm.messages import BaseMessage, SystemMessage
+
+if TYPE_CHECKING:
+ from browser_use.agent.views import AgentOutput, AgentStepInfo, ActionResult
+
+logger = logging.getLogger(__name__)
+
+
+class CustomMessageManager:
+ """
+ Simplified message manager for SentienceAgent.
+
+ Manages conversation history and agent history items without the
+ full complexity of the standard MessageManager.
+ """
+
+ def __init__(
+ self,
+ task: str,
+ system_message: SystemMessage,
+ max_history_items: int | None = None,
+ ):
+ """
+ Initialize CustomMessageManager.
+
+ Args:
+ task: The task for the agent
+ system_message: System message for the LLM
+ max_history_items: Maximum number of history items to keep (None = all)
+ """
+ self.task = task
+ self.system_message = system_message
+ self.max_history_items = max_history_items
+
+ # Initialize state
+ self.state = MessageManagerState()
+ # Initialize with task (will be shown in agent_state, but include here for clarity)
+ self.state.agent_history_items = [
+ HistoryItem(step_number=0, system_message=f"Agent initialized. Task: {task}")
+ ]
+
+ # Store last messages for debugging
+ self.last_input_messages: list[BaseMessage] = []
+
+ logger.info(
+ f"Initialized CustomMessageManager: task='{task}', "
+ f"max_history_items={max_history_items}"
+ )
+
+ @property
+ def agent_history_description(self) -> str:
+ """
+ Build agent history description from list of items.
+
+ Respects max_history_items limit if set.
+
+ Returns:
+ Formatted history description string
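+
+        Example (illustrative): with max_history_items=3 and six tracked
+        items, the result keeps item 0, then the line
+        "[... 3 previous steps omitted...]", then the two most recent items.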
+ """
+ if self.max_history_items is None:
+ # Include all items
+ return "\n".join(item.to_string() for item in self.state.agent_history_items)
+
+ # If max_history_items is 0, return empty string (no history)
+ # Note: Some LLMs may need at least minimal context, so 0 might not work well
+ # Consider using 1-2 instead of 0 for minimal history
+ if self.max_history_items == 0:
+ return ""
+
+ total_items = len(self.state.agent_history_items)
+
+ # If we have fewer items than the limit, just return all items
+ if total_items <= self.max_history_items:
+ return "\n".join(item.to_string() for item in self.state.agent_history_items)
+
+ # We have more items than the limit, so we need to omit some
+ omitted_count = total_items - self.max_history_items
+
+ # Show first item + omitted message + most recent (max_history_items - 1) items
+ recent_items_count = self.max_history_items - 1 # -1 for first item
+
+ items_to_include = [
+ self.state.agent_history_items[0].to_string(), # Keep first item (initialization)
+ f"[... {omitted_count} previous steps omitted...]",
+ ]
+ # Add most recent items
+ items_to_include.extend(
+ [
+ item.to_string()
+ for item in self.state.agent_history_items[-recent_items_count:]
+ ]
+ )
+
+ return "\n".join(items_to_include)
+
+ def update_history(
+ self,
+ model_output: AgentOutput | None = None,
+ result: list[ActionResult] | None = None,
+ step_info: AgentStepInfo | None = None,
+ ) -> None:
+ """
+ Update agent history with the latest step results.
+
+ Args:
+ model_output: Model output from LLM (if available)
+ result: List of action results
+ step_info: Step information
+ """
+ if result is None:
+ result = []
+ step_number = step_info.step_number if step_info else None
+
+ # Clear read_state from previous step
+ self.state.read_state_description = ""
+ self.state.read_state_images = []
+
+ # Process action results
+ action_results = ""
+ read_state_idx = 0
+
+ for action_result in result:
+ # Handle extracted content (one-time inclusion)
+ if (
+ action_result.include_extracted_content_only_once
+ and action_result.extracted_content
+ ):
+                self.state.read_state_description += (
+                    f"<read_state_{read_state_idx}>\n"
+                    f"{action_result.extracted_content}\n"
+                    f"</read_state_{read_state_idx}>\n"
+                )
+ read_state_idx += 1
+ logger.info(
+ f"Added extracted_content to read_state_description: "
+ f"{action_result.extracted_content[:100]}..."
+ )
+
+ # Store images for one-time inclusion in the next message
+ if action_result.images:
+ self.state.read_state_images.extend(action_result.images)
+ logger.info(f"Added {len(action_result.images)} image(s) to read_state_images")
+
+ # Add to action results
+ if action_result.long_term_memory:
+ action_results += f"{action_result.long_term_memory}\n"
+ elif (
+ action_result.extracted_content
+ and not action_result.include_extracted_content_only_once
+ ):
+ action_results += f"{action_result.extracted_content}\n"
+
+ # Add errors
+ if action_result.error:
+ if len(action_result.error) > 200:
+ error_text = (
+ action_result.error[:100] + "......" + action_result.error[-100:]
+ )
+ else:
+ error_text = action_result.error
+ action_results += f"{error_text}\n"
+
+ # Truncate read_state_description if too long
+ MAX_CONTENT_SIZE = 60000
+ if len(self.state.read_state_description) > MAX_CONTENT_SIZE:
+ self.state.read_state_description = (
+ self.state.read_state_description[:MAX_CONTENT_SIZE]
+ + "\n... [Content truncated at 60k characters]"
+ )
+ logger.info("Truncated read_state_description to 60k characters")
+
+ self.state.read_state_description = self.state.read_state_description.strip("\n")
+
+ # Format action results
+ if action_results:
+ action_results = f"Result\n{action_results}"
+ action_results = action_results.strip("\n") if action_results else None
+
+ # Truncate action_results if too long
+ if action_results and len(action_results) > MAX_CONTENT_SIZE:
+ action_results = (
+ action_results[:MAX_CONTENT_SIZE]
+ + "\n... [Content truncated at 60k characters]"
+ )
+ logger.info("Truncated action_results to 60k characters")
+
+ # Build the history item
+ if model_output is None:
+ # Add history item for initial actions (step 0) or errors (step > 0)
+ if step_number is not None:
+ if step_number == 0 and action_results:
+ # Step 0 with initial action results
+ history_item = HistoryItem(
+ step_number=step_number, action_results=action_results
+ )
+ self.state.agent_history_items.append(history_item)
+ elif step_number > 0:
+ # Error case for steps > 0
+ history_item = HistoryItem(
+ step_number=step_number,
+ error="Agent failed to output in the right format.",
+ )
+ self.state.agent_history_items.append(history_item)
+ else:
+ # Normal step with model output
+ history_item = HistoryItem(
+ step_number=step_number,
+ evaluation_previous_goal=model_output.current_state.evaluation_previous_goal
+ if hasattr(model_output, "current_state")
+ and hasattr(model_output.current_state, "evaluation_previous_goal")
+ else None,
+ memory=model_output.current_state.memory
+ if hasattr(model_output, "current_state")
+ and hasattr(model_output.current_state, "memory")
+ else None,
+ next_goal=model_output.current_state.next_goal
+ if hasattr(model_output, "current_state")
+ and hasattr(model_output.current_state, "next_goal")
+ else None,
+ action_results=action_results,
+ )
+ self.state.agent_history_items.append(history_item)
+
+ # Log history tracking (note: items are tracked but may not be sent to LLM)
+ history_sent_count = (
+ 0 if self.max_history_items == 0
+ else min(len(self.state.agent_history_items), self.max_history_items) if self.max_history_items
+ else len(self.state.agent_history_items)
+ )
+ logger.info(
+ f"Updated history: step={step_number}, "
+ f"items_tracked={len(self.state.agent_history_items)}, "
+ f"items_sent_to_llm={history_sent_count}"
+ )
+
+ def get_messages(
+ self, user_message: BaseMessage | None = None
+ ) -> list[BaseMessage]:
+ """
+ Get all messages for LLM call.
+
+ Args:
+ user_message: User message to include (if provided)
+
+ Returns:
+ List of messages in correct order: system -> user
+ """
+ messages = [self.system_message]
+ if user_message:
+ messages.append(user_message)
+ return messages
+
+ def add_new_task(self, new_task: str) -> None:
+ """
+ Add a new follow-up task to the conversation.
+
+ Args:
+ new_task: The new task to add
+ """
+        # Tag the tasks so the LLM can tell the initial task apart from follow-ups
+        new_task_formatted = f"<follow_up_task> {new_task.strip()} </follow_up_task>"
+        if "<initial_task>" not in self.task:
+            self.task = f"<initial_task>{self.task}</initial_task>"
+ self.task += "\n" + new_task_formatted
+
+ task_update_item = HistoryItem(system_message=new_task_formatted)
+ self.state.agent_history_items.append(task_update_item)
+
+ logger.info(f"Added new task to conversation: {new_task[:50]}...")
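+
+
+# Typical usage (illustrative sketch; assumes the agent builds a UserMessage
+# containing the current browser state each step):
+#
+#   manager = CustomMessageManager(task=task, system_message=system_msg, max_history_items=5)
+#   messages = manager.get_messages(user_message=state_msg)   # [system, user]
+#   response = await llm.ainvoke(messages, output_format=AgentOutput)
+#   manager.update_history(model_output=response.completion, result=action_results, step_info=step_info)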
diff --git a/browser_use/llm/__init__.py b/browser_use/llm/__init__.py
index d6d8464c92..c6362978a5 100644
--- a/browser_use/llm/__init__.py
+++ b/browser_use/llm/__init__.py
@@ -35,6 +35,7 @@
from browser_use.llm.deepseek.chat import ChatDeepSeek
from browser_use.llm.google.chat import ChatGoogle
from browser_use.llm.groq.chat import ChatGroq
+ from browser_use.llm.huggingface.chat import ChatHuggingFace
from browser_use.llm.mistral.chat import ChatMistral
from browser_use.llm.oci_raw.chat import ChatOCIRaw
from browser_use.llm.ollama.chat import ChatOllama
@@ -88,6 +89,7 @@
'ChatDeepSeek': ('browser_use.llm.deepseek.chat', 'ChatDeepSeek'),
'ChatGoogle': ('browser_use.llm.google.chat', 'ChatGoogle'),
'ChatGroq': ('browser_use.llm.groq.chat', 'ChatGroq'),
+ 'ChatHuggingFace': ('browser_use.llm.huggingface.chat', 'ChatHuggingFace'),
'ChatMistral': ('browser_use.llm.mistral.chat', 'ChatMistral'),
'ChatOCIRaw': ('browser_use.llm.oci_raw.chat', 'ChatOCIRaw'),
'ChatOllama': ('browser_use.llm.ollama.chat', 'ChatOllama'),
@@ -151,6 +153,7 @@ def __getattr__(name: str):
'ChatAnthropicBedrock',
'ChatAWSBedrock',
'ChatGroq',
+ 'ChatHuggingFace',
'ChatMistral',
'ChatAzureOpenAI',
'ChatOCIRaw',
diff --git a/browser_use/llm/huggingface/__init__.py b/browser_use/llm/huggingface/__init__.py
new file mode 100644
index 0000000000..574019572a
--- /dev/null
+++ b/browser_use/llm/huggingface/__init__.py
@@ -0,0 +1,8 @@
+"""
+Hugging Face transformers integration for browser-use.
+"""
+
+from browser_use.llm.huggingface.chat import ChatHuggingFace
+from browser_use.llm.huggingface.serializer import HuggingFaceMessageSerializer
+
+__all__ = ['ChatHuggingFace', 'HuggingFaceMessageSerializer']
diff --git a/browser_use/llm/huggingface/chat.py b/browser_use/llm/huggingface/chat.py
new file mode 100644
index 0000000000..a59bc0d686
--- /dev/null
+++ b/browser_use/llm/huggingface/chat.py
@@ -0,0 +1,427 @@
+"""
+ChatHuggingFace - Wrapper for Hugging Face transformers models.
+
+This allows using local Hugging Face models directly without Ollama.
+Supports models like Qwen 2.5 3B, BitNet, and other transformer models.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+from dataclasses import dataclass
+from typing import Any, TypeVar, overload
+
+from pydantic import BaseModel
+
+from browser_use.llm.base import BaseChatModel
+from browser_use.llm.exceptions import ModelProviderError
+from browser_use.llm.messages import BaseMessage
+from browser_use.llm.views import ChatInvokeCompletion, ChatInvokeUsage
+
+try:
+ from transformers import AutoModelForCausalLM, AutoTokenizer
+ import torch
+ TRANSFORMERS_AVAILABLE = True
+ # Try to enable progress bars via huggingface_hub
+ try:
+ import os
+ # Enable verbose output for transformers (shows progress bars)
+ if 'TRANSFORMERS_VERBOSITY' not in os.environ:
+ os.environ['TRANSFORMERS_VERBOSITY'] = 'info'
+ # Ensure huggingface_hub shows progress
+ if 'HF_HUB_DISABLE_PROGRESS_BARS' not in os.environ:
+ os.environ['HF_HUB_DISABLE_PROGRESS_BARS'] = '0' # 0 = show progress bars
+ except Exception:
+ pass
+except ImportError:
+ TRANSFORMERS_AVAILABLE = False
+
+T = TypeVar('T', bound=BaseModel)
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ChatHuggingFace(BaseChatModel):
+ """
+ Wrapper for Hugging Face transformers models.
+
+ Usage:
+ from browser_use.llm.huggingface import ChatHuggingFace
+
+ llm = ChatHuggingFace(
+ model="Qwen/Qwen2.5-3B-Instruct",
+ device_map="auto", # or "cpu", "cuda", etc.
+ )
+ """
+
+ model: str
+ """Model name or path (e.g., "Qwen/Qwen2.5-3B-Instruct")"""
+
+ device_map: str = "auto"
+ """Device to load model on: "auto", "cpu", "cuda", "cuda:0", etc."""
+
+ torch_dtype: str | None = None
+ """Torch dtype: "float16", "bfloat16", "float32", or None for auto"""
+
+ load_in_8bit: bool = False
+ """Load model in 8-bit mode (requires bitsandbytes)"""
+
+ load_in_4bit: bool = False
+ """Load model in 4-bit mode (requires bitsandbytes)"""
+
+ max_new_tokens: int = 2048
+ """Maximum number of new tokens to generate"""
+
+ temperature: float = 0.7
+ """Sampling temperature"""
+
+ top_p: float = 0.9
+ """Top-p sampling"""
+
+ do_sample: bool = True
+ """Whether to use sampling"""
+
+ trust_remote_code: bool = False
+ """Trust remote code when loading model"""
+
+ # Internal state
+ _tokenizer: Any = None
+ _model: Any = None
+ _model_loaded: bool = False
+
+ def __post_init__(self):
+ """Validate transformers is available."""
+ if not TRANSFORMERS_AVAILABLE:
+ raise ImportError(
+ "transformers library is required for ChatHuggingFace. "
+ "Install with: pip install transformers torch"
+ )
+
+ @property
+ def provider(self) -> str:
+ return 'huggingface'
+
+ @property
+ def name(self) -> str:
+ return self.model
+
+ def _load_model(self) -> None:
+ """Lazy load the model and tokenizer."""
+ if self._model_loaded:
+ return
+
+        print(f"\n🔄 Loading Hugging Face model: {self.model}", flush=True)
+        print("   This may take a few minutes on first run (weights are downloaded and cached)...", flush=True)
+
+ try:
+ # Ensure progress bars are enabled for huggingface_hub
+ import os
+ # Enable verbose output (shows progress bars)
+ os.environ.setdefault('TRANSFORMERS_VERBOSITY', 'info')
+ # Explicitly enable progress bars (0 = show, 1 = hide)
+ os.environ.setdefault('HF_HUB_DISABLE_PROGRESS_BARS', '0')
+ # Use regular download (not hf_transfer) to show progress
+ os.environ.setdefault('HF_HUB_ENABLE_HF_TRANSFER', '0')
+
+ # Check if model is already cached
+ try:
+ from pathlib import Path
+ cache_dir = Path.home() / ".cache" / "huggingface" / "hub"
+ model_cache_path = cache_dir / f"models--{self.model.replace('/', '--')}"
+ if model_cache_path.exists():
+ size = sum(f.stat().st_size for f in model_cache_path.rglob('*') if f.is_file()) / (1024**3)
+ print(f" ✅ Model found in cache: {model_cache_path}", flush=True)
+ print(f" 📦 Cache size: {size:.2f} GB", flush=True)
+ else:
+                    print("   📥 Model not in cache, will download from Hugging Face...", flush=True)
+                    print("   ⏳ First download may be several GB depending on the model", flush=True)
+ except Exception:
+ pass
+
+ # Load tokenizer (transformers will show progress bar automatically if tqdm is installed)
+ print(" 📥 Loading tokenizer...", flush=True)
+ self._tokenizer = AutoTokenizer.from_pretrained(
+ self.model,
+ trust_remote_code=self.trust_remote_code,
+ )
+ print(" ✅ Tokenizer loaded", flush=True)
+
+ # Set pad token if not present
+ if self._tokenizer.pad_token is None:
+ self._tokenizer.pad_token = self._tokenizer.eos_token
+
+ # Prepare model loading kwargs
+ model_kwargs: dict[str, Any] = {
+ 'trust_remote_code': self.trust_remote_code,
+ }
+
+ # Handle quantization
+ if self.load_in_8bit or self.load_in_4bit:
+ try:
+ from transformers import BitsAndBytesConfig
+ quantization_config = BitsAndBytesConfig(
+ load_in_8bit=self.load_in_8bit,
+ load_in_4bit=self.load_in_4bit,
+ )
+ model_kwargs['quantization_config'] = quantization_config
+ except ImportError:
+ logger.warning("bitsandbytes not available, ignoring quantization settings")
+
+ # Handle device and dtype
+ if self.device_map == "auto":
+ # Check if accelerate is available (required for device_map="auto")
+ try:
+ import accelerate
+ # Ensure accelerate is imported (transformers checks for it)
+ model_kwargs['device_map'] = "auto"
+ print(f" ✅ Using device_map='auto' (accelerate {accelerate.__version__} available)", flush=True)
+ except ImportError:
+ print(" ⚠️ accelerate not installed, falling back to CPU", flush=True)
+ print(" 💡 Install with: pip install accelerate", flush=True)
+ model_kwargs['device_map'] = "cpu"
+ else:
+ model_kwargs['device_map'] = self.device_map
+
+ if self.torch_dtype:
+ dtype_map = {
+ 'float16': torch.float16,
+ 'bfloat16': torch.bfloat16,
+ 'float32': torch.float32,
+ }
+ if self.torch_dtype in dtype_map:
+ model_kwargs['torch_dtype'] = dtype_map[self.torch_dtype]
+
+ # Load model (transformers/huggingface_hub will show progress bars automatically)
+ print(" 📥 Loading model weights...", flush=True)
+            print("   ⏳ First download may take several minutes depending on model size and bandwidth", flush=True)
+            print("   💡 Progress bars should appear below (if tqdm is installed)", flush=True)
+            print("   💡 Tip: weights are cached locally after the first download", flush=True)
+            print(f"   💡 Monitor progress: du -sh ~/.cache/huggingface/hub/models--{self.model.replace('/', '--')}/", flush=True)
+ self._model = AutoModelForCausalLM.from_pretrained(
+ self.model,
+ **model_kwargs,
+ )
+ print(" 🔧 Setting model to evaluation mode...", flush=True)
+
+ # Set to eval mode
+ self._model.eval()
+
+ self._model_loaded = True
+ print(f"✅ Model fully loaded: {self.model}\n", flush=True)
+
+ except Exception as e:
+ raise ModelProviderError(
+ message=f"Failed to load Hugging Face model {self.model}: {str(e)}",
+ model=self.model,
+ ) from e
+
+ def _format_messages_for_chat(self, messages: list[BaseMessage]) -> str:
+ """Format messages using the model's chat template."""
+ from browser_use.llm.huggingface import HuggingFaceMessageSerializer
+
+ # Convert to chat format
+ chat_messages = HuggingFaceMessageSerializer.serialize_messages(messages)
+
+ # Apply chat template if available
+ if hasattr(self._tokenizer, 'apply_chat_template') and self._tokenizer.chat_template:
+ try:
+ formatted = self._tokenizer.apply_chat_template(
+ chat_messages,
+ tokenize=False,
+ add_generation_prompt=True,
+ )
+ return formatted
+ except Exception as e:
+ logger.warning(f"Failed to apply chat template: {e}, using simple format")
+
+ # Fallback: simple format
+ formatted_parts = []
+ for msg in chat_messages:
+ role = msg['role']
+ content = msg['content']
+ if role == 'system':
+ formatted_parts.append(f"System: {content}")
+ elif role == 'user':
+ formatted_parts.append(f"User: {content}")
+ elif role == 'assistant':
+ formatted_parts.append(f"Assistant: {content}")
+
+ return "\n\n".join(formatted_parts) + "\n\nAssistant:"
+
+ @overload
+ async def ainvoke(
+ self, messages: list[BaseMessage], output_format: None = None, **kwargs: Any
+ ) -> ChatInvokeCompletion[str]: ...
+
+ @overload
+ async def ainvoke(
+ self, messages: list[BaseMessage], output_format: type[T], **kwargs: Any
+ ) -> ChatInvokeCompletion[T]: ...
+
+ async def ainvoke(
+ self, messages: list[BaseMessage], output_format: type[T] | None = None, **kwargs: Any
+ ) -> ChatInvokeCompletion[T] | ChatInvokeCompletion[str]:
+ """Invoke the model asynchronously."""
+ # Load model if not already loaded (this may download from Hugging Face)
+ if not self._model_loaded:
+ print("🔄 Model loading triggered (this may download from Hugging Face)...", flush=True)
+ try:
+ self._load_model()
+ except Exception as e:
+ print(f"❌ Model loading failed: {e}", flush=True)
+ raise
+
+ # Run inference in thread pool to avoid blocking
+        loop = asyncio.get_running_loop()
+
+ try:
+ if output_format is None:
+ # Simple text generation
+ completion, usage = await loop.run_in_executor(
+ None,
+ self._generate_text,
+ messages,
+ )
+ return ChatInvokeCompletion(completion=completion, usage=usage)
+ else:
+ # Structured output - use JSON schema in prompt
+ schema = output_format.model_json_schema()
+ completion, usage = await loop.run_in_executor(
+ None,
+ self._generate_structured,
+ messages,
+ schema,
+ )
+ # Parse JSON response
+ try:
+ parsed = output_format.model_validate_json(completion)
+ return ChatInvokeCompletion(completion=parsed, usage=usage)
+ except Exception as e:
+ logger.warning(f"Failed to parse structured output: {e}, returning raw text")
+ return ChatInvokeCompletion(completion=completion, usage=usage)
+
+ except Exception as e:
+ raise ModelProviderError(
+ message=f"Failed to generate text: {str(e)}",
+ model=self.name,
+ ) from e
+
+ def _generate_text(self, messages: list[BaseMessage]) -> tuple[str, ChatInvokeUsage]:
+ """Generate text synchronously (runs in thread pool).
+
+ Returns:
+ Tuple of (completion_text, usage_info)
+ """
+ # Format messages
+ prompt = self._format_messages_for_chat(messages)
+
+ # Tokenize
+ inputs = self._tokenizer(prompt, return_tensors="pt")
+ prompt_tokens = inputs['input_ids'].shape[1]
+
+ # Move to same device as model
+ if hasattr(self._model, 'device'):
+ inputs = {k: v.to(self._model.device) for k, v in inputs.items()}
+ elif hasattr(self._model, 'hf_device_map'):
+ # Multi-device model, use first device
+ first_device = list(self._model.hf_device_map.values())[0]
+ inputs = {k: v.to(first_device) for k, v in inputs.items()}
+
+ # Generate
+ with torch.no_grad():
+ outputs = self._model.generate(
+ **inputs,
+ max_new_tokens=self.max_new_tokens,
+ temperature=self.temperature,
+ top_p=self.top_p,
+ do_sample=self.do_sample,
+ pad_token_id=self._tokenizer.pad_token_id,
+ eos_token_id=self._tokenizer.eos_token_id,
+                # Generation stops at EOS or after max_new_tokens; a generous
+                # max_new_tokens budget reduces the risk of truncated JSON
+ )
+
+        # Decode only the new tokens (everything after the prompt)
+        generated_tokens = outputs[0][prompt_tokens:]
+        completion_tokens = len(generated_tokens)
+ completion = self._tokenizer.decode(generated_tokens, skip_special_tokens=True)
+
+ # Calculate usage
+ total_tokens = prompt_tokens + completion_tokens
+ usage = ChatInvokeUsage(
+ prompt_tokens=prompt_tokens,
+ prompt_cached_tokens=None,
+ prompt_cache_creation_tokens=None,
+ prompt_image_tokens=None,
+ completion_tokens=completion_tokens,
+ total_tokens=total_tokens,
+ )
+
+ return completion.strip(), usage
+
+ def _generate_structured(self, messages: list[BaseMessage], schema: dict[str, Any]) -> tuple[str, ChatInvokeUsage]:
+ """Generate structured output with JSON schema.
+
+ Returns:
+ Tuple of (completion_text, usage_info)
+ """
+ # Add explicit, strict JSON format instruction (optimized for small local LLMs)
+ # Following Sentience SDK playground pattern: very explicit, no reasoning
+ required_fields = schema.get('required', [])
+ properties = schema.get('properties', {})
+
+ # Build explicit format example
+ example_fields = []
+ for field in required_fields:
+ if field in properties:
+ prop = properties[field]
+ prop_type = prop.get('type', 'string')
+ if prop_type == 'array':
+ example_fields.append(f' "{field}": []')
+ elif prop_type == 'string':
+ example_fields.append(f' "{field}": ""')
+ elif prop_type == 'object':
+ example_fields.append(f' "{field}": {{}}')
+ else:
+ example_fields.append(f' "{field}": null')
+
+ example_json = "{\n" + ",\n".join(example_fields) + "\n}"
+
+ # Build minimal instruction (optimized for small local LLMs)
+ # Keep it very short to avoid confusing the model
+ schema_instruction = f"\n\nJSON only:\n{example_json}"
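+        # Example (illustrative): for a schema requiring "thinking" (string)
+        # and "action" (array), the appended instruction reads:
+        #
+        #   JSON only:
+        #   {
+        #     "thinking": "",
+        #     "action": []
+        #   }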
+
+ # Create modified messages
+ modified_messages = list(messages)
+ if modified_messages and hasattr(modified_messages[-1], 'content'):
+ last_msg = modified_messages[-1]
+ if isinstance(last_msg.content, str):
+ modified_messages[-1] = type(last_msg)(
+ content=last_msg.content + schema_instruction
+ )
+
+ # Generate with schema instruction
+ completion, usage = self._generate_text(modified_messages)
+
+ # Try to extract JSON from response
+ completion = completion.strip()
+
+ # Try to find JSON in the response (in case model adds extra text)
+ if completion.startswith('```json'):
+ # Extract from code block
+ completion = completion.replace('```json', '').replace('```', '').strip()
+ elif completion.startswith('```'):
+ completion = completion.replace('```', '').strip()
+
+ # Try to parse to validate JSON
+ try:
+ json.loads(completion)
+ except json.JSONDecodeError:
+ logger.warning(f"Generated text is not valid JSON: {completion[:200]}")
+
+ return completion, usage
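+
+
+if __name__ == '__main__':
+    # Minimal smoke test (illustrative; assumes transformers and torch are
+    # installed and the model weights are cached locally or downloadable).
+    from browser_use.llm.messages import SystemMessage, UserMessage
+
+    async def _demo() -> None:
+        llm = ChatHuggingFace(
+            model='Qwen/Qwen2.5-3B-Instruct',
+            device_map='cpu',
+            max_new_tokens=32,
+        )
+        reply = await llm.ainvoke(
+            [SystemMessage(content='You are terse.'), UserMessage(content="Say 'ready'.")]
+        )
+        print(reply.completion)
+
+    asyncio.run(_demo())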
diff --git a/browser_use/llm/huggingface/serializer.py b/browser_use/llm/huggingface/serializer.py
new file mode 100644
index 0000000000..e256c65cc9
--- /dev/null
+++ b/browser_use/llm/huggingface/serializer.py
@@ -0,0 +1,69 @@
+"""
+Serializer for converting browser-use messages to Hugging Face transformers format.
+"""
+
+from typing import Any
+
+from browser_use.llm.messages import (
+ AssistantMessage,
+ BaseMessage,
+ SystemMessage,
+ UserMessage,
+)
+
+
+class HuggingFaceMessageSerializer:
+ """Serializer for converting between browser-use messages and Hugging Face chat format."""
+
+ @staticmethod
+ def _extract_text_content(content: Any) -> str:
+ """Extract text content from message content, ignoring images."""
+ if content is None:
+ return ''
+ if isinstance(content, str):
+ return content
+
+ text_parts: list[str] = []
+ for part in content:
+ if hasattr(part, 'type'):
+ if part.type == 'text':
+ text_parts.append(part.text)
+ elif part.type == 'refusal':
+ text_parts.append(f'[Refusal] {part.refusal}')
+ # Skip image parts (transformers may not support images in all models)
+
+ return '\n'.join(text_parts)
+
+ @staticmethod
+ def serialize(message: BaseMessage) -> dict[str, str]:
+ """Serialize a browser-use message to Hugging Face chat format.
+
+ Returns:
+ Dict with 'role' and 'content' keys compatible with transformers chat templates.
+ """
+ if isinstance(message, SystemMessage):
+ return {
+ 'role': 'system',
+ 'content': HuggingFaceMessageSerializer._extract_text_content(message.content),
+ }
+ elif isinstance(message, UserMessage):
+ return {
+ 'role': 'user',
+ 'content': HuggingFaceMessageSerializer._extract_text_content(message.content),
+ }
+ elif isinstance(message, AssistantMessage):
+ return {
+ 'role': 'assistant',
+ 'content': HuggingFaceMessageSerializer._extract_text_content(message.content) or '',
+ }
+ else:
+ raise ValueError(f'Unknown message type: {type(message)}')
+
+ @staticmethod
+ def serialize_messages(messages: list[BaseMessage]) -> list[dict[str, str]]:
+ """Serialize a list of browser-use messages to Hugging Face chat format.
+
+ Returns:
+ List of dicts with 'role' and 'content' keys.
+ """
+ return [HuggingFaceMessageSerializer.serialize(m) for m in messages]
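+
+
+if __name__ == '__main__':
+    # Quick illustrative check of the chat-format output shape.
+    from browser_use.llm.messages import SystemMessage, UserMessage
+
+    msgs = [SystemMessage(content='sys'), UserMessage(content='hi')]
+    print(HuggingFaceMessageSerializer.serialize_messages(msgs))
+    # -> [{'role': 'system', 'content': 'sys'}, {'role': 'user', 'content': 'hi'}]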
diff --git a/browser_use/tools/service.py b/browser_use/tools/service.py
index a0ab504cb5..680cddf965 100644
--- a/browser_use/tools/service.py
+++ b/browser_use/tools/service.py
@@ -668,15 +668,125 @@ async def extract(
extract_links = params['extract_links'] if isinstance(params, dict) else params.extract_links
start_from_char = params['start_from_char'] if isinstance(params, dict) else params.start_from_char
- # Extract clean markdown using the unified method
- try:
- from browser_use.dom.markdown_extractor import extract_clean_markdown
+ # Try to use Sentience SDK's read() function first (more efficient, lower token cost)
+ content = None
+ content_stats = None
+ sentience_used = False
- content, content_stats = await extract_clean_markdown(
- browser_session=browser_session, extract_links=extract_links
+ try:
+ # Get CDP session for the current page
+ cdp_session = await browser_session.get_or_create_cdp_session()
+
+ # Try to call Sentience extension's read() function
+ # First try to get raw HTML, then convert with Python's markdownify for best quality
+ # If that fails, use extension's lightweight markdown converter
+ result = await cdp_session.cdp_client.send.Runtime.evaluate(
+ params={
+ 'expression': """
+ (async () => {
+ try {
+ // Check if Sentience extension is available
+ if (typeof window.sentience === 'undefined' || typeof window.sentience.read !== 'function') {
+ return { status: 'error', error: 'Sentience extension not available' };
+ }
+
+ // Try to get raw HTML first (for enhanced markdown conversion with Python markdownify)
+ const rawResult = window.sentience.read({ format: 'raw' });
+ if (rawResult.status === 'success') {
+ return {
+ status: 'success',
+ url: rawResult.url,
+ format: 'raw',
+ content: rawResult.content,
+ length: rawResult.length
+ };
+ }
+
+ // Fall back to extension's markdown converter
+ return window.sentience.read({ format: 'markdown' });
+ } catch (error) {
+ return { status: 'error', error: error.message || String(error) };
+ }
+ })()
+ """,
+ 'awaitPromise': True,
+ 'returnByValue': True,
+ },
+ session_id=cdp_session.session_id,
)
+
+ # Check if Sentience read() succeeded
+ if result.get('result', {}).get('type') == 'object':
+ read_result = result['result'].get('value', {})
+ if read_result.get('status') == 'success':
+ content_format = read_result.get('format', '')
+
+ if content_format == 'raw':
+ # Got raw HTML, convert to markdown using Python's markdownify (same as SDK does)
+ try:
+ from markdownify import markdownify
+
+ html_content = read_result.get('content', '')
+ content = markdownify(
+ html_content,
+ heading_style='ATX',
+ bullets='-',
+ strip=['script', 'style'],
+ escape_asterisks=False,
+ escape_underscores=False,
+ escape_misc=False,
+ autolinks=False,
+ )
+ sentience_used = True
+
+ # Create stats
+ content_length = len(content)
+ content_stats = {
+ 'method': 'sentience_sdk_read_enhanced',
+ 'original_html_chars': len(html_content),
+ 'initial_markdown_chars': content_length,
+ 'filtered_chars_removed': 0,
+ 'final_filtered_chars': content_length,
+ 'url': read_result.get('url', ''),
+ }
+ logger.info(f'✅ Using Sentience SDK read() with markdownify enhancement (length: {content_length:,} chars)')
+ except ImportError:
+ # markdownify not available, fall back to browser-use method
+ logger.debug('markdownify not available, falling back to browser-use extraction')
+ except Exception as e:
+ logger.debug(f'markdownify conversion failed: {e}, falling back to browser-use extraction')
+
+ elif content_format == 'markdown':
+ # Got markdown directly from extension
+ content = read_result.get('content', '')
+ sentience_used = True
+
+ # Create stats
+ content_length = len(content)
+ content_stats = {
+ 'method': 'sentience_sdk_read',
+ 'original_html_chars': read_result.get('length', content_length), # Approximate
+ 'initial_markdown_chars': content_length,
+ 'filtered_chars_removed': 0, # Sentience already filters
+ 'final_filtered_chars': content_length,
+ 'url': read_result.get('url', ''),
+ }
+ logger.info(f'✅ Using Sentience SDK read() for markdown extraction (length: {content_length:,} chars)')
except Exception as e:
- raise RuntimeError(f'Could not extract clean markdown: {type(e).__name__}')
+ # Sentience not available or failed, will fall back to browser-use method
+ logger.debug(f'Sentience SDK read() not available or failed: {e}, falling back to browser-use extraction')
+
+ # Fall back to browser-use's extract_clean_markdown if Sentience wasn't used
+ if not sentience_used:
+ try:
+ from browser_use.dom.markdown_extractor import extract_clean_markdown
+
+ content, content_stats = await extract_clean_markdown(
+ browser_session=browser_session, extract_links=extract_links
+ )
+ logger.info(f'Using browser-use extract_clean_markdown (length: {len(content):,} chars)')
+ except Exception as e:
+ raise RuntimeError(f'Could not extract clean markdown: {type(e).__name__}')
# Original content length for processing
final_filtered_length = content_stats['final_filtered_chars']
diff --git a/examples/integrations/sentience_agent_example.py b/examples/integrations/sentience_agent_example.py
new file mode 100644
index 0000000000..fc7553d41f
--- /dev/null
+++ b/examples/integrations/sentience_agent_example.py
@@ -0,0 +1,188 @@
+"""
+Example usage of SentienceAgent.
+
+This example demonstrates how to use SentienceAgent with:
+- Sentience snapshot as primary prompt (compact, token-efficient)
+- Vision fallback when snapshot fails
+- Token usage tracking
+"""
+
+import asyncio
+import glob
+import os
+from pathlib import Path
+
+from dotenv import load_dotenv
+
+from browser_use import BrowserProfile, BrowserSession, ChatBrowserUse
+from browser_use.integrations.sentience import SentienceAgent
+from sentience import get_extension_dir
+
+# Note: This example requires:
+# 1. Sentience SDK installed: pip install sentienceapi
+# 2. Sentience extension loaded in browser
+# 3. Optional: SENTIENCE_API_KEY in .env for gateway mode
+
+load_dotenv()
+
+
+def log(msg: str) -> None:
+ """Print with flush for immediate output."""
+ print(msg, flush=True)
+
+
+async def main():
+ """Example: Use SentienceAgent to find the top Show HN post."""
+ try:
+ # Get path to Sentience extension
+ extension_path = get_extension_dir()
+ log(f"Loading Sentience extension from: {extension_path}")
+
+ # Verify extension exists
+ if not os.path.exists(extension_path):
+ raise FileNotFoundError(f"Sentience extension not found at: {extension_path}")
+ if not os.path.exists(os.path.join(extension_path, "manifest.json")):
+ raise FileNotFoundError(
+ f"Sentience extension manifest not found at: {extension_path}/manifest.json"
+ )
+ log(f"✅ Sentience extension verified at: {extension_path}")
+
+ # Find browser executable (optional - browser-use will find one if not specified)
+ # This example looks for Playwright-installed browsers (Chromium-based, work with CDP)
+ playwright_cache = Path.home() / "Library/Caches/ms-playwright"
+ browser_patterns = [
+ playwright_cache
+ / "chromium-*/chrome-mac*/Google Chrome for Testing.app/Contents/MacOS/Google Chrome for Testing",
+ playwright_cache / "chromium-*/chrome-mac*/Chromium.app/Contents/MacOS/Chromium",
+ ]
+
+ browser_executable = None
+ for pattern in browser_patterns:
+ matches = glob.glob(str(pattern))
+ if matches:
+ matches.sort()
+ browser_executable = matches[-1] # Use latest version
+ if Path(browser_executable).exists():
+ log(f"✅ Found browser: {browser_executable}")
+ break
+
+ if not browser_executable:
+ log("⚠️ Browser not found, browser-use will try to install it")
+
+ # Get default extension paths and combine with Sentience extension
+ # Chrome only uses the LAST --load-extension arg, so we must combine all extensions
+ log("Collecting all extension paths...")
+ extension_paths = [extension_path]
+
+ # Create a temporary profile to ensure default extensions are downloaded
+ # This ensures extensions exist before we try to load them
+ temp_profile = BrowserProfile(enable_default_extensions=True)
+ default_extensions = temp_profile._ensure_default_extensions_downloaded()
+
+ if default_extensions:
+ extension_paths.extend(default_extensions)
+ log(f" ✅ Found {len(default_extensions)} default extensions")
+ else:
+ log(" ⚠️ No default extensions found (this is OK, Sentience will still work)")
+
+ log(f"Total extensions to load: {len(extension_paths)} (including Sentience)")
+
+ # Combine all extensions into a single --load-extension arg
+ combined_extensions = ",".join(extension_paths)
+ log(f"Combined extension paths (first 100 chars): {combined_extensions[:100]}...")
+
+ # Create browser profile with ALL extensions combined
+ # Strategy: Disable default extensions, manually load all together
+ browser_profile = BrowserProfile(
+ headless=False, # Run with visible browser for demo
+ executable_path=browser_executable, # Use found browser if available
+ enable_default_extensions=False, # Disable auto-loading, we'll load manually
+ ignore_default_args=[
+ "--enable-automation",
+ "--disable-extensions", # Important: don't disable extensions
+ "--hide-scrollbars",
+ # Don't disable component extensions - we need background pages for Sentience
+ ],
+ args=[
+ "--enable-extensions",
+ "--disable-extensions-file-access-check", # Allow extension file access
+ "--disable-extensions-http-throttling", # Don't throttle extension HTTP
+ "--extensions-on-chrome-urls", # Allow extensions on chrome:// URLs
+ f"--load-extension={combined_extensions}", # Load ALL extensions together
+ ],
+ )
+
+ log("Browser profile configured with Sentience extension")
+
+ # Start browser session
+ log("Creating BrowserSession...")
+ browser_session = BrowserSession(browser_profile=browser_profile)
+ await browser_session.start()
+ log("✅ Browser session started")
+
+ # Initialize SentienceAgent
+ llm = ChatBrowserUse()
+ task = """Go to HackerNews Show at https://news.ycombinator.com/show and find the top 1 Show HN post.
+
+IMPORTANT: Do NOT click the post. Instead:
+1. Identify the top post from the Sentience snapshot (it will be the first post in the list)
+2. Note its element ID (index number) and title from the snapshot
+3. Call the done action with the element ID and title in this format: "Top post: element ID [index], title: [title]"
+"""
+
+ log(f"\n🚀 Starting SentienceAgent: {task}\n")
+
+ agent = SentienceAgent(
+ task=task,
+ llm=llm,
+ browser_session=browser_session,
+ tools=None, # Will use default tools
+ # Sentience configuration
+ sentience_api_key=os.getenv("SENTIENCE_API_KEY"),
+ sentience_use_api=True, # Use gateway/API mode
+ sentience_max_elements=40,
+ sentience_show_overlay=True,
+ # Vision fallback configuration
+ vision_fallback_enabled=True,
+ vision_detail_level="auto",
+ vision_include_screenshots=True,
+ # Token tracking
+ calculate_cost=True,
+ # Agent settings
+ max_steps=10, # Limit steps for example
+ max_failures=3,
+ )
+
+ # Run agent
+ result = await agent.run()
+
+ # Get token usage
+ usage_summary = await agent.token_cost_service.get_usage_summary()
+        log("\n📊 Token Usage Summary:")
+ log(f" Total tokens: {usage_summary.total_tokens}")
+ log(f" Total cost: ${usage_summary.total_cost:.6f}")
+ log(f" Steps: {result.get('steps', 'unknown')}")
+
+ # Show detailed Sentience usage stats
+ sentience_stats = result.get("sentience_usage_stats", {})
+ if sentience_stats:
+ steps_using = sentience_stats.get("steps_using_sentience", 0)
+ total_steps = sentience_stats.get("total_steps", 0)
+ percentage = sentience_stats.get("sentience_percentage", 0)
+ log(f" Sentience used: {result.get('sentience_used', False)}")
+ log(f" Sentience usage: {steps_using}/{total_steps} steps ({percentage:.1f}%)")
+ else:
+ log(f" Sentience used: {result.get('sentience_used', 'unknown')}")
+
+ except ImportError as e:
+ log(f"❌ Import error: {e}")
+ log("Make sure Sentience SDK is installed: pip install sentienceapi")
+ except Exception as e:
+ log(f"❌ Error: {e}")
+ import traceback
+
+ traceback.print_exc()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/integrations/sentience_agent_local_llm.py b/examples/integrations/sentience_agent_local_llm.py
new file mode 100644
index 0000000000..9ee12e5a51
--- /dev/null
+++ b/examples/integrations/sentience_agent_local_llm.py
@@ -0,0 +1,288 @@
+"""
+Example: SentienceAgent with dual-model setup (local LLM + cloud vision model).
+
+This example demonstrates how to use SentienceAgent with:
+- Primary: Local LLM (Qwen 2.5 3B) for Sentience snapshots (fast, free)
+- Fallback: Cloud vision model (GPT-4o) for vision mode when Sentience fails
+
+Requirements:
+1. Install transformers: pip install transformers torch accelerate
+2. Optional: pip install bitsandbytes (for 4-bit/8-bit quantization)
+3. Sentience SDK installed: pip install sentienceapi
+4. Sentience extension loaded in browser
+5. OPENAI_API_KEY in .env for GPT-4o vision fallback
+
+Note: Local models will be downloaded from Hugging Face on first use.
+Note: `accelerate` is required when using `device_map="auto"`.
+"""
+
+import asyncio
+import glob
+import logging
+import os
+import traceback
+from pathlib import Path
+
+from dotenv import load_dotenv
+
+from browser_use import BrowserProfile, BrowserSession
+from browser_use.integrations.sentience import SentienceAgent
+from browser_use.llm import ChatHuggingFace, ChatOpenAI
+from browser_use.llm.messages import SystemMessage, UserMessage
+from sentience import get_extension_dir
+
+load_dotenv()
+
+# Enable debug logging to see detailed Sentience extension errors
+# Comment out the next line to reduce diagnostic output
+logging.getLogger("browser_use.integrations.sentience").setLevel(logging.DEBUG)
+
+
+def log(msg: str) -> None:
+ """Print with flush for immediate output."""
+ print(msg, flush=True)
+
+
+async def main():
+ """Example: Use SentienceAgent with local LLM (Qwen 2.5 3B or BitNet)."""
+ browser_session = None
+ try:
+ # Get path to Sentience extension
+ extension_path = get_extension_dir()
+ log(f"Loading Sentience extension from: {extension_path}")
+
+ # Verify extension exists
+ if not os.path.exists(extension_path):
+ raise FileNotFoundError(f"Sentience extension not found at: {extension_path}")
+ if not os.path.exists(os.path.join(extension_path, "manifest.json")):
+ raise FileNotFoundError(
+ f"Sentience extension manifest not found at: {extension_path}/manifest.json"
+ )
+ log(f"✅ Sentience extension verified at: {extension_path}")
+
+ # Find browser executable (optional - browser-use will find one if not specified)
+ playwright_cache = Path.home() / "Library/Caches/ms-playwright"
+ browser_patterns = [
+ playwright_cache
+ / "chromium-*/chrome-mac*/Google Chrome for Testing.app/Contents/MacOS/Google Chrome for Testing",
+ playwright_cache / "chromium-*/chrome-mac*/Chromium.app/Contents/MacOS/Chromium",
+ ]
+
+ browser_executable = None
+ for pattern in browser_patterns:
+ matches = glob.glob(str(pattern))
+ if matches:
+ matches.sort()
+ browser_executable = matches[-1] # Use latest version
+ if Path(browser_executable).exists():
+ log(f"✅ Found browser: {browser_executable}")
+ break
+
+ if not browser_executable:
+ log("⚠️ Browser not found, browser-use will try to install it")
+
+ # Get default extension paths and combine with Sentience extension
+ # Chrome only uses the LAST --load-extension arg, so we must combine all extensions
+ log("Collecting all extension paths...")
+ extension_paths = [extension_path]
+
+ # Create a temporary profile to ensure default extensions are downloaded
+ # This ensures extensions exist before we try to load them
+ temp_profile = BrowserProfile(enable_default_extensions=True)
+ default_extensions = temp_profile._ensure_default_extensions_downloaded()
+
+ if default_extensions:
+ extension_paths.extend(default_extensions)
+ log(f" ✅ Found {len(default_extensions)} default extensions")
+ else:
+ log(" ⚠️ No default extensions found (this is OK, Sentience will still work)")
+
+ log(f"Total extensions to load: {len(extension_paths)} (including Sentience)")
+
+ # Combine all extensions into a single --load-extension arg
+ combined_extensions = ",".join(extension_paths)
+ log(f"Combined extension paths (first 100 chars): {combined_extensions[:100]}...")
+
+ # Create browser profile with ALL extensions combined
+ # Strategy: Disable default extensions, manually load all together
+ browser_profile = BrowserProfile(
+ headless=False, # Run with visible browser for demo
+ executable_path=browser_executable, # Use found browser if available
+ enable_default_extensions=False, # Disable auto-loading, we'll load manually
+ ignore_default_args=[
+ "--enable-automation",
+ "--disable-extensions", # Important: don't disable extensions
+ "--hide-scrollbars",
+ # Don't disable component extensions - we need background pages for Sentience
+ ],
+ args=[
+ "--enable-extensions",
+ "--disable-extensions-file-access-check", # Allow extension file access
+ "--disable-extensions-http-throttling", # Don't throttle extension HTTP
+ "--extensions-on-chrome-urls", # Allow extensions on chrome:// URLs
+ f"--load-extension={combined_extensions}", # Load ALL extensions together
+ ],
+ )
+
+ log("Browser profile configured with Sentience extension")
+
+ # Start browser session
+ log("Creating BrowserSession...")
+ browser_session = BrowserSession(browser_profile=browser_profile)
+ await browser_session.start()
+ log("✅ Browser session started")
+
+ # Initialize local LLM via Hugging Face transformers
+ log("\n" + "=" * 80)
+ log("🤖 Initializing Local LLM (Hugging Face transformers)")
+ log("=" * 80)
+
+ # Option 1: Qwen 2.5 3B (recommended for small models)
+ log("📦 Creating ChatHuggingFace instance...")
+ log(" Model: Qwen/Qwen2.5-3B-Instruct")
+ log(" ⚠️ IMPORTANT: Model download happens on FIRST LLM call")
+ log(" This means it will download when agent makes first decision")
+ llm = ChatHuggingFace(
+ model="Qwen/Qwen2.5-3B-Instruct",
+ device_map="auto", # Automatically use GPU if available
+ torch_dtype="float16", # Use float16 for faster inference
+ max_new_tokens=2048, # Increased for complete JSON responses
+ temperature=0.1, # Very low temperature for deterministic structured output
+ )
+ log("✅ ChatHuggingFace instance created (model not loaded yet)")
+
+ # OPTIONAL: Pre-load the model now (before agent starts)
+ # This will download the model immediately so you can see progress
+ log("\n🔄 Pre-loading model (this will download if not cached)...")
+ log(" ⚠️ This is where the download happens - watch for progress!")
+ log(" You can skip this by commenting out the next block")
+ try:
+ # Trigger model loading by calling ainvoke with a simple message
+ # This will download/load the model now
+ test_messages = [
+ SystemMessage(content="You are a helpful assistant."),
+ UserMessage(content="Say 'ready'"),
+ ]
+ log(" 📞 Calling model to trigger download/loading...")
+ log(" ⏳ This may take 5-15 minutes on first run (~6GB download)")
+ log(" 💡 Watch for 'Loading Hugging Face model' messages above")
+ response = await llm.ainvoke(test_messages)
+ log(f" ✅ Model loaded successfully! Response: {response.completion[:50]}...")
+ except Exception as e:
+ log(f" ❌ Model loading failed: {e}")
+ log(" Continuing anyway - model will load on first agent call")
+ traceback.print_exc()
+
+ # Option 2: BitNet B1.58 2B 4T (if available on Hugging Face)
+ # llm = ChatHuggingFace(
+ # model="microsoft/bitnet-b1.58-2B", # Check actual model name on HF
+ # device_map="auto",
+ # torch_dtype="float16",
+ # )
+
+ # Option 3: Other small models
+ # llm = ChatHuggingFace(
+ # model="meta-llama/Llama-3.2-3B-Instruct",
+ # device_map="auto",
+ # torch_dtype="float16",
+ # )
+
+ # Option 4: Use 4-bit quantization to save memory (requires bitsandbytes)
+ # llm = ChatHuggingFace(
+ # model="Qwen/Qwen2.5-3B-Instruct",
+ # device_map="auto",
+ # load_in_4bit=True, # Reduces memory usage significantly
+ # max_new_tokens=2048,
+ # )
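+    # OPTIONAL (sketch): check GPU availability up front so the behaviour of
+    # device_map="auto" is predictable; uses only standard torch APIs.
+    # import torch
+    # if torch.cuda.is_available():
+    #     log(f"CUDA device: {torch.cuda.get_device_name(0)}")
+    # else:
+    #     log("No CUDA device found - the model will run on CPU (much slower)")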
+
+ log(f"✅ Using local LLM: {llm.model}")
+ log(f" Device: {llm.device_map}")
+ log("\n⏳ Note: Model will be downloaded from Hugging Face on first use (~6GB)")
+ log(" This may take 5-15 minutes depending on your internet speed...")
+ log(" Model will be cached locally for future runs.\n")
+
+ # Initialize vision LLM for fallback (cloud vision model)
+ log("\n" + "=" * 80)
+ log("👁️ Initializing Vision LLM (Cloud model for vision fallback)")
+ log("=" * 80)
+ log("📦 Creating ChatOpenAI instance for vision fallback...")
+ log(" Model: gpt-4o (vision-capable)")
+ log(" ⚠️ This will only be used when Sentience snapshot fails")
+ vision_llm = ChatOpenAI(model="gpt-4o")
+ log("✅ Vision LLM configured (will be used only for vision fallback)")
+
+ # Initialize SentienceAgent
+ task = """Go to HackerNews Show at https://news.ycombinator.com/show and find the top 1 Show HN post.
+
+IMPORTANT: Do NOT click the post. Instead:
+1. Identify the top post from the Sentience snapshot (it will be the first post in the list)
+2. Note its element ID (index number) and title from the snapshot
+3. Call the done action with the element ID and title in this format: "Top post: element ID [index], title: [title]"
+"""
+
+ log(f"\n🚀 Starting SentienceAgent: {task}\n")
+
+ agent = SentienceAgent(
+ task=task,
+ llm=llm, # Primary LLM: Qwen 3B for Sentience snapshots
+ vision_llm=vision_llm, # Fallback LLM: GPT-4o for vision mode
+ browser_session=browser_session,
+ tools=None, # Will use default tools
+ # Sentience configuration
+ sentience_api_key=os.getenv("SENTIENCE_API_KEY"),
+ sentience_use_api=True, # Use gateway/API mode
+ sentience_max_elements=40,
+ sentience_show_overlay=True,
+ # Vision fallback configuration
+ vision_fallback_enabled=True,
+ vision_detail_level="auto",
+ vision_include_screenshots=True,
+ # Token tracking
+ calculate_cost=True,
+ # Agent settings
+ max_steps=10, # Limit steps for example
+ max_failures=3,
+ # Local LLM specific settings (keep these for local model compatibility)
+ max_history_items=5, # Keep minimal history for small models
+ llm_timeout=300, # Increased timeout for local LLMs (5 minutes)
+ step_timeout=360, # Increased step timeout (6 minutes)
+ )
+
+ # Run agent
+ result = await agent.run()
+
+ # Get token usage
+ usage_summary = await agent.token_cost_service.get_usage_summary()
+ log("\n📊 Token Usage Summary:")
+ log(f" Total tokens: {usage_summary.total_tokens}")
+ log(f" Total cost: ${usage_summary.total_cost:.6f}")
+ log(f" Steps: {result.get('steps', 'unknown')}")
+
+ # Show detailed Sentience usage stats
+ sentience_stats = result.get("sentience_usage_stats", {})
+ if sentience_stats:
+ steps_using = sentience_stats.get("steps_using_sentience", 0)
+ total_steps = sentience_stats.get("total_steps", 0)
+ percentage = sentience_stats.get("sentience_percentage", 0)
+ log(f" Sentience used: {result.get('sentience_used', False)}")
+ log(f" Sentience usage: {steps_using}/{total_steps} steps ({percentage:.1f}%)")
+ else:
+ log(f" Sentience used: {result.get('sentience_used', 'unknown')}")
+
+ except ImportError as e:
+ log(f"❌ Import error: {e}")
+ log("\nPlease install required packages:")
+ log(" pip install transformers torch accelerate sentienceapi")
+ except Exception as e:
+ log(f"❌ Error: {e}")
+ traceback.print_exc()
+ finally:
+ if browser_session is not None:
+ try:
+ await browser_session.stop() # Gracefully stop the browser session
+ except Exception as e:
+ log(f"⚠️ Error stopping browser session: {e}")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/tests/integrations/sentience/test_agent.py b/tests/integrations/sentience/test_agent.py
new file mode 100644
index 0000000000..221cf7cf5e
--- /dev/null
+++ b/tests/integrations/sentience/test_agent.py
@@ -0,0 +1,372 @@
+"""Unit tests for SentienceAgent."""
+
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from browser_use.integrations.sentience.agent import (
+ SentienceAgent,
+ SentienceAgentConfig,
+ SentienceAgentSettings,
+ VisionFallbackConfig,
+)
+
+
+@pytest.fixture
+def mock_llm():
+ """Create a mock LLM."""
+ llm = MagicMock()
+ llm.ainvoke = AsyncMock(return_value=MagicMock(content="test response"))
+ llm.model = "test-model"
+ llm.provider = "test-provider"
+ return llm
+
+
+@pytest.fixture
+def mock_browser_session():
+ """Create a mock browser session."""
+ session = MagicMock()
+ session.is_connected.return_value = True
+ session.get_browser_state_summary = AsyncMock(
+ return_value=MagicMock(
+ url="https://example.com",
+ screenshot=None,
+ page_info=MagicMock(title="Test Page"),
+ dom_state=MagicMock(selector_map={}),
+ )
+ )
+ session.get_current_page_url = AsyncMock(return_value="https://example.com")
+ return session
+
+
+@pytest.fixture
+def mock_tools():
+ """Create a mock tools registry."""
+ return MagicMock()
+
+
+class TestSentienceAgentConfig:
+ """Test SentienceAgentConfig dataclass."""
+
+ def test_default_config(self):
+ """Test default configuration values."""
+ config = SentienceAgentConfig()
+ assert config.sentience_api_key is None
+ assert config.sentience_use_api is None
+ assert config.sentience_max_elements == 60
+ assert config.sentience_show_overlay is False
+ assert config.sentience_wait_for_extension_ms == 5000
+ assert config.sentience_retries == 2
+ assert config.sentience_retry_delay_s == 1.0
+
+ def test_custom_config(self):
+ """Test custom configuration values."""
+ config = SentienceAgentConfig(
+ sentience_api_key="test-key",
+ sentience_max_elements=100,
+ sentience_show_overlay=True,
+ )
+ assert config.sentience_api_key == "test-key"
+ assert config.sentience_max_elements == 100
+ assert config.sentience_show_overlay is True
+
+
+class TestVisionFallbackConfig:
+ """Test VisionFallbackConfig dataclass."""
+
+ def test_default_config(self):
+ """Test default configuration values."""
+ config = VisionFallbackConfig()
+ assert config.enabled is True
+ assert config.detail_level == "auto"
+ assert config.include_screenshots is True
+
+ def test_custom_config(self):
+ """Test custom configuration values."""
+ config = VisionFallbackConfig(
+ enabled=False,
+ detail_level="high",
+ include_screenshots=False,
+ )
+ assert config.enabled is False
+ assert config.detail_level == "high"
+ assert config.include_screenshots is False
+
+
+class TestSentienceAgentSettings:
+ """Test SentienceAgentSettings Pydantic model."""
+
+ def test_default_settings(self):
+ """Test default settings values."""
+ settings = SentienceAgentSettings(task="test task")
+ assert settings.task == "test task"
+ assert settings.max_steps == 100
+ assert settings.max_failures == 3
+ assert settings.calculate_cost is True
+ assert isinstance(settings.sentience_config, SentienceAgentConfig)
+ assert isinstance(settings.vision_fallback, VisionFallbackConfig)
+
+ def test_custom_settings(self):
+ """Test custom settings values."""
+ settings = SentienceAgentSettings(
+ task="custom task",
+ max_steps=50,
+ max_failures=5,
+ calculate_cost=False,
+ )
+ assert settings.task == "custom task"
+ assert settings.max_steps == 50
+ assert settings.max_failures == 5
+ assert settings.calculate_cost is False
+
+
+class TestSentienceAgent:
+ """Test SentienceAgent class."""
+
+ def test_init(self, mock_llm, mock_browser_session, mock_tools):
+ """Test agent initialization."""
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ tools=mock_tools,
+ )
+ assert agent.task == "test task"
+ assert agent.llm == mock_llm
+ assert agent.browser_session == mock_browser_session
+ assert agent.tools == mock_tools
+ assert agent._current_step == 0
+ assert agent._consecutive_failures == 0
+
+ def test_init_with_custom_config(self, mock_llm, mock_browser_session):
+ """Test agent initialization with custom configuration."""
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ sentience_max_elements=100,
+ vision_fallback_enabled=False,
+ )
+ assert agent.settings.sentience_config.sentience_max_elements == 100
+ assert agent.settings.vision_fallback.enabled is False
+
+ def test_get_sentience_context_success(self, mock_llm, mock_browser_session):
+ """Test getting SentienceContext when SDK is available."""
+ with patch("browser_use.integrations.sentience.agent.SentienceContext") as mock_context:
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ )
+ context = agent._get_sentience_context()
+ assert context is not None
+ mock_context.assert_called_once()
+
+ def test_get_sentience_context_import_error(self, mock_llm, mock_browser_session):
+ """Test getting SentienceContext when SDK is not available."""
+ with patch(
+ "browser_use.integrations.sentience.agent.SentienceContext",
+ side_effect=ImportError("No module named 'sentience'"),
+ ):
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ )
+ with pytest.raises(ImportError, match="Sentience SDK is required"):
+ agent._get_sentience_context()
+
+ @pytest.mark.asyncio
+ async def test_try_sentience_snapshot_success(
+ self, mock_llm, mock_browser_session
+ ):
+ """Test successful Sentience snapshot."""
+ mock_state = MagicMock()
+ mock_state.prompt_block = "test prompt block"
+
+        with patch.object(
+            SentienceAgent, "_get_sentience_context"
+        ) as mock_get_context:
+ mock_context = MagicMock()
+ mock_context.build = AsyncMock(return_value=mock_state)
+ mock_get_context.return_value = mock_context
+
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ )
+ result = await agent._try_sentience_snapshot()
+
+ assert result == mock_state
+ mock_context.build.assert_called_once()
+
+ @pytest.mark.asyncio
+ async def test_try_sentience_snapshot_failure(
+ self, mock_llm, mock_browser_session
+ ):
+ """Test failed Sentience snapshot."""
+        with patch.object(
+            SentienceAgent, "_get_sentience_context"
+        ) as mock_get_context:
+ mock_context = MagicMock()
+ mock_context.build = AsyncMock(return_value=None)
+ mock_get_context.return_value = mock_context
+
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ )
+ result = await agent._try_sentience_snapshot()
+
+ assert result is None
+
+ @pytest.mark.asyncio
+ async def test_build_sentience_message(self, mock_llm, mock_browser_session):
+ """Test building message with Sentience prompt block."""
+ mock_state = MagicMock()
+ mock_state.prompt_block = "Elements: ID|role|text|...\n1|button|Click|...\n"
+
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ )
+ message = agent._build_sentience_message(mock_state)
+
+ assert isinstance(message.content, str)
+ assert "agent_history" in message.content
+ assert "browser_state" in message.content
+ assert mock_state.prompt_block in message.content
+
+ @pytest.mark.asyncio
+ async def test_build_vision_message_without_screenshot(
+ self, mock_llm, mock_browser_session
+ ):
+ """Test building vision message without screenshot."""
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ vision_include_screenshots=False,
+ )
+ message = await agent._build_vision_message()
+
+ assert isinstance(message.content, str)
+ assert "agent_history" in message.content
+ assert "browser_state" in message.content
+
+ @pytest.mark.asyncio
+ async def test_build_vision_message_with_screenshot(
+ self, mock_llm, mock_browser_session
+ ):
+ """Test building vision message with screenshot."""
+ mock_browser_session.get_browser_state_summary.return_value = MagicMock(
+ url="https://example.com",
+ screenshot="base64_screenshot_data",
+ page_info=MagicMock(title="Test Page"),
+ dom_state=MagicMock(selector_map={}),
+ )
+
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ vision_include_screenshots=True,
+ )
+ message = await agent._build_vision_message()
+
+ # Should be a list of content parts when screenshot is included
+ assert isinstance(message.content, list)
+ assert len(message.content) == 3 # text, label, image
+
+ @pytest.mark.asyncio
+ async def test_prepare_context_sentience_success(
+ self, mock_llm, mock_browser_session
+ ):
+ """Test context preparation with successful Sentience snapshot."""
+ mock_state = MagicMock()
+ mock_state.prompt_block = "test prompt block"
+
+ with patch.object(
+ SentienceAgent, "_try_sentience_snapshot", return_value=mock_state
+ ):
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ )
+ message, sentience_used = await agent._prepare_context()
+
+ assert sentience_used is True
+ assert isinstance(message.content, str)
+ assert agent._sentience_used_in_last_step is True
+
+ @pytest.mark.asyncio
+ async def test_prepare_context_vision_fallback(
+ self, mock_llm, mock_browser_session
+ ):
+ """Test context preparation with vision fallback."""
+ with patch.object(
+ SentienceAgent, "_try_sentience_snapshot", return_value=None
+ ):
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ vision_fallback_enabled=True,
+ )
+ message, sentience_used = await agent._prepare_context()
+
+ assert sentience_used is False
+ assert agent._sentience_used_in_last_step is False
+
+ @pytest.mark.asyncio
+ async def test_prepare_context_no_fallback(
+ self, mock_llm, mock_browser_session
+ ):
+ """Test context preparation without fallback."""
+ with patch.object(
+ SentienceAgent, "_try_sentience_snapshot", return_value=None
+ ):
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ vision_fallback_enabled=False,
+ )
+ message, sentience_used = await agent._prepare_context()
+
+ assert sentience_used is False
+ assert isinstance(message.content, str)
+ assert "agent_history" in message.content
+
+ def test_get_agent_history_description(self, mock_llm, mock_browser_session):
+ """Test agent history description generation."""
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ )
+ agent._current_step = 0
+ history = agent._get_agent_history_description()
+ assert "test task" in history
+ assert "Step: 1" in history
+
+ def test_build_dom_state(self, mock_llm, mock_browser_session):
+ """Test DOM state building."""
+ mock_browser_state = MagicMock()
+ mock_browser_state.url = "https://example.com"
+ mock_browser_state.page_info = MagicMock(title="Test Page")
+ mock_browser_state.dom_state = MagicMock(selector_map={"1": "button"})
+
+ agent = SentienceAgent(
+ task="test task",
+ llm=mock_llm,
+ browser_session=mock_browser_session,
+ )
+ dom_state = agent._build_dom_state(mock_browser_state)
+
+ assert "https://example.com" in dom_state
+ assert "Test Page" in dom_state
+ assert "Interactive elements: 1" in dom_state