Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/codegen/extensions/langchain/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@
class ViewFileTool(BaseTool):
"""Tool for viewing file contents and metadata."""

name: ClassVar[str] = "view_file"

Check failure on line 69 in src/codegen/extensions/langchain/tools.py

View workflow job for this annotation

GitHub Actions / mypy

error: Cannot override instance variable (previously declared on base class "BaseTool") with class variable [misc]
description: ClassVar[str] = """View the contents and metadata of a file in the codebase.

Check failure on line 70 in src/codegen/extensions/langchain/tools.py

View workflow job for this annotation

GitHub Actions / mypy

error: Cannot override instance variable (previously declared on base class "BaseTool") with class variable [misc]
For large files (>500 lines), content will be paginated. Use start_line and end_line to navigate through the file.
The response will indicate if there are more lines available to view."""
args_schema: ClassVar[type[BaseModel]] = ViewFileInput

Check failure on line 73 in src/codegen/extensions/langchain/tools.py

View workflow job for this annotation

GitHub Actions / mypy

error: Cannot override instance variable (previously declared on base class "BaseTool") with class variable [misc]
codebase: Codebase = Field(exclude=True)

def __init__(self, codebase: Codebase) -> None:
Expand Down Expand Up @@ -108,9 +108,9 @@
class ListDirectoryTool(BaseTool):
"""Tool for listing directory contents."""

name: ClassVar[str] = "list_directory"

Check failure on line 111 in src/codegen/extensions/langchain/tools.py

View workflow job for this annotation

GitHub Actions / mypy

error: Cannot override instance variable (previously declared on base class "BaseTool") with class variable [misc]
description: ClassVar[str] = "List contents of a directory in the codebase"

Check failure on line 112 in src/codegen/extensions/langchain/tools.py

View workflow job for this annotation

GitHub Actions / mypy

error: Cannot override instance variable (previously declared on base class "BaseTool") with class variable [misc]
args_schema: ClassVar[type[BaseModel]] = ListDirectoryInput

Check failure on line 113 in src/codegen/extensions/langchain/tools.py

View workflow job for this annotation

GitHub Actions / mypy

error: Cannot override instance variable (previously declared on base class "BaseTool") with class variable [misc]
codebase: Codebase = Field(exclude=True)

def __init__(self, codebase: Codebase) -> None:
Expand All @@ -126,28 +126,32 @@

query: str = Field(
...,
description="""ripgrep query (or regex pattern) to run. For regex searches, set use_regex=True. Ripgrep is the preferred method.""",
description="""The search query to find in the codebase. When ripgrep is available, this will be passed as a ripgrep pattern. For regex searches, set use_regex=True.
Ripgrep is the preferred method.""",
)
file_extensions: list[str] | None = Field(default=None, description="Optional list of file extensions to search (e.g. ['.py', '.ts'])")
page: int = Field(default=1, description="Page number to return (1-based, default: 1)")
files_per_page: int = Field(default=10, description="Number of files to return per page (default: 10)")
use_regex: bool = Field(default=False, description="Whether to treat query as a regex pattern (default: False)")
fractional_search: bool = Field(default=False, description="Whether to search for individual words if full query returns no results (default: False)")
tool_call_id: Annotated[str, InjectedToolCallId]


class RipGrepTool(BaseTool):
"""Tool for searching the codebase via RipGrep."""

name: ClassVar[str] = "search"
name: ClassVar[str] = "ripgrep_search"

Check failure on line 143 in src/codegen/extensions/langchain/tools.py

View workflow job for this annotation

GitHub Actions / mypy

error: Cannot override instance variable (previously declared on base class "BaseTool") with class variable [misc]
description: ClassVar[str] = "Search the codebase using `ripgrep` or regex pattern matching"
args_schema: ClassVar[type[BaseModel]] = SearchInput
codebase: Codebase = Field(exclude=True)

def __init__(self, codebase: Codebase) -> None:
super().__init__(codebase=codebase)

def _run(self, tool_call_id: str, query: str, file_extensions: Optional[list[str]] = None, page: int = 1, files_per_page: int = 10, use_regex: bool = False) -> ToolMessage:
result = search(self.codebase, query, file_extensions=file_extensions, page=page, files_per_page=files_per_page, use_regex=use_regex)
def _run(
self, tool_call_id: str, query: str, file_extensions: Optional[list[str]] = None, page: int = 1, files_per_page: int = 10, use_regex: bool = False, fractional_search: bool = False
) -> ToolMessage:
result = search(self.codebase, query, file_extensions=file_extensions, page=page, files_per_page=files_per_page, use_regex=use_regex, fractional_search=fractional_search)
return result.render(tool_call_id)


Expand Down
253 changes: 183 additions & 70 deletions src/codegen/extensions/tools/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
)
str_template: ClassVar[str] = "Line {line_number}: {match}"

def render_as_string(self) -> str:

Check failure on line 40 in src/codegen/extensions/tools/search.py

