diff --git a/ai/gen-ai-agents/code-quality-agent/LICENSE b/ai/gen-ai-agents/code-quality-agent/LICENSE new file mode 100644 index 000000000..fb2e1fcb6 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Luigi Saetta + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/ai/gen-ai-agents/code-quality-agent/README.md b/ai/gen-ai-agents/code-quality-agent/README.md
new file mode 100644
index 000000000..7eab7999a
--- /dev/null
+++ b/ai/gen-ai-agents/code-quality-agent/README.md
@@ -0,0 +1,177 @@
+# Code Quality Agent
+
+A lightweight **LangGraph-based** agent that scans a local codebase (read-only) to:
+
+- ✅ **Check file headers** against a simple template policy
+- ✅ **Scan for secrets** (heuristic patterns + suspicious assignments)
+- ✅ **Check for license**
+- ✅ **Check dependency licenses**
+- ✅ **Generate header fixes**
+- ✅ **Generate per-file documentation** (optional) in Markdown via an LLM (OCI GenAI via LangChain)
+
+It produces artifacts in a separate output folder (no in-place edits).
+
+
+---
+
+## Features
+
+### Header policy checks
+For each discovered source file, the agent validates that a header block contains:
+
+- `File name:`
+- `Author:`
+- `Date last modified:`
+- `Python Version:`
+- `Description:`
+- `License:`
+
+It also performs a **date alignment check** (header date vs. file `mtime` in UTC) when the file path is available.
+
+### Secrets scanning (heuristic)
+The agent searches each file for:
+- known patterns (AWS keys, GitHub tokens, OCI OCIDs, private key blocks, bearer headers, etc.)
+- suspicious string assignments / dict values with sensitive names (password, token, secret, api_key, …)
+
+Findings are reported with:
+- kind
+- line number
+- a redacted excerpt
+
+### License check
+Check that an approved LICENSE file is provided.
+
+### Header fix generation
+For each file where the header check fails, the agent provides a suggested replacement snippet:
+- modify the Author field
+- check the rest of the header.
+ +### Per-file doc generation (LLM) +For each Python file, the agent can generate Markdown documentation with sections such as: +- overview +- public API +- behaviors/edge cases +- side effects +- usage examples +- risks/TODOs + +### Report generation +A final summary report is also generated, in Markdown. + +### Languages supported +For now, tests have been done using: +- Python + +--- + +## Repository layout + +```text +. +├── agent/ +│ ├── graph_agent.py # LangGraph pipeline (discover → check → scan → docgen → report) +│ ├── fs_ro.py # Read-only sandboxed filesystem access +│ ├── header_rules.py # Header policy checker +│ ├── secrets_scan.py # Heuristic secrets scanner +│ ├── docgen.py # Per-file documentation generation +│ ├── docgen_prompt.py # Prompts for doc generation + final report +│ ├── docgen_utils.py # LLM invocation + output normalization +│ ├── oci_models.py # OCI GenAI / OCI OpenAI LangChain adapters +│ └── utils.py # Logging helpers, etc. +├── out/ # Default output folder (generated artifacts) +├── run_agent.py # CLI entry point +├── run_agent.sh # Convenience runner +├── requirements.txt +└── LICENSE +``` + +## Setup +1. Create a python 3.11+ environment + +For example, +``` +conda create -n code_quality_agent python==3.11 +``` + +activate the environment. If you're using conda: +``` +conda activate code_quality_agent +``` + +2. Install the following python libraries: +``` +pip install oci -U +pip install langchain -U +pip install langchain-oci -U +pip install langgraph -U +``` + +3. Clone this repository +``` +git clone https://github.com/luigisaetta/code_quality_agent.git +``` + +4. Create a config_private.py file, in the agent directory. + +Start from the template provided in the repository and create a **config_private.py** file. +Put in the file your compartment's OCID. + + +5. Have your local OCI config setup + +Setup under $HOME/.oci +See: https://docs.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm + +6. 
Set policies to use Generative AI
+
+See: https://docs.oracle.com/en-us/iaas/Content/generative-ai/iam-policies.htm
+
+Ask your tenancy admin for help.
+
+## How to use it
+Modify the [run_agent.sh](./run_agent.sh) file.
+
+Change the params:
+- `root`: the root directory containing all the files to be scanned
+- `out`: the full path to the output directory
+
+Then run:
+```
+./run_agent.sh
+```
+
+## Dependency License Checks – Execution Requirements
+
+This agent checks license compliance for direct Python dependencies listed in `requirements.txt`.
+
+### Recommended (deterministic & fast)
+Run the agent in an environment where:
+- All dependencies listed in the `requirements.txt` of the project to be scanned are installed
+- Agent runtime dependencies (see Setup above) are installed
+
+This allows the agent to read license data from installed package metadata:
+- Offline execution
+- Faster and reproducible results
+**Recommended for CI and release validation.**
+
+### Fallback (best-effort)
+If some dependencies are not installed:
+- Network access is required (the agent will do a PyPI JSON lookup)
+- Execution may be slower
+- License data may be incomplete or ambiguous
+
+## Important Note on Results and Human Review
+
+This agent is intended to **assist** with code quality, security, and license compliance checks, **not to replace human judgment entirely**.
+
+While the agent applies deterministic rules and best-effort analysis, it may produce:
+- **False positives** (e.g.
ambiguous licenses, heuristic PII detection, conservative policy checks) +- **Incomplete results** depending on the execution environment (installed dependencies, network access, metadata quality) + +For this reason: +- **All findings must be reviewed and validated by a human** +- The agent’s output should be treated as an **input to review**, not a final decision +- Final responsibility for compliance, security, and legal interpretation always remains with the user + +This is especially important for compliance-critical areas such as **licenses, personal data (PII), and security findings**. + diff --git a/ai/gen-ai-agents/code-quality-agent/agent/config.py b/ai/gen-ai-agents/code-quality-agent/agent/config.py new file mode 100644 index 000000000..6e89ff05a --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/config.py @@ -0,0 +1,100 @@ +""" +File name: config.py +Author: Luigi Saetta +Date last modified: 2025-07-02 +Python Version: 3.11 + +Description: + This module provides general configurations + + +Usage: + Import this module into other scripts to use its functions. + Example: + import config + +License: + This code is released under the MIT License. + +Notes: + This is a part of a demo showing how to implement a code quality agent. + +Warnings: + This module is in development, may change in future versions. 
+""" + +DEBUG = False +STREAMING = False + +# OCI general + +# type of OCI auth +AUTH = "API_KEY" +REGION = "eu-frankfurt-1" +SERVICE_ENDPOINT = f"https://inference.generativeai.{REGION}.oci.oraclecloud.com" + +# LLM +# this is the default model +LLM_MODEL_ID = "openai.gpt-oss-120b" + +TEMPERATURE = 0.0 +TOP_P = 1 +MAX_TOKENS = 4000 + +# +# specific configs for the Code Quality Agent +# +# for now, only Python files +FILES_PATTERN = "*.py" + +# ---- File exclusions (repo-relative glob patterns) ---- +EXCLUDED_PATHS = [ + ".git/**", + ".venv/**", + "venv/**", + "__pycache__/**", + "*.pyc", + "build/**", + "dist/**", + "node_modules/**", +] + +# Accepted license identifiers (you decide the vocabulary) +ACCEPTED_LICENSE_TYPES = [ + "MIT", + "Apache-2.0", + "UPL-1.0", + "BSD-3-Clause", + "BSD-2-Clause", +] + +# set this flag to True if you want to create local docs in md format. +# Not needed to check code quality. +ENABLE_DOC_GENERATION = False + +# used for header generation. +# It is the minimum version accepted. 
+PYTHON_VERSION = "3.11" + +# Licenses you allow for dependencies (use SPDX-ish IDs where possible) +# see docs here: +# https://confluence.oraclecorp.com/confluence/display/CORPARCH/Licenses+Eligible+for+Pre-Approval+-+Distribution +ACCEPTED_DEP_LICENSES = { + "MIT", + "Apache-2.0", + "BSD-3-Clause", + "BSD-2-Clause", + "BSD", + "ISC", + # Mozilla Public License + "MPL-2.0", + # Python Software Foundation License + "PSF-2.0", + "UPL-1.0", + # Eclipse Public License + "EPL-2.0", +} + +# Policy knobs +FAIL_ON_UNKNOWN_DEP_LICENSE = False # usually False at first +FAIL_ON_NOT_INSTALLED_DEP = False diff --git a/ai/gen-ai-agents/code-quality-agent/agent/config_private_template.py b/ai/gen-ai-agents/code-quality-agent/agent/config_private_template.py new file mode 100644 index 000000000..e52e2524e --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/config_private_template.py @@ -0,0 +1,5 @@ +""" +Private config +""" + +COMPARTMENT_ID = "YOUR_COMPARTMENT_OCID" diff --git a/ai/gen-ai-agents/code-quality-agent/agent/dep_license_check.py b/ai/gen-ai-agents/code-quality-agent/agent/dep_license_check.py new file mode 100644 index 000000000..ac42b3dc0 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/dep_license_check.py @@ -0,0 +1,490 @@ +""" +File name: dep_license_check.py +Author: L. Saetta +Date last modified: 2026-01-12 +Python Version: 3.11 +License: MIT + +Description: + Checks licenses for dependencies listed in requirements.txt (direct deps). + + Primary source: + - Installed package metadata (importlib.metadata) + + Fallback source (when installed metadata cannot determine license): + - PyPI JSON API for the *installed version* (best-effort) + + Limitations: + - If dependencies are not installed in the environment running the agent, licenses will be NOT_INSTALLED. + - Requirements parsing is intentionally conservative; complex pip options are ignored. + - PyPI fallback requires network access and relies on PyPI metadata quality. 
+""" + +from __future__ import annotations + +import json +import re +import urllib.error +import urllib.request +from dataclasses import dataclass +from importlib import metadata +from typing import Any, Iterable + +from agent.utils import get_console_logger + + +logger = get_console_logger() + +# ---- Data models ---- + + +@dataclass(frozen=True) +class DepLicenseInfo: + requirement: str # original requirement line (cleaned) + distribution: str # normalized dist name (best-effort) + version: str | None + license: str # normalized license id or UNKNOWN/NOT_INSTALLED + source: str # license_field | classifier | pypi_json | unknown | not_installed + + +@dataclass(frozen=True) +class DepLicenseCheckResult: + ok: bool + deps: list[DepLicenseInfo] + failures: list[DepLicenseInfo] + warnings: list[DepLicenseInfo] + message: str + + +# ---- Requirements parsing (direct deps only) ---- + +_REQ_NAME_RE = re.compile(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*)") # dist name at start +_IGNORE_PREFIXES = ( + "-r", + "--requirement", + "--index-url", + "--extra-index-url", + "--find-links", + "--trusted-host", +) + + +def parse_requirements_txt(text: str) -> list[str]: + """ + Returns cleaned requirement lines (direct deps). + Ignores comments, empty lines, and pip options. + Keeps markers/extras/version pins as part of the requirement string, but extracts dist name separately later. + """ + reqs: list[str] = [] + for raw in text.splitlines(): + line = raw.strip() + if not line or line.startswith("#"): + continue + if line.startswith(_IGNORE_PREFIXES): + # conservative: ignore includes and index directives + continue + # drop inline comments: "pkg==1.2 # comment" + if " #" in line: + line = line.split(" #", 1)[0].rstrip() + reqs.append(line) + return reqs + + +def extract_dist_name(requirement: str) -> str | None: + """ + Best-effort extraction of distribution name from a requirement line. 
+ Handles: + - requests==2.32.3 + - pydantic>=2 + - fastapi[standard]>=0.100 + - package ; python_version < "3.12" + """ + # Strip environment marker + base = requirement.split(";", 1)[0].strip() + # Strip extras + base = base.split("[", 1)[0].strip() + m = _REQ_NAME_RE.match(base) + if not m: + return None + return m.group(1) + + +# ---- License extraction helpers ---- + + +def _normalize_license_string(s: str) -> str: + """ + Normalize common license strings to SPDX-ish ids. + Keep it conservative; expand mapping as you need. + """ + v = (s or "").strip() + if not v: + return "UNKNOWN" + + u = v.upper().strip() + if u in {"UNKNOWN", "NONE", "N/A"}: + return "UNKNOWN" + + # Common normalizations + # for now I don't want to add GPL + mapping = { + # Apache + "APACHE": "Apache-2.0", + "APACHE 2.0": "Apache-2.0", + "APACHE-2.0": "Apache-2.0", + "APACHE SOFTWARE LICENSE": "Apache-2.0", + # MIT + "MIT": "MIT", + "MIT LICENSE": "MIT", + # BSD + "BSD": "BSD", + # BSD variants commonly seen in metadata + "MODIFIED BSD LICENSE": "BSD", + "NEW BSD LICENSE": "BSD-3-Clause", + "REVISED BSD LICENSE": "BSD-3-Clause", + "BSD-3-CLAUSE": "BSD-3-Clause", + "BSD 3-CLAUSE": "BSD-3-Clause", + "BSD-2-CLAUSE": "BSD-2-Clause", + "BSD 2-CLAUSE": "BSD-2-Clause", + # ISC + "ISC": "ISC", + # MPL + "MPL 2.0": "MPL-2.0", + "MPL-2.0": "MPL-2.0", + "MOZILLA PUBLIC LICENSE 2.0": "MPL-2.0", + # UPL (Oracle / Universal Permissive License) + "UPL-1.0": "UPL-1.0", + "UPL 1.0": "UPL-1.0", + "UNIVERSAL PERMISSIVE LICENSE 1.0": "UPL-1.0", + "UNIVERSAL PERMISSIVE LICENSE (UPL) 1.0": "UPL-1.0", + } + + if "MODIFIED BSD" in u: + return "BSD" + + if u in mapping: + return mapping[u] + + # Substring matches + if "APACHE" in u and "2" in u: + return "Apache-2.0" + if "MIT" in u: + return "MIT" + if "BSD" in u and "3" in u: + return "BSD-3-Clause" + if "BSD" in u and "2" in u: + return "BSD-2-Clause" + if "MPL" in u and "2" in u: + return "MPL-2.0" + if "ISC" in u: + return "ISC" + if "UPL" in u: + return 
"UPL-1.0" + + # Keep original (but trimmed) if it looks like an SPDX-ish token + if re.fullmatch(r"[A-Za-z0-9.\-+]+", v): + return v + + return v # last resort + + +def _license_from_classifiers(classifiers: Iterable[str]) -> str | None: + """ + Map Trove classifiers to normalized license ids. + """ + trove_map = { + "License :: OSI Approved :: MIT License": "MIT", + "License :: OSI Approved :: Apache Software License": "Apache-2.0", + "License :: OSI Approved :: BSD License": "BSD", + "License :: OSI Approved :: ISC License (ISCL)": "ISC", + "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)": "MPL-2.0", + 'License :: OSI Approved :: BSD 3-Clause "New" or "Revised" License': "BSD-3-Clause", + 'License :: OSI Approved :: BSD 2-Clause "Simplified" License': "BSD-2-Clause", + # Note: UPL does not reliably appear as a Trove classifier; use license_expression instead. + } + for c in classifiers: + c = c.strip() + if c in trove_map: + return trove_map[c] + + # fallback: keyword scan + for c in classifiers: + u = c.upper() + if "LICENSE ::" not in u: + continue + if "MIT" in u: + return "MIT" + if "APACHE" in u: + return "Apache-2.0" + if "BSD 3-CLAUSE" in u or ("BSD" in u and "3" in u): + return "BSD-3-Clause" + if "BSD 2-CLAUSE" in u or ("BSD" in u and "2" in u): + return "BSD-2-Clause" + if "MPL" in u and "2" in u: + return "MPL-2.0" + if "ISC" in u: + return "ISC" + return None + + +def _normalize_dist_for_pypi(name: str) -> str: + """ + Normalize project name for PyPI URLs (PEP 503-ish): + lowercase and replace runs of [-_.] with '-'. + """ + return re.sub(r"[-_.]+", "-", name.strip().lower()) + + +# Simple in-process cache to avoid repeated HTTP calls +_PYPI_LICENSE_CACHE: dict[tuple[str, str | None], str | None] = {} + + +def _license_from_pypi_json(payload: dict[str, Any]) -> str | None: + """ + Extract license from PyPI JSON payload (best-effort), including PEP 639 fields. 
+ + Returns a normalized license string (e.g., 'UPL-1.0', 'MIT', 'Apache-2.0') or None. + """ + info = payload.get("info") or {} + + # 1) PEP 639: license_expression (preferred) + lic_expr = (info.get("license_expression") or "").strip() + if lic_expr: + lic_norm = _normalize_license_string(lic_expr) + if lic_norm != "UNKNOWN": + return lic_norm + + # 2) Alternative shape: info.license could be a dict with expression + lic_obj = info.get("license") + if isinstance(lic_obj, dict): + expr = (lic_obj.get("expression") or "").strip() + if expr: + expr_norm = _normalize_license_string(expr) + if expr_norm != "UNKNOWN": + return expr_norm + + # 3) Trove classifiers + classifiers = info.get("classifiers") or [] + lic = _license_from_classifiers(classifiers) + if lic: + return lic + + # 4) Legacy string field: info.license + if isinstance(lic_obj, str): + lic_raw = lic_obj.strip() + else: + lic_raw = (info.get("license") or "").strip() + + if lic_raw: + lic_norm = _normalize_license_string(lic_raw) + if lic_norm != "UNKNOWN": + return lic_norm + + return None + + +def _get_license_from_pypi( + dist_name: str, version: str | None, *, timeout_s: int = 5 +) -> str | None: + """ + Best-effort PyPI fallback: query PyPI JSON API for the given dist and (if provided) version. + + Returns a normalized license string, or None if not found / network error / ambiguous. + """ + key = (dist_name, version) + if key in _PYPI_LICENSE_CACHE: + return _PYPI_LICENSE_CACHE[key] + + pypi_name = _normalize_dist_for_pypi(dist_name) + + # Prefer version-specific endpoint for determinism. 
+ if version: + url = f"https://pypi.org/pypi/{pypi_name}/{version}/json" + else: + url = f"https://pypi.org/pypi/{pypi_name}/json" + + logger.info(" Fetching license info from PyPI for %s", dist_name) + + req = urllib.request.Request( + url, + headers={ + "Accept": "application/json", + "User-Agent": "code-quality-agent/1.0 (license-check)", + }, + method="GET", + ) + + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read().decode("utf-8", errors="replace")) + except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, ValueError): + _PYPI_LICENSE_CACHE[key] = None + return None + + lic = _license_from_pypi_json(data) + _PYPI_LICENSE_CACHE[key] = lic + return lic + + +def extract_pinned_version(requirement: str) -> str | None: + """ + Extracts the pinned version from a requirement line if it uses '=='. + Returns None if no pinned version is found. + """ + base = requirement.split(";", 1)[0].strip() + m = re.search(r"==\s*([A-Za-z0-9][A-Za-z0-9.\-+]*)", base) + return m.group(1) if m else None + + +def get_installed_dist_license( + dist_name: str, *, version_hint: str | None = None +) -> DepLicenseInfo: + """ + dist_name is a distribution name (best-effort). + + Returns DepLicenseInfo with license info from installed metadata. + If not installed, tries PyPI fallback (prefer version_hint if provided). + If license cannot be determined, license is UNKNOWN. 
+ """ + try: + md = metadata.metadata(dist_name) + ver = metadata.version(dist_name) + installed = True + except metadata.PackageNotFoundError: + md = None + ver = None + installed = False + + # CHANGE: If not installed, try PyPI fallback instead of returning immediately + if not installed: + lic_web = _get_license_from_pypi(dist_name, version_hint) + if lic_web: + return DepLicenseInfo( + requirement=dist_name, + distribution=dist_name, + version=version_hint, + license=lic_web, + source="pypi_json", + ) + return DepLicenseInfo( + requirement=dist_name, + distribution=dist_name, + version=version_hint, + license="NOT_INSTALLED", + source="not_installed", + ) + + # 1) Local metadata: License field + lic_raw = (md.get("License") or "").strip() + lic = _normalize_license_string(lic_raw) + if lic not in {"UNKNOWN"} and lic_raw: + return DepLicenseInfo( + requirement=dist_name, + distribution=dist_name, + version=ver, + license=lic, + source="license_field", + ) + + # 2) Local metadata: Trove classifiers + classifiers = md.get_all("Classifier") or [] + lic2 = _license_from_classifiers(classifiers) + if lic2: + return DepLicenseInfo( + requirement=dist_name, + distribution=dist_name, + version=ver, + license=lic2, + source="classifier", + ) + + # CHANGE: PyPI fallback when local metadata is insufficient (license would be UNKNOWN) + lic3 = _get_license_from_pypi(dist_name, ver) + if lic3: + return DepLicenseInfo( + requirement=dist_name, + distribution=dist_name, + version=ver, + license=lic3, + source="pypi_json", + ) + + return DepLicenseInfo( + requirement=dist_name, + distribution=dist_name, + version=ver, + license="UNKNOWN", + source="unknown", + ) + + +def check_dependency_licenses( + *, + requirements_text: str, + accepted_licenses: set[str], + fail_on_unknown: bool, + fail_on_not_installed: bool, +) -> DepLicenseCheckResult: + req_lines = parse_requirements_txt(requirements_text) + + infos: list[DepLicenseInfo] = [] + failures: list[DepLicenseInfo] = [] + 
warnings: list[DepLicenseInfo] = [] + + # Process each requirement in requirements.txt + for req in req_lines: + dist = extract_dist_name(req) + if not dist: + warnings.append( + DepLicenseInfo( + requirement=req, + distribution="(unparsed)", + version=None, + license="UNKNOWN", + source="unknown", + ) + ) + continue + + pinned = extract_pinned_version(req) + info = get_installed_dist_license(dist, version_hint=pinned) + + # Keep original requirement for traceability + info = DepLicenseInfo( + requirement=req, + distribution=info.distribution, + version=info.version, + license=info.license, + source=info.source, + ) + infos.append(info) + + # Evaluate + if info.license == "NOT_INSTALLED": + if fail_on_not_installed: + failures.append(info) + else: + warnings.append(info) + continue + + if info.license == "UNKNOWN" or info.license == "BSD": + # BSD is ambiguous; treat as warning unless you explicitly allow "BSD" + if info.license in accepted_licenses: + continue + if fail_on_unknown: + failures.append(info) + else: + warnings.append(info) + continue + + if info.license not in accepted_licenses: + failures.append(info) + + ok = len(failures) == 0 + msg = ( + f"Checked {len(infos)} direct dependencies from requirements.txt. " + f"Failures: {len(failures)}. Warnings: {len(warnings)}." + ) + return DepLicenseCheckResult( + ok=ok, deps=infos, failures=failures, warnings=warnings, message=msg + ) diff --git a/ai/gen-ai-agents/code-quality-agent/agent/docgen.py b/ai/gen-ai-agents/code-quality-agent/agent/docgen.py new file mode 100644 index 000000000..2aa5bce42 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/docgen.py @@ -0,0 +1,183 @@ +""" +File name: docgen.py +Author: Luigi Saetta +Date last modified: 2026-01-12 +Python Version: 3.11 + +Description: + Per-file documentation generation using an LLM. Output is written elsewhere. + + - Accepts the Python source as text (read-only input). 
+ - Produces a Markdown document in an output folder (no in-place edits). + - Designed to work with an LLM returned by oci.models.get_llm(). + + Supported LLM call styles (best-effort): + - await llm.ainvoke(prompt) -> str or object with .content + - await llm(prompt) -> str or object with .content + +Usage: + from pathlib import Path + from oci.models import get_llm + from agent.docgen import generate_doc_for_file + + llm = get_llm() + await generate_doc_for_file( + llm=llm, + relpath=Path("pkg/module.py"), + source=python_source_text, + out_dir=Path("./out_docs"), + request="Focus on public API and side effects." + ) + +License: + MIT +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any +import re + + +from agent.docgen_prompt import DOC_PROMPT +from agent.docgen_utils import call_llm_normalized +from agent.utils import get_console_logger + +logger = get_console_logger() + +# ---------------------------- +# Public API +# ---------------------------- + + +@dataclass(frozen=True) +class DocGenResult: + out_path: Path + bytes_written: int + model_hint: str | None = None + + +def ensure_dir(p: Path) -> None: + """Create directory if needed.""" + p.mkdir(parents=True, exist_ok=True) + + +def safe_doc_filename(relpath: Path) -> str: + """ + Convert "a/b/c.py" -> "a__b__c.py.md" (safe single-file output namespace). + """ + return "__".join(relpath.parts) + ".md" + + +async def generate_doc_for_file( + *, + llm: Any, + relpath: Path, + source: str, + out_dir: Path, + request: str = "", + prompt_template: str = DOC_PROMPT, + max_source_chars: int = 120_000, +) -> DocGenResult: + """ + Generate Markdown documentation for a single Python file. + + Args: + llm: LLM object (expected to support .ainvoke(prompt) or be awaitable). + relpath: Path relative to the scanned root (used only for labeling/output naming). + source: Python source code text. 
+ out_dir: Output directory (docs will be written here). + request: User request/goal for the documentation (e.g., "focus on security and public API"). + prompt_template: Prompt template string with {relpath}, {source}, and {request}. + max_source_chars: Safety limit to avoid sending huge files to the LLM. + + Returns: + DocGenResult with the output path and bytes written. + + Raises: + ValueError: if source is empty or too large. + RuntimeError: if LLM call fails or returns empty content. + """ + if not source or not source.strip(): + raise ValueError(f"Empty source for {relpath}") + + # Light guardrail to avoid huge prompts; adjust as needed. + if len(source) > max_source_chars: + source = _truncate_source(source, max_source_chars) + + ensure_dir(out_dir) + + prompt = prompt_template.format( + relpath=str(relpath), + source=source, + request=(request or "").strip(), + ) + + text, model_hint = await call_llm_normalized(llm, prompt) + text = _postprocess_markdown(text, relpath) + + if not text.strip(): + raise RuntimeError("LLM returned empty documentation content.") + + out_path = out_dir / safe_doc_filename(relpath) + data = (text.rstrip() + "\n").encode("utf-8") + out_path.write_bytes(data) + + return DocGenResult( + out_path=out_path, + bytes_written=len(data), + model_hint=model_hint, + ) + + +# ---------------------------- +# Internals +# ---------------------------- + + +def _truncate_source(source: str, max_chars: int) -> str: + """ + Truncate source while keeping head + tail to preserve context. 
+ """ + head = source[: int(max_chars * 0.65)] + tail = source[-int(max_chars * 0.25) :] + note = ( + "\n\n# --- TRUNCATED ---\n" + "# The source file was truncated before being sent to the LLM.\n" + "# Consider generating docs per-section if you need full coverage.\n" + "# --- TRUNCATED ---\n\n" + ) + return head + note + tail + + +def _postprocess_markdown(text: str, relpath: Path) -> str: + """ + Minimal cleanup: + - Ensure it starts with a title + - Remove stray triple backticks at edges (common formatting glitches) + """ + t = text.strip() + + # If model returns a fenced block only, unwrap once. + t = _unwrap_single_fence(t) + + # Ensure title + if not re.match(r"^\s*#\s+", t): + t = f"# {relpath}\n\n" + t + + return t + + +def _unwrap_single_fence(t: str) -> str: + """If the whole content is wrapped in a single ```...``` fence, unwrap it.""" + if t.startswith("```") and t.endswith("```"): + lines = t.splitlines() + if ( + len(lines) >= 3 + and lines[0].startswith("```") + and lines[-1].startswith("```") + ): + return "\n".join(lines[1:-1]).strip() + return t diff --git a/ai/gen-ai-agents/code-quality-agent/agent/docgen_prompt.py b/ai/gen-ai-agents/code-quality-agent/agent/docgen_prompt.py new file mode 100644 index 000000000..19579ffa4 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/docgen_prompt.py @@ -0,0 +1,116 @@ +""" +File name: docgen_prompt.py +Author: L. Saetta +Date last modified: 2026-01-12 +Python Version: 3.11 +License: MIT + +Description: + Prompt for documentation generation of Python files. + + You can customize the documentation style and content by modifying the DOC_PROMPT variable below. + +""" + +DOC_PROMPT = """ +You are a senior Python engineer. + +You must generate documentation for the following Python file. +The user request below specifies what to emphasize. Follow it carefully when relevant. 
+ +IMPORTANT SAFETY / COMPLIANCE RULES (highest priority): +- Never include secrets, credentials, API keys, tokens, private keys, or passwords. +- Never include or reproduce personal data (PII). This includes (non-exhaustive): + emails, phone numbers, IBAN, credit card numbers, tax IDs, personal addresses. +- If the source contains sensitive-looking values or PII-like strings, DO NOT reproduce them. + Instead, describe them generically and mention that values were redacted. + +USER REQUEST (high priority): +{request} + +Output format: +- Markdown +- Title: the file path +- Sections: + - Overview (what it does, in 3-6 bullet points) + - Public API (functions/classes likely intended for import/use) + - Key behaviors and edge cases + - Inputs/outputs and side effects + - Usage examples (short, realistic) - IMPORTANT: use placeholders, never real identifiers + - Risks/TODOs (brief) + +Keep it practical and concise. + +FILE PATH: {relpath} + +PYTHON SOURCE: +```python +{source} +``` +""" + +# This is the prompt with instructions for the final report +REPORT_PROMPT = """ +You are a senior Python engineer. + +Today is: {now_datetime}. + +Generate a final report in markdown based on the following inputs. + +## Inputs +- Root directory: {root_dir} +- Processed: {num_files} +- Header issues found: {header_issues} +- Secrets issues found: {secret_issues} +- License check (repository license file): {license_check} +- Dependency license failures (requirements.txt direct deps): {dep_license_failures} +- Dependency license warnings (unknown/not installed/ambiguous): {dep_license_warnings} +- PII hard failures (direct identifiers): {pii_hard_failures} +- PII warnings (structured name/address): {pii_warnings} +- requirements.txt check: {requirements_check} + +## Policies +### PII Policy +Explain the policy outcome clearly: +- HARD FAIL: direct identifiers (email, phone, IBAN, credit card, tax id, etc.) 
+- WARN: possible names/addresses only when in structured form + +### Dependency license policy +- A dependency is NON-COMPLIANT if its detected license is not in the accepted allow-list. +- If a dependency license is UNKNOWN or NOT_INSTALLED, treat it as a WARNING unless configured otherwise. +- If requirements.txt is missing at repository root, dependency checks are incomplete and this must be a STRONG WARNING. + +## Pass/Fail rules (must be explicit) +The overall outcome is FAIL if any of the following are true: +- Any secrets issues are found +- Any PII hard failures exist +- Repository license check indicates non-compliance or missing/invalid license (if applicable) +- Any dependency license failures exist + +The overall outcome is WARN (not FAIL) if any of the following are true and none of the FAIL conditions hold: +- requirements.txt is missing at repository root +- Any dependency license warnings exist (UNKNOWN / NOT_INSTALLED / ambiguous like "BSD") +- Any PII warnings exist + +## Output requirements +- Title: Code Compliance & Risk Assessment Report +- Analysis made on root directory: {root_dir} +- Organize the report into dedicated sections with proper headings: + 1) Executive summary (Outcome: PASS/FAIL/WARN + key numbers + strongest issues first) + 2) Requirements & dependency visibility (requirements.txt presence + what was checked) + 3) License compliance (repository license file) + 4) Dependency license compliance (failures and warnings) + 5) Secrets scan results + 6) PII compliance (separate subsections for HARD FAIL and WARN) + 7) Header compliance + 8) Recommendations (actionable, prioritized) + +## Safety rules for the report (highest priority) +- Never include secrets or credentials. +- Never include raw PII. If excerpts are present in the inputs, assume they are already masked; + do not attempt to reconstruct or infer the original values. +- Do not paste any third-party license text. Refer to licenses by name only. 
+- When providing examples, always use placeholders. + +Keep it concise, practical, and suitable for a CI compliance artifact. +""" diff --git a/ai/gen-ai-agents/code-quality-agent/agent/docgen_utils.py b/ai/gen-ai-agents/code-quality-agent/agent/docgen_utils.py new file mode 100644 index 000000000..f5d527cb3 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/docgen_utils.py @@ -0,0 +1,145 @@ +""" +File name: docgen_utils.py +Author: L. Saetta +Date last modified: 2026-01-12 +Python Version: 3.11 +License: MIT + +Description: + Text generation utilities + +""" + +import asyncio +from typing import Any, Optional +from langchain_core.messages import HumanMessage + + +def _dig_for_string(obj: Any, depth: int = 0) -> Optional[str]: + """Recursively search nested dict/list structures for a likely content string.""" + if depth > 6: + return None + + if isinstance(obj, str): + return obj + + if isinstance(obj, dict): + # Prefer message.content if present + msg = obj.get("message") + if isinstance(msg, dict): + c = msg.get("content") + if isinstance(c, str): + return c + # Common "choices" shape + ch = obj.get("choices") + if isinstance(ch, list): + for it in ch: + s = _dig_for_string(it, depth + 1) + if s: + return s + # Generic scan + for _, v in obj.items(): + s = _dig_for_string(v, depth + 1) + if s: + return s + + if isinstance(obj, list): + for it in obj: + s = _dig_for_string(it, depth + 1) + if s: + return s + + return None + + +def extract_text(resp: Any) -> str: + """ + Normalize LLM outputs across: + - str + - LangChain messages (.content as str) + - Responses-style content blocks (list of {"type": "text", "text": ...}) + - dict-like OpenAI / OCI shapes + + this version manages also Responses-style content blocks + """ + + if resp is None: + return "" + + # 1. Plain string + if isinstance(resp, str): + return resp + + # 2. 
def extract_model_hint(resp: Any) -> Optional[str]:
    """Best-effort extraction of the model identifier from an LLM response.

    Looks for a non-blank ``model`` / ``model_name`` attribute first, then
    for the same keys when the response is a plain dict. Returns the
    stripped value, or None when nothing usable is found.
    """
    candidate_names = ("model", "model_name")

    def _clean(value: Any) -> Optional[str]:
        # Accept only non-blank strings; normalize surrounding whitespace.
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        return None

    for name in candidate_names:
        hint = _clean(getattr(resp, name, None))
        if hint is not None:
            return hint

    if isinstance(resp, dict):
        for name in candidate_names:
            hint = _clean(resp.get(name))
            if hint is not None:
                return hint

    return None
    def __post_init__(self) -> None:
        """Normalize ``root_dir`` to an absolute, existing path.

        ``strict=True`` makes ``resolve`` raise ``FileNotFoundError`` when
        the root does not exist, so a sandbox can never be created over a
        bogus path. ``object.__setattr__`` is required because the
        dataclass is frozen.
        """
        root = self.root_dir.expanduser().resolve(strict=True)
        object.__setattr__(self, "root_dir", root)
+ candidate = (self.root_dir / path) if not path.is_absolute() else path + resolved = candidate.expanduser().resolve(strict=False) + + # Python 3.11: Path.is_relative_to exists + if not resolved.is_relative_to(self.root_dir): + raise SandboxViolation(f"Access outside sandbox is forbidden: {resolved}") + return resolved + + def list_source_files(self) -> list[Path]: + """ + Return absolute Paths for all files under root matching pattern (recursive). + + For now we support only .py files. + """ + return sorted(self.root_dir.rglob(FILES_PATTERN)) + + def list_all_files(self) -> list[Path]: + """ + Return absolute Paths for all files under root (recursive). + """ + return sorted( + [p for p in self.root_dir.rglob(ALL_FILES_PATTERN) if p.is_file()] + ) + + def read_text( + self, rel_or_abs_path: str | Path, *, max_bytes: int = MAX_BYTES + ) -> str: + """Read a file as UTF-8 text (best-effort).""" + p = self._resolve_under_root(Path(rel_or_abs_path)) + if not p.exists() or not p.is_file(): + raise FileNotFoundError(str(p)) + + # Guardrail for huge files + size = p.stat().st_size + if size > max_bytes: + raise ValueError(f"File too large ({size} bytes). 
Refusing to read: {p}") + + data = p.read_bytes() + return data.decode("utf-8", errors="replace") + + def relpath(self, abs_path: Path) -> Path: + """Convert an absolute path under root to a relative path.""" + abs_resolved = abs_path.expanduser().resolve(strict=False) + if not abs_resolved.is_relative_to(self.root_dir): + raise SandboxViolation(f"Not under sandbox root: {abs_resolved}") + return abs_resolved.relative_to(self.root_dir) diff --git a/ai/gen-ai-agents/code-quality-agent/agent/gitignore_utils.py b/ai/gen-ai-agents/code-quality-agent/agent/gitignore_utils.py new file mode 100644 index 000000000..974113b50 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/gitignore_utils.py @@ -0,0 +1,86 @@ +""" +File name: gitignore_utils.py +Author: Unknown +Date last modified: 2026-01-12 +Python Version: 3.11 +License: MIT + +Description: + Minimal .gitignore matcher (best-effort, no external deps). + + Supports: + - blank lines / comments + - negation: !pattern + - directory patterns ending with / + - simple globbing via fnmatch + - patterns are evaluated against repo-relative POSIX paths + + Limitations: + - does not fully implement gitignore spec (e.g., anchored patterns, **, etc.) 
+ - good enough for policy gating (downgrade FAIL->WARN) but not for exact git behavior +""" + +from __future__ import annotations + +from dataclasses import dataclass +from fnmatch import fnmatch +from pathlib import Path + +from agent.utils import get_console_logger + +logger = get_console_logger() + + +@dataclass(frozen=True) +class GitIgnoreRule: + pattern: str + negated: bool + is_dir: bool + + +def parse_gitignore(text: str) -> list[GitIgnoreRule]: + rules: list[GitIgnoreRule] = [] + + logger.info("Scanning .gitignore...") + + for raw in text.splitlines(): + line = raw.strip() + if not line or line.startswith("#"): + continue + neg = line.startswith("!") + if neg: + line = line[1:].strip() + if not line: + continue + + is_dir = line.endswith("/") + pat = line[:-1] if is_dir else line + + # Normalize to POSIX-like + pat = pat.replace("\\", "/") + + rules.append(GitIgnoreRule(pattern=pat, negated=neg, is_dir=is_dir)) + return rules + + +def _match_rule(rel_posix: str, rule: GitIgnoreRule) -> bool: + # Directory rule: match if path is under that directory + if rule.is_dir: + prefix = rule.pattern.rstrip("/") + "/" + return rel_posix.startswith(prefix) + + # Glob match against full rel path OR basename + return fnmatch(rel_posix, rule.pattern) or fnmatch( + Path(rel_posix).name, rule.pattern + ) + + +def is_ignored(rel_posix: str, rules: list[GitIgnoreRule]) -> bool: + """ + Apply rules in order (like git): last match wins; negation un-ignores. 
+ """ + ignored = False + for r in rules: + if _match_rule(rel_posix, r): + ignored = not r.negated + return ignored diff --git a/ai/gen-ai-agents/code-quality-agent/agent/graph_agent.py b/ai/gen-ai-agents/code-quality-agent/agent/graph_agent.py new file mode 100644 index 000000000..7f93c73bc --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/graph_agent.py @@ -0,0 +1,725 @@ +""" +File name: graph_agent.py +Author: Luigi Saetta +Date last modified: 2026-01-12 +Python Version: 3.11 + +License: + MIT + +Description: + LangGraph agent that runs a pipeline over local Python files (read-only access), + producing outputs elsewhere. + +Usage: + from agent.graph_agent import build_graph, run_agent + + graph = build_graph() + result = await run_agent(graph, root_dir="...", out_dir="...", request="...") +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from dataclasses import dataclass, field +from typing import Any +from fnmatch import fnmatch + +from langgraph.graph import StateGraph, END +from langchain_core.runnables import RunnableConfig + +from agent.fs_ro import ReadOnlySandboxFS +from agent.header_rules import check_header +from agent.secrets_scan import scan_for_secrets +from agent.docgen import generate_doc_for_file +from agent.docgen_utils import call_llm_normalized +from agent.oci_models import get_llm +from agent.docgen_prompt import REPORT_PROMPT +from agent.license_check import check_license +from agent.pii_scan import scan_for_pii +from agent.header_fix import generate_header_snippet +from agent.requirements_check import check_requirements_at_root +from agent.gitignore_utils import parse_gitignore, is_ignored + +from agent.config import ACCEPTED_LICENSE_TYPES, EXCLUDED_PATHS + +from agent.utils import get_console_logger + +from agent.dep_license_check import check_dependency_licenses +from agent.config import ( + ACCEPTED_DEP_LICENSES, + FAIL_ON_UNKNOWN_DEP_LICENSE, + 
def get_config_value(
    config: RunnableConfig | None,
    key: str,
    default: Any = None,
) -> Any:
    """Fetch *key* from ``config["configurable"]``, falling back to *default*.

    Tolerates a missing/empty config and a missing/empty ``configurable``
    section, mirroring how LangGraph passes per-run settings.
    """
    configurable = (config or {}).get("configurable") or {}
    return configurable.get(key, default)
def node_check_headers(state: AgentState) -> AgentState:
    """Validate the header docstring of every discovered source file.

    For each file in ``state.file_list`` the header policy (required keys,
    description, date alignment) is checked via ``check_header``; failures
    are collected into ``state.header_issues`` keyed by repo-relative path.
    """
    fs = ReadOnlySandboxFS(Path(state.root_dir))
    issues: dict[str, str] = {}

    for rel in state.file_list:
        logger.info("Checking headers for: %s...", rel)

        src = fs.read_text(rel)
        # Use the public, already-resolved sandbox root instead of the
        # private fs._resolve_under_root(); rel comes from discovery, so
        # it is guaranteed to live under root.
        res = check_header(src, path=fs.root_dir / rel)
        if not res.ok:
            issues[rel] = res.message

    state.header_issues = issues
    return state
+ """ + fs = ReadOnlySandboxFS(Path(state.root_dir)) + failures: dict[str, list[dict[str, Any]]] = {} + warnings: dict[str, list[dict[str, Any]]] = {} + + ignored = getattr(state, "ignored_paths", set()) or set() + + for rel in state.file_list: + logger.info("Scanning secrets for: %s...", rel) + + src = fs.read_text(rel) + findings = scan_for_secrets(src) + if not findings: + continue + + payload = [ + {"kind": f.kind, "line": f.line, "excerpt": f.excerpt} for f in findings + ] + + # DOWNGRADE: if file is ignored, treat as warning (still report) + if rel.replace("\\", "/") in ignored: + warnings[rel] = payload + else: + failures[rel] = payload + + # Keep legacy state.secrets if you want, but prefer split + state.secrets_failures = failures + state.secrets_warnings = warnings + state.secrets = failures # optional: keep old behavior for other code paths + + return state + + +def node_scan_pii(state: AgentState) -> AgentState: + """ + Scan files for PII using predefined patterns. + Modified to use ReadOnlySandboxFS. + It distinguishes between ignored files (warnings) and others (failures). 
+ """ + fs = ReadOnlySandboxFS(Path(state.root_dir)) + + all_findings: dict[str, list[dict[str, Any]]] = {} + failures: dict[str, list[dict[str, Any]]] = {} + warnings: dict[str, list[dict[str, Any]]] = {} + + ignored = getattr(state, "ignored_paths", set()) or set() + + for rel in state.file_list: + logger.info("Scanning PII for: %s...", rel) + src = fs.read_text(rel) + + found = scan_for_pii(src) + if not found: + continue + + payload = [ + { + "kind": f.kind, + "severity": f.severity, + "line": f.line, + "excerpt": f.excerpt, # already masked + "confidence": f.confidence, + } + for f in found + ] + all_findings[rel] = payload + + rel_posix = rel.replace("\\", "/") + is_ign = rel_posix in ignored + + # DOWNGRADE: any "fail" in ignored files becomes "warn" + for p in payload: + sev = p["severity"] + if is_ign and sev == "fail": + p = dict(p) + p["severity"] = "warn" + p["confidence"] = "low" # optional: signal downgraded severity + warnings.setdefault(rel, []).append(p) + elif sev == "fail": + failures.setdefault(rel, []).append(p) + else: + warnings.setdefault(rel, []).append(p) + + state.pii_findings = all_findings + state.pii_failures = failures + state.pii_warnings = warnings + return state + + +async def node_generate_docs( + state: AgentState, *, config: RunnableConfig +) -> AgentState: + if not ENABLE_DOC_GENERATION: + # doc generation disabled + logger.info("Document generation is disabled. 
Skipping this step.") + return state + + fs = ReadOnlySandboxFS(Path(state.root_dir)) + + # get model_id from config + model_id = get_config_value(config, "model_id") + + llm = get_llm(model_id=model_id) + out_dir = Path(state.out_dir).expanduser().resolve() + + docs: dict[str, str] = {} + + for rel in state.file_list: + + # added this try-except to avoid stopping the whole process if one file fails + # one situation where it fails is where the file contains secret info + # that the LLM refuses to process + try: + logger.info("Generating doc for: %s...", rel) + + src = fs.read_text(rel) + res = await generate_doc_for_file( + llm=llm, + relpath=Path(rel), + source=src, + out_dir=out_dir, + # ✅ NEW: now docgen uses the request + request=state.request, + ) + docs[rel] = str(res.out_path) + except Exception as e: + logger.error("Doc generation failed for %s: %s", rel, e) + docs[rel] = "" + + state.docs = docs + return state + + +def node_check_license(state: AgentState) -> AgentState: + """ + Check that a license file exists and the license type is accepted. + """ + fs = ReadOnlySandboxFS(Path(state.root_dir)) + + # We want a list of all repo files (not just python source files). + # If your ReadOnlySandboxFS doesn't expose this, see note below. 
+ def _list_all_files() -> list[str]: + return [str(fs.relpath(p)).replace("\\", "/") for p in fs.list_all_files()] + + res = check_license( + list_files=_list_all_files, + read_text=fs.read_text, + accepted_types=ACCEPTED_LICENSE_TYPES, + ) + + state.license_ok = res.ok + state.license_info = { + "ok": res.ok, + "found_file": res.found_file, + "detected_type": res.detected_type, + "message": res.message, + } + + if res.ok: + logger.info("License check OK: %s", res.message) + else: + logger.warning("License check FAILED: %s", res.message) + + return state + + +async def node_generate_header_fixes( + state: AgentState, *, config: RunnableConfig +) -> AgentState: + if not state.header_issues: + return state + + fs = ReadOnlySandboxFS(Path(state.root_dir)) + + model_id = get_config_value(config, "model_id") + llm = get_llm(model_id=model_id) + + out_dir = Path(state.out_dir).expanduser().resolve() + fixes_dir = out_dir / "header_fixes" + fixes_dir.mkdir(parents=True, exist_ok=True) + + fixes: dict[str, str] = {} + + for rel in state.header_issues.keys(): + logger.info("Generating header snippet for: %s...", rel) + + try: + detected_license = (getattr(state, "license_info", {}) or {}).get( + "detected_type" + ) or "Unknown" + + src = fs.read_text(rel) + + header = await generate_header_snippet( + llm=llm, + relpath=Path(rel), + source=src, + author="Unknown", + license_hint=detected_license, + pyver="3.11", + ) + + # Create a mirrored directory structure under header_fixes + target = fixes_dir / (Path(rel).as_posix() + ".header.py") + target.parent.mkdir(parents=True, exist_ok=True) + + # File contains ONLY the header docstring + target.write_text(header, encoding="utf-8") + + fixes[rel] = str(target) + + except Exception as e: + logger.error("Header snippet generation failed for %s: %s", rel, e) + fixes[rel] = "" + + state.header_fixes = fixes + return state + + +def node_check_requirements(state: AgentState) -> AgentState: + """ + Check whether requirements.txt 
exists at repo root. + Modified to use ReadOnlySandboxFS. + """ + fs = ReadOnlySandboxFS(Path(state.root_dir)) + repo_root = Path(state.root_dir).expanduser().resolve() + + res = check_requirements_at_root(repo_root=repo_root, fs=fs) + + state.requirements_ok = res.ok + state.requirements_info = { + "ok": res.ok, + "relpath": res.relpath, + "message": res.message, + "preview": res.preview, + } + + if res.ok: + logger.info("Requirements check OK: %s", res.message) + else: + logger.warning("Requirements check FAILED: %s", res.message) + + return state + + +def node_check_dep_licenses(state: AgentState) -> AgentState: + """ + Check licenses of direct dependencies from requirements.txt. + Requires that dependencies are installed in the runtime environment to be accurate. + """ + fs = ReadOnlySandboxFS(Path(state.root_dir)) + + # If requirements missing, we cannot proceed reliably + req_info = getattr(state, "requirements_info", {}) or {} + if not req_info.get("ok", True): + state.dep_license_ok = ( + True # don't fail the run, but warn in report via requirements_info + ) + state.dep_licenses = [] + state.dep_license_failures = [] + state.dep_license_warnings = [] + return state + + req_text = fs.read_text("requirements.txt") + + res = check_dependency_licenses( + requirements_text=req_text, + accepted_licenses=set(ACCEPTED_DEP_LICENSES), + fail_on_unknown=bool(FAIL_ON_UNKNOWN_DEP_LICENSE), + fail_on_not_installed=bool(FAIL_ON_NOT_INSTALLED_DEP), + ) + + # Store as JSON-serializable dicts + state.dep_license_ok = res.ok + state.dep_licenses = [ + { + "requirement": d.requirement, + "distribution": d.distribution, + "version": d.version, + "license": d.license, + "source": d.source, + } + for d in res.deps + ] + state.dep_license_failures = [ + { + "requirement": d.requirement, + "distribution": d.distribution, + "version": d.version, + "license": d.license, + "source": d.source, + } + for d in res.failures + ] + state.dep_license_warnings = [ + { + "requirement": 
def node_load_gitignore(state: AgentState) -> AgentState:
    """Read .gitignore at the repo root and record which files it ignores.

    Populates ``state.ignored_paths`` with repo-relative POSIX paths; an
    absent .gitignore simply yields an empty set (not an error).
    Uses ReadOnlySandboxFS for all filesystem access.
    """
    sandbox = ReadOnlySandboxFS(Path(state.root_dir))

    try:
        gitignore_text = sandbox.read_text(".gitignore")
    except FileNotFoundError:
        logger.info("No .gitignore found at repo root.")
        state.ignored_paths = set()
        return state

    rules = parse_gitignore(gitignore_text)

    # Evaluate every existing repo file against the parsed rules.
    ignored: set[str] = set()
    for abs_path in sandbox.list_all_files():
        rel_posix = str(sandbox.relpath(abs_path)).replace("\\", "/")
        if is_ignored(rel_posix, rules):
            ignored.add(rel_posix)

    state.ignored_paths = ignored

    logger.info("Loaded .gitignore: %d ignored files.", len(ignored))
    for rel_posix in ignored:
        logger.info(" - %s", rel_posix)

    return state
+ """ + + # ---- Helper accessors (avoid None surprises) ---- + header_issues = getattr(state, "header_issues", {}) or {} + + secrets_failures = getattr(state, "secrets_failures", {}) or {} + secrets_warnings = getattr(state, "secrets_warnings", {}) or {} + + pii_failures = getattr(state, "pii_failures", {}) or {} + pii_warnings = getattr(state, "pii_warnings", {}) or {} + + license_ok = getattr(state, "license_ok", True) + license_info = getattr(state, "license_info", {}) or {} + + requirements_ok = getattr(state, "requirements_ok", True) + requirements_info = getattr(state, "requirements_info", {}) or {} + req_status = "OK" if requirements_ok else "MISSING" + + dep_failures = getattr(state, "dep_license_failures", []) or [] + dep_warnings = getattr(state, "dep_license_warnings", []) or [] + + docs = getattr(state, "docs", {}) or {} + + # ---- Counts ---- + hard_pii_count = sum(len(v) for v in pii_failures.values()) + warn_pii_count = sum(len(v) for v in pii_warnings.values()) + + secrets_fail_files = len(secrets_failures) + secrets_warn_files = len(secrets_warnings) + + # ---- Determine deterministic outcome ---- + fail_reasons: list[str] = [] + warn_reasons: list[str] = [] + + # FAIL conditions + if secrets_fail_files > 0: + fail_reasons.append("Secrets detected (non-ignored files)") + + if hard_pii_count > 0: + fail_reasons.append("PII hard failures detected (non-ignored files)") + + if not license_ok: + fail_reasons.append("Repository license check failed") + + if len(dep_failures) > 0: + fail_reasons.append("Dependency license failures") + + # WARN conditions (only if not FAIL) + if secrets_warn_files > 0: + warn_reasons.append("Secrets detected in .gitignore files (downgraded to WARN)") + + if warn_pii_count > 0: + warn_reasons.append( + "PII warnings (includes downgraded findings from .gitignore files)" + ) + + if not requirements_ok: + warn_reasons.append( + "requirements.txt missing at repository root (dependency checks incomplete)" + ) + + if 
len(dep_warnings) > 0: + warn_reasons.append( + "Dependency license warnings (UNKNOWN/NOT_INSTALLED/ambiguous)" + ) + + if fail_reasons: + overall = "FAIL" + elif warn_reasons: + overall = "WARN" + else: + overall = "PASS" + + # ---- Summary string (human readable) ---- + state.summary = ( + f"Outcome: {overall}\n" + f"Processed {len(state.file_list)} files.\n" + f"Repository license: {'OK' if license_ok else 'FAILED'}\n" + f"requirements.txt at root: {req_status}\n" + f"Dependency licenses: failures={len(dep_failures)}, warnings={len(dep_warnings)}\n" + f"Header issues: {len(header_issues)} files.\n" + f"Secrets: FAIL files={secrets_fail_files}, WARN files={secrets_warn_files}\n" + f"PII hard failures: {hard_pii_count} findings in {len(pii_failures)} files.\n" + f"PII warnings: {warn_pii_count} findings in {len(pii_warnings)} files.\n" + f"Docs generated: {len(docs)} files.\n" + f"Output dir: {state.out_dir}\n" + ) + if fail_reasons: + state.summary += "Fail reasons: " + "; ".join(fail_reasons) + "\n" + elif warn_reasons: + state.summary += "Warn reasons: " + "; ".join(warn_reasons) + "\n" + + # ---- Generate report via LLM ---- + model_id = get_config_value(config, "model_id") + llm = get_llm(model_id=model_id) + + now_iso = datetime.now(timezone.utc).isoformat(timespec="minutes") + + # For REPORT_PROMPT: keep backward compatibility with your existing placeholder name `secret_issues` + # by passing only the FAIL set there (policy-critical), and optionally include warnings in requirements_check. 
+ prompt = REPORT_PROMPT.format( + root_dir=state.root_dir, + now_datetime=now_iso, + num_files=len(state.file_list), + header_issues=header_issues, + secret_issues=secrets_failures, # only non-ignored failures + license_check=license_info, + dep_license_failures=dep_failures, + dep_license_warnings=dep_warnings, + pii_hard_failures=pii_failures, + pii_warnings=pii_warnings, + requirements_check={ + **requirements_info, + # include extra detail for the report without changing your state model + "requirements_status": req_status, + "secrets_warnings_ignored_files": secrets_warnings, + }, + ) + + text, _ = await call_llm_normalized(llm, prompt) + + logger.info("") + logger.info("Final report: %s", text) + + # ---- Save to file ---- + current_day = now_iso[:10] + out_dir = Path(state.out_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + out_path = out_dir / f"report_{current_day}.md" + out_path.write_text(text.rstrip() + "\n", encoding="utf-8") + + return state + + +# ---- Graph ---- + + +def build_graph(): + g = StateGraph(AgentState) + + g.add_node("discover_files", node_discover_files) + + # sequentially here we process all the files discovered + g.add_node("load_gitignore", node_load_gitignore) + g.add_node("check_headers", node_check_headers) + g.add_node("scan_secrets", node_scan_secrets) + g.add_node("check_license", node_check_license) + g.add_node("scan_pii", node_scan_pii) + g.add_node("check_requirements", node_check_requirements) + g.add_node("check_dep_licenses", node_check_dep_licenses) + g.add_node("generate_header_fixes", node_generate_header_fixes) + g.add_node("generate_docs", node_generate_docs) + g.add_node("finalize", node_finalize) + + g.set_entry_point("discover_files") + g.add_edge("discover_files", "load_gitignore") + g.add_edge("load_gitignore", "check_requirements") + g.add_edge("check_requirements", "check_license") + g.add_edge("check_license", "check_dep_licenses") + g.add_edge("check_dep_licenses", "check_headers") + 
async def run_agent(graph, *, root_dir: str, out_dir: str, request: str) -> AgentState:
    """Run the compiled LangGraph pipeline once and return its final state.

    Args:
        graph: a compiled graph, as returned by ``build_graph()``.
        root_dir: repository root to analyze (read-only).
        out_dir: directory where reports/docs/snippets are written.
        request: free-form user request forwarded to doc generation.
    """
    initial_state = AgentState(request=request, root_dir=root_dir, out_dir=out_dir)

    # Per-run configuration; nodes read model_id via get_config_value().
    run_config = {"configurable": {"model_id": LLM_MODEL_ID}}

    logger.info("")
    logger.info("Running agent with config: %s...", run_config)
    logger.info("")

    # ainvoke drives the whole node pipeline and returns the final state.
    return await graph.ainvoke(initial_state, config=run_config)
+Return ONLY the bullet points (each starting with "- "). +Do NOT include secrets or PII. Do NOT quote code. Use generic wording. + +Module path: {relpath} + +Module structure: +- Classes: {classes} +- Functions: {functions} +- Imports: {imports} +""" + + +logger = get_console_logger() + + +def _format_description_block(lines: list[str]) -> str: + """ + Indent description lines consistently and keep them short. + """ + cleaned = [ln.strip() for ln in lines if ln.strip()] + if not cleaned: + cleaned = ["- Module description unavailable."] + + # Enforce max 3 lines (your policy) + cleaned = cleaned[:3] + + # Enforce bullet formatting + bullets = [] + for ln in cleaned: + if not ln.startswith("-"): + ln = "- " + ln + bullets.append(ln) + + # 4-space indentation for block + return "\n".join(" " + b for b in bullets) + + +def _render_header( + *, + relpath: Path, + author: str, + today: str, + pyver: str, + license_hint: str, + description_lines: list[str], +) -> str: + lic = license_hint if license_hint and license_hint != "Unknown" else "Unknown" + return ( + HEADER_TEMPLATE.format( + file_name=relpath.name, + author=author or "Unknown", + date_last_modified=today, + python_version=pyver, + license=lic, + description_block=_format_description_block(description_lines), + ).strip() + + "\n" + ) + + +def _extract_structure(source: str) -> dict[str, list[str]]: + try: + mod = ast.parse(source) + except SyntaxError: + return {"classes": [], "functions": [], "imports": []} + + classes = [n.name for n in mod.body if isinstance(n, ast.ClassDef)] + funcs = [n.name for n in mod.body if isinstance(n, ast.FunctionDef)] + + imports: list[str] = [] + for n in mod.body: + if isinstance(n, ast.Import): + imports.extend(a.name for a in n.names) + elif isinstance(n, ast.ImportFrom): + if n.module: + imports.append(n.module) + + # keep it short + return { + "classes": classes[:10], + "functions": funcs[:10], + "imports": imports[:10], + } + + +async def generate_header_snippet( + *, + 
llm: Any, + relpath: Path, + source: str, + author: str = "Unknown", + license_hint: str = "Unknown", + pyver: str = PYTHON_VERSION, +) -> str: + """ + Returns ONLY the header docstring text (with trailing newline). + """ + today = date.today().isoformat() + + # Default description (deterministic) if LLM fails + struct = _extract_structure(source) + fallback_desc = [] + + if struct["classes"]: + fallback_desc.append(f"Defines classes: {', '.join(struct['classes'][:5])}.") + if struct["functions"]: + fallback_desc.append( + f"Defines functions: {', '.join(struct['functions'][:5])}." + ) + if not fallback_desc: + fallback_desc.append("Provides supporting utilities for this project.") + + # LLM-generated description (optional, safer: only structure is sent) + description_lines: list[str] = fallback_desc + + try: + prompt = DESC_GEN_PROMPT.format( + relpath=str(relpath).replace("\\", "/"), + classes=", ".join(struct["classes"]) or "None", + functions=", ".join(struct["functions"]) or "None", + imports=", ".join(struct["imports"]) or "None", + ) + text, _ = await call_llm_normalized(llm, prompt) + + # Parse bullet lines + lines = [ln.strip() for ln in text.splitlines() if ln.strip()] + # Keep only bullets / normalize + bullets = [] + for ln in lines: + if not ln.startswith("-"): + ln = "- " + ln.lstrip("- ").strip() + bullets.append(ln) + + if bullets: + description_lines = bullets[:3] + + except Exception as e: + logger.warning("LLM description generation failed for %s: %s", relpath, e) + + # Render header using the fixed template + return _render_header( + relpath=relpath, + author=author or "Unknown", + today=today, + pyver=pyver, + license_hint=license_hint or "Unknown", + description_lines=description_lines, + ) diff --git a/ai/gen-ai-agents/code-quality-agent/agent/header_rules.py b/ai/gen-ai-agents/code-quality-agent/agent/header_rules.py new file mode 100644 index 000000000..c9ba66452 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/header_rules.py 
@@ -0,0 +1,121 @@ +""" +File name: header_rules.py +Author: Luigi Saetta +Date last modified: 2025-12-16 +Python Version: 3.11 + +License: + MIT + +Description: + Header template rules and a checker for Python source files. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from dataclasses import dataclass +from pathlib import Path +import re + + +# Simple, practical header requirements: +# - Must contain these keys in the first N lines +REQUIRED_KEYS = [ + "File name:", + "Author:", + "Date last modified:", + "Python Version:", + "Description:", + "License:", +] + +HEADER_TEMPLATE = '''""" +File name: {file_name} +Author: {author} +Date last modified: {date_last_modified} +Python Version: {python_version} +License: {license} + +Description: +{description_block} +""" +''' + +DATE_RE = re.compile(r"^Date last modified:\s*(.+)\s*$", re.MULTILINE) + + +@dataclass(frozen=True) +class HeaderCheckResult: + ok: bool + missing_keys: list[str] + message: str + date_mismatch: bool = False + + +def check_header( + source: str, *, path: Path | None = None, top_lines: int = 40 +) -> HeaderCheckResult: + head = "\n".join(source.splitlines()[:top_lines]) + missing = [k for k in REQUIRED_KEYS if k not in head] + + if missing: + return HeaderCheckResult( + ok=False, + missing_keys=missing, + message=f"Missing header keys in first {top_lines} lines: {missing}", + ) + + # Description sanity check + m = re.search(r"Description:\s*(.+)", head) + if not m or not m.group(1).strip(): + return HeaderCheckResult( + ok=False, + missing_keys=[], + message="Description field is present but empty.", + ) + + # Date alignment check (only if we got a path) + if path is not None: + hm = DATE_RE.search(head) + if not hm: + return HeaderCheckResult( + ok=False, + missing_keys=["Date last modified:"], + message="Date last modified field not found or not parseable.", + ) + + header_date_raw = hm.group(1).strip() + + # Accept either 'YYYY-MM-DD' or full ISO datetime; 
normalize to date + try: + if ( + len(header_date_raw) >= 10 + and header_date_raw[4] == "-" + and header_date_raw[7] == "-" + ): + header_day = header_date_raw[:10] # YYYY-MM-DD + else: + raise ValueError("Unsupported date format") + except Exception: + return HeaderCheckResult( + ok=False, + missing_keys=[], + message=f"Date last modified is not in YYYY-MM-DD (or ISO starting with it): {header_date_raw}", + ) + + file_day = ( + datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc) + .date() + .isoformat() + ) + + if header_day != file_day: + return HeaderCheckResult( + ok=False, + missing_keys=[], + message=f"Date last modified mismatch: header={header_day}, file_mtime_utc={file_day}", + date_mismatch=True, + ) + + return HeaderCheckResult(ok=True, missing_keys=[], message="Header looks OK.") diff --git a/ai/gen-ai-agents/code-quality-agent/agent/license_check.py b/ai/gen-ai-agents/code-quality-agent/agent/license_check.py new file mode 100644 index 000000000..d16c0ce13 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/license_check.py @@ -0,0 +1,145 @@ +""" +File name: license_check.py +Author: Luigi Saetta +Date last modified: 2025-12-15 +Python Version: 3.11 + +License: + MIT + +Description: + Checks that a license file is present and (best-effort) identifies its type, + then validates it against an allow-list. 
+""" + +from __future__ import annotations + +import re +from dataclasses import dataclass +from typing import Iterable, Optional + + +@dataclass(frozen=True) +class LicenseCheckResult: + ok: bool + found_file: str | None + detected_type: str | None + message: str + + +# Common license file names seen in repos +DEFAULT_LICENSE_FILENAMES = ( + "LICENSE", + "LICENSE.txt", + "LICENSE.md", + "COPYING", + "COPYING.txt", + "NOTICE", + "NOTICE.txt", +) + + +# Very lightweight heuristics (not perfect, but good enough for policy checks) +_LICENSE_PATTERNS: list[tuple[str, str]] = [ + ("MIT", r"\bMIT License\b"), + ("Apache-2.0", r"\bApache License\b.*\bVersion 2\.0\b"), + + # Universal Permissive License (UPL) v1.0 + ("UPL-1.0", r"\bThe\s+Universal\s+Permissive\s+License\s*\(UPL\)\s*,?\s*Version\s*1\.0\b"), + + ( + "BSD-3-Clause", + r"\bRedistribution and use in source and binary forms\b.*\bNeither the name\b", + ), + ("GPL-3.0", r"\bGNU GENERAL PUBLIC LICENSE\b.*\bVersion 3\b"), + ("GPL-2.0", r"\bGNU GENERAL PUBLIC LICENSE\b.*\bVersion 2\b"), +] + + +def _detect_license_type(text: str) -> Optional[str]: + t = text[:20000] # cap for speed + for lic, pat in _LICENSE_PATTERNS: + if re.search(pat, t, flags=re.IGNORECASE | re.DOTALL): + return lic + return None + + +def check_license( + *, + list_files: callable, + read_text: callable, + accepted_types: Iterable[str], + filenames: Iterable[str] = DEFAULT_LICENSE_FILENAMES, +) -> LicenseCheckResult: + """ + list_files(): () -> list[str] # repo-relative paths + read_text(relpath): -> str # file content + accepted_types: allow-list of license identifiers + filenames: license file name candidates + """ + accepted = set(accepted_types) + + # Normalize to posix-style strings + repo_files = [str(p) for p in list_files()] + + # Prefer top-level matches first + candidates: list[str] = [] + + for fn in filenames: + # Exact top-level + if fn in repo_files: + candidates.append(fn) + + if not candidates: + # Any depth (e.g., docs/LICENSE) + 
for rf in repo_files: + base = rf.rsplit("/", 1)[-1] + if base in set(filenames): + candidates.append(rf) + + if not candidates: + return LicenseCheckResult( + ok=False, + found_file=None, + detected_type=None, + message="No license file found (expected one of: " + + ", ".join(filenames) + + ").", + ) + + chosen = candidates[0] + try: + content = read_text(chosen) + except Exception as e: + return LicenseCheckResult( + ok=False, + found_file=chosen, + detected_type=None, + message=f"License file found at '{chosen}' but cannot be read: {e}", + ) + + detected = _detect_license_type(content) + + if detected is None: + # If you want: treat as failure (strict) or warning (lenient) + return LicenseCheckResult( + ok=False, + found_file=chosen, + detected_type=None, + message=f"License file '{chosen}' found, but license type could not be identified.", + ) + + if detected not in accepted: + return LicenseCheckResult( + ok=False, + found_file=chosen, + detected_type=detected, + message=f"License type '{detected}' detected in '{chosen}', but it is not in the accepted list: {sorted(accepted)}", + ) + + return LicenseCheckResult( + ok=True, + found_file=chosen, + detected_type=detected, + message=f"License OK: '{detected}' found in '{chosen}'.", + ) diff --git a/ai/gen-ai-agents/code-quality-agent/agent/oci_models.py b/ai/gen-ai-agents/code-quality-agent/agent/oci_models.py new file mode 100644 index 000000000..5e4739de0 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/oci_models.py @@ -0,0 +1,92 @@ +""" +File name: oci_models.py +Author: Luigi Saetta +Date last modified: 2026-01-13 +Python Version: 3.11 + +Description: + This module enables easy access to OCI GenAI LLM/Embeddings. + + +Usage: + Import this module into other scripts to use its functions. + Example: + from oci_models import get_llm + +License: + This code is released under the MIT License. + +Notes: + This is a part of a demo showing how to implement an advanced + RAG solution as a LangGraph agent. 
+ + modified to support xAI and OpenAI models through Langchain + +Warnings: + This module is in development, may change in future versions. +""" + +# switched to the new OCI langchain integration +from langchain_oci import ChatOCIGenAI + +from agent.utils import get_console_logger +from agent.config import ( + DEBUG, + STREAMING, + AUTH, + SERVICE_ENDPOINT, + # used only for defaults + LLM_MODEL_ID, + TEMPERATURE, + MAX_TOKENS, +) +from agent.config_private import COMPARTMENT_ID + +logger = get_console_logger() + + +# for gpt5, since max tokens is not supported +MODELS_WITHOUT_KWARGS = { + "openai.gpt-oss-120b", + "openai.gpt-5", +} + + +def debug_llm(llm): + print("LLM class:", type(llm)) + for attr in ("model_name", "model", "openai_api_base", "base_url", "api_base"): + if hasattr(llm, attr): + print(f"{attr} =", getattr(llm, attr)) + for attr in ("openai_api_key", "api_key"): + if hasattr(llm, attr): + v = getattr(llm, attr) + print(f"{attr} present =", bool(v)) + + +def get_llm(model_id=LLM_MODEL_ID, temperature=TEMPERATURE, max_tokens=MAX_TOKENS): + """ + Initialize and return an instance of ChatOCIGenAI with the specified configuration. + + Returns: + ChatOCIGenAI: An instance of the OCI GenAI language model. 
+ """ + if model_id not in MODELS_WITHOUT_KWARGS: + _model_kwargs = {"temperature": temperature, "max_tokens": max_tokens} + else: + # for some models (OpenAI search) you cannot set those params + _model_kwargs = None + + # old langchain fashion but based on langchain-oci + llm = ChatOCIGenAI( + auth_type=AUTH, + model_id=model_id, + service_endpoint=SERVICE_ENDPOINT, + compartment_id=COMPARTMENT_ID, + is_stream=STREAMING, + model_kwargs=_model_kwargs, + ) + + if DEBUG: + debug_llm(llm) + + return llm diff --git a/ai/gen-ai-agents/code-quality-agent/agent/pii_scan.py b/ai/gen-ai-agents/code-quality-agent/agent/pii_scan.py new file mode 100644 index 000000000..2b2f01629 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/pii_scan.py @@ -0,0 +1,294 @@ +""" +File name: pii_scan.py +Author: Luigi Saetta +Date last modified: 2026-01-08 +Python Version: 3.11 + +License: + MIT + +Description: + Deterministic PII scanner. + + Policy: + - HARD FAIL: direct identifiers (email, phone, IBAN, credit card, IT tax id) + - WARN: structured names/addresses (heuristic, conservative) + + All excerpts are masked to avoid leaking PII in logs/artifacts. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass + + +# ---------------------------- +# Findings +# ---------------------------- + + +@dataclass(frozen=True) +class PiiFinding: + kind: str # email | phone | iban | credit_card | tax_id_it | name_structured | address_structured + severity: str # "fail" | "warn" + line: int # 1-based + excerpt: str # masked snippet + confidence: str = "medium" # low | medium | high + + +# ---------------------------- +# Helpers +# ---------------------------- + +_EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE) + +# phone is tricky; keep it conservative but useful +_PHONE_RE = re.compile( + r""" + (? 
str: + def repl(m: re.Match) -> str: + val = m.group(0) + parts = val.split("@", 1) + user = parts[0] + dom = parts[1] + user_mask = (user[:2] + "***") if len(user) > 2 else "***" + dom_parts = dom.split(".") + dom_mask = "***." + dom_parts[-1] if len(dom_parts) >= 2 else "***" + return f"{user_mask}@{dom_mask}" + + return _EMAIL_RE.sub(repl, s) + + +def _mask_digits(s: str) -> str: + # Keep last 2 digits only + digits = re.sub(r"\D", "", s) + if len(digits) <= 2: + return "***" + return "***" + digits[-2:] + + +def _mask_iban(s: str) -> str: + # Keep country + last 2 chars + s2 = re.sub(r"\s+", "", s) + if len(s2) < 6: + return "IBAN:***" + return s2[:2].upper() + "**" + "***" + s2[-2:] + + +def _mask_taxid(s: str) -> str: + s2 = s.strip() + if len(s2) < 4: + return "***" + return s2[:2].upper() + "***" + s2[-2:].upper() + + +def _mask_excerpt(kind: str, excerpt: str) -> str: + if kind == "email": + return _mask_email(excerpt) + if kind == "iban": + return _mask_iban(excerpt) + if kind == "tax_id_it": + return _mask_taxid(excerpt) + if kind in ("credit_card", "phone"): + return _mask_digits(excerpt) + # For warn kinds, keep as-is but truncate + return excerpt[:120] + + +def _looks_fake(line: str) -> bool: + line = line.lower() + return any(h in line for h in _FAKE_HINTS) + + +def _luhn_ok(number: str) -> bool: + digits = [int(ch) for ch in number if ch.isdigit()] + if len(digits) < 13 or len(digits) > 19: + return False + checksum = 0 + parity = len(digits) % 2 + for i, d in enumerate(digits): + if i % 2 == parity: + d *= 2 + if d > 9: + d -= 9 + checksum += d + return checksum % 10 == 0 + + +# ---------------------------- +# Public API +# ---------------------------- + + +def scan_for_pii(text: str) -> list[PiiFinding]: + """ + Returns a list of masked findings. + Deterministic, line-based. 
+ """ + findings: list[PiiFinding] = [] + lines = text.splitlines() + + for idx, line in enumerate(lines, start=1): + # Skip super-noisy lines that are clearly placeholders + fake = _looks_fake(line) + + # ---- HARD FAIL: email ---- + for m in _EMAIL_RE.finditer(line): + if fake: + continue + raw = m.group(0) + findings.append( + PiiFinding( + kind="email", + severity="fail", + line=idx, + excerpt=_mask_excerpt("email", raw), + confidence="high", + ) + ) + + # ---- HARD FAIL: IBAN ---- + for m in _IBAN_RE.finditer(line.replace(" ", "")): + if fake: + continue + raw = m.group(0) + findings.append( + PiiFinding( + kind="iban", + severity="fail", + line=idx, + excerpt=_mask_excerpt("iban", raw), + confidence="high", + ) + ) + + # ---- HARD FAIL: IT tax id (codice fiscale) ---- + for m in _CF_RE.finditer(line): + if fake: + continue + raw = m.group(0) + findings.append( + PiiFinding( + kind="tax_id_it", + severity="fail", + line=idx, + excerpt=_mask_excerpt("tax_id_it", raw), + confidence="medium", + ) + ) + + # ---- HARD FAIL: credit card (candidate + Luhn) ---- + for m in _CC_CANDIDATE_RE.finditer(line): + if fake: + continue + raw = m.group(0) + if _luhn_ok(raw): + findings.append( + PiiFinding( + kind="credit_card", + severity="fail", + line=idx, + excerpt=_mask_excerpt("credit_card", raw), + confidence="high", + ) + ) + + # ---- HARD FAIL: phone (conservative) ---- + # To avoid too many false positives, require >= 9 digits total + for m in _PHONE_RE.finditer(line): + raw = m.group(0) + digits = re.sub(r"\D", "", raw) + if len(digits) < 9: + continue + if fake: + continue + findings.append( + PiiFinding( + kind="phone", + severity="fail", + line=idx, + excerpt=_mask_excerpt("phone", raw), + confidence="medium", + ) + ) + + # ---- WARN: structured names / addresses ---- + # Conservative: only if explicitly structured with a label. 
+ if _STRUCT_NAME_RE.search(line): + findings.append( + PiiFinding( + kind="name_structured", + severity="warn", + line=idx, + excerpt=_mask_excerpt("name_structured", line.strip()), + confidence="low", + ) + ) + if _STRUCT_ADDR_RE.search(line): + # Avoid flagging generic config lines like "address = localhost" + if not re.search( + r"\blocalhost\b|\b127\.0\.0\.1\b", line, flags=re.IGNORECASE + ): + findings.append( + PiiFinding( + kind="address_structured", + severity="warn", + line=idx, + excerpt=_mask_excerpt("address_structured", line.strip()), + confidence="low", + ) + ) + + # Deduplicate same kind+line+excerpt to reduce noise + uniq: dict[tuple[str, int, str, str], PiiFinding] = {} + for f in findings: + key = (f.kind, f.line, f.excerpt, f.severity) + uniq[key] = f + return list(uniq.values()) diff --git a/ai/gen-ai-agents/code-quality-agent/agent/requirements_check.py b/ai/gen-ai-agents/code-quality-agent/agent/requirements_check.py new file mode 100644 index 000000000..6ceca4d41 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/requirements_check.py @@ -0,0 +1,69 @@ +""" +File name: requirements_check.py +Author: L. Saetta +Date last modified: 2026-01-12 +Python Version: 3.11 +License: MIT + +Description: + Checks whether requirements.txt exists at repo root. + Optionally captures a short preview for reporting (no secrets/PII). +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass(frozen=True) +class RequirementsCheckResult: + ok: bool + relpath: str | None + message: str + preview: str = "" + + +def check_requirements_at_root(*, repo_root: Path, fs) -> RequirementsCheckResult: + """ + repo_root: absolute Path of repository root + fs: ReadOnlySandboxFS + + Returns: + ok=True if requirements.txt exists at root. + ok=False otherwise (strong warning). 
+ """ + req_name = "requirements.txt" + + # use sandbox read to ensure we're under root + try: + text = fs.read_text(req_name) + # short preview (first ~30 non-empty lines), for reporting only + lines = [ln.rstrip() for ln in text.splitlines() if ln.strip()] + preview = "\n".join(lines[:30]) + return RequirementsCheckResult( + ok=True, + relpath=req_name, + message="requirements.txt found at repository root.", + preview=preview, + ) + except FileNotFoundError: + return RequirementsCheckResult( + ok=False, + relpath=None, + message=( + "⚠️ STRONG WARNING: requirements.txt NOT found at repository root. " + "Dependency license/compliance checks may be incomplete or impossible." + ), + preview="", + ) + except Exception as e: + return RequirementsCheckResult( + ok=False, + relpath=None, + message=( + "⚠️ STRONG WARNING: requirements.txt check failed. " + f"Could not read repository root requirements.txt: {e}" + ), + preview="", + ) diff --git a/ai/gen-ai-agents/code-quality-agent/agent/secrets_scan.py b/ai/gen-ai-agents/code-quality-agent/agent/secrets_scan.py new file mode 100644 index 000000000..e2b7bd8e4 --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/secrets_scan.py @@ -0,0 +1,185 @@ +""" +File name: secrets_scan.py +Author: Luigi Saetta +Date last modified: 2025-12-15 +Python Version: 3.11 + +License: + MIT + +Description: + Basic secrets scanning (heuristic). This is not a full secret-scanner replacement. +""" + +from __future__ import annotations + +from dataclasses import dataclass +import re + + +SENSITIVE_NAME_RE = re.compile( + r"""(?ix) + \b( + pass(word|wd)? 
| + pwd | + secret | + token | + api[_-]?key | + client[_-]?secret | + private[_-]?key | + bearer | + auth | + session | + cookie | + oauth | + key + )\b + """ +) + +ASSIGNMENT_STR_RE = re.compile( + r"""(?x) + ^\s* + (?P[A-Za-z_][A-Za-z0-9_]*) + \s*=\s* + (?P['"])(?P.*?)(?P=quote) + \s*(?:\#.*)?$ + """ +) + +DICT_KV_STR_RE = re.compile( + r"""(?x) + ^\s* + (?P['"])(?P[^'"]+)(?P=keyquote) + \s*:\s* + (?P['"])(?P.*?)(?P=valquote) + \s*,?\s*(?:\#.*)?$ + """ +) + +PATTERNS: list[tuple[str, re.Pattern[str]]] = [ + ("AWS Access Key ID", re.compile(r"\bAKIA[0-9A-Z]{16}\b")), + ( + "AWS Secret (loose)", + re.compile( + r"(?i)\baws(.{0,20})?(secret|access)?.{0,20}['\"][A-Za-z0-9/+=]{30,}['\"]" + ), + ), + ("OCI OCID (resource id)", re.compile(r"\bocid1\.[a-z0-9._-]+\b", re.IGNORECASE)), + ("GitHub token", re.compile(r"\bghp_[A-Za-z0-9]{30,}\b")), + ("GitHub fine-grained token", re.compile(r"\bgithub_pat_[A-Za-z0-9_]{20,}\b")), + ( + "Private key block", + re.compile(r"-----BEGIN (?:RSA |EC |OPENSSH |)?PRIVATE KEY-----"), + ), + ( + "Bearer token header (loose)", + re.compile(r"(?i)\bAuthorization\s*:\s*Bearer\s+[A-Za-z0-9\-_\.=]{10,}"), + ), +] + +PLACEHOLDER_VALUES = { + "changeme", + "change_me", + "your_token_here", + "your-key-here", + "xxx", + "xxxx", + "dummy", + "placeholder", +} + + +@dataclass(frozen=True) +class SecretFinding: + kind: str + line: int + name_or_key: str + excerpt: str + + +def _redact_value(value: str) -> str: + v = value.strip() + if not v: + return "***" + if len(v) <= 4: + return "***" + return f"{v[0]}***{v[-1]}" + + +def _redact_line_keep_structure(line: str, value: str) -> str: + red = _redact_value(value) + return line.replace(value, red, 1) + + +def _is_probably_secret(name_or_key: str, value: str) -> bool: + v = value.strip() + if not v: + return False + if v.lower() in PLACEHOLDER_VALUES: + return False + + if SENSITIVE_NAME_RE.search(name_or_key): + return True + + if len(v) >= 20 and re.fullmatch(r"[A-Za-z0-9_\-\.=+/]+", v): + return 
True + + return False + + +def scan_for_secrets(source: str, *, max_findings: int = 200) -> list[SecretFinding]: + findings: list[SecretFinding] = [] + lines = source.splitlines() + + for i, line in enumerate(lines, start=1): + for kind, pat in PATTERNS: + m = pat.search(line) + if m: + matched = m.group(0) + redacted = line.replace(matched, _redact_value(matched), 1) + findings.append( + SecretFinding( + kind=kind, + line=i, + name_or_key="(pattern)", + excerpt=redacted.strip(), + ) + ) + if len(findings) >= max_findings: + return findings + + m = ASSIGNMENT_STR_RE.match(line) + if m: + name = m.group("name") + value = m.group("value") + if _is_probably_secret(name, value): + findings.append( + SecretFinding( + kind="Sensitive assignment", + line=i, + name_or_key=name, + excerpt=_redact_line_keep_structure(line, value).strip(), + ) + ) + if len(findings) >= max_findings: + return findings + continue + + m = DICT_KV_STR_RE.match(line) + if m: + key = m.group("key") + value = m.group("value") + if _is_probably_secret(key, value): + findings.append( + SecretFinding( + kind="Sensitive dict value", + line=i, + name_or_key=key, + excerpt=_redact_line_keep_structure(line, value).strip(), + ) + ) + if len(findings) >= max_findings: + return findings + + return findings diff --git a/ai/gen-ai-agents/code-quality-agent/agent/utils.py b/ai/gen-ai-agents/code-quality-agent/agent/utils.py new file mode 100644 index 000000000..01689437c --- /dev/null +++ b/ai/gen-ai-agents/code-quality-agent/agent/utils.py @@ -0,0 +1,111 @@ +""" +File name: utils.py +Author: Luigi Saetta +Date last modified: 2025-03-31 +Python Version: 3.11 + +Description: + Utility functions here. + +Usage: + Import this module into other scripts to use its functions. + Example: + from utils import ... + +License: + This code is released under the MIT License. + +Description: + This is a part of a demo showing how to implement an advanced + RAG solution as a LangGraph agent. 
# NOTE(review): tail of the utils.py module docstring (opened in the previous
# chunk), preserved as comments:
#
# Warnings:
#     This module is in development, may change in future versions.
# """

