55import threading
66import time
77from abc import abstractmethod
8+ from collections .abc import Callable , Iterable
89from concurrent .futures import ThreadPoolExecutor
910from dataclasses import dataclass
1011from datetime import date , datetime , timedelta
1314from types import TracebackType
1415from typing import (
1516 Any ,
16- Callable ,
1717 ClassVar ,
18- Dict ,
1918 Generic ,
20- Iterable ,
21- List ,
2219 Optional ,
23- Set ,
24- Tuple ,
25- Type ,
2620 TypeVar ,
2721 Union ,
2822)
@@ -55,7 +49,7 @@ class FilterCriteria:
5549 field : str
5650 comparison_value : Any
5751 operator_ : Callable = operator .eq
58- operator_mapping : ClassVar [Dict [BinaryComparator , str ]] = {
52+ operator_mapping : ClassVar [dict [BinaryComparator , str ]] = {
5953 operator .eq : "=" ,
6054 operator .ne : "!=" ,
6155 operator .lt : "<" ,
@@ -102,12 +96,12 @@ class BaseAuditor(Generic[AuditReturnType]):
10296 """Base auditor object - defines structure for implementations to use
10397 in conjunction with AuditingManager"""
10498
105- def __init__ (self , name : str , record_type : Type [AuditRecord ]):
99+ def __init__ (self , name : str , record_type : type [AuditRecord ]):
106100 self ._name = name
107101 self ._record_type = record_type
108102
109103 @property
110- def schema (self ) -> Dict [str , type ]:
104+ def schema (self ) -> dict [str , type ]:
111105 """Determine python schema of auditor"""
112106 return {
113107 fld : str if get_origin (mdl .type_ ) == Literal else mdl .type_
@@ -135,27 +129,27 @@ def conv_to_records(self, recs: AuditReturnType) -> Iterable[AuditRecord]:
135129 raise NotImplementedError ()
136130
137131 @abstractmethod
138- def conv_to_entity (self , recs : List [AuditRecord ]) -> AuditReturnType :
132+ def conv_to_entity (self , recs : list [AuditRecord ]) -> AuditReturnType :
139133 """Convert the list of pydantic models to an entity for use in pipelines"""
140134 raise NotImplementedError ()
141135
142136 @abstractmethod
143- def add_records (self , records : Iterable [Dict [str , Any ]]):
137+ def add_records (self , records : Iterable [dict [str , Any ]]):
144138 """Add audit records to the Auditor"""
145139 raise NotImplementedError ()
146140
147141 @abstractmethod
148142 def retrieve_records (
149- self , filter_criteria : List [FilterCriteria ], data : Optional [AuditReturnType ] = None
143+ self , filter_criteria : list [FilterCriteria ], data : Optional [AuditReturnType ] = None
150144 ) -> AuditReturnType :
151145 """Retrieve audit records from the Auditor"""
152146 raise NotImplementedError ()
153147
154148 def get_most_recent_records (
155149 self ,
156- order_criteria : List [OrderCriteria ],
157- partition_fields : Optional [List [str ]] = None ,
158- pre_filter_criteria : Optional [List [FilterCriteria ]] = None ,
150+ order_criteria : list [OrderCriteria ],
151+ partition_fields : Optional [list [str ]] = None ,
152+ pre_filter_criteria : Optional [list [FilterCriteria ]] = None ,
159153 ) -> AuditReturnType :
160154 """Retrieve the most recent records, defined by the ordering criteria
161155 for each partition combination"""
@@ -203,12 +197,12 @@ def combine_auditor_information(
203197 raise NotImplementedError ()
204198
205199 @staticmethod
206- def conv_to_iterable (recs : Union [AuditorType , AuditReturnType ]) -> Iterable [Dict [str , Any ]]:
200+ def conv_to_iterable (recs : Union [AuditorType , AuditReturnType ]) -> Iterable [dict [str , Any ]]:
207201 """Convert AuditReturnType to iterable of dictionaries"""
208202 raise NotImplementedError ()
209203
210204 @validate_arguments
211- def add_processing_records (self , processing_records : List [ProcessingStatusRecord ]):
205+ def add_processing_records (self , processing_records : list [ProcessingStatusRecord ]):
212206 """Add an entry to the processing_status auditor."""
213207 if self .pool :
214208 return self ._submit (
@@ -220,7 +214,7 @@ def add_processing_records(self, processing_records: List[ProcessingStatusRecord
220214 )
221215
222216 @validate_arguments
223- def add_submission_statistics_records (self , sub_stats : List [SubmissionStatisticsRecord ]):
217+ def add_submission_statistics_records (self , sub_stats : list [SubmissionStatisticsRecord ]):
224218 """Add an entry to the submission statistics auditor."""
225219 if self .pool :
226220 return self ._submit (
@@ -230,7 +224,7 @@ def add_submission_statistics_records(self, sub_stats: List[SubmissionStatistics
230224 return self ._submission_statistics .add_records (records = [dict (rec ) for rec in sub_stats ])
231225
232226 @validate_arguments
233- def add_transfer_records (self , transfer_records : List [TransferRecord ]):
227+ def add_transfer_records (self , transfer_records : list [TransferRecord ]):
234228 """Add an entry to the transfers auditor"""
235229 if self .pool :
236230 return self ._submit (
@@ -241,7 +235,7 @@ def add_transfer_records(self, transfer_records: List[TransferRecord]):
241235 @validate_arguments
242236 def add_new_submissions (
243237 self ,
244- submissions : List [SubmissionMetadata ],
238+ submissions : list [SubmissionMetadata ],
245239 job_run_id : Optional [int ] = None ,
246240 ):
247241 """Add an entry to the submission_info auditor."""
@@ -250,8 +244,8 @@ def add_new_submissions(
250244 time_now : datetime = datetime .now ()
251245 ts_info = {"time_updated" : time_now , "date_updated" : time_now .date ()}
252246
253- processing_status_recs : List [ Dict [str , Any ]] = []
254- submission_info_recs : List [ Dict [str , Any ]] = []
247+ processing_status_recs : list [ dict [str , Any ]] = []
248+ submission_info_recs : list [ dict [str , Any ]] = []
255249
256250 for sub_info in submissions :
257251 # add processing_record - add time info
@@ -311,7 +305,7 @@ def is_writing(self) -> bool:
311305
312306 return not self .queue .empty () or locked
313307
314- def mark_transform (self , submission_ids : List [str ], ** kwargs ):
308+ def mark_transform (self , submission_ids : list [str ], ** kwargs ):
315309 """Update submission processing_status to file_transformation."""
316310
317311 recs = [
@@ -323,7 +317,7 @@ def mark_transform(self, submission_ids: List[str], **kwargs):
323317
324318 return self .add_processing_records (recs )
325319
326- def mark_data_contract (self , submission_ids : List [str ], ** kwargs ):
320+ def mark_data_contract (self , submission_ids : list [str ], ** kwargs ):
327321 """Update submission processing_status to data_contract."""
328322
329323 recs = [
@@ -335,7 +329,7 @@ def mark_data_contract(self, submission_ids: List[str], **kwargs):
335329
336330 return self .add_processing_records (recs )
337331
338- def mark_business_rules (self , submissions : List [ Tuple [str , bool ]], ** kwargs ):
332+ def mark_business_rules (self , submissions : list [ tuple [str , bool ]], ** kwargs ):
339333 """Update submission processing_status to business_rules."""
340334
341335 recs = [
@@ -352,11 +346,11 @@ def mark_business_rules(self, submissions: List[Tuple[str, bool]], **kwargs):
352346
353347 def mark_error_report (
354348 self ,
355- submissions : List [ Tuple [str , SubmissionResult ]],
349+ submissions : list [ tuple [str , SubmissionResult ]],
356350 job_run_id : Optional [int ] = None ,
357351 ):
358352 """Mark the given submission as being ready for error report"""
359- processing_recs : List [ProcessingStatusRecord ] = []
353+ processing_recs : list [ProcessingStatusRecord ] = []
360354
361355 sub_id : str
362356 sub_result : str
@@ -373,7 +367,7 @@ def mark_error_report(
373367
374368 return self .add_processing_records (processing_recs )
375369
376- def mark_finished (self , submissions : List [ Tuple [str , SubmissionResult ]], ** kwargs ):
370+ def mark_finished (self , submissions : list [ tuple [str , SubmissionResult ]], ** kwargs ):
377371 """Update submission processing_status to finished."""
378372
379373 recs = [
@@ -388,7 +382,7 @@ def mark_finished(self, submissions: List[Tuple[str, SubmissionResult]], **kwarg
388382
389383 return self .add_processing_records (recs )
390384
391- def mark_failed (self , submissions : List [str ], ** kwargs ):
385+ def mark_failed (self , submissions : list [str ], ** kwargs ):
392386 """Update submission processing_status to failed."""
393387 recs = [
394388 ProcessingStatusRecord (
@@ -399,7 +393,7 @@ def mark_failed(self, submissions: List[str], **kwargs):
399393
400394 return self .add_processing_records (recs )
401395
402- def mark_archived (self , submissions : List [str ], ** kwargs ):
396+ def mark_archived (self , submissions : list [str ], ** kwargs ):
403397 """Update submission processing_status to archived."""
404398 recs = [
405399 ProcessingStatusRecord (
@@ -410,7 +404,7 @@ def mark_archived(self, submissions: List[str], **kwargs):
410404
411405 return self .add_processing_records (recs )
412406
413- def add_feedback_transfer_ids (self , submissions : List [ Tuple [str , str ]], ** kwargs ):
407+ def add_feedback_transfer_ids (self , submissions : list [ tuple [str , str ]], ** kwargs ):
414408 """Adds transfer_id for error report to submission"""
415409 recs = [
416410 TransferRecord (
@@ -425,7 +419,7 @@ def add_feedback_transfer_ids(self, submissions: List[Tuple[str, str]], **kwargs
425419 return self .add_transfer_records (recs )
426420
427421 def get_latest_processing_records (
428- self , filter_criteria : Optional [List [FilterCriteria ]] = None
422+ self , filter_criteria : Optional [list [FilterCriteria ]] = None
429423 ) -> AuditReturnType :
430424 """Get the most recent processing record for each submission_id stored in
431425 the processing_status auditor"""
@@ -441,18 +435,18 @@ def downstream_pending(
441435 max_concurrency : int = 1 ,
442436 run_number : int = 0 ,
443437 max_days_old : int = 3 ,
444- statuses_to_include : Optional [List [ProcessingStatus ]] = None ,
438+ statuses_to_include : Optional [list [ProcessingStatus ]] = None ,
445439 ) -> bool :
446440 """Checks if there are any downstream submissions currently pending"""
447- steps : List [ProcessingStatus ] = [
441+ steps : list [ProcessingStatus ] = [
448442 "received" ,
449443 "file_transformation" ,
450444 "data_contract" ,
451445 "business_rules" ,
452446 "error_report" ,
453447 ]
454448
455- downstream : Set [ProcessingStatus ]
449+ downstream : set [ProcessingStatus ]
456450 if statuses_to_include :
457451 downstream = {status , * statuses_to_include }
458452 else :
@@ -519,7 +513,7 @@ def __enter__(self):
519513
520514 def __exit__ (
521515 self ,
522- exc_type : Optional [Type [Exception ]],
516+ exc_type : Optional [type [Exception ]],
523517 exc_value : Optional [Exception ],
524518 traceback : Optional [TracebackType ],
525519 ) -> None :
@@ -532,7 +526,7 @@ def __exit__(
532526
533527 def _get_status (
534528 self ,
535- status : Union [ProcessingStatus , Set [ProcessingStatus ], List [ProcessingStatus ]],
529+ status : Union [ProcessingStatus , set [ProcessingStatus ], list [ProcessingStatus ]],
536530 max_days_old : int ,
537531 ) -> AuditReturnType :
538532 _filter = [
@@ -572,8 +566,8 @@ def get_all_error_report_submissions(self, max_days_old: int = 3):
572566 self .combine_auditor_information (subs , self ._submission_info )
573567 )
574568
575- processed : List [SubmissionInfo ] = []
576- dodgy_info : List [ Tuple [ Dict , str ]] = []
569+ processed : list [SubmissionInfo ] = []
570+ dodgy_info : list [ tuple [ dict , str ]] = []
577571
578572 for sub_info in sub_infos :
579573 try :
0 commit comments