Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charon/pkgs/maven.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ def handle_maven_uploading(
if conf.is_radas_enabled() and sign_result_file and os.path.isfile(sign_result_file):
logger.info("Start generating radas signature files for s3 bucket %s\n", bucket_name)
(_failed_metas, _generated_signs) = radas_signature.generate_radas_sign(
top_level=top_level, sign_result_file=sign_result_file
top_level=top_level, root=root, sign_result_file=sign_result_file
)
if not _generated_signs:
logger.error(
Expand Down
29 changes: 24 additions & 5 deletions charon/pkgs/radas_sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ def _handle_failed_delivery(self, reason: str):
self.close()


def generate_radas_sign(top_level: str, sign_result_file: str) -> Tuple[List[str], List[str]]:
def generate_radas_sign(
top_level: str, root: str, sign_result_file: str
) -> Tuple[List[str], List[str]]:
"""
Generate .asc files based on RADAS sign result json file
"""
Expand All @@ -321,11 +323,28 @@ async def generate_single_sign_file(
if not file_path or not signature:
logger.error("Invalid JSON entry")
return
# remove the root path maven-repository
filename = file_path.split("/", 1)[1]

artifact_path = os.path.join(top_level, filename)
asc_filename = f"{filename}.asc"
if "/" not in file_path:
logger.warning("Invalid entry: %s, skip signature file generation.", file_path)
return

if root not in file_path:
logger.debug(
"Root '%s' not found in file_path '%s', handling directly.", root, file_path
)
artifact_path = os.path.join(top_level, file_path)
asc_filename = f"{file_path}.asc"
else:
logger.debug(
"Root '%s' found in file_path '%s', removing it as prefix.", root, file_path
)
stripped_file_path = file_path
parts = file_path.split(root, 1)
if len(parts) > 1:
stripped_file_path = parts[1].lstrip("/")
artifact_path = os.path.join(top_level, stripped_file_path)
asc_filename = f"{stripped_file_path}.asc"

signature_path = os.path.join(top_level, asc_filename)

if not os.path.isfile(artifact_path):
Expand Down
67 changes: 54 additions & 13 deletions tests/test_radas_sign_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,19 @@ def tearDown(self) -> None:

def test_multi_sign_files_generation(self):
self.__prepare_artifacts()
failed, generated = generate_radas_sign(self.__repo_dir, self.__sign_result_file)
failed, generated = generate_radas_sign(
self.__repo_dir, self.__root, self.__sign_result_file
)
self.assertEqual(failed, [])
expected_asc1 = os.path.join(self.__repo_dir, "foo/bar/1.0/foo-bar-1.0.jar.asc")
expected_asc2 = os.path.join(self.__repo_dir, "foo/bar/2.0/foo-bar-2.0.jar.asc")
self.assertEqual(len(generated), 2)
expected_asc3 = os.path.join(self.__repo_dir, "foo/bar/3.0/foo-bar-3.0.jar.asc")
expected_asc4 = os.path.join(self.__repo_dir, "foo/bar/4.0/foo-bar-4.0.jar.asc")
self.assertEqual(len(generated), 4)
self.assertIn(expected_asc1, generated)
self.assertIn(expected_asc2, generated)
self.assertIn(expected_asc3, generated)
self.assertIn(expected_asc4, generated)

with open(expected_asc1) as f:
content1 = f.read()
Expand All @@ -54,7 +60,9 @@ def test_multi_sign_files_generation(self):
self.assertIn("signature2@hash", content2)

def test_sign_files_generation_with_missing_artifacts(self):
failed, generated = generate_radas_sign(self.__repo_dir, self.__sign_result_file)
failed, generated = generate_radas_sign(
self.__repo_dir, self.__root, self.__sign_result_file
)
self.assertEqual(failed, [])
expected_asc1 = os.path.join(self.__repo_dir, "foo/bar/1.0/foo-bar-1.0.jar.asc")
expected_asc2 = os.path.join(self.__repo_dir, "foo/bar/2.0/foo-bar-2.0.jar.asc")
Expand All @@ -70,12 +78,16 @@ def test_sign_files_generation_with_failure(self):
# simulate expected_asc1 can not be written properly
real_overwrite = overwrite_file
with mock.patch("charon.pkgs.radas_sign.files.overwrite_file") as mock_overwrite:

def side_effect(path, content):
if path == expected_asc1:
raise IOError("mock write error")
return real_overwrite(path, content)

mock_overwrite.side_effect = side_effect
failed, generated = generate_radas_sign(self.__repo_dir, self.__sign_result_file)
failed, generated = generate_radas_sign(
self.__repo_dir, self.__root, self.__sign_result_file
)

self.assertEqual(len(failed), 1)
self.assertNotIn(expected_asc1, generated)
Expand All @@ -86,7 +98,9 @@ def test_sign_files_generation_with_missing_result(self):
# simulate missing pull result by removing the sign result file loc
shutil.rmtree(self.__sign_result_loc)

failed, generated = generate_radas_sign(self.__repo_dir, self.__sign_result_file)
failed, generated = generate_radas_sign(
self.__repo_dir, self.__root, self.__sign_result_file
)
self.assertEqual(failed, [])
expected_asc1 = os.path.join(self.__repo_dir, "foo/bar/1.0/foo-bar-1.0.jar.asc")
expected_asc2 = os.path.join(self.__repo_dir, "foo/bar/2.0/foo-bar-2.0.jar.asc")
Expand All @@ -97,6 +111,7 @@ def test_sign_files_generation_with_missing_result(self):
def __prepare_sign_result_file(self):
self.__sign_result_loc = tempfile.mkdtemp()
self.__sign_result_file = os.path.join(self.__sign_result_loc, "result.json")
self.__root = "maven-repository"
self.__repo_dir = os.path.join(tempfile.mkdtemp(), "maven-repository")
data = {
"request-id": "request-id",
Expand All @@ -120,20 +135,46 @@ def __prepare_sign_result_file(self):
),
"checksum": "sha256:sha256-content",
},
{
"file": "README.md",
"signature": (
"-----BEGIN PGP SIGNATURE-----"
"signature2@hash"
"-----END PGP SIGNATURE-----"
),
"checksum": "sha256:sha256-content",
},
{
"file": "radas-tmp/maven-repository/foo/bar/3.0/foo-bar-3.0.jar",
"signature": (
"-----BEGIN PGP SIGNATURE-----"
"signature2@hash"
"-----END PGP SIGNATURE-----"
),
"checksum": "sha256:sha256-content",
},
{
"file": "foo/bar/4.0/foo-bar-4.0.jar",
"signature": (
"-----BEGIN PGP SIGNATURE-----"
"signature2@hash"
"-----END PGP SIGNATURE-----"
),
"checksum": "sha256:sha256-content",
},
],
}
json_str = json.dumps(data, indent=2)
overwrite_file(self.__sign_result_file, json_str)

def __prepare_artifacts(self):
os.makedirs(os.path.join(self.__repo_dir, "foo/bar/1.0"), exist_ok=True)
os.makedirs(os.path.join(self.__repo_dir, "foo/bar/2.0"), exist_ok=True)
artifact1 = os.path.join(self.__repo_dir, "foo/bar/1.0/foo-bar-1.0.jar")
artifact2 = os.path.join(self.__repo_dir, "foo/bar/2.0/foo-bar-2.0.jar")
with open(artifact1, "w") as f:
f.write("dummy1")
with open(artifact2, "w") as f:
f.write("dummy2")
for version in ["1.0", "2.0", "3.0", "4.0"]:
dir_path = os.path.join(self.__repo_dir, f"foo/bar/{version}")
os.makedirs(dir_path, exist_ok=True)

artifact_path = os.path.join(dir_path, f"foo-bar-{version}.jar")
with open(artifact_path, "w") as f:
f.write("dummy")

def __clear_sign_result_file(self):
if os.path.exists(self.__sign_result_loc):
Expand Down