1515"""
1616from boto3_type_annotations .s3 .service_resource import Object
1717from charon .utils .files import read_sha1
18+ from charon .constants import PROD_INFO_SUFFIX
1819
1920from boto3 import session
2021from botocore .errorfactory import ClientError
@@ -134,8 +135,6 @@ def path_upload_handler(full_file_path: str, path: str, index: int, total: int)
134135 f_meta = {}
135136 if sha1 .strip () != "" :
136137 f_meta [CHECKSUM_META_KEY ] = sha1
137- if product :
138- f_meta [PRODUCT_META_KEY ] = product
139138 try :
140139 if not self .__dry_run :
141140 if len (f_meta ) > 0 :
@@ -149,6 +148,8 @@ def path_upload_handler(full_file_path: str, path: str, index: int, total: int)
149148 full_file_path ,
150149 ExtraArgs = {'ContentType' : content_type }
151150 )
151+ if product :
152+ self .__update_prod_info (path_key , bucket_name , [product ])
152153 logger .info ('Uploaded %s to bucket %s' , path , bucket_name )
153154 uploaded_files .append (path_key )
154155 except (ClientError , HTTPClientError ) as e :
@@ -170,25 +171,15 @@ def path_upload_handler(full_file_path: str, path: str, index: int, total: int)
170171 'one in S3. Product: %s' , path_key , product )
171172 return False
172173
173- prods = []
174- try :
175- prods = f_meta [PRODUCT_META_KEY ].split ("," )
176- except KeyError :
177- pass
178- if not self .__dry_run and product not in prods :
174+ (prods , no_error ) = self .__get_prod_info (path_key , bucket_name )
175+ if not self .__dry_run and no_error and product not in prods :
179176 logger .info (
180177 "File %s has new product, updating the product %s" ,
181178 full_file_path ,
182179 product ,
183180 )
184181 prods .append (product )
185- try :
186- self .__update_file_metadata (file_object , bucket_name ,
187- {PRODUCT_META_KEY : "," .join (prods )})
188- except (ClientError , HTTPClientError ) as e :
189- logger .error ("ERROR: file %s not uploaded to bucket"
190- " %s due to error: %s " , full_file_path ,
191- bucket_name , e )
182+ if not self .__update_prod_info (path_key , bucket_name , prods ):
192183 return False
193184 return True
194185
@@ -198,7 +189,7 @@ def path_upload_handler(full_file_path: str, path: str, index: int, total: int)
198189
199190 def upload_metadatas (
200191 self , meta_file_paths : List [str ], bucket_name : str ,
201- product : Optional [str ], root = "/" , key_prefix : str = None
192+ product : Optional [str ] = None , root = "/" , key_prefix : str = None
202193 ) -> Tuple [List [str ], List [str ]]:
203194 """ Upload a list of metadata files to s3 bucket. This function is very similar to
204195 upload_files, except:
@@ -237,14 +228,6 @@ def path_upload_handler(full_file_path: str, path: str, index: int, total: int):
237228 )
238229
239230 f_meta [CHECKSUM_META_KEY ] = sha1
240- prods = (
241- f_meta [PRODUCT_META_KEY ].split ("," )
242- if PRODUCT_META_KEY in f_meta
243- else []
244- )
245- if product and product not in prods :
246- prods .append (product )
247- f_meta [PRODUCT_META_KEY ] = "," .join (prods )
248231 try :
249232 if not self .__dry_run :
250233 if need_overwritten :
@@ -253,15 +236,16 @@ def path_upload_handler(full_file_path: str, path: str, index: int, total: int):
253236 Metadata = f_meta ,
254237 ContentType = content_type
255238 )
256-
257- else :
258- # Should we update the s3 object metadata for metadata files?
259- try :
260- self .__update_file_metadata (file_object , bucket_name , f_meta )
261- except (ClientError , HTTPClientError ) as e :
262- logger .error ("ERROR: metadata %s not updated to bucket"
263- " %s due to error: %s " , full_file_path ,
264- bucket_name , e )
239+ if product :
240+ # NOTE: This should not happen for most cases, as most of the metadata
241+ # file does not have product info. Just leave for requirement change in
242+ # future
243+ (prods , no_error ) = self .__get_prod_info (path_key , bucket_name )
244+ if not no_error :
245+ return False
246+ if no_error and product not in prods :
247+ prods .append (product )
248+ if not self .__update_prod_info (path_key , bucket_name , prods ):
265249 return False
266250 logger .info ('Updated metadata %s to bucket %s' , path , bucket_name )
267251 uploaded_files .append (path_key )
@@ -306,11 +290,9 @@ def path_delete_handler(full_file_path: str, path: str, index: int, total: int):
306290 # the product reference counts will be used (from object metadata).
307291 prods = []
308292 if product :
309- try :
310- prods = file_object .metadata [PRODUCT_META_KEY ].split ("," )
311- except KeyError :
312- pass
313-
293+ (prods , no_error ) = self .__get_prod_info (path_key , bucket_name )
294+ if not no_error :
295+ return False
314296 if product in prods :
315297 prods .remove (product )
316298
@@ -321,10 +303,10 @@ def path_delete_handler(full_file_path: str, path: str, index: int, total: int):
321303 " will remove %s from its metadata" ,
322304 path , product
323305 )
324- self .__update_file_metadata (
325- file_object ,
306+ self .__update_prod_info (
307+ path_key ,
326308 bucket_name ,
327- { PRODUCT_META_KEY : "," . join ( prods )} ,
309+ prods ,
328310 )
329311 logger .info (
330312 "Removed product %s from metadata of file %s" ,
@@ -341,6 +323,8 @@ def path_delete_handler(full_file_path: str, path: str, index: int, total: int):
341323 try :
342324 if not self .__dry_run :
343325 bucket .delete_objects (Delete = {"Objects" : [{"Key" : path_key }]})
326+ if not self .__update_prod_info (path_key , bucket_name , prods ):
327+ return False
344328 logger .info ("Deleted %s from bucket %s" , path , bucket_name )
345329 deleted_files .append (path )
346330 return True
@@ -439,23 +423,74 @@ def __file_exists(self, file_object: Object) -> bool:
439423 try :
440424 file_object .load ()
441425 return True
442- except ClientError as e :
443- if e .response ["Error" ]["Code" ] == "404" :
426+ except ( ClientError , HTTPClientError ) as e :
427+ if isinstance ( e , ClientError ) and e .response ["Error" ]["Code" ] == "404" :
444428 return False
445429 else :
446- raise e
447-
448- def __update_file_metadata (
449- self , file_object : s3 .Object , bucket_name : str , metadata : Dict
450- ):
451- if not self .__dry_run :
452- file_object .metadata .update (metadata )
453- file_object .copy_from (
454- CopySource = {"Bucket" : bucket_name , "Key" : file_object .key },
455- Metadata = file_object .metadata ,
456- ContentType = file_object .content_type ,
457- MetadataDirective = "REPLACE" ,
458- )
430+ logger .error ("Error: file existence check failed due "
431+ "to error: %s" , e )
432+
433+ # def __update_file_metadata(
434+ # self, file_object: s3.Object, bucket_name: str, metadata: Dict
435+ # ):
436+ # if not self.__dry_run:
437+ # file_object.metadata.update(metadata)
438+ # file_object.copy_from(
439+ # CopySource={"Bucket": bucket_name, "Key": file_object.key},
440+ # Metadata=file_object.metadata,
441+ # ContentType=file_object.content_type,
442+ # MetadataDirective="REPLACE",
443+ # )
444+
445+ def __get_prod_info (
446+ self , file : str , bucket_name : str
447+ ) -> Tuple [List [str ], bool ]:
448+ logger .debug ("Getting product infomation for file %s" , file )
449+ prod_info_file = file + PROD_INFO_SUFFIX
450+ try :
451+ info_file_content = self .read_file_content (bucket_name , prod_info_file )
452+ prods = [p .strip () for p in info_file_content .split ("\n " )]
453+ logger .debug ("Got product information as below %s" , prods )
454+ return (prods , True )
455+ except (ClientError , HTTPClientError ) as e :
456+ logger .error ("ERROR: Can not get product info for file %s "
457+ "due to error: %s" , file , e )
458+ return ([], False )
459+
460+ def __update_prod_info (
461+ self , file : str , bucket_name : str , prods : List [str ]
462+ ) -> bool :
463+ prod_info_file = file + PROD_INFO_SUFFIX
464+ bucket = self .__get_bucket (bucket_name )
465+ file_obj = bucket .Object (prod_info_file )
466+ content_type = "text/plain"
467+ if len (prods ) > 0 :
468+ logger .debug ("Updating product infomation for file %s "
469+ "with products: %s" , file , prods )
470+ try :
471+ file_obj .put (
472+ Body = "\n " .join (prods ).encode ("utf-8" ),
473+ ContentType = content_type
474+ )
475+ logger .debug ("Updated product infomation for file %s" , file )
476+ return True
477+ except (ClientError , HTTPClientError ) as e :
478+ logger .error ("ERROR: Can not update product info for file %s "
479+ "due to error: %s" , file , e )
480+ return False
481+ else :
482+ logger .debug ("Removing product infomation file for file %s "
483+ "because no products left" , file )
484+ try :
485+ if self .__file_exists (file_obj ):
486+ bucket .delete_objects (
487+ Delete = {"Objects" : [{"Key" : prod_info_file }]})
488+ logger .debug ("Removed product infomation file for file %s" , file )
489+ return True
490+ except (ClientError , HTTPClientError ) as e :
491+ logger .error ("ERROR: Can not delete product info file for file %s "
492+ "due to error: %s" , file , e )
493+ return False
459494
460495 def __do_path_cut_and (
461496 self , file_paths : List [str ],
0 commit comments