Skip to content

Commit 7d2f71f

Browse files
feature: add custom error codes for data contract and a number of new readers for spark and duckdb
custom error codes for data contract; new duckdb and spark readers and several bugfixes.
2 parents 970f0ef + 5204206 commit 7d2f71f

File tree

66 files changed

+1975
-271
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1975
-271
lines changed

docs/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ DVE configuration can be instantiated from a json (dischema) file which might be
1818
{
1919
"contract": {
2020
"cache_originals": true,
21-
"contract_error_codes": null,
21+
"error_details": null,
2222
"types": {},
2323
"schemas": {},
2424
"datasets": {

docs/detailed_guidance/data_contract.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ Lets look at the data contract configuration from [Introduction to DVE](../READM
44
{
55
"contract": {
66
"cache_originals": true,
7-
"contract_error_codes": null,
7+
"error_details": null,
88
"types": {},
99
"schemas": {},
1010
"datasets": {
@@ -78,7 +78,7 @@ Here we have only filled out datasets. We've added a few more fields such as `Pe
7878
{
7979
"contract": {
8080
"cache_originals": true,
81-
"contract_error_codes": null,
81+
"error_details": null,
8282
"types": {
8383
"isodate": {
8484
"description": "an isoformatted date type",
@@ -172,7 +172,7 @@ We can see here that the Activity has a number of fields. `startdate`, `enddate`
172172
{
173173
"contract": {
174174
"cache_originals": true,
175-
"contract_error_codes": null,
175+
"error_details": null,
176176
"types": {
177177
"isodate": {
178178
"description": "an isoformatted date type",
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"$schema": "https://json-schema.org/draft-07/schema",
3+
"$id": "data-ingest:contract/components/contract_error_details.schema.json",
4+
"title": "base_entity",
5+
"description": "A mapping of field names to the custom error code and message required if these fields were to fail validation during the data contract phase. For nested fields, these should be specified using struct '.' notation (eg. fieldA.fieldB.fieldC)",
6+
"type": "object",
7+
"additionalProperties": {
8+
"$ref": "field_error_type.schema.json"
9+
}
10+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"$schema": "https://json-schema.org/draft-07/schema",
3+
"$id": "data-ingest:contract/components/field_error_detail.schema.json",
4+
"title": "field_error_detail",
5+
"description": "The custom details to be used for a field when a validation error is raised during the data contract phase",
6+
"type": "object",
7+
"properties": {
8+
"error_code": {
9+
"description": "The code to be used for the field and error type specified",
10+
"type": "string"
11+
},
12+
"error_message": {
13+
"description": "The message to be used for the field and error type specified. This can include templating (specified using jinja2 conventions). During templating, the full record will be available with an additional __error_value to easily obtain nested offending values.",
14+
"type": "string",
15+
"enum": [
16+
"record_rejection",
17+
"file_rejection",
18+
"warning"
19+
]
20+
}
21+
},
22+
"required": [
23+
"error_code",
24+
"error_message"
25+
],
26+
"additionalProperties": false
27+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"$schema": "https://json-schema.org/draft-07/schema",
3+
"$id": "data-ingest:contract/components/field_error_type.schema.json",
4+
"title": "field_error_detail",
5+
"description": "The error type for a field when a validation error is raised during the data contract phase",
6+
"type": "object",
7+
"properties": {
8+
"error_type": {
9+
"description": "The type of error the details are for",
10+
"type": "string",
11+
"enum": [
12+
"Blank",
13+
"Bad value",
14+
"Wrong format"
15+
],
16+
"additionalProperties": {
17+
"$ref": "field_error_detail.schema.json"
18+
}
19+
}
20+
}
21+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
"""Implementation of duckdb backend"""
2+
from dve.core_engine.backends.implementations.duckdb.readers.json import DuckDBJSONReader
3+
from dve.core_engine.backends.readers import register_reader
4+
5+
from .contract import DuckDBDataContract
6+
from .readers import DuckDBCSVReader, DuckDBXMLStreamReader
7+
from .reference_data import DuckDBRefDataLoader
8+
from .rules import DuckDBStepImplementations
9+
10+
register_reader(DuckDBCSVReader)
11+
register_reader(DuckDBJSONReader)
12+
register_reader(DuckDBXMLStreamReader)
13+
14+
__all__ = [
15+
"DuckDBDataContract",
16+
"DuckDBRefDataLoader",
17+
"DuckDBStepImplementations",
18+
]

src/dve/core_engine/backends/implementations/duckdb/auditing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
)
1414
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
1515
PYTHON_TYPE_TO_DUCKDB_TYPE,
16-
PYTHON_TYPE_TO_POLARS_TYPE,
1716
table_exists,
1817
)
18+
from dve.core_engine.backends.utilities import PYTHON_TYPE_TO_POLARS_TYPE
1919
from dve.core_engine.models import (
2020
AuditRecord,
2121
ProcessingStatusRecord,

src/dve/core_engine/backends/implementations/duckdb/contract.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@
2020
duckdb_read_parquet,
2121
duckdb_write_parquet,
2222
get_duckdb_type_from_annotation,
23-
get_polars_type_from_annotation,
2423
relation_is_empty,
2524
)
2625
from dve.core_engine.backends.implementations.duckdb.types import DuckDBEntities
2726
from dve.core_engine.backends.metadata.contract import DataContractMetadata
2827
from dve.core_engine.backends.types import StageSuccessful
29-
from dve.core_engine.backends.utilities import stringify_model
28+
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
3029
from dve.core_engine.message import FeedbackMessage
3130
from dve.core_engine.type_hints import URI, Messages
3231
from dve.core_engine.validation import RowValidator
@@ -95,8 +94,8 @@ def generate_ddb_cast_statement(
9594
Current duckdb python API doesn't play well with this currently.
9695
"""
9796
if not null_flag:
98-
return f"try_cast({column_name} AS {dtype}) AS {column_name}"
99-
return f"cast(NULL AS {dtype}) AS {column_name}"
97+
return f'try_cast("{column_name}" AS {dtype}) AS "{column_name}"'
98+
return f'cast(NULL AS {dtype}) AS "{column_name}"'
10099

101100
def apply_data_contract(
102101
self, entities: DuckDBEntities, contract_metadata: DataContractMetadata

src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py

Lines changed: 8 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,14 @@
66
from datetime import date, datetime
77
from decimal import Decimal
88
from pathlib import Path
9-
from typing import Any, ClassVar, Dict, Set, Union
9+
from typing import Any, ClassVar, Dict, Generator, Iterator, Set, Union
1010
from urllib.parse import urlparse
1111

1212
import duckdb.typing as ddbtyp
1313
import numpy as np
14-
import polars as pl # type: ignore
1514
from duckdb import DuckDBPyConnection, DuckDBPyRelation
1615
from duckdb.typing import DuckDBPyType
1716
from pandas import DataFrame
18-
from polars.datatypes.classes import DataTypeClass as PolarsType
1917
from pydantic import BaseModel
2018
from typing_extensions import Annotated, get_args, get_origin, get_type_hints
2119

@@ -91,19 +89,6 @@ def __call__(self):
9189
}
9290
"""A mapping of Python types to the equivalent DuckDB types."""
9391

94-
PYTHON_TYPE_TO_POLARS_TYPE: Dict[type, PolarsType] = {
95-
# issue with decimal conversion at the moment...
96-
str: pl.Utf8, # type: ignore
97-
int: pl.Int64, # type: ignore
98-
bool: pl.Boolean, # type: ignore
99-
float: pl.Float64, # type: ignore
100-
bytes: pl.Binary, # type: ignore
101-
date: pl.Date, # type: ignore
102-
datetime: pl.Datetime, # type: ignore
103-
Decimal: pl.Utf8, # type: ignore
104-
}
105-
"""A mapping of Python types to the equivalent Polars types."""
106-
10792

10893
def table_exists(connection: DuckDBPyConnection, table_name: str) -> bool:
10994
"""check if a table exists in a given DuckDBPyConnection"""
@@ -205,98 +190,6 @@ def get_duckdb_type_from_annotation(type_annotation: Any) -> DuckDBPyType:
205190
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")
206191

207192

208-
def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
209-
"""Get a polars type from a Python type annotation.
210-
211-
Supported types are any of the following (this definition is recursive):
212-
- Supported basic Python types. These are:
213-
* `str`: pl.Utf8
214-
* `int`: pl.Int64
215-
* `bool`: pl.Boolean
216-
* `float`: pl.Float64
217-
* `bytes`: pl.Binary
218-
* `datetime.date`: pl.Date
219-
* `datetime.datetime`: pl.Datetime
220-
* `decimal.Decimal`: pl.Decimal with precision of 38 and scale of 18
221-
- A list of supported types (e.g. `List[str]` or `typing.List[str]`).
222-
This will return a pl.List type (variable length)
223-
- A `typing.Optional` type or a `typing.Union` of the type and `None` (e.g.
224-
`typing.Optional[str]`, `typing.Union[List[str], None]`). This will remove the
225-
'optional' wrapper and return the inner type
226-
- A subclass of `typing.TypedDict` with values typed using supported types. This
227-
will parse the value types as Polars types and return a Polars Struct.
228-
- A dataclass or `pydantic.main.ModelMetaClass` with values typed using supported types.
229-
This will parse the field types as Polars types and return a Polars Struct.
230-
- Any supported type, with a `typing_extensions.Annotated` wrapper.
231-
- A `decimal.Decimal` wrapped with `typing_extensions.Annotated` with a `DecimalConfig`
232-
indicating precision and scale. This will return a Polars Decimal
233-
with the specified scale and precision.
234-
- A `pydantic.types.condecimal` created type.
235-
236-
Any `ClassVar` types within `TypedDict`s, dataclasses, or `pydantic` models will be
237-
ignored.
238-
239-
"""
240-
type_origin = get_origin(type_annotation)
241-
242-
# An `Optional` or `Union` type, check to ensure non-heterogenity.
243-
if type_origin is Union:
244-
python_type = _get_non_heterogenous_type(get_args(type_annotation))
245-
return get_polars_type_from_annotation(python_type)
246-
247-
# Type hint is e.g. `List[str]`, check to ensure non-heterogenity.
248-
if type_origin is list or (isinstance(type_origin, type) and issubclass(type_origin, list)):
249-
element_type = _get_non_heterogenous_type(get_args(type_annotation))
250-
return pl.List(get_polars_type_from_annotation(element_type)) # type: ignore
251-
252-
if type_origin is Annotated:
253-
python_type, *other_args = get_args(type_annotation) # pylint: disable=unused-variable
254-
return get_polars_type_from_annotation(python_type)
255-
# Ensure that we have a concrete type at this point.
256-
if not isinstance(type_annotation, type):
257-
raise ValueError(f"Unsupported type annotation {type_annotation!r}")
258-
259-
if (
260-
# Type hint is a dict subclass, but not dict. Possibly a `TypedDict`.
261-
(issubclass(type_annotation, dict) and type_annotation is not dict)
262-
# Type hint is a dataclass.
263-
or is_dataclass(type_annotation)
264-
# Type hint is a `pydantic` model.
265-
or (type_origin is None and issubclass(type_annotation, BaseModel))
266-
):
267-
fields: Dict[str, PolarsType] = {}
268-
for field_name, field_annotation in get_type_hints(type_annotation).items():
269-
# Technically non-string keys are disallowed, but people are bad.
270-
if not isinstance(field_name, str):
271-
raise ValueError(
272-
f"Dictionary/Dataclass keys must be strings, got {type_annotation!r}"
273-
) # pragma: no cover
274-
if get_origin(field_annotation) is ClassVar:
275-
continue
276-
277-
fields[field_name] = get_polars_type_from_annotation(field_annotation)
278-
279-
if not fields:
280-
raise ValueError(
281-
f"No type annotations in dict/dataclass type (got {type_annotation!r})"
282-
)
283-
284-
return pl.Struct(fields) # type: ignore
285-
286-
if type_annotation is list:
287-
raise ValueError(
288-
f"List must have type annotation (e.g. `List[str]`), got {type_annotation!r}"
289-
)
290-
if type_annotation is dict or type_origin is dict:
291-
raise ValueError(f"Dict must be `typing.TypedDict` subclass, got {type_annotation!r}")
292-
293-
for type_ in type_annotation.mro():
294-
polars_type = PYTHON_TYPE_TO_POLARS_TYPE.get(type_)
295-
if polars_type:
296-
return polars_type
297-
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")
298-
299-
300193
def coerce_inferred_numpy_array_to_list(pandas_df: DataFrame) -> DataFrame:
301194
"""Function to modify numpy inferred array when converting from duckdb relation to
302195
pandas dataframe - these cause issues with pydantic models
@@ -331,15 +224,20 @@ def _ddb_read_parquet(
331224

332225

333226
def _ddb_write_parquet( # pylint: disable=unused-argument
334-
self, entity: DuckDBPyRelation, target_location: URI, **kwargs
227+
self, entity: Union[Iterator[Dict[str, Any]], DuckDBPyRelation], target_location: URI, **kwargs
335228
) -> URI:
336229
"""Method to write parquet files from type cast entities
337230
following data contract application
338231
"""
339232
if isinstance(_get_implementation(target_location), LocalFilesystemImplementation):
340233
Path(target_location).parent.mkdir(parents=True, exist_ok=True)
341234

342-
entity.to_parquet(file_name=target_location, compression="snappy", **kwargs)
235+
if isinstance(entity, Generator):
236+
entity = self._connection.query(
237+
"select dta.* from (select unnest($data) as dta)", params={"data": list(entity)}
238+
)
239+
240+
entity.to_parquet(file_name=target_location, compression="snappy", **kwargs) # type: ignore
343241
return target_location
344242

345243

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
"""Readers for use with duckdb backend"""
22

33
from .csv import DuckDBCSVReader
4+
from .json import DuckDBJSONReader
45
from .xml import DuckDBXMLStreamReader
56

67
__all__ = [
78
"DuckDBCSVReader",
9+
"DuckDBJSONReader",
810
"DuckDBXMLStreamReader",
911
]

0 commit comments

Comments
 (0)