Skip to content

Commit 4869482

Browse files
fix: deal with pathing assumption that file had been moved to processed_file_path during file transformation
1 parent 5b5a2eb commit 4869482

File tree

2 files changed

+53
-2
lines changed

2 files changed

+53
-2
lines changed

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# pylint: disable=W0223
22
"""A duckdb pipeline for running on Foundry platform"""
33

4+
import shutil
5+
from pathlib import Path
46
from typing import Optional
57

68
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
@@ -23,6 +25,15 @@
2325
class FoundryDDBPipeline(DDBDVEPipeline):
2426
"""DuckDB pipeline for running on Foundry Platform"""
2527

28+
def _move_submission_to_processing_files_path(self, submission_info: SubmissionInfo):
29+
"""Move submitted file to 'processed_files_path'."""
30+
_submitted_file_location = Path(
31+
self._submitted_files_path, submission_info.file_name_with_ext # type: ignore
32+
)
33+
_dest = Path(self.processed_files_path, submission_info.submission_id)
34+
_dest.mkdir(parents=True, exist_ok=True)
35+
shutil.copy2(_submitted_file_location, _dest)
36+
2637
def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
2738
"""Write out key audit relations to parquet for persisting to datasets"""
2839
write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
@@ -113,8 +124,8 @@ def run_pipeline(
113124
try:
114125
sub_id: str = submission_info.submission_id
115126
report_uri = None
116-
self._audit_tables.add_new_submissions(submissions=[submission_info])
117-
self._audit_tables.mark_transform(submission_ids=[sub_id])
127+
if self._submitted_files_path:
128+
self._move_submission_to_processing_files_path(submission_info)
118129
sub_info, sub_status = self.file_transformation(submission_info=submission_info)
119130
if not (sub_status.validation_failed or sub_status.processing_failed):
120131
self._audit_tables.mark_data_contract(submission_ids=[sub_id])

tests/test_pipeline/test_foundry_ddb_pipeline.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from datetime import datetime
66
from pathlib import Path
77
import shutil
8+
import tempfile
89
from uuid import uuid4
910

1011
import pytest
@@ -116,3 +117,42 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
116117
assert not fh.get_resource_exists(report_uri)
117118
assert not output_loc
118119
assert len(list(fh.iter_prefix(audit_files))) == 2
120+
121+
122+
def test_foundry_runner_with_submitted_files_path(movies_test_files, temp_ddb_conn):
123+
db_file, conn = temp_ddb_conn
124+
ref_db_file = Path(db_file.parent, "movies_refdata.duckdb").as_posix()
125+
conn.sql(f"ATTACH '{ref_db_file}' AS movies_refdata")
126+
conn.read_parquet(
127+
get_test_file_path("movies/refdata/movies_sequels.parquet").as_posix()
128+
).to_table("movies_refdata.sequels")
129+
processing_folder = Path(tempfile.mkdtemp()).as_posix()
130+
submitted_files_path = Path(movies_test_files).as_posix()
131+
sub_id = uuid4().hex
132+
sub_info = SubmissionInfo(
133+
submission_id=sub_id,
134+
dataset_id="movies",
135+
file_name="good_movies",
136+
file_extension="json",
137+
submitting_org="TEST",
138+
datetime_received=datetime(2025,11,5)
139+
)
140+
141+
DuckDBRefDataLoader.connection = conn
142+
DuckDBRefDataLoader.dataset_config_uri = None
143+
144+
with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
145+
dve_pipeline = FoundryDDBPipeline(
146+
processed_files_path=processing_folder,
147+
audit_tables=audit_manager,
148+
connection=conn,
149+
rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(),
150+
submitted_files_path=submitted_files_path,
151+
reference_data_loader=DuckDBRefDataLoader,
152+
)
153+
output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
154+
155+
assert Path(processing_folder, sub_id, sub_info.file_name_with_ext).exists()
156+
assert fh.get_resource_exists(report_uri)
157+
assert len(list(fh.iter_prefix(output_loc))) == 2
158+
assert len(list(fh.iter_prefix(audit_files))) == 3

0 commit comments

Comments
 (0)