Commit 08ac20b

refactor: tweaks around submission status - now optional to pass, as it can be determined from the audit (in the service-based approach). Tests fixed and extended.
1 parent f922f0a commit 08ac20b
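
The central change: pipeline steps such as apply_data_contract, apply_business_rules and error_report now accept submission_status as an optional argument and, when it is omitted, look the latest status up from the audit tables via get_submission_status. The sketch below shows that fallback pattern only; the classes here are simplified stand-ins for illustration, not the real dve models or audit service, and the `or SubmissionStatus()` fallback is this sketch's own choice.

from dataclasses import dataclass
from typing import Optional


@dataclass
class SubmissionStatus:
    """Simplified stand-in for dve's SubmissionStatus."""
    processing_failed: bool = False
    validation_failed: bool = False


class AuditTables:
    """Stand-in audit service: keeps the latest status per submission_id."""

    def __init__(self) -> None:
        self._latest: dict[str, SubmissionStatus] = {}

    def record(self, submission_id: str, status: SubmissionStatus) -> None:
        self._latest[submission_id] = status

    def get_submission_status(self, submission_id: str) -> Optional[SubmissionStatus]:
        # Mirrors the new Optional return: None when nothing has been audited yet.
        return self._latest.get(submission_id)


def apply_data_contract(audit: AuditTables, submission_id: str,
                        submission_status: Optional[SubmissionStatus] = None) -> SubmissionStatus:
    # The pattern introduced in this commit: only resolve from the audit
    # trail when the caller did not pass a status explicitly.
    if not submission_status:
        # The `or SubmissionStatus()` fallback is the sketch's choice for the
        # case where no audit record exists yet.
        submission_status = audit.get_submission_status(submission_id) or SubmissionStatus()
    # ... data-contract work would go here ...
    return submission_status


audit = AuditTables()
audit.record("sub-001", SubmissionStatus(validation_failed=True))
print(apply_data_contract(audit, "sub-001").validation_failed)  # True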

11 files changed: +257 -43 lines

src/dve/core_engine/backends/base/auditing.py

Lines changed: 17 additions & 8 deletions
@@ -330,7 +330,7 @@ def mark_business_rules(self, submissions: list[tuple[str, bool]], **kwargs):
             ProcessingStatusRecord(
                 submission_id=submission_id,
                 processing_status="business_rules",
-                submission_result="failed" if failed else None,
+                submission_result="validation_failed" if failed else None,
                 **kwargs,
             )
             for submission_id, failed in submissions
@@ -380,7 +380,10 @@ def mark_failed(self, submissions: list[str], **kwargs):
         """Update submission processing_status to failed."""
         recs = [
             ProcessingStatusRecord(
-                submission_id=submission_id, processing_status="failed", **kwargs
+                submission_id=submission_id,
+                processing_status="failed",
+                submission_result="processing_failed",
+                **kwargs
             )
             for submission_id in submissions
         ]
@@ -494,16 +497,22 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionSt
             )
         except StopIteration:
             return None
-    def get_submission_status(self, submission_id: str) -> SubmissionStatus:
+    def get_submission_status(self, submission_id: str) -> Optional[SubmissionStatus]:
         """Get the latest submission status for a submission"""
+
+        try:
+            processing_rec: ProcessingStatusRecord = next(self._processing_status.conv_to_records(
+                self._processing_status.get_most_recent_records(
+                    order_criteria=[OrderCriteria("time_updated", True)],
+                    pre_filter_criteria=[FilterCriteria("submission_id", submission_id)]
+                )))
+        except StopIteration:
+            return None
         sub_status = SubmissionStatus()
-        processing_rec: ProcessingStatusRecord = next(self._processing_status.conv_to_records(self._processing_status.get_most_recent_records(order_criteria=[OrderCriteria("time_updated", True)],
-                                                                                               pre_filter_criteria=[FilterCriteria("submission_id",
-                                                                                                                                   submission_id)])))
         sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(submission_id)
-        if processing_rec.processing_status == "failed":
+        if processing_rec.submission_result == "processing_failed":
             sub_status.processing_failed = True
-        if processing_rec.submission_result == "failed":
+        if processing_rec.submission_result == "validation_failed":
             sub_status.validation_failed = True
         if sub_stats_rec:
             sub_status.number_of_records = sub_stats_rec.record_count
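
Note the control change in get_submission_status: processing_failed is now driven by submission_result == "processing_failed" rather than by processing_status == "failed", and the method returns None when no processing record exists for the submission. As a rough restatement of that mapping (not the library code):

from typing import Optional


def flags_from_result(submission_result: Optional[str]) -> tuple[bool, bool]:
    """Return (processing_failed, validation_failed) for the latest record's result."""
    return (
        submission_result == "processing_failed",
        submission_result == "validation_failed",
    )


assert flags_from_result("processing_failed") == (True, False)
assert flags_from_result("validation_failed") == (False, True)
assert flags_from_result("success") == (False, False)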

src/dve/core_engine/type_hints.py

Lines changed: 1 addition & 1 deletion
@@ -236,7 +236,7 @@
 PROCESSING_STATUSES: tuple[ProcessingStatus, ...] = tuple(list(get_args(ProcessingStatus)))
 """List of all possible DVE submission statuses"""
 
-SubmissionResult = Literal["success", "failed", "archived", "processing_failed"]
+SubmissionResult = Literal["success", "validation_failed", "archived", "processing_failed"]
 """Allowed DVE submission results"""
 
 SUBMISSION_RESULTS: tuple[SubmissionResult, ...] = tuple(list(get_args(SubmissionResult)))
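
A small, assumed usage sketch of how the renamed literal propagates: SUBMISSION_RESULTS is derived from the Literal via get_args, so any code that validates a result string against it picks up the rename automatically. The check_result helper below is hypothetical, not part of the module.

from typing import Literal, get_args

SubmissionResult = Literal["success", "validation_failed", "archived", "processing_failed"]
SUBMISSION_RESULTS: tuple[SubmissionResult, ...] = tuple(get_args(SubmissionResult))


def check_result(value: str) -> SubmissionResult:
    # Runtime guard; the old "failed" value now raises because the literal was renamed.
    if value not in SUBMISSION_RESULTS:
        raise ValueError(f"unknown submission result: {value!r}")
    return value  # type: ignore[return-value]


check_result("validation_failed")  # fine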

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 15 additions & 9 deletions
@@ -41,6 +41,7 @@ def file_transformation(
                 "file_transformation",
                 [CriticalProcessingError.from_exception(exc)]
             )
+            self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
             return submission_info, SubmissionStatus(processing_failed=True)
 
     def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus) -> tuple[SubmissionInfo | SubmissionStatus]:
@@ -54,6 +55,7 @@ def apply_data_contract(self, submission_info: SubmissionInfo, submission_status
                 "contract",
                 [CriticalProcessingError.from_exception(exc)]
             )
+            self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
             return submission_info, SubmissionStatus(processing_failed=True)
 
     def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus):
@@ -67,6 +69,7 @@ def apply_business_rules(self, submission_info: SubmissionInfo, submission_statu
                 "business_rules",
                 [CriticalProcessingError.from_exception(exc)]
             )
+            self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
             return submission_info, SubmissionStatus(processing_failed=True)
 
     def error_report(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus):
@@ -82,12 +85,14 @@ def error_report(self, submission_info: SubmissionInfo, submission_status: Submi
                 "error_report",
                 [CriticalProcessingError.from_exception(exc)]
             )
+            self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
         return submission_info, submission_status, sub_stats, report_uri
 
-    def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]:
+    def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], Optional[URI], URI]:
         """Sequential single submission pipeline runner"""
         try:
             sub_id: str = submission_info.submission_id
+            report_uri = None
             self._audit_tables.add_new_submissions(submissions=[submission_info])
             self._audit_tables.mark_transform(submission_ids=[sub_id])
             sub_info, sub_status = self.file_transformation(submission_info=submission_info)
@@ -99,13 +104,14 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI],
                 submission_info=submission_info, submission_status=sub_status
             )
 
-            self._audit_tables.mark_error_report(
-                submissions=[(sub_id, sub_status.submission_result)]
-            )
-            sub_info, sub_status, sub_stats, report_uri = self.error_report(
-                submission_info=submission_info, status=sub_status
-            )
-            self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
+            if not sub_status.processing_failed:
+                self._audit_tables.mark_error_report(
+                    submissions=[(sub_id, sub_status.submission_result)]
+                )
+                sub_info, sub_status, sub_stats, report_uri = self.error_report(
+                    submission_info=submission_info, submission_status=sub_status
+                )
+                self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
         except Exception as err:  # pylint: disable=W0718
             self._logger.error(
                 f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
@@ -124,6 +130,6 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI],
                 if (sub_status.validation_failed or sub_status.processing_failed)
                 else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
             ),
-            report_uri,
+            report_uri if report_uri else None,
             audit_files_uri,
         )
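
The run_pipeline changes combine two guards: every step that swallows an exception now also calls mark_failed, and the error-report branch is skipped entirely when processing has already failed, so report_uri (pre-initialised to None) may legitimately come back as None. A condensed, illustrative sketch of that control flow with the step bodies and audit calls stubbed out (not the real class):

from typing import Optional


def run_pipeline_sketch(processing_failed: bool) -> Optional[str]:
    """Condensed control flow of the updated run_pipeline (stubs only)."""
    report_uri: Optional[str] = None  # pre-initialised so the final return is always defined

    # ... file transformation / data contract / business rules would run here,
    # each marking the submission as failed in the audit tables if it raises ...

    if not processing_failed:
        # The error report (and its audit record) is only produced for
        # submissions that were not already marked as processing_failed.
        report_uri = "placeholder-report-uri"

    return report_uri


assert run_pipeline_sketch(processing_failed=True) is None
assert run_pipeline_sketch(processing_failed=False) == "placeholder-report-uri"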

src/dve/pipeline/pipeline.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ def file_transformation_step(
362362
starmap(
363363
lambda x, _: (
364364
x.submission_id,
365-
"failed",
365+
"validation_failed",
366366
),
367367
failed,
368368
)
@@ -377,9 +377,10 @@ def file_transformation_step(
377377

378378
return success, failed
379379

380-
def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus) -> tuple[SubmissionInfo, SubmissionStatus]:
380+
def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None) -> tuple[SubmissionInfo, SubmissionStatus]:
381381
"""Method for applying the data contract given a submission_info"""
382-
382+
if not submission_status:
383+
submission_status = self._audit_tables.get_submission_status(submission_info.submission_id)
383384
if not self.processed_files_path:
384385
raise AttributeError("processed files path not provided")
385386

@@ -422,14 +423,17 @@ def apply_data_contract(self, submission_info: SubmissionInfo, submission_status
422423
return submission_info, submission_status
423424

424425
def data_contract_step(
425-
self, pool: Executor, file_transform_results: list[tuple[SubmissionInfo, SubmissionStatus]]
426+
self, pool: Executor, file_transform_results: list[tuple[SubmissionInfo, Optional[SubmissionStatus]]]
426427
) -> tuple[list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]]:
427428
"""Step to validate the types of an untyped (stringly typed) parquet file"""
428429
processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = []
429430
failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = []
430431
dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
431432

432433
for info, sub_status in file_transform_results:
434+
sub_status = (
435+
sub_status if sub_status
436+
else self._audit_tables.get_submission_status(info.submission_id))
433437
dc_futures.append((info, sub_status, pool.submit(self.apply_data_contract, info, sub_status)))
434438

435439
for sub_info, sub_status, future in dc_futures:
@@ -476,10 +480,13 @@ def data_contract_step(
476480

477481
return processed_files, failed_processing
478482

479-
def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus):
483+
def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None):
480484
"""Apply the business rules to a given submission, the submission may have failed at the
481485
data_contract step so this should be passed in as a bool
482486
"""
487+
if not submission_status:
488+
submission_status = self._audit_tables.get_submission_status(submission_info.submission_id)
489+
483490
if not self.rules_path:
484491
raise AttributeError("business rules path not provided.")
485492

@@ -557,7 +564,7 @@ def apply_business_rules(self, submission_info: SubmissionInfo, submission_statu
557564
def business_rule_step(
558565
self,
559566
pool: Executor,
560-
files: list[tuple[SubmissionInfo, SubmissionStatus]],
567+
files: list[tuple[SubmissionInfo, Optional[SubmissionStatus]]],
561568
) -> tuple[
562569
list[tuple[SubmissionInfo, SubmissionStatus]],
563570
list[tuple[SubmissionInfo, SubmissionStatus]],
@@ -567,6 +574,9 @@ def business_rule_step(
567574
future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
568575

569576
for submission_info, submission_status in files:
577+
submission_status = (
578+
submission_status if submission_status
579+
else self._audit_tables.get_submission_status(submission_info.submission_id))
570580
future_files.append(
571581
(
572582
submission_info,
@@ -677,8 +687,14 @@ def _get_error_dataframes(self, submission_id: str):
677687

678688
return errors_df, aggregates
679689

680-
def error_report(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus) -> tuple[SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]]:
690+
def error_report(self,
691+
submission_info: SubmissionInfo,
692+
submission_status: Optional[SubmissionStatus] = None) -> tuple[SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]]:
681693
"""Creates the error reports given a submission info and submission status"""
694+
695+
if not submission_status:
696+
submission_status = self._audit_tables.get_submission_status(submission_info.submission_id)
697+
682698
if not self.processed_files_path:
683699
raise AttributeError("processed files path not provided")
684700

@@ -728,7 +744,7 @@ def error_report(self, submission_info: SubmissionInfo, submission_status: Submi
728744
def error_report_step(
729745
self,
730746
pool: Executor,
731-
processed: Iterable[tuple[SubmissionInfo, SubmissionStatus]] = tuple(),
747+
processed: Iterable[tuple[SubmissionInfo, Optional[SubmissionStatus]]] = tuple(),
732748
failed_file_transformation: Iterable[tuple[SubmissionInfo, SubmissionStatus]] = tuple(),
733749
) -> list[
734750
tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI]
@@ -743,6 +759,10 @@ def error_report_step(
743759
failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = []
744760

745761
for info, status in processed:
762+
status = (
763+
status if status
764+
else self._audit_tables.get_submission_status(info.submission_id)
765+
)
746766
futures.append((info, status, pool.submit(self.error_report, info, status)))
747767

748768
for info_dict, status in failed_file_transformation:
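
Because the tuples may now carry None for the status, each *_step loop resolves it per item before submitting work to the pool. A small, runnable illustration of that resolution idiom, using plain strings in place of the real SubmissionInfo and SubmissionStatus objects (stand-ins only):

from typing import Optional

infos = ["sub-001", "sub-002"]
caller_statuses: list[Optional[str]] = [None, "validation_failed"]
audit_lookup = {"sub-001": "success"}  # pretend answer from the audit tables

resolved = []
for info, status in zip(infos, caller_statuses):
    # The idiom added to data_contract_step, business_rule_step and error_report_step:
    # keep a caller-provided status, otherwise ask the audit tables for the latest one.
    status = status if status else audit_lookup.get(info)
    resolved.append((info, status))

print(resolved)  # [('sub-001', 'success'), ('sub-002', 'validation_failed')]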

src/dve/pipeline/utils.py

Lines changed: 1 addition & 1 deletion
@@ -86,5 +86,5 @@ def submission_result(self) -> SubmissionResult:
         if self.processing_failed:
             return "processing_failed"
         if self.validation_failed:
-            return "failed"
+            return "validation_failed"
         return "success"

src/dve/reporting/utils.py

Lines changed: 3 additions & 3 deletions
@@ -48,16 +48,16 @@ def dump_processing_errors(
     if not working_folder:
         raise AttributeError("processed files path not passed")
 
-    errors = fh.joinuri(working_folder, "errors", f"processing_errors.json")
+    error_file: URI = fh.joinuri(working_folder, "errors", f"processing_errors.json")
     processed = []
 
     for error in errors:
         processed.append({"step_name": step_name,
                           "error_location": "processing",
                           "error_level": "integrity",
-                          "error_message": repr(error)})
+                          "error_message": error.error_message})
 
-    with fh.open_stream(errors, "a") as f:
+    with fh.open_stream(error_file, "a") as f:
         json.dump(
             processed,
             f,
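
The rename above fixes a variable clash: the errors argument was being rebound to the output URI before the loop, so "for error in errors" then iterated over the characters of the path instead of the processing errors. A tiny, generic illustration of the hazard (values made up):

errors = ["missing column: diameter", "bad type in column: mass"]  # function input

# Old behaviour: rebinding the same name clobbers the input list ...
errors = "/working/errors/processing_errors.json"

# ... so this loop now walks the characters of the path, not the error objects.
for error in errors:
    pass  # would have produced one JSON entry per character

# Fixed behaviour: keep the path under a new name (error_file) and leave `errors` alone.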

tests/features/planets.feature

Lines changed: 1 addition & 3 deletions
@@ -42,9 +42,7 @@ Feature: Pipeline tests using the planets dataset
     And I add initial audit entries for the submission
     Then the latest audit record for the submission is marked with processing status file_transformation
     When I run the file transformation phase
-    Then the latest audit record for the submission is marked with processing status error_report
-    When I run the error report phase
-    Then An error report is produced
+    Then the latest audit record for the submission is marked with processing status failed
 
   Scenario: Handle a file with duplicated extension provided (spark)
     Given I submit the planets file planets.csv.csv for processing

0 commit comments
