diff --git a/cycode/__init__.py b/cycode/__init__.py
index 4ce71ef1..63ae25e0 100644
--- a/cycode/__init__.py
+++ b/cycode/__init__.py
@@ -1 +1,8 @@
+import time as _time
+
+# Unix-epoch wall clock captured at the earliest possible moment of CLI
+# startup. Sent as `scan_parameters.cli_start_time` so the server can compute
+# end-to-end scan duration from the moment the user actually triggered it.
+_BOOT_WALL: float = _time.time()
+
 __version__ = '0.0.0'  # DON'T TOUCH. Placeholder. Will be filled automatically on poetry build from Git Tag
diff --git a/cycode/cli/app.py b/cycode/cli/app.py
index 103e8b86..82f7f41b 100644
--- a/cycode/cli/app.py
+++ b/cycode/cli/app.py
@@ -1,3 +1,4 @@
+import importlib
 import logging
 import sys
 from typing import Annotated, Optional
@@ -10,12 +11,7 @@
 from typer.completion import install_callback, show_callback
 
 from cycode import __version__
-from cycode.cli.apps import ai_guardrails, ai_remediation, auth, configure, ignore, report, report_import, scan, status
 from cycode.cli.apps.api import get_platform_group
-
-if sys.version_info >= (3, 10):
-    from cycode.cli.apps import mcp
-
 from cycode.cli.cli_types import OutputTypeOption
 from cycode.cli.consts import CLI_CONTEXT_SETTINGS
 from cycode.cli.printers import ConsolePrinter
@@ -46,17 +42,114 @@
     add_completion=False,  # we add it manually to control the rich help panel
 )
 
-app.add_typer(ai_guardrails.app)
-app.add_typer(ai_remediation.app)
-app.add_typer(auth.app)
-app.add_typer(configure.app)
-app.add_typer(ignore.app)
-app.add_typer(report.app)
-app.add_typer(report_import.app)
-app.add_typer(scan.app)
-app.add_typer(status.app)
+# Top-level subcommand → module providing its Typer app. Peeking at sys.argv
+# lets us import only the invoked subapp on the hot path (e.g.
+# `cycode ai-guardrails scan`), skipping ~300ms of unrelated imports.
+_SUBAPP_MODULES: dict[str, str] = {
+    'ai-guardrails': 'cycode.cli.apps.ai_guardrails',
+    'ai-remediation': 'cycode.cli.apps.ai_remediation',
+    'auth': 'cycode.cli.apps.auth',
+    'configure': 'cycode.cli.apps.configure',
+    'ignore': 'cycode.cli.apps.ignore',
+    'report': 'cycode.cli.apps.report',
+    'import': 'cycode.cli.apps.report_import',
+    'scan': 'cycode.cli.apps.scan',
+    'status': 'cycode.cli.apps.status',
+}
 if sys.version_info >= (3, 10):
