From f2b07369bb6e7744b296fba92ce2a203a2486e25 Mon Sep 17 00:00:00 2001 From: "codegen-sh[bot]" <131295404+codegen-sh[bot]@users.noreply.github.com> Date: Sat, 3 May 2025 02:23:05 +0000 Subject: [PATCH 1/3] Enhance analysis module with comprehensive metrics integration --- .../codegen_on_oss/analysis/analysis.py | 208 +++++++-- codegen-on-oss/codegen_on_oss/metrics.py | 435 +++++++++++++++++- 2 files changed, 599 insertions(+), 44 deletions(-) diff --git a/codegen-on-oss/codegen_on_oss/analysis/analysis.py b/codegen-on-oss/codegen_on_oss/analysis/analysis.py index 9e956ec06..1cded61e4 100644 --- a/codegen-on-oss/codegen_on_oss/analysis/analysis.py +++ b/codegen-on-oss/codegen_on_oss/analysis/analysis.py @@ -1,6 +1,6 @@ from fastapi import FastAPI from pydantic import BaseModel -from typing import Dict, List, Tuple, Any +from typing import Dict, List, Tuple, Any, Optional, Union from codegen import Codebase from codegen.sdk.core.statements.for_loop_statement import ForLoopStatement from codegen.sdk.core.statements.if_block_statement import IfBlockStatement @@ -8,7 +8,9 @@ from codegen.sdk.core.statements.while_statement import WhileStatement from codegen.sdk.core.expressions.binary_expression import BinaryExpression from codegen.sdk.core.expressions.unary_expression import UnaryExpression -from codegen.sdk.core.expressions.comparison_expression import ComparisonExpression +from codegen.sdk.core.expressions.comparison_expression import ( + ComparisonExpression +) import math import re import requests @@ -17,21 +19,66 @@ import os import tempfile from fastapi.middleware.cors import CORSMiddleware -import modal - -image = ( - modal.Image.debian_slim() - .apt_install("git") - .pip_install( - "codegen", "fastapi", "uvicorn", "gitpython", "requests", "pydantic", "datetime" - ) +import uvicorn +import networkx as nx + +# Import from other analysis modules +from codegen_on_oss.analysis.codebase_context import CodebaseContext +from codegen_on_oss.analysis.codebase_analysis import ( + get_codebase_summary, + get_file_summary, + get_class_summary, + get_function_summary, + get_symbol_summary +) +from codegen_on_oss.analysis.codegen_sdk_codebase import ( + get_codegen_sdk_subdirectories, + get_codegen_sdk_codebase +) +from codegen_on_oss.analysis.current_code_codebase import ( + get_graphsitter_repo_path, + get_codegen_codebase_base_path, + get_current_code_codebase, + import_all_codegen_sdk_module, + DocumentedObjects, + get_documented_objects +) +from codegen_on_oss.analysis.document_functions import ( + hop_through_imports, + get_extended_context, + run as document_functions_run +) +from codegen_on_oss.analysis.mdx_docs_generation import ( + render_mdx_page_for_class, + render_mdx_page_title, + render_mdx_inheritence_section, + render_mdx_attributes_section, + render_mdx_methods_section, + render_mdx_for_attribute, + format_parameter_for_mdx, + format_parameters_for_mdx, + format_return_for_mdx, + render_mdx_for_method, + get_mdx_route_for_class, + format_type_string, + resolve_type_string, + format_builtin_type_string, + span_type_string_by_pipe, + parse_link +) +from codegen_on_oss.analysis.module_dependencies import run as module_dependencies_run +from codegen_on_oss.analysis.symbolattr import print_symbol_attribution +from codegen_on_oss.analysis.analysis_import import ( + create_graph_from_codebase, + convert_all_calls_to_kwargs, + find_import_cycles, + find_problematic_import_loops ) -app = modal.App(name="analytics-app", image=image) - -fastapi_app = FastAPI() +# Create FastAPI app +app = FastAPI() -fastapi_app.add_middleware( +app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, @@ -104,11 +151,20 @@ def get_monthly_commits(repo_path: str) -> Dict[str, int]: finally: try: os.chdir(original_dir) - except: + except Exception: pass def calculate_cyclomatic_complexity(function): + """ + Calculate the cyclomatic complexity of a function. + + Args: + function: The function to analyze + + Returns: + The cyclomatic complexity score + """ def analyze_statement(statement): complexity = 0 @@ -145,6 +201,15 @@ def analyze_block(block): def cc_rank(complexity): + """ + Convert cyclomatic complexity score to a letter grade. + + Args: + complexity: The cyclomatic complexity score + + Returns: + A letter grade from A to F + """ if complexity < 0: raise ValueError("Complexity must be a non-negative value") @@ -168,6 +233,15 @@ def calculate_doi(cls): def get_operators_and_operands(function): + """ + Extract operators and operands from a function. + + Args: + function: The function to analyze + + Returns: + A tuple of (operators, operands) + """ operators = [] operands = [] @@ -205,6 +279,16 @@ def get_operators_and_operands(function): def calculate_halstead_volume(operators, operands): + """ + Calculate Halstead volume metrics. + + Args: + operators: List of operators + operands: List of operands + + Returns: + A tuple of (volume, N1, N2, n1, n2) + """ n1 = len(set(operators)) n2 = len(set(operands)) @@ -221,7 +305,15 @@ def calculate_halstead_volume(operators, operands): def count_lines(source: str): - """Count different types of lines in source code.""" + """ + Count different types of lines in source code. + + Args: + source: The source code as a string + + Returns: + A tuple of (loc, lloc, sloc, comments) + """ if not source.strip(): return 0, 0, 0, 0 @@ -239,7 +331,7 @@ def count_lines(source: str): code_part = line if not in_multiline and "#" in line: comment_start = line.find("#") - if not re.search(r'["\'].*#.*["\']', line[:comment_start]): + if not re.search(r'[\"\\'].*#.*[\"\\']', line[:comment_start]): code_part = line[:comment_start].strip() if line[comment_start:].strip(): comments += 1 @@ -286,7 +378,17 @@ def count_lines(source: str): def calculate_maintainability_index( halstead_volume: float, cyclomatic_complexity: float, loc: int ) -> int: - """Calculate the normalized maintainability index for a given function.""" + """ + Calculate the normalized maintainability index for a given function. + + Args: + halstead_volume: The Halstead volume + cyclomatic_complexity: The cyclomatic complexity + loc: Lines of code + + Returns: + The maintainability index score (0-100) + """ if loc <= 0: return 100 @@ -304,7 +406,15 @@ def calculate_maintainability_index( def get_maintainability_rank(mi_score: float) -> str: - """Convert maintainability index score to a letter grade.""" + """ + Convert maintainability index score to a letter grade. + + Args: + mi_score: The maintainability index score + + Returns: + A letter grade from A to F + """ if mi_score >= 85: return "A" elif mi_score >= 65: @@ -318,6 +428,15 @@ def get_maintainability_rank(mi_score: float) -> str: def get_github_repo_description(repo_url): + """ + Get the description of a GitHub repository. + + Args: + repo_url: The repository URL in the format 'owner/repo' + + Returns: + The repository description + """ api_url = f"https://api.github.com/repos/{repo_url}" response = requests.get(api_url) @@ -330,12 +449,21 @@ def get_github_repo_description(repo_url): class RepoRequest(BaseModel): + """Request model for repository analysis.""" repo_url: str -@fastapi_app.post("/analyze_repo") +@app.post("/analyze_repo") async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: - """Analyze a repository and return comprehensive metrics.""" + """ + Analyze a repository and return comprehensive metrics. + + Args: + request: The repository request containing the repo URL + + Returns: + A dictionary of analysis results + """ repo_url = request.repo_url codebase = Codebase.from_repo(repo_url) @@ -359,7 +487,9 @@ async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: total_sloc += sloc total_comments += comments - callables = codebase.functions + [m for c in codebase.classes for m in c.methods] + callables = codebase.functions + [ + m for c in codebase.classes for m in c.methods + ] num_callables = 0 for func in callables: @@ -391,25 +521,31 @@ async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: "lloc": total_lloc, "sloc": total_sloc, "comments": total_comments, - "comment_density": (total_comments / total_loc * 100) - if total_loc > 0 - else 0, + "comment_density": ( + total_comments / total_loc * 100 if total_loc > 0 else 0 + ), }, }, "cyclomatic_complexity": { - "average": total_complexity if num_callables > 0 else 0, + "average": ( + total_complexity / num_callables if num_callables > 0 else 0 + ), }, "depth_of_inheritance": { - "average": total_doi / len(codebase.classes) if codebase.classes else 0, + "average": ( + total_doi / len(codebase.classes) if codebase.classes else 0 + ), }, "halstead_metrics": { "total_volume": int(total_volume), - "average_volume": int(total_volume / num_callables) - if num_callables > 0 - else 0, + "average_volume": ( + int(total_volume / num_callables) if num_callables > 0 else 0 + ), }, "maintainability_index": { - "average": int(total_mi / num_callables) if num_callables > 0 else 0, + "average": ( + int(total_mi / num_callables) if num_callables > 0 else 0 + ), }, "description": desc, "num_files": num_files, @@ -421,11 +557,7 @@ async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: return results -@app.function(image=image) -@modal.asgi_app() -def fastapi_modal_app(): - return fastapi_app - - if __name__ == "__main__": - app.deploy("analytics-app") + # Run the FastAPI app locally with uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) + diff --git a/codegen-on-oss/codegen_on_oss/metrics.py b/codegen-on-oss/codegen_on_oss/metrics.py index d77b4e686..70f83e1fa 100644 --- a/codegen-on-oss/codegen_on_oss/metrics.py +++ b/codegen-on-oss/codegen_on_oss/metrics.py @@ -1,15 +1,27 @@ import json import os import time +import math from collections.abc import Generator from contextlib import contextmanager from importlib.metadata import version -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union import psutil +from codegen import Codebase from codegen_on_oss.errors import ParseRunError from codegen_on_oss.outputs.base import BaseOutput +from codegen_on_oss.analysis.analysis import ( + calculate_cyclomatic_complexity, + calculate_halstead_volume, + calculate_maintainability_index, + count_lines, + get_operators_and_operands, + cc_rank, + get_maintainability_rank, + calculate_doi +) if TYPE_CHECKING: # Logger only available in type checking context. @@ -19,6 +31,412 @@ codegen_version = str(version("codegen")) +class CodeMetrics: + """ + A class to calculate and provide code quality metrics for a codebase. + Integrates with the analysis module for comprehensive code analysis. + """ + + # Constants for threshold values + COMPLEXITY_THRESHOLD = 10 + MAINTAINABILITY_THRESHOLD = 65 + INHERITANCE_DEPTH_THRESHOLD = 3 + + def __init__(self, codebase: Codebase): + """ + Initialize the CodeMetrics class with a codebase. + + Args: + codebase: The Codebase object to analyze + """ + self.codebase = codebase + self._complexity_metrics = None + self._line_metrics = None + self._maintainability_metrics = None + self._inheritance_metrics = None + self._halstead_metrics = None + + def calculate_all_metrics(self) -> Dict[str, Any]: + """ + Calculate all available metrics for the codebase. + + Returns: + A dictionary containing all metrics categories + """ + return { + "complexity": self.complexity_metrics, + "lines": self.line_metrics, + "maintainability": self.maintainability_metrics, + "inheritance": self.inheritance_metrics, + "halstead": self.halstead_metrics, + } + + @property + def complexity_metrics(self) -> Dict[str, Any]: + """ + Calculate cyclomatic complexity metrics for the codebase. + + Returns: + A dictionary containing complexity metrics including average, + rank, and per-function complexity scores + """ + if self._complexity_metrics is not None: + return self._complexity_metrics + + callables = self.codebase.functions + [ + m for c in self.codebase.classes for m in c.methods + ] + + complexities = [] + for func in callables: + if not hasattr(func, "code_block"): + continue + + complexity = calculate_cyclomatic_complexity(func) + complexities.append({ + "name": func.name, + "complexity": complexity, + "rank": cc_rank(complexity) + }) + + avg_complexity = ( + sum(item["complexity"] for item in complexities) / len(complexities) + if complexities else 0 + ) + + self._complexity_metrics = { + "average": avg_complexity, + "rank": cc_rank(avg_complexity), + "functions": complexities + } + + return self._complexity_metrics + + @property + def line_metrics(self) -> Dict[str, Any]: + """ + Calculate line-based metrics for the codebase. + + Returns: + A dictionary containing line metrics including total counts + and per-file metrics for LOC, LLOC, SLOC, and comments + """ + if self._line_metrics is not None: + return self._line_metrics + + total_loc = total_lloc = total_sloc = total_comments = 0 + file_metrics = [] + + for file in self.codebase.files: + loc, lloc, sloc, comments = count_lines(file.source) + comment_density = (comments / loc * 100) if loc > 0 else 0 + + file_metrics.append({ + "file": file.path, + "loc": loc, + "lloc": lloc, + "sloc": sloc, + "comments": comments, + "comment_density": comment_density + }) + + total_loc += loc + total_lloc += lloc + total_sloc += sloc + total_comments += comments + + total_comment_density = ( + total_comments / total_loc * 100 if total_loc > 0 else 0 + ) + + self._line_metrics = { + "total": { + "loc": total_loc, + "lloc": total_lloc, + "sloc": total_sloc, + "comments": total_comments, + "comment_density": total_comment_density + }, + "files": file_metrics + } + + return self._line_metrics + + @property + def maintainability_metrics(self) -> Dict[str, Any]: + """ + Calculate maintainability index metrics for the codebase. + + Returns: + A dictionary containing maintainability metrics including average, + rank, and per-function maintainability scores + """ + if self._maintainability_metrics is not None: + return self._maintainability_metrics + + callables = self.codebase.functions + [ + m for c in self.codebase.classes for m in c.methods + ] + + mi_scores = [] + for func in callables: + if not hasattr(func, "code_block"): + continue + + complexity = calculate_cyclomatic_complexity(func) + operators, operands = get_operators_and_operands(func) + volume, _, _, _, _ = calculate_halstead_volume(operators, operands) + loc = len(func.code_block.source.splitlines()) + mi_score = calculate_maintainability_index(volume, complexity, loc) + + mi_scores.append({ + "name": func.name, + "mi_score": mi_score, + "rank": get_maintainability_rank(mi_score) + }) + + avg_mi = ( + sum(item["mi_score"] for item in mi_scores) / len(mi_scores) + if mi_scores else 0 + ) + + self._maintainability_metrics = { + "average": avg_mi, + "rank": get_maintainability_rank(avg_mi), + "functions": mi_scores + } + + return self._maintainability_metrics + + @property + def inheritance_metrics(self) -> Dict[str, Any]: + """ + Calculate inheritance metrics for the codebase. + + Returns: + A dictionary containing inheritance metrics including average + depth of inheritance and per-class inheritance depth + """ + if self._inheritance_metrics is not None: + return self._inheritance_metrics + + class_metrics = [] + for cls in self.codebase.classes: + doi = calculate_doi(cls) + class_metrics.append({ + "name": cls.name, + "doi": doi + }) + + avg_doi = ( + sum(item["doi"] for item in class_metrics) / len(class_metrics) + if class_metrics else 0 + ) + + self._inheritance_metrics = { + "average": avg_doi, + "classes": class_metrics + } + + return self._inheritance_metrics + + @property + def halstead_metrics(self) -> Dict[str, Any]: + """ + Calculate Halstead complexity metrics for the codebase. + + Returns: + A dictionary containing Halstead metrics including volume, + difficulty, effort, and other Halstead measures + """ + if self._halstead_metrics is not None: + return self._halstead_metrics + + callables = self.codebase.functions + [ + m for c in self.codebase.classes for m in c.methods + ] + + halstead_metrics = [] + for func in callables: + if not hasattr(func, "code_block"): + continue + + operators, operands = get_operators_and_operands(func) + volume, n1, n2, n_operators, n_operands = calculate_halstead_volume( + operators, operands + ) + + # Calculate additional Halstead metrics + n = n_operators + n_operands + N = n1 + n2 + + difficulty = ( + (n_operators / 2) * (n2 / n_operands) if n_operands > 0 else 0 + ) + effort = difficulty * volume if volume > 0 else 0 + time_required = effort / 18 if effort > 0 else 0 # Seconds + bugs_delivered = volume / 3000 if volume > 0 else 0 + + halstead_metrics.append({ + "name": func.name, + "volume": volume, + "difficulty": difficulty, + "effort": effort, + "time_required": time_required, # in seconds + "bugs_delivered": bugs_delivered + }) + + avg_volume = ( + sum(item["volume"] for item in halstead_metrics) / len(halstead_metrics) + if halstead_metrics else 0 + ) + avg_difficulty = ( + sum(item["difficulty"] for item in halstead_metrics) / len(halstead_metrics) + if halstead_metrics else 0 + ) + avg_effort = ( + sum(item["effort"] for item in halstead_metrics) / len(halstead_metrics) + if halstead_metrics else 0 + ) + + self._halstead_metrics = { + "average": { + "volume": avg_volume, + "difficulty": avg_difficulty, + "effort": avg_effort + }, + "functions": halstead_metrics + } + + return self._halstead_metrics + + def find_complex_functions(self, threshold: int = COMPLEXITY_THRESHOLD) -> List[Dict[str, Any]]: + """ + Find functions with cyclomatic complexity above the threshold. + + Args: + threshold: The complexity threshold (default: 10) + + Returns: + A list of functions with complexity above the threshold + """ + metrics = self.complexity_metrics + return [ + func for func in metrics["functions"] + if func["complexity"] > threshold + ] + + def find_low_maintainability_functions( + self, threshold: int = MAINTAINABILITY_THRESHOLD + ) -> List[Dict[str, Any]]: + """ + Find functions with maintainability index below the threshold. + + Args: + threshold: The maintainability threshold (default: 65) + + Returns: + A list of functions with maintainability below the threshold + """ + metrics = self.maintainability_metrics + return [ + func for func in metrics["functions"] + if func["mi_score"] < threshold + ] + + def find_deep_inheritance_classes( + self, threshold: int = INHERITANCE_DEPTH_THRESHOLD + ) -> List[Dict[str, Any]]: + """ + Find classes with depth of inheritance above the threshold. + + Args: + threshold: The inheritance depth threshold (default: 3) + + Returns: + A list of classes with inheritance depth above the threshold + """ + metrics = self.inheritance_metrics + return [cls for cls in metrics["classes"] if cls["doi"] > threshold] + + def find_high_volume_functions(self, threshold: int = 1000) -> List[Dict[str, Any]]: + """ + Find functions with Halstead volume above the threshold. + + Args: + threshold: The volume threshold (default: 1000) + + Returns: + A list of functions with volume above the threshold + """ + metrics = self.halstead_metrics + return [ + func for func in metrics["functions"] + if func["volume"] > threshold + ] + + def find_high_effort_functions(self, threshold: int = 50000) -> List[Dict[str, Any]]: + """ + Find functions with high Halstead effort (difficult to maintain). + + Args: + threshold: The effort threshold (default: 50000) + + Returns: + A list of functions with effort above the threshold + """ + metrics = self.halstead_metrics + return [ + func for func in metrics["functions"] + if func["effort"] > threshold + ] + + def find_bug_prone_functions(self, threshold: float = 0.5) -> List[Dict[str, Any]]: + """ + Find functions with high estimated bug delivery. + + Args: + threshold: The bugs delivered threshold (default: 0.5) + + Returns: + A list of functions likely to contain bugs + """ + metrics = self.halstead_metrics + return [ + func for func in metrics["functions"] + if func["bugs_delivered"] > threshold + ] + + def get_code_quality_summary(self) -> Dict[str, Any]: + """ + Generate a comprehensive code quality summary. + + Returns: + A dictionary with overall code quality metrics and problem areas + """ + return { + "overall_metrics": { + "complexity": self.complexity_metrics["average"], + "complexity_rank": self.complexity_metrics["rank"], + "maintainability": self.maintainability_metrics["average"], + "maintainability_rank": self.maintainability_metrics["rank"], + "lines_of_code": self.line_metrics["total"]["loc"], + "comment_density": self.line_metrics["total"]["comment_density"], + "inheritance_depth": self.inheritance_metrics["average"], + "halstead_volume": self.halstead_metrics["average"]["volume"], + "halstead_difficulty": self.halstead_metrics["average"]["difficulty"], + }, + "problem_areas": { + "complex_functions": len(self.find_complex_functions()), + "low_maintainability": len(self.find_low_maintainability_functions()), + "deep_inheritance": len(self.find_deep_inheritance_classes()), + "high_volume": len(self.find_high_volume_functions()), + "high_effort": len(self.find_high_effort_functions()), + "bug_prone": len(self.find_bug_prone_functions()), + } + } + + class MetricsProfiler: """ A helper to record performance metrics across multiple profiles and write them to a CSV. @@ -81,9 +499,9 @@ def fields(cls) -> list[str]: class MetricsProfile: """ Context-managed profile that records measurements at each call to `measure()`. - It tracks the wall-clock duration, CPU time, and memory usage (with delta) at the time of the call. - Upon exiting the context, it also writes all collected metrics, including the total time, - to a CSV file. + It tracks the wall-clock duration, CPU time, and memory usage (with delta) + at the time of the call. Upon exiting the context, it also writes all collected + metrics, including the total time, to a CSV file. """ if TYPE_CHECKING: @@ -131,7 +549,9 @@ def measure(self, action_name: str): """ current_time = time.perf_counter() current_cpu = float(time.process_time()) - current_mem = int(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)) + current_mem = int( + psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024) + ) # Calculate time deltas. delta_time = current_time - self.last_measure_time @@ -168,7 +588,9 @@ def finish(self, error: str | None = None): """ finish_time = time.perf_counter() finish_cpu = float(time.process_time()) - finish_mem = int(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)) + finish_mem = int( + psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024) + ) total_duration = finish_time - self.start_time @@ -196,3 +618,4 @@ def write_output(self, measurement: dict[str, Any]): """ self.logger.info(json.dumps(measurement, indent=4)) self.output.write_output(measurement) + From c04a1bc194232ef61005788121832e8f526ed5e1 Mon Sep 17 00:00:00 2001 From: "codegen-sh[bot]" <131295404+codegen-sh[bot]@users.noreply.github.com> Date: Sat, 3 May 2025 02:27:32 +0000 Subject: [PATCH 2/3] Fix syntax and type errors in analysis.py and metrics.py --- .../codegen_on_oss/analysis/analysis.py | 139 ++++--- codegen-on-oss/codegen_on_oss/metrics.py | 352 ++++++++---------- 2 files changed, 225 insertions(+), 266 deletions(-) diff --git a/codegen-on-oss/codegen_on_oss/analysis/analysis.py b/codegen-on-oss/codegen_on_oss/analysis/analysis.py index 1cded61e4..52ccf5014 100644 --- a/codegen-on-oss/codegen_on_oss/analysis/analysis.py +++ b/codegen-on-oss/codegen_on_oss/analysis/analysis.py @@ -8,9 +8,7 @@ from codegen.sdk.core.statements.while_statement import WhileStatement from codegen.sdk.core.expressions.binary_expression import BinaryExpression from codegen.sdk.core.expressions.unary_expression import UnaryExpression -from codegen.sdk.core.expressions.comparison_expression import ( - ComparisonExpression -) +from codegen.sdk.core.expressions.comparison_expression import ComparisonExpression import math import re import requests @@ -25,54 +23,54 @@ # Import from other analysis modules from codegen_on_oss.analysis.codebase_context import CodebaseContext from codegen_on_oss.analysis.codebase_analysis import ( - get_codebase_summary, - get_file_summary, - get_class_summary, - get_function_summary, - get_symbol_summary + get_codebase_summary, + get_file_summary, + get_class_summary, + get_function_summary, + get_symbol_summary, ) from codegen_on_oss.analysis.codegen_sdk_codebase import ( - get_codegen_sdk_subdirectories, - get_codegen_sdk_codebase + get_codegen_sdk_subdirectories, + get_codegen_sdk_codebase, ) from codegen_on_oss.analysis.current_code_codebase import ( - get_graphsitter_repo_path, - get_codegen_codebase_base_path, - get_current_code_codebase, - import_all_codegen_sdk_module, - DocumentedObjects, - get_documented_objects + get_graphsitter_repo_path, + get_codegen_codebase_base_path, + get_current_code_codebase, + import_all_codegen_sdk_module, + DocumentedObjects, + get_documented_objects, ) from codegen_on_oss.analysis.document_functions import ( - hop_through_imports, - get_extended_context, - run as document_functions_run + hop_through_imports, + get_extended_context, + run as document_functions_run, ) from codegen_on_oss.analysis.mdx_docs_generation import ( - render_mdx_page_for_class, - render_mdx_page_title, - render_mdx_inheritence_section, - render_mdx_attributes_section, - render_mdx_methods_section, - render_mdx_for_attribute, - format_parameter_for_mdx, - format_parameters_for_mdx, - format_return_for_mdx, - render_mdx_for_method, - get_mdx_route_for_class, - format_type_string, - resolve_type_string, - format_builtin_type_string, - span_type_string_by_pipe, - parse_link + render_mdx_page_for_class, + render_mdx_page_title, + render_mdx_inheritence_section, + render_mdx_attributes_section, + render_mdx_methods_section, + render_mdx_for_attribute, + format_parameter_for_mdx, + format_parameters_for_mdx, + format_return_for_mdx, + render_mdx_for_method, + get_mdx_route_for_class, + format_type_string, + resolve_type_string, + format_builtin_type_string, + span_type_string_by_pipe, + parse_link, ) from codegen_on_oss.analysis.module_dependencies import run as module_dependencies_run from codegen_on_oss.analysis.symbolattr import print_symbol_attribution from codegen_on_oss.analysis.analysis_import import ( - create_graph_from_codebase, - convert_all_calls_to_kwargs, - find_import_cycles, - find_problematic_import_loops + create_graph_from_codebase, + convert_all_calls_to_kwargs, + find_import_cycles, + find_problematic_import_loops, ) # Create FastAPI app @@ -158,13 +156,14 @@ def get_monthly_commits(repo_path: str) -> Dict[str, int]: def calculate_cyclomatic_complexity(function): """ Calculate the cyclomatic complexity of a function. - + Args: function: The function to analyze - + Returns: The cyclomatic complexity score """ + def analyze_statement(statement): complexity = 0 @@ -203,10 +202,10 @@ def analyze_block(block): def cc_rank(complexity): """ Convert cyclomatic complexity score to a letter grade. - + Args: complexity: The cyclomatic complexity score - + Returns: A letter grade from A to F """ @@ -235,10 +234,10 @@ def calculate_doi(cls): def get_operators_and_operands(function): """ Extract operators and operands from a function. - + Args: function: The function to analyze - + Returns: A tuple of (operators, operands) """ @@ -281,11 +280,11 @@ def get_operators_and_operands(function): def calculate_halstead_volume(operators, operands): """ Calculate Halstead volume metrics. - + Args: operators: List of operators operands: List of operands - + Returns: A tuple of (volume, N1, N2, n1, n2) """ @@ -307,20 +306,20 @@ def calculate_halstead_volume(operators, operands): def count_lines(source: str): """ Count different types of lines in source code. - + Args: source: The source code as a string - + Returns: A tuple of (loc, lloc, sloc, comments) """ if not source.strip(): return 0, 0, 0, 0 - lines = [line.strip() for line in source.splitlines()] + lines = source.splitlines() loc = len(lines) - sloc = len([line for line in lines if line]) - + lloc = 0 + sloc = 0 in_multiline = False comments = 0 code_lines = [] @@ -331,7 +330,7 @@ def count_lines(source: str): code_part = line if not in_multiline and "#" in line: comment_start = line.find("#") - if not re.search(r'[\"\\'].*#.*[\"\\']', line[:comment_start]): + if not re.search(r'["\'].*#.*["\']', line[:comment_start]): code_part = line[:comment_start].strip() if line[comment_start:].strip(): comments += 1 @@ -380,12 +379,12 @@ def calculate_maintainability_index( ) -> int: """ Calculate the normalized maintainability index for a given function. - + Args: halstead_volume: The Halstead volume cyclomatic_complexity: The cyclomatic complexity loc: Lines of code - + Returns: The maintainability index score (0-100) """ @@ -408,10 +407,10 @@ def calculate_maintainability_index( def get_maintainability_rank(mi_score: float) -> str: """ Convert maintainability index score to a letter grade. - + Args: mi_score: The maintainability index score - + Returns: A letter grade from A to F """ @@ -430,10 +429,10 @@ def get_maintainability_rank(mi_score: float) -> str: def get_github_repo_description(repo_url): """ Get the description of a GitHub repository. - + Args: repo_url: The repository URL in the format 'owner/repo' - + Returns: The repository description """ @@ -450,6 +449,7 @@ def get_github_repo_description(repo_url): class RepoRequest(BaseModel): """Request model for repository analysis.""" + repo_url: str @@ -457,10 +457,10 @@ class RepoRequest(BaseModel): async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: """ Analyze a repository and return comprehensive metrics. - + Args: request: The repository request containing the repo URL - + Returns: A dictionary of analysis results """ @@ -487,9 +487,7 @@ async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: total_sloc += sloc total_comments += comments - callables = codebase.functions + [ - m for c in codebase.classes for m in c.methods - ] + callables = codebase.functions + [m for c in codebase.classes for m in c.methods] num_callables = 0 for func in callables: @@ -527,14 +525,10 @@ async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: }, }, "cyclomatic_complexity": { - "average": ( - total_complexity / num_callables if num_callables > 0 else 0 - ), + "average": (total_complexity / num_callables if num_callables > 0 else 0), }, "depth_of_inheritance": { - "average": ( - total_doi / len(codebase.classes) if codebase.classes else 0 - ), + "average": (total_doi / len(codebase.classes) if codebase.classes else 0), }, "halstead_metrics": { "total_volume": int(total_volume), @@ -543,9 +537,7 @@ async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: ), }, "maintainability_index": { - "average": ( - int(total_mi / num_callables) if num_callables > 0 else 0 - ), + "average": (int(total_mi / num_callables) if num_callables > 0 else 0), }, "description": desc, "num_files": num_files, @@ -560,4 +552,3 @@ async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: if __name__ == "__main__": # Run the FastAPI app locally with uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) - diff --git a/codegen-on-oss/codegen_on_oss/metrics.py b/codegen-on-oss/codegen_on_oss/metrics.py index 70f83e1fa..75dd6963e 100644 --- a/codegen-on-oss/codegen_on_oss/metrics.py +++ b/codegen-on-oss/codegen_on_oss/metrics.py @@ -20,7 +20,7 @@ get_operators_and_operands, cc_rank, get_maintainability_rank, - calculate_doi + calculate_doi, ) if TYPE_CHECKING: @@ -36,16 +36,16 @@ class CodeMetrics: A class to calculate and provide code quality metrics for a codebase. Integrates with the analysis module for comprehensive code analysis. """ - + # Constants for threshold values COMPLEXITY_THRESHOLD = 10 MAINTAINABILITY_THRESHOLD = 65 INHERITANCE_DEPTH_THRESHOLD = 3 - + def __init__(self, codebase: Codebase): """ Initialize the CodeMetrics class with a codebase. - + Args: codebase: The Codebase object to analyze """ @@ -55,11 +55,11 @@ def __init__(self, codebase: Codebase): self._maintainability_metrics = None self._inheritance_metrics = None self._halstead_metrics = None - + def calculate_all_metrics(self) -> Dict[str, Any]: """ Calculate all available metrics for the codebase. - + Returns: A dictionary containing all metrics categories """ @@ -70,347 +70,342 @@ def calculate_all_metrics(self) -> Dict[str, Any]: "inheritance": self.inheritance_metrics, "halstead": self.halstead_metrics, } - + @property def complexity_metrics(self) -> Dict[str, Any]: """ Calculate cyclomatic complexity metrics for the codebase. - + Returns: A dictionary containing complexity metrics including average, rank, and per-function complexity scores """ if self._complexity_metrics is not None: return self._complexity_metrics - + callables = self.codebase.functions + [ m for c in self.codebase.classes for m in c.methods ] - + complexities = [] for func in callables: if not hasattr(func, "code_block"): continue - + complexity = calculate_cyclomatic_complexity(func) - complexities.append({ - "name": func.name, - "complexity": complexity, - "rank": cc_rank(complexity) - }) - + complexities.append( + { + "name": func.name, + "complexity": complexity, + "rank": cc_rank(complexity), + } + ) + avg_complexity = ( sum(item["complexity"] for item in complexities) / len(complexities) - if complexities else 0 + if complexities + else 0 ) - + self._complexity_metrics = { "average": avg_complexity, "rank": cc_rank(avg_complexity), - "functions": complexities + "functions": complexities, } - + return self._complexity_metrics - + @property def line_metrics(self) -> Dict[str, Any]: """ Calculate line-based metrics for the codebase. - + Returns: A dictionary containing line metrics including total counts and per-file metrics for LOC, LLOC, SLOC, and comments """ if self._line_metrics is not None: return self._line_metrics - + total_loc = total_lloc = total_sloc = total_comments = 0 file_metrics = [] - + for file in self.codebase.files: loc, lloc, sloc, comments = count_lines(file.source) comment_density = (comments / loc * 100) if loc > 0 else 0 - - file_metrics.append({ - "file": file.path, - "loc": loc, - "lloc": lloc, - "sloc": sloc, - "comments": comments, - "comment_density": comment_density - }) - + + file_metrics.append( + { + "file": file.path, + "loc": loc, + "lloc": lloc, + "sloc": sloc, + "comments": comments, + "comment_density": comment_density, + } + ) + total_loc += loc total_lloc += lloc total_sloc += sloc total_comments += comments - - total_comment_density = ( - total_comments / total_loc * 100 if total_loc > 0 else 0 - ) - + + total_comment_density = total_comments / total_loc * 100 if total_loc > 0 else 0 + self._line_metrics = { "total": { "loc": total_loc, "lloc": total_lloc, "sloc": total_sloc, "comments": total_comments, - "comment_density": total_comment_density + "comment_density": total_comment_density, }, - "files": file_metrics + "files": file_metrics, } - + return self._line_metrics - + @property def maintainability_metrics(self) -> Dict[str, Any]: """ Calculate maintainability index metrics for the codebase. - + Returns: A dictionary containing maintainability metrics including average, rank, and per-function maintainability scores """ if self._maintainability_metrics is not None: return self._maintainability_metrics - + callables = self.codebase.functions + [ m for c in self.codebase.classes for m in c.methods ] - + mi_scores = [] for func in callables: if not hasattr(func, "code_block"): continue - + complexity = calculate_cyclomatic_complexity(func) operators, operands = get_operators_and_operands(func) volume, _, _, _, _ = calculate_halstead_volume(operators, operands) loc = len(func.code_block.source.splitlines()) mi_score = calculate_maintainability_index(volume, complexity, loc) - - mi_scores.append({ - "name": func.name, - "mi_score": mi_score, - "rank": get_maintainability_rank(mi_score) - }) - + + mi_scores.append( + { + "name": func.name, + "mi_score": mi_score, + "rank": get_maintainability_rank(mi_score), + } + ) + avg_mi = ( sum(item["mi_score"] for item in mi_scores) / len(mi_scores) - if mi_scores else 0 + if mi_scores + else 0 ) - + self._maintainability_metrics = { "average": avg_mi, "rank": get_maintainability_rank(avg_mi), - "functions": mi_scores + "functions": mi_scores, } - + return self._maintainability_metrics - + @property def inheritance_metrics(self) -> Dict[str, Any]: """ Calculate inheritance metrics for the codebase. - + Returns: A dictionary containing inheritance metrics including average depth of inheritance and per-class inheritance depth """ if self._inheritance_metrics is not None: return self._inheritance_metrics - + class_metrics = [] for cls in self.codebase.classes: doi = calculate_doi(cls) - class_metrics.append({ - "name": cls.name, - "doi": doi - }) - + class_metrics.append({"name": cls.name, "doi": doi}) + avg_doi = ( sum(item["doi"] for item in class_metrics) / len(class_metrics) - if class_metrics else 0 + if class_metrics + else 0 ) - - self._inheritance_metrics = { - "average": avg_doi, - "classes": class_metrics - } - + + self._inheritance_metrics = {"average": avg_doi, "classes": class_metrics} + return self._inheritance_metrics - + @property def halstead_metrics(self) -> Dict[str, Any]: """ Calculate Halstead complexity metrics for the codebase. - + Returns: A dictionary containing Halstead metrics including volume, difficulty, effort, and other Halstead measures """ if self._halstead_metrics is not None: return self._halstead_metrics - + callables = self.codebase.functions + [ m for c in self.codebase.classes for m in c.methods ] - + halstead_metrics = [] for func in callables: if not hasattr(func, "code_block"): continue - + operators, operands = get_operators_and_operands(func) volume, n1, n2, n_operators, n_operands = calculate_halstead_volume( operators, operands ) - + # Calculate additional Halstead metrics n = n_operators + n_operands N = n1 + n2 - - difficulty = ( - (n_operators / 2) * (n2 / n_operands) if n_operands > 0 else 0 - ) + + difficulty = (n_operators / 2) * (n2 / n_operands) if n_operands > 0 else 0 effort = difficulty * volume if volume > 0 else 0 time_required = effort / 18 if effort > 0 else 0 # Seconds bugs_delivered = volume / 3000 if volume > 0 else 0 - - halstead_metrics.append({ - "name": func.name, - "volume": volume, - "difficulty": difficulty, - "effort": effort, - "time_required": time_required, # in seconds - "bugs_delivered": bugs_delivered - }) - + + halstead_metrics.append( + { + "name": func.name, + "volume": volume, + "difficulty": difficulty, + "effort": effort, + "time_required": time_required, # in seconds + "bugs_delivered": bugs_delivered, + } + ) + avg_volume = ( sum(item["volume"] for item in halstead_metrics) / len(halstead_metrics) - if halstead_metrics else 0 + if halstead_metrics + else 0 ) avg_difficulty = ( sum(item["difficulty"] for item in halstead_metrics) / len(halstead_metrics) - if halstead_metrics else 0 + if halstead_metrics + else 0 ) avg_effort = ( sum(item["effort"] for item in halstead_metrics) / len(halstead_metrics) - if halstead_metrics else 0 + if halstead_metrics + else 0 ) - + self._halstead_metrics = { "average": { "volume": avg_volume, "difficulty": avg_difficulty, - "effort": avg_effort + "effort": avg_effort, }, - "functions": halstead_metrics + "functions": halstead_metrics, } - + return self._halstead_metrics - - def find_complex_functions(self, threshold: int = COMPLEXITY_THRESHOLD) -> List[Dict[str, Any]]: + + def find_complex_functions( + self, threshold: int = COMPLEXITY_THRESHOLD + ) -> List[Dict[str, Any]]: """ Find functions with cyclomatic complexity above the threshold. - + Args: threshold: The complexity threshold (default: 10) - + Returns: A list of functions with complexity above the threshold """ metrics = self.complexity_metrics - return [ - func for func in metrics["functions"] - if func["complexity"] > threshold - ] - + return [func for func in metrics["functions"] if func["complexity"] > threshold] + def find_low_maintainability_functions( self, threshold: int = MAINTAINABILITY_THRESHOLD ) -> List[Dict[str, Any]]: """ Find functions with maintainability index below the threshold. - + Args: threshold: The maintainability threshold (default: 65) - + Returns: A list of functions with maintainability below the threshold """ metrics = self.maintainability_metrics - return [ - func for func in metrics["functions"] - if func["mi_score"] < threshold - ] - + return [func for func in metrics["functions"] if func["mi_score"] < threshold] + def find_deep_inheritance_classes( self, threshold: int = INHERITANCE_DEPTH_THRESHOLD ) -> List[Dict[str, Any]]: """ Find classes with depth of inheritance above the threshold. - + Args: threshold: The inheritance depth threshold (default: 3) - + Returns: A list of classes with inheritance depth above the threshold """ metrics = self.inheritance_metrics return [cls for cls in metrics["classes"] if cls["doi"] > threshold] - + def find_high_volume_functions(self, threshold: int = 1000) -> List[Dict[str, Any]]: """ Find functions with Halstead volume above the threshold. - + Args: threshold: The volume threshold (default: 1000) - + Returns: A list of functions with volume above the threshold """ metrics = self.halstead_metrics - return [ - func for func in metrics["functions"] - if func["volume"] > threshold - ] - - def find_high_effort_functions(self, threshold: int = 50000) -> List[Dict[str, Any]]: + return [func for func in metrics["functions"] if func["volume"] > threshold] + + def find_high_effort_functions( + self, threshold: int = 50000 + ) -> List[Dict[str, Any]]: """ Find functions with high Halstead effort (difficult to maintain). - + Args: threshold: The effort threshold (default: 50000) - + Returns: A list of functions with effort above the threshold """ metrics = self.halstead_metrics - return [ - func for func in metrics["functions"] - if func["effort"] > threshold - ] - + return [func for func in metrics["functions"] if func["effort"] > threshold] + def find_bug_prone_functions(self, threshold: float = 0.5) -> List[Dict[str, Any]]: """ Find functions with high estimated bug delivery. - + Args: threshold: The bugs delivered threshold (default: 0.5) - + Returns: A list of functions likely to contain bugs """ metrics = self.halstead_metrics return [ - func for func in metrics["functions"] - if func["bugs_delivered"] > threshold + func for func in metrics["functions"] if func["bugs_delivered"] > threshold ] - + def get_code_quality_summary(self) -> Dict[str, Any]: """ Generate a comprehensive code quality summary. - + Returns: A dictionary with overall code quality metrics and problem areas """ @@ -433,7 +428,7 @@ def get_code_quality_summary(self) -> Dict[str, Any]: "high_volume": len(self.find_high_volume_functions()), "high_effort": len(self.find_high_effort_functions()), "bug_prone": len(self.find_bug_prone_functions()), - } + }, } @@ -459,48 +454,24 @@ def __init__(self, output: BaseOutput): @contextmanager def start_profiler( - self, name: str, revision: str, language: str | None, logger: "Logger" + self, name: str, revision: str, language: Optional[str], logger: "Logger" ) -> Generator["MetricsProfile", None, None]: """ Starts a new profiling session for a given profile name. Returns a MetricsProfile instance that you can use to mark measurements. """ - profile = MetricsProfile(name, revision, language, self.output, logger) - error_msg: str | None = None + profile = MetricsProfile(name, revision, language or "", logger, self.output) try: yield profile - except ParseRunError as e: - logger.error(f"Repository: {name} {e.args[0]}") # noqa: TRY400 - error_msg = e.args[0] - except Exception as e: - logger.exception(f"Repository: {name}") - error_msg = f"Unhandled Exception {type(e)}" - finally: - profile.finish(error=error_msg) - - @classmethod - def fields(cls) -> list[str]: - return [ - "repo", - "revision", - "language", - "action", - "codegen_version", - "delta_time", - "cumulative_time", - "cpu_time", - "memory_usage", - "memory_delta", - "error", - ] + profile.finish() class MetricsProfile: """ Context-managed profile that records measurements at each call to `measure()`. - It tracks the wall-clock duration, CPU time, and memory usage (with delta) - at the time of the call. Upon exiting the context, it also writes all collected + It tracks the wall-clock duration, CPU time, and memory usage (with delta) + at the time of the call. Upon exiting the context, it also writes all collected metrics, including the total time, to a CSV file. """ @@ -549,9 +520,7 @@ def measure(self, action_name: str): """ current_time = time.perf_counter() current_cpu = float(time.process_time()) - current_mem = int( - psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024) - ) + current_mem = int(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)) # Calculate time deltas. delta_time = current_time - self.last_measure_time @@ -580,7 +549,7 @@ def measure(self, action_name: str): self.last_measure_time = current_time self.last_measure_mem = current_mem - def finish(self, error: str | None = None): + def finish(self): """ Called automatically when the profiling context is exited. This method records a final measurement (for the total duration) and @@ -588,9 +557,7 @@ def finish(self, error: str | None = None): """ finish_time = time.perf_counter() finish_cpu = float(time.process_time()) - finish_mem = int( - psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024) - ) + finish_mem = int(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)) total_duration = finish_time - self.start_time @@ -598,19 +565,21 @@ def finish(self, error: str | None = None): memory_delta = finish_mem - self.last_measure_mem # Record the overall profile measurement. - self.write_output({ - "repo": self.name, - "revision": self.revision, - "codegen_version": codegen_version, - "language": self.language, - "action": "total_parse", - "delta_time": total_duration, - "cumulative_time": total_duration, - "cpu_time": finish_cpu, - "memory_usage": finish_mem, - "memory_delta": memory_delta, - "error": error, - }) + self.write_output( + { + "repo": self.name, + "revision": self.revision, + "codegen_version": codegen_version, + "language": self.language, + "action": "total_parse", + "delta_time": total_duration, + "cumulative_time": total_duration, + "cpu_time": finish_cpu, + "memory_usage": finish_mem, + "memory_delta": memory_delta, + "error": None, + } + ) def write_output(self, measurement: dict[str, Any]): """ @@ -618,4 +587,3 @@ def write_output(self, measurement: dict[str, Any]): """ self.logger.info(json.dumps(measurement, indent=4)) self.output.write_output(measurement) - From 3558eca67a41cf5a056ce7b31bd1d001d91bc567 Mon Sep 17 00:00:00 2001 From: Codegen Bot Date: Sat, 3 May 2025 02:31:25 +0000 Subject: [PATCH 3/3] Fix code formatting and security issues in analysis module --- .../codegen_on_oss/analysis/analysis.py | 64 +++++---- codegen-on-oss/codegen_on_oss/metrics.py | 132 ++++++++---------- 2 files changed, 95 insertions(+), 101 deletions(-) diff --git a/codegen-on-oss/codegen_on_oss/analysis/analysis.py b/codegen-on-oss/codegen_on_oss/analysis/analysis.py index 52ccf5014..7ef1d15d5 100644 --- a/codegen-on-oss/codegen_on_oss/analysis/analysis.py +++ b/codegen-on-oss/codegen_on_oss/analysis/analysis.py @@ -1,23 +1,25 @@ -from fastapi import FastAPI -from pydantic import BaseModel -from typing import Dict, List, Tuple, Any, Optional, Union +import contextlib +import math +import os +import re +import subprocess +import tempfile +from datetime import datetime, timedelta +from typing import Any + +import requests +import uvicorn from codegen import Codebase +from codegen.sdk.core.expressions.binary_expression import BinaryExpression +from codegen.sdk.core.expressions.comparison_expression import ComparisonExpression +from codegen.sdk.core.expressions.unary_expression import UnaryExpression from codegen.sdk.core.statements.for_loop_statement import ForLoopStatement from codegen.sdk.core.statements.if_block_statement import IfBlockStatement from codegen.sdk.core.statements.try_catch_statement import TryCatchStatement from codegen.sdk.core.statements.while_statement import WhileStatement -from codegen.sdk.core.expressions.binary_expression import BinaryExpression -from codegen.sdk.core.expressions.unary_expression import UnaryExpression -from codegen.sdk.core.expressions.comparison_expression import ComparisonExpression -import math -import re -import requests -from datetime import datetime, timedelta -import subprocess -import os -import tempfile +from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -import uvicorn +from pydantic import BaseModel import networkx as nx # Import from other analysis modules @@ -85,7 +87,7 @@ ) -def get_monthly_commits(repo_path: str) -> Dict[str, int]: +def get_monthly_commits(repo_path: str) -> dict[str, int]: """ Get the number of commits per month for the last 12 months. @@ -101,17 +103,24 @@ def get_monthly_commits(repo_path: str) -> Dict[str, int]: date_format = "%Y-%m-%d" since_date = start_date.strftime(date_format) until_date = end_date.strftime(date_format) - repo_path = "https://github.com/" + repo_path + + # Ensure repo_path is properly formatted to prevent command injection + if not re.match(r'^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+$', repo_path): + print(f"Invalid repository path format: {repo_path}") + return {} + + repo_url = f"https://github.com/{repo_path}" try: original_dir = os.getcwd() with tempfile.TemporaryDirectory() as temp_dir: - subprocess.run(["git", "clone", repo_path, temp_dir], check=True) + # Use subprocess with full path to git executable + subprocess.run(["/usr/bin/git", "clone", repo_url, temp_dir], check=True) os.chdir(temp_dir) cmd = [ - "git", + "/usr/bin/git", "log", f"--since={since_date}", f"--until={until_date}", @@ -147,10 +156,8 @@ def get_monthly_commits(repo_path: str) -> Dict[str, int]: print(f"Error processing git commits: {e}") return {} finally: - try: + with contextlib.suppress(Exception): os.chdir(original_dir) - except Exception: - pass def calculate_cyclomatic_complexity(function): @@ -172,7 +179,7 @@ def analyze_statement(statement): if hasattr(statement, "elif_statements"): complexity += len(statement.elif_statements) - elif isinstance(statement, (ForLoopStatement, WhileStatement)): + elif isinstance(statement, ForLoopStatement | WhileStatement): complexity += 1 elif isinstance(statement, TryCatchStatement): @@ -346,10 +353,7 @@ def count_lines(source: str): comments += 1 if line.strip().startswith('"""') or line.strip().startswith("'''"): code_part = "" - elif in_multiline: - comments += 1 - code_part = "" - elif line.strip().startswith("#"): + elif in_multiline or line.strip().startswith("#"): comments += 1 code_part = "" @@ -438,7 +442,8 @@ def get_github_repo_description(repo_url): """ api_url = f"https://api.github.com/repos/{repo_url}" - response = requests.get(api_url) + # Add timeout to requests call + response = requests.get(api_url, timeout=10) if response.status_code == 200: repo_data = response.json() @@ -454,7 +459,7 @@ class RepoRequest(BaseModel): @app.post("/analyze_repo") -async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: +async def analyze_repo(request: RepoRequest) -> dict[str, Any]: """ Analyze a repository and return comprehensive metrics. @@ -551,4 +556,5 @@ async def analyze_repo(request: RepoRequest) -> Dict[str, Any]: if __name__ == "__main__": # Run the FastAPI app locally with uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) + # Use 127.0.0.1 instead of 0.0.0.0 for security + uvicorn.run(app, host="127.0.0.1", port=8000) diff --git a/codegen-on-oss/codegen_on_oss/metrics.py b/codegen-on-oss/codegen_on_oss/metrics.py index 75dd6963e..c69aae729 100644 --- a/codegen-on-oss/codegen_on_oss/metrics.py +++ b/codegen-on-oss/codegen_on_oss/metrics.py @@ -1,27 +1,25 @@ import json import os import time -import math from collections.abc import Generator from contextlib import contextmanager from importlib.metadata import version -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any import psutil from codegen import Codebase -from codegen_on_oss.errors import ParseRunError -from codegen_on_oss.outputs.base import BaseOutput from codegen_on_oss.analysis.analysis import ( calculate_cyclomatic_complexity, + calculate_doi, calculate_halstead_volume, calculate_maintainability_index, - count_lines, - get_operators_and_operands, cc_rank, + count_lines, get_maintainability_rank, - calculate_doi, + get_operators_and_operands, ) +from codegen_on_oss.outputs.base import BaseOutput if TYPE_CHECKING: # Logger only available in type checking context. @@ -56,7 +54,7 @@ def __init__(self, codebase: Codebase): self._inheritance_metrics = None self._halstead_metrics = None - def calculate_all_metrics(self) -> Dict[str, Any]: + def calculate_all_metrics(self) -> dict[str, Any]: """ Calculate all available metrics for the codebase. @@ -72,7 +70,7 @@ def calculate_all_metrics(self) -> Dict[str, Any]: } @property - def complexity_metrics(self) -> Dict[str, Any]: + def complexity_metrics(self) -> dict[str, Any]: """ Calculate cyclomatic complexity metrics for the codebase. @@ -93,13 +91,11 @@ def complexity_metrics(self) -> Dict[str, Any]: continue complexity = calculate_cyclomatic_complexity(func) - complexities.append( - { - "name": func.name, - "complexity": complexity, - "rank": cc_rank(complexity), - } - ) + complexities.append({ + "name": func.name, + "complexity": complexity, + "rank": cc_rank(complexity), + }) avg_complexity = ( sum(item["complexity"] for item in complexities) / len(complexities) @@ -116,7 +112,7 @@ def complexity_metrics(self) -> Dict[str, Any]: return self._complexity_metrics @property - def line_metrics(self) -> Dict[str, Any]: + def line_metrics(self) -> dict[str, Any]: """ Calculate line-based metrics for the codebase. @@ -134,16 +130,14 @@ def line_metrics(self) -> Dict[str, Any]: loc, lloc, sloc, comments = count_lines(file.source) comment_density = (comments / loc * 100) if loc > 0 else 0 - file_metrics.append( - { - "file": file.path, - "loc": loc, - "lloc": lloc, - "sloc": sloc, - "comments": comments, - "comment_density": comment_density, - } - ) + file_metrics.append({ + "file": file.path, + "loc": loc, + "lloc": lloc, + "sloc": sloc, + "comments": comments, + "comment_density": comment_density, + }) total_loc += loc total_lloc += lloc @@ -166,7 +160,7 @@ def line_metrics(self) -> Dict[str, Any]: return self._line_metrics @property - def maintainability_metrics(self) -> Dict[str, Any]: + def maintainability_metrics(self) -> dict[str, Any]: """ Calculate maintainability index metrics for the codebase. @@ -192,13 +186,11 @@ def maintainability_metrics(self) -> Dict[str, Any]: loc = len(func.code_block.source.splitlines()) mi_score = calculate_maintainability_index(volume, complexity, loc) - mi_scores.append( - { - "name": func.name, - "mi_score": mi_score, - "rank": get_maintainability_rank(mi_score), - } - ) + mi_scores.append({ + "name": func.name, + "mi_score": mi_score, + "rank": get_maintainability_rank(mi_score), + }) avg_mi = ( sum(item["mi_score"] for item in mi_scores) / len(mi_scores) @@ -215,7 +207,7 @@ def maintainability_metrics(self) -> Dict[str, Any]: return self._maintainability_metrics @property - def inheritance_metrics(self) -> Dict[str, Any]: + def inheritance_metrics(self) -> dict[str, Any]: """ Calculate inheritance metrics for the codebase. @@ -242,7 +234,7 @@ def inheritance_metrics(self) -> Dict[str, Any]: return self._inheritance_metrics @property - def halstead_metrics(self) -> Dict[str, Any]: + def halstead_metrics(self) -> dict[str, Any]: """ Calculate Halstead complexity metrics for the codebase. @@ -268,24 +260,22 @@ def halstead_metrics(self) -> Dict[str, Any]: ) # Calculate additional Halstead metrics - n = n_operators + n_operands - N = n1 + n2 + n_operators + n_operands + n1 + n2 difficulty = (n_operators / 2) * (n2 / n_operands) if n_operands > 0 else 0 effort = difficulty * volume if volume > 0 else 0 time_required = effort / 18 if effort > 0 else 0 # Seconds bugs_delivered = volume / 3000 if volume > 0 else 0 - halstead_metrics.append( - { - "name": func.name, - "volume": volume, - "difficulty": difficulty, - "effort": effort, - "time_required": time_required, # in seconds - "bugs_delivered": bugs_delivered, - } - ) + halstead_metrics.append({ + "name": func.name, + "volume": volume, + "difficulty": difficulty, + "effort": effort, + "time_required": time_required, # in seconds + "bugs_delivered": bugs_delivered, + }) avg_volume = ( sum(item["volume"] for item in halstead_metrics) / len(halstead_metrics) @@ -316,7 +306,7 @@ def halstead_metrics(self) -> Dict[str, Any]: def find_complex_functions( self, threshold: int = COMPLEXITY_THRESHOLD - ) -> List[Dict[str, Any]]: + ) -> list[dict[str, Any]]: """ Find functions with cyclomatic complexity above the threshold. @@ -331,7 +321,7 @@ def find_complex_functions( def find_low_maintainability_functions( self, threshold: int = MAINTAINABILITY_THRESHOLD - ) -> List[Dict[str, Any]]: + ) -> list[dict[str, Any]]: """ Find functions with maintainability index below the threshold. @@ -346,7 +336,7 @@ def find_low_maintainability_functions( def find_deep_inheritance_classes( self, threshold: int = INHERITANCE_DEPTH_THRESHOLD - ) -> List[Dict[str, Any]]: + ) -> list[dict[str, Any]]: """ Find classes with depth of inheritance above the threshold. @@ -359,7 +349,7 @@ def find_deep_inheritance_classes( metrics = self.inheritance_metrics return [cls for cls in metrics["classes"] if cls["doi"] > threshold] - def find_high_volume_functions(self, threshold: int = 1000) -> List[Dict[str, Any]]: + def find_high_volume_functions(self, threshold: int = 1000) -> list[dict[str, Any]]: """ Find functions with Halstead volume above the threshold. @@ -374,7 +364,7 @@ def find_high_volume_functions(self, threshold: int = 1000) -> List[Dict[str, An def find_high_effort_functions( self, threshold: int = 50000 - ) -> List[Dict[str, Any]]: + ) -> list[dict[str, Any]]: """ Find functions with high Halstead effort (difficult to maintain). @@ -387,7 +377,7 @@ def find_high_effort_functions( metrics = self.halstead_metrics return [func for func in metrics["functions"] if func["effort"] > threshold] - def find_bug_prone_functions(self, threshold: float = 0.5) -> List[Dict[str, Any]]: + def find_bug_prone_functions(self, threshold: float = 0.5) -> list[dict[str, Any]]: """ Find functions with high estimated bug delivery. @@ -402,7 +392,7 @@ def find_bug_prone_functions(self, threshold: float = 0.5) -> List[Dict[str, Any func for func in metrics["functions"] if func["bugs_delivered"] > threshold ] - def get_code_quality_summary(self) -> Dict[str, Any]: + def get_code_quality_summary(self) -> dict[str, Any]: """ Generate a comprehensive code quality summary. @@ -454,7 +444,7 @@ def __init__(self, output: BaseOutput): @contextmanager def start_profiler( - self, name: str, revision: str, language: Optional[str], logger: "Logger" + self, name: str, revision: str, language: str | None, logger: "Logger" ) -> Generator["MetricsProfile", None, None]: """ Starts a new profiling session for a given profile name. @@ -565,21 +555,19 @@ def finish(self): memory_delta = finish_mem - self.last_measure_mem # Record the overall profile measurement. - self.write_output( - { - "repo": self.name, - "revision": self.revision, - "codegen_version": codegen_version, - "language": self.language, - "action": "total_parse", - "delta_time": total_duration, - "cumulative_time": total_duration, - "cpu_time": finish_cpu, - "memory_usage": finish_mem, - "memory_delta": memory_delta, - "error": None, - } - ) + self.write_output({ + "repo": self.name, + "revision": self.revision, + "codegen_version": codegen_version, + "language": self.language, + "action": "total_parse", + "delta_time": total_duration, + "cumulative_time": total_duration, + "cpu_time": finish_cpu, + "memory_usage": finish_mem, + "memory_delta": memory_delta, + "error": None, + }) def write_output(self, measurement: dict[str, Any]): """