Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 173 additions & 36 deletions src/codegen/extensions/tools/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
)
str_template: ClassVar[str] = "Line {line_number}: {match}"

def render_as_string(self) -> str:

Check failure on line 40 in src/codegen/extensions/tools/search.py

View workflow job for this annotation

GitHub Actions / mypy

error: Signature of "render_as_string" incompatible with supertype "Observation" [override]
"""Render match in a VSCode-like format."""
return f"{self.line_number:>4}: {self.line}"

Expand All @@ -62,7 +62,7 @@

str_template: ClassVar[str] = "{filepath}: {match_count} matches"

def render_as_string(self) -> str:

Check failure on line 65 in src/codegen/extensions/tools/search.py

View workflow job for this annotation

GitHub Actions / mypy

error: Signature of "render_as_string" incompatible with supertype "Observation" [override]
"""Render file results in a VSCode-like format."""
lines = [
f"📄 {self.filepath}",
Expand Down Expand Up @@ -100,7 +100,7 @@

str_template: ClassVar[str] = "Found {total_files} files with matches for '{query}' (page {page}/{total_pages})"

def render(self, tool_call_id: str) -> ToolMessage:

Check failure on line 103 in src/codegen/extensions/tools/search.py

View workflow job for this annotation

GitHub Actions / mypy

error: Signature of "render" incompatible with supertype "Observation" [override]
"""Render search results in a VSCode-like format.

Args:
Expand Down Expand Up @@ -175,10 +175,14 @@
page: int = 1,
files_per_page: int = 10,
use_regex: bool = False,
fractional_search: bool = False,
) -> SearchObservation:
"""Search the codebase using ripgrep.

This is faster than the Python implementation, especially for large codebases.

If fractional_search is True and the initial search returns no results,
it will automatically split the query into individual words and search for each one.
"""
# Build ripgrep command
cmd = ["rg", "--line-number"]
Expand Down Expand Up @@ -231,47 +235,117 @@
results=[],
)

# Parse output lines
for line in result.stdout.splitlines():
# ripgrep output format: file:line:content
parts = line.split(":", 2)
if len(parts) < 3:
continue
# If no matches found and fractional search is enabled, try searching for individual words
if result.stdout.strip() == "" and fractional_search and " " in query:
logger.info(f"No matches found for '{query}'. Trying fractional search.")

filepath, line_number_str, content = parts
# Split the query into individual words
words = query.split()

# Convert to relative path within the codebase
rel_path = os.path.relpath(filepath, codebase.repo_path)
# Create a combined result from searching for each word
combined_results: dict[str, list[SearchMatch]] = {}
original_query = query # Save the original query

try:
line_number = int(line_number_str)
for word in words:
# Skip very short words (optional, can be adjusted)
if len(word) < 3:
continue

# Find the actual match text
match_text = query
if use_regex:
# For regex, we need to find what actually matched
# This is a simplification - ideally we'd use ripgrep's --json option
# to get the exact match positions
pattern = re.compile(query)
match_obj = pattern.search(content)
if match_obj:
match_text = match_obj.group(0)
# Update the command with the new word
cmd[-2] = word

# Create or append to file results
if rel_path not in all_results:
all_results[rel_path] = []
logger.info(f"Running fractional search with word: '{word}'")
word_result = subprocess.run(
cmd,
capture_output=True,
text=True,
encoding="utf-8",
check=False,
)

all_results[rel_path].append(
SearchMatch(
status="success",
line_number=line_number,
line=content.strip(),
match=match_text,
# Parse output lines for this word
for line in word_result.stdout.splitlines():
# ripgrep output format: file:line:content
parts = line.split(":", 2)
if len(parts) < 3:
continue

filepath, line_number_str, content = parts

# Convert to relative path within the codebase
rel_path = os.path.relpath(filepath, codebase.repo_path)

try:
line_number = int(line_number_str)

# Create or append to file results
if rel_path not in combined_results:
combined_results[rel_path] = []

# Check if we already have this line for this file
# (to avoid duplicates from different word matches)
line_exists = any(match.line_number == line_number for match in combined_results.get(rel_path, []))

if not line_exists:
combined_results[rel_path].append(
SearchMatch(
status="success",
line_number=line_number,
line=content.strip(),
match=word, # Use the individual word as the match
)
)
except ValueError:
# Skip lines with invalid line numbers
continue

# If we found results with fractional search, use them
if combined_results:
all_results = combined_results
# Note: we keep the original query in the results for clarity

else:
# Parse output lines from the original search
for line in result.stdout.splitlines():
# ripgrep output format: file:line:content
parts = line.split(":", 2)
if len(parts) < 3:
continue

filepath, line_number_str, content = parts

# Convert to relative path within the codebase
rel_path = os.path.relpath(filepath, codebase.repo_path)

try:
line_number = int(line_number_str)

# Find the actual match text
match_text = query
if use_regex:
# For regex, we need to find what actually matched
# This is a simplification - ideally we'd use ripgrep's --json option
# to get the exact match positions
pattern = re.compile(query)
match_obj = pattern.search(content)
if match_obj:
match_text = match_obj.group(0)

# Create or append to file results
if rel_path not in all_results:
all_results[rel_path] = []

all_results[rel_path].append(
SearchMatch(
status="success",
line_number=line_number,
line=content.strip(),
match=match_text,
)
)
)
except ValueError:
# Skip lines with invalid line numbers
continue
except ValueError:
# Skip lines with invalid line numbers
continue

# Convert to SearchFileResult objects
file_results = []
Expand Down Expand Up @@ -318,10 +392,14 @@
page: int = 1,
files_per_page: int = 10,
use_regex: bool = False,
fractional_search: bool = False,
) -> SearchObservation:
"""Search the codebase using Python's regex engine.

This is a fallback for when ripgrep is not available.

If fractional_search is True and the initial search returns no results,
it will automatically split the query into individual words and search for each one.
"""
# Validate pagination parameters
if page < 1:
Expand Down Expand Up @@ -352,7 +430,9 @@
extensions = file_extensions if file_extensions is not None else "*"

all_results = []

# First try with the full query
for file in codebase.files(extensions=extensions):

Check failure on line 435 in src/codegen/extensions/tools/search.py

View workflow job for this annotation

GitHub Actions / mypy

error: Argument "extensions" to "files" of "Codebase" has incompatible type "list[str] | str"; expected "list[str]" [arg-type]
# Skip binary files
try:
content = file.content
Expand Down Expand Up @@ -385,6 +465,60 @@
)
)

# If no results found and fractional search is enabled, try with individual words
if not all_results and fractional_search and " " in query and not use_regex:
logger.info(f"No matches found for '{query}'. Trying fractional search.")

# Split the query into individual words
words = query.split()

# Dictionary to track files and matches to avoid duplicates
combined_results: dict[str, dict[int, SearchMatch]] = {}

for word in words:
# Skip very short words (optional, can be adjusted)
if len(word) < 3:
continue

# Create pattern for this word
word_pattern = re.compile(re.escape(word), re.IGNORECASE)

for file in codebase.files(extensions=extensions):

Check failure on line 486 in src/codegen/extensions/tools/search.py

View workflow job for this annotation

GitHub Actions / mypy

error: Argument "extensions" to "files" of "Codebase" has incompatible type "list[str] | str"; expected "list[str]" [arg-type]
# Skip binary files
try:
content = file.content
except ValueError: # File is binary
continue

# Initialize file in combined results if not present
if file.filepath not in combined_results:
combined_results[file.filepath] = {}

# Split content into lines and store with line numbers (1-based)
lines = enumerate(content.splitlines(), 1)

# Search each line for the word pattern
for line_number, line in lines:
match = word_pattern.search(line)
if match and line_number not in combined_results[file.filepath]:
combined_results[file.filepath][line_number] = SearchMatch(
status="success",
line_number=line_number,
line=line.strip(),
match=match.group(0),
)

# Convert combined results to the expected format
for filepath, matches_dict in combined_results.items():
if matches_dict: # Only include files with matches
all_results.append(
SearchFileResult(
status="success",
filepath=filepath,
matches=sorted(matches_dict.values(), key=lambda x: x.line_number),
)
)

# Sort all results by filepath
all_results.sort(key=lambda x: x.filepath)

Expand Down Expand Up @@ -415,6 +549,7 @@
page: int = 1,
files_per_page: int = 10,
use_regex: bool = False,
fractional_search: bool = False,
) -> SearchObservation:
"""Search the codebase using text search or regex pattern matching.

Expand All @@ -432,13 +567,15 @@
page: Page number to return (1-based, default: 1)
files_per_page: Number of files to return per page (default: 10)
use_regex: Whether to treat query as a regex pattern (default: False)
fractional_search: Whether to automatically search for individual words if the full
query returns no results (default: False)

Returns:
SearchObservation containing search results with matches and their sources
"""
# Try to use ripgrep first
try:
return _search_with_ripgrep(codebase, query, file_extensions, page, files_per_page, use_regex)
return _search_with_ripgrep(codebase, query, file_extensions, page, files_per_page, use_regex, fractional_search)
except (FileNotFoundError, subprocess.SubprocessError):
# Fall back to Python implementation if ripgrep fails or isn't available
return _search_with_python(codebase, query, file_extensions, page, files_per_page, use_regex)
return _search_with_python(codebase, query, file_extensions, page, files_per_page, use_regex, fractional_search)
Loading