-    app.add_typer(mcp.app)
+    _SUBAPP_MODULES['mcp'] = 'cycode.cli.apps.mcp'
+
+# Aliases: alternate spellings that resolve to a primary subcommand key.
+_SUBAPP_ALIASES: dict[str, str] = {
+    'ai_remediation': 'ai-remediation',  # backward-compat underscore form
+    'version': 'status',
+}
+
+# Root-level options that consume a following value; argv-peek must skip past
+# both the option and its value when scanning for the first positional arg.
+_ROOT_OPTS_WITH_VALUE = frozenset(
+    {
+        '--output',
+        '-o',
+        '--user-agent',
+        '--client-secret',
+        '--client-id',
+        '--id-token',
+        '--show-completion',
+    }
+)
+
+# Root-level help flags. When one precedes the subapp, Click renders ROOT help,
+# which must list every subapp, so argv-peek has to fall back to the cold path.
+# NOTE(review): assumes CLI_CONTEXT_SETTINGS exposes help as -h/--help — confirm.
+_ROOT_HELP_FLAGS = frozenset({'-h', '--help'})
+
+
+def _detect_invocation() -> tuple[Optional[str], Optional[str]]:
+    """Return (top-level-subapp, second-level-subcommand) parsed from sys.argv.
+
+    Best-effort peek that runs before Typer parses anything: option tokens are
+    skipped (together with the value of every option in _ROOT_OPTS_WITH_VALUE)
+    and the first two positional tokens are kept. The first positional is
+    resolved through _SUBAPP_ALIASES. Options declared below the root are not
+    known here, so a value passed to such an option can be mistaken for the
+    second-level subcommand.
+
+    Both values may be None: when no positional arg matches a known subapp,
+    or when a root-level help flag precedes the first positional (root help
+    must list every subapp, so the caller takes the cold path). The subcommand
+    alone is None when the user only provided a top-level subcommand.
+    """
+    positionals: list[str] = []
+    args = sys.argv[1:]
+    i = 0
+    while i < len(args):
+        arg = args[i]
+        if not positionals and arg in _ROOT_HELP_FLAGS:
+            # `cycode --help scan` renders ROOT help; registering only `scan`
+            # would hide every other subapp from that listing.
+            return None, None
+        if arg in _ROOT_OPTS_WITH_VALUE:
+            i += 2
+        elif arg.startswith('-'):
+            # Any flag form: short, long, --key=value, or '--' marker. Skip the token only.
+            i += 1
+        else:
+            positionals.append(arg)
+            if len(positionals) >= 2:
+                break
+            i += 1
+    subapp = positionals[0] if positionals else None
+    # None is not an alias key, so it passes through unchanged.
+    subapp = _SUBAPP_ALIASES.get(subapp, subapp)
+    if subapp not in _SUBAPP_MODULES:
+        return None, None
+    subcommand = positionals[1] if len(positionals) >= 2 else None
+    return subapp, subcommand
+
+
+# Computed once at import; reused by lazy registration and the version-checker skip.
+_INVOKED_SUBAPP, _INVOKED_SUBCOMMAND = _detect_invocation()
+
+
+def _register_subapps(only: Optional[str]) -> None:
+    """Attach subapp Typer apps to the root `app`.
+
+    Args:
+        only: Key of _SUBAPP_MODULES to import and register on its own (hot
+            path), or None to import and register every subapp (cold path).
+    """
+    if only is not None:
+        app.add_typer(importlib.import_module(_SUBAPP_MODULES[only]).app)
+        return
+    # Cold path (--help, completion, unknown subcommand): load all modules so
+    # root help lists everything. dict.fromkeys() only guards against two keys
+    # ever mapping to the same module; aliases live in _SUBAPP_ALIASES instead.
+    for module_path in dict.fromkeys(_SUBAPP_MODULES.values()):
+        app.add_typer(importlib.import_module(module_path).app)
+
+
+_register_subapps(_INVOKED_SUBAPP)
 
 # Register the `platform` command group (dynamically built from the OpenAPI spec).
 # The group itself is constructed cheaply at import time; the spec is only fetched
@@ -81,6 +174,12 @@ def _get_group_with_platform(app_typer: typer.Typer) -> click.Group:
 
 
 def check_latest_version_on_close(ctx: typer.Context) -> None:
+    # Skip on `cycode ai-guardrails scan` — it emits JSON to stdout, so an
+    # upgrade notice would corrupt the response. Human-driven sibling commands
+    # (install, uninstall, status, session-start) still get the notice.
+    if (_INVOKED_SUBAPP, _INVOKED_SUBCOMMAND) == ('ai-guardrails', 'scan'):
+        return
+
     output = ctx.obj.get('output')
     # don't print anything if the output is JSON
     if output == OutputTypeOption.JSON:
diff --git a/cycode/cli/apps/scan/code_scanner.py b/cycode/cli/apps/scan/code_scanner.py
index 072e438e..dc3727e4 100644
--- a/cycode/cli/apps/scan/code_scanner.py
+++ b/cycode/cli/apps/scan/code_scanner.py
@@ -204,18 +204,21 @@ def _scan_batch_thread_func(batch: list[Document]) -> tuple[str, CliError, Local
                 'zip_file_size': zip_file_size,
             },
         )
-        report_scan_status(
-            cycode_client,
-            scan_type,
-            scan_id,
-            scan_completed,
-            relevant_detections_count,
-            detections_count,
-            len(batch),
-            zip_file_size,
-            command_scan_type,
-            error_message,
-        )
+        # Sync flows already received the full result inline; only async flows
+        # need a separate status report to signal polling completion.
+        if not should_use_sync_flow:
+            report_scan_status(
+                cycode_client,
+                scan_type,
+                scan_id,
+                scan_completed,
+                relevant_detections_count,
+                detections_count,
+                len(batch),
+                zip_file_size,
+                command_scan_type,
+                error_message,
+            )
 
         return scan_id, error, local_scan_result
 