View workflow job for this annotation

GitHub Actions / mypy

error: Signature of "render_as_string" incompatible with supertype "Observation" [override]
"""Render match in a VSCode-like format."""
return f"{self.line_number:>4}: {self.line}"

Expand All @@ -62,7 +62,7 @@

str_template: ClassVar[str] = "{filepath}: {match_count} matches"

def render_as_string(self) -> str:

Check failure on line 65 in src/codegen/extensions/tools/search.py

View workflow job for this annotation

GitHub Actions / mypy

error: Signature of "render_as_string" incompatible with supertype "Observation" [override]
"""Render file results in a VSCode-like format."""
lines = [
f"📄 {self.filepath}",
Expand Down Expand Up @@ -100,7 +100,7 @@

str_template: ClassVar[str] = "Found {total_files} files with matches for '{query}' (page {page}/{total_pages})"

def render(self, tool_call_id: str) -> ToolMessage:

Check failure on line 103 in src/codegen/extensions/tools/search.py

View workflow job for this annotation

GitHub Actions / mypy

error: Signature of "render" incompatible with supertype "Observation" [override]
"""Render search results in a VSCode-like format.

Args:
Expand Down Expand Up @@ -175,10 +175,13 @@
page: int = 1,
files_per_page: int = 10,
use_regex: bool = False,
fractional_search: bool = False,
) -> SearchObservation:
"""Search the codebase using ripgrep.

This is faster than the Python implementation, especially for large codebases.
If fractional_search is True and no results are found for the full query,
it will automatically search for individual words from the query.
"""
# Build ripgrep command
cmd = ["rg", "--line-number"]
Expand Down Expand Up @@ -231,47 +234,105 @@
results=[],
)

# Parse output lines
for line in result.stdout.splitlines():
# ripgrep output format: file:line:content
parts = line.split(":", 2)
if len(parts) < 3:
continue

filepath, line_number_str, content = parts

# Convert to relative path within the codebase
rel_path = os.path.relpath(filepath, codebase.repo_path)
# Check if we got no results and fractional search is enabled
if result.stdout.strip() == "" and fractional_search and " " in query:
logger.info(f"No results found for '{query}', trying fractional search")
# Split the query into individual words
words = query.split()
# Filter out very short words (less than 3 characters)
words = [word for word in words if len(word) >= 3]

# Search for each word individually and combine results
for word in words:
word_cmd = cmd.copy()
# Replace the query with the individual word
word_cmd[-2] = word

logger.info(f"Running fractional search with: {' '.join(word_cmd)}")
word_result = subprocess.run(
word_cmd,
capture_output=True,
text=True,
encoding="utf-8",
check=False,
)

try:
line_number = int(line_number_str)

# Find the actual match text
match_text = query
if use_regex:
# For regex, we need to find what actually matched
# This is a simplification - ideally we'd use ripgrep's --json option
# to get the exact match positions
pattern = re.compile(query)
match_obj = pattern.search(content)
if match_obj:
match_text = match_obj.group(0)

# Create or append to file results
if rel_path not in all_results:
all_results[rel_path] = []

all_results[rel_path].append(
SearchMatch(
status="success",
line_number=line_number,
line=content.strip(),
match=match_text,
# Parse output lines for this word
for line in word_result.stdout.splitlines():
# ripgrep output format: file:line:content
parts = line.split(":", 2)
if len(parts) < 3:
continue

filepath, line_number_str, content = parts

# Convert to relative path within the codebase
rel_path = os.path.relpath(filepath, codebase.repo_path)

try:
line_number = int(line_number_str)

# Create or append to file results
if rel_path not in all_results:
all_results[rel_path] = []

# Check if this line is already in the results to avoid duplicates
line_exists = any(match.line_number == line_number and match.line.strip() == content.strip() for match in all_results[rel_path])

if not line_exists:
all_results[rel_path].append(
SearchMatch(
status="success",
line_number=line_number,
line=content.strip(),
match=word,
)
)
except ValueError:
# Skip lines with invalid line numbers
continue
else:
# Parse output lines from the original search
for line in result.stdout.splitlines():
# ripgrep output format: file:line:content
parts = line.split(":", 2)
if len(parts) < 3:
continue

filepath, line_number_str, content = parts

# Convert to relative path within the codebase
rel_path = os.path.relpath(filepath, codebase.repo_path)

try:
line_number = int(line_number_str)

# Find the actual match text
match_text = query
if use_regex:
# For regex, we need to find what actually matched
# This is a simplification - ideally we'd use ripgrep's --json option
# to get the exact match positions
pattern = re.compile(query)
match_obj = pattern.search(content)
if match_obj:
match_text = match_obj.group(0)

# Create or append to file results
if rel_path not in all_results:
all_results[rel_path] = []

all_results[rel_path].append(
SearchMatch(
status="success",
line_number=line_number,
line=content.strip(),
match=match_text,
)
)
)
except ValueError:
# Skip lines with invalid line numbers
continue
except ValueError:
# Skip lines with invalid line numbers
continue

# Convert to SearchFileResult objects
file_results = []
Expand Down Expand Up @@ -318,10 +379,13 @@
page: int = 1,
files_per_page: int = 10,
use_regex: bool = False,
fractional_search: bool = False,
) -> SearchObservation:
"""Search the codebase using Python's regex engine.

This is a fallback for when ripgrep is not available.
If fractional_search is True and no results are found for the full query,
it will automatically search for individual words from the query.
"""
# Validate pagination parameters
if page < 1:
Expand Down Expand Up @@ -352,38 +416,82 @@
extensions = file_extensions if file_extensions is not None else "*"

all_results = []
for file in codebase.files(extensions=extensions):
# Skip binary files
try:
content = file.content
except ValueError: # File is binary
continue

file_matches = []
# Split content into lines and store with line numbers (1-based)
lines = enumerate(content.splitlines(), 1)

# Search each line for the pattern
for line_number, line in lines:
match = pattern.search(line)
if match:
file_matches.append(
SearchMatch(

# Function to search files with a given pattern
def search_files_with_pattern(search_pattern, match_text):
file_results = []
for file in codebase.files(extensions=extensions):
# Skip binary files
try:
content = file.content
except ValueError: # File is binary
continue

file_matches = []
# Split content into lines and store with line numbers (1-based)
lines = enumerate(content.splitlines(), 1)

# Search each line for the pattern
for line_number, line in lines:
match = search_pattern.search(line)
if match:
# Check if this match is already in the results to avoid duplicates
match_exists = any(m.line_number == line_number and m.line.strip() == line.strip() for m in file_matches)

if not match_exists:
file_matches.append(
SearchMatch(
status="success",
line_number=line_number,
line=line.strip(),
match=match_text if match_text else match.group(0),
)
)

if file_matches:
file_results.append(
SearchFileResult(
status="success",
line_number=line_number,
line=line.strip(),
match=match.group(0),
filepath=file.filepath,
matches=sorted(file_matches, key=lambda x: x.line_number),
)
)

if file_matches:
all_results.append(
SearchFileResult(
status="success",
filepath=file.filepath,
matches=sorted(file_matches, key=lambda x: x.line_number),
)
)
return file_results

# First try with the full query
all_results = search_files_with_pattern(pattern, query if not use_regex else None)

# If no results and fractional search is enabled, try individual words
if not all_results and fractional_search and " " in query and not use_regex:
logger.info(f"No results found for '{query}', trying fractional search")
# Split the query into individual words
words = query.split()
# Filter out very short words (less than 3 characters)
words = [word for word in words if len(word) >= 3]

# Search for each word individually
for word in words:
word_pattern = re.compile(re.escape(word), re.IGNORECASE)
word_results = search_files_with_pattern(word_pattern, word)

# Merge results
for word_result in word_results:
# Check if this file is already in all_results
existing_file = next((r for r in all_results if r.filepath == word_result.filepath), None)

if existing_file:
# Merge matches, avoiding duplicates
for match in word_result.matches:
match_exists = any(m.line_number == match.line_number and m.line.strip() == match.line.strip() for m in existing_file.matches)

if not match_exists:
existing_file.matches.append(match)

# Re-sort matches by line number
existing_file.matches.sort(key=lambda x: x.line_number)
else:
# Add new file result
all_results.append(word_result)

# Sort all results by filepath
all_results.sort(key=lambda x: x.filepath)
Expand Down Expand Up @@ -415,6 +523,7 @@
page: int = 1,
files_per_page: int = 10,
use_regex: bool = False,
fractional_search: bool = False,
) -> SearchObservation:
"""Search the codebase using text search or regex pattern matching.

Expand All @@ -424,6 +533,9 @@
Returns matching lines with their line numbers, grouped by file.
Results are paginated by files, with a default of 10 files per page.

If fractional_search is True and no results are found for the full query,
it will automatically search for individual words from the query.

Args:
codebase: The codebase to operate on
query: The text to search for or regex pattern to match
Expand All @@ -432,13 +544,14 @@
page: Page number to return (1-based, default: 1)
files_per_page: Number of files to return per page (default: 10)
use_regex: Whether to treat query as a regex pattern (default: False)
fractional_search: Whether to search for individual words if full query returns no results (default: False)

Returns:
SearchObservation containing search results with matches and their sources
"""
# Try to use ripgrep first
try:
return _search_with_ripgrep(codebase, query, file_extensions, page, files_per_page, use_regex)
return _search_with_ripgrep(codebase, query, file_extensions, page, files_per_page, use_regex, fractional_search)
except (FileNotFoundError, subprocess.SubprocessError):
# Fall back to Python implementation if ripgrep fails or isn't available
return _search_with_python(codebase, query, file_extensions, page, files_per_page, use_regex)
return _search_with_python(codebase, query, file_extensions, page, files_per_page, use_regex, fractional_search)