diff --git a/README.md b/README.md
index 71030cff..c0e4d1b1 100644
--- a/README.md
+++ b/README.md
@@ -250,6 +250,8 @@ We run both user-oriented and automation-oriented commands in automated tests as
 These commands are centered around a user making changes to a project (manually from the shell / command line), not a computer building/deploying it:
 
 * `cfbs add`: Add a module to the project (local files/folders, prepended with `./` are also considered modules).
+* `cfbs analyse`: Same as `cfbs analyze`.
+* `cfbs analyze`: Analyze the policy set specified by the given path.
 * `cfbs clean`: Remove modules which were added as dependencies, but are no longer needed.
 * `cfbs help`: Print the help menu.
 * `cfbs info`: Print information about a module.
diff --git a/cfbs/analyze.py b/cfbs/analyze.py
new file mode 100644
index 00000000..55108099
--- /dev/null
+++ b/cfbs/analyze.py
@@ -0,0 +1,633 @@
+from collections import OrderedDict
+import copy
+import os
+
+from cfbs.internal_file_management import fetch_archive
+from cfbs.masterfiles.analyze import (
+    highest_version,
+    sort_versions,
+    version_as_comparable_list,
+)
+from cfbs.utils import (
+    FetchError,
+    cfbs_dir,
+    fetch_url,
+    file_sha256,
+    get_json,
+    get_or_read_json,
+    immediate_subdirectories,
+    mkdir,
+    user_error,
+)
+
+
+def path_components(path):
+    """Returns a list of path components of `path`.
+
+    The first component is `""` for a path starting with a separator. On Windows, if `path` begins with n backslashes, the first n components will be `""`.
+
+    The last component is the filename; trailing separators do not affect the result."""
+    norm_path = os.path.normpath(path)
+
+    dir_components = norm_path.split(os.sep)
+
+    return dir_components
+
+
+def is_path_component(path, component):
+    """Returns whether `component` is a path component of `path`."""
+    p_components = path_components(path)
+
+    # a trailing slash marks `component` as a directory rather than a file
+    if component[-1] == "/":
+        # strip the suffixed directory slash
+        component = component[:-1]
+
+        return component in p_components[:-1]
+    else:
+        return component == p_components[-1]
+
+
+def contains_ignored_components(path, ignored_components):
+    """Returns whether `path` contains any of the path components in `ignored_components`."""
+    for i_comp in ignored_components:
+        if is_path_component(path, i_comp):
+            return True
+
+    return False
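
Editor's aside (illustration, not part of the patch) — the trailing-slash convention above, assuming a POSIX path separator:

    from cfbs.analyze import is_path_component, contains_ignored_components

    # ".git/" (trailing slash) only matches ".git" as a directory component:
    assert is_path_component(".git/config", ".git/")
    assert not is_path_component("policy/.git", ".git/")  # ".git" is the filename here
    # ".gitignore" (no slash) only matches the filename:
    assert is_path_component("policy/.gitignore", ".gitignore")
    assert contains_ignored_components(".github/workflows/ci.yml", [".github/"])
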
+DEFAULT_CHECKSUMS_DICT = {"checksums": {}}
+DEFAULT_FILES_DICT = {"files": {}}
+
+
+def checksums_files(
+    files_dir_path,
+    checksums_dict=None,
+    files_dict=None,
+    ignored_path_components=(),
+):
+    if checksums_dict is None:
+        # copy the default so that repeated calls don't mutate the module-level dict
+        checksums_dict = copy.deepcopy(DEFAULT_CHECKSUMS_DICT)
+    if files_dict is None:
+        files_dict = copy.deepcopy(DEFAULT_FILES_DICT)
+
+    for root, _, files in os.walk(files_dir_path):
+        for name in files:
+            full_relpath = os.path.join(root, name)
+            tarball_relpath = os.path.relpath(full_relpath, files_dir_path)
+
+            # skip ignored files before hashing them
+            if contains_ignored_components(full_relpath, ignored_path_components):
+                continue
+
+            file_checksum = file_sha256(full_relpath)
+
+            if file_checksum not in checksums_dict["checksums"]:
+                checksums_dict["checksums"][file_checksum] = set()
+            checksums_dict["checksums"][file_checksum].add(tarball_relpath)
+
+            if tarball_relpath not in files_dict["files"]:
+                files_dict["files"][tarball_relpath] = set()
+            files_dict["files"][tarball_relpath].add(file_checksum)
+
+    return checksums_dict, files_dict
+
+
+def mpf_vcf_dicts(offline=False):
+    """Returns the MPF versions, checksums, and files dicts (vcf stands for versions, checksums, files)."""
+    REPO_OWNER = "cfengine"
+    REPO_NAME = "release-information"
+
+    REPO_OWNERNAME = REPO_OWNER + "/" + REPO_NAME
+    # RI stands for release information
+    RI_SUBDIRS = "downloads/github.com/" + REPO_OWNERNAME + "/archive/refs/tags/"
+
+    if offline:
+        ERROR_MESSAGE = "Masterfiles Policy Framework release information not found. Provide the release information, for example by running 'cfbs analyze' without '--offline'."
+
+        cfbs_ri_dir = os.path.join(cfbs_dir(), RI_SUBDIRS)
+        if not os.path.exists(cfbs_ri_dir):
+            user_error(ERROR_MESSAGE)
+
+        ri_versions = immediate_subdirectories(cfbs_ri_dir)
+        if len(ri_versions) == 0:
+            user_error(ERROR_MESSAGE)
+
+        # compare version strings numerically, not lexicographically
+        ri_latest_version = highest_version(ri_versions)
+        mpf_vcf_path = os.path.join(
+            cfbs_ri_dir,
+            ri_latest_version,
+            REPO_NAME + "-" + ri_latest_version,
+            "masterfiles",
+        )
+    else:
+        REPO_URL = "https://github.com/" + REPO_OWNERNAME
+        LATEST_RELEASE_API_URL = (
+            "https://api.github.com/repos/" + REPO_OWNERNAME + "/releases/latest"
+        )
+
+        latest_release_data = get_json(LATEST_RELEASE_API_URL)
+
+        latest_release_name = latest_release_data["name"]
+        ri_archive_url = REPO_URL + "/archive/refs/tags/" + latest_release_name + ".zip"
+        ri_checksums_url = (
+            REPO_URL + "/releases/download/" + latest_release_name + "/checksums.txt"
+        )
+        ri_version_subdirs = RI_SUBDIRS + latest_release_name
+        ri_version_path = os.path.join(cfbs_dir(), ri_version_subdirs)
+        mpf_vcf_subdirs = REPO_NAME + "-" + latest_release_name + "/masterfiles/"
+        mpf_vcf_path = os.path.join(ri_version_path, mpf_vcf_subdirs)
+
+        if not os.path.exists(mpf_vcf_path):
+            mkdir(ri_version_path)
+
+            archive_checksums_path = ri_version_path + "/checksums.txt"
+            try:
+                fetch_url(ri_checksums_url, archive_checksums_path)
+            except FetchError as e:
+                user_error(str(e))
+
+            with open(archive_checksums_path) as file:
+                lines = [line.rstrip() for line in file]
+            # the .zip archive's checksum is expected on the second line of checksums.txt
+            zip_line = lines[1]
+            zip_checksum = zip_line.split(" ")[0]
+
+            fetch_archive(
+                ri_archive_url,
+                zip_checksum,
+                directory=ri_version_path,
+                with_index=False,
+                extract_to_directory=True,
+            )
+
+    mpf_versions_json_path = os.path.join(mpf_vcf_path, "versions.json")
+    mpf_checksums_json_path = os.path.join(mpf_vcf_path, "checksums.json")
+    mpf_files_json_path = os.path.join(mpf_vcf_path, "files.json")
+
+    mpf_versions_dict = get_or_read_json(mpf_versions_json_path)
+    mpf_versions_dict = mpf_versions_dict["versions"]
+
+    mpf_checksums_dict = get_or_read_json(mpf_checksums_json_path)
+    mpf_checksums_dict = mpf_checksums_dict["checksums"]
+
+    mpf_files_dict = get_or_read_json(mpf_files_json_path)
+    mpf_files_dict = mpf_files_dict["files"]
+
+    return mpf_versions_dict, mpf_checksums_dict, mpf_files_dict
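
Editor's orientation note — the three dicts returned above are indexed later as follows (illustrative shapes; the checksums are shortened and the values invented):

    # version -> {filepath -> checksum}
    mpf_versions_dict = {"3.21.0": {"masterfiles/promises.cf": "3c5bf8..."}}
    # checksum -> {filepath -> [versions]}
    mpf_checksums_dict = {"3c5bf8...": {"masterfiles/promises.cf": ["3.21.0", "3.20.0"]}}
    # filepath -> {checksum -> [versions]}
    mpf_files_dict = {"masterfiles/promises.cf": {"3c5bf8...": ["3.21.0", "3.20.0"]}}
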
+def filepaths_sorted(filepaths):
+    """Currently sorts alphabetically, not hierarchically."""
+    return sorted(filepaths)
+
+
+def filepaths_display(filepaths):
+    filepaths = filepaths_sorted(filepaths)
+
+    for path in filepaths[:-1]:
+        print("├──", path)
+    if len(filepaths) > 0:
+        print("└──", filepaths[-1])
+
+
+def list_or_single(lst):
+    if len(lst) == 1:
+        return lst[0]
+    return lst
+
+
+def filepaths_display_moved(filepaths):
+    filepaths = filepaths_sorted(filepaths)
+
+    for path in filepaths[:-1]:
+        print("├──", path[0], "<-", list_or_single(path[1]))
+    if len(filepaths) > 0:
+        print("└──", filepaths[-1][0], "<-", list_or_single(filepaths[-1][1]))
+
+
+def mpf_normalized_path(path, is_parentpath, masterfiles_dir):
+    """Returns a filepath converted from `path` to an MPF-comparable form."""
+    # downloaded MPF release information filepaths always have forward slashes
+    norm_path = path.replace(os.sep, "/")
+
+    if is_parentpath:
+        if norm_path.startswith(masterfiles_dir + "/"):
+            norm_path = os.path.relpath(norm_path, masterfiles_dir)
+            # `os.path.relpath` outputs paths with `os.sep`, even if `norm_path` uses forward slashes on e.g. Windows
+            norm_path = norm_path.replace(os.sep, "/")
+            norm_path = "masterfiles/" + norm_path
+    else:
+        norm_path = "masterfiles/" + norm_path
+
+    return norm_path
+
+
+def mpf_denormalized_path(path, is_parentpath, masterfiles_dir):
+    """Inverse function of `mpf_normalized_path`."""
+    denorm_path = path
+    # this works as intended even if the first dir isn't masterfiles and there's a masterfiles dir deeper in the path
+    relpath = os.path.relpath(denorm_path, "masterfiles")
+
+    if is_parentpath:
+        # if the path is under `"masterfiles"`, substitute `masterfiles_dir` for it;
+        # if not, the path should stay the same
+        if not relpath.startswith(".." + os.sep):
+            denorm_path = os.path.join(masterfiles_dir, relpath)
+    else:
+        # this works as intended even for directories other than `masterfiles`, e.g. `modules`
+        denorm_path = relpath
+
+    return denorm_path
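
Editor's aside — a round-trip sketch for a parent-path layout with a hypothetical `masterfiles_dir` of "inputs" (assumes a POSIX separator):

    norm = mpf_normalized_path("inputs/promises.cf", True, "inputs")
    assert norm == "masterfiles/promises.cf"  # the MPF-comparable form
    # module files keep their path apart from separator normalization:
    assert mpf_normalized_path("modules/foo/x.cf", True, "inputs") == "modules/foo/x.cf"
    # denormalization substitutes the local directory name back in:
    assert mpf_denormalized_path(norm, True, "inputs") == os.path.join("inputs", "promises.cf")
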
+class VersionsCounter:
+    def __init__(self):
+        self._versions_counts = {}
+
+    def increment(self, version):
+        if version not in self._versions_counts:
+            self._versions_counts[version] = 0
+        self._versions_counts[version] += 1
+
+    def most_common_version(self):
+        """Returns the version with the highest count. In case of a tie, returns the highest version among those with the highest count."""
+        highest_count = max(self._versions_counts.values(), default=0)
+
+        versions_with_highest_count = [
+            k for (k, v) in self._versions_counts.items() if v == highest_count
+        ]
+
+        return highest_version(versions_with_highest_count)
+
+    def sorted_list(self):
+        """Returns a sorted list of key-value pairs `(version, count)`. The sorting is in descending order. In case of a count tie, the higher version's pair is considered greater."""
+        return sorted(
+            self._versions_counts.items(),
+            key=lambda item: (item[1], version_as_comparable_list(item[0])),
+            reverse=True,
+        )
+
+    def is_empty(self):
+        return self._versions_counts == {}
+
+
+class VersionsData:
+    def __init__(self):
+        self.version_counter = VersionsCounter()
+        self.highest_version_counter = VersionsCounter()
+        # acronyms: vc = version_counter, hvc = highest_version_counter
+        self.different_filepath_vc = VersionsCounter()
+        self.different_filepath_hvc = VersionsCounter()
+
+    def display(self, verbose=False):
+        if not self.version_counter.is_empty() and verbose:
+            print(
+                "Same filepath versions distribution:",
+                self.version_counter.sorted_list(),
+            )
+            print(
+                "Same filepath highest versions distribution:",
+                self.highest_version_counter.sorted_list(),
+            )
+        if not self.different_filepath_vc.is_empty() and verbose:
+            print(
+                "Different filepath versions distribution:",
+                self.different_filepath_vc.sorted_list(),
+            )
+            print(
+                "Different filepath highest versions distribution:",
+                self.different_filepath_hvc.sorted_list(),
+            )
+        if self.version_counter.is_empty() and self.different_filepath_vc.is_empty():
+            print(
+                "Not a single file in the analyzed policy set appears in the Masterfiles Policy Framework.\n"
+            )
+        elif verbose:
+            print()
+
+    def to_json_dict(self):
+        json_dict = OrderedDict()
+
+        json_dict["same_filepath_versions"] = self.version_counter.sorted_list()
+        json_dict["same_filepath_highest_versions"] = (
+            self.highest_version_counter.sorted_list()
+        )
+        json_dict["different_filepath_versions"] = (
+            self.different_filepath_vc.sorted_list()
+        )
+        json_dict["different_filepath_highest_versions"] = (
+            self.different_filepath_hvc.sorted_list()
+        )
+
+        return json_dict
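
Editor's aside — how the tie-breaking plays out (illustrative example, not part of the patch):

    counter = VersionsCounter()
    for v in ["3.18.0", "3.21.0", "3.21.0", "3.18.0", "3.18.5"]:
        counter.increment(v)
    # counts: 3.18.0 -> 2, 3.21.0 -> 2, 3.18.5 -> 1
    assert counter.most_common_version() == "3.21.0"  # count tie broken by higher version
    assert counter.sorted_list()[0] == ("3.21.0", 2)  # descending by (count, version)
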
+class AnalyzedFiles:
+    def __init__(self, reference_version):
+        self.reference_version = reference_version
+
+        self.missing = []
+        self.modified = []
+        self.moved_or_renamed = []
+        self.different = []
+        self.different_modified = []
+        self.different_moved_or_renamed = []
+        self.not_from_any = []
+
+    @staticmethod
+    def _denormalize_origin(origin, is_parentpath, masterfiles_dir):
+        return [
+            (mpf_denormalized_path(filepath, is_parentpath, masterfiles_dir), versions)
+            for (filepath, versions) in origin.items()
+        ]
+
+    def denormalize(self, is_parentpath, masterfiles_dir):
+        """Currently irreversible and meant to be used only once, after all the files are analyzed."""
+
+        self.missing = [
+            mpf_denormalized_path(file, is_parentpath, masterfiles_dir)
+            for file in self.missing
+        ]
+        self.modified = [
+            mpf_denormalized_path(file, is_parentpath, masterfiles_dir)
+            for file in self.modified
+        ]
+        self.moved_or_renamed = [
+            (
+                mpf_denormalized_path(file, is_parentpath, masterfiles_dir),
+                [
+                    mpf_denormalized_path(o_f, is_parentpath, masterfiles_dir)
+                    for o_f in origin_filepaths
+                ],
+            )
+            for (file, origin_filepaths) in self.moved_or_renamed
+        ]
+        self.different = [
+            (
+                mpf_denormalized_path(file, is_parentpath, masterfiles_dir),
+                other_versions,
+            )
+            for (file, other_versions) in self.different
+        ]
+        self.different_modified = [
+            (
+                mpf_denormalized_path(file, is_parentpath, masterfiles_dir),
+                other_versions,
+            )
+            for (file, other_versions) in self.different_modified
+        ]
+        self.different_moved_or_renamed = [
+            (
+                mpf_denormalized_path(file, is_parentpath, masterfiles_dir),
+                AnalyzedFiles._denormalize_origin(
+                    origin, is_parentpath, masterfiles_dir
+                ),
+            )
+            for (file, origin) in self.different_moved_or_renamed
+        ]
+        self.not_from_any = [
+            mpf_denormalized_path(file, is_parentpath, masterfiles_dir)
+            for file in self.not_from_any
+        ]
+
+    def sort(self):
+        self.missing = filepaths_sorted(self.missing)
+        self.modified = filepaths_sorted(self.modified)
+        self.moved_or_renamed = filepaths_sorted(self.moved_or_renamed)
+        self.different = filepaths_sorted(self.different)
+        self.different_modified = filepaths_sorted(self.different_modified)
+        self.different_moved_or_renamed = filepaths_sorted(
+            self.different_moved_or_renamed
+        )
+        self.not_from_any = filepaths_sorted(self.not_from_any)
+
+    def display(self):
+        print("Reference version:", self.reference_version, "\n")
+
+        if len(self.missing) > 0:
+            print("Files missing from the version:")
+        elif self.reference_version is not None:
+            print("No files are missing from the version.")
+        filepaths_display(self.missing)
+
+        if len(self.modified) == 0 and len(self.moved_or_renamed) == 0:
+            print("No files of the version are modified.")
+        if len(self.modified) > 0:
+            print("Files from the version but with modifications:")
+            filepaths_display(self.modified)
+        if len(self.moved_or_renamed) > 0:
+            print("Files moved or renamed:")
+            filepaths_display_moved(self.moved_or_renamed)
+
+        if (
+            len(self.different) == 0
+            and len(self.different_modified) == 0
+            and len(self.different_moved_or_renamed) == 0
+        ):
+            print("No files are from a different version.")
+        if len(self.different) > 0:
+            print("Files from a different version:")
+            filepaths_display(self.different)
+        if len(self.different_modified) > 0:
+            print("Files from a different version, with modifications:")
+            filepaths_display(self.different_modified)
+        if len(self.different_moved_or_renamed) > 0:
+            print("Files moved or renamed from a different version:")
+            filepaths_display_moved(self.different_moved_or_renamed)
+
+        if len(self.not_from_any) > 0:
+            print("Files not from any version (with both custom content and path):")
+        else:
+            print("No files are not from any version.")
+        filepaths_display(self.not_from_any)
+
+    def to_json_dict(self):
+        self.sort()
+
+        json_dict = OrderedDict()
+
+        json_dict["reference_version"] = self.reference_version
+
+        json_dict["files"] = {}
+
+        json_dict["files"]["missing"] = self.missing
+        json_dict["files"]["modified"] = self.modified
+        json_dict["files"]["moved_or_renamed"] = self.moved_or_renamed
+        json_dict["files"]["different_version"] = self.different
+        json_dict["files"]["different_version_modified"] = self.different_modified
+        json_dict["files"][
+            "different_version_moved_or_renamed"
+        ] = self.different_moved_or_renamed
+        json_dict["files"]["not_from_any_version"] = self.not_from_any
+
+        return json_dict
+
+
+DEFAULT_IGNORED_PATH_COMPONENTS = [".git/", ".gitignore", ".gitattributes", ".github/"]
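
Editor's aside — with the data structures in place, a minimal programmatic call might look like this (a sketch with a hypothetical path; the `cfbs analyze` command wired up in `commands.py` below does the same with CLI argument handling):

    analyzed_files, versions_data = analyze_policyset(
        "/var/cfengine/masterfiles",  # a masterfiles-path: contains policy files directly
        is_parentpath=False,
        ignored_path_components=DEFAULT_IGNORED_PATH_COMPONENTS + ["site/"],
    )
    versions_data.display(verbose=True)
    analyzed_files.display()
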
+def analyze_policyset(
+    path,
+    is_parentpath=False,
+    reference_version=None,
+    masterfiles_dir="masterfiles",
+    ignored_path_components=None,
+    offline=False,
+):
+    """`path` should be either a masterfiles-path (containing masterfiles files directly), or a parent-path (containing `masterfiles_dir` and "modules" folders). `is_parentpath` should specify which of the two it is.
+
+    The analysis ignores policyset (not MPF release information) files whose filepaths contain any of the path components specified in `ignored_path_components`. Components in `ignored_path_components` should end with a `/` if the component represents a directory (also on operating systems using a different separator, e.g. a backslash), and should not end with a `/` if it represents a file.
+    """
+    if ignored_path_components is None:
+        ignored_path_components = DEFAULT_IGNORED_PATH_COMPONENTS
+
+    checksums_dict, files_dict = checksums_files(
+        path, ignored_path_components=ignored_path_components
+    )
+    checksums_dict = checksums_dict["checksums"]
+    files_dict = files_dict["files"]
+
+    # MPF filepath data contains "masterfiles/" (which might not be the same as `masterfiles_dir + "/"`) and "modules/" at the beginning of the filepaths.
+    # Therefore, care is needed when comparing policyset filepaths to MPF filepaths:
+    # before such a comparison, convert the policyset filepaths to an MPF-comparable form using `mpf_normalized_path`.
+    mpf_versions_dict, mpf_checksums_dict, mpf_files_dict = mpf_vcf_dicts(offline)
+
+    # as mentioned above, normalize the analyzed policyset filepaths to be of the same form as filepaths in MPF dicts so that the two can be compared
+    for checksum in checksums_dict:
+        checksums_dict[checksum] = {
+            mpf_normalized_path(file, is_parentpath, masterfiles_dir)
+            for file in checksums_dict[checksum]
+        }
+    files_dict = {
+        mpf_normalized_path(file, is_parentpath, masterfiles_dir): checksums
+        for file, checksums in files_dict.items()
+    }
+
+    versions_data = VersionsData()
+
+    # first, count versions in order to find the reference version:
+    for checksum, files_of_checksum in checksums_dict.items():
+        filepaths_highest_versions = {}
+        dfv_fhv = {}  # acronym for different_filepath_vc_filepaths_highest_versions
+
+        if checksum in mpf_checksums_dict:
+            # 1A. checksum known:
+            checksum_mpf_files_dict = mpf_checksums_dict[checksum]
+
+            for filepath in files_of_checksum:
+                if filepath in checksum_mpf_files_dict:
+                    # 1A1. a match of both checksum and filepath:
+                    for version in checksum_mpf_files_dict[filepath]:
+                        versions_data.version_counter.increment(version)
+
+                    filepaths_highest_versions[filepath] = highest_version(
+                        checksum_mpf_files_dict[filepath]
+                    )
+                else:
+                    # 1A2. there are files with the same checksum in MPF but not the same filepath:
+                    if filepath in mpf_files_dict:
+                        # 1A2A. the filepath exists somewhere else, but not for this checksum:
+                        filepath_versions = []
+                        for mpf_checksum in mpf_files_dict[filepath]:
+                            filepath_versions += mpf_files_dict[filepath][mpf_checksum]
+                        for version in filepath_versions:
+                            versions_data.different_filepath_vc.increment(version)
+                        dfv_fhv[filepath] = highest_version(filepath_versions)
+                    else:
+                        # 1A2B. the checksum exists but the filepath is not known:
+                        # there are no versions to count since the filepath is not known
+                        pass
+
+        for filepath in files_of_checksum:
+            if filepath in filepaths_highest_versions:
+                versions_data.highest_version_counter.increment(
+                    filepaths_highest_versions[filepath]
+                )
+            if filepath in dfv_fhv:
+                versions_data.different_filepath_hvc.increment(dfv_fhv[filepath])
+    if reference_version is None:
+        reference_version = versions_data.version_counter.most_common_version()
+
+    # if not a single file in the analyzed policyset has an MPF-known checksum, and a specific `reference_version` was not given, `reference_version` will still be `None`
+    if reference_version is None:
+        reference_version_files = []
+        reference_version_checksums = {}
+    else:
+        reference_version_files = mpf_versions_dict[reference_version].keys()
+        reference_version_checksums = {}
+        for mpf_filepath in mpf_versions_dict[reference_version]:
+            mpf_checksum = mpf_versions_dict[reference_version][mpf_filepath]
+            if mpf_checksum not in reference_version_checksums:
+                reference_version_checksums[mpf_checksum] = []
+            reference_version_checksums[mpf_checksum].append(mpf_filepath)
+
+    analyzed_files = AnalyzedFiles(reference_version)
+
+    # categorize all files, based on their relation with the reference version and known MPF files:
+    # 1. files present:
+    for checksum, files_of_checksum in checksums_dict.items():
+        if checksum in mpf_checksums_dict:
+            # 1A. checksum known:
+            checksum_mpf_files_dict = mpf_checksums_dict[checksum]
+
+            for filepath in files_of_checksum:
+                if filepath in checksum_mpf_files_dict:
+                    # 1A1. (checksum, filepath) known:
+                    # check whether the (checksum, filepath) is in the reference version
+                    if (
+                        filepath not in reference_version_files
+                    ) or checksum != mpf_versions_dict[reference_version][filepath]:
+                        # 1A1A. the file and its filepath match a different version:
+                        other_versions = mpf_checksums_dict[checksum][filepath]
+                        # since MPF data is sorted, so is `other_versions`
+                        analyzed_files.different.append((filepath, other_versions))
+                else:
+                    # 1A2. the checksum is known but there's no matching filepath with that checksum:
+                    # therefore, it must be a rename/move
+                    origin = mpf_checksums_dict[checksum]
+                    if checksum in reference_version_checksums:
+                        origin_filepaths = origin.keys()
+                        analyzed_files.moved_or_renamed.append(
+                            (filepath, origin_filepaths)
+                        )
+                    else:
+                        analyzed_files.different_moved_or_renamed.append(
+                            (filepath, origin)
+                        )
+        else:
+            # 1B. checksum unknown:
+            for filepath in files_of_checksum:
+                if filepath in mpf_files_dict:
+                    # 1B1. filepath is known:
+                    if filepath in reference_version_files:
+                        analyzed_files.modified.append(filepath)
+                    else:
+                        other_versions = []
+                        for mpf_checksum in mpf_files_dict[filepath]:
+                            versions_list = mpf_files_dict[filepath][mpf_checksum]
+                            other_versions.extend(versions_list)
+                        sort_versions(other_versions)
+                        analyzed_files.different_modified.append(
+                            (filepath, other_versions)
+                        )
+                else:
+                    # 1B2. neither the checksum nor the filepath is known:
+                    analyzed_files.not_from_any.append(filepath)
+    # 2. files missing from the reference version:
+    for filepath in reference_version_files:
+        if filepath not in files_dict:
+            # the file is missing, but only if it's not present in any origin in moved_or_renamed
+            is_present = False
+            for _, origin_filepaths in analyzed_files.moved_or_renamed:
+                if filepath in origin_filepaths:
+                    is_present = True
+                    break
+            if not is_present:
+                analyzed_files.missing.append(filepath)
+
+    # denormalize filepaths in all the analyzed files lists for display
+    analyzed_files.denormalize(is_parentpath, masterfiles_dir)
+
+    return analyzed_files, versions_data
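
Editor's aside — a worked sketch of how the branch labels above categorize files, with invented values (assume the reference version 3.21.0 maps "masterfiles/promises.cf" to checksum C1):

    # local file (normalized)     checksum   filepath match     -> category
    # masterfiles/promises.cf     C1         in 3.21.0          -> unmodified, nothing reported (1A1)
    # masterfiles/promises.cf     C0         known, e.g. 3.18.0 -> different (1A1A)
    # masterfiles/renamed.cf      C1         none               -> moved_or_renamed (1A2)
    # masterfiles/promises.cf     unknown    in 3.21.0          -> modified (1B1)
    # masterfiles/custom/site.cf  unknown    none               -> not_from_any (1B2)
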
diff --git a/cfbs/args.py b/cfbs/args.py
index 4852a471..69edb206 100644
--- a/cfbs/args.py
+++ b/cfbs/args.py
@@ -115,6 +115,31 @@ def get_arg_parser():
         help="Specify minimum version in 'cfbs generate-release-information'",
         dest="minimum_version",
     )
+    parser.add_argument(
+        "--to-json",
+        help="Output 'cfbs analyze' results to a JSON file; optionally specify the JSON's filename",
+        nargs="?",
+        const="analysis",
+        default=None,
+    )
+    parser.add_argument(
+        "--reference-version",
+        help="Specify version to compare against for 'cfbs analyze'",
+    )
+    parser.add_argument(
+        "--masterfiles-dir",
+        help="If the path given to 'cfbs analyze' contains a masterfiles subdirectory, specify the subdirectory's name",
+    )
+    parser.add_argument(
+        "--ignored-path-components",
+        help="Specify path components which should be ignored during 'cfbs analyze' (the components should be passed separately, delimited by spaces)",
+        nargs="*",
+    )
+    parser.add_argument(
+        "--offline",
+        help="Do not connect to the Internet to download the latest version of MPF release information during 'cfbs analyze'",
+        action="store_true",
+    )
     parser.add_argument(
         "--masterfiles", help="Add masterfiles on cfbs init choose between"
     )
diff --git a/cfbs/cfbs.1 b/cfbs/cfbs.1
index c3f8efe7..061e7a02 100644
--- a/cfbs/cfbs.1
+++ b/cfbs/cfbs.1
@@ -1,16 +1,16 @@
-.TH CFBS "1" "2025\-01\-09" "cfbs" "CFEngine Build System manual"
+.TH CFBS "1" "2025\-04\-03" "cfbs" "CFEngine Build System manual"
 .SH NAME
 cfbs \- combines multiple modules into 1 policy set to deploy on your infrastructure. Modules can be custom promise types, JSON files which enable certain functionality, or reusable CFEngine policy. The modules you use can be written by the CFEngine team, others in the community, your colleagues, or yourself.
 .SH SYNOPSIS
 .B cfbs
-[-h] [--loglevel LOGLEVEL] [-M] [--version] [--force] [--non-interactive] [--index INDEX] [--check] [--checksum CHECKSUM] [--keep-order] [--git {yes,no}] [--git-user-name GIT_USER_NAME] [--git-user-email GIT_USER_EMAIL] [--git-commit-message GIT_COMMIT_MESSAGE] [--ignore-versions-json] [--omit-download] [--check-against-git] [--from MINIMUM_VERSION] [--masterfiles MASTERFILES] [cmd] [args ...]
+[-h] [--loglevel LOGLEVEL] [-M] [--version] [--force] [--non-interactive] [--index INDEX] [--check] [--checksum CHECKSUM] [--keep-order] [--git {yes,no}] [--git-user-name GIT_USER_NAME] [--git-user-email GIT_USER_EMAIL] [--git-commit-message GIT_COMMIT_MESSAGE] [--ignore-versions-json] [--omit-download] [--check-against-git] [--from MINIMUM_VERSION] [--to-json [TO_JSON]] [--reference-version REFERENCE_VERSION] [--masterfiles-dir MASTERFILES_DIR] [--ignored-path-components [IGNORED_PATH_COMPONENTS ...]] [--offline] [--masterfiles MASTERFILES] [cmd] [args ...]
 .SH DESCRIPTION
 CFEngine Build System.
 .TP
 \fBcmd\fR
-The command to perform (pretty, init, status, search, add, remove, clean, update, validate, download, build, install, help, info, show, input, set\-input, get\-input, generate\-
-release\-information)
+The command to perform (pretty, init, status, search, add, remove, clean, update, validate, download, build, install, help, info, show,
+analyse, analyze, input, set\-input, get\-input, generate\-release\-information)
 .TP
 \fBargs\fR
@@ -85,6 +85,26 @@
 Check whether masterfiles from cfengine.com and github.com match in 'cfbs generate\-release\-information'
 
 .TP
 \fB\-\-from\fR \fI\,MINIMUM_VERSION\/\fR
 Specify minimum version in 'cfbs generate\-release\-information'
 
+.TP
+\fB\-\-to\-json\fR \fI\,[TO_JSON]\/\fR
+Output 'cfbs analyze' results to a JSON file; optionally specify the JSON's filename
+
+.TP
+\fB\-\-reference\-version\fR \fI\,REFERENCE_VERSION\/\fR
+Specify version to compare against for 'cfbs analyze'
+
+.TP
+\fB\-\-masterfiles\-dir\fR \fI\,MASTERFILES_DIR\/\fR
+If the path given to 'cfbs analyze' contains a masterfiles subdirectory, specify the subdirectory's name
+
+.TP
+\fB\-\-ignored\-path\-components\fR \fI\,[IGNORED_PATH_COMPONENTS ...]\/\fR
+Specify path components which should be ignored during 'cfbs analyze' (the components should be passed separately, delimited by spaces)
+
+.TP
+\fB\-\-offline\fR
+Do not connect to the Internet to download the latest version of MPF release information during 'cfbs analyze'
+
 .TP
 \fB\-\-masterfiles\fR \fI\,MASTERFILES\/\fR
 Add masterfiles on cfbs init choose between
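
Editor's aside on `--to-json`'s `nargs="?"`/`const="analysis"` combination — a behavior sketch following argparse semantics (place the flag after the positional path, so the path isn't consumed as the flag's value):

    # cfbs analyze ./masterfiles                   -> args.to_json is None  (no JSON written)
    # cfbs analyze ./masterfiles --to-json         -> args.to_json == "analysis"  (the const)
    # cfbs analyze ./masterfiles --to-json report  -> args.to_json == "report"
    # analyze_command appends ".json", writing analysis.json / report.json respectively
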
diff --git a/cfbs/commands.py b/cfbs/commands.py
index 6a50e906..6e313fc2 100644
--- a/cfbs/commands.py
+++ b/cfbs/commands.py
@@ -8,9 +8,9 @@
 import copy
 import logging as log
 import json
-import sys
 import functools
 from collections import OrderedDict
+from cfbs.analyze import analyze_policyset
 from cfbs.args import get_args
 
 from cfbs.utils import (
@@ -87,7 +87,6 @@ def __init__(self, message):
 # Does not modify/wrap the function it decorates.
 def cfbs_command(name):
     def inner(function):
-        global _commands
         _commands[name] = function
         return function  # Unmodified, we've just added it to the dict
 
@@ -95,7 +94,6 @@ def inner(function):
 
 
 def get_command_names():
-    global _commands
     names = _commands.keys()
     return names
 
@@ -1057,6 +1055,75 @@ def info_command(modules):
     return 0
 
 
+@cfbs_command("analyze")
+@cfbs_command("analyse")
+def analyze_command(
+    policyset_paths,
+    json_filename=None,
+    reference_version=None,
+    masterfiles_dir=None,
+    user_ignored_path_components=None,
+    offline=False,
+    verbose=False,
+):
+    if len(policyset_paths) == 0:
+        # passing no policyset path is shorthand for using the current directory as the policyset path
+        log.info(
+            "No path was provided. Using the current directory as the policy set path."
+        )
+        path = "."
+    else:
+        # currently, analyzing only one path is supported
+        path = policyset_paths[0]
+
+        if len(policyset_paths) > 1:
+            log.warning(
+                "More than one path to analyze provided. Analyzing the first one and ignoring the others."
+            )
+
+    # override the masterfiles directory name (e.g. "inputs") if one was given
+    if masterfiles_dir is None:
+        masterfiles_dir = "masterfiles"
+    # strip trailing path separators
+    masterfiles_dir = masterfiles_dir.rstrip(os.sep)
+    # we assume the modules directory is always called "modules",
+    # thus `masterfiles_dir` can't be set to "modules"
+    if masterfiles_dir == "modules":
+        log.warning(
+            'The masterfiles directory cannot be named "modules". Using the name "masterfiles" instead.'
+        )
+        masterfiles_dir = "masterfiles"
+
+    # the policyset path can either contain only masterfiles (masterfiles-path), or contain folders containing modules and masterfiles (parent-path)
+    # try to automatically determine which one it is (by checking whether `path` contains `masterfiles_dir`)
+    is_parentpath = os.path.isdir(os.path.join(path, masterfiles_dir))
+
+    print("Policy set path:", path, "\n")
+
+    analyzed_files, versions_data = analyze_policyset(
+        path,
+        is_parentpath,
+        reference_version,
+        masterfiles_dir,
+        user_ignored_path_components,
+        offline,
+    )
+
+    versions_data.display(verbose)
+    analyzed_files.display()
+
+    if json_filename is not None:
+        json_dict = OrderedDict()
+
+        json_dict["policy_set_path"] = path
+        json_dict["versions_data"] = versions_data.to_json_dict()
+        json_dict["analyzed_files"] = analyzed_files.to_json_dict()
+
+        write_json(json_filename + ".json", json_dict)
+
+    return 0
+
+
 @cfbs_command("input")
 @commit_after_command("Added input for module%s", [PLURAL_S])
 def input_command(args, input_from="cfbs input"):
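
Editor's aside — with `--to-json`, the written file's top-level structure follows from `analyze_command` and the two `to_json_dict` methods above (keys are from the code; values here are invented, and `...` elides entries):

    {
      "policy_set_path": "./masterfiles",
      "versions_data": {
        "same_filepath_versions": [["3.21.0", 42], ["3.18.0", 3]],
        "same_filepath_highest_versions": ...,
        "different_filepath_versions": ...,
        "different_filepath_highest_versions": ...
      },
      "analyzed_files": {
        "reference_version": "3.21.0",
        "files": {
          "missing": [], "modified": [], "moved_or_renamed": [],
          "different_version": [], "different_version_modified": [],
          "different_version_moved_or_renamed": [], "not_from_any_version": []
        }
      }
    }
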
diff --git a/cfbs/internal_file_management.py b/cfbs/internal_file_management.py
index e254f2ea..5172b526 100644
--- a/cfbs/internal_file_management.py
+++ b/cfbs/internal_file_management.py
@@ -192,7 +192,9 @@ def clone_url_repo(repo_url):
     )
 
 
-def fetch_archive(url, checksum=None, directory=None, with_index=True):
+def fetch_archive(
+    url, checksum=None, directory=None, with_index=True, extract_to_directory=False
+):
     assert url.endswith(SUPPORTED_ARCHIVES)
 
     url_path = url[url.index("://") + 3 :]
@@ -210,7 +212,8 @@ def fetch_archive(url, checksum=None, directory=None, with_index=True):
 
     downloads = os.path.join(cfbs_dir(), "downloads")
     archive_dir = os.path.join(downloads, archive_dirname)
-    mkdir(archive_dir)
+    if not extract_to_directory or not os.path.exists(archive_dir):
+        mkdir(archive_dir)
 
     archive_path = os.path.join(downloads, archive_dir, archive_filename)
     try:
@@ -219,12 +222,15 @@ def fetch_archive(url, checksum=None, directory=None, with_index=True):
         user_error(str(e))
 
     content_dir = os.path.join(downloads, archive_dir, archive_checksum)
+    if extract_to_directory:
+        content_dir = directory
     index_path = os.path.join(content_dir, "cfbs.json")
     if with_index and os.path.exists(index_path):
         # available already
         return (index_path, archive_checksum)
     else:
-        mkdir(content_dir)
+        if not extract_to_directory or not os.path.exists(content_dir):
+            mkdir(content_dir)
 
     # TODO: use Python modules instead of CLI tools?
     if archive_type.startswith(_SUPPORTED_TAR_TYPES):
@@ -267,7 +273,7 @@ def fetch_archive(url, checksum=None, directory=None, with_index=True):
                 "Archive '%s' doesn't contain a valid cfbs.json index file" % url
             )
     else:
-        if directory is not None:
+        if not extract_to_directory and directory is not None:
             directory = directory.rstrip("/")
             mkdir(os.path.dirname(directory))
             sh("rsync -a %s/ %s/" % (content_dir, directory))
diff --git a/cfbs/main.py b/cfbs/main.py
index 2c207195..fcd99c06 100644
--- a/cfbs/main.py
+++ b/cfbs/main.py
@@ -33,6 +33,10 @@ def init_logging(level):
         raise ValueError("Unknown log level: {}".format(level))
 
 
+def does_log_info(level):
+    return level in ("info", "debug")
+
+
 def main() -> int:
     args = get_args()
     init_logging(args.loglevel)
@@ -76,6 +80,36 @@ def main() -> int:
             % args.command
         )
 
+    if args.masterfiles_dir and args.command not in ("analyze", "analyse"):
+        user_error(
+            "The option --masterfiles-dir is only for 'cfbs analyze', not 'cfbs %s'"
+            % args.command
+        )
+
+    if args.reference_version and args.command not in ("analyze", "analyse"):
+        user_error(
+            "The option --reference-version is only for 'cfbs analyze', not 'cfbs %s'"
+            % args.command
+        )
+
+    if args.to_json and args.command not in ("analyze", "analyse"):
+        user_error(
+            "The option --to-json is only for 'cfbs analyze', not 'cfbs %s'"
+            % args.command
+        )
+
+    if args.ignored_path_components and args.command not in ("analyze", "analyse"):
+        user_error(
+            "The option --ignored-path-components is only for 'cfbs analyze', not 'cfbs %s'"
+            % args.command
+        )
+
+    if args.offline and args.command not in ("analyze", "analyse"):
+        user_error(
+            "The option --offline is only for 'cfbs analyze', not 'cfbs %s'"
+            % args.command
+        )
+
     if args.non_interactive and args.command not in (
         "init",
         "add",
@@ -109,6 +143,17 @@ def main() -> int:
     if args.command in ("info", "show"):
         return commands.info_command(args.args)
 
+    if args.command in ("analyze", "analyse"):
+        return commands.analyze_command(
+            args.args,
+            args.to_json,
+            args.reference_version,
+            args.masterfiles_dir,
+            args.ignored_path_components,
+            args.offline,
+            does_log_info(args.loglevel),
+        )
+
     if args.command == "generate-release-information":
         return commands.generate_release_information_command(
             omit_download=args.omit_download,
diff --git a/cfbs/masterfiles/analyze.py b/cfbs/masterfiles/analyze.py
index 79647946..71698806 100644
--- a/cfbs/masterfiles/analyze.py
+++ b/cfbs/masterfiles/analyze.py
@@ -129,3 +129,15 @@ def version_is_at_least(version, min_version):
     return min_version is None or (
         version_as_comparable_list(version) >= version_as_comparable_list(min_version)
     )
+
+
+def sort_versions(versions: list, reverse: bool = True):
+    """Sorts a list of versions in place, in descending order by default."""
+    versions.sort(
+        key=version_as_comparable_list,
+        reverse=reverse,
+    )
+
+
+def highest_version(versions):
+    return max(versions, key=version_as_comparable_list, default=None)
diff --git a/cfbs/utils.py b/cfbs/utils.py
index a143c0da..ca042e00 100644
--- a/cfbs/utils.py
+++ b/cfbs/utils.py
@@ -43,8 +43,8 @@ def sh(cmd: str, directory=None):
     _sh("%s" % cmd)
 
 
-def mkdir(path: str):
-    os.makedirs(path, exist_ok=True)
+def mkdir(path: str, exist_ok=True):
+    os.makedirs(path, exist_ok=exist_ok)
 
 
 def touch(path: str):
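
Editor's aside — the `sort_versions`/`highest_version` helpers key on `version_as_comparable_list` because plain string comparison mis-orders multi-digit version components (the same concern applies to picking the newest cached release information in `mpf_vcf_dicts`). A small sketch:

    from cfbs.masterfiles.analyze import highest_version, sort_versions

    versions = ["3.9.0", "3.18.5", "3.21.0"]
    assert max(versions) == "3.9.0"                   # lexicographic comparison is wrong here
    assert highest_version(versions) == "3.21.0"      # numeric-aware comparison
    sort_versions(versions)
    assert versions == ["3.21.0", "3.18.5", "3.9.0"]  # in-place, descending by default
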