1+ """
2+ Copyright (C) 2022 Red Hat, Inc. (https://github.com/Commonjava/charon)
3+
4+ Licensed under the Apache License, Version 2.0 (the "License");
5+ you may not use this file except in compliance with the License.
6+ You may obtain a copy of the License at
7+
8+ http://www.apache.org/licenses/LICENSE-2.0
9+
10+ Unless required by applicable law or agreed to in writing, software
11+ distributed under the License is distributed on an "AS IS" BASIS,
12+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ See the License for the specific language governing permissions and
14+ limitations under the License.
15+ """
16+ import proton
17+ import proton .handlers
18+ import threading
19+ import logging
20+ import json
21+ import os
22+ import asyncio
23+ from typing import List , Any , Tuple , Callable , Dict
24+ from charon .config import get_config
25+ from charon .constants import DEFAULT_SIGN_RESULT_LOC
26+ from charon .constants import DEFAULT_RADAS_SIGN_TIMEOUT_COUNT
27+ from charon .constants import DEFAULT_RADAS_SIGN_WAIT_INTERVAL_SEC
28+ from charon .pkgs .oras_client import OrasClient
29+
30+ logger = logging .getLogger (__name__ )
31+
32+ class SignHandler :
33+ """
34+ Handle the sign result status management
35+ """
36+ _is_processing : bool = True
37+ _downloaded_files : List [str ] = []
38+
39+ @classmethod
40+ def is_processing (cls ) -> bool :
41+ return cls ._is_processing
42+
43+ @classmethod
44+ def get_downloaded_files (cls ) -> List [str ]:
45+ return cls ._downloaded_files .copy ()
46+
47+ @classmethod
48+ def set_processing (cls , value : bool ) -> None :
49+ cls ._is_processing = value
50+
51+ @classmethod
52+ def set_downloaded_files (cls , files : List [str ]) -> None :
53+ cls ._downloaded_files = files
54+
55+ class UmbListener (proton .handlers .MessagingHandler ):
56+ """
57+ UmbListener class (AMQP version), register this when setup UmbClient
58+ """
59+
60+ def __init__ (self ) -> None :
61+ super ().__init__ ()
62+
63+ def on_start (self , event : proton .Event ) -> None :
64+ """
65+ On start callback
66+ """
67+ conf = get_config ()
68+ if not conf :
69+ sys .exit (1 )
70+ event .container .create_receiver (conf .get_amqp_queue ())
71+
72+ def on_message (self , event : proton .Event ) -> None :
73+ """
74+ On message callback
75+ """
76+ # handle response from radas in a thread
77+ thread = threading .Thread (
78+ target = self ._process_message ,
79+ args = [event .message .body ]
80+ )
81+ thread .start ()
82+
83+ def on_error (self , event : proton .Event ) -> None :
84+ """
85+ On error callback
86+ """
87+ logger .error ("Received an error event:\n %s" , event )
88+
89+ def on_disconnected (self , event : proton .Event ) -> None :
90+ """
91+ On disconnected callback
92+ """
93+ logger .error ("Disconnected from AMQP broker." )
94+
95+ def _process_message (msg : Any ) -> None :
96+ """
97+ Process a message received from UMB
98+ Args:
99+ msg: The message body received
100+ """
101+ try :
102+ msg_dict = json .loads (msg )
103+ result_reference_url = msg_dict .get ("result_reference" )
104+
105+ if not result_reference_url :
106+ logger .warning ("Not found result_reference in message,ignore." )
107+ return
108+
109+ conf = get_config ()
110+ if not conf :
111+ sign_result_loc = DEFAULT_SIGN_RESULT_LOC
112+ sign_result_loc = os .getenv ("SIGN_RESULT_LOC" ) or conf .get_sign_result_loc ()
113+ logger .info ("Using SIGN RESULT LOC: %s" , sign_result_loc )
114+
115+ sign_result_parent_dir = os .path .dirname (sign_result_loc )
116+ os .makedirs (sign_result_parent_dir , exist_ok = True )
117+
118+ oras_client = OrasClient ()
119+ files = oras_client .pull (
120+ result_reference_url = result_reference_url ,
121+ sign_result_loc = sign_result_loc
122+ )
123+ SignHandler .set_downloaded_files (files )
124+ finally :
125+ SignHandler .set_processing (False )
126+
127+ def generate_radas_sign (top_level : str ) -> Tuple [List [str ], List [str ]]:
128+ """
129+ Generate .asc files based on RADAS sign result json file
130+ """
131+ conf = get_config ()
132+ timeout_count = conf .get_radas_sign_timeout_count () if conf else DEFAULT_RADAS_SIGN_TIMEOUT_COUNT
133+ wait_interval_sec = conf .get_radas_sign_wait_interval_sec () if conf else DEFAULT_RADAS_SIGN_WAIT_INTERVAL_SEC
134+ wait_count = 0
135+ while SignHandler .is_processing ():
136+ wait_count += 1
137+ if wait_count > timeout_count :
138+ logger .warning ("Timeout when waiting for sign response." )
139+ break
140+ time .sleep (wait_interval_sec )
141+
142+ files = SignHandler .get_downloaded_files ()
143+ if not files :
144+ return [], []
145+
146+ # should only have the single sign result json file from the radas registry
147+ json_file_path = files [0 ]
148+ try :
149+ with open (json_file_path , 'r' ) as f :
150+ data = json .load (f )
151+ except Exception as e :
152+ logger .error (f"Failed to read or parse the JSON file: { e } " )
153+ raise
154+
155+ async def generate_single_sign_file (
156+ file_path : str , signature : str , failed_paths : List [str ], generated_signs : List [str ],
157+ sem : asyncio .BoundedSemaphore
158+ ):
159+ async with sem :
160+ if not file_path or not signature :
161+ logger .error (f"Invalid JSON entry" )
162+ return
163+ # remove the root path maven-repository
164+ filename = file_path .split ("/" , 1 )[1 ]
165+ signature = item .get ("signature" )
166+
167+ artifact_path = os .path .join (top_level , filename )
168+ asc_filename = f"{ filename } .asc"
169+ signature_path = os .path .join (top_level , asc_filename )
170+
171+ if not os .path .isfile (artifact_path ):
172+ logger .warning ("Artifact missing, skip signature file generation" )
173+ return
174+
175+ try :
176+ with open (signature_path , 'w' ) as asc_file :
177+ asc_file .write (signature )
178+ generated_signs .append (signature_path )
179+ logger .info (f"Generated .asc file: { signature_path } " )
180+ except Exception as e :
181+ failed_paths .append (signature_path )
182+ logger .error (f"Failed to write .asc file for { artifact_path } : { e } " )
183+
184+ result = data .get ("result" , [])
185+ return __do_path_cut_and (
186+ path_handler = generate_single_sign_file ,
187+ data = result
188+ )
189+
190+ def __do_path_cut_and (
191+ path_handler : Callable ,
192+ data : List [Dict [str , str ]]
193+ ) -> Tuple [List [str ], List [str ]]:
194+
195+ failed_paths : List [str ] = []
196+ generated_signs : List [str ] = []
197+ tasks = []
198+ sem = asyncio .BoundedSemaphore (10 )
199+ for item in data :
200+ file_path = item .get ("file" )
201+ signature = item .get ("signature" )
202+ tasks .append (
203+ asyncio .ensure_future (
204+ path_handler (file_path , signature , failed_paths , generated_signs , sem )
205+ )
206+ )
207+
208+ loop = asyncio .get_event_loop ()
209+ loop .run_until_complete (asyncio .gather (* tasks ))
210+ return (failed_paths , generated_signs )
0 commit comments