
Commit 4499041

feat: small fixes and movies dataset working up to end of data contract

1 parent: 8c4eab1

5 files changed (+41, -26)

src/dve/core_engine/backends/implementations/duckdb/contract.py

Lines changed: 2 additions & 2 deletions

@@ -94,8 +94,8 @@ def generate_ddb_cast_statement(
         Current duckdb python API doesn't play well with this currently.
         """
         if not null_flag:
-            return f"try_cast({column_name} AS {dtype}) AS {column_name}"
-        return f"cast(NULL AS {dtype}) AS {column_name}"
+            return f'try_cast("{column_name}" AS {dtype}) AS "{column_name}"'
+        return f'cast(NULL AS {dtype}) AS "{column_name}"'

     def apply_data_contract(
         self, entities: DuckDBEntities, contract_metadata: DataContractMetadata
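
Quoting the column name makes DuckDB treat it as an identifier, so names with mixed case, spaces, or reserved words survive the cast. A minimal sketch of the resulting behaviour using only the duckdb package (the table and column names are invented for illustration):

import duckdb

con = duckdb.connect()  # in-memory database
con.execute('CREATE TABLE movies ("Release Year" VARCHAR)')
con.execute("INSERT INTO movies VALUES ('1999'), ('NOT_A_NUMBER')")

# Mirrors the quoted statement the commit now generates: try_cast yields NULL
# instead of raising when a value cannot be converted.
rows = con.execute(
    'SELECT try_cast("Release Year" AS INTEGER) AS "Release Year" FROM movies'
).fetchall()
print(rows)  # [(1999,), (None,)]

An unquoted column name containing a space would be a parse error here, which is what the change guards against.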

src/dve/core_engine/configuration/v1/__init__.py

Lines changed: 3 additions & 3 deletions

@@ -137,7 +137,7 @@ class V1DataContractConfig(BaseModel):

     cache_originals: bool = False
     """Whether to cache the original entities after loading."""
-    contract_error_message_info: Optional[URI] = None
+    error_details: Optional[URI] = None
     """Optional URI containing custom data contract error codes and messages"""
     types: Dict[TypeName, TypeOrDef] = Field(default_factory=dict)
     """Dataset specific types defined within the config."""

@@ -304,9 +304,9 @@ def get_contract_metadata(self) -> DataContractMetadata:

         contract_dict = self.contract.dict()
         error_info = {}
-        if self.contract.contract_error_message_info:
+        if self.contract.error_details:
             error_info = self.load_error_message_info(
-                self.contract.contract_error_message_info
+                self.contract.error_details
             )
         for entity_name, dataset_config in self.contract.datasets.items():
             reader_metadata[entity_name] = {
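
The rename to error_details shortens the key users write in configs. A minimal sketch of the field's behaviour on a pydantic v1 model, treating URI as a plain string alias (an assumption; the real dve URI type is not shown in this diff):

from typing import Optional

from pydantic import BaseModel

# URI is assumed to be string-like here; dve's actual type may differ.
URI = str

class ContractConfigSketch(BaseModel):
    """Cut-down stand-in for V1DataContractConfig."""
    error_details: Optional[URI] = None

# The field is optional, so existing configs without it keep validating.
cfg = ContractConfigSketch.parse_obj(
    {"error_details": "movies_contract_error_details.json"}
)
assert cfg.error_details == "movies_contract_error_details.json"
assert ContractConfigSketch().error_details is None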

tests/features/movies.feature

Lines changed: 7 additions & 18 deletions

@@ -6,31 +6,20 @@ Feature: Pipeline tests using the movies dataset

     Some validation of entity attributes is performed: SQL expressions and Python filter
     functions are used, and templatable business rules feature in the transformations.
-
+
     Scenario: Validate and filter movies (duckdb)
         Given I submit the movies file movies.json for processing
         And A duckdb pipeline is configured
         And I add initial audit entries for the submission
         Then the latest audit record for the submission is marked with processing status file_transformation
         When I run the file transformation phase
-        Then the planets entity is stored as a parquet after the file_transformation phase
+        Then the movies entity is stored as a parquet after the file_transformation phase
         And the latest audit record for the submission is marked with processing status data_contract
         When I run the data contract phase
-        Then there is 1 record rejection from the data_contract phase
+        Then there are 2 record rejections from the data_contract phase
+        And there are errors with the following details and associated error_count from the data_contract phase
+            | ErrorCode | ErrorMessage                         | error_count |
+            | BLANKYEAR | year not provided                    | 1           |
+            | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1           |
         And the movies entity is stored as a parquet after the data_contract phase
         And the latest audit record for the submission is marked with processing status business_rules
-        When I run the business rules phase
-        Then The rules restrict "planets" to 1 qualifying record
-        And At least one row from "planets" has generated error code "HIGH_DENSITY"
-        And At least one row from "planets" has generated error code "WEAK_ESCAPE"
-        And the planets entity is stored as a parquet after the business_rules phase
-        And the latest audit record for the submission is marked with processing status error_report
-        When I run the error report phase
-        Then An error report is produced
-        And The entity "planets" does not contain an entry for "Jupiter" in column "planet"
-        And The entity "planets" contains an entry for "Neptune" in column "planet"
-        And The statistics entry for the submission shows the following information
-            | parameter                | value |
-            | record_count             | 9     |
-            | number_record_rejections | 18    |
-            | number_warnings          | 0     |
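
The tightened expectations imply two bad year values in the test input. movies.json itself is not part of this diff, so the records below are only a plausible guess at what triggers the BLANKYEAR and DODGYYEAR rejections (titles and field names are invented):

# Hypothetical movies.json entries; shape and field names are assumptions.
bad_movie_records = [
    {"title": "Some Film", "year": None},             # BLANKYEAR: year not provided
    {"title": "Other Film", "year": "NOT_A_NUMBER"},  # DODGYYEAR: year value is invalid
]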

tests/features/steps/steps_pipeline.py

Lines changed: 27 additions & 2 deletions

@@ -7,10 +7,11 @@
 """
 # pylint: disable=no-name-in-module
 from concurrent.futures import ThreadPoolExecutor
-from functools import partial
+from functools import partial, reduce
 from itertools import chain
+import operator
 from pathlib import Path
-from typing import Callable, Dict, Optional
+from typing import Any, Callable, Dict, List, Optional, Tuple
 from uuid import uuid4
 from behave import given, then, when  # type: ignore
 from behave.model import Row, Table

@@ -163,6 +164,30 @@ def get_record_rejects_from_service(context: Context, service: str, expected_num_errors
     message_df = load_errors_from_service(processing_path, service)
     num_rejections = message_df.filter(pl.col("FailureType").eq("record")).shape[0]
     assert num_rejections == expected_num_errors, f"Got {num_rejections} actual rejections"
+
+
+@then("there are errors with the following details and associated error_count from the {service} phase")
+def check_error_record_details_from_service(context: Context, service: str):
+    processing_path = ctxt.get_processing_location(context)
+    table: Optional[Table] = context.table
+    if table is None:
+        raise ValueError("No table supplied in step")
+    error_details: List[Tuple[pl.Expr, int]] = []
+    row: Row
+    for row in table:
+        record = row.as_dict()
+        error_count = int(record.pop("error_count"))
+        filter_expr = reduce(operator.and_,
+                             [pl.col(k).eq(v) for k, v in record.items()])
+        error_details.append((filter_expr, error_count))
+
+    message_df = load_errors_from_service(processing_path, service)
+    for err_details in error_details:
+        filter_expr, error_count = err_details
+        assert message_df.filter(filter_expr).shape[0] == error_count
+
+
+

 @given("A {implementation} pipeline is configured")
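
The heart of the new step is the reduce over operator.and_, which folds one polars equality expression per behave-table column into a single conjunctive filter. A standalone sketch of that pattern against a fabricated error DataFrame (column names mirror the feature table; load_errors_from_service is not reproduced here):

import operator
from functools import reduce

import polars as pl

# Fabricated stand-in for the DataFrame that load_errors_from_service returns.
message_df = pl.DataFrame(
    {
        "ErrorCode": ["BLANKYEAR", "DODGYYEAR", "DODGYYEAR"],
        "ErrorMessage": [
            "year not provided",
            "year value (NOT_A_NUMBER) is invalid",
            "year value (???) is invalid",
        ],
    }
)

# One .eq() expression per column, AND-ed together, exactly as the step
# definition builds its filter from each table row.
record = {"ErrorCode": "DODGYYEAR", "ErrorMessage": "year value (NOT_A_NUMBER) is invalid"}
filter_expr = reduce(operator.and_, [pl.col(k).eq(v) for k, v in record.items()])

assert message_df.filter(filter_expr).shape[0] == 1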

tests/testdata/movies/movies.dischema.json

Lines changed: 2 additions & 1 deletion

@@ -3,7 +3,7 @@
     "schemas": {
         "ratings": {
             "fields": {
-                "IMDB": "NonNegativeFloat",
+                "IMDb": "NonNegativeFloat",
                 "RottenTomatoes": "str"

             }

@@ -15,6 +15,7 @@
             }
         }
     },
+    "error_details": "movies_contract_error_details.json",
     "datasets": {
         "movies": {
             "fields": {