import os
import logging
import re
import json


def get_console_logger(name: str = "ConsoleLogger", level: str = "INFO"):
    """
    To get a logger to print on console.

    Note: level and handler are applied only the first time the named logger
    is created; later calls reuse the existing configuration.
    """
    logger = logging.getLogger(name)

    # to avoid duplication of logging
    if not logger.handlers:
        logger.setLevel(level)

        handler = logging.StreamHandler()
        handler.setLevel(logging.DEBUG)

        formatter = logging.Formatter("%(asctime)s - %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        logger.propagate = False

    return logger


def extract_text_triple_backticks(_text):
    """
    Extract the FIRST text block enclosed between triple backticks (```).

    :param _text: The input string to analyze.
    :return: the first block found between triple backticks, stripped;
             if no fenced block exists, the entire input is returned
             (best-effort resilience).
    """
    logger = get_console_logger()

    # Uses (.*?) to capture text between backticks in a non-greedy way
    pattern = r"```(.*?)```"
    # re.DOTALL allows capturing multiline content

    try:
        # FIX: only IndexError (no fenced block found) triggers the fallback;
        # the previous broad `except Exception` could mask real errors.
        _result = [block.strip() for block in re.findall(pattern, _text, re.DOTALL)][0]
    except IndexError as e:
        logger.info("no triple backtickes in extract_text_triple_backticks: %s", e)

        # try to be resilient, return the entire text
        _result = _text

    return _result


def extract_json_from_text(text):
    """
    Extracts JSON content from a given text and returns it as a Python dictionary.

    Args:
        text (str): The input text containing JSON content.

    Returns:
        dict: Parsed JSON data.

    Raises:
        ValueError: if no JSON object is found or it cannot be parsed.
    """
    try:
        # Use regex to extract JSON content (contained between {});
        # greedy match: spans from the first '{' to the last '}'.
        json_match = re.search(r"\{.*\}", text, re.DOTALL)
        if json_match:
            json_content = json_match.group(0)
            return json.loads(json_content)

        # If no JSON content is found, raise an error
        raise ValueError("No JSON content found in the text.")
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON format: {e}") from e


# for the loading utility
def remove_path_from_ref(ref_pathname):
    """
    Remove the directory part from a source reference, keeping the file name.

    FIX: the previous `len(ref_pathname.split(os.sep)) > 0` guard was always
    true (str.split never returns an empty list); os.path.basename performs
    the same extraction correctly and also handles '/' separators on Windows.
    """
    return os.path.basename(ref_pathname)


# diff --git a/ai/gen-ai-agents/code-quality-agent/requirements.txt
#          b/ai/gen-ai-agents/code-quality-agent/requirements.txt
# new file mode 100644  index 000000000..1204a6189
# @@ -0,0 +1,59 @@  (pinned dependency list; preserved as comments)
# aiohappyeyeballs==2.6.1
# aiohttp==3.13.3
# aiosignal==1.4.0
# annotated-types==0.7.0
# anyio==4.12.1
# attrs==25.4.0
# certifi==2026.1.4
# cffi==2.0.0
# charset-normalizer==3.4.4
# circuitbreaker==2.1.3
# cryptography==45.0.7
# distro==1.9.0
# frozenlist==1.8.0
# h11==0.16.0
# httpcore==1.0.9
# httpx==0.28.1
# idna==3.11
# jiter==0.12.0
# jsonpatch==1.33
# jsonpointer==3.0.0
# langchain==1.2.3
# langchain-core==1.2.6
# langchain-oci==0.2.1
# langchain-openai==1.1.7
# langgraph==1.0.5
# langgraph-checkpoint==3.0.1
# langgraph-prebuilt==1.0.5
# langgraph-sdk==0.3.1
# langsmith==0.6.2
# multidict==6.7.0
# oci==2.164.2
# oci-openai==1.0.0
# openai==2.14.0
# orjson==3.11.5
# ormsgpack==1.12.1
# packaging==25.0
# propcache==0.4.1
# pycparser==2.23
# pydantic==2.12.5
# pydantic_core==2.41.5
# pyOpenSSL==25.1.0
# python-dateutil==2.9.0.post0
# pytz==2025.2
# PyYAML==6.0.3
# regex==2025.11.3
# requests==2.32.5
# requests-toolbelt==1.0.0
# six==1.17.0
# sniffio==1.3.1
# tenacity==9.1.2
# tiktoken==0.12.0
# tqdm==4.67.1
# typing-inspection==0.4.2
# typing_extensions==4.15.0
# urllib3==2.6.3
# (requirements.txt, continued; preserved as comments)
# uuid_utils==0.13.0
# xxhash==3.6.0
# yarl==1.22.0
# zstandard==0.25.0

# diff --git a/ai/gen-ai-agents/code-quality-agent/run_agent.py
#          b/ai/gen-ai-agents/code-quality-agent/run_agent.py
# new file mode 100644  index 000000000..473dc0c06
# @@ -0,0 +1,76 @@
"""
File name: run_agent.py
Author: L. Saetta
Date last modified: 2026-01-12
Python Version: 3.11
License: MIT

Description:
    Entry point to run the agent from command line.
"""

import argparse
import asyncio
from agent.graph_agent import build_graph, run_agent
from agent.utils import get_console_logger

logger = get_console_logger()


def main():
    """
    Main function to run the agent from command line.
    Parses command line arguments and executes the agent graph.
    """
    default_request = (
        "Check headers and scan secrets. If you identify any secrets, "
        "report them in the risk section."
    )

    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--root",
        required=True,
        help="Root directory containing python files (read-only).",
    )
    ap.add_argument(
        "--out", required=True, help="Output directory for generated artifacts."
    )
    ap.add_argument(
        "--request",
        default=default_request,
        help="User request text.",
    )
    args = ap.parse_args()

    # here we build the graph
    graph = build_graph()

    async def _run():
        # drive the async agent to completion, then pretty-print the results
        st = await run_agent(
            graph, root_dir=args.root, out_dir=args.out, request=args.request
        )

        print("")
        print("\n=== AGENT SUMMARY ===")
        print(st["summary"])

        if st["header_issues"]:
            print("\nHEADER ISSUES:")
            for k, v in st["header_issues"].items():
                print(f"- {k}: {v}")

        if st["secrets"]:
            print("\nSECRETS FOUND (review ASAP):")
            for fpath, findings in st["secrets"].items():
                print(f"- {fpath}:")
                for it in findings:
                    print(f"  line {it['line']}: {it['kind']} | {it['excerpt']}")

        print("")

    asyncio.run(_run())


if __name__ == "__main__":
    main()


# diff --git a/ai/gen-ai-agents/code-quality-agent/run_agent.sh
#          b/ai/gen-ai-agents/code-quality-agent/run_agent.sh
# new file mode 100755  index 000000000..b8a2be51b
# @@ -0,0 +1,5 @@  (shell script; preserved as comments in this reconstruction)
# # Author: Luigi Saetta
# # Date: 2026-01-11
# # hint: change root and out params
# python run_agent.py --root /Users/lsaetta/Progetti/rag-agent-2026 --out ./out --request "Check headers and scan secrets. If you identify any secrets, report them in the risk section."