-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy path__init__.py
More file actions
1890 lines (1611 loc) · 78.5 KB
/
__init__.py
File metadata and controls
1890 lines (1611 loc) · 78.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import logging
import os
import re
import sys
import tarfile
import tempfile
import time
import json
from dataclasses import asdict
from pathlib import Path, PurePath
from typing import Dict, List, Tuple, Set, TYPE_CHECKING, Optional
if TYPE_CHECKING:
from socketsecurity.config import CliConfig
from socketdev import socketdev
from socketdev.exceptions import APIFailure
from socketdev.fullscans import FullScanParams, SocketArtifact
from socketdev.org import Organization
from socketdev.repos import RepositoryInfo
import copy
from socketsecurity import __version__, USER_AGENT
from socketsecurity.core.classes import (
Alert,
Diff,
FullScan,
Issue,
Package,
Purl
)
from socketsecurity.core.exceptions import APIResourceNotFound
from .socket_config import SocketConfig
from .utils import socket_globs
from .resource_utils import check_file_count_against_ulimit
import importlib
# Standard-library ``logging`` module loaded by name and bound to a separate alias.
# NOTE(review): presumably kept so the stdlib module stays reachable under a distinct
# name — confirm against the rest of the file.
logging_std = importlib.import_module("logging")
# Names exported by ``from <this module> import *``.
__all__ = [
"Core",
"log",
"__version__",
"USER_AGENT",
]
# Alias of the package version imported from ``socketsecurity``.
version = __version__
# Module logger; it uses the fixed logger name "socketdev".
log = logging.getLogger("socketdev")
# Explicit display titles for specific alert types, keyed by alert type identifier.
_ALERT_TYPE_TITLE_OVERRIDES = {
"gptDidYouMean": "Possible typosquat attack (GPT)",
}
# Zero-width word boundaries inside camelCase/PascalCase identifiers: either between a
# lowercase letter or digit and an uppercase letter, or between an uppercase letter and
# an uppercase letter that is followed by a lowercase letter (keeps acronyms together).
_HUMANIZE_BOUNDARY = re.compile(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")
# Reachability facts-file upload compression.
#
# The Socket full-scan endpoint transparently brotli-decompresses any multipart part
# whose basename is exactly ``.socket.facts.json.br`` and stores it as plain
# ``.socket.facts.json``. Compressing the facts file on upload keeps it well under the
# server's per-file size cap (a ~262 MB facts file compresses to roughly 15-30 MB),
# which is required for large reachability (tier 1) scans to succeed.
#
# The server matches the *exact* name ``.socket.facts.json.br``, so we only compress
# files whose basename is exactly ``.socket.facts.json`` (a custom ``--reach-output-file``
# name would not be decompressed server-side, so it is left as a plain upload).
SOCKET_FACTS_FILENAME = ".socket.facts.json"
SOCKET_FACTS_BROTLI_FILENAME = ".socket.facts.json.br"
# Brotli quality (0-11); 5 is a good speed/ratio tradeoff for large JSON payloads.
SOCKET_FACTS_BROTLI_QUALITY = 5
# Largest brotli window (2**24 bytes); improves the ratio on large facts files.
SOCKET_FACTS_BROTLI_LGWIN = 24
# Stream the facts file in 1 MiB chunks so large files aren't held fully in memory.
SOCKET_FACTS_BROTLI_CHUNK_SIZE = 1024 * 1024
# Tier 1 reachability finalize retry policy. The finalize call links the tier1 scan to the
# full scan and can fail transiently (network/API blips); a few backoff retries make it robust.
# Maximum number of finalize attempts before giving up.
TIER1_FINALIZE_MAX_ATTEMPTS = 3
# Base delay in seconds between attempts; the retry loop doubles it after each failure.
TIER1_FINALIZE_BACKOFF_SECONDS = 1.0
def _humanize_alert_type(alert_type: str) -> str:
    """Turn a camelCase/PascalCase alert type into a space-separated, capitalized label.

    The identifier is cut at the boundaries matched by ``_HUMANIZE_BOUNDARY``; the first
    character of every piece is upper-cased and the pieces are joined with single
    spaces. Runs of capitals stay together, so ``'SQLInjection'`` becomes
    ``'SQL Injection'``.

    Args:
        alert_type: Alert type identifier; may be empty or ``None``.

    Returns:
        The humanized label, or ``""`` when ``alert_type`` is falsy.
    """
    if not alert_type:
        return ""

    words = []
    for chunk in _HUMANIZE_BOUNDARY.split(alert_type):
        # Skip empty pieces so no doubled spaces appear in the label.
        if chunk:
            words.append(chunk[:1].upper() + chunk[1:])
    return " ".join(words)
class Core:
"""Main class for interacting with Socket Security API and processing scan results."""
# Maps alert type identifiers to the human-readable capability label they stand for.
ALERT_TYPE_TO_CAPABILITY = {
"envVars": "Environment Variables",
"networkAccess": "Network Access",
"filesystemAccess": "File System Access",
"shellAccess": "Shell Access",
"usesEval": "Uses Eval",
"unsafe": "Unsafe"
}
# Shared configuration object; set_org_vars() writes the org id/slug and API paths onto it.
config: SocketConfig
# Socket SDK client used for every API call made by this class.
sdk: socketdev
# Optional CLI configuration; None when Core is constructed without one.
cli_config: Optional['CliConfig']
def __init__(self, config: SocketConfig, sdk: socketdev, cli_config: Optional['CliConfig'] = None) -> None:
    """Store the collaborators and resolve the organization settings.

    Args:
        config: Shared configuration object, updated in place by ``set_org_vars``.
        sdk: Socket SDK client used for API calls.
        cli_config: Optional CLI configuration; defaults to ``None``.
    """
    self.config, self.sdk, self.cli_config = config, sdk, cli_config
    # Must run after config and sdk are assigned: it calls the API through self.sdk
    # and writes the results onto self.config.
    self.set_org_vars()
def set_org_vars(self) -> None:
    """Populate the organization id/slug and the org-scoped API paths on ``self.config``.

    The id and slug come from ``get_org_id_slug``; the full-scan and repository
    paths are both derived from ``orgs/<slug>``.
    """
    org_id, org_slug = self.get_org_id_slug()
    settings = self.config
    settings.org_id = org_id
    settings.org_slug = org_slug

    org_root = f"orgs/{org_slug}"
    settings.full_scan_path = f"{org_root}/full-scans"
    settings.repository_path = f"{org_root}/repos"
def get_org_id_slug(self) -> Tuple[str, str]:
    """Look up the organization id and slug for the current API token.

    Returns:
        ``(org_id, slug)`` when the API response lists exactly one organization;
        ``(None, None)`` for zero or several organizations.
    """
    response = self.sdk.org.get(use_types=True)
    organizations = response.get("organizations", {})

    # Only an unambiguous, single-organization response is usable.
    if len(organizations) != 1:
        return None, None

    [org_id] = organizations
    return org_id, organizations[org_id]['slug']
def get_sbom_data(self, full_scan_id: str) -> Dict[str, SocketArtifact]:
    """Fetch the SBOM artifacts of a full scan.

    Args:
        full_scan_id: ID of the full scan to stream.

    Returns:
        The ``artifacts`` value of the SDK response, or an empty dict when the
        request was unsuccessful or the response carries no artifacts.
    """
    response = self.sdk.fullscans.stream(self.config.org_slug, full_scan_id, use_types=True)

    if not response.success:
        log.debug(f"Failed to get SBOM data for full-scan {full_scan_id}")
        log.debug(response.message)
        return {}

    # A missing attribute and an empty value are both treated as "no artifacts".
    artifacts = getattr(response, "artifacts", None)
    if not artifacts:
        return {}
    return artifacts
def get_sbom_data_list(self, artifacts_dict: Dict[str, SocketArtifact]) -> list[SocketArtifact]:
    """Return the values of an artifacts dictionary as a new list.

    Args:
        artifacts_dict: Artifacts dictionary, e.g. the result of ``get_sbom_data``.

    Returns:
        A list holding the dictionary's values in iteration order.
    """
    return [*artifacts_dict.values()]
def create_sbom_output(self, diff: Diff) -> dict:
    """Export the CycloneDX SBOM for the scan referenced by ``diff``.

    Args:
        diff: Diff whose ``id`` identifies the full scan to export.

    Returns:
        The SDK export result with its ``success`` key removed, or an empty dict
        when the export was unsuccessful or raised.
    """
    # Bound up-front so the except handler can tell "the SDK call raised" apart from
    # "the call returned something we failed to process".
    result = None
    try:
        result = self.sdk.export.cdx_bom(self.config.org_slug, diff.id, use_types=True)
        if not result.success:
            log.error(f"Failed to get CycloneDX Output for full-scan {diff.id}")
            log.error(result.message)
            return {}
        result.pop("success", None)
        return result
    except Exception as error:
        log.error(f"Unable to get CycloneDX Output for {diff.id}")
        detail = "No error message provided"
        if result is not None and hasattr(result, "get"):
            detail = result.get("message", detail)
        elif str(error):
            # No usable response object: report the exception itself instead of
            # failing inside the handler on an unbound/unsuitable ``result``.
            detail = str(error)
        log.error(detail)
        return {}
@staticmethod
def expand_brace_pattern(pattern: str) -> List[str]:
    """Expand ``{a,b,c}`` brace groups in a glob pattern, including nested groups.

    Args:
        pattern: Glob pattern that may contain brace alternatives.

    Returns:
        Every pattern obtained by substituting one alternative per brace group.
        A pattern without a complete brace group is returned unchanged as a
        single-element list.
    """

    def split_top_level(body: str) -> List[str]:
        """Split ``body`` on the commas that are not inside nested braces."""
        pieces = []
        nesting = 0
        cut = 0
        for pos, char in enumerate(body):
            if char == '{':
                nesting += 1
            elif char == '}':
                nesting -= 1
            elif char == ',' and nesting == 0:
                pieces.append(body[cut:pos])
                cut = pos + 1
        pieces.append(body[cut:])
        return pieces

    def recursive_expand(pat: str) -> List[str]:
        """Expand the first complete outermost group, then recurse on each result."""
        open_positions = []
        for idx, char in enumerate(pat):
            if char == '{':
                open_positions.append(idx)
                continue
            # Ignore ordinary characters and any '}' that has no opening brace.
            if char != '}' or not open_positions:
                continue
            opener = open_positions.pop()
            if open_positions:
                # This '}' closed a nested group; keep looking for the outer one.
                continue
            prefix = pat[:opener]
            suffix = pat[idx + 1:]
            expansions = []
            for choice in split_top_level(pat[opener + 1:idx]):
                expansions.extend(recursive_expand(prefix + choice + suffix))
            return expansions
        return [pat]

    return recursive_expand(pattern)
@staticmethod
def is_excluded(file_path: str, excluded_dirs: Set[str]) -> bool:
    """Tell whether any component of ``file_path`` is an excluded directory name.

    Args:
        file_path: Path to test; it is normalized before being split on ``os.sep``.
        excluded_dirs: Names that mark a path as excluded.

    Returns:
        True when at least one path component is in ``excluded_dirs``.
    """
    components = os.path.normpath(file_path).split(os.sep)
    return any(component in excluded_dirs for component in components)
@staticmethod
def _exclude_glob_to_regex(pattern: str) -> str:
    """Convert a micromatch-style glob into an anchored regular-expression string.

    Mirrors the Node CLI's --exclude-paths matcher (src/commands/scan/exclude-paths.mts):
    patterns are matched against scan-root-relative POSIX paths, case-sensitively.
    ``*`` and ``?`` never cross a ``/`` while ``**`` does. The result is anchored at
    both ends, so ``tests`` matches ``tests`` but not ``src/tests``; use ``**/tests``
    to match at any depth.

    Args:
        pattern: Glob pattern to translate.

    Returns:
        Regex source text starting with ``^`` and ending with ``$``.
    """
    pieces = ["^"]
    pos = 0
    total = len(pattern)
    while pos < total:
        if pattern.startswith("**/", pos):
            # '**/' -> zero or more whole path segments
            pieces.append("(?:[^/]+/)*")
            pos += 3
        elif pattern.startswith("**", pos):
            # '**' at the end or before a non-slash -> anything, including '/'
            pieces.append(".*")
            pos += 2
        elif pattern[pos] == "*":
            # '*' -> any run of characters within a single path segment
            pieces.append("[^/]*")
            pos += 1
        elif pattern[pos] == "?":
            # '?' -> exactly one non-separator character
            pieces.append("[^/]")
            pos += 1
        else:
            pieces.append(re.escape(pattern[pos]))
            pos += 1
    pieces.append("$")
    return "".join(pieces)
@staticmethod
def compile_exclude_paths(patterns: Optional[List[str]]) -> List["re.Pattern"]:
    """Compile --exclude-paths globs into anchored regexes (done once per scan).

    Each pattern ``P`` is expanded the way Node feeds fast-glob's ``ignore``: ``P``
    itself (a file- or dir-shaped exact match) plus ``P/**`` (its subtree), unless
    ``P`` already ends with ``/**``. Before expansion each entry is stripped of
    surrounding whitespace, backslashes become ``/`` and trailing slashes are
    dropped; entries that end up empty are ignored. Validation of the patterns
    happens earlier, in CliConfig.from_args.

    Args:
        patterns: Raw glob patterns; ``None`` or an empty list yields no regexes.

    Returns:
        Compiled regexes, in pattern order.
    """
    regexes: List["re.Pattern"] = []
    if not patterns:
        return regexes

    for entry in patterns:
        cleaned = (entry or "").strip().replace("\\", "/").rstrip("/")
        if not cleaned:
            continue
        variants = [cleaned]
        if not cleaned.endswith("/**"):
            variants.append(f"{cleaned}/**")
        for variant in variants:
            regexes.append(re.compile(Core._exclude_glob_to_regex(variant)))
    return regexes
@staticmethod
def path_matches_exclude_regexes(rel_path: str, regexes: List["re.Pattern"]) -> bool:
    """Check a relative path against a list of compiled exclude regexes.

    Args:
        rel_path: Path relative to the scan root; ``os.sep`` and backslashes are
            converted to ``/`` before matching.
        regexes: Compiled patterns to try.

    Returns:
        True as soon as one regex matches the normalized path, otherwise False.
    """
    posix_path = rel_path.replace(os.sep, "/")
    posix_path = posix_path.replace("\\", "/")
    for regex in regexes:
        if regex.match(posix_path):
            return True
    return False
@staticmethod
def matches_exclude_paths(file_path: str, base_path: str, patterns: List[str]) -> bool:
    """Convenience matcher that compiles ``patterns`` on every call (tests/ad-hoc checks).

    Args:
        file_path: Path of the file to test.
        base_path: Directory the patterns are anchored to.
        patterns: Raw exclude globs.

    Returns:
        True when ``file_path``, taken relative to ``base_path``, matches a pattern.
    """
    relative = os.path.relpath(file_path, base_path)
    relative = relative.replace(os.sep, "/")
    compiled = Core.compile_exclude_paths(patterns)
    return Core.path_matches_exclude_regexes(relative, compiled)
def save_submitted_files_list(self, files: List[str], output_path: str) -> None:
    """
    Write the submitted file names, with size totals, to a JSON file for debugging.

    Every entry of ``files`` is listed, including paths that are missing or cannot
    be accessed; only existing regular files contribute to the size total. Any
    failure is logged as an error and not raised.

    Args:
        files: List of file paths that were submitted for scanning
        output_path: Path where to save the JSON file
    """

    def format_bytes(bytes_value):
        """Convert bytes to human readable format"""
        for unit in ('B', 'KB', 'MB', 'GB'):
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} TB"

    try:
        size_total = 0
        listed = []
        for file_path in files:
            try:
                if os.path.exists(file_path) and os.path.isfile(file_path):
                    size_total += os.path.getsize(file_path)
                else:
                    log.warning(f"File not found or not accessible: {file_path}")
            except OSError as e:
                log.warning(f"Error accessing file {file_path}: {e}")
            # Listed in every case so the debug output shows exactly what was submitted.
            listed.append(file_path)

        size_human = format_bytes(size_total)
        file_data = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime()),
            "total_files": len(listed),
            "total_size_bytes": size_total,
            "total_size_human": size_human,
            "files": sorted(listed)
        }

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(file_data, f, indent=2, ensure_ascii=False)

        log.info(f"Saved list of {len(listed)} submitted files ({size_human}) to: {output_path}")
    except Exception as e:
        log.error(f"Failed to save submitted files list to {output_path}: {e}")
