Skip to content

Commit 28e29a7

Browse files
authored
Merge pull request #1 from sidequery/fix-review-findings
Fix critical and important review findings
2 parents b58282e + 1e97580 commit 28e29a7

File tree

8 files changed: 491 additions and 68 deletions.

src/sqlmesh_openlineage/console.py

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
"""OpenLineage Console wrapper for SQLMesh."""
22
from __future__ import annotations
33

4+
import logging
45
import uuid
56
import typing as t
67

8+
logger = logging.getLogger(__name__)
9+
710
if t.TYPE_CHECKING:
811
from sqlmesh.core.console import Console
912
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike
@@ -72,10 +75,14 @@ def start_snapshot_evaluation_progress(
7275
# Store snapshot for later reference
7376
self._current_snapshots[snapshot.name] = snapshot
7477

75-
self._emitter.emit_snapshot_start(
76-
snapshot=snapshot,
77-
run_id=run_id,
78-
)
78+
try:
79+
self._emitter.emit_snapshot_start(
80+
snapshot=snapshot,
81+
run_id=run_id,
82+
snapshots=self._current_snapshots,
83+
)
84+
except Exception:
85+
logger.warning("Failed to emit START event for %s", snapshot.name, exc_info=True)
7986

8087
# Delegate to wrapped console
8188
self._wrapped.start_snapshot_evaluation_progress(snapshot, audit_only)
@@ -96,20 +103,24 @@ def update_snapshot_evaluation_progress(
96103
run_id = self._active_runs.pop(snapshot.name, None)
97104

98105
if run_id:
99-
if num_audits_failed > 0:
100-
self._emitter.emit_snapshot_fail(
101-
snapshot=snapshot,
102-
run_id=run_id,
103-
error=f"Audit failed: {num_audits_failed} audit(s) failed",
104-
)
105-
else:
106-
self._emitter.emit_snapshot_complete(
107-
snapshot=snapshot,
108-
run_id=run_id,
109-
interval=interval,
110-
duration_ms=duration_ms,
111-
execution_stats=execution_stats,
112-
)
106+
try:
107+
if num_audits_failed > 0:
108+
self._emitter.emit_snapshot_fail(
109+
snapshot=snapshot,
110+
run_id=run_id,
111+
error=f"Audit failed: {num_audits_failed} audit(s) failed",
112+
)
113+
else:
114+
self._emitter.emit_snapshot_complete(
115+
snapshot=snapshot,
116+
run_id=run_id,
117+
interval=interval,
118+
duration_ms=duration_ms,
119+
execution_stats=execution_stats,
120+
snapshots=self._current_snapshots,
121+
)
122+
except Exception:
123+
logger.warning("Failed to emit event for %s", snapshot.name, exc_info=True)
113124

114125
# Delegate to wrapped console
115126
self._wrapped.update_snapshot_evaluation_progress(
@@ -130,11 +141,14 @@ def stop_evaluation_progress(self, success: bool = True) -> None:
130141
for snapshot_name, run_id in list(self._active_runs.items()):
131142
snapshot = self._current_snapshots.get(snapshot_name)
132143
if snapshot and run_id:
133-
self._emitter.emit_snapshot_fail(
134-
snapshot=snapshot,
135-
run_id=run_id,
136-
error="Evaluation interrupted" if not success else "Unknown error",
137-
)
144+
try:
145+
self._emitter.emit_snapshot_fail(
146+
snapshot=snapshot,
147+
run_id=run_id,
148+
error="Evaluation interrupted" if not success else "Unknown error",
149+
)
150+
except Exception:
151+
logger.warning("Failed to emit FAIL event for %s", snapshot_name, exc_info=True)
138152

139153
# Clear tracking state
140154
self._active_runs.clear()

src/sqlmesh_openlineage/datasets.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22
from __future__ import annotations
33

44
import typing as t
5-
from collections import defaultdict
65

76
if t.TYPE_CHECKING:
87
from sqlmesh.core.snapshot import Snapshot
9-
from sqlmesh.core.model import Model
108
from openlineage.client.event_v2 import InputDataset, OutputDataset
119

1210

@@ -90,19 +88,28 @@ def snapshot_to_column_lineage_facet(
9088
# Get column name
9189
source_col = exp.to_column(lineage_node.name).name
9290

91+
# Determine transformation type based on whether
92+
# output column name matches source column name
93+
is_identity = col_name == source_col
94+
transformations = [
95+
column_lineage_dataset.Transformation(
96+
type="DIRECT",
97+
subtype="IDENTITY" if is_identity else "TRANSFORMATION",
98+
)
99+
]
100+
93101
input_fields.append(
94102
column_lineage_dataset.InputField(
95103
namespace=namespace,
96104
name=table_name,
97105
field=source_col,
106+
transformations=transformations,
98107
)
99108
)
100109

101110
if input_fields:
102111
fields[col_name] = column_lineage_dataset.Fields(
103112
inputFields=input_fields,
104-
transformationType="",
105-
transformationDescription="",
106113
)
107114

108115
except Exception:
@@ -156,19 +163,28 @@ def snapshot_to_output_dataset(
156163
def snapshot_to_input_datasets(
157164
snapshot: "Snapshot",
158165
namespace: str,
166+
snapshots: t.Optional[t.Dict[str, "Snapshot"]] = None,
159167
) -> t.List["InputDataset"]:
160-
"""Get upstream dependencies as input datasets."""
168+
"""Get upstream dependencies as input datasets.
169+
170+
When a snapshots dict is provided, parent snapshots are looked up to
171+
produce fully qualified table names consistent with output datasets.
172+
"""
161173
from openlineage.client.event_v2 import InputDataset
162174

163175
inputs: t.List["InputDataset"] = []
164176

165177
# Get parent snapshot IDs
166178
for parent_id in snapshot.parents:
167-
# Parent ID contains the name we need
179+
# Try to resolve fully qualified name via the snapshots dict
180+
parent_name = parent_id.name
181+
if snapshots and parent_name in snapshots:
182+
parent_name = snapshot_to_table_name(snapshots[parent_name])
183+
168184
inputs.append(
169185
InputDataset(
170186
namespace=namespace,
171-
name=parent_id.name,
187+
name=parent_name,
172188
)
173189
)
174190

src/sqlmesh_openlineage/emitter.py

Lines changed: 107 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
"""OpenLineage event emitter for SQLMesh."""
22
from __future__ import annotations
33

4+
import logging
45
import typing as t
56
from datetime import datetime, timezone
67

8+
logger = logging.getLogger(__name__)
9+
710
if t.TYPE_CHECKING:
811
from sqlmesh.core.snapshot import Snapshot
912
from sqlmesh.core.snapshot.definition import Interval
1013
from sqlmesh.core.snapshot.execution_tracker import QueryExecutionStats
1114

15+
PRODUCER = "https://github.com/sidequery/sqlmesh-openlineage"
16+
1217

1318
class OpenLineageEmitter:
1419
"""Emits OpenLineage events for SQLMesh operations."""
@@ -20,6 +25,7 @@ def __init__(
2025
api_key: t.Optional[str] = None,
2126
):
2227
from openlineage.client import OpenLineageClient
28+
from openlineage.client.client import OpenLineageClientOptions
2329

2430
self.namespace = namespace
2531
self.url = url
@@ -32,15 +38,82 @@ def __init__(
3238
elif api_key:
3339
self.client = OpenLineageClient(
3440
url=url,
35-
options={"api_key": api_key},
41+
options=OpenLineageClientOptions(api_key=api_key),
3642
)
3743
else:
3844
self.client = OpenLineageClient(url=url)
3945

46+
def _build_job_facets(self, snapshot: "Snapshot") -> t.Dict[str, t.Any]:
    """Build job facets including SQL, job type, and source code location.

    The ``jobType`` facet is always present. The ``sql`` and
    ``sourceCodeLocation`` facets are best-effort: each is added only when
    the snapshot is a model and the corresponding value is non-empty. A
    failure while building either one is logged at DEBUG level rather than
    silently discarded, and never propagates to the caller.

    Args:
        snapshot: Snapshot whose model supplies the query and file path.

    Returns:
        Mapping of facet name to facet object.
    """
    from openlineage.client.facet_v2 import job_type_job, sql_job, source_code_location_job

    # JobTypeJobFacet: identify as SQLMesh batch job
    facets: t.Dict[str, t.Any] = {
        "jobType": job_type_job.JobTypeJobFacet(
            processingType="BATCH",
            integration="SQLMESH",
            jobType="MODEL",
        ),
    }

    # SQLJobFacet: include the model SQL query
    try:
        if snapshot.is_model and snapshot.model:
            query = snapshot.model.query
            sql_str = str(query) if query is not None else ""
            if sql_str:
                facets["sql"] = sql_job.SQLJobFacet(query=sql_str)
    except Exception:
        # Optional facet: keep going, but leave a trace for debugging.
        logger.debug(
            "Could not build SQL job facet for %s",
            getattr(snapshot, "name", "<unknown>"),
            exc_info=True,
        )

    # SourceCodeLocationJobFacet: include model file path
    try:
        if snapshot.is_model and snapshot.model:
            model_path = getattr(snapshot.model, "_path", None)
            path_str = str(model_path) if model_path is not None else ""
            if path_str:
                facets["sourceCodeLocation"] = (
                    source_code_location_job.SourceCodeLocationJobFacet(
                        type="file",
                        url=f"file://{path_str}",
                    )
                )
    except Exception:
        # Optional facet: keep going, but leave a trace for debugging.
        logger.debug(
            "Could not build source code location facet for %s",
            getattr(snapshot, "name", "<unknown>"),
            exc_info=True,
        )

    return facets
87+
88+
def _build_processing_engine_facet(self) -> t.Dict[str, t.Any]:
    """Return the run facets that describe the processing engine.

    The result holds a single ``processing_engine`` entry carrying the
    SQLMesh version and this adapter's version. Either version falls back
    to ``"unknown"`` when its import fails.
    """
    from openlineage.client.facet_v2 import processing_engine_run

    try:
        from sqlmesh import __version__ as engine_version
    except ImportError:
        engine_version = "unknown"

    try:
        from sqlmesh_openlineage import __version__ as plugin_version
    except ImportError:
        plugin_version = "unknown"

    engine_facet = processing_engine_run.ProcessingEngineRunFacet(
        version=engine_version,
        name="SQLMesh",
        openlineageAdapterVersion=plugin_version,
    )
    return {"processing_engine": engine_facet}
111+
40112
def emit_snapshot_start(
41113
self,
42114
snapshot: "Snapshot",
43115
run_id: str,
116+
snapshots: t.Optional[t.Dict[str, "Snapshot"]] = None,
44117
) -> None:
45118
"""Emit a START event for snapshot evaluation."""
46119
from openlineage.client.event_v2 import RunEvent, RunState, Run, Job
@@ -50,19 +123,25 @@ def emit_snapshot_start(
50123
snapshot_to_input_datasets,
51124
)
52125

53-
inputs = snapshot_to_input_datasets(snapshot, self.namespace)
126+
inputs = snapshot_to_input_datasets(snapshot, self.namespace, snapshots=snapshots)
54127
output = snapshot_to_output_dataset(snapshot, self.namespace)
55128

129+
job_facets = self._build_job_facets(snapshot)
130+
run_facets = self._build_processing_engine_facet()
131+
56132
event = RunEvent(
57133
eventType=RunState.START,
58134
eventTime=datetime.now(timezone.utc).isoformat(),
59-
run=Run(runId=run_id),
60-
job=Job(namespace=self.namespace, name=snapshot.name),
135+
run=Run(runId=run_id, facets=run_facets),
136+
job=Job(namespace=self.namespace, name=snapshot.name, facets=job_facets),
61137
inputs=inputs,
62138
outputs=[output] if output else [],
63-
producer="sqlmesh-openlineage",
139+
producer=PRODUCER,
64140
)
65-
self.client.emit(event)
141+
try:
142+
self.client.emit(event)
143+
except Exception:
144+
logger.warning("Failed to emit %s event for %s", event.eventType, snapshot.name, exc_info=True)
66145

67146
def emit_snapshot_complete(
68147
self,
@@ -71,33 +150,46 @@ def emit_snapshot_complete(
71150
interval: t.Optional["Interval"] = None,
72151
duration_ms: t.Optional[int] = None,
73152
execution_stats: t.Optional["QueryExecutionStats"] = None,
153+
snapshots: t.Optional[t.Dict[str, "Snapshot"]] = None,
74154
) -> None:
75155
"""Emit a COMPLETE event for snapshot evaluation."""
76156
from openlineage.client.event_v2 import RunEvent, RunState, Run, Job
77157

78-
from sqlmesh_openlineage.datasets import snapshot_to_output_dataset
158+
from sqlmesh_openlineage.datasets import (
159+
snapshot_to_output_dataset,
160+
snapshot_to_input_datasets,
161+
)
79162
from sqlmesh_openlineage.facets import build_run_facets, build_output_facets
80163

81164
run_facets = build_run_facets(
82165
duration_ms=duration_ms,
83166
execution_stats=execution_stats,
84167
)
168+
run_facets.update(self._build_processing_engine_facet())
85169

86170
output = snapshot_to_output_dataset(
87171
snapshot,
88172
self.namespace,
89173
facets=build_output_facets(execution_stats),
90174
)
91175

176+
inputs = snapshot_to_input_datasets(snapshot, self.namespace, snapshots=snapshots)
177+
178+
job_facets = self._build_job_facets(snapshot)
179+
92180
event = RunEvent(
93181
eventType=RunState.COMPLETE,
94182
eventTime=datetime.now(timezone.utc).isoformat(),
95183
run=Run(runId=run_id, facets=run_facets),
96-
job=Job(namespace=self.namespace, name=snapshot.name),
184+
job=Job(namespace=self.namespace, name=snapshot.name, facets=job_facets),
185+
inputs=inputs,
97186
outputs=[output] if output else [],
98-
producer="sqlmesh-openlineage",
187+
producer=PRODUCER,
99188
)
100-
self.client.emit(event)
189+
try:
190+
self.client.emit(event)
191+
except Exception:
192+
logger.warning("Failed to emit %s event for %s", event.eventType, snapshot.name, exc_info=True)
101193

102194
def emit_snapshot_fail(
103195
self,
@@ -124,6 +216,9 @@ def emit_snapshot_fail(
124216
},
125217
),
126218
job=Job(namespace=self.namespace, name=snapshot.name),
127-
producer="sqlmesh-openlineage",
219+
producer=PRODUCER,
128220
)
129-
self.client.emit(event)
221+
try:
222+
self.client.emit(event)
223+
except Exception:
224+
logger.warning("Failed to emit %s event for %s", event.eventType, snapshot.name, exc_info=True)

src/sqlmesh_openlineage/facets.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ def build_run_facets(
1717
# Add custom SQLMesh facet with execution info
1818
if duration_ms is not None or execution_stats is not None:
1919
sqlmesh_facet = {
20-
"_producer": "sqlmesh-openlineage",
21-
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLMeshExecutionFacet.json",
20+
"_producer": "https://github.com/sidequery/sqlmesh-openlineage",
21+
"_schemaURL": "https://github.com/sidequery/sqlmesh-openlineage#SQLMeshExecutionFacet",
2222
}
2323

2424
if duration_ms is not None:

0 commit comments

Comments
 (0)