diff --git a/cycode/cli/apps/scan/scan_parameters.py b/cycode/cli/apps/scan/scan_parameters.py
index 58754e86..f362d419 100644
--- a/cycode/cli/apps/scan/scan_parameters.py
+++ b/cycode/cli/apps/scan/scan_parameters.py
@@ -2,6 +2,7 @@
 
 import typer
 
+from cycode import _BOOT_WALL
 from cycode.cli.apps.scan.remote_url_resolver import get_remote_url_scan_parameter
 from cycode.cli.utils.scan_utils import generate_unique_scan_id
 from cycode.logger import get_logger
@@ -17,6 +18,7 @@ def _get_default_scan_parameters(ctx: typer.Context) -> dict:
         'license_compliance': ctx.obj.get('license-compliance'),
         'command_type': ctx.info_name.replace('-', '_'),  # save backward compatibility
         'aggregation_id': str(generate_unique_scan_id()),
+        'cli_start_time': _BOOT_WALL,  # Unix-epoch seconds, captured when the `cycode` package is imported
     }
 
 
diff --git a/cycode/cli/apps/scan/scan_result.py b/cycode/cli/apps/scan/scan_result.py
index 13fb8576..9fb1da1d 100644
--- a/cycode/cli/apps/scan/scan_result.py
+++ b/cycode/cli/apps/scan/scan_result.py
@@ -189,6 +189,10 @@ def enrich_scan_result_with_data_from_detection_rules(
         for detection in detections_per_file.detections:
             detection_rule_ids.add(detection.detection_rule_id)
 
+    if not detection_rule_ids:
+        logger.debug('No detections to enrich, skipping detection_rules fetch')
+        return
+
     detection_rules = cycode_client.get_detection_rules(detection_rule_ids)
     detection_rules_by_id = {detection_rule.detection_rule_id: detection_rule for detection_rule in detection_rules}
 
diff --git a/cycode/cyclient/base_token_auth_client.py b/cycode/cyclient/base_token_auth_client.py
index 3f164836..ec315e7d 100644
--- a/cycode/cyclient/base_token_auth_client.py
+++ b/cycode/cyclient/base_token_auth_client.py
@@ -24,19 +24,10 @@ def __init__(self, client_id: str) -> None:
         self.client_id = client_id
 
         self._credentials_manager = CredentialsManager()
-        # load cached access token
-        access_token, expires_in, creator = self._credentials_manager.get_access_token()
-
-        self._access_token = self._expires_in = None
-        expected_creator = self._create_jwt_creator()
-        if creator == expected_creator:
-            # we must be sure that cached access token is created using the same client id and client secret.
-            # because client id and client secret could be passed via command, via env vars or via config file.
-            # we must not use cached access token if client id or client secret was changed.
-            self._access_token = access_token
-            self._expires_in = arrow.get(expires_in) if expires_in else None
-
+        self._access_token = None
+        self._expires_in = None
         self._lock = Lock()
+        self._load_token_from_disk()
 
     def get_access_token(self) -> str:
         with self._lock:
@@ -51,8 +42,30 @@ def invalidate_access_token(self, in_storage: bool = False) -> None:
             self._credentials_manager.update_access_token(None, None, None)
 
     def refresh_access_token_if_needed(self) -> None:
-        if self._access_token is None or self._expires_in is None or arrow.utcnow() >= self._expires_in:
-            self.refresh_access_token()
+        if self._has_valid_token():
+            return
+        # Re-check disk before doing the network refresh: another client instance
+        # in this process may have already refreshed and persisted a fresh token.
+        self._load_token_from_disk()
+        if self._has_valid_token():
+            return
+        self.refresh_access_token()
+
+    def _has_valid_token(self) -> bool:
+        return self._access_token is not None and self._expires_in is not None and arrow.utcnow() < self._expires_in
+
+    def _load_token_from_disk(self) -> None:
+        access_token, expires_in, creator = self._credentials_manager.get_access_token()
+        expected_creator = self._create_jwt_creator()
+        # Reuse the cached access token only if it was created with the same client id and client secret:
+        # both can be passed via command, via env vars or via config file, so they may have changed.
+        # A cached token whose creator does not match (or that is empty) is discarded.
+        if creator == expected_creator and access_token:
+            self._access_token = access_token
+            self._expires_in = arrow.get(expires_in) if expires_in else None
+        else:
+            self._access_token = None
+            self._expires_in = None
 
     def refresh_access_token(self) -> None:
         auth_response = self._request_new_access_token()
diff --git a/cycode/cyclient/cycode_client_base.py b/cycode/cyclient/cycode_client_base.py
index 1aae7bcb..bde0e880 100644
--- a/cycode/cyclient/cycode_client_base.py
+++ b/cycode/cyclient/cycode_client_base.py
@@ -1,3 +1,4 @@
+import functools
 import os
 import platform
 import ssl
@@ -39,16 +40,29 @@ def cert_verify(self, *args, **kwargs) -> None:
         conn.ca_certs = None
 
 
+@functools.cache  # NOTE(review): single Session shared by all callers — confirm thread-safety if used from threads
+def _get_session() -> requests.Session:
+    """Process-wide Session so TCP+TLS connections are reused across all API calls."""
+    session = requests.Session()
+    # On Windows without an explicit CA bundle env var, fall back to the system
+    # trust store via a custom SSL context.
+    if platform.system() == 'Windows' and not (
+        os.environ.get('REQUESTS_CA_BUNDLE') or os.environ.get('CURL_CA_BUNDLE')
+    ):
+        session.mount('https://', SystemStorageSslContext())
+    return session
+
+
 def _get_request_function() -> Callable:
-    if os.environ.get('REQUESTS_CA_BUNDLE') or os.environ.get('CURL_CA_BUNDLE'):
-        return requests.request
+    return _get_session().request
 
-    if platform.system() != 'Windows':
-        return requests.request
 
-    session = requests.Session()
-    session.mount('https://', SystemStorageSslContext())
-    return session.request
+def _log_response(response: Response, url: str, hide_response_content_log: bool) -> None:
+    content = 'HIDDEN' if hide_response_content_log else response.text
+    logger.debug(
+        'Receiving response, %s',
+        {'status_code': response.status_code, 'url': url, 'content': content},
+    )
 
 
 _REQUEST_ERRORS_TO_RETRY = (
@@ -182,12 +196,7 @@ def _send_multipart(
         response = _get_request_function()(
             method='post', url=url, data=tracker, headers=headers, timeout=self.timeout
         )
-
-        content = 'HIDDEN' if hide_response_content_log else response.text
-        logger.debug(
-            'Receiving response, %s',
-            {'status_code': response.status_code, 'url': url, 'content': content},
-        )
+        _log_response(response, url, hide_response_content_log)
 
         response.raise_for_status()
         return response
@@ -231,14 +240,8 @@ def _execute(
 
         try:
             headers = self.get_request_headers(headers, without_auth=without_auth)
-            request = _get_request_function()
-            response = request(method=method, url=url, timeout=timeout, headers=headers, **kwargs)
-
-            content = 'HIDDEN' if hide_response_content_log else response.text
-            logger.debug(
-                'Receiving response, %s',
-                {'status_code': response.status_code, 'url': url, 'content': content},
-            )
+            response = _get_request_function()(method=method, url=url, timeout=timeout, headers=headers, **kwargs)
+            _log_response(response, url, hide_response_content_log)
 
             response.raise_for_status()
             return response
diff --git a/pyinstaller.spec b/pyinstaller.spec
index c577c547..d93766a8 100644
--- a/pyinstaller.spec
+++ b/pyinstaller.spec
@@ -21,9 +21,26 @@ CLI_VERSION = _dunamai.get_version('cycode', first_choice=_dunamai.Version.from_
 with open(_INIT_FILE_PATH, 'w', encoding='UTF-8') as file:
     file.write(prev_content.replace(VERSION_PLACEHOLDER, CLI_VERSION))
 
+# Top-level subapp modules are loaded lazily via importlib.import_module() in
+# cycode/cli/app.py to keep startup fast on hot paths (e.g. ai-guardrails scan).
+# PyInstaller's static analyzer can't see those imports, so list them explicitly.
+_hiddenimports = [
+    'cycode.cli.apps.ai_guardrails',
+    'cycode.cli.apps.ai_remediation',
+    'cycode.cli.apps.auth',
+    'cycode.cli.apps.configure',
+    'cycode.cli.apps.ignore',
+    'cycode.cli.apps.report',
+    'cycode.cli.apps.report_import',
+    'cycode.cli.apps.scan',
+    'cycode.cli.apps.status',
+    'cycode.cli.apps.mcp',  # cycode/cli/app.py registers this subapp only on Python >= 3.10
+]
+
 a = Analysis(
     scripts=['cycode/cli/main.py'],
     excludes=['tests', 'setuptools', 'pkg_resources'],
+    hiddenimports=_hiddenimports,
 )
 
 exe_args = [PYZ(a.pure), a.scripts, a.binaries, a.datas]
diff --git a/tests/cli/test_app_argv_peek.py b/tests/cli/test_app_argv_peek.py
new file mode 100644
index 00000000..bd4c61a8
--- /dev/null
+++ b/tests/cli/test_app_argv_peek.py
@@ -0,0 +1,82 @@
+"""Tests for the argv-peek lazy subapp registration in cycode/cli/app.py.
+
+The argv-peek picks the invoked subapp from sys.argv before Typer dispatches,
+so it has to walk argv itself — skipping flags and (importantly) the values
+those flags consume. The `_ROOT_OPTS_WITH_VALUE` set lists every root-level
+flag that consumes a following positional token. If a maintainer adds a new
+value-taking option to `app_callback` and forgets to register it here, the
+argv-peek will silently fall back to the cold path (loading every subapp).
+The test below catches that drift by comparing the hand-maintained set
+against what Click's introspection sees on the built command.
+"""
+
+from typing import Optional
+from unittest.mock import patch
+
+import click
+import pytest
+import typer.main
+
+from cycode.cli.app import _ROOT_OPTS_WITH_VALUE, _detect_invocation, app
+
+
+def test_root_opts_with_value_matches_click_introspection() -> None:
+    """Every root option that takes a value must be registered in _ROOT_OPTS_WITH_VALUE."""
+    cmd = typer.main.get_command(app)
+    expected = {
+        opt
+        for param in cmd.params
+        if isinstance(param, click.Option) and not param.is_flag
+        for opt in param.opts
+        if opt.startswith('-')
+    }
+    assert frozenset(expected) == _ROOT_OPTS_WITH_VALUE, (
+        f'_ROOT_OPTS_WITH_VALUE is out of sync with app_callback.\n'
+        f' Missing: {sorted(expected - _ROOT_OPTS_WITH_VALUE)}\n'
+        f' Extra: {sorted(_ROOT_OPTS_WITH_VALUE - expected)}\n'
+        f'Update _ROOT_OPTS_WITH_VALUE in cycode/cli/app.py.'
+    )
+
+
+@pytest.mark.parametrize(
+    'argv',
+    [
+        ['cycode', 'ai-guardrails', 'scan'],
+        ['cycode', '-v', 'ai-guardrails', 'scan'],
+        ['cycode', '--verbose', 'ai-guardrails', 'scan'],
+        ['cycode', '--output', 'json', 'ai-guardrails', 'scan'],
+        ['cycode', '-o', 'json', 'ai-guardrails', 'scan'],
+        ['cycode', '--user-agent', '{"app_name":"x"}', 'ai-guardrails', 'scan'],
+        ['cycode', '--client-secret', 'secret-val', 'ai-guardrails', 'scan'],
+        ['cycode', '--client-id', 'client-val', 'ai-guardrails', 'scan'],
+        ['cycode', '--id-token', 'token-val', 'ai-guardrails', 'scan'],
+        ['cycode', '--show-completion', 'bash', 'ai-guardrails', 'scan'],
+        # --key=value form is one token; argv-peek should treat it as a flag
+        ['cycode', '--output=json', 'ai-guardrails', 'scan'],
+        # multiple value-taking options stacked
+        ['cycode', '-v', '--output', 'json', '--client-id', 'foo', 'ai-guardrails', 'scan'],
+    ],
+)
+def test_detect_invocation_finds_subcommand_past_flags(argv: list[str]) -> None:
+    with patch('sys.argv', argv):
+        assert _detect_invocation() == ('ai-guardrails', 'scan')
+
+
+@pytest.mark.parametrize(
+    ('argv', 'expected'),
+    [
+        # No positional args → no match
+        (['cycode'], (None, None)),
+        (['cycode', '-v'], (None, None)),
+        # Unknown subapp → no match (graceful: app.py falls back to cold path)
+        (['cycode', 'not-a-real-subapp'], (None, None)),
+        # Known subapp, no subcommand
+        (['cycode', 'scan'], ('scan', None)),
+        # Alias resolution
+        (['cycode', 'ai_remediation'], ('ai-remediation', None)),
+        (['cycode', 'version'], ('status', None)),
+    ],
+)
+def test_detect_invocation_edge_cases(argv: list[str], expected: tuple[Optional[str], Optional[str]]) -> None:
+    with patch('sys.argv', argv):
+        assert _detect_invocation() == expected