
Commit 2a8c856

feat: added support for nested fields when configuring custom error details for a data contract. Includes accessing nested error values in error messages.
1 parent 2f2ca5a commit 2a8c856
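
To illustrate the behaviour this commit enables (a sketch, not part of the diff itself): custom error details can now be keyed on a nested field's dotted path, and the templated message can reference the offending nested value via the `__error_value` variable. The keys and placeholders below mirror the test fixtures further down:

# Sketch only: error details keyed on a nested field's dotted path.
# "{{__error_value}}" is filled with the value found at the pydantic error location.
custom_error_details = {
    "subfield.id": {
        "Bad value": {
            "error_code": "SUBFIELDTESTIDBAD",
            "error_message": "subfield id is invalid: subfield.id - {{__error_value}}",
        },
    },
}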

File tree

6 files changed (+254, -71 lines)


src/dve/core_engine/message.py

Lines changed: 24 additions & 6 deletions
@@ -1,9 +1,12 @@
 """Functionality to represent messages."""
 
+import copy
 import datetime as dt
+from functools import reduce
 import json
 from decimal import Decimal
-from typing import Any, Callable, ClassVar, Dict, List, Optional, Set, Type, Union
+import operator
+from typing import Any, Callable, ClassVar, Dict, List, Optional, Set, Tuple, Type, Union
 
 from pydantic import ValidationError, validator, BaseModel
 from pydantic.dataclasses import dataclass
@@ -24,16 +27,28 @@
 class DataContractErrorDetail(BaseModel):
     error_code: str
     error_message: Optional[str] = None
-    def template_message(self, variables: Dict[str, Any]) -> str:
+    def template_message(self,
+                         variables: Dict[str, Any],
+                         error_location: Optional[Tuple[Union[str, int], ...]] = None) -> str:
+        if error_location:
+            variables = self.extract_error_value(variables, error_location)
         return template_object(self.error_message, variables)
-
+    @staticmethod
+    def extract_error_value(records, error_location):
+        _records = copy.copy(records)
+        try:
+            _records["__error_value"] = reduce(operator.getitem, error_location, _records)
+        except KeyError:
+            pass
+        return _records
+
 DEFAULT_ERROR_DETAIL: Dict[ErrorCategory, DataContractErrorDetail] = {
     "Blank": DataContractErrorDetail(error_code="FieldBlank",
                                      error_message="cannot be blank"),
     "Bad value": DataContractErrorDetail(error_code="BadValue",
                                          error_message="is invalid"),
     "Wrong Format": DataContractErrorDetail(error_code="WrongFormat",
-                                            error_message="has worng format"),}
+                                            error_message="has wrong format"),}
 
 
 INTEGRITY_ERROR_CODES: Set[str] = {"blockingsubmission"}
@@ -190,12 +205,15 @@ def from_pydantic_error(
            failure_type = "submission"
        else:
            failure_type = "record"
+
+       error_field = ".".join([idx for idx in error_dict["loc"]
+                               if not isinstance(idx, int)])
 
        is_informational = False
        if error_code.endswith("warning"):
            is_informational = True
        error_detail: DataContractErrorDetail = error_details.get(
-           error_dict["loc"][-1],
+           error_field,
            DEFAULT_ERROR_DETAIL
        ).get(category)
 
@@ -207,7 +225,7 @@ def from_pydantic_error(
            is_informational=is_informational,
            error_type=error_type,
            error_location=error_dict["loc"],  # type: ignore
-           error_message=error_detail.template_message(record),
+           error_message=error_detail.template_message(record, error_dict["loc"]),
            reporting_field=error_dict["loc"][-1],  # type: ignore
            category=category,  # type: ignore
            error_code=error_detail.error_code,  # type: ignore
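
For reference, a standalone sketch of how the pieces above fit together; the record and error location are illustrative (shaped like the test fixtures), not taken from the module:

import copy
import operator
from functools import reduce

# A pydantic-style error location: field names plus list indices.
record = {"id": 1, "subfield": [{"id": 2}, {"id": "WRONG"}]}
error_location = ("subfield", 1, "id")

# Custom error details are looked up by the dotted field path, ignoring list indices.
error_field = ".".join([idx for idx in error_location if not isinstance(idx, int)])
assert error_field == "subfield.id"

# extract_error_value walks the record along the location to surface the offending
# value, which the message template can then reference as {{__error_value}}.
variables = copy.copy(record)
variables["__error_value"] = reduce(operator.getitem, error_location, variables)
assert variables["__error_value"] == "WRONG"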

src/dve/core_engine/templating.py

Lines changed: 1 addition & 1 deletion
@@ -83,7 +83,7 @@ def template_object(
 
     if isinstance(object_, str):
         if method == "jinja":
-            return ENVIRONMENT.from_string(object_).render(**variables)  # type: ignore
+            return ENVIRONMENT.from_string(object_).render(variables)  # type: ignore
         return object_.format(**variables)  # type: ignore
 
     parameterise = partial(template_object, variables=variables, method=method)
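
A note on this change: Jinja2's Template.render accepts a mapping positionally (the same call convention as dict()), so the variables dict can be handed over without **-unpacking. One plausible motivation, though not stated in the commit, is that **-unpacking requires every key to be a string, while a plain mapping is passed through as-is. A minimal sketch with a locally created Environment (the module's ENVIRONMENT may be configured differently):

from jinja2 import Environment

template = Environment().from_string(
    "subfield id is invalid: subfield.id - {{ __error_value }}"
)
variables = {"id": "WRONG", "__error_value": "WRONG"}

# Passing the mapping positionally is equivalent to render(**variables) for plain
# string keys, but does not require the keys to be unpackable as keyword arguments.
assert template.render(variables) == "subfield id is invalid: subfield.id - WRONG"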

tests/test_core_engine/test_backends/fixtures.py

Lines changed: 115 additions & 25 deletions
@@ -461,39 +461,129 @@ def nested_typecast_parquet(temp_dir) -> Iterator[Tuple[URI, List[Dict[str, Any]
     _df.coalesce(1).write.format("parquet").save(output_location)
     yield output_location, data
 
+@pytest.fixture(scope="function")
+def nested_all_string_parquet_w_errors(temp_dir,
+                                       nested_parquet_custom_dc_err_details) -> Iterator[Tuple[URI, str, List[Dict[str, Any]]]]:
+    contract_meta = json.dumps(
+        {
+            "contract": {
+                "error_details": f"{nested_parquet_custom_dc_err_details.as_posix()}",
+                "schemas": {
+                    "SubField": {
+                        "fields": {
+                            "id": "int",
+                            "substrfield": "str",
+                            "subarrayfield": {"type": "date", "is_array": True},
+                        },
+                        "mandatory_fields": ["id"],
+                    }
+                },
+                "datasets": {
+                    "nested_model": {
+                        "fields": {
+                            "id": "int",
+                            "strfield": "str",
+                            "datetimefield": "datetime",
+                            "subfield": {"model": "SubField", "is_array": True},
+                        },
+                        "reader_config": {
+                            ".xml": {
+                                "reader": "DuckDBXMLStreamReader",
+                                "parameters": {"root_tag": "root", "record_tag": "NestedModel"},
+                            }
+                        },
+                        "key_field": "id",
+                    }
+                },
+            }
+        }
+    )
+
+    _spark: SparkSession = SparkSession.builder.getOrCreate()
+    data: List[Dict[str, Any]] = [
+        dict(
+            id=1,
+            strfield="hi",
+            datetimefield=str(datetime(2020, 9, 20, 12, 34, 56)),
+            subfield=[
+                dict(
+                    id=1,
+                    substrfield="bye",
+                    subarrayfield=[str(date(2020, 9, 20)), str(date(2020, 9, 21))],
+                )
+            ],
+        ),
+        dict(
+            id="WRONG",
+            strfield="hello",
+            datetimefield=str(datetime(2020, 9, 21, 12, 34, 56)),
+            subfield=[
+                dict(
+                    id=2,
+                    substrfield="bye",
+                    subarrayfield=[str(date(2020, 9, 20)), str(date(2020, 9, 21))],
+                ),
+                dict(
+                    id="WRONG",
+                    substrfield="aurevoir",
+                    subarrayfield=[str(date(2020, 9, 22)), str(date(2020, 9, 23))],
+                ),
+            ],
+        ),
+    ]
+
+    output_location: URI = str(Path(temp_dir).joinpath("nested_parquet").as_posix()) + "/"
+
+    _df: DataFrame = _spark.createDataFrame(
+        data,
+        schema=StructType(
+            [
+                StructField("id", StringType()),
+                StructField("strfield", StringType()),
+                StructField("datetimefield", StringType()),
+                StructField(
+                    "subfield",
+                    ArrayType(
+                        StructType(
+                            [
+                                StructField("id", StringType()),
+                                StructField("substrfield", StringType()),
+                                StructField("subarrayfield", ArrayType(StringType())),
+                            ]
+                        )
+                    ),
+                ),
+            ]
+        ),
+    )
+    _df.coalesce(1).write.format("parquet").save(output_location)
+    yield output_location, contract_meta, data
+
+
 @pytest.fixture()
 def nested_parquet_custom_dc_err_details(temp_dir):
+    file_path = Path(temp_dir).joinpath("nested_parquet_data_contract_codes.json")
     err_details = {
         "id": {
-            "Blank": {"error_code": "TESTID",
+            "Blank": {"error_code": "TESTIDBLANK",
                       "error_message": "id cannot be null"},
-            "Bad Value": {"error_code": "TESTID",
+            "Bad value": {"error_code": "TESTIDBAD",
                           "error_message": "id is invalid: id - {{id}}"}
         },
         "datetimefield": {
-            "Bad Value": {"error_code": "TESTDTFIELD",
+            "Bad value": {"error_code": "TESTDTFIELDBAD",
                           "error_message": "datetimefield is invalid: id - {{id}}, datetimefield - {{datetimefield}}"}
-        }
+        },
+        "subfield.id": {
+            "Blank": {"error_code": "SUBFIELDTESTIDBLANK",
+                      "error_message": "subfield id cannot be null"},
+            "Bad value": {"error_code": "SUBFIELDTESTIDBAD",
+                          "error_message": "subfield id is invalid: subfield.id - {{__error_value}}"}
+        },
     }
+    with open(file_path, mode="w") as fle:
+        json.dump(err_details, fle)
+
+    yield file_path
 
-
-
-    StructType(
-        [
-            StructField("id", StringType()),
-            StructField("strfield", StringType()),
-            StructField("datetimefield", StringType()),
-            StructField(
-                "subfield",
-                ArrayType(
-                    StructType(
-                        [
-                            StructField("id", StringType()),
-                            StructField("substrfield", StringType()),
-                            StructField("subarrayfield", ArrayType(StringType())),
-                        ]
-                    )
-                ),
-            ),
-        ]
-    )
+
tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py

Lines changed: 46 additions & 2 deletions
@@ -19,6 +19,7 @@
 from tests.test_core_engine.test_backends.fixtures import (
     nested_all_string_parquet,
     simple_all_string_parquet,
+    nested_all_string_parquet_w_errors,
     nested_parquet_custom_dc_err_details,
     temp_csv_file,
     temp_duckdb_dir,
@@ -307,5 +308,48 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet):
         "subfield": "STRUCT(id BIGINT, substrfield VARCHAR, subarrayfield DATE[])[]",
     }
 
-def test_ddb_data_contract_custom_error_details():
-    pass
+def test_duckdb_data_contract_custom_error_details(nested_all_string_parquet_w_errors,
+                                                   nested_parquet_custom_dc_err_details):
+    parquet_uri, contract_meta, _ = nested_all_string_parquet_w_errors
+    connection = default_connection
+    data_contract = DuckDBDataContract(connection)
+
+    entity = data_contract.read_parquet(path=parquet_uri)
+    assert entity.count("*").fetchone()[0] == 2
+
+    # check processes entity
+    contract_dict = json.loads(contract_meta).get("contract")
+    entities: Dict[str, DuckDBPyRelation] = {
+        "nested_model": entity,
+    }
+
+    with open(nested_parquet_custom_dc_err_details) as err_dets:
+        custom_error_details = json.load(err_dets)
+
+    dc_meta = DataContractMetadata(
+        reader_metadata={
+            "nested_model": {
+                ".xml": ReaderConfig(
+                    **contract_dict.get("datasets", {})
+                    .get("nested_model", {})
+                    .get("reader_config", {})
+                    .get(".xml")
+                )
+            }
+        },
+        validators={
+            "nested_model": RowValidator(contract_dict,
+                                         "nested_model",
+                                         error_info=custom_error_details)
+        },
+        reporting_fields={"nested_model": ["id"]},
+    )
+
+    entities, messages, stage_successful = data_contract.apply_data_contract(entities, dc_meta)
+    assert stage_successful
+    assert len(messages) == 2
+    messages = sorted(messages, key=lambda x: x.error_code)
+    assert messages[0].error_code == "SUBFIELDTESTIDBAD"
+    assert messages[0].error_message == "subfield id is invalid: subfield.id - WRONG"
+    assert messages[1].error_code == "TESTIDBAD"
+    assert messages[1].error_message == "id is invalid: id - WRONG"

tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py

Lines changed: 19 additions & 36 deletions
@@ -20,6 +20,7 @@
 from dve.core_engine.validation import RowValidator
 from tests.test_core_engine.test_backends.fixtures import (
     nested_all_string_parquet,
+    nested_all_string_parquet_w_errors,
     simple_all_string_parquet,
     nested_parquet_custom_dc_err_details
 )
@@ -172,12 +173,11 @@ def test_spark_data_contract_read_nested_parquet(nested_all_string_parquet):
     ]
 )
 
-def test_spark_data_contract_custom_error_details(nested_all_string_parquet):
-    # can we read in a stringified parquet and run the data contract on it?
-    # more complex file - nested, arrays of structs
-    parquet_uri, contract_meta, _ = nested_all_string_parquet
+def test_spark_data_contract_custom_error_details(nested_all_string_parquet_w_errors,
+                                                  nested_parquet_custom_dc_err_details):
+    parquet_uri, contract_meta, _ = nested_all_string_parquet_w_errors
     data_contract = SparkDataContract()
-    # check can read
+
     entity = data_contract.read_parquet(path=parquet_uri)
     assert entity.count() == 2
     assert entity.schema == StructType(
@@ -204,6 +204,9 @@ def test_spark_data_contract_custom_error_details(nested_all_string_parquet):
     entities: Dict[str, DataFrame] = {
         "nested_model": entity,
     }
+
+    with open(nested_parquet_custom_dc_err_details) as err_dets:
+        custom_error_details = json.load(err_dets)
 
     dc_meta = DataContractMetadata(
         reader_metadata={
@@ -217,40 +220,20 @@ def test_spark_data_contract_custom_error_details(nested_all_string_parquet):
            }
        },
        validators={
-           "nested_model": RowValidator(contract_dict, "nested_model"),
+           "nested_model": RowValidator(contract_dict,
+                                        "nested_model",
+                                        error_info=custom_error_details)
        },
        reporting_fields={"nested_model": ["id"]},
    )
 
    entities, messages, stage_successful = data_contract.apply_data_contract(entities, dc_meta)
    assert stage_successful
-   assert len(messages) == 0
-   assert entities["nested_model"].count() == 2
-   # check writes entity to parquet
-   output_path: Path = Path(parquet_uri).parent.joinpath("nested_model_output.parquet")
-   data_contract.write_parquet(
-       entity=entities["nested_model"], target_location=output_path.as_posix()
-   )
-   assert output_path.exists()
-   # check when read back in what is expected
-   check = data_contract.read_parquet(path=output_path.as_posix())
-   assert check.count() == 2
-   assert check.schema == StructType(
-       [
-           StructField("id", LongType()),
-           StructField("strfield", StringType()),
-           StructField("datetimefield", TimestampType()),
-           StructField(
-               "subfield",
-               ArrayType(
-                   StructType(
-                       [
-                           StructField("id", LongType()),
-                           StructField("substrfield", StringType()),
-                           StructField("subarrayfield", ArrayType(DateType())),
-                       ]
-                   )
-               ),
-           ),
-       ]
-   )
+   assert len(messages) == 2
+   messages = sorted(messages, key=lambda x: x.error_code)
+   assert messages[0].error_code == "SUBFIELDTESTIDBAD"
+   assert messages[0].error_message == "subfield id is invalid: subfield.id - WRONG"
+   assert messages[1].error_code == "TESTIDBAD"
+   assert messages[1].error_message == "id is invalid: id - WRONG"
+
+