|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | + |
| 3 | +""" |
| 4 | +diff-files comparator generating the list of files for reproducibility test workflow |
| 5 | +""" |
| 6 | + |
| 7 | +import filecmp |
| 8 | +import importlib |
| 9 | +import importlib.resources |
| 10 | +import json |
| 11 | +import re |
| 12 | +import tarfile |
| 13 | +import tempfile |
| 14 | +from os import PathLike |
| 15 | +from pathlib import Path |
| 16 | +from typing import Optional |
| 17 | + |
| 18 | + |
class Comparator(object):
    """
    This class takes either two .tar or two .oci files and identifies differences in the filesystems

    :author:     Garden Linux Maintainers
    :copyright:  Copyright 2026 SAP SE
    :package:    gardenlinux
    :subpackage: features
    :since:      1.0.0
    :license:    https://www.apache.org/licenses/LICENSE-2.0
                 Apache License, Version 2.0
    """

    # Default for the ``whitelist`` constructor parameter. It is shared by all
    # instances and therefore must never be mutated; instances work on a copy.
    _default_whitelist: list[str] = []

    # Additional whitelist patterns for nightly builds, loaded from the
    # packaged ``nightly_whitelist.json`` resource when the class body executes.
    _nightly_whitelist = json.loads(
        importlib.resources.read_text(__name__, "nightly_whitelist.json")
    )

    def __init__(
        self, nightly: bool = False, whitelist: list[str] = _default_whitelist
    ):
        """
        Constructor __init__(Comparator)

        :param nightly:   Flag indicating if the nightly whitelist should be used
        :param whitelist: Additional whitelist (regular expressions matched
                          against the archive-absolute file path)

        :since: 1.0.0
        """

        # Copy the list: extending it in place would otherwise modify the
        # shared class-level default (and the caller's list), leaking the
        # nightly patterns into every later instance.
        self.whitelist = list(whitelist)
        if nightly:
            self.whitelist.extend(self._nightly_whitelist)

    @staticmethod
    def _unpack_oci(archive: Path, target: str) -> None:
        """
        Extract all layers of the first manifest of an .oci image into a directory

        :param archive: Path of the .oci file
        :param target:  Directory the layer contents are extracted into

        :since: 1.0.0
        """

        with tempfile.TemporaryDirectory() as extracted:
            # Extract .oci file
            with tarfile.open(archive, "r") as tar:
                tar.extractall(
                    path=extracted, filter="fully_trusted", members=tar.getmembers()
                )

            layers_dir = Path(extracted).joinpath("blobs/sha256")
            assert layers_dir.is_dir()

            with open(
                Path(extracted).joinpath("index.json"), "r", encoding="utf-8"
            ) as f:
                index = json.load(f)

            # Only support first manifest
            manifest_digest = index["manifests"][0]["digest"].split(":")[1]

            with open(
                layers_dir.joinpath(manifest_digest), "r", encoding="utf-8"
            ) as f:
                manifest = json.load(f)

            layers = [layer["digest"].split(":")[1] for layer in manifest["layers"]]

            # Extract layers in order
            for layer in layers:
                layer_path = layers_dir.joinpath(layer)
                if not tarfile.is_tarfile(layer_path):
                    continue

                with tarfile.open(layer_path, "r") as tar:
                    for member in tar.getmembers():
                        try:
                            tar.extract(
                                member,
                                path=target,
                                filter="fully_trusted",
                            )
                        except tarfile.AbsoluteLinkError:
                            # NOTE(review): the "fully_trusted" filter applies
                            # no link checks, so this handler presumably never
                            # fires with the filter used here - confirm.
                            # Convert absolute link to relative link
                            member.linkpath = (
                                "../" * member.path.count("/")
                                + member.linkpath[1:]
                            )
                            tar.extract(
                                member,
                                path=target,
                                filter="fully_trusted",
                            )
                        except tarfile.TarError as e:
                            print(f"Skipping {member.name} due to error: {e}")

    @staticmethod
    def _unpack(file: PathLike[str]) -> tempfile.TemporaryDirectory[str]:
        """
        Unpack a .tar archive or .oci image into a temporary directory

        :param file: .tar or .oci file

        :return: TemporaryDirectory Temporary directory containing the unpacked file
        :since:  1.0.0
        """

        output_dir = tempfile.TemporaryDirectory()

        try:
            archive = Path(file).resolve()

            if archive.name.endswith(".oci"):
                Comparator._unpack_oci(archive, output_dir.name)
            else:
                with tarfile.open(archive, "r") as tar:
                    tar.extractall(
                        path=output_dir.name,
                        filter="fully_trusted",
                        members=tar.getmembers(),
                    )
        except BaseException:
            # The caller never receives the directory on failure, so remove it
            # here instead of leaving it behind until garbage collection.
            output_dir.cleanup()
            raise

        return output_dir

    def _diff_files(
        self, cmp: filecmp.dircmp[str], left_root: Optional[Path] = None
    ) -> list[str]:
        """
        Recursively compare files

        :param cmp:       Dircmp to recursively compare
        :param left_root: Left root to obtain the archive relative path

        :return: list[str] List of paths (prefixed with "/") with different content
        :since:  1.0.0
        """

        if left_root is None:
            left_root = Path(cmp.left)

        relative_dir = Path(cmp.left).relative_to(left_root)
        result = [f"/{relative_dir.joinpath(name)}" for name in cmp.diff_files]

        for sub_cmp in cmp.subdirs.values():
            result += self._diff_files(sub_cmp, left_root=left_root)

        return result

    def generate(self, a: PathLike[str], b: PathLike[str]) -> tuple[list[str], bool]:
        """
        Compare two .tar/.oci images with each other

        :param a: First .tar/.oci file
        :param b: Second .tar/.oci file

        :return: list[str], bool Filtered list of paths with different content and flag indicating if whitelist was applied
        :since:  1.0.0
        """

        # Byte-identical archives cannot contain differing files
        if filecmp.cmp(a, b, shallow=False):
            return [], False

        with self._unpack(a) as unpacked_a, self._unpack(b) as unpacked_b:
            cmp = filecmp.dircmp(unpacked_a, unpacked_b, shallow=False)

            # Must run while the temporary directories still exist
            diff_files = self._diff_files(cmp)

        filtered = [
            file
            for file in diff_files
            if not any(re.match(pattern, file) for pattern in self.whitelist)
        ]
        # True if at least one differing file was removed by the whitelist
        whitelist = len(diff_files) != len(filtered)

        return filtered, whitelist
0 commit comments