Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cycode/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
import time as _time

# Unix-epoch wall clock captured at the earliest possible moment of CLI
# startup. Sent as `scan_parameters.cli_start_time` so the server can compute
# end-to-end scan duration from the moment the user actually triggered it.
_BOOT_WALL: float = _time.time()

__version__ = '0.0.0' # DON'T TOUCH. Placeholder. Will be filled automatically on poetry build from Git Tag
103 changes: 88 additions & 15 deletions cycode/cli/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import importlib
import logging
import sys
from typing import Annotated, Optional
Expand All @@ -10,12 +11,7 @@
from typer.completion import install_callback, show_callback

from cycode import __version__
from cycode.cli.apps import ai_guardrails, ai_remediation, auth, configure, ignore, report, report_import, scan, status
from cycode.cli.apps.api import get_platform_group

if sys.version_info >= (3, 10):
from cycode.cli.apps import mcp

from cycode.cli.cli_types import OutputTypeOption
from cycode.cli.consts import CLI_CONTEXT_SETTINGS
from cycode.cli.printers import ConsolePrinter
Expand Down Expand Up @@ -46,17 +42,88 @@
add_completion=False, # we add it manually to control the rich help panel
)

app.add_typer(ai_guardrails.app)
app.add_typer(ai_remediation.app)
app.add_typer(auth.app)
app.add_typer(configure.app)
app.add_typer(ignore.app)
app.add_typer(report.app)
app.add_typer(report_import.app)
app.add_typer(scan.app)
app.add_typer(status.app)
# Top-level subcommand → module providing its Typer app. Peeking at sys.argv
# lets us import only the invoked subapp on the hot path (e.g.
# `cycode ai-guardrails scan`), skipping ~300ms of unrelated imports.
_SUBAPP_MODULES: dict[str, str] = {
'ai-guardrails': 'cycode.cli.apps.ai_guardrails',
'ai-remediation': 'cycode.cli.apps.ai_remediation',
'auth': 'cycode.cli.apps.auth',
'configure': 'cycode.cli.apps.configure',
'ignore': 'cycode.cli.apps.ignore',
'report': 'cycode.cli.apps.report',
'import': 'cycode.cli.apps.report_import',
'scan': 'cycode.cli.apps.scan',
'status': 'cycode.cli.apps.status',
}
if sys.version_info >= (3, 10):
app.add_typer(mcp.app)
_SUBAPP_MODULES['mcp'] = 'cycode.cli.apps.mcp'

# Aliases: alternate spellings that resolve to a primary subcommand key.
_SUBAPP_ALIASES: dict[str, str] = {
'ai_remediation': 'ai-remediation', # backward-compat underscore form
'version': 'status',
}

# Root-level options that consume a following value; argv-peek must skip past
# both the option and its value when scanning for the first positional arg.
_ROOT_OPTS_WITH_VALUE = frozenset(
{
'--output',
'-o',
'--user-agent',
'--client-secret',
'--client-id',
'--id-token',
'--show-completion',
}
)


def _detect_invocation() -> tuple[Optional[str], Optional[str]]:
"""Return (top-level-subapp, second-level-subcommand) parsed from sys.argv.

Both values may be None: when no positional arg matches a known subapp,
or when the user only provided a top-level subcommand.
"""
positionals = []
args = sys.argv[1:]
i = 0
while i < len(args):
arg = args[i]
if arg in _ROOT_OPTS_WITH_VALUE:
i += 2
elif arg.startswith('-'):
# Any flag form: short, long, --key=value, or '--' marker. Skip the token only.
i += 1
else:
positionals.append(arg)
if len(positionals) >= 2:
break
i += 1
subapp = positionals[0] if positionals else None
subapp = _SUBAPP_ALIASES.get(subapp, subapp)
if subapp not in _SUBAPP_MODULES:
return None, None
subcommand = positionals[1] if len(positionals) >= 2 else None
return subapp, subcommand


# Computed once at import; reused by lazy registration and the version-checker skip.
_INVOKED_SUBAPP, _INVOKED_SUBCOMMAND = _detect_invocation()


def _register_subapps(only: Optional[str]) -> None:
if only is not None:
app.add_typer(importlib.import_module(_SUBAPP_MODULES[only]).app)
return
# Cold path (--help, completion, unknown subcommand): load all modules so
# root help lists everything. Deduplicate since aliases share modules.
for module_path in dict.fromkeys(_SUBAPP_MODULES.values()):
app.add_typer(importlib.import_module(module_path).app)


