Skip to content

Commit 90ff789

Browse files
authored
refactor: include submission status for services passthrough
* feat: some formatting; tweaked how errors are handled within file transformation * test: re-enabled books behave tests * feat: added pass-through of submission status to provide context to services processing files * refactor: tweaks around submission status — now optional to pass, as it can be determined from the audit (in the service-based approach); tests fixed and extended * refactor: fixes following review comments
1 parent 37429c4 commit 90ff789

File tree

18 files changed

+574
-244
lines changed

18 files changed

+574
-244
lines changed

src/dve/core_engine/backends/base/auditing.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
QueueType,
3232
SubmissionResult,
3333
)
34+
from dve.pipeline.utils import SubmissionStatus
3435

3536
AuditReturnType = TypeVar("AuditReturnType") # pylint: disable=invalid-name
3637

@@ -329,7 +330,7 @@ def mark_business_rules(self, submissions: list[tuple[str, bool]], **kwargs):
329330
ProcessingStatusRecord(
330331
submission_id=submission_id,
331332
processing_status="business_rules",
332-
submission_result="failed" if failed else None,
333+
submission_result="validation_failed" if failed else None,
333334
**kwargs,
334335
)
335336
for submission_id, failed in submissions
@@ -379,7 +380,10 @@ def mark_failed(self, submissions: list[str], **kwargs):
379380
"""Update submission processing_status to failed."""
380381
recs = [
381382
ProcessingStatusRecord(
382-
submission_id=submission_id, processing_status="failed", **kwargs
383+
submission_id=submission_id,
384+
processing_status="failed",
385+
submission_result="processing_failed",
386+
**kwargs
383387
)
384388
for submission_id in submissions
385389
]
@@ -493,6 +497,27 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionSt
493497
)
494498
except StopIteration:
495499
return None
500+
def get_submission_status(self, submission_id: str) -> Optional[SubmissionStatus]:
501+
"""Get the latest submission status for a submission"""
502+
503+
try:
504+
processing_rec: ProcessingStatusRecord = next(self._processing_status.conv_to_records(
505+
self._processing_status.get_most_recent_records(
506+
order_criteria=[OrderCriteria("time_updated", True)],
507+
pre_filter_criteria=[FilterCriteria("submission_id", submission_id)]
508+
)))
509+
except StopIteration:
510+
return None
511+
sub_status = SubmissionStatus()
512+
sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(submission_id)
513+
if processing_rec.submission_result == "processing_failed":
514+
sub_status.processing_failed = True
515+
if processing_rec.submission_result == "validation_failed":
516+
sub_status.validation_failed = True
517+
if sub_stats_rec:
518+
sub_status.number_of_records = sub_stats_rec.record_count
519+
520+
return sub_status
496521

497522
def __enter__(self):
498523
"""Use audit table as context manager"""

src/dve/core_engine/backends/utilities.py

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import dve.parser.file_handling as fh
1616
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
1717
from dve.core_engine.type_hints import URI, Messages
18-
from dve.reporting.error_report import conditional_cast
1918

2019
# We need to rely on a Python typing implementation detail in Python <= 3.7.
2120
if sys.version_info[:2] <= (3, 7):
@@ -179,38 +178,3 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
179178
if polars_type:
180179
return polars_type
181180
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")
182-
183-
184-
def dump_errors(
185-
working_folder: URI,
186-
step_name: str,
187-
messages: Messages,
188-
key_fields: Optional[dict[str, list[str]]] = None,
189-
):
190-
"""Write out to disk captured feedback error messages."""
191-
if not working_folder:
192-
raise AttributeError("processed files path not passed")
193-
194-
if not key_fields:
195-
key_fields = {}
196-
197-
errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json")
198-
processed = []
199-
200-
for message in messages:
201-
primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", [])
202-
error = message.to_dict(
203-
key_field=primary_keys,
204-
value_separator=" -- ",
205-
max_number_of_values=10,
206-
record_converter=None,
207-
)
208-
error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
209-
processed.append(error)
210-
211-
with fh.open_stream(errors, "a") as f:
212-
json.dump(
213-
processed,
214-
f,
215-
default=str,
216-
)

src/dve/core_engine/exceptions.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,12 @@ def __init__(
2525
def critical_messages(self) -> Iterator[FeedbackMessage]:
2626
"""Critical messages which caused the processing error."""
2727
yield from filter(lambda message: message.is_critical, self.messages)
28-
29-
def to_feedback_message(self) -> FeedbackMessage:
30-
"Convert to feedback message to write to json file"
31-
return FeedbackMessage(
32-
entity=None,
33-
record=None,
34-
failure_type="integrity",
35-
error_type="processing",
36-
error_location="Whole File",
37-
error_message=self.error_message,
38-
)
39-
28+
29+
@classmethod
30+
def from_exception(cls, exc:Exception):
31+
return cls(error_message = repr(exc),
32+
entities=None,
33+
messages=[])
4034

4135
class EntityTypeMismatch(TypeError):
4236
"""An exception emitted if entity type outputs from two collaborative objects are different."""

src/dve/core_engine/type_hints.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@
236236
PROCESSING_STATUSES: tuple[ProcessingStatus, ...] = tuple(list(get_args(ProcessingStatus)))
237237
"""List of all possible DVE submission statuses"""
238238

239-
SubmissionResult = Literal["success", "failed", "failed_xml_generation", "archived"]
239+
SubmissionResult = Literal["success", "validation_failed", "archived", "processing_failed"]
240240
"""Allowed DVE submission results"""
241241

242242
SUBMISSION_RESULTS: tuple[SubmissionResult, ...] = tuple(list(get_args(SubmissionResult)))

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 77 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@
22

33
from typing import Optional
44
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
5-
from dve.core_engine.backends.utilities import dump_errors
5+
from dve.core_engine.exceptions import CriticalProcessingError
66
from dve.core_engine.models import SubmissionInfo
77
from dve.core_engine.type_hints import URI, Failed
88
from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
99
from dve.pipeline.utils import SubmissionStatus
1010
from dve.parser import file_handling as fh
11+
from dve.reporting.utils import dump_processing_errors
1112

13+
@duckdb_get_entity_count
14+
@duckdb_write_parquet
1215
class FoundryDDBPipeline(DDBDVEPipeline):
13-
"""DuckDB pipeline for running on Foundry Platform.
14-
Polymorphed to allow for exception handling when processing
15-
single files sequentially through services."""
16+
"""DuckDB pipeline for running on Foundry Platform"""
1617

1718
def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
1819
"""Write out key audit relations to parquet for persisting to datasets"""
@@ -29,66 +30,106 @@ def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
2930

3031
def file_transformation(
3132
self, submission_info: SubmissionInfo
32-
) -> SubmissionInfo | dict[str, str]:
33+
) -> tuple[SubmissionInfo, SubmissionStatus]:
3334
try:
3435
return super().file_transformation(submission_info)
3536
except Exception as exc: # pylint: disable=W0718
3637
self._logger.error(f"File transformation raised exception: {exc}")
3738
self._logger.exception(exc)
38-
return submission_info.dict()
39+
dump_processing_errors(
40+
fh.joinuri(self.processed_files_path, submission_info.submission_id),
41+
"file_transformation",
42+
[CriticalProcessingError.from_exception(exc)]
43+
)
44+
self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
45+
return submission_info, SubmissionStatus(processing_failed=True)
3946

40-
def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo | bool]:
47+
def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None) -> tuple[SubmissionInfo | SubmissionStatus]:
4148
try:
42-
return super().apply_data_contract(submission_info)
49+
return super().apply_data_contract(submission_info, submission_status)
4350
except Exception as exc: # pylint: disable=W0718
4451
self._logger.error(f"Apply data contract raised exception: {exc}")
4552
self._logger.exception(exc)
46-
return submission_info, True
53+
dump_processing_errors(
54+
fh.joinuri(self.processed_files_path, submission_info.submission_id),
55+
"contract",
56+
[CriticalProcessingError.from_exception(exc)]
57+
)
58+
self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
59+
return submission_info, SubmissionStatus(processing_failed=True)
4760

