Skip to content

Commit 1df7e8a

Browse files
committed
Cleanly separate lineage table and dependency graph methods
- Rename functions with _from_graph suffix to clarify they use FK graph
- Remove _get_primary_key_attrs; use Heading.primary_key instead
- Add clear section comments and docstrings explaining the two methods
- Update module docstring to explain mutual exclusivity
- Mark graph methods as FALLBACK with incomplete results warning
1 parent 9849f52 commit 1df7e8a

File tree

1 file changed

+91
-73
lines changed

1 file changed

+91
-73
lines changed

datajoint/lineage.py

Lines changed: 91 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
"""
22
Lineage tracking for semantic matching in joins.
33
4-
This module provides:
5-
- LineageTable: hidden table (~lineage) for storing attribute lineage
6-
- Functions to compute lineage from the FK graph (fallback)
7-
- Migration utilities for existing schemas
4+
This module provides two separate methods for determining attribute lineage:
5+
6+
1. **Lineage tables** (~lineage): Used by DataJoint-managed schemas.
7+
Lineage is stored explicitly and copied from parent tables at declaration time.
8+
9+
2. **Dependency graph**: Fallback for non-DataJoint schemas without lineage tables.
10+
Lineage is computed by traversing foreign key relationships.
11+
12+
These two methods are mutually exclusive and should not be mixed.
813
"""
914

1015
import logging
@@ -16,6 +21,11 @@
1621
logger = logging.getLogger(__name__.split(".")[0])
1722

1823

24+
# =============================================================================
25+
# Lineage Table Method (for DataJoint-managed schemas)
26+
# =============================================================================
27+
28+
1929
class LineageTable(Table):
2030
"""
2131
Hidden table for storing attribute lineage information.
@@ -115,12 +125,19 @@ def delete_table_lineage(self, table_name):
115125
(self & dict(table_name=table_name)).delete_quick()
116126

117127

118-
def compute_lineage_from_dependencies(connection, schema, table_name, attribute_name):
128+
# =============================================================================
129+
# Dependency Graph Method (fallback for non-DataJoint schemas)
130+
# =============================================================================
131+
132+
133+
def _compute_lineage_from_graph(connection, schema, table_name, attribute_name):
119134
"""
120-
Compute lineage by traversing the FK graph.
135+
Compute lineage for a single attribute by traversing the FK dependency graph.
136+
137+
This is a FALLBACK method for schemas that don't have ~lineage tables.
138+
It does NOT use lineage tables - it computes lineage purely from FK relationships.
121139
122-
Uses connection.dependencies which loads FK info from INFORMATION_SCHEMA.
123-
This is the fallback when the ~lineage table doesn't exist.
140+
Note: Results may be incomplete if referenced schemas are not loaded.
124141
125142
:param connection: database connection
126143
:param schema: schema name
@@ -134,10 +151,16 @@ def compute_lineage_from_dependencies(connection, schema, table_name, attribute_
134151

135152
# Check if the table exists in the dependency graph
136153
if full_table_name not in connection.dependencies:
137-
# Table not in graph - compute lineage based on primary key status
138-
# We need to query the database to check if this is a PK attribute
139-
pk_attrs = _get_primary_key_attrs(connection, schema, table_name)
140-
if attribute_name in pk_attrs:
154+
# Table not in graph - get PK info from Heading
155+
heading = Heading(
156+
table_info=dict(
157+
conn=connection,
158+
database=schema,
159+
table_name=table_name,
160+
context=None,
161+
)
162+
)
163+
if attribute_name in heading.primary_key:
141164
return f"{schema}.{table_name}.{attribute_name}"
142165
else:
143166
return None
@@ -161,7 +184,7 @@ def compute_lineage_from_dependencies(connection, schema, table_name, attribute_
161184
)
162185
break
163186
parent_schema, parent_table = parse_full_table_name(parent)
164-
return compute_lineage_from_dependencies(
187+
return _compute_lineage_from_graph(
165188
connection, parent_schema, parent_table, parent_attr
166189
)
167190

@@ -177,31 +200,14 @@ def compute_lineage_from_dependencies(connection, schema, table_name, attribute_
177200
return None
178201

179202

180-
def _get_primary_key_attrs(connection, schema, table_name):
203+
def _compute_all_lineage_from_graph(connection, schema, table_name):
181204
"""
182-
Get the primary key attributes for a table by querying the database.
183-
184-
:param connection: database connection
185-
:param schema: schema name
186-
:param table_name: table name
187-
:return: set of primary key attribute names
188-
"""
189-
result = connection.query(
190-
"""
191-
SELECT column_name
192-
FROM information_schema.key_column_usage
193-
WHERE table_schema = %s
194-
AND table_name = %s
195-
AND constraint_name = 'PRIMARY'
196-
""",
197-
args=(schema, table_name),
198-
)
199-
return {row[0] for row in result}
205+
Compute lineage for all attributes in a table using the FK dependency graph.
200206
207+
This is a FALLBACK method for schemas that don't have ~lineage tables.
208+
It does NOT use lineage tables - it computes lineage purely from FK relationships.
201209
202-
def compute_all_lineage_for_table(connection, schema, table_name):
203-
"""
204-
Compute lineage for all attributes in a table.
210+
Note: Results may be incomplete if referenced schemas are not loaded.
205211
206212
:param connection: database connection
207213
:param schema: schema name
@@ -218,18 +224,61 @@ def compute_all_lineage_for_table(connection, schema, table_name):
218224
)
219225
)
220226

221-
# Compute lineage for each attribute
227+
# Compute lineage for each attribute from the dependency graph
222228
return {
223-
attr: compute_lineage_from_dependencies(connection, schema, table_name, attr)
229+
attr: _compute_lineage_from_graph(connection, schema, table_name, attr)
224230
for attr in heading.names
225231
}
226232

227233

234+
# =============================================================================
235+
# Public API
236+
# =============================================================================
237+
238+
239+
def get_lineage_for_heading(connection, schema, table_name):
240+
"""
241+
Get lineage information for all attributes in a table.
242+
243+
Uses one of two methods (mutually exclusive):
244+
- If ~lineage table exists: load from lineage table
245+
- Otherwise: compute from FK dependency graph (fallback)
246+
247+
:param connection: database connection
248+
:param schema: schema name
249+
:param table_name: table name
250+
:return: dict mapping attribute_name -> lineage (or None)
251+
"""
252+
# Check if ~lineage table exists
253+
lineage_table_exists = (
254+
connection.query(
255+
"""
256+
SELECT COUNT(*) FROM information_schema.tables
257+
WHERE table_schema = %s AND table_name = '~lineage'
258+
""",
259+
args=(schema,),
260+
).fetchone()[0]
261+
> 0
262+
)
263+
264+
if lineage_table_exists:
265+
# Load from ~lineage table
266+
lineage_table = LineageTable(connection, schema)
267+
return lineage_table.get_table_lineage(table_name)
268+
else:
269+
# Compute from FK graph (fallback for non-DataJoint schemas)
270+
return _compute_all_lineage_from_graph(connection, schema, table_name)
271+
272+
228273
def compute_schema_lineage(connection, schema):
229274
"""
230-
Compute and populate the ~lineage table for a schema.
275+
Compute and populate the ~lineage table for a schema using the dependency graph.
276+
277+
This is a migration/initialization utility that analyzes FK relationships
278+
to determine attribute origins and stores them in the ~lineage table.
231279
232-
Analyzes foreign key relationships to determine attribute origins.
280+
After this is run, the schema will use lineage tables instead of the
281+
dependency graph for lineage lookups.
233282
234283
:param connection: database connection
235284
:param schema: schema object or schema name
@@ -262,42 +311,11 @@ def compute_schema_lineage(connection, schema):
262311

263312
# Compute and store lineage for each table
264313
for table_name in tables:
265-
lineage_map = compute_all_lineage_for_table(connection, schema_name, table_name)
314+
lineage_map = _compute_all_lineage_from_graph(
315+
connection, schema_name, table_name
316+
)
266317
for attr_name, lineage in lineage_map.items():
267318
if lineage is not None:
268319
lineage_table.store_lineage(table_name, attr_name, lineage)
269320

270321
logger.info(f"Computed lineage for schema `{schema_name}`: {len(tables)} tables")
271-
272-
273-
def get_lineage_for_heading(connection, schema, table_name, heading):
274-
"""
275-
Get lineage information for all attributes in a heading.
276-
277-
First tries to load from ~lineage table, falls back to FK graph computation.
278-
279-
:param connection: database connection
280-
:param schema: schema name
281-
:param table_name: table name
282-
:param heading: Heading object to populate
283-
:return: dict mapping attribute_name -> lineage (or None)
284-
"""
285-
# Check if ~lineage table exists
286-
lineage_table_exists = (
287-
connection.query(
288-
"""
289-
SELECT COUNT(*) FROM information_schema.tables
290-
WHERE table_schema = %s AND table_name = '~lineage'
291-
""",
292-
args=(schema,),
293-
).fetchone()[0]
294-
> 0
295-
)
296-
297-
if lineage_table_exists:
298-
# Load from ~lineage table
299-
lineage_table = LineageTable(connection, schema)
300-
return lineage_table.get_table_lineage(table_name)
301-
else:
302-
# Compute from FK graph
303-
return compute_all_lineage_for_table(connection, schema, table_name)

0 commit comments

Comments
 (0)