From e74ab1367ed8ed8a0ec2b61075f55687e44feee0 Mon Sep 17 00:00:00 2001 From: Anna Benke Date: Thu, 7 May 2026 12:35:00 +0200 Subject: [PATCH] feat(gooddata-pipelines): support composite key references and optional WDF --- .../ldm_extension/input_processor.py | 76 +++++++++++++------ .../models/custom_data_object.py | 53 +++++++++++-- .../tests/test_ldm_extension/conftest.py | 6 +- .../test_input_validator.py | 12 +-- .../test_ldm_extension/test_merge_ldm.py | 6 +- .../test_models/test_custom_data_object.py | 69 ++++++++++++++++- 6 files changed, 176 insertions(+), 46 deletions(-) diff --git a/packages/gooddata-pipelines/src/gooddata_pipelines/ldm_extension/input_processor.py b/packages/gooddata-pipelines/src/gooddata_pipelines/ldm_extension/input_processor.py index 6f43a037c..8b6dbcf32 100644 --- a/packages/gooddata-pipelines/src/gooddata_pipelines/ldm_extension/input_processor.py +++ b/packages/gooddata-pipelines/src/gooddata_pipelines/ldm_extension/input_processor.py @@ -253,6 +253,54 @@ def datasets_to_ldm( # Get the data source info dataset_source_table_id, dataset_sql = self._get_sources(dataset) + # Build one reference source per column for composite-key joins to + # the parent dataset. The three list fields are validated to share + # the same length on the input model. + parent_reference_sources = [ + CatalogDeclarativeReferenceSource( + column=column, + data_type=data_type.value, + target=CatalogGrainIdentifier( + id=attribute_id, + type=CustomFieldType.ATTRIBUTE.value, + ), + ) + for column, data_type, attribute_id in zip( + dataset.definition.dataset_reference_source_columns, + dataset.definition.dataset_reference_source_column_data_types, + dataset.definition.parent_dataset_reference_attribute_ids, + strict=True, + ) + ] + + # Workspace data filter fields are optional and must be set together + # (validated on the input model). Emit columns/references only when + # both are provided. + wdf_columns: list[CatalogDeclarativeWorkspaceDataFilterColumn] = [] + wdf_references: list[ + CatalogDeclarativeWorkspaceDataFilterReferences + ] = [] + if ( + dataset.definition.workspace_data_filter_id is not None + and dataset.definition.workspace_data_filter_column_name + is not None + ): + wdf_columns.append( + CatalogDeclarativeWorkspaceDataFilterColumn( + name=dataset.definition.workspace_data_filter_column_name, + data_type=ColumnDataType.STRING.value, + ) + ) + wdf_references.append( + CatalogDeclarativeWorkspaceDataFilterReferences( + filter_id=CatalogDatasetWorkspaceDataFilterIdentifier( + id=dataset.definition.workspace_data_filter_id + ), + filter_column=dataset.definition.workspace_data_filter_column_name, + filter_column_data_type=ColumnDataType.STRING.value, + ) + ) + # Construct the declarative dataset object and append it to the list. declarative_datasets.append( CatalogDeclarativeDataset( @@ -265,16 +313,7 @@ def datasets_to_ldm( id=dataset.definition.parent_dataset_reference, ), multivalue=True, - sources=[ - CatalogDeclarativeReferenceSource( - column=dataset.definition.dataset_reference_source_column, - data_type=dataset.definition.dataset_reference_source_column_data_type.value, - target=CatalogGrainIdentifier( - id=dataset.definition.parent_dataset_reference_attribute_id, - type=CustomFieldType.ATTRIBUTE.value, - ), - ) - ], + sources=parent_reference_sources, ), ] + date_references, @@ -283,21 +322,8 @@ def datasets_to_ldm( facts=facts, data_source_table_id=dataset_source_table_id, sql=dataset_sql, - workspace_data_filter_columns=[ - CatalogDeclarativeWorkspaceDataFilterColumn( - name=dataset.definition.workspace_data_filter_column_name, - data_type=ColumnDataType.STRING.value, - ) - ], - workspace_data_filter_references=[ - CatalogDeclarativeWorkspaceDataFilterReferences( - filter_id=CatalogDatasetWorkspaceDataFilterIdentifier( - id=dataset.definition.workspace_data_filter_id - ), - filter_column=dataset.definition.workspace_data_filter_column_name, - filter_column_data_type=ColumnDataType.STRING.value, - ) - ], + workspace_data_filter_columns=wdf_columns or None, + workspace_data_filter_references=wdf_references or None, tags=_effective_dataset_tags(dataset.definition), ) ) diff --git a/packages/gooddata-pipelines/src/gooddata_pipelines/ldm_extension/models/custom_data_object.py b/packages/gooddata-pipelines/src/gooddata_pipelines/ldm_extension/models/custom_data_object.py index 9c0dae3a4..335c597bc 100644 --- a/packages/gooddata-pipelines/src/gooddata_pipelines/ldm_extension/models/custom_data_object.py +++ b/packages/gooddata-pipelines/src/gooddata_pipelines/ldm_extension/models/custom_data_object.py @@ -62,7 +62,18 @@ def check_ids_not_equal(self) -> "CustomFieldDefinition": class CustomDatasetDefinition(BaseModel): - """Input model for custom dataset definition.""" + """Input model for custom dataset definition. + + Reference fields are lists to support composite-key joins to the parent + dataset. ``parent_dataset_reference_attribute_ids``, ``dataset_reference_source_columns``, + and ``dataset_reference_source_column_data_types`` must all have the same + (non-zero) length and are zipped position-by-position to produce one + declarative reference source per column. + + Workspace data filter fields are optional. Both must be set together or + both left unset; when set, a single-column WDF binding is emitted on the + declarative dataset. + """ workspace_id: str dataset_id: str @@ -71,11 +82,11 @@ class CustomDatasetDefinition(BaseModel): dataset_source_table: str | None dataset_source_sql: str | None parent_dataset_reference: str - parent_dataset_reference_attribute_id: str - dataset_reference_source_column: str - dataset_reference_source_column_data_type: ColumnDataType - workspace_data_filter_id: str - workspace_data_filter_column_name: str + parent_dataset_reference_attribute_ids: list[str] + dataset_reference_source_columns: list[str] + dataset_reference_source_column_data_types: list[ColumnDataType] + workspace_data_filter_id: str | None = None + workspace_data_filter_column_name: str | None = None dataset_description: str | None = Field( default=None, description="Declarative description on the custom dataset.", @@ -98,6 +109,36 @@ def check_source(self) -> "CustomDatasetDefinition": ) return self + @model_validator(mode="after") + def check_reference_lists(self) -> "CustomDatasetDefinition": + """Reference list fields must be parallel (same non-zero length).""" + n = len(self.dataset_reference_source_columns) + if n == 0: + raise ValueError( + "dataset_reference_source_columns must contain at least one column" + ) + if ( + len(self.parent_dataset_reference_attribute_ids) != n + or len(self.dataset_reference_source_column_data_types) != n + ): + raise ValueError( + "parent_dataset_reference_attribute_ids, dataset_reference_source_columns, " + "and dataset_reference_source_column_data_types must have the same length" + ) + return self + + @model_validator(mode="after") + def check_wdf_pair(self) -> "CustomDatasetDefinition": + """Workspace data filter id and column name must be provided together or both omitted.""" + has_id = self.workspace_data_filter_id is not None + has_col = self.workspace_data_filter_column_name is not None + if has_id != has_col: + raise ValueError( + "workspace_data_filter_id and workspace_data_filter_column_name " + "must both be set or both be omitted" + ) + return self + class CustomDataset(BaseModel): """Custom dataset with its definition and custom fields.""" diff --git a/packages/gooddata-pipelines/tests/test_ldm_extension/conftest.py b/packages/gooddata-pipelines/tests/test_ldm_extension/conftest.py index 86754a5dc..d322bca29 100644 --- a/packages/gooddata-pipelines/tests/test_ldm_extension/conftest.py +++ b/packages/gooddata-pipelines/tests/test_ldm_extension/conftest.py @@ -59,9 +59,9 @@ def mock_dataset_definition(): dataset_datasource_id="ds_source", dataset_source_sql=None, parent_dataset_reference="parent_ds", - parent_dataset_reference_attribute_id="parent_attr", - dataset_reference_source_column="ref_col", - dataset_reference_source_column_data_type=ColumnDataType.STRING, + parent_dataset_reference_attribute_ids=["parent_attr"], + dataset_reference_source_columns=["ref_col"], + dataset_reference_source_column_data_types=[ColumnDataType.STRING], workspace_data_filter_id="wdf1", workspace_data_filter_column_name="col1", ) diff --git a/packages/gooddata-pipelines/tests/test_ldm_extension/test_input_validator.py b/packages/gooddata-pipelines/tests/test_ldm_extension/test_input_validator.py index 13401574a..2a6e517e8 100644 --- a/packages/gooddata-pipelines/tests/test_ldm_extension/test_input_validator.py +++ b/packages/gooddata-pipelines/tests/test_ldm_extension/test_input_validator.py @@ -25,9 +25,9 @@ def valid_dataset_definitions(): dataset_source_table="table1", dataset_source_sql=None, parent_dataset_reference="parent1", - parent_dataset_reference_attribute_id="parent1.id", - dataset_reference_source_column="id", - dataset_reference_source_column_data_type=ColumnDataType.STRING, + parent_dataset_reference_attribute_ids=["parent1.id"], + dataset_reference_source_columns=["id"], + dataset_reference_source_column_data_types=[ColumnDataType.STRING], workspace_data_filter_id="wdf1", workspace_data_filter_column_name="id", ), @@ -39,9 +39,9 @@ def valid_dataset_definitions(): dataset_source_table="table2", dataset_source_sql=None, parent_dataset_reference="parent2", - parent_dataset_reference_attribute_id="parent2.id", - dataset_reference_source_column="id", - dataset_reference_source_column_data_type=ColumnDataType.INT, + parent_dataset_reference_attribute_ids=["parent2.id"], + dataset_reference_source_columns=["id"], + dataset_reference_source_column_data_types=[ColumnDataType.INT], workspace_data_filter_id="wdf2", workspace_data_filter_column_name="id", ), diff --git a/packages/gooddata-pipelines/tests/test_ldm_extension/test_merge_ldm.py b/packages/gooddata-pipelines/tests/test_ldm_extension/test_merge_ldm.py index 45d0777e6..33ece9daa 100644 --- a/packages/gooddata-pipelines/tests/test_ldm_extension/test_merge_ldm.py +++ b/packages/gooddata-pipelines/tests/test_ldm_extension/test_merge_ldm.py @@ -67,9 +67,9 @@ def test_merge_removes_managed_dataset_not_in_input(): dataset_source_table="table1", dataset_source_sql=None, parent_dataset_reference="parent_ds", - parent_dataset_reference_attribute_id="parent_attr", - dataset_reference_source_column="ref_col", - dataset_reference_source_column_data_type=ColumnDataType.STRING, + parent_dataset_reference_attribute_ids=["parent_attr"], + dataset_reference_source_columns=["ref_col"], + dataset_reference_source_column_data_types=[ColumnDataType.STRING], workspace_data_filter_id="wdf1", workspace_data_filter_column_name="col1", ) diff --git a/packages/gooddata-pipelines/tests/test_ldm_extension/test_models/test_custom_data_object.py b/packages/gooddata-pipelines/tests/test_ldm_extension/test_models/test_custom_data_object.py index f0c605b15..992047b2c 100644 --- a/packages/gooddata-pipelines/tests/test_ldm_extension/test_models/test_custom_data_object.py +++ b/packages/gooddata-pipelines/tests/test_ldm_extension/test_models/test_custom_data_object.py @@ -34,9 +34,9 @@ def make_valid_dataset_def(**kwargs): "dataset_source_table": "table1", "dataset_source_sql": None, "parent_dataset_reference": "parent_ds", - "parent_dataset_reference_attribute_id": "parent_attr", - "dataset_reference_source_column": "src_col", - "dataset_reference_source_column_data_type": ColumnDataType.STRING, + "parent_dataset_reference_attribute_ids": ["parent_attr"], + "dataset_reference_source_columns": ["src_col"], + "dataset_reference_source_column_data_types": [ColumnDataType.STRING], "workspace_data_filter_id": "wdf1", "workspace_data_filter_column_name": "col1", } @@ -100,3 +100,66 @@ def test_custom_dataset_model(): assert dataset.definition.dataset_id == "ds1" assert len(dataset.custom_fields) == 1 assert dataset.custom_fields[0].custom_field_id == "cf1" + + +def test_custom_dataset_definition_composite_reference(): + """Multi-column reference: lists with matching length zip into N sources.""" + data = make_valid_dataset_def( + parent_dataset_reference_attribute_ids=["parent_pk1", "parent_pk2"], + dataset_reference_source_columns=["src_col1", "src_col2"], + dataset_reference_source_column_data_types=[ + ColumnDataType.STRING, + ColumnDataType.INT, + ], + ) + ds = CustomDatasetDefinition(**data) + assert len(ds.dataset_reference_source_columns) == 2 + + +def test_custom_dataset_definition_mismatched_reference_lengths_raises(): + data = make_valid_dataset_def( + parent_dataset_reference_attribute_ids=["a", "b"], + dataset_reference_source_columns=["c"], + dataset_reference_source_column_data_types=[ColumnDataType.STRING], + ) + with pytest.raises(ValidationError) as exc: + CustomDatasetDefinition(**data) + assert "must have the same length" in str(exc.value) + + +def test_custom_dataset_definition_empty_reference_columns_raises(): + data = make_valid_dataset_def( + parent_dataset_reference_attribute_ids=[], + dataset_reference_source_columns=[], + dataset_reference_source_column_data_types=[], + ) + with pytest.raises(ValidationError) as exc: + CustomDatasetDefinition(**data) + assert "at least one column" in str(exc.value) + + +def test_custom_dataset_definition_wdf_optional_both_none(): + data = make_valid_dataset_def( + workspace_data_filter_id=None, workspace_data_filter_column_name=None + ) + ds = CustomDatasetDefinition(**data) + assert ds.workspace_data_filter_id is None + assert ds.workspace_data_filter_column_name is None + + +def test_custom_dataset_definition_wdf_only_id_raises(): + data = make_valid_dataset_def( + workspace_data_filter_id="wdf1", workspace_data_filter_column_name=None + ) + with pytest.raises(ValidationError) as exc: + CustomDatasetDefinition(**data) + assert "both be set or both be omitted" in str(exc.value) + + +def test_custom_dataset_definition_wdf_only_column_raises(): + data = make_valid_dataset_def( + workspace_data_filter_id=None, workspace_data_filter_column_name="col1" + ) + with pytest.raises(ValidationError) as exc: + CustomDatasetDefinition(**data) + assert "both be set or both be omitted" in str(exc.value)