_register_subapps(_INVOKED_SUBAPP)

# Register the `platform` command group (dynamically built from the OpenAPI spec).
# The group itself is constructed cheaply at import time; the spec is only fetched
Expand All @@ -81,6 +148,12 @@ def _get_group_with_platform(app_typer: typer.Typer) -> click.Group:


def check_latest_version_on_close(ctx: typer.Context) -> None:
# Skip on `cycode ai-guardrails scan` — it emits JSON to stdout, so an
# upgrade notice would corrupt the response. Human-driven sibling commands
# (install, uninstall, status, session-start) still get the notice.
if (_INVOKED_SUBAPP, _INVOKED_SUBCOMMAND) == ('ai-guardrails', 'scan'):
return

output = ctx.obj.get('output')
# don't print anything if the output is JSON
if output == OutputTypeOption.JSON:
Expand Down
27 changes: 15 additions & 12 deletions cycode/cli/apps/scan/code_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,21 @@ def _scan_batch_thread_func(batch: list[Document]) -> tuple[str, CliError, Local
'zip_file_size': zip_file_size,
},
)
report_scan_status(
cycode_client,
scan_type,
scan_id,
scan_completed,
relevant_detections_count,
detections_count,
len(batch),
zip_file_size,
command_scan_type,
error_message,
)
# Sync flows already received the full result inline; only async flows
# need a separate status report to signal polling completion.
if not should_use_sync_flow:
report_scan_status(
cycode_client,
scan_type,
scan_id,
scan_completed,
relevant_detections_count,
detections_count,
len(batch),
zip_file_size,
command_scan_type,
error_message,
)

return scan_id, error, local_scan_result

Expand Down
2 changes: 2 additions & 0 deletions cycode/cli/apps/scan/scan_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import typer

from cycode import _BOOT_WALL
from cycode.cli.apps.scan.remote_url_resolver import get_remote_url_scan_parameter
from cycode.cli.utils.scan_utils import generate_unique_scan_id
from cycode.logger import get_logger
Expand All @@ -17,6 +18,7 @@ def _get_default_scan_parameters(ctx: typer.Context) -> dict:
'license_compliance': ctx.obj.get('license-compliance'),
'command_type': ctx.info_name.replace('-', '_'), # save backward compatibility
'aggregation_id': str(generate_unique_scan_id()),
'cli_start_time': _BOOT_WALL,
}


Expand Down
4 changes: 4 additions & 0 deletions cycode/cli/apps/scan/scan_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ def enrich_scan_result_with_data_from_detection_rules(
for detection in detections_per_file.detections:
detection_rule_ids.add(detection.detection_rule_id)

if not detection_rule_ids:
logger.debug('No detections to enrich, skipping detection_rules fetch')
return

detection_rules = cycode_client.get_detection_rules(detection_rule_ids)
detection_rules_by_id = {detection_rule.detection_rule_id: detection_rule for detection_rule in detection_rules}

Expand Down
41 changes: 27 additions & 14 deletions cycode/cyclient/base_token_auth_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,10 @@ def __init__(self, client_id: str) -> None:
self.client_id = client_id

self._credentials_manager = CredentialsManager()
# load cached access token
access_token, expires_in, creator = self._credentials_manager.get_access_token()

self._access_token = self._expires_in = None
expected_creator = self._create_jwt_creator()
if creator == expected_creator:
# we must be sure that cached access token is created using the same client id and client secret.
# because client id and client secret could be passed via command, via env vars or via config file.
# we must not use cached access token if client id or client secret was changed.
self._access_token = access_token
self._expires_in = arrow.get(expires_in) if expires_in else None

self._access_token = None
self._expires_in = None
self._lock = Lock()
self._load_token_from_disk()

def get_access_token(self) -> str:
with self._lock:
Expand All @@ -51,8 +42,30 @@ def invalidate_access_token(self, in_storage: bool = False) -> None:
self._credentials_manager.update_access_token(None, None, None)

def refresh_access_token_if_needed(self) -> None:
if self._access_token is None or self._expires_in is None or arrow.utcnow() >= self._expires_in:
self.refresh_access_token()
if self._has_valid_token():
return
# Re-check disk before doing the network refresh: another client instance
# in this process may have already refreshed and persisted a fresh token.
self._load_token_from_disk()
if self._has_valid_token():
return
self.refresh_access_token()

def _has_valid_token(self) -> bool:
return self._access_token is not None and self._expires_in is not None and arrow.utcnow() < self._expires_in

def _load_token_from_disk(self) -> None:
access_token, expires_in, creator = self._credentials_manager.get_access_token()
expected_creator = self._create_jwt_creator()
# We must be sure that cached access token is created using the same client id and client secret.
# Because client id and client secret could be passed via command, via env vars or via config file.
# We must not use cached access token if client id or client secret was changed.
if creator == expected_creator and access_token:
self._access_token = access_token
self._expires_in = arrow.get(expires_in) if expires_in else None
else:
self._access_token = None
self._expires_in = None

def refresh_access_token(self) -> None:
auth_response = self._request_new_access_token()
Expand Down
45 changes: 24 additions & 21 deletions cycode/cyclient/cycode_client_base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import os
import platform
import ssl
Expand Down Expand Up @@ -39,16 +40,29 @@ def cert_verify(self, *args, **kwargs) -> None:
conn.ca_certs = None


@functools.cache
def _get_session() -> requests.Session:
"""Process-wide Session so TCP+TLS connections are reused across all API calls."""
session = requests.Session()
# On Windows without an explicit CA bundle env var, fall back to the system
# trust store via a custom SSL context.
if platform.system() == 'Windows' and not (
os.environ.get('REQUESTS_CA_BUNDLE') or os.environ.get('CURL_CA_BUNDLE')
):
session.mount('https://', SystemStorageSslContext())
return session


def _get_request_function() -> Callable:
if os.environ.get('REQUESTS_CA_BUNDLE') or os.environ.get('CURL_CA_BUNDLE'):
return requests.request
return _get_session().request

if platform.system() != 'Windows':
return requests.request

session = requests.Session()
session.mount('https://', SystemStorageSslContext())
return session.request
def _log_response(response: Response, url: str, hide_response_content_log: bool) -> None:
content = 'HIDDEN' if hide_response_content_log else response.text
logger.debug(
'Receiving response, %s',
{'status_code': response.status_code, 'url': url, 'content': content},
)


_REQUEST_ERRORS_TO_RETRY = (
Expand Down Expand Up @@ -182,12 +196,7 @@ def _send_multipart(
response = _get_request_function()(
method='post', url=url, data=tracker, headers=headers, timeout=self.timeout
)

content = 'HIDDEN' if hide_response_content_log else response.text
logger.debug(
'Receiving response, %s',
{'status_code': response.status_code, 'url': url, 'content': content},
)
_log_response(response, url, hide_response_content_log)

response.raise_for_status()
return response
Expand Down Expand Up @@ -231,14 +240,8 @@ def _execute(

try:
headers = self.get_request_headers(headers, without_auth=without_auth)
request = _get_request_function()
response = request(method=method, url=url, timeout=timeout, headers=headers, **kwargs)

content = 'HIDDEN' if hide_response_content_log else response.text
logger.debug(
'Receiving response, %s',
{'status_code': response.status_code, 'url': url, 'content': content},
)
response = _get_request_function()(method=method, url=url, timeout=timeout, headers=headers, **kwargs)
_log_response(response, url, hide_response_content_log)

response.raise_for_status()
return response
Expand Down
17 changes: 17 additions & 0 deletions pyinstaller.spec
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,26 @@ CLI_VERSION = _dunamai.get_version('cycode', first_choice=_dunamai.Version.from_
with open(_INIT_FILE_PATH, 'w', encoding='UTF-8') as file:
file.write(prev_content.replace(VERSION_PLACEHOLDER, CLI_VERSION))

# Top-level subapp modules are loaded lazily via importlib.import_module() in
# cycode/cli/app.py to keep startup fast on hot paths (e.g. ai-guardrails scan).
# PyInstaller's static analyzer can't see those imports, so list them explicitly.
_hiddenimports = [
'cycode.cli.apps.ai_guardrails',
'cycode.cli.apps.ai_remediation',
'cycode.cli.apps.auth',
'cycode.cli.apps.configure',
'cycode.cli.apps.ignore',
'cycode.cli.apps.report',
'cycode.cli.apps.report_import',
'cycode.cli.apps.scan',
'cycode.cli.apps.status',
'cycode.cli.apps.mcp',
]

a = Analysis(
scripts=['cycode/cli/main.py'],
excludes=['tests', 'setuptools', 'pkg_resources'],
hiddenimports=_hiddenimports,
)

exe_args = [PYZ(a.pure), a.scripts, a.binaries, a.datas]
Expand Down
Loading
Loading