
Commit f0dbbe7

feat: initial work to add Foundry pipeline for running DVE
1 parent 62b573e commit f0dbbe7

5 files changed (+80 additions, -5 deletions)

src/dve/pipeline/duckdb_pipeline.py

Lines changed: 3 additions & 2 deletions
@@ -24,23 +24,23 @@ class DDBDVEPipeline(BaseDVEPipeline):
     def __init__(
         self,
         audit_tables: DDBAuditingManager,
-        job_run_id: int,
         connection: DuckDBPyConnection,
         rules_path: Optional[URI],
         processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
+        job_run_id: Optional[int] = None,
     ):
         self._connection = connection
         super().__init__(
             audit_tables,
-            job_run_id,
             DuckDBDataContract(connection=self._connection),
             DuckDBStepImplementations.register_udfs(connection=self._connection),
             rules_path,
             processed_files_path,
             submitted_files_path,
             reference_data_loader,
+            job_run_id
         )

     # pylint: disable=arguments-differ
@@ -50,3 +50,4 @@ def write_file_to_parquet( # type: ignore
         return super().write_file_to_parquet(
             submission_file_uri, submission_info, output, DuckDBPyRelation
         )
+
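Why the parameter moves rather than merely gaining a default: once `job_run_id` defaults to `None`, Python requires it to follow every other defaulted parameter, so it shifts to the tail of the signature and is best passed by keyword. A minimal construction sketch (the connection, manager, and URIs are illustrative placeholders, not taken from this commit):

```python
import duckdb

# Placeholder wiring: only the parameter names come from the diff above.
conn = duckdb.connect()  # in-memory DuckDB connection
audit_manager = ...      # a DDBAuditingManager instance, construction omitted

pipeline = DDBDVEPipeline(
    audit_tables=audit_manager,
    connection=conn,
    rules_path="file:///tmp/rules/",                # assumed URIs
    processed_files_path="file:///tmp/processed/",
    submitted_files_path="file:///tmp/submitted/",
    # job_run_id omitted: it now defaults to None, presumably because
    # Foundry runs do not supply an external job run id.
)
```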
src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+"""A duckdb pipeline for running on Foundry platform"""
+from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet
+from dve.core_engine.models import SubmissionInfo
+from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
+from dve.pipeline.utils import SubmissionStatus
+from dve.parser import file_handling as fh
+
+@duckdb_write_parquet
+class FoundryDDBPipeline(DDBDVEPipeline):
+    """DuckDB pipeline for running on Foundry Platform"""
+    def persist_audit_records(self, submission_info: SubmissionInfo):
+        """Write out key audit relations to parquet for persisting to datasets"""
+        write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
+        self.write_parquet(
+            self._audit_tables._processing_status.get_relation(),
+            write_to + "processing_status.parquet")
+        self.write_parquet(
+            self._audit_tables._submission_statistics.get_relation(),
+            write_to + "submission_statistics.parquet")
+
+    def run_pipeline(self, submission_info: SubmissionInfo):
+        """Sequential single submission pipeline runner"""
+        try:
+            sub_id: str = submission_info.submission_id
+            self._audit_tables.add_new_submissions(submissions=[submission_info])
+            self._audit_tables.mark_transform(submission_ids=[sub_id])
+            sub_info = self.file_transformation(submission_info=submission_info)
+            if isinstance(sub_info, SubmissionInfo):
+                self._audit_tables.mark_data_contract(submission_ids=[sub_id])
+                sub_info, failed = self.apply_data_contract(submission_info=submission_info)
+                self._audit_tables.mark_business_rules(submissions=[(sub_info, failed)])
+                sub_info, sub_status = self.apply_business_rules(submission_info=submission_info, failed=failed)
+            else:
+                sub_status = SubmissionStatus(failed=True)
+            self._audit_tables.mark_error_report(submissions=[(sub_id, sub_status.submission_result)])
+            sub_info, sub_status, sub_stats = self.error_report(submission_info=submission_info)
+            self._audit_tables.add_submission_statistics_records(subs_stats=[sub_stats])
+        except Exception as err:
+            self._logger.error(f"During processing of submission_id: {sub_id}, the following exception was raised: {err}")
+            self._audit_tables.mark_failed(submissions=[sub_id])
+        finally:
+            self.persist_audit_records(submission_info=submission_info)
+
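As the new file shows, `run_pipeline` walks a single submission through file transformation, data contract, business rules, and error reporting, marking each stage in the audit tables as it goes; the `finally` clause guarantees that `persist_audit_records` writes the two audit relations to `<processed_files_path>/<submission_id>/audit/` even when a stage raises. A hedged usage sketch; the URIs and metadata are placeholders, and `DDBAuditingManager`, `DuckDBRefDataLoader`, and `SubmissionInfo` are assumed importable as in the tests further down this page:

```python
from uuid import uuid4

import duckdb

conn = duckdb.connect()                       # placeholder in-memory connection
db_uri = "file:///tmp/audit.duckdb"           # assumed audit DB location
rules_uri = "file:///tmp/rules/planets.toml"  # assumed rules/metadata URI
output_uri = "file:///tmp/processed/"         # audit parquet lands under here

# Call shape mirrors test_foundry_runner_success below.
with DDBAuditingManager(db_uri, None, conn) as audit_manager:
    pipeline = FoundryDDBPipeline(
        audit_tables=audit_manager,
        connection=conn,
        rules_path=rules_uri,
        processed_files_path=output_uri,
        submitted_files_path=None,
        reference_data_loader=DuckDBRefDataLoader,
    )
    sub_info = SubmissionInfo.from_metadata_file(submission_id=uuid4().hex,
                                                 metadata_uri=rules_uri)
    # One call per submission; success or failure, the finally block persists
    # processing_status.parquet and submission_statistics.parquet.
    pipeline.run_pipeline(submission_info=sub_info)
```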

src/dve/pipeline/pipeline.py

Lines changed: 1 addition & 1 deletion
@@ -44,13 +44,13 @@ class BaseDVEPipeline:
     def __init__(
         self,
         audit_tables: BaseAuditingManager,
-        job_run_id: int,
         data_contract: BaseDataContract,
         step_implementations: Optional[BaseStepImplementations[EntityType]],
         rules_path: Optional[URI],
         processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
+        job_run_id: Optional[int] = None
     ):
         self._submitted_files_path = submitted_files_path
         self._processed_files_path = processed_files_path
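One call-site consequence worth flagging: because the parameter moved rather than merely gaining a default, the second positional slot now belongs to `data_contract`, and an un-migrated caller passing `job_run_id` positionally will misbind it. A before/after sketch (the argument names stand in for the caller's own objects):

```python
# Old order (before this commit):
#     BaseDVEPipeline(audit_tables, job_run_id, data_contract, steps, ...)
#
# New order: job_run_id goes last, ideally by keyword.
pipeline = BaseDVEPipeline(
    audit_tables,            # BaseAuditingManager
    data_contract,           # BaseDataContract
    step_implementations,    # Optional[BaseStepImplementations]
    rules_path,
    processed_files_path,
    submitted_files_path,
    job_run_id=job_run_id,   # optional; defaults to None
)
```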

src/dve/pipeline/spark_pipeline.py

Lines changed: 2 additions & 2 deletions
@@ -26,23 +26,23 @@ class SparkDVEPipeline(BaseDVEPipeline):
     def __init__(
         self,
         audit_tables: SparkAuditingManager,
-        job_run_id: int,
         rules_path: Optional[URI],
         processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         spark: Optional[SparkSession] = None,
+        job_run_id: Optional[int] = None,
     ):
         self._spark = spark if spark else SparkSession.builder.getOrCreate()
         super().__init__(
             audit_tables,
-            job_run_id,
             SparkDataContract(spark_session=self._spark),
             SparkStepImplementations.register_udfs(self._spark),
             rules_path,
             processed_files_path,
             submitted_files_path,
             reference_data_loader,
+            job_run_id
         )

     # pylint: disable=arguments-differ
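The Spark variant gets the same reshuffle, and since `spark` already defaulted to `None`, both trailing parameters can now be omitted entirely; a short sketch with placeholder values:

```python
audit_manager = ...   # a SparkAuditingManager instance, construction omitted

pipeline = SparkDVEPipeline(
    audit_tables=audit_manager,
    rules_path="file:///tmp/rules/",               # assumed URIs
    processed_files_path="file:///tmp/processed/",
    submitted_files_path=None,
)
# spark falls back to SparkSession.builder.getOrCreate();
# job_run_id falls back to None.
```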

tests/test_pipeline/test_duckdb_pipeline.py

Lines changed: 31 additions & 0 deletions
@@ -4,6 +4,7 @@

 from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
+import shutil
 from typing import Dict, Tuple
 from uuid import uuid4

@@ -16,6 +17,7 @@
 from dve.core_engine.models import SubmissionInfo
 import dve.parser.file_handling as fh
 from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
+from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline

 from ..conftest import get_test_file_path
 from ..fixtures import temp_ddb_conn  # pylint: disable=unused-import
@@ -204,3 +206,32 @@ def test_error_report_step(

     audit_result = audit_manager.get_current_processing_info(submitted_file_info.submission_id)
     assert audit_result.processing_status == "success"
+
+
+def test_foundry_runner_success(planet_test_files, temp_ddb_conn):
+    db_file, conn = temp_ddb_conn
+    processing_folder = planet_test_files
+
+    DuckDBRefDataLoader.connection = conn
+    DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH)
+    sub_id = uuid4().hex
+    sub_info = SubmissionInfo.from_metadata_file(submission_id=sub_id,
+                                                 metadata_uri=PLANETS_RULES_PATH)
+
+    shutil.copytree()
+
+    with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
+        dve_pipeline = FoundryDDBPipeline(
+            audit_tables=audit_manager,
+            connection=conn,
+            rules_path=PLANETS_RULES_PATH,
+            processed_files_path=processing_folder,
+            submitted_files_path=None,
+            reference_data_loader=DuckDBRefDataLoader,
+        )
+
+
+def test_foundry_runner_fail():
+    pass
+
+def test_foundry_runner_error():
+    pass
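The new tests are placeholders in this commit: `shutil.copytree()` lacks its required `(src, dst)` arguments, `run_pipeline` is never invoked, and the fail/error cases are bare `pass` stubs. A hedged sketch of how the success path might be finished, reusing the assertion pattern from `test_error_report_step` above; the copy source is an assumption about intent, not part of the commit:

```python
    # Inside test_foundry_runner_success, after constructing dve_pipeline:
    shutil.copytree(get_test_file_path("planets"),   # assumed fixture source
                    processing_folder, dirs_exist_ok=True)

    dve_pipeline.run_pipeline(submission_info=sub_info)

    audit_result = audit_manager.get_current_processing_info(sub_id)
    assert audit_result.processing_status == "success"
```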
