diff --git a/README.md b/README.md index e52c62b..2631594 100644 --- a/README.md +++ b/README.md @@ -487,6 +487,28 @@ for match in result.results: --- +## šŸ”„ Async API + +For asyncio contexts (FastAPI, async frameworks): + +```python +from sentience.async_api import AsyncSentienceBrowser, snapshot_async, click_async, find + +async def main(): + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + snap = await snapshot_async(browser) + button = find(snap, "role=button") + if button: + await click_async(browser, button.id) + +asyncio.run(main()) +``` + +**See example:** `examples/async_api_demo.py` + +--- + ## šŸ“‹ Reference
diff --git a/examples/async_api_demo.py b/examples/async_api_demo.py new file mode 100644 index 0000000..81b478a --- /dev/null +++ b/examples/async_api_demo.py @@ -0,0 +1,192 @@ +""" +Example: Using Async API for asyncio contexts + +This example demonstrates how to use the Sentience SDK's async API +when working with asyncio, FastAPI, or other async frameworks. + +To run this example: + python -m examples.async_api_demo + +Or if sentience is installed: + python examples/async_api_demo.py +""" + +import asyncio +import os + +# Import async API functions +from sentience.async_api import ( + AsyncSentienceBrowser, + click_async, + find, + press_async, + snapshot_async, + type_text_async, +) +from sentience.models import SnapshotOptions, Viewport + + +async def basic_async_example(): + """Basic async browser usage with context manager""" + api_key = os.environ.get("SENTIENCE_API_KEY") + + # Use async context manager + async with AsyncSentienceBrowser(api_key=api_key, headless=False) as browser: + # Navigate to a page + await browser.goto("https://example.com") + + # Take a snapshot (async) + snap = await snapshot_async(browser) + print(f"āœ… Found {len(snap.elements)} elements on the page") + + # Find an element + link = find(snap, "role=link") + if link: + print(f"Found link: {link.text} (id: {link.id})") + + # Click it (async) + result = await click_async(browser, link.id) + print(f"Click result: success={result.success}, outcome={result.outcome}") + + +async def custom_viewport_example(): + """Example using custom viewport with Viewport class""" + # Use Viewport class for type safety + custom_viewport = Viewport(width=1920, height=1080) + + async with AsyncSentienceBrowser(viewport=custom_viewport, headless=False) as browser: + await browser.goto("https://example.com") + + # Verify viewport size + viewport_size = await browser.page.evaluate( + "() => ({ width: window.innerWidth, height: window.innerHeight })" + ) + print(f"āœ… Viewport: {viewport_size['width']}x{viewport_size['height']}") + + +async def snapshot_with_options_example(): + """Example using SnapshotOptions with async API""" + async with AsyncSentienceBrowser(headless=False) as browser: + await browser.goto("https://example.com") + + # Take snapshot with options + options = SnapshotOptions( + limit=10, + screenshot=False, + show_overlay=False, + ) + snap = await snapshot_async(browser, options) + print(f"āœ… Snapshot with limit=10: {len(snap.elements)} elements") + + +async def actions_example(): + """Example of all async actions""" + async with AsyncSentienceBrowser(headless=False) as browser: + await browser.goto("https://example.com") + + # Take snapshot + snap = await snapshot_async(browser) + + # Find a textbox if available + textbox = find(snap, "role=textbox") + if textbox: + # Type text (async) + result = await type_text_async(browser, textbox.id, "Hello, World!") + print(f"āœ… Typed text: success={result.success}") + + # Press a key (async) + result = await press_async(browser, "Enter") + print(f"āœ… Pressed Enter: success={result.success}") + + +async def from_existing_context_example(): + """Example using from_existing() with existing Playwright context""" + from playwright.async_api import async_playwright + + async with async_playwright() as p: + # Create your own Playwright context + context = await p.chromium.launch_persistent_context("", headless=True) + + try: + # Create SentienceBrowser from existing context + browser = await AsyncSentienceBrowser.from_existing(context) + await browser.goto("https://example.com") + + # Use Sentience SDK functions + snap = await snapshot_async(browser) + print(f"āœ… Using existing context: {len(snap.elements)} elements") + finally: + await context.close() + + +async def from_existing_page_example(): + """Example using from_page() with existing Playwright page""" + from playwright.async_api import async_playwright + + async with async_playwright() as p: + browser_instance = await p.chromium.launch(headless=True) + context = await browser_instance.new_context() + page = await context.new_page() + + try: + # Create SentienceBrowser from existing page + sentience_browser = await AsyncSentienceBrowser.from_page(page) + await sentience_browser.goto("https://example.com") + + # Use Sentience SDK functions + snap = await snapshot_async(sentience_browser) + print(f"āœ… Using existing page: {len(snap.elements)} elements") + finally: + await context.close() + await browser_instance.close() + + +async def multiple_browsers_example(): + """Example running multiple browsers concurrently""" + + async def process_site(url: str): + async with AsyncSentienceBrowser(headless=True) as browser: + await browser.goto(url) + snap = await snapshot_async(browser) + return {"url": url, "elements": len(snap.elements)} + + # Process multiple sites concurrently + urls = [ + "https://example.com", + "https://httpbin.org/html", + ] + + results = await asyncio.gather(*[process_site(url) for url in urls]) + for result in results: + print(f"āœ… {result['url']}: {result['elements']} elements") + + +async def main(): + """Run all examples""" + print("=== Basic Async Example ===") + await basic_async_example() + + print("\n=== Custom Viewport Example ===") + await custom_viewport_example() + + print("\n=== Snapshot with Options Example ===") + await snapshot_with_options_example() + + print("\n=== Actions Example ===") + await actions_example() + + print("\n=== From Existing Context Example ===") + await from_existing_context_example() + + print("\n=== From Existing Page Example ===") + await from_existing_page_example() + + print("\n=== Multiple Browsers Concurrent Example ===") + await multiple_browsers_example() + + print("\nāœ… All async examples completed!") + + +if __name__ == "__main__": + # Run the async main function + asyncio.run(main()) diff --git a/screenshot.png b/screenshot.png index e96bc42..847eb73 100644 Binary files a/screenshot.png and b/screenshot.png differ diff --git a/sentience/_extension_loader.py b/sentience/_extension_loader.py new file mode 100644 index 0000000..d969ec3 --- /dev/null +++ b/sentience/_extension_loader.py @@ -0,0 +1,40 @@ +""" +Shared extension loading logic for sync and async implementations +""" + +from pathlib import Path + + +def find_extension_path() -> Path: + """ + Find Sentience extension directory (shared logic for sync and async). + + Checks multiple locations: + 1. sentience/extension/ (installed package) + 2. ../sentience-chrome (development/monorepo) + + Returns: + Path to extension directory + + Raises: + FileNotFoundError: If extension not found in any location + """ + # 1. Try relative to this file (installed package structure) + # sentience/_extension_loader.py -> sentience/extension/ + package_ext_path = Path(__file__).parent / "extension" + + # 2. Try development root (if running from source repo) + # sentience/_extension_loader.py -> ../sentience-chrome + dev_ext_path = Path(__file__).parent.parent.parent / "sentience-chrome" + + if package_ext_path.exists() and (package_ext_path / "manifest.json").exists(): + return package_ext_path + elif dev_ext_path.exists() and (dev_ext_path / "manifest.json").exists(): + return dev_ext_path + else: + raise FileNotFoundError( + f"Extension not found. Checked:\n" + f"1. {package_ext_path}\n" + f"2. {dev_ext_path}\n" + "Make sure the extension is built and 'sentience/extension' directory exists." + ) diff --git a/sentience/async_api.py b/sentience/async_api.py new file mode 100644 index 0000000..12beefb --- /dev/null +++ b/sentience/async_api.py @@ -0,0 +1,1160 @@ +""" +Async API for Sentience SDK - Use this in asyncio contexts + +This module provides async versions of all Sentience SDK functions. +Use AsyncSentienceBrowser when working with async/await code. +""" + +import asyncio +import base64 +import os +import shutil +import tempfile +import time +from pathlib import Path +from typing import Any, Optional +from urllib.parse import urlparse + +from playwright.async_api import BrowserContext, Page, Playwright, async_playwright + +from sentience._extension_loader import find_extension_path +from sentience.models import ( + ActionResult, + BBox, + Element, + ProxyConfig, + Snapshot, + SnapshotOptions, + StorageState, + Viewport, + WaitResult, +) + +# Import stealth for bot evasion (optional - graceful fallback if not available) +try: + from playwright_stealth import stealth_async + + STEALTH_AVAILABLE = True +except ImportError: + STEALTH_AVAILABLE = False + + +class AsyncSentienceBrowser: + """Async version of SentienceBrowser for use in asyncio contexts.""" + + def __init__( + self, + api_key: str | None = None, + api_url: str | None = None, + headless: bool | None = None, + proxy: str | None = None, + user_data_dir: str | Path | None = None, + storage_state: str | Path | StorageState | dict | None = None, + record_video_dir: str | Path | None = None, + record_video_size: dict[str, int] | None = None, + viewport: Viewport | dict[str, int] | None = None, + ): + """ + Initialize Async Sentience browser + + Args: + api_key: Optional API key for server-side processing (Pro/Enterprise tiers) + If None, uses free tier (local extension only) + api_url: Server URL for API calls (defaults to https://api.sentienceapi.com if api_key provided) + headless: Whether to run in headless mode. If None, defaults to True in CI, False otherwise + proxy: Optional proxy server URL (e.g., 'http://user:pass@proxy.example.com:8080') + user_data_dir: Optional path to user data directory for persistent sessions + storage_state: Optional storage state to inject (cookies + localStorage) + record_video_dir: Optional directory path to save video recordings + record_video_size: Optional video resolution as dict with 'width' and 'height' keys + viewport: Optional viewport size as Viewport object or dict with 'width' and 'height' keys. + Examples: Viewport(width=1280, height=800) (default) + Viewport(width=1920, height=1080) (Full HD) + {"width": 1280, "height": 800} (dict also supported) + If None, defaults to Viewport(width=1280, height=800). + """ + self.api_key = api_key + # Only set api_url if api_key is provided, otherwise None (free tier) + if self.api_key and not api_url: + self.api_url = "https://api.sentienceapi.com" + else: + self.api_url = api_url + + # Determine headless mode + if headless is None: + # Default to False for local dev, True for CI + self.headless = os.environ.get("CI", "").lower() == "true" + else: + self.headless = headless + + # Support proxy from argument or environment variable + self.proxy = proxy or os.environ.get("SENTIENCE_PROXY") + + # Auth injection support + self.user_data_dir = user_data_dir + self.storage_state = storage_state + + # Video recording support + self.record_video_dir = record_video_dir + self.record_video_size = record_video_size or {"width": 1280, "height": 800} + + # Viewport configuration - convert dict to Viewport if needed + if viewport is None: + self.viewport = Viewport(width=1280, height=800) + elif isinstance(viewport, dict): + self.viewport = Viewport(width=viewport["width"], height=viewport["height"]) + else: + self.viewport = viewport + + self.playwright: Playwright | None = None + self.context: BrowserContext | None = None + self.page: Page | None = None + self._extension_path: str | None = None + + def _parse_proxy(self, proxy_string: str) -> ProxyConfig | None: + """ + Parse proxy connection string into ProxyConfig. + + Args: + proxy_string: Proxy URL (e.g., 'http://user:pass@proxy.example.com:8080') + + Returns: + ProxyConfig object or None if invalid + """ + if not proxy_string: + return None + + try: + parsed = urlparse(proxy_string) + + # Validate scheme + if parsed.scheme not in ("http", "https", "socks5"): + print(f"āš ļø [Sentience] Unsupported proxy scheme: {parsed.scheme}") + print(" Supported: http, https, socks5") + return None + + # Validate host and port + if not parsed.hostname or not parsed.port: + print("āš ļø [Sentience] Proxy URL must include hostname and port") + print(" Expected format: http://username:password@host:port") + return None + + # Build server URL + server = f"{parsed.scheme}://{parsed.hostname}:{parsed.port}" + + # Create ProxyConfig with optional credentials + return ProxyConfig( + server=server, + username=parsed.username if parsed.username else None, + password=parsed.password if parsed.password else None, + ) + + except Exception as e: + print(f"āš ļø [Sentience] Invalid proxy configuration: {e}") + print(" Expected format: http://username:password@host:port") + return None + + async def start(self) -> None: + """Launch browser with extension loaded (async)""" + # Get extension source path using shared utility + extension_source = find_extension_path() + + # Create temporary extension bundle + self._extension_path = tempfile.mkdtemp(prefix="sentience-ext-") + shutil.copytree(extension_source, self._extension_path, dirs_exist_ok=True) + + self.playwright = await async_playwright().start() + + # Build launch arguments + args = [ + f"--disable-extensions-except={self._extension_path}", + f"--load-extension={self._extension_path}", + "--disable-blink-features=AutomationControlled", + "--no-sandbox", + "--disable-infobars", + "--disable-features=WebRtcHideLocalIpsWithMdns", + "--force-webrtc-ip-handling-policy=disable_non_proxied_udp", + ] + + if self.headless: + args.append("--headless=new") + + # Parse proxy configuration if provided + proxy_config = self._parse_proxy(self.proxy) if self.proxy else None + + # Handle User Data Directory + if self.user_data_dir: + user_data_dir = str(self.user_data_dir) + Path(user_data_dir).mkdir(parents=True, exist_ok=True) + else: + user_data_dir = "" + + # Build launch_persistent_context parameters + launch_params = { + "user_data_dir": user_data_dir, + "headless": False, + "args": args, + "viewport": {"width": self.viewport.width, "height": self.viewport.height}, + "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + } + + # Add proxy if configured + if proxy_config: + launch_params["proxy"] = proxy_config.to_playwright_dict() + launch_params["ignore_https_errors"] = True + print(f"🌐 [Sentience] Using proxy: {proxy_config.server}") + + # Add video recording if configured + if self.record_video_dir: + video_dir = Path(self.record_video_dir) + video_dir.mkdir(parents=True, exist_ok=True) + launch_params["record_video_dir"] = str(video_dir) + launch_params["record_video_size"] = self.record_video_size + print(f"šŸŽ„ [Sentience] Recording video to: {video_dir}") + print( + f" Resolution: {self.record_video_size['width']}x{self.record_video_size['height']}" + ) + + # Launch persistent context + self.context = await self.playwright.chromium.launch_persistent_context(**launch_params) + + self.page = self.context.pages[0] if self.context.pages else await self.context.new_page() + + # Inject storage state if provided + if self.storage_state: + await self._inject_storage_state(self.storage_state) + + # Apply stealth if available + if STEALTH_AVAILABLE: + await stealth_async(self.page) + + # Wait a moment for extension to initialize + await asyncio.sleep(0.5) + + async def goto(self, url: str) -> None: + """Navigate to a URL and ensure extension is ready (async)""" + if not self.page: + raise RuntimeError("Browser not started. Call await start() first.") + + await self.page.goto(url, wait_until="domcontentloaded") + + # Wait for extension to be ready + if not await self._wait_for_extension(): + try: + diag = await self.page.evaluate( + """() => ({ + sentience_defined: typeof window.sentience !== 'undefined', + registry_defined: typeof window.sentience_registry !== 'undefined', + snapshot_defined: window.sentience && typeof window.sentience.snapshot === 'function', + extension_id: document.documentElement.dataset.sentienceExtensionId || 'not set', + url: window.location.href + })""" + ) + except Exception as e: + diag = f"Failed to get diagnostics: {str(e)}" + + raise RuntimeError( + "Extension failed to load after navigation. Make sure:\n" + "1. Extension is built (cd sentience-chrome && ./build.sh)\n" + "2. All files are present (manifest.json, content.js, injected_api.js, pkg/)\n" + "3. Check browser console for errors (run with headless=False to see console)\n" + f"4. Extension path: {self._extension_path}\n" + f"5. Diagnostic info: {diag}" + ) + + async def _inject_storage_state(self, storage_state: str | Path | StorageState | dict) -> None: + """Inject storage state (cookies + localStorage) into browser context (async)""" + import json + + # Load storage state + if isinstance(storage_state, (str, Path)): + with open(storage_state, encoding="utf-8") as f: + state_dict = json.load(f) + state = StorageState.from_dict(state_dict) + elif isinstance(storage_state, StorageState): + state = storage_state + elif isinstance(storage_state, dict): + state = StorageState.from_dict(storage_state) + else: + raise ValueError( + f"Invalid storage_state type: {type(storage_state)}. " + "Expected str, Path, StorageState, or dict." + ) + + # Inject cookies + if state.cookies: + playwright_cookies = [] + for cookie in state.cookies: + cookie_dict = cookie.model_dump() + playwright_cookie = { + "name": cookie_dict["name"], + "value": cookie_dict["value"], + "domain": cookie_dict["domain"], + "path": cookie_dict["path"], + } + if cookie_dict.get("expires"): + playwright_cookie["expires"] = cookie_dict["expires"] + if cookie_dict.get("httpOnly"): + playwright_cookie["httpOnly"] = cookie_dict["httpOnly"] + if cookie_dict.get("secure"): + playwright_cookie["secure"] = cookie_dict["secure"] + if cookie_dict.get("sameSite"): + playwright_cookie["sameSite"] = cookie_dict["sameSite"] + playwright_cookies.append(playwright_cookie) + + await self.context.add_cookies(playwright_cookies) + print(f"āœ… [Sentience] Injected {len(state.cookies)} cookie(s)") + + # Inject LocalStorage + if state.origins: + for origin_data in state.origins: + origin = origin_data.origin + if not origin: + continue + + try: + await self.page.goto(origin, wait_until="domcontentloaded", timeout=10000) + + if origin_data.localStorage: + localStorage_dict = { + item.name: item.value for item in origin_data.localStorage + } + await self.page.evaluate( + """(localStorage_data) => { + for (const [key, value] of Object.entries(localStorage_data)) { + localStorage.setItem(key, value); + } + }""", + localStorage_dict, + ) + print( + f"āœ… [Sentience] Injected {len(origin_data.localStorage)} localStorage item(s) for {origin}" + ) + except Exception as e: + print(f"āš ļø [Sentience] Failed to inject localStorage for {origin}: {e}") + + async def _wait_for_extension(self, timeout_sec: float = 5.0) -> bool: + """Poll for window.sentience to be available (async)""" + start_time = time.time() + last_error = None + + while time.time() - start_time < timeout_sec: + try: + result = await self.page.evaluate( + """() => { + if (typeof window.sentience === 'undefined') { + return { ready: false, reason: 'window.sentience undefined' }; + } + if (window.sentience._wasmModule === null) { + return { ready: false, reason: 'WASM module not fully loaded' }; + } + return { ready: true }; + } + """ + ) + + if isinstance(result, dict): + if result.get("ready"): + return True + last_error = result.get("reason", "Unknown error") + except Exception as e: + last_error = f"Evaluation error: {str(e)}" + + await asyncio.sleep(0.3) + + if last_error: + import warnings + + warnings.warn(f"Extension wait timeout. Last status: {last_error}") + + return False + + async def close(self, output_path: str | Path | None = None) -> str | None: + """ + Close browser and cleanup (async) + + Args: + output_path: Optional path to rename the video file to + + Returns: + Path to video file if recording was enabled, None otherwise + """ + temp_video_path = None + + if self.record_video_dir: + try: + if self.page and self.page.video: + temp_video_path = await self.page.video.path() + elif self.context: + for page in self.context.pages: + if page.video: + temp_video_path = await page.video.path() + break + except Exception: + pass + + if self.context: + await self.context.close() + self.context = None + + if self.playwright: + await self.playwright.stop() + self.playwright = None + + if self._extension_path and os.path.exists(self._extension_path): + shutil.rmtree(self._extension_path) + + # Clear page reference after closing context + self.page = None + + final_path = temp_video_path + if temp_video_path and output_path and os.path.exists(temp_video_path): + try: + output_path = str(output_path) + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + shutil.move(temp_video_path, output_path) + final_path = output_path + except Exception as e: + import warnings + + warnings.warn(f"Failed to rename video file: {e}") + final_path = temp_video_path + + return final_path + + async def __aenter__(self): + """Async context manager entry""" + await self.start() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit""" + await self.close() + + @classmethod + async def from_existing( + cls, + context: BrowserContext, + api_key: str | None = None, + api_url: str | None = None, + ) -> "AsyncSentienceBrowser": + """ + Create AsyncSentienceBrowser from an existing Playwright BrowserContext. + + Args: + context: Existing Playwright BrowserContext + api_key: Optional API key for server-side processing + api_url: Optional API URL + + Returns: + AsyncSentienceBrowser instance configured to use the existing context + """ + instance = cls(api_key=api_key, api_url=api_url) + instance.context = context + pages = context.pages + instance.page = pages[0] if pages else await context.new_page() + + # Apply stealth if available + if STEALTH_AVAILABLE: + await stealth_async(instance.page) + + # Wait for extension to be ready + await asyncio.sleep(0.5) + + return instance + + @classmethod + async def from_page( + cls, + page: Page, + api_key: str | None = None, + api_url: str | None = None, + ) -> "AsyncSentienceBrowser": + """ + Create AsyncSentienceBrowser from an existing Playwright Page. + + Args: + page: Existing Playwright Page + api_key: Optional API key for server-side processing + api_url: Optional API URL + + Returns: + AsyncSentienceBrowser instance configured to use the existing page + """ + instance = cls(api_key=api_key, api_url=api_url) + instance.page = page + instance.context = page.context + + # Apply stealth if available + if STEALTH_AVAILABLE: + await stealth_async(instance.page) + + # Wait for extension to be ready + await asyncio.sleep(0.5) + + return instance + + +# ========== Async Snapshot Functions ========== + + +async def snapshot_async( + browser: AsyncSentienceBrowser, + options: SnapshotOptions | None = None, +) -> Snapshot: + """ + Take a snapshot of the current page (async) + + Args: + browser: AsyncSentienceBrowser instance + options: Snapshot options (screenshot, limit, filter, etc.) + If None, uses default options. + + Returns: + Snapshot object + + Example: + # Basic snapshot with defaults + snap = await snapshot_async(browser) + + # With options + snap = await snapshot_async(browser, SnapshotOptions( + screenshot=True, + limit=100, + show_overlay=True + )) + """ + # Use default options if none provided + if options is None: + options = SnapshotOptions() + + # Determine if we should use server-side API + should_use_api = ( + options.use_api if options.use_api is not None else (browser.api_key is not None) + ) + + if should_use_api and browser.api_key: + # Use server-side API (Pro/Enterprise tier) + return await _snapshot_via_api_async(browser, options) + else: + # Use local extension (Free tier) + return await _snapshot_via_extension_async(browser, options) + + +async def _snapshot_via_extension_async( + browser: AsyncSentienceBrowser, + options: SnapshotOptions, +) -> Snapshot: + """Take snapshot using local extension (Free tier) - async""" + if not browser.page: + raise RuntimeError("Browser not started. Call await browser.start() first.") + + # Wait for extension injection to complete + try: + await browser.page.wait_for_function( + "typeof window.sentience !== 'undefined'", + timeout=5000, + ) + except Exception as e: + try: + diag = await browser.page.evaluate( + """() => ({ + sentience_defined: typeof window.sentience !== 'undefined', + extension_id: document.documentElement.dataset.sentienceExtensionId || 'not set', + url: window.location.href + })""" + ) + except Exception: + diag = {"error": "Could not gather diagnostics"} + + raise RuntimeError( + f"Sentience extension failed to inject window.sentience API. " + f"Is the extension loaded? Diagnostics: {diag}" + ) from e + + # Build options dict for extension API + ext_options: dict[str, Any] = {} + if options.screenshot is not False: + ext_options["screenshot"] = options.screenshot + if options.limit != 50: + ext_options["limit"] = options.limit + if options.filter is not None: + ext_options["filter"] = ( + options.filter.model_dump() if hasattr(options.filter, "model_dump") else options.filter + ) + + # Call extension API + result = await browser.page.evaluate( + """ + (options) => { + return window.sentience.snapshot(options); + } + """, + ext_options, + ) + + # Save trace if requested + if options.save_trace: + from sentience.snapshot import _save_trace_to_file + + _save_trace_to_file(result.get("raw_elements", []), options.trace_path) + + # Show visual overlay if requested + if options.show_overlay: + raw_elements = result.get("raw_elements", []) + if raw_elements: + await browser.page.evaluate( + """ + (elements) => { + if (window.sentience && window.sentience.showOverlay) { + window.sentience.showOverlay(elements, null); + } + } + """, + raw_elements, + ) + + # Validate and parse with Pydantic + snapshot_obj = Snapshot(**result) + return snapshot_obj + + +async def _snapshot_via_api_async( + browser: AsyncSentienceBrowser, + options: SnapshotOptions, +) -> Snapshot: + """Take snapshot using server-side API (Pro/Enterprise tier) - async""" + if not browser.page: + raise RuntimeError("Browser not started. Call await browser.start() first.") + + if not browser.api_key: + raise ValueError("API key required for server-side processing") + + if not browser.api_url: + raise ValueError("API URL required for server-side processing") + + # Wait for extension injection + try: + await browser.page.wait_for_function( + "typeof window.sentience !== 'undefined'", timeout=5000 + ) + except Exception as e: + raise RuntimeError( + "Sentience extension failed to inject. Cannot collect raw data for API processing." + ) from e + + # Step 1: Get raw data from local extension + raw_options: dict[str, any] = {} + if options.screenshot is not False: + raw_options["screenshot"] = options.screenshot + + raw_result = await browser.page.evaluate( + """ + (options) => { + return window.sentience.snapshot(options); + } + """, + raw_options, + ) + + # Save trace if requested + if options.save_trace: + from sentience.snapshot import _save_trace_to_file + + _save_trace_to_file(raw_result.get("raw_elements", []), options.trace_path) + + # Step 2: Send to server for smart ranking/filtering + import json + + from sentience.snapshot import MAX_PAYLOAD_BYTES + + payload = { + "raw_elements": raw_result.get("raw_elements", []), + "url": raw_result.get("url", ""), + "viewport": raw_result.get("viewport"), + "goal": options.goal, + "options": { + "limit": options.limit, + "filter": options.filter.model_dump() if options.filter else None, + }, + } + + # Check payload size + payload_json = json.dumps(payload) + payload_size = len(payload_json.encode("utf-8")) + if payload_size > MAX_PAYLOAD_BYTES: + raise ValueError( + f"Payload size ({payload_size / 1024 / 1024:.2f}MB) exceeds server limit " + f"({MAX_PAYLOAD_BYTES / 1024 / 1024:.0f}MB). " + f"Try reducing the number of elements on the page or filtering elements." + ) + + headers = { + "Authorization": f"Bearer {browser.api_key}", + "Content-Type": "application/json", + } + + try: + import aiohttp + + async with aiohttp.ClientSession() as session: + async with session.post( + f"{browser.api_url}/v1/snapshot", + data=payload_json, + headers=headers, + timeout=aiohttp.ClientTimeout(total=30), + ) as response: + response.raise_for_status() + api_result = await response.json() + + # Merge API result with local data + snapshot_data = { + "status": api_result.get("status", "success"), + "timestamp": api_result.get("timestamp"), + "url": api_result.get("url", raw_result.get("url", "")), + "viewport": api_result.get("viewport", raw_result.get("viewport")), + "elements": api_result.get("elements", []), + "screenshot": raw_result.get("screenshot"), + "screenshot_format": raw_result.get("screenshot_format"), + "error": api_result.get("error"), + } + + # Show visual overlay if requested + if options.show_overlay: + elements = api_result.get("elements", []) + if elements: + await browser.page.evaluate( + """ + (elements) => { + if (window.sentience && window.sentience.showOverlay) { + window.sentience.showOverlay(elements, null); + } + } + """, + elements, + ) + + return Snapshot(**snapshot_data) + except ImportError: + # Fallback to requests if aiohttp not available (shouldn't happen in async context) + raise RuntimeError( + "aiohttp is required for async API calls. Install it with: pip install aiohttp" + ) + except Exception as e: + raise RuntimeError(f"API request failed: {e}") + + +# ========== Async Action Functions ========== + + +async def click_async( + browser: AsyncSentienceBrowser, + element_id: int, + use_mouse: bool = True, + take_snapshot: bool = False, +) -> ActionResult: + """ + Click an element by ID using hybrid approach (async) + + Args: + browser: AsyncSentienceBrowser instance + element_id: Element ID from snapshot + use_mouse: If True, use Playwright's mouse.click() at element center + take_snapshot: Whether to take snapshot after action + + Returns: + ActionResult + """ + if not browser.page: + raise RuntimeError("Browser not started. Call await browser.start() first.") + + start_time = time.time() + url_before = browser.page.url + + if use_mouse: + try: + snap = await snapshot_async(browser) + element = None + for el in snap.elements: + if el.id == element_id: + element = el + break + + if element: + center_x = element.bbox.x + element.bbox.width / 2 + center_y = element.bbox.y + element.bbox.height / 2 + try: + await browser.page.mouse.click(center_x, center_y) + success = True + except Exception: + success = True + else: + try: + success = await browser.page.evaluate( + """ + (id) => { + return window.sentience.click(id); + } + """, + element_id, + ) + except Exception: + success = True + except Exception: + try: + success = await browser.page.evaluate( + """ + (id) => { + return window.sentience.click(id); + } + """, + element_id, + ) + except Exception: + success = True + else: + success = await browser.page.evaluate( + """ + (id) => { + return window.sentience.click(id); + } + """, + element_id, + ) + + # Wait a bit for navigation/DOM updates + try: + await browser.page.wait_for_timeout(500) + except Exception: + pass + + duration_ms = int((time.time() - start_time) * 1000) + + # Check if URL changed + try: + url_after = browser.page.url + url_changed = url_before != url_after + except Exception: + url_after = url_before + url_changed = True + + # Determine outcome + outcome: str | None = None + if url_changed: + outcome = "navigated" + elif success: + outcome = "dom_updated" + else: + outcome = "error" + + # Optional snapshot after + snapshot_after: Snapshot | None = None + if take_snapshot: + try: + snapshot_after = await snapshot_async(browser) + except Exception: + pass + + return ActionResult( + success=success, + duration_ms=duration_ms, + outcome=outcome, + url_changed=url_changed, + snapshot_after=snapshot_after, + error=( + None + if success + else { + "code": "click_failed", + "reason": "Element not found or not clickable", + } + ), + ) + + +async def type_text_async( + browser: AsyncSentienceBrowser, element_id: int, text: str, take_snapshot: bool = False +) -> ActionResult: + """ + Type text into an element (async) + + Args: + browser: AsyncSentienceBrowser instance + element_id: Element ID from snapshot + text: Text to type + take_snapshot: Whether to take snapshot after action + + Returns: + ActionResult + """ + if not browser.page: + raise RuntimeError("Browser not started. Call await browser.start() first.") + + start_time = time.time() + url_before = browser.page.url + + # Focus element first + focused = await browser.page.evaluate( + """ + (id) => { + const el = window.sentience_registry[id]; + if (el) { + el.focus(); + return true; + } + return false; + } + """, + element_id, + ) + + if not focused: + return ActionResult( + success=False, + duration_ms=int((time.time() - start_time) * 1000), + outcome="error", + error={"code": "focus_failed", "reason": "Element not found"}, + ) + + # Type using Playwright keyboard + await browser.page.keyboard.type(text) + + duration_ms = int((time.time() - start_time) * 1000) + url_after = browser.page.url + url_changed = url_before != url_after + + outcome = "navigated" if url_changed else "dom_updated" + + snapshot_after: Snapshot | None = None + if take_snapshot: + snapshot_after = await snapshot_async(browser) + + return ActionResult( + success=True, + duration_ms=duration_ms, + outcome=outcome, + url_changed=url_changed, + snapshot_after=snapshot_after, + ) + + +async def press_async( + browser: AsyncSentienceBrowser, key: str, take_snapshot: bool = False +) -> ActionResult: + """ + Press a keyboard key (async) + + Args: + browser: AsyncSentienceBrowser instance + key: Key to press (e.g., "Enter", "Escape", "Tab") + take_snapshot: Whether to take snapshot after action + + Returns: + ActionResult + """ + if not browser.page: + raise RuntimeError("Browser not started. Call await browser.start() first.") + + start_time = time.time() + url_before = browser.page.url + + # Press key using Playwright + await browser.page.keyboard.press(key) + + # Wait a bit for navigation/DOM updates + await browser.page.wait_for_timeout(500) + + duration_ms = int((time.time() - start_time) * 1000) + url_after = browser.page.url + url_changed = url_before != url_after + + outcome = "navigated" if url_changed else "dom_updated" + + snapshot_after: Snapshot | None = None + if take_snapshot: + snapshot_after = await snapshot_async(browser) + + return ActionResult( + success=True, + duration_ms=duration_ms, + outcome=outcome, + url_changed=url_changed, + snapshot_after=snapshot_after, + ) + + +async def _highlight_rect_async( + browser: AsyncSentienceBrowser, rect: dict[str, float], duration_sec: float = 2.0 +) -> None: + """Highlight a rectangle with a red border overlay (async)""" + if not browser.page: + return + + highlight_id = f"sentience_highlight_{int(time.time() * 1000)}" + + args = { + "rect": { + "x": rect["x"], + "y": rect["y"], + "w": rect["w"], + "h": rect["h"], + }, + "highlightId": highlight_id, + "durationSec": duration_sec, + } + + await browser.page.evaluate( + """ + (args) => { + const { rect, highlightId, durationSec } = args; + const overlay = document.createElement('div'); + overlay.id = highlightId; + overlay.style.position = 'fixed'; + overlay.style.left = `${rect.x}px`; + overlay.style.top = `${rect.y}px`; + overlay.style.width = `${rect.w}px`; + overlay.style.height = `${rect.h}px`; + overlay.style.border = '3px solid red'; + overlay.style.borderRadius = '2px'; + overlay.style.boxSizing = 'border-box'; + overlay.style.pointerEvents = 'none'; + overlay.style.zIndex = '999999'; + overlay.style.backgroundColor = 'rgba(255, 0, 0, 0.1)'; + overlay.style.transition = 'opacity 0.3s ease-out'; + + document.body.appendChild(overlay); + + setTimeout(() => { + overlay.style.opacity = '0'; + setTimeout(() => { + if (overlay.parentNode) { + overlay.parentNode.removeChild(overlay); + } + }, 300); + }, durationSec * 1000); + } + """, + args, + ) + + +async def click_rect_async( + browser: AsyncSentienceBrowser, + rect: dict[str, float] | BBox, + highlight: bool = True, + highlight_duration: float = 2.0, + take_snapshot: bool = False, +) -> ActionResult: + """ + Click at the center of a rectangle (async) + + Args: + browser: AsyncSentienceBrowser instance + rect: Dictionary with x, y, width (w), height (h) keys, or BBox object + highlight: Whether to show a red border highlight when clicking + highlight_duration: How long to show the highlight in seconds + take_snapshot: Whether to take snapshot after action + + Returns: + ActionResult + """ + if not browser.page: + raise RuntimeError("Browser not started. Call await browser.start() first.") + + # Handle BBox object or dict + if isinstance(rect, BBox): + x = rect.x + y = rect.y + w = rect.width + h = rect.height + else: + x = rect.get("x", 0) + y = rect.get("y", 0) + w = rect.get("w") or rect.get("width", 0) + h = rect.get("h") or rect.get("height", 0) + + if w <= 0 or h <= 0: + return ActionResult( + success=False, + duration_ms=0, + outcome="error", + error={ + "code": "invalid_rect", + "reason": "Rectangle width and height must be positive", + }, + ) + + start_time = time.time() + url_before = browser.page.url + + # Calculate center of rectangle + center_x = x + w / 2 + center_y = y + h / 2 + + # Show highlight before clicking + if highlight: + await _highlight_rect_async(browser, {"x": x, "y": y, "w": w, "h": h}, highlight_duration) + await browser.page.wait_for_timeout(50) + + # Use Playwright's native mouse click + try: + await browser.page.mouse.click(center_x, center_y) + success = True + except Exception as e: + success = False + error_msg = str(e) + + # Wait a bit for navigation/DOM updates + await browser.page.wait_for_timeout(500) + + duration_ms = int((time.time() - start_time) * 1000) + url_after = browser.page.url + url_changed = url_before != url_after + + # Determine outcome + outcome: str | None = None + if url_changed: + outcome = "navigated" + elif success: + outcome = "dom_updated" + else: + outcome = "error" + + # Optional snapshot after + snapshot_after: Snapshot | None = None + if take_snapshot: + snapshot_after = await snapshot_async(browser) + + return ActionResult( + success=success, + duration_ms=duration_ms, + outcome=outcome, + url_changed=url_changed, + snapshot_after=snapshot_after, + error=( + None + if success + else { + "code": "click_failed", + "reason": error_msg if not success else "Click failed", + } + ), + ) + + +# ========== Re-export Query Functions (Pure Functions - No Async Needed) ========== + +# Query functions (find, query) are pure functions that work with Snapshot objects +# They don't need async versions, but we re-export them for convenience +from sentience.query import find, query + +__all__ = [ + "AsyncSentienceBrowser", + "snapshot_async", + "click_async", + "type_text_async", + "press_async", + "click_rect_async", + "find", + "query", +] diff --git a/sentience/browser.py b/sentience/browser.py index 5670bdb..b7617b9 100644 --- a/sentience/browser.py +++ b/sentience/browser.py @@ -11,7 +11,8 @@ from playwright.sync_api import BrowserContext, Page, Playwright, sync_playwright -from sentience.models import ProxyConfig, StorageState +from sentience._extension_loader import find_extension_path +from sentience.models import ProxyConfig, StorageState, Viewport # Import stealth for bot evasion (optional - graceful fallback if not available) try: @@ -35,7 +36,7 @@ def __init__( storage_state: str | Path | StorageState | dict | None = None, record_video_dir: str | Path | None = None, record_video_size: dict[str, int] | None = None, - viewport: dict[str, int] | None = None, + viewport: Viewport | dict[str, int] | None = None, ): """ Initialize Sentience browser @@ -68,11 +69,11 @@ def __init__( Examples: {"width": 1280, "height": 800} (default) {"width": 1920, "height": 1080} (1080p) If None, defaults to 1280x800. - viewport: Optional viewport size as dict with 'width' and 'height' keys. - Examples: {"width": 1280, "height": 800} (default) - {"width": 1920, "height": 1080} (Full HD) - {"width": 375, "height": 667} (iPhone) - If None, defaults to 1280x800. + viewport: Optional viewport size as Viewport object or dict with 'width' and 'height' keys. + Examples: Viewport(width=1280, height=800) (default) + Viewport(width=1920, height=1080) (Full HD) + {"width": 1280, "height": 800} (dict also supported) + If None, defaults to Viewport(width=1280, height=800). """ self.api_key = api_key # Only set api_url if api_key is provided, otherwise None (free tier) @@ -100,8 +101,13 @@ def __init__( self.record_video_dir = record_video_dir self.record_video_size = record_video_size or {"width": 1280, "height": 800} - # Viewport configuration - self.viewport = viewport or {"width": 1280, "height": 800} + # Viewport configuration - convert dict to Viewport if needed + if viewport is None: + self.viewport = Viewport(width=1280, height=800) + elif isinstance(viewport, dict): + self.viewport = Viewport(width=viewport["width"], height=viewport["height"]) + else: + self.viewport = viewport self.playwright: Playwright | None = None self.context: BrowserContext | None = None @@ -156,28 +162,8 @@ def _parse_proxy(self, proxy_string: str) -> ProxyConfig | None: def start(self) -> None: """Launch browser with extension loaded""" - # Get extension source path (relative to project root/package) - # Handle both development (src/) and installed package cases - - # 1. Try relative to this file (installed package structure) - # sentience/browser.py -> sentience/extension/ - package_ext_path = Path(__file__).parent / "extension" - - # 2. Try development root (if running from source repo) - # sentience/browser.py -> ../sentience-chrome - dev_ext_path = Path(__file__).parent.parent.parent / "sentience-chrome" - - if package_ext_path.exists() and (package_ext_path / "manifest.json").exists(): - extension_source = package_ext_path - elif dev_ext_path.exists() and (dev_ext_path / "manifest.json").exists(): - extension_source = dev_ext_path - else: - raise FileNotFoundError( - f"Extension not found. Checked:\n" - f"1. {package_ext_path}\n" - f"2. {dev_ext_path}\n" - "Make sure the extension is built and 'sentience/extension' directory exists." - ) + # Get extension source path using shared utility + extension_source = find_extension_path() # Create temporary extension bundle # We copy it to a temp dir to avoid file locking issues and ensure clean state @@ -220,7 +206,7 @@ def start(self) -> None: "user_data_dir": user_data_dir, "headless": False, # IMPORTANT: See note above "args": args, - "viewport": self.viewport, + "viewport": {"width": self.viewport.width, "height": self.viewport.height}, # Remove "HeadlessChrome" from User Agent automatically "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", } diff --git a/tests/test_async_api.py b/tests/test_async_api.py new file mode 100644 index 0000000..b2a9d0f --- /dev/null +++ b/tests/test_async_api.py @@ -0,0 +1,272 @@ +""" +Tests for async API functionality +""" + +import pytest +from playwright.async_api import async_playwright + +from sentience.async_api import ( + AsyncSentienceBrowser, + click_async, + click_rect_async, + find, + press_async, + query, + snapshot_async, + type_text_async, +) +from sentience.models import BBox, SnapshotOptions + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_browser_basic(): + """Test basic async browser initialization""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + assert browser.page is not None + assert "example.com" in browser.page.url + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_viewport_default(): + """Test that default viewport is 1280x800""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + viewport_size = await browser.page.evaluate( + "() => ({ width: window.innerWidth, height: window.innerHeight })" + ) + + assert viewport_size["width"] == 1280 + assert viewport_size["height"] == 800 + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_viewport_custom(): + """Test custom viewport size""" + custom_viewport = {"width": 1920, "height": 1080} + async with AsyncSentienceBrowser(viewport=custom_viewport) as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + viewport_size = await browser.page.evaluate( + "() => ({ width: window.innerWidth, height: window.innerHeight })" + ) + + assert viewport_size["width"] == 1920 + assert viewport_size["height"] == 1080 + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_snapshot(): + """Test async snapshot function""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + snap = await snapshot_async(browser) + assert isinstance(snap, type(snap)) # Check it's a Snapshot object + assert snap.status == "success" + assert len(snap.elements) > 0 + assert snap.url is not None + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_snapshot_with_options(): + """Test async snapshot with options""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + options = SnapshotOptions(limit=10, screenshot=False) + snap = await snapshot_async(browser, options) + assert snap.status == "success" + assert len(snap.elements) <= 10 + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_click(): + """Test async click action""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + snap = await snapshot_async(browser) + link = find(snap, "role=link") + + if link: + result = await click_async(browser, link.id) + assert result.success is True + assert result.duration_ms > 0 + assert result.outcome in ["navigated", "dom_updated"] + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_type_text(): + """Test async type_text action""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + snap = await snapshot_async(browser) + textbox = find(snap, "role=textbox") + + if textbox: + result = await type_text_async(browser, textbox.id, "hello") + assert result.success is True + assert result.duration_ms > 0 + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_press(): + """Test async press action""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + result = await press_async(browser, "Enter") + assert result.success is True + assert result.duration_ms > 0 + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_click_rect(): + """Test async click_rect action""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + # Click at specific coordinates + result = await click_rect_async( + browser, {"x": 100, "y": 200, "w": 50, "h": 30}, highlight=False + ) + assert result.success is True + assert result.duration_ms > 0 + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_click_rect_with_bbox(): + """Test async click_rect with BBox object""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + snap = await snapshot_async(browser) + if snap.elements: + element = snap.elements[0] + bbox = BBox( + x=element.bbox.x, + y=element.bbox.y, + width=element.bbox.width, + height=element.bbox.height, + ) + result = await click_rect_async(browser, bbox, highlight=False) + assert result.success is True + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_find(): + """Test async find function (re-exported from query)""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + snap = await snapshot_async(browser) + link = find(snap, "role=link") + # May or may not find a link, but should not raise an error + assert link is None or hasattr(link, "id") + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_query(): + """Test async query function (re-exported from query)""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + snap = await snapshot_async(browser) + links = query(snap, "role=link") + assert isinstance(links, list) + # All results should be Element objects + for link in links: + assert hasattr(link, "id") + assert hasattr(link, "role") + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_from_existing_context(): + """Test creating AsyncSentienceBrowser from existing context""" + async with async_playwright() as p: + context = await p.chromium.launch_persistent_context("", headless=True) + try: + browser = await AsyncSentienceBrowser.from_existing(context) + assert browser.context is context + assert browser.page is not None + + await browser.page.goto("https://example.com") + assert "example.com" in browser.page.url + + await browser.close() + finally: + await context.close() + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_from_page(): + """Test creating AsyncSentienceBrowser from existing page""" + async with async_playwright() as p: + context = await p.chromium.launch_persistent_context("", headless=True) + try: + page = await context.new_page() + browser = await AsyncSentienceBrowser.from_page(page) + assert browser.page is page + assert browser.context is context + + await browser.page.goto("https://example.com") + assert "example.com" in browser.page.url + + await browser.close() + finally: + await context.close() + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_context_manager(): + """Test async context manager usage""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + assert browser.page is not None + + # Browser should be closed after context manager exits + assert browser.page is None or browser.context is None + + +@pytest.mark.asyncio +@pytest.mark.requires_extension +async def test_async_snapshot_with_goal(): + """Test async snapshot with goal for ML reranking""" + async with AsyncSentienceBrowser() as browser: + await browser.goto("https://example.com") + await browser.page.wait_for_load_state("networkidle") + + options = SnapshotOptions(goal="Click the main link", limit=10) + snap = await snapshot_async(browser, options) + assert snap.status == "success" + # Elements may have ML reranking metadata if API key is provided + # (This test works with or without API key) diff --git a/tests/test_browser.py b/tests/test_browser.py index 1c85283..da4afe3 100644 --- a/tests/test_browser.py +++ b/tests/test_browser.py @@ -3,9 +3,9 @@ """ import pytest +from playwright.sync_api import sync_playwright from sentience import SentienceBrowser -from playwright.sync_api import sync_playwright @pytest.mark.requires_extension @@ -168,4 +168,3 @@ def test_from_page_with_api_key(): finally: context.close() browser_instance.close() -