def save_manifest_tar(self, files: List[str], output_path: str, base_dir: str) -> None:
    """
    Save all manifest files to a compressed tar.gz archive with original directory structure.

    Files inside ``base_dir`` are stored under their path relative to it; files
    outside it are stored under their basename only. Missing files are skipped
    with a warning. Any failure is logged as an error and not raised.

    Args:
        files: List of file paths to include in the archive
        output_path: Path where to save the tar.gz file
        base_dir: Base directory to preserve relative structure
    """

    def format_bytes(bytes_value):
        """Convert bytes to human readable format"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} TB"

    try:
        # Normalize base directory; the trailing separator keeps the prefix test below
        # from matching sibling directories that merely share a name prefix.
        base_dir = os.path.abspath(base_dir)
        if not base_dir.endswith(os.sep):
            base_dir += os.sep

        log.info(f"Creating manifest tar.gz file: {output_path}")
        log.debug(f"Base directory: {base_dir}")

        # Count what is actually archived so the summary is accurate when files are skipped.
        added_count = 0
        with tarfile.open(output_path, 'w:gz') as tar:
            for file_path in files:
                if not os.path.exists(file_path):
                    log.warning(f"File not found, skipping: {file_path}")
                    continue

                # Calculate relative path within the base directory
                abs_file_path = os.path.abspath(file_path)
                if abs_file_path.startswith(base_dir):
                    # File is within base directory - use relative path
                    arcname = os.path.relpath(abs_file_path, base_dir)
                else:
                    # File is outside base directory - use just the filename
                    arcname = os.path.basename(abs_file_path)
                    log.warning(f"File outside base dir, using basename: {file_path} -> {arcname}")

                # Normalize archive name to use forward slashes
                arcname = arcname.replace(os.sep, '/')
                log.debug(f"Adding to tar: {file_path} -> {arcname}")
                tar.add(file_path, arcname=arcname)
                added_count += 1

        # Get tar file size for logging
        tar_size = os.path.getsize(output_path)
        tar_size_human = format_bytes(tar_size)
        log.info(f"Successfully created tar.gz with {added_count} files ({tar_size_human}, {tar_size:,} bytes): {output_path}")
    except Exception as e:
        log.error(f"Failed to save manifest tar.gz to {output_path}: {e}")
def find_files(self, path: str, ecosystems: Optional[List[str]] = None) -> List[str]:
    """
    Finds supported manifest files in the given path.

    Each supported pattern is brace-expanded, made case-insensitive and globbed
    recursively under ``path``. Matches are dropped when they are not regular
    files, when a path component is in ``self.config.excluded_dirs``, or when
    they match a user-supplied --exclude-paths pattern.

    Args:
        path: Path to search for manifest files.
        ecosystems: Optional list of ecosystems to include. If None, all ecosystems are included.

    Returns:
        Sorted list of found manifest file paths, using forward slashes.
    """
    log.debug("Starting Find Files")
    files: Set[str] = set()

    # Unified --exclude-paths: filter discovered manifests by the same paths/globs that are
    # forwarded to coana's --exclude-dirs. Only consulted when the user supplied the flag.
    # Patterns are anchored to `path` (the scan root this pass walks), matching coana's
    # target and the Node CLI's fast-glob cwd. NOTE: when scanning multiple --sub-path
    # targets, find_files runs once per sub-path, so a pattern like `tests` anchors to each
    # sub-path independently (Node anchors all patterns to a single scan-root cwd). This only
    # differs for the multi-target full-scan + --exclude-paths combo; the reach flow is
    # single-target, so it matches Node there.
    exclude_paths = getattr(self.cli_config, "exclude_paths", None) if self.cli_config else None
    exclude_regexes = Core.compile_exclude_paths(exclude_paths) if exclude_paths else []

    # The scan root never changes during the walk, so build it once.
    # Use pathlib.Path.rglob() instead of glob.glob() to properly match dotfiles/dotdirs
    base_path = Path(path)

    # Get supported patterns from the API
    patterns = self.get_supported_patterns()

    for ecosystem in patterns:
        # If ecosystems filter is provided, only include specified ecosystems
        if ecosystems is not None and ecosystem not in ecosystems:
            continue
        if ecosystem in self.config.excluded_ecosystems:
            continue

        log.debug(f'Scanning ecosystem: {ecosystem}')
        ecosystem_patterns = patterns[ecosystem]
        for file_name in ecosystem_patterns:
            original_pattern = ecosystem_patterns[file_name]["pattern"]

            # Expand brace patterns
            for pattern in Core.expand_brace_pattern(original_pattern):
                case_insensitive_pattern = Core.to_case_insensitive_regex(pattern)
                log.debug(f"Searching for pattern: {case_insensitive_pattern}")
                glob_start = time.time()

                for glob_file in base_path.rglob(case_insensitive_pattern):
                    glob_file_str = str(glob_file)
                    if not os.path.isfile(glob_file_str):
                        continue
                    if Core.is_excluded(glob_file_str, self.config.excluded_dirs):
                        continue
                    if exclude_regexes:
                        rel = os.path.relpath(glob_file_str, path)
                        if Core.path_matches_exclude_regexes(rel, exclude_regexes):
                            continue
                    files.add(glob_file_str.replace("\\", "/"))

                glob_end = time.time()
                log.debug(f"Globbing took {glob_end - glob_start:.4f} seconds")

    file_list = sorted(files)
    file_count = len(file_list)
    log.info(f"Total files found: {file_count}")

    # Check if the number of manifest files might exceed ulimit -n
    ulimit_check = check_file_count_against_ulimit(file_count)
    if ulimit_check["can_check"]:
        if ulimit_check["would_exceed"]:
            log.debug(f"Found {file_count} manifest files, which may exceed the file descriptor limit (ulimit -n = {ulimit_check['soft_limit']})")
            log.debug(f"Available file descriptors: {ulimit_check['available_fds']} (after {ulimit_check['buffer_size']} buffer)")
            log.debug(f"Recommendation: {ulimit_check['recommendation']}")
            log.debug("This may cause 'Too many open files' errors during processing")
        else:
            log.debug(f"File count ({file_count}) is within file descriptor limit ({ulimit_check['soft_limit']})")
    else:
        log.debug(f"Could not check file descriptor limit: {ulimit_check.get('error', 'Unknown error')}")

    return file_list
def find_sbom_files(self, path: str) -> List[str]:
    """
    Finds only pre-generated SBOM files (CDX and SPDX) in the given path.

    Used with --reach-use-only-pregenerated-sboms; it delegates to ``find_files``
    restricted to the ``cdx`` and ``spdx`` ecosystems.

    Args:
        path: Path to search for SBOM files.

    Returns:
        List of found CDX and SPDX file paths.
    """
    log.debug("Starting Find SBOM Files (CDX and SPDX only)")
    return self.find_files(path, ecosystems=['cdx', 'spdx'])
def get_supported_patterns(self) -> Dict:
    """
    Gets supported file patterns from the Socket API.

    Falls back to the bundled ``socket_globs`` patterns when the API returns a
    falsy response.

    Returns:
        Dictionary of supported file patterns with 'general' key removed
    """
    response = self.sdk.report.supported()

    if response:
        # Apart from the 'general' entry, the response is already in the needed format.
        if 'general' in response:
            response.pop('general')
        return response

    log.error("Failed to get supported patterns from API")
    # Import the old patterns as fallback
    from .utils import socket_globs
    return socket_globs
def has_manifest_files(self, files: list) -> bool:
    """
    Checks if any files in the list are supported manifest files.

    Patterns come from ``get_supported_patterns``; if that raises, the bundled
    ``socket_globs`` patterns are used instead.

    Args:
        files: List of file paths to check

    Returns:
        True if any files match manifest patterns, False otherwise
    """
    # Get supported patterns
    try:
        patterns = self.get_supported_patterns()
    except Exception as e:
        log.error(f"Error getting supported patterns from API: {e}")
        log.warning("Falling back to local patterns")
        from .utils import socket_globs as fallback_patterns
        patterns = fallback_patterns

    # Normalize all file paths for matching. Only literal "./" prefixes are removed:
    # str.lstrip('./') would strip *any* leading '.' and '/' characters and so mangle
    # dot-prefixed names such as ".github/workflows/ci.yml" or ".socket.facts.json".
    norm_files = []
    for f in files:
        normalized = f.replace('\\', '/')
        while normalized.startswith('./'):
            normalized = normalized[2:]
        norm_files.append(normalized)

    for ecosystem in patterns:
        ecosystem_patterns = patterns[ecosystem]
        for file_name in ecosystem_patterns:
            pattern_str = ecosystem_patterns[file_name]["pattern"]
            # Expand brace patterns for each manifest pattern
            expanded_patterns = Core.expand_brace_pattern(pattern_str)
            for exp_pat in expanded_patterns:
                for file in norm_files:
                    # Match the pattern as-is first (handles root-level files
                    # like "package.json" matching pattern "package.json")
                    if PurePath(file).match(exp_pat):
                        return True
                    # Also try with **/ prefix to match files in subdirectories
                    # (e.g. "src/requirements.txt" matching "*requirements.txt")
                    if '/' not in exp_pat and PurePath(file).match(f"**/{exp_pat}"):
                        return True
    return False
def check_file_count_limit(self, file_count: int) -> dict:
    """
    Check a file count against the system's file descriptor limit.

    Thin wrapper around ``check_file_count_against_ulimit``.

    Args:
        file_count: Number of files to check

    Returns:
        Dictionary with check results including recommendations
    """
    verdict = check_file_count_against_ulimit(file_count)
    return verdict
@staticmethod
def to_case_insensitive_regex(input_string: str) -> str:
    """
    Converts a string into a case-insensitive pattern.

    Every alphabetic character becomes a two-character class holding its lower-
    and upper-case forms; all other characters are copied through unchanged.

    Args:
        input_string: String to convert

    Returns:
        Case-insensitive pattern

    Example:
        "pipfile" -> "[pP][iI][pP][fF][iI][lL][eE]"
    """
    pieces = []
    for char in input_string:
        if char.isalpha():
            pieces.append(f'[{char.lower()}{char.upper()}]')
        else:
            pieces.append(char)
    return ''.join(pieces)
@staticmethod
def empty_head_scan_file() -> List[str]:
    """
    Creates an empty placeholder file for baseline scans when no head scan exists.

    The file is named ``.socket.facts.json`` and is created (or truncated) in the
    system temporary directory.

    Returns:
        List containing path to a temporary empty file
    """
    temp_path = os.path.join(tempfile.gettempdir(), '.socket.facts.json')

    # Opening in write mode creates the file, or empties an existing one.
    with open(temp_path, 'w'):
        pass

    log.debug(f"Created temporary empty file for baseline scan: {temp_path}")
    return [temp_path]
def finalize_tier1_scan(self, full_scan_id: str, facts_file_path: str) -> bool:
    """
    Finalize a tier 1 reachability scan by associating it with a full scan.

    This function reads the tier1ReachabilityScanId from the facts file and
    calls the SDK to link it with the specified full scan.
    Linking the tier 1 scan to the full scan helps the Socket team debug potential issues.

    The SDK call is attempted up to ``TIER1_FINALIZE_MAX_ATTEMPTS`` times, sleeping
    ``TIER1_FINALIZE_BACKOFF_SECONDS * 2 ** (attempt - 1)`` seconds between attempts.

    Args:
        full_scan_id: The ID of the full scan to associate with the tier 1 scan
        facts_file_path: Path to the .socket.facts.json file containing the tier1ReachabilityScanId

    Returns:
        True if successful, False otherwise (missing, unreadable or malformed facts
        file, no usable scan ID, or every finalize attempt failed)
    """
    log.debug(f"Finalizing tier 1 scan for full scan {full_scan_id}")

    # Read the tier1ReachabilityScanId from the facts file
    try:
        if not os.path.exists(facts_file_path):
            log.debug(f"Facts file not found: {facts_file_path}")
            return False

        # JSON is UTF-8; do not depend on the platform's locale encoding.
        with open(facts_file_path, 'r', encoding='utf-8') as f:
            facts = json.load(f)
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.debug(f"Failed to read tier1ReachabilityScanId from {facts_file_path}: {e}")
        return False

    # Guard against facts files whose top level is not an object, and against IDs
    # that are not strings, instead of raising AttributeError.
    tier1_scan_id = facts.get('tier1ReachabilityScanId') if isinstance(facts, dict) else None
    if isinstance(tier1_scan_id, str):
        tier1_scan_id = tier1_scan_id.strip()
    if not tier1_scan_id or not isinstance(tier1_scan_id, str):
        log.debug(f"No tier1ReachabilityScanId found in {facts_file_path}")
        return False

    log.debug(f"Found tier1ReachabilityScanId: {tier1_scan_id}")

    # Call the SDK to finalize the tier 1 scan, retrying transient failures with backoff.
    last_error: Optional[Exception] = None
    for attempt in range(1, TIER1_FINALIZE_MAX_ATTEMPTS + 1):
        try:
            success = self.sdk.fullscans.finalize_tier1(
                full_scan_id=full_scan_id,
                tier1_reachability_scan_id=tier1_scan_id,
            )
            if success:
                log.debug(f"Successfully finalized tier 1 scan {tier1_scan_id} for full scan {full_scan_id}")
                return True
            log.debug(
                f"finalize_tier1 returned a falsy result for scan {tier1_scan_id} "
                f"(attempt {attempt}/{TIER1_FINALIZE_MAX_ATTEMPTS})"
            )
        except Exception as e:
            last_error = e
            log.debug(
                f"Unable to finalize tier 1 scan (attempt {attempt}/{TIER1_FINALIZE_MAX_ATTEMPTS}): {e}"
            )

        # No sleep after the final attempt.
        if attempt < TIER1_FINALIZE_MAX_ATTEMPTS:
            time.sleep(TIER1_FINALIZE_BACKOFF_SECONDS * (2 ** (attempt - 1)))

    if last_error is not None:
        log.debug(
            f"Giving up finalizing tier 1 scan {tier1_scan_id} after "
            f"{TIER1_FINALIZE_MAX_ATTEMPTS} attempts: {last_error}"
        )
    else:
        log.debug(
            f"Giving up finalizing tier 1 scan {tier1_scan_id} after "
            f"{TIER1_FINALIZE_MAX_ATTEMPTS} attempts"
        )
    return False
@staticmethod
def _compress_facts_file(source_path: str) -> str:
    """Brotli-compress a ``.socket.facts.json`` file to a sibling ``.socket.facts.json.br``.

    The source is read in chunks of ``SOCKET_FACTS_BROTLI_CHUNK_SIZE`` bytes so a
    large facts file never has to be held in memory at once. The output is written
    in the source's directory, so the multipart key the SDK derives keeps the same
    directory prefix with only the basename changed. An existing
    ``.socket.facts.json.br`` sibling is overwritten. If anything fails part-way
    through, the partially written output is removed and the error is re-raised.

    Args:
        source_path: Path to the plain ``.socket.facts.json`` file.

    Returns:
        Path to the compressed sibling file.
    """
    # Imported lazily so the dependency is only needed when actually uploading a facts
    # file. brotlicffi is the API-compatible fallback used on PyPy / non-CPython runtimes.
    try:
        import brotli
    except ImportError:
        import brotlicffi as brotli

    target_path = os.path.join(os.path.dirname(source_path), SOCKET_FACTS_BROTLI_FILENAME)
    encoder = brotli.Compressor(
        quality=SOCKET_FACTS_BROTLI_QUALITY,
        lgwin=SOCKET_FACTS_BROTLI_LGWIN,
    )

    try:
        with open(source_path, "rb") as reader, open(target_path, "wb") as writer:
            while block := reader.read(SOCKET_FACTS_BROTLI_CHUNK_SIZE):
                encoded = encoder.process(block)
                # The encoder may buffer input and hand back nothing for a chunk.
                if encoded:
                    writer.write(encoded)
            writer.write(encoder.finish())
    except BaseException:
        # The caller only learns the output path once this function returns, so it
        # cannot clean up a half-written .br itself. Remove it here, then re-raise
        # so the caller falls back to uploading the plain file.
        try:
            os.unlink(target_path)
        except OSError:
            pass
        raise

    return target_path
def _compress_facts_files_for_upload(self, files: List[str]) -> Tuple[List[str], List[str]]:
    """Replace any ``.socket.facts.json`` upload entry with a brotli-compressed ``.br`` sibling.

    The Socket full-scan endpoint transparently decompresses a multipart part named
    exactly ``.socket.facts.json.br``, so compressing here keeps a large facts file under
    the server's per-file size cap without changing the stored result. Files whose
    basename is not exactly ``.socket.facts.json`` are left untouched (the server only
    matches that exact name), as are empty placeholder files (e.g. baseline scans).

    Compression never blocks an upload: if it fails for any reason (missing optional
    ``brotli`` dependency, unwritable directory, etc.) the original plain file is used.

    Args:
        files: The list of file paths about to be uploaded.

    Returns:
        ``(upload_files, temp_paths)`` where ``upload_files`` is the possibly-rewritten
        list to upload and ``temp_paths`` are compressed files the caller must delete
        once the upload completes.
    """
    upload_files: List[str] = []
    temp_paths: List[str] = []

    for file_path in files:
        try:
            if (
                os.path.basename(file_path) == SOCKET_FACTS_FILENAME
                and os.path.isfile(file_path)
                and os.path.getsize(file_path) > 0
            ):
                compressed_path = self._compress_facts_file(file_path)
                # Track the new file for cleanup before anything else can fail: if the
                # size lookups below raise, the plain file is uploaded instead, but
                # the .br must still be deleted by the caller rather than orphaned.
                temp_paths.append(compressed_path)
                log.debug(
                    f"Brotli-compressed {file_path} for upload: "
                    f"{os.path.getsize(file_path)} -> {os.path.getsize(compressed_path)} bytes "
                    f"(uploading as {SOCKET_FACTS_BROTLI_FILENAME})"
                )
                upload_files.append(compressed_path)
                continue
        except Exception as e:
            # Never let compression break an upload: fall back to the plain file.
            log.warning(
                f"Failed to brotli-compress facts file {file_path}, uploading uncompressed: {e}"
            )
        upload_files.append(file_path)

    return upload_files, temp_paths
def create_full_scan(self, files: List[str], params: FullScanParams, base_paths: Optional[List[str]] = None) -> FullScan:
    """
    Creates a new full scan via the Socket API.

    A reachability facts file in ``files`` is uploaded brotli-compressed when
    possible, and the temporary compressed copies are deleted after the upload
    whether or not it succeeded. When reachability is enabled in the CLI
    configuration, the tier 1 scan is then finalized on a best-effort basis.

    Args:
        files: List of file paths to scan
        params: Parameters for the full scan
        base_paths: List of base paths for the scan (optional)

    Returns:
        FullScan object with scan results

    Raises:
        Exception: If the API reports that the full scan was not created.
    """
    log.info("Creating new full scan")
    started_at = time.time()

    # Brotli-compress the reachability facts file (if present) so it is uploaded as a
    # `.socket.facts.json.br` part. The API decompresses it server-side, keeping a large
    # facts file under the per-file upload size cap. See _compress_facts_files_for_upload.
    upload_files, compressed_temp_files = self._compress_facts_files_for_upload(files)
    try:
        res = self.sdk.fullscans.post(
            upload_files,
            params,
            use_types=True,
            use_lazy_loading=True,
            max_open_files=50,
            base_paths=base_paths,
        )
    finally:
        for temp_file in compressed_temp_files:
            try:
                os.unlink(temp_file)
                log.debug(f"Cleaned up temporary compressed facts file: {temp_file}")
            except OSError as cleanup_error:
                log.debug(f"Failed to clean up temporary compressed facts file {temp_file}: {cleanup_error}")

    if not res.success:
        log.error(f"Error creating full scan: {res.message}, status: {res.status}")
        raise Exception(f"Error creating full scan: {res.message}, status: {res.status}")

    full_scan = FullScan(**asdict(res.data))
    elapsed = time.time() - started_at
    log.debug(f"New Full Scan created in {elapsed:.2f} seconds")

    # Finalize tier1 scan only if reachability analysis was enabled
    reach_enabled = self.cli_config and self.cli_config.reach
    if not reach_enabled:
        return full_scan

    facts_file_path = os.path.join(
        self.cli_config.target_path or ".",
        self.cli_config.reach_output_file
    )
    log.debug(f"Reachability analysis enabled, finalizing tier1 scan for full scan {full_scan.id}")
    try:
        if self.finalize_tier1_scan(full_scan.id, facts_file_path):
            log.debug(f"Successfully finalized tier1 scan for full scan {full_scan.id}")
        else:
            log.debug(f"Failed to finalize tier1 scan for full scan {full_scan.id}")
    except Exception as e:
        # Finalizing is best-effort; the created scan is still returned.
        log.warning(f"Error finalizing tier1 scan for full scan {full_scan.id}: {e}")

    return full_scan
def create_full_scan_with_report_url(
    self,
    paths: List[str],
    params: FullScanParams,
    no_change: bool = False,
    save_files_list_path: Optional[str] = None,
    save_manifest_tar_path: Optional[str] = None,
    base_paths: Optional[List[str]] = None,
    explicit_files: Optional[List[str]] = None
) -> Diff:
    """Create a new full scan and return a Diff that points at its report.

    Args:
        paths: List of paths to look for manifest files
        params: Query params for the Full Scan endpoint
        no_change: If True, skip scanning and return the placeholder Diff
            (id "NO_SCAN_RAN", empty URLs)
        save_files_list_path: Optional path to save submitted files list for debugging
        save_manifest_tar_path: Optional path to save manifest files tar.gz archive
        base_paths: List of base paths for the scan (optional)
        explicit_files: Optional list of explicit files to use instead of discovering files

    Returns:
        Diff whose id, report_url and diff_url refer to the new full scan.
        packages/new_alerts are populated only when the CLI config enables
        the GitLab security, JSON or SARIF output; otherwise packages is {}.

    Raises:
        APIFailure: re-raised (after logging) when creating a scan from
            discovered/explicit files fails. Any other exception raised by
            create_full_scan propagates unchanged.
    """
    log.debug(f"starting create_full_scan_with_report_url with no_change: {no_change}")
    diff = Diff(
        id="NO_SCAN_RAN",
        report_url="",
        diff_url=""
    )
    if no_change:
        return diff

    # Use explicit files if provided, otherwise find manifest files from all paths
    if explicit_files is not None:
        all_files = explicit_files
        log.debug(f"Using {len(all_files)} explicit files instead of discovering files")
    else:
        all_files = []
        for path in paths:
            all_files.extend(self.find_files(path))

    # Save submitted files list if requested
    if save_files_list_path and all_files:
        self.save_submitted_files_list(all_files, save_files_list_path)

    # Save manifest tar.gz if requested (use first path as base)
    if save_manifest_tar_path and all_files and paths:
        self.save_manifest_tar(all_files, save_manifest_tar_path, paths[0])

    if not all_files:
        # If no supported files found, create empty scan
        log.info("No supported manifest files found - creating empty scan")
        empty_files = Core.empty_head_scan_file()
        try:
            new_scan_start = time.time()
            new_full_scan = self.create_full_scan(empty_files, params, base_paths=base_paths)
            new_scan_end = time.time()
            log.info(f"Total time to create empty full scan: {new_scan_end - new_scan_start:.2f}")
        finally:
            # A single cleanup path: runs on success, on any Exception, and also on
            # KeyboardInterrupt/SystemExit, so the temporary files are never leaked.
            # Cleanup is best-effort; a failed unlink must not mask the scan result
            # or the original error.
            for temp_file in empty_files:
                try:
                    os.unlink(temp_file)
                    log.debug(f"Cleaned up temporary file: {temp_file}")
                except OSError as e:
                    log.warning(f"Failed to clean up temporary file {temp_file}: {e}")
    else:
        try:
            new_scan_start = time.time()
            new_full_scan = self.create_full_scan(all_files, params, base_paths=base_paths)
            new_scan_end = time.time()
            log.info(f"Total time to create new full scan: {new_scan_end - new_scan_start:.2f}")
        except APIFailure as e:
            log.error(f"Failed to create full scan: {e}")
            raise

    # Construct report URL; a full scan has no "before" side, so the diff URL
    # is set to the same report URL.
    base_socket = "https://socket.dev/dashboard/org"
    diff.report_url = f"{base_socket}/{self.config.org_slug}/sbom/{new_full_scan.id}"
    diff.diff_url = diff.report_url
    diff.id = new_full_scan.id

    # Fetching SBOM data is only done when an output format that reports
    # alerts has been requested.
    needs_alerts = (
        self.cli_config is not None
        and (
            self.cli_config.enable_gitlab_security
            or self.cli_config.enable_json
            or self.cli_config.enable_sarif
        )
    )
    if not needs_alerts:
        diff.packages = {}
        return diff

    log.info("Output format requires alerts, fetching SBOM data for full scan")
    sbom_start = time.time()
    sbom_artifacts_dict = self.get_sbom_data(new_full_scan.id)
    sbom_artifacts = self.get_sbom_data_list(sbom_artifacts_dict)
    packages = self._create_packages_dict_without_license_text(sbom_artifacts)
    diff.packages = packages

    all_alerts_collection: Dict[str, List[Issue]] = {}
    for package in packages.values():
        self.add_package_alerts_to_collection(
            package=package,
            alerts_collection=all_alerts_collection,
            packages=packages
        )

    # Keep only error/warn alerts, reported once per (purl, type) pair.
    consolidated: Set[str] = set()
    for alerts in all_alerts_collection.values():
        for alert in alerts:
            alert_str = f"{alert.purl},{alert.type}"
            if (alert.error or alert.warn) and alert_str not in consolidated:
                diff.new_alerts.append(alert)
                consolidated.add(alert_str)

    sbom_end = time.time()
    log.info(
        f"Fetched {len(packages)} packages and {len(diff.new_alerts)} alerts "
        f"in {sbom_end - sbom_start:.2f}s"
    )
    return diff
def get_full_scan(self, full_scan_id: str) -> FullScan:
    """
    Load an existing full scan and attach its SBOM artifacts and packages.

    Args:
        full_scan_id: The ID of the full scan to get

    Returns:
        A FullScan built from the scan metadata, with sbom_artifacts and
        packages filled in
    """
    # Metadata first: it supplies the fields used to construct the FullScan.
    metadata_response = self.sdk.fullscans.metadata(
        self.config.org_slug,
        full_scan_id,
        use_types=True,
    )
    scan = FullScan(**asdict(metadata_response.data))

    # Then the SBOM payload, converted to the artifact list stored on the scan.
    raw_sbom = self.get_sbom_data(full_scan_id)
    scan.sbom_artifacts = self.get_sbom_data_list(raw_sbom)

    # Packages are derived from the artifacts just attached to the scan.
    scan.packages = self.create_packages_dict(scan.sbom_artifacts)
    return scan
def create_packages_dict(self, sbom_artifacts: list[SocketArtifact]) -> dict[str, Package]:
"""
Creates a dictionary of Package objects from SBOM artifacts.
Args:
sbom_artifacts: List of SBOM artifacts from the scan
Returns:
Dictionary mapping package IDs to Package objects
"""
packages = {}
top_level_count = {}
for artifact in sbom_artifacts:
package = Package.from_socket_artifact(asdict(artifact))
if package.id in packages:
print("Duplicate package?")
else:
package.license_text = self.get_package_license_text(package)
packages[package.id] = package
if package.topLevelAncestors:
for top_id in package.topLevelAncestors:
if top_id not in top_level_count: