2121import gnupg
2222from typing import Awaitable , Callable , List , Tuple
2323from charon .storage import S3Client
24+ from getpass import getpass
2425
2526logger = logging .getLogger (__name__ )
2627
@@ -34,6 +35,7 @@ def generate_sign(
3435 bucket : str ,
3536 key_id : str = None ,
3637 key_file : str = None ,
38+ sign_method : str = None ,
3739 passphrase : str = None
3840) -> Tuple [List [str ], List [str ]]:
3941 """ This Python function generates a digital signature for a list of metadata files using
@@ -50,11 +52,15 @@ def generate_sign(
5052 and another with the failed to generate files due to exceptions.
5153 """
5254
55+ gpg = None
5356 if key_file is not None :
5457 gnupg_home_path = os .path .join (os .getenv ("HOME" ), ".charon" , ".gnupg" )
5558 gpg = gnupg .GPG (gnupghome = gnupg_home_path )
5659 gpg .import_keys_file (key_file )
5760
61+ if sign_method == 'gpg' and passphrase is None :
62+ passphrase = getpass ('Passphrase for your gpg key:' )
63+
5864 async def sign_file (
5965 filename : str , failed_paths : List [str ], generated_signs : List [str ]
6066 ):
@@ -87,45 +93,20 @@ async def sign_file(
8793 logger .debug (".asc file %s existed, skipping" , remote )
8894 return
8995
90- command = [
91- 'gpg' ,
92- '--batch' ,
93- '--armor' ,
94- '-u' , key_id ,
95- '--passphrase' , passphrase ,
96- '--sign' , artifact
97- ]
98-
99- if key_file is None :
100- # use GPG command line tool to sign artifact if key_id is passed
101- try :
102- # result = await __run_cmd_async(command)
103- result = subprocess .run (command , capture_output = True , text = True , check = True )
104- except subprocess .CalledProcessError as e :
105- logger .error (
106- "Error: signature generation failed due to error: %s" , e
107- )
108- failed_paths .append (local )
109- return
110- if result .returncode == 0 :
111- generated_signs .append (local )
112- else :
113- logger .info (
114- "signature failed with exit code %s, message %s" ,
115- result .returncode , result .stderr .decode ()
116- )
96+ if sign_method == "rpm-sign" :
97+ result = detach_rpm_sign_files (key_id , artifact )
98+ elif sign_method == "gpg" :
99+ result = gpg_sign_files (
100+ gpg = gpg ,
101+ artifact = artifact ,
102+ key_id = key_id , key_file = key_file ,
103+ passphrase = passphrase
104+ )
105+
106+ if result == 0 :
107+ generated_signs .append (local )
117108 else :
118- try :
119- with open (artifact , "rb" ) as f :
120- gpg .sign_file (f , passphrase = passphrase , output = local , detach = True )
121- generated_signs .append (local )
122- except ValueError as e :
123- logger .error (
124- "Error: signature generation failed due to error: %s" , e
125- )
126- failed_paths .append (local )
127-
128- return
109+ failed_paths .append (local )
129110
130111 return __do_path_cut_and (
131112 file_paths = artifact_path ,
@@ -134,6 +115,82 @@ async def sign_file(
134115 )
135116
136117
118+ def detach_rpm_sign_files (key : str , artifact : str ) -> int :
119+ # Usage: detach_sign_files KEYNAME FILE ...
120+
121+ # let's make sure we can actually sign with the given key
122+ command = [
123+ 'rpm-sign' ,
124+ '--list-keys' ,
125+ '|' ,
126+ 'grep' ,
127+ key
128+ ]
129+ result = subprocess .run (command , capture_output = True , text = True , check = True )
130+ if result .returncode != 0 :
131+ logger .error ("Key %s is not in list of allowed keys" , key )
132+ return result .returncode
133+
134+ # okay, now let's actually sign this thing
135+ command = [
136+ 'rpm-sign' ,
137+ '--detachsign' ,
138+ '--key' ,
139+ key ,
140+ artifact
141+ ]
142+ try :
143+ # result = await __run_cmd_async(command)
144+ result = subprocess .run (command , capture_output = True , text = True , check = True )
145+ except subprocess .CalledProcessError as e :
146+ logger .error (
147+ "Error: signature generation failed due to error: %s" , e
148+ )
149+ return result .returncode
150+
151+ return 1
152+
153+
154+ def gpg_sign_files (
155+ artifact : str ,
156+ gpg = None ,
157+ key_id : str = None ,
158+ key_file : str = None ,
159+ passphrase : str = None
160+ ) -> int :
161+ command = [
162+ 'gpg' ,
163+ '--batch' ,
164+ '--armor' ,
165+ '-u' , key_id ,
166+ '--passphrase' , passphrase ,
167+ '--sign' , artifact
168+ ]
169+
170+ if key_file is None :
171+ # use GPG command line tool to sign artifact if key_id is passed
172+ try :
173+ # result = await __run_cmd_async(command)
174+ result = subprocess .run (command , capture_output = True , text = True , check = True )
175+ except subprocess .CalledProcessError as e :
176+ logger .error (
177+ "Error: signature generation failed due to error: %s" , e
178+ )
179+ return result .returncode
180+ else :
181+ try :
182+ with open (artifact , "rb" ) as f :
183+ local = artifact + '.asc'
184+ gpg .sign_file (f , passphrase = passphrase , output = local , detach = True )
185+ return 0
186+ except ValueError as e :
187+ logger .error (
188+ "Error: signature generation failed due to error: %s" , e
189+ )
190+ return 1
191+ return 1
192+
193+
137194def __do_path_cut_and (
138195 file_paths : List [str ],
139196 path_handler : Callable [[str , List [str ], List [str ], asyncio .Semaphore ], Awaitable [bool ]],
0 commit comments