Skip to content

Commit 8c4eab1

Browse files
committed
feat: bug fixes with readers and base pipeline. Added spark csv reader. Added new dischema files for tests
1 parent 2a8c856 commit 8c4eab1

37 files changed

+994
-86
lines changed

src/dve/core_engine/backends/implementations/duckdb/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from dve.core_engine.backends.implementations.duckdb.readers.json import DuckDBJSONReader
12
from dve.core_engine.backends.readers import register_reader
23

34
from .contract import DuckDBDataContract
@@ -6,6 +7,7 @@
67
from .rules import DuckDBStepImplementations
78

89
register_reader(DuckDBCSVReader)
10+
register_reader(DuckDBJSONReader)
911
register_reader(DuckDBXMLStreamReader)
1012

1113
__all__ = [

src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from datetime import date, datetime
77
from decimal import Decimal
88
from pathlib import Path
9-
from typing import Any, ClassVar, Dict, Set, Union
9+
from typing import Any, ClassVar, Dict, Generator, Iterator, Set, Union
1010
from urllib.parse import urlparse
1111

1212
import duckdb.typing as ddbtyp
@@ -224,13 +224,21 @@ def _ddb_read_parquet(
224224

225225

226226
def _ddb_write_parquet( # pylint: disable=unused-argument
227-
self, entity: DuckDBPyRelation, target_location: URI, **kwargs
227+
self,
228+
entity: Union[Iterator[Dict[str, Any]],
229+
DuckDBPyRelation],
230+
target_location: URI,
231+
**kwargs
228232
) -> URI:
229233
"""Method to write parquet files from type cast entities
230234
following data contract application
231235
"""
232236
if isinstance(_get_implementation(target_location), LocalFilesystemImplementation):
233237
Path(target_location).parent.mkdir(parents=True, exist_ok=True)
238+
239+
if isinstance(entity, Generator):
240+
entity = self._connection.query("select dta.* from (select unnest($data) as dta)",
241+
params={"data": list(entity)})
234242

235243
entity.to_parquet(file_name=target_location, compression="snappy", **kwargs)
236244
return target_location
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
"""Readers for use with duckdb backend"""
22

33
from .csv import DuckDBCSVReader
4+
from .json import DuckDBJSONReader
45
from .xml import DuckDBXMLStreamReader
56

67
__all__ = [
78
"DuckDBCSVReader",
9+
"DuckDBJSONReader",
810
"DuckDBXMLStreamReader",
911
]

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,42 +3,17 @@
33
# pylint: disable=arguments-differ
44
from typing import Any, Dict, Iterator, Type
55

6-
from duckdb import DuckDBPyConnection, DuckDBPyRelation, read_csv
6+
from duckdb import DuckDBPyConnection, DuckDBPyRelation, read_csv, default_connection
77
from pydantic import BaseModel
8-
from typing_extensions import Literal
98

109
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
1110
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
1211
duckdb_write_parquet,
1312
get_duckdb_type_from_annotation,
1413
)
14+
from dve.core_engine.backends.implementations.duckdb.types import SQLType
1515
from dve.core_engine.type_hints import URI, EntityName
1616

17-
SQLType = Literal[
18-
"BIGINT",
19-
"BIT",
20-
"BLOB",
21-
"BOOLEAN",
22-
"DATE",
23-
"DECIMAL",
24-
"DOUBLE",
25-
"HUGEINT",
26-
"INTEGER",
27-
"INTERVAL",
28-
"REAL",
29-
"SMALLINT",
30-
"TIME",
31-
"UBIGINT",
32-
"UHUGEINT",
33-
"UINTEGER",
34-
"USMALLINT",
35-
"UTINYINT",
36-
"UUID",
37-
"VARCHAR",
38-
]
39-
"""SQL types recognised in duckdb"""
40-
41-
4217
@duckdb_write_parquet
4318
class DuckDBCSVReader(BaseFileReader):
4419
"""A reader for CSV files"""
@@ -47,21 +22,21 @@ class DuckDBCSVReader(BaseFileReader):
4722
# TODO - stringify or not
4823
def __init__(
4924
self,
50-
header: bool,
51-
delim: str,
52-
connection: DuckDBPyConnection,
25+
header: bool = True,
26+
delim: str = ",",
27+
connection: DuckDBPyConnection = None,
5328
):
5429
self.header = header
5530
self.delim = delim
56-
self._connection = connection
31+
self._connection = connection if connection else default_connection
5732

5833
super().__init__()
5934

6035
def read_to_py_iterator(
6136
self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
6237
) -> Iterator[Dict[str, Any]]:
6338
"""Creates an iterable object of rows as dictionaries"""
64-
return self.read_to_relation(resource, entity_name, schema).pl().iter_rows(named=True)
39+
yield from self.read_to_relation(resource, entity_name, schema).pl().iter_rows(named=True)
6540

6641
@read_function(DuckDBPyRelation)
6742
def read_to_relation( # pylint: disable=unused-argument
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
"""A JSON reader to create duckdb relations"""

# pylint: disable=arguments-differ
from typing import Any, Dict, Iterator, Optional, Type

from duckdb import DuckDBPyRelation, read_json
from pydantic import BaseModel

from dve.core_engine.backends.base.reader import BaseFileReader, read_function
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
    duckdb_write_parquet,
    get_duckdb_type_from_annotation,
)
from dve.core_engine.backends.implementations.duckdb.types import SQLType
from dve.core_engine.type_hints import URI, EntityName


@duckdb_write_parquet
class DuckDBJSONReader(BaseFileReader):
    """A reader for JSON files."""

    def __init__(
        self,
        format: Optional[str] = "array",  # pylint: disable=redefined-builtin
    ):
        # duckdb read_json ``format`` option (e.g. "array",
        # "newline_delimited"); kept as ``format`` to match the duckdb API
        # even though it shadows the builtin.
        self._format = format

        super().__init__()

    def read_to_py_iterator(
        self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
    ) -> Iterator[Dict[str, Any]]:
        """Creates an iterable object of rows as dictionaries."""
        # ``yield from`` (rather than ``return``) makes this a generator,
        # matching the behaviour of the other duckdb readers.
        yield from self.read_to_relation(resource, entity_name, schema).pl().iter_rows(named=True)

    @read_function(DuckDBPyRelation)
    def read_to_relation(  # pylint: disable=unused-argument
        self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
    ) -> DuckDBPyRelation:
        """Returns a relation object from the source json.

        The pydantic model is mapped onto duckdb column types so the JSON is
        read with an explicit schema rather than relying on type inference.
        """
        ddb_schema: Dict[str, SQLType] = {
            fld.name: str(get_duckdb_type_from_annotation(fld.annotation))  # type: ignore
            for fld in schema.__fields__.values()
        }

        return read_json(resource, columns=ddb_schema, format=self._format)

src/dve/core_engine/backends/implementations/duckdb/readers/xml.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
# mypy: disable-error-code="attr-defined"
22
"""An xml reader to create duckdb relations"""
33

4-
from typing import Dict, Type
4+
from typing import Dict, Optional, Type
55

6+
import pandas as pd
67
import polars as pl
7-
from duckdb import DuckDBPyConnection, DuckDBPyRelation
8+
from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection
89
from pydantic import BaseModel
910

1011
from dve.core_engine.backends.base.reader import read_function
@@ -20,15 +21,17 @@
2021
class DuckDBXMLStreamReader(XMLStreamReader):
2122
"""A reader for XML files"""
2223

23-
def __init__(self, ddb_connection: DuckDBPyConnection, **kwargs):
24-
self.ddb_connection = ddb_connection
24+
def __init__(self,
25+
ddb_connection: Optional[DuckDBPyConnection] = None,
26+
**kwargs):
27+
self.ddb_connection = ddb_connection if ddb_connection else default_connection
2528
super().__init__(**kwargs)
2629

2730
@read_function(DuckDBPyRelation)
2831
def read_to_relation(self, resource: URI, entity_name: str, schema: Type[BaseModel]):
2932
"""Returns a relation object from the source xml"""
3033
polars_schema: Dict[str, pl.DataType] = { # type: ignore
31-
fld.name: get_polars_type_from_annotation(fld.type_)
34+
fld.name: get_polars_type_from_annotation(fld.annotation)
3235
for fld in stringify_model(schema).__fields__.values()
3336
}
3437

src/dve/core_engine/backends/implementations/duckdb/types.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,36 @@
22

33
# pylint: disable=C0103
44
from typing import MutableMapping
5+
from typing_extensions import Literal
56

67
from duckdb import DuckDBPyRelation
78

89
from dve.core_engine.type_hints import EntityName
910

11+
SQLType = Literal[
12+
"BIGINT",
13+
"BIT",
14+
"BLOB",
15+
"BOOLEAN",
16+
"DATE",
17+
"DECIMAL",
18+
"DOUBLE",
19+
"HUGEINT",
20+
"INTEGER",
21+
"INTERVAL",
22+
"REAL",
23+
"SMALLINT",
24+
"TIME",
25+
"UBIGINT",
26+
"UHUGEINT",
27+
"UINTEGER",
28+
"USMALLINT",
29+
"UTINYINT",
30+
"UUID",
31+
"VARCHAR",
32+
]
33+
"""SQL types recognised in duckdb"""
34+
1035
Source = DuckDBPyRelation
1136
"""The source entity for a join. This will be aliased to the source entity name."""
1237
Target = DuckDBPyRelation

src/dve/core_engine/backends/implementations/spark/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,20 @@
44

55
from .backend import SparkBackend
66
from .contract import SparkDataContract
7-
from .readers import SparkXMLReader
7+
from .readers import (
8+
SparkCSVReader,
9+
SparkJSONReader,
10+
SparkXMLReader,
11+
SparkXMLStreamReader
12+
)
813
from .reference_data import SparkRefDataLoader
914
from .rules import SparkStepImplementations
1015

16+
register_reader(SparkCSVReader)
17+
register_reader(SparkJSONReader)
1118
register_reader(SparkXMLReader)
19+
register_reader(SparkXMLStreamReader)
20+
1221

1322
__all__ = [
1423
"SparkBackend",
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,22 @@
11
"""Spark-specific readers."""
22

3+
from dve.core_engine.backends.implementations.spark.readers.csv import (
4+
SparkCSVReader
5+
)
6+
from dve.core_engine.backends.implementations.spark.readers.json import (
7+
SparkJSONReader
8+
)
9+
310
from dve.core_engine.backends.implementations.spark.readers.xml import (
411
SparkXMLReader,
512
SparkXMLStreamReader,
613
)
714

15+
16+
817
__all__ = [
18+
"SparkCSVReader"
19+
"SparkJSONReader",
920
"SparkXMLReader",
1021
"SparkXMLStreamReader",
1122
]
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
"""A reader implementation using the Databricks Spark XML reader."""
2+
3+
4+
from typing import Any, Dict, Iterator, Optional, Type
5+
6+
from pydantic import BaseModel
7+
from pyspark.sql import DataFrame, SparkSession
8+
from pyspark.sql.types import StructType
9+
10+
11+
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
12+
from dve.core_engine.backends.exceptions import EmptyFileError
13+
from dve.core_engine.backends.implementations.spark.spark_helpers import (
14+
get_type_from_annotation,
15+
spark_write_parquet,
16+
)
17+
from dve.core_engine.type_hints import URI, EntityName
18+
from dve.parser.file_handling import get_content_length
19+
20+
21+
@spark_write_parquet
22+
class SparkCSVReader(BaseFileReader):
23+
"""A Spark reader for JSON files."""
24+
25+
def __init__(
26+
self,
27+
*,
28+
delimiter: str = ",",
29+
escape_char: str = "\\",
30+
quote_char: str = '"',
31+
header: bool = True,
32+
multi_line: bool = False,
33+
encoding: str = "utf-8-sig",
34+
spark_session: SparkSession = None
35+
) -> None:
36+
37+
self.delimiter = delimiter
38+
self.escape_char = escape_char
39+
self.encoding = encoding
40+
self.quote_char = quote_char
41+
self.header = header
42+
self.multi_line = multi_line
43+
self.spark_session = spark_session if spark_session else SparkSession.builder.getOrCreate()
44+
45+
super().__init__()
46+
47+
def read_to_py_iterator(
48+
self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
49+
) -> Iterator[Dict[URI, Any]]:
50+
df = self.read_to_dataframe(resource, entity_name, schema)
51+
yield from (record.asDict(True) for record in df.toLocalIterator())
52+
53+
@read_function(DataFrame)
54+
def read_to_dataframe(
55+
self,
56+
resource: URI,
57+
entity_name: EntityName, # pylint: disable=unused-argument
58+
schema: Type[BaseModel],
59+
) -> DataFrame:
60+
"""Read an JSON file directly to a Spark DataFrame.
61+
62+
"""
63+
if get_content_length(resource) == 0:
64+
raise EmptyFileError(f"File at {resource} is empty.")
65+
66+
spark_schema: StructType = get_type_from_annotation(schema)
67+
kwargs = {
68+
"sep": self.delimiter,
69+
"header": self.header,
70+
"escape": self.escape_char,
71+
"quote": self.quote_char,
72+
"multiLine": self.multi_line,
73+
74+
}
75+
76+
return (
77+
self.spark_session.read.format("csv")
78+
.options(**kwargs) # type: ignore
79+
.load(resource, schema=spark_schema)
80+
)
81+

0 commit comments

Comments
 (0)