48-
def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed):
61+
def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None):
4962
try:
50-
return super().apply_business_rules(submission_info, failed)
63+
return super().apply_business_rules(submission_info, submission_status)
5164
except Exception as exc: # pylint: disable=W0718
5265
self._logger.error(f"Apply business rules raised exception: {exc}")
5366
self._logger.exception(exc)
54-
return submission_info, SubmissionStatus(failed=True)
67+
dump_processing_errors(
68+
fh.joinuri(self.processed_files_path, submission_info.submission_id),
69+
"business_rules",
70+
[CriticalProcessingError.from_exception(exc)]
71+
)
72+
self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
73+
return submission_info, SubmissionStatus(processing_failed=True)
74+
75+
def error_report(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None):
76+
try:
77+
return super().error_report(submission_info, submission_status)
78+
except Exception as exc: # pylint: disable=W0718
79+
self._logger.error(f"Error reports raised exception: {exc}")
80+
self._logger.exception(exc)
81+
sub_stats = None
82+
report_uri = None
83+
dump_processing_errors(
84+
fh.joinuri(self.processed_files_path, submission_info.submission_id),
85+
"error_report",
86+
[CriticalProcessingError.from_exception(exc)]
87+
)
88+
self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
89+
return submission_info, submission_status, sub_stats, report_uri
5590

56-
def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]:
91+
def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], Optional[URI], URI]:
5792
"""Sequential single submission pipeline runner"""
5893
try:
5994
sub_id: str = submission_info.submission_id
95+
report_uri = None
6096
self._audit_tables.add_new_submissions(submissions=[submission_info])
6197
self._audit_tables.mark_transform(submission_ids=[sub_id])
62-
sub_info = self.file_transformation(submission_info=submission_info)
63-
if isinstance(sub_info, SubmissionInfo):
98+
sub_info, sub_status = self.file_transformation(submission_info=submission_info)
99+
if not (sub_status.validation_failed or sub_status.processing_failed):
64100
self._audit_tables.mark_data_contract(submission_ids=[sub_id])
65-
sub_info, failed = self.apply_data_contract(submission_info=submission_info)
66-
self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)])
101+
sub_info, sub_status = self.apply_data_contract(submission_info=sub_info, submission_status=sub_status)
102+
self._audit_tables.mark_business_rules(submissions=[(sub_id, sub_status.validation_failed)])
67103
sub_info, sub_status = self.apply_business_rules(
68-
submission_info=submission_info, failed=failed
104+
submission_info=submission_info, submission_status=sub_status
69105
)
70-
else:
71-
sub_status = SubmissionStatus(failed=True)
72-
self._audit_tables.mark_error_report(
73-
submissions=[(sub_id, sub_status.submission_result)]
74-
)
75-
sub_info, sub_status, sub_stats, report_uri = self.error_report(
76-
submission_info=submission_info, status=sub_status
77-
)
78-
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
106+
107+
if not sub_status.processing_failed:
108+
self._audit_tables.mark_error_report(
109+
submissions=[(sub_id, sub_status.submission_result)]
110+
)
111+
sub_info, sub_status, sub_stats, report_uri = self.error_report(
112+
submission_info=submission_info, submission_status=sub_status
113+
)
114+
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
79115
except Exception as err: # pylint: disable=W0718
80116
self._logger.error(
81117
f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
82118
)
119+
dump_processing_errors(
120+
fh.joinuri(self.processed_files_path, submission_info.submission_id),
121+
"run_pipeline",
122+
[CriticalProcessingError.from_exception(err)]
123+
)
83124
self._audit_tables.mark_failed(submissions=[sub_id])
84125
finally:
85126
audit_files_uri = self.persist_audit_records(submission_info=submission_info)
86-
return (
87-
(
88-
None
89-
if sub_status.failed
90-
else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
91-
),
92-
report_uri,
93-
audit_files_uri,
94-
)
127+
return (
128+
(
129+
None
130+
if (sub_status.validation_failed or sub_status.processing_failed)
131+
else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
132+
),
133+
report_uri if report_uri else None,
134+
audit_files_uri,
135+
)

0 commit comments

Comments
 (0)