Commit b356bd2

feat: some formatting. Tweaked how errors are handled within file transformation

1 parent 047c6a5 commit b356bd2

File tree: 6 files changed, +80 -59 lines

src/dve/core_engine/exceptions.py

Lines changed: 4 additions & 4 deletions

@@ -1,7 +1,6 @@
 """Exceptions emitted by the pipeline."""

 from collections.abc import Iterator
-from typing import Any

 from dve.core_engine.backends.implementations.spark.types import SparkEntities
 from dve.core_engine.message import FeedbackMessage
@@ -26,16 +25,17 @@ def __init__(
     def critical_messages(self) -> Iterator[FeedbackMessage]:
         """Critical messages which caused the processing error."""
         yield from filter(lambda message: message.is_critical, self.messages)
-
+
     def to_feedback_message(self) -> FeedbackMessage:
+        "Convert to feedback message to write to json file"
         return FeedbackMessage(
             entity=None,
             record=None,
             failure_type="integrity",
             error_type="processing",
             error_location="Whole File",
-            error_message=self.error_message
-        )
+            error_message=self.error_message,
+        )


 class EntityTypeMismatch(TypeError):
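
A minimal usage sketch of the new conversion (hypothetical values; the constructor signature is taken from the except-handler in src/dve/pipeline/pipeline.py below):

    from dve.core_engine.exceptions import CriticalProcessingError

    # Wrap a whole-file failure and flatten it into the same shape as record-level feedback.
    error = CriticalProcessingError(
        entities=None,
        error_message="ValueError('bad header row')",  # hypothetical example message
        messages=[],
    )
    feedback = error.to_feedback_message()
    # feedback.failure_type == "integrity", feedback.error_type == "processing",
    # feedback.error_location == "Whole File"; ready to be written out via dump_errors.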

src/dve/core_engine/message.py

Lines changed: 1 addition & 0 deletions

@@ -445,6 +445,7 @@ def to_dict(
             self.to_row(key_field, max_number_of_values, value_separator, record_converter),
         )
     )
+
     def __hash__(self):
         return hash(str(self))

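Context for the surrounding code: __hash__ delegates to the message's string form, so messages that render identically can collapse in hash-based containers. A hedged sketch (it assumes FeedbackMessage also defines a matching __eq__, which this diff does not show):

    from dve.core_engine.message import FeedbackMessage

    def dedupe(messages: list[FeedbackMessage]) -> list[FeedbackMessage]:
        # dict preserves insertion order; keys hash via hash(str(message))
        return list(dict.fromkeys(messages))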
src/dve/pipeline/duckdb_pipeline.py

Lines changed: 1 addition & 2 deletions

@@ -40,7 +40,7 @@ def __init__(
             processed_files_path,
             submitted_files_path,
             reference_data_loader,
-            job_run_id
+            job_run_id,
         )

     # pylint: disable=arguments-differ
@@ -50,4 +50,3 @@ def write_file_to_parquet(  # type: ignore
         return super().write_file_to_parquet(
             submission_file_uri, submission_info, output, DuckDBPyRelation
         )
-

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 41 additions & 32 deletions

@@ -1,56 +1,57 @@
 """A duckdb pipeline for running on Foundry platform"""
-from typing import List, Optional, Tuple
-from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet
+
+from typing import Optional
+from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
 from dve.core_engine.backends.utilities import dump_errors
 from dve.core_engine.models import SubmissionInfo
 from dve.core_engine.type_hints import URI, Failed
 from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
 from dve.pipeline.utils import SubmissionStatus
 from dve.parser import file_handling as fh

-@duckdb_write_parquet
 class FoundryDDBPipeline(DDBDVEPipeline):
     """DuckDB pipeline for running on Foundry Platform"""
+
     def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
         """Write out key audit relations to parquet for persisting to datasets"""
         write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
         self.write_parquet(
             self._audit_tables._processing_status.get_relation(),
-            write_to + "processing_status.parquet")
+            write_to + "processing_status.parquet",
+        )
         self.write_parquet(
             self._audit_tables._submission_statistics.get_relation(),
-            write_to + "submission_statistics.parquet")
+            write_to + "submission_statistics.parquet",
+        )
         return write_to
-
-    def file_transformation(self, submission_info: SubmissionInfo) -> SubmissionInfo | dict[str, str]:
+
+    def file_transformation(
+        self, submission_info: SubmissionInfo
+    ) -> SubmissionInfo | dict[str, str]:
         try:
             return super().file_transformation(submission_info)
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=W0718
             self._logger.error(f"File transformation raised exception: {exc}")
             self._logger.exception(exc)
-            # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
             return submission_info.dict()
-
-    def apply_data_contract(self, submission_info: SubmissionInfo) -> Tuple[SubmissionInfo | bool]:
+
+    def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo | bool]:
         try:
             return super().apply_data_contract(submission_info)
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=W0718
             self._logger.error(f"Apply data contract raised exception: {exc}")
             self._logger.exception(exc)
-            # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
             return submission_info, True
-
+
     def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed):
         try:
             return super().apply_business_rules(submission_info, failed)
-        except Exception as exc:
+        except Exception as exc:  # pylint: disable=W0718
             self._logger.error(f"Apply business rules raised exception: {exc}")
             self._logger.exception(exc)
-            # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
             return submission_info, SubmissionStatus(failed=True)
-
-
-    def run_pipeline(self, submission_info: SubmissionInfo) -> Tuple[Optional[URI], URI, URI]:
+
+    def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]:
         """Sequential single submission pipeline runner"""
         try:
             sub_id: str = submission_info.submission_id
@@ -61,23 +62,31 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> Tuple[Optional[URI],
                 self._audit_tables.mark_data_contract(submission_ids=[sub_id])
                 sub_info, failed = self.apply_data_contract(submission_info=submission_info)
                 self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)])
-                sub_info, sub_status = self.apply_business_rules(submission_info=submission_info, failed= failed)
+                sub_info, sub_status = self.apply_business_rules(
+                    submission_info=submission_info, failed=failed
+                )
             else:
-                sub_status = SubmissionStatus(failed=True)
-            self._audit_tables.mark_error_report(submissions=[(sub_id, sub_status.submission_result)])
-            sub_info, sub_status, sub_stats, report_uri = self.error_report(submission_info=submission_info, status=sub_status)
+                sub_status = SubmissionStatus(failed=True)
+            self._audit_tables.mark_error_report(
+                submissions=[(sub_id, sub_status.submission_result)]
+            )
+            sub_info, sub_status, sub_stats, report_uri = self.error_report(
+                submission_info=submission_info, status=sub_status
+            )
             self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
-        except Exception as err:
-            self._logger.error(f"During processing of submission_id: {sub_id}, the following exception was raised: {err}")
+        except Exception as err:  # pylint: disable=W0718
+            self._logger.error(
+                f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
+            )
             self._audit_tables.mark_failed(submissions=[sub_id])
         finally:
            audit_files_uri = self.persist_audit_records(submission_info=submission_info)
-        return (
-            None if sub_status.failed else fh.joinuri(
-                self.processed_files_path,
-                sub_id,
-                "business_rules"),
+        return (
+            (
+                None
+                if sub_status.failed
+                else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
+            ),
             report_uri,
-            audit_files_uri
+            audit_files_uri,
         )
-
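
All three overrides above share one shape: delegate to the parent step, and on any exception log it and return that step's failure sentinel so run_pipeline can keep going (file_transformation falls back to submission_info.dict(), apply_data_contract to (submission_info, True), apply_business_rules to (submission_info, SubmissionStatus(failed=True))). A generic sketch of the pattern, illustrative only and not part of the commit:

    import logging
    from typing import Callable, TypeVar

    T = TypeVar("T")

    def run_step(step: Callable[[], T], fallback: T, logger: logging.Logger) -> T:
        """Run one pipeline step; on any exception, log it and return the fallback."""
        try:
            return step()
        except Exception as exc:  # broad by design: one bad submission must not kill the run
            logger.error(f"Step raised exception: {exc}")
            logger.exception(exc)
            return fallback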

src/dve/pipeline/pipeline.py

Lines changed: 32 additions & 20 deletions

@@ -21,7 +21,11 @@
 from dve.core_engine.backends.base.core import EntityManager
 from dve.core_engine.backends.base.reference_data import BaseRefDataLoader
 from dve.core_engine.backends.base.rules import BaseStepImplementations
-from dve.core_engine.backends.exceptions import BackendError, MessageBearingError, ReaderLacksEntityTypeSupport
+from dve.core_engine.backends.exceptions import (
+    BackendError,
+    MessageBearingError,
+    ReaderLacksEntityTypeSupport,
+)
 from dve.core_engine.backends.readers import BaseFileReader
 from dve.core_engine.backends.types import EntityType
 from dve.core_engine.backends.utilities import dump_errors, stringify_model
@@ -52,7 +56,7 @@ def __init__(
         processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
-        job_run_id: Optional[int] = None
+        job_run_id: Optional[int] = None,
     ):
         self._submitted_files_path = submitted_files_path
         self._processed_files_path = processed_files_path
@@ -267,33 +271,41 @@ def file_transformation(
         if not self.processed_files_path:
             raise AttributeError("processed files path not provided")

+        errors: list[FeedbackMessage] = []
         submission_file_uri: URI = fh.joinuri(
             self.processed_files_path,
             submission_info.submission_id,
             submission_info.file_name_with_ext,
         )
         try:
-            errors = self.write_file_to_parquet(
+            errors.extend(self.write_file_to_parquet(
                 submission_file_uri, submission_info, self.processed_files_path
-            )
+            ))
+
+        except MessageBearingError as exc:
+            self._logger.error(f"Unexpected file transformation error: {exc}")
+            self._logger.exception(exc)
+            errors.extend(exc.messages)

-        except Exception as exc:  # pylint: disable=broad-except
+        except BackendError as exc:  # pylint: disable=broad-except
             self._logger.error(f"Unexpected file transformation error: {exc}")
             self._logger.exception(exc)
-            # TODO: should this go to processing_errors.json?
-            # TODO: shouldn't be seen by user and don't need to maintain feedback message structure
-            errors = [CriticalProcessingError(entities=None,
-                                              error_message=repr(exc),
-                                              messages=[]).to_feedback_message()]
-        finally:
-            if errors:
-                dump_errors(
-                    fh.joinuri(self.processed_files_path, submission_info.submission_id),
-                    "file_transformation",
-                    errors,
-                )
-                return submission_info.dict()
-        return submission_info
+            errors.extend([
+                CriticalProcessingError(
+                    entities=None,
+                    error_message=repr(exc),
+                    messages=[],
+                ).to_feedback_message()
+            ])
+
+        if errors:
+            dump_errors(
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "file_transformation",
+                errors,
+            )
+            return submission_info.dict()
+        return submission_info

     def file_transformation_step(
         self, pool: Executor, submissions_to_process: list[SubmissionInfo]
@@ -326,7 +338,7 @@ def file_transformation_step(
             except Exception as exc:  # pylint: disable=W0703
                 self._logger.error(f"File transformation raised exception: {exc}")
                 self._logger.exception(exc)
-                # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
+                # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
                 failed_processing.append(sub_info)
                 continue

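The reworked handler separates errors that already carry feedback (MessageBearingError contributes exc.messages directly) from bare backend failures (BackendError is wrapped via CriticalProcessingError.to_feedback_message()), and both funnel into a single errors list that is dumped once after the try block rather than in a finally clause. Note the narrowed scope: the old code caught bare Exception here, so anything else now propagates up to file_transformation_step's broad handler. A self-contained emulation of the new shape, using stand-in classes rather than the real ones from dve.core_engine.backends.exceptions:

    class MessageBearingError(Exception):
        """Stand-in: an error that already carries feedback messages."""
        def __init__(self, messages: list[str]) -> None:
            super().__init__("transformation failed")
            self.messages = messages

    class BackendError(Exception):
        """Stand-in for the backend failure base class."""

    def transform(fail_with: Exception | None) -> list[str]:
        errors: list[str] = []
        try:
            if fail_with:
                raise fail_with
        except MessageBearingError as exc:
            errors.extend(exc.messages)            # reuse the messages the error carries
        except BackendError as exc:
            errors.append(f"processing: {exc!r}")  # wrap the bare failure
        return errors

    print(transform(MessageBearingError(["row 3: bad date"])))  # ['row 3: bad date']
    print(transform(BackendError("parquet write failed")))      # ["processing: BackendError('parquet write failed')"]
    print(transform(None))                                      # []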
src/dve/pipeline/spark_pipeline.py

Lines changed: 1 addition & 1 deletion

@@ -42,7 +42,7 @@ def __init__(
             processed_files_path,
             submitted_files_path,
             reference_data_loader,
-            job_run_id
+            job_run_id,
         )

     # pylint: disable=arguments-differ
