Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ def yield_dashboard(
)
for chart in self.context.get().charts or []
],
# Force-clear Dashboard.dataModels by sending an empty list.
# See comment in SupersetDBSource.yield_dashboard for why
# we represent the DataModel<->Dashboard relationship via
# the DataModel -> Chart -> Dashboard lineage chain
# instead of the structural Dashboard.dataModels field.
dataModels=[],
service=FullyQualifiedEntityName(self.context.get().dashboard_service),
owners=self.get_owner_ref(dashboard_details=dashboard_details),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ def yield_dashboard(
)
for chart in self.context.get().charts or []
],
# Force-clear Dashboard.dataModels by sending an empty list.
# The DataModel<->Dashboard relationship is represented via
# the DataModel -> Chart -> Dashboard lineage chain emitted
# in SupersetSourceMixin.yield_dashboard_lineage_details.
# Sending [] (instead of omitting the field) ensures any
# datamodel entries left over from prior runs are deleted.
dataModels=[],
service=FullyQualifiedEntityName(self.context.get().dashboard_service),
owners=self.get_owner_ref(dashboard_details=dashboard_details),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from collate_sqllineage.core.models import Table as LineageTable

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import (
Expand Down Expand Up @@ -319,14 +321,36 @@ def _get_dashboard_data_model_entity(
fqn=datamodel_fqn,
)

def yield_datamodel_dashboard_lineage(
self,
) -> Iterable[Either[AddLineageRequest]]:
"""
Skip the base class' direct DataModel -> Dashboard lineage edge.
For Superset we bridge the chain through the Chart node so the graph
renders DataModel -> Chart -> Dashboard rather than DataModel ->
Dashboard alongside the chart list. The DataModel -> Chart edge is
emitted in yield_dashboard_lineage_details.
"""
return
yield # pragma: no cover # noqa: F841 # mark this as a generator

def yield_dashboard_lineage_details(
self,
dashboard_details: Union[FetchDashboard, DashboardResult],
db_service_prefix: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""
Get lineage between datamodel and table
Emit lineage edges Table -> DataModel -> Chart -> Dashboard for every
chart on this dashboard. Dashboard.charts (set in yield_dashboard)
is a structural ref only — the dashboard lineage graph traverses
through explicit lineage edges, so we also emit Chart -> Dashboard
here. The base class' direct DataModel -> Dashboard edge is
suppressed by the override of yield_datamodel_dashboard_lineage so
the chart node bridges the chain in the rendered graph.
"""
# Resolve the dashboard entity once per dashboard, not once per chart,
# to avoid an N+1 lookup against the metadata server.
dashboard_entity = self._get_dashboard_entity(dashboard_details)
for chart_json in filter(
None,
[
Expand All @@ -350,6 +374,26 @@ def yield_dashboard_lineage_details(
from_entity=from_entity_table,
column_lineage=column_lineage,
)

# DataModel -> Chart -> Dashboard bridge: emit BOTH edges
# so the dashboard's lineage graph renders the chart
# between the datamodel and the dashboard, instead of
# the datamodel hanging off the dashboard directly.
chart_entity = self._get_chart_entity(chart_json)
if chart_entity is not None:
dm_to_chart = self._get_add_lineage_request(
to_entity=chart_entity,
from_entity=to_entity,
)
if dm_to_chart is not None:
yield dm_to_chart
if dashboard_entity is not None:
chart_to_dash = self._get_add_lineage_request(
to_entity=dashboard_entity,
from_entity=chart_entity,
)
if chart_to_dash is not None:
yield chart_to_dash
except Exception as exc:
yield Either(
left=StackTraceError(
Expand All @@ -362,6 +406,54 @@ def yield_dashboard_lineage_details(
)
)

def _get_dashboard_entity(self, dashboard_details) -> Optional[Dashboard]:
"""
Look up the Dashboard entity created earlier so we can emit a
Chart -> Dashboard lineage edge.
"""
dashboard_id = getattr(dashboard_details, "id", None)
if dashboard_id is None:
return None
try:
dashboard_fqn = fqn.build(
self.metadata,
entity_type=Dashboard,
service_name=self.context.get().dashboard_service,
dashboard_name=str(dashboard_id),
)
return self.metadata.get_by_name(entity=Dashboard, fqn=dashboard_fqn)
except Exception as exc: # pylint: disable=broad-except
logger.warning(
"Failed to resolve dashboard entity for dashboard_id=%s: %s",
dashboard_id,
exc,
)
return None

def _get_chart_entity(self, chart_json) -> Optional[Chart]:
"""
Look up the Chart entity created earlier in this pipeline so we can
emit a DataModel -> Chart lineage edge.
"""
chart_id = getattr(chart_json, "id", None)
if chart_id is None:
return None
try:
chart_fqn = fqn.build(
self.metadata,
entity_type=Chart,
service_name=self.context.get().dashboard_service,
chart_name=str(chart_id),
)
return self.metadata.get_by_name(entity=Chart, fqn=chart_fqn)
except Exception as exc: # pylint: disable=broad-except
logger.warning(
"Failed to resolve chart entity for chart_id=%s: %s",
chart_id,
exc,
)
return None

def _get_datamodel(
self, datamodel: Union[SupersetDatasource, FetchChart]
) -> Optional[DashboardDataModel]:
Expand Down
Loading
Loading