diff --git a/src/codegen/extensions/langchain/tools.py b/src/codegen/extensions/langchain/tools.py index 0749384a4..bcbfaa127 100644 --- a/src/codegen/extensions/langchain/tools.py +++ b/src/codegen/extensions/langchain/tools.py @@ -126,19 +126,21 @@ class SearchInput(BaseModel): query: str = Field( ..., - description="""ripgrep query (or regex pattern) to run. For regex searches, set use_regex=True. Ripgrep is the preferred method.""", + description="""The search query to find in the codebase. When ripgrep is available, this will be passed as a ripgrep pattern. For regex searches, set use_regex=True. + Ripgrep is the preferred method.""", ) file_extensions: list[str] | None = Field(default=None, description="Optional list of file extensions to search (e.g. ['.py', '.ts'])") page: int = Field(default=1, description="Page number to return (1-based, default: 1)") files_per_page: int = Field(default=10, description="Number of files to return per page (default: 10)") use_regex: bool = Field(default=False, description="Whether to treat query as a regex pattern (default: False)") + fractional_search: bool = Field(default=False, description="Whether to search for individual words if full query returns no results (default: False)") tool_call_id: Annotated[str, InjectedToolCallId] class RipGrepTool(BaseTool): """Tool for searching the codebase via RipGrep.""" - name: ClassVar[str] = "search" + name: ClassVar[str] = "ripgrep_search" description: ClassVar[str] = "Search the codebase using `ripgrep` or regex pattern matching" args_schema: ClassVar[type[BaseModel]] = SearchInput codebase: Codebase = Field(exclude=True) @@ -146,8 +148,10 @@ class RipGrepTool(BaseTool): def __init__(self, codebase: Codebase) -> None: super().__init__(codebase=codebase) - def _run(self, tool_call_id: str, query: str, file_extensions: Optional[list[str]] = None, page: int = 1, files_per_page: int = 10, use_regex: bool = False) -> ToolMessage: - result = search(self.codebase, query, file_extensions=file_extensions, page=page, files_per_page=files_per_page, use_regex=use_regex) + def _run( + self, tool_call_id: str, query: str, file_extensions: Optional[list[str]] = None, page: int = 1, files_per_page: int = 10, use_regex: bool = False, fractional_search: bool = False + ) -> ToolMessage: + result = search(self.codebase, query, file_extensions=file_extensions, page=page, files_per_page=files_per_page, use_regex=use_regex, fractional_search=fractional_search) return result.render(tool_call_id) diff --git a/src/codegen/extensions/tools/search.py b/src/codegen/extensions/tools/search.py index 3f69be59c..4593af9cf 100644 --- a/src/codegen/extensions/tools/search.py +++ b/src/codegen/extensions/tools/search.py @@ -175,10 +175,13 @@ def _search_with_ripgrep( page: int = 1, files_per_page: int = 10, use_regex: bool = False, + fractional_search: bool = False, ) -> SearchObservation: """Search the codebase using ripgrep. This is faster than the Python implementation, especially for large codebases. + If fractional_search is True and no results are found for the full query, + it will automatically search for individual words from the query. """ # Build ripgrep command cmd = ["rg", "--line-number"] @@ -231,47 +234,105 @@ def _search_with_ripgrep( results=[], ) - # Parse output lines - for line in result.stdout.splitlines(): - # ripgrep output format: file:line:content - parts = line.split(":", 2) - if len(parts) < 3: - continue - - filepath, line_number_str, content = parts - - # Convert to relative path within the codebase - rel_path = os.path.relpath(filepath, codebase.repo_path) + # Check if we got no results and fractional search is enabled + if result.stdout.strip() == "" and fractional_search and " " in query: + logger.info(f"No results found for '{query}', trying fractional search") + # Split the query into individual words + words = query.split() + # Filter out very short words (less than 3 characters) + words = [word for word in words if len(word) >= 3] + + # Search for each word individually and combine results + for word in words: + word_cmd = cmd.copy() + # Replace the query with the individual word + word_cmd[-2] = word + + logger.info(f"Running fractional search with: {' '.join(word_cmd)}") + word_result = subprocess.run( + word_cmd, + capture_output=True, + text=True, + encoding="utf-8", + check=False, + ) - try: - line_number = int(line_number_str) - - # Find the actual match text - match_text = query - if use_regex: - # For regex, we need to find what actually matched - # This is a simplification - ideally we'd use ripgrep's --json option - # to get the exact match positions - pattern = re.compile(query) - match_obj = pattern.search(content) - if match_obj: - match_text = match_obj.group(0) - - # Create or append to file results - if rel_path not in all_results: - all_results[rel_path] = [] - - all_results[rel_path].append( - SearchMatch( - status="success", - line_number=line_number, - line=content.strip(), - match=match_text, + # Parse output lines for this word + for line in word_result.stdout.splitlines(): + # ripgrep output format: file:line:content + parts = line.split(":", 2) + if len(parts) < 3: + continue + + filepath, line_number_str, content = parts + + # Convert to relative path within the codebase + rel_path = os.path.relpath(filepath, codebase.repo_path) + + try: + line_number = int(line_number_str) + + # Create or append to file results + if rel_path not in all_results: + all_results[rel_path] = [] + + # Check if this line is already in the results to avoid duplicates + line_exists = any(match.line_number == line_number and match.line.strip() == content.strip() for match in all_results[rel_path]) + + if not line_exists: + all_results[rel_path].append( + SearchMatch( + status="success", + line_number=line_number, + line=content.strip(), + match=word, + ) + ) + except ValueError: + # Skip lines with invalid line numbers + continue + else: + # Parse output lines from the original search + for line in result.stdout.splitlines(): + # ripgrep output format: file:line:content + parts = line.split(":", 2) + if len(parts) < 3: + continue + + filepath, line_number_str, content = parts + + # Convert to relative path within the codebase + rel_path = os.path.relpath(filepath, codebase.repo_path) + + try: + line_number = int(line_number_str) + + # Find the actual match text + match_text = query + if use_regex: + # For regex, we need to find what actually matched + # This is a simplification - ideally we'd use ripgrep's --json option + # to get the exact match positions + pattern = re.compile(query) + match_obj = pattern.search(content) + if match_obj: + match_text = match_obj.group(0) + + # Create or append to file results + if rel_path not in all_results: + all_results[rel_path] = [] + + all_results[rel_path].append( + SearchMatch( + status="success", + line_number=line_number, + line=content.strip(), + match=match_text, + ) ) - ) - except ValueError: - # Skip lines with invalid line numbers - continue + except ValueError: + # Skip lines with invalid line numbers + continue # Convert to SearchFileResult objects file_results = [] @@ -318,10 +379,13 @@ def _search_with_python( page: int = 1, files_per_page: int = 10, use_regex: bool = False, + fractional_search: bool = False, ) -> SearchObservation: """Search the codebase using Python's regex engine. This is a fallback for when ripgrep is not available. + If fractional_search is True and no results are found for the full query, + it will automatically search for individual words from the query. """ # Validate pagination parameters if page < 1: @@ -352,38 +416,82 @@ def _search_with_python( extensions = file_extensions if file_extensions is not None else "*" all_results = [] - for file in codebase.files(extensions=extensions): - # Skip binary files - try: - content = file.content - except ValueError: # File is binary - continue - - file_matches = [] - # Split content into lines and store with line numbers (1-based) - lines = enumerate(content.splitlines(), 1) - - # Search each line for the pattern - for line_number, line in lines: - match = pattern.search(line) - if match: - file_matches.append( - SearchMatch( + + # Function to search files with a given pattern + def search_files_with_pattern(search_pattern, match_text): + file_results = [] + for file in codebase.files(extensions=extensions): + # Skip binary files + try: + content = file.content + except ValueError: # File is binary + continue + + file_matches = [] + # Split content into lines and store with line numbers (1-based) + lines = enumerate(content.splitlines(), 1) + + # Search each line for the pattern + for line_number, line in lines: + match = search_pattern.search(line) + if match: + # Check if this match is already in the results to avoid duplicates + match_exists = any(m.line_number == line_number and m.line.strip() == line.strip() for m in file_matches) + + if not match_exists: + file_matches.append( + SearchMatch( + status="success", + line_number=line_number, + line=line.strip(), + match=match_text if match_text else match.group(0), + ) + ) + + if file_matches: + file_results.append( + SearchFileResult( status="success", - line_number=line_number, - line=line.strip(), - match=match.group(0), + filepath=file.filepath, + matches=sorted(file_matches, key=lambda x: x.line_number), ) ) - - if file_matches: - all_results.append( - SearchFileResult( - status="success", - filepath=file.filepath, - matches=sorted(file_matches, key=lambda x: x.line_number), - ) - ) + return file_results + + # First try with the full query + all_results = search_files_with_pattern(pattern, query if not use_regex else None) + + # If no results and fractional search is enabled, try individual words + if not all_results and fractional_search and " " in query and not use_regex: + logger.info(f"No results found for '{query}', trying fractional search") + # Split the query into individual words + words = query.split() + # Filter out very short words (less than 3 characters) + words = [word for word in words if len(word) >= 3] + + # Search for each word individually + for word in words: + word_pattern = re.compile(re.escape(word), re.IGNORECASE) + word_results = search_files_with_pattern(word_pattern, word) + + # Merge results + for word_result in word_results: + # Check if this file is already in all_results + existing_file = next((r for r in all_results if r.filepath == word_result.filepath), None) + + if existing_file: + # Merge matches, avoiding duplicates + for match in word_result.matches: + match_exists = any(m.line_number == match.line_number and m.line.strip() == match.line.strip() for m in existing_file.matches) + + if not match_exists: + existing_file.matches.append(match) + + # Re-sort matches by line number + existing_file.matches.sort(key=lambda x: x.line_number) + else: + # Add new file result + all_results.append(word_result) # Sort all results by filepath all_results.sort(key=lambda x: x.filepath) @@ -415,6 +523,7 @@ def search( page: int = 1, files_per_page: int = 10, use_regex: bool = False, + fractional_search: bool = False, ) -> SearchObservation: """Search the codebase using text search or regex pattern matching. @@ -424,6 +533,9 @@ def search( Returns matching lines with their line numbers, grouped by file. Results are paginated by files, with a default of 10 files per page. + If fractional_search is True and no results are found for the full query, + it will automatically search for individual words from the query. + Args: codebase: The codebase to operate on query: The text to search for or regex pattern to match @@ -432,13 +544,14 @@ def search( page: Page number to return (1-based, default: 1) files_per_page: Number of files to return per page (default: 10) use_regex: Whether to treat query as a regex pattern (default: False) + fractional_search: Whether to search for individual words if full query returns no results (default: False) Returns: SearchObservation containing search results with matches and their sources """ # Try to use ripgrep first try: - return _search_with_ripgrep(codebase, query, file_extensions, page, files_per_page, use_regex) + return _search_with_ripgrep(codebase, query, file_extensions, page, files_per_page, use_regex, fractional_search) except (FileNotFoundError, subprocess.SubprocessError): # Fall back to Python implementation if ripgrep fails or isn't available - return _search_with_python(codebase, query, file_extensions, page, files_per_page, use_regex) + return _search_with_python(codebase, query, file_extensions, page, files_per_page, use_regex, fractional_search)