Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,54 @@ def datasets_to_ldm(
# Get the data source info
dataset_source_table_id, dataset_sql = self._get_sources(dataset)

# Build one reference source per column for composite-key joins to
# the parent dataset. The three list fields are validated to share
# the same length on the input model.
parent_reference_sources = [
CatalogDeclarativeReferenceSource(
column=column,
data_type=data_type.value,
target=CatalogGrainIdentifier(
id=attribute_id,
type=CustomFieldType.ATTRIBUTE.value,
),
)
for column, data_type, attribute_id in zip(
dataset.definition.dataset_reference_source_columns,
dataset.definition.dataset_reference_source_column_data_types,
dataset.definition.parent_dataset_reference_attribute_ids,
strict=True,
)
]

# Workspace data filter fields are optional and must be set together
# (validated on the input model). Emit columns/references only when
# both are provided.
wdf_columns: list[CatalogDeclarativeWorkspaceDataFilterColumn] = []
wdf_references: list[
CatalogDeclarativeWorkspaceDataFilterReferences
] = []
if (
dataset.definition.workspace_data_filter_id is not None
and dataset.definition.workspace_data_filter_column_name
is not None
):
wdf_columns.append(
CatalogDeclarativeWorkspaceDataFilterColumn(
name=dataset.definition.workspace_data_filter_column_name,
data_type=ColumnDataType.STRING.value,
)
)
wdf_references.append(
CatalogDeclarativeWorkspaceDataFilterReferences(
filter_id=CatalogDatasetWorkspaceDataFilterIdentifier(
id=dataset.definition.workspace_data_filter_id
),
filter_column=dataset.definition.workspace_data_filter_column_name,
filter_column_data_type=ColumnDataType.STRING.value,
)
)

# Construct the declarative dataset object and append it to the list.
declarative_datasets.append(
CatalogDeclarativeDataset(
Expand All @@ -265,16 +313,7 @@ def datasets_to_ldm(
id=dataset.definition.parent_dataset_reference,
),
multivalue=True,
sources=[
CatalogDeclarativeReferenceSource(
column=dataset.definition.dataset_reference_source_column,
data_type=dataset.definition.dataset_reference_source_column_data_type.value,
target=CatalogGrainIdentifier(
id=dataset.definition.parent_dataset_reference_attribute_id,
type=CustomFieldType.ATTRIBUTE.value,
),
)
],
sources=parent_reference_sources,
),
]
+ date_references,
Expand All @@ -283,21 +322,8 @@ def datasets_to_ldm(
facts=facts,
data_source_table_id=dataset_source_table_id,
sql=dataset_sql,
workspace_data_filter_columns=[
CatalogDeclarativeWorkspaceDataFilterColumn(
name=dataset.definition.workspace_data_filter_column_name,
data_type=ColumnDataType.STRING.value,
)
],
workspace_data_filter_references=[
CatalogDeclarativeWorkspaceDataFilterReferences(
filter_id=CatalogDatasetWorkspaceDataFilterIdentifier(
id=dataset.definition.workspace_data_filter_id
),
filter_column=dataset.definition.workspace_data_filter_column_name,
filter_column_data_type=ColumnDataType.STRING.value,
)
],
workspace_data_filter_columns=wdf_columns or None,
workspace_data_filter_references=wdf_references or None,
tags=_effective_dataset_tags(dataset.definition),
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,18 @@ def check_ids_not_equal(self) -> "CustomFieldDefinition":


class CustomDatasetDefinition(BaseModel):
"""Input model for custom dataset definition."""
"""Input model for custom dataset definition.

Reference fields are lists to support composite-key joins to the parent
dataset. ``parent_dataset_reference_attribute_ids``, ``dataset_reference_source_columns``,
and ``dataset_reference_source_column_data_types`` must all have the same
(non-zero) length and are zipped position-by-position to produce one
declarative reference source per column.

Workspace data filter fields are optional. Both must be set together or
both left unset; when set, a single-column WDF binding is emitted on the
declarative dataset.
"""

workspace_id: str
dataset_id: str
Expand All @@ -71,11 +82,11 @@ class CustomDatasetDefinition(BaseModel):
dataset_source_table: str | None
dataset_source_sql: str | None
parent_dataset_reference: str
parent_dataset_reference_attribute_id: str
dataset_reference_source_column: str
dataset_reference_source_column_data_type: ColumnDataType
workspace_data_filter_id: str
workspace_data_filter_column_name: str
parent_dataset_reference_attribute_ids: list[str]
dataset_reference_source_columns: list[str]
dataset_reference_source_column_data_types: list[ColumnDataType]
workspace_data_filter_id: str | None = None
workspace_data_filter_column_name: str | None = None
dataset_description: str | None = Field(
default=None,
description="Declarative description on the custom dataset.",
Expand All @@ -98,6 +109,36 @@ def check_source(self) -> "CustomDatasetDefinition":
)
return self

@model_validator(mode="after")
def check_reference_lists(self) -> "CustomDatasetDefinition":
"""Reference list fields must be parallel (same non-zero length)."""
n = len(self.dataset_reference_source_columns)
if n == 0:
raise ValueError(
"dataset_reference_source_columns must contain at least one column"
)
if (
len(self.parent_dataset_reference_attribute_ids) != n
or len(self.dataset_reference_source_column_data_types) != n
):
raise ValueError(
"parent_dataset_reference_attribute_ids, dataset_reference_source_columns, "
"and dataset_reference_source_column_data_types must have the same length"
)
return self

@model_validator(mode="after")
def check_wdf_pair(self) -> "CustomDatasetDefinition":
"""Workspace data filter id and column name must be provided together or both omitted."""
has_id = self.workspace_data_filter_id is not None
has_col = self.workspace_data_filter_column_name is not None
if has_id != has_col:
raise ValueError(
"workspace_data_filter_id and workspace_data_filter_column_name "
"must both be set or both be omitted"
)
return self


class CustomDataset(BaseModel):
"""Custom dataset with its definition and custom fields."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ def mock_dataset_definition():
dataset_datasource_id="ds_source",
dataset_source_sql=None,
parent_dataset_reference="parent_ds",
parent_dataset_reference_attribute_id="parent_attr",
dataset_reference_source_column="ref_col",
dataset_reference_source_column_data_type=ColumnDataType.STRING,
parent_dataset_reference_attribute_ids=["parent_attr"],
dataset_reference_source_columns=["ref_col"],
dataset_reference_source_column_data_types=[ColumnDataType.STRING],
workspace_data_filter_id="wdf1",
workspace_data_filter_column_name="col1",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ def valid_dataset_definitions():
dataset_source_table="table1",
dataset_source_sql=None,
parent_dataset_reference="parent1",
parent_dataset_reference_attribute_id="parent1.id",
dataset_reference_source_column="id",
dataset_reference_source_column_data_type=ColumnDataType.STRING,
parent_dataset_reference_attribute_ids=["parent1.id"],
dataset_reference_source_columns=["id"],
dataset_reference_source_column_data_types=[ColumnDataType.STRING],
workspace_data_filter_id="wdf1",
workspace_data_filter_column_name="id",
),
Expand All @@ -39,9 +39,9 @@ def valid_dataset_definitions():
dataset_source_table="table2",
dataset_source_sql=None,
parent_dataset_reference="parent2",
parent_dataset_reference_attribute_id="parent2.id",
dataset_reference_source_column="id",
dataset_reference_source_column_data_type=ColumnDataType.INT,
parent_dataset_reference_attribute_ids=["parent2.id"],
dataset_reference_source_columns=["id"],
dataset_reference_source_column_data_types=[ColumnDataType.INT],
workspace_data_filter_id="wdf2",
workspace_data_filter_column_name="id",
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ def test_merge_removes_managed_dataset_not_in_input():
dataset_source_table="table1",
dataset_source_sql=None,
parent_dataset_reference="parent_ds",
parent_dataset_reference_attribute_id="parent_attr",
dataset_reference_source_column="ref_col",
dataset_reference_source_column_data_type=ColumnDataType.STRING,
parent_dataset_reference_attribute_ids=["parent_attr"],
dataset_reference_source_columns=["ref_col"],
dataset_reference_source_column_data_types=[ColumnDataType.STRING],
workspace_data_filter_id="wdf1",
workspace_data_filter_column_name="col1",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ def make_valid_dataset_def(**kwargs):
"dataset_source_table": "table1",
"dataset_source_sql": None,
"parent_dataset_reference": "parent_ds",
"parent_dataset_reference_attribute_id": "parent_attr",
"dataset_reference_source_column": "src_col",
"dataset_reference_source_column_data_type": ColumnDataType.STRING,
"parent_dataset_reference_attribute_ids": ["parent_attr"],
"dataset_reference_source_columns": ["src_col"],
"dataset_reference_source_column_data_types": [ColumnDataType.STRING],
"workspace_data_filter_id": "wdf1",
"workspace_data_filter_column_name": "col1",
}
Expand Down Expand Up @@ -100,3 +100,66 @@ def test_custom_dataset_model():
assert dataset.definition.dataset_id == "ds1"
assert len(dataset.custom_fields) == 1
assert dataset.custom_fields[0].custom_field_id == "cf1"


def test_custom_dataset_definition_composite_reference():
    """Multi-column reference: lists with matching length zip into N sources."""
    overrides = {
        "parent_dataset_reference_attribute_ids": ["parent_pk1", "parent_pk2"],
        "dataset_reference_source_columns": ["src_col1", "src_col2"],
        "dataset_reference_source_column_data_types": [
            ColumnDataType.STRING,
            ColumnDataType.INT,
        ],
    }
    definition = CustomDatasetDefinition(**make_valid_dataset_def(**overrides))
    assert len(definition.dataset_reference_source_columns) == 2


def test_custom_dataset_definition_mismatched_reference_lengths_raises():
    """Reference lists with differing lengths must fail validation."""
    payload = make_valid_dataset_def(
        parent_dataset_reference_attribute_ids=["a", "b"],
        dataset_reference_source_columns=["c"],
        dataset_reference_source_column_data_types=[ColumnDataType.STRING],
    )
    with pytest.raises(ValidationError, match="must have the same length"):
        CustomDatasetDefinition(**payload)


def test_custom_dataset_definition_empty_reference_columns_raises():
    """Empty reference-column lists must fail validation."""
    payload = make_valid_dataset_def(
        parent_dataset_reference_attribute_ids=[],
        dataset_reference_source_columns=[],
        dataset_reference_source_column_data_types=[],
    )
    with pytest.raises(ValidationError, match="at least one column"):
        CustomDatasetDefinition(**payload)


def test_custom_dataset_definition_wdf_optional_both_none():
    """Both workspace-data-filter fields may be omitted together."""
    definition = CustomDatasetDefinition(
        **make_valid_dataset_def(
            workspace_data_filter_id=None,
            workspace_data_filter_column_name=None,
        )
    )
    assert definition.workspace_data_filter_id is None
    assert definition.workspace_data_filter_column_name is None


def test_custom_dataset_definition_wdf_only_id_raises():
    """Providing only the WDF id (without a column name) must fail validation."""
    payload = make_valid_dataset_def(
        workspace_data_filter_id="wdf1",
        workspace_data_filter_column_name=None,
    )
    with pytest.raises(ValidationError, match="both be set or both be omitted"):
        CustomDatasetDefinition(**payload)


def test_custom_dataset_definition_wdf_only_column_raises():
    """Providing only the WDF column name (without an id) must fail validation."""
    payload = make_valid_dataset_def(
        workspace_data_filter_id=None,
        workspace_data_filter_column_name="col1",
    )
    with pytest.raises(ValidationError, match="both be set or both be omitted"):
        CustomDatasetDefinition(**payload)
Loading