Skip to content

Commit 5452fb1

Browse files
committed
Phase 1: Add lineage infrastructure for semantic matching
- Add lineage field to Attribute namedtuple in heading.py
- Add get_lineage() method to Heading class
- Create lineage.py module with:
  - LineageTable class for ~lineage hidden table
  - compute_lineage_from_dependencies() for FK graph fallback
  - migrate_schema_lineage() utility function
- Integrate LineageTable with Schema class:
  - Add schema.lineage property
  - Add schema.migrate_lineage() method
1 parent 70c35b2 commit 5452fb1

File tree

3 files changed

+357
-0
lines changed

3 files changed

+357
-0
lines changed

datajoint/heading.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
attribute_expression=None,
4242
database=None,
4343
dtype=object,
44+
lineage=None, # "schema.table.attribute" string tracing attribute origin, or None
4445
)
4546
)
4647

@@ -154,6 +155,15 @@ def new_attributes(self):
154155
k for k, v in self.attributes.items() if v.attribute_expression is not None
155156
]
156157

158+
def get_lineage(self, name):
    """
    Get the lineage of an attribute.

    The value is read from the ``lineage`` field of the attribute stored
    under ``name`` in ``self.attributes``.

    :param name: attribute name
    :return: lineage string "schema.table.attribute" or None
    """
    return self.attributes[name].lineage
166+
157167
def __getitem__(self, name):
    """
    Shortcut to the attribute.

    :param name: attribute name
    :return: the entry stored under ``name`` in ``self.attributes``
    """
    return self.attributes[name]

datajoint/lineage.py

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
"""
2+
Lineage tracking for semantic matching in joins.
3+
4+
This module provides:
5+
- LineageTable: hidden table (~lineage) for storing attribute lineage
6+
- Functions to compute lineage from the FK graph (fallback)
7+
- Migration utilities for existing schemas
8+
"""
9+
10+
import logging
11+
import re
12+
13+
from .errors import DataJointError
14+
from .heading import Heading
15+
from .table import Table
16+
17+
logger = logging.getLogger(__name__.split(".")[0])
18+
19+
20+
class LineageTable(Table):
    """
    Hidden table for storing attribute lineage information.

    Each row maps (table_name, attribute_name) -> lineage string.
    Lineage is "schema.table.attribute" tracing the attribute to its origin.
    Attributes without lineage (native secondary attributes) are stored with
    an empty string; the read accessors convert that back to None.
    """

    def __init__(self, connection, database):
        """
        Bind to the ~lineage table of a schema, declaring it if necessary.

        :param connection: database connection
        :param database: schema (database) name that holds the table
        """
        self.database = database
        self._connection = connection
        self._heading = Heading(
            table_info=dict(
                conn=connection,
                database=database,
                table_name=self.table_name,
                context=None,
            )
        )
        self._support = [self.full_table_name]

        if not self.is_declared:
            self.declare()

    @property
    def definition(self):
        """Table definition used when the ~lineage table is declared."""
        return """
        # Attribute lineage tracking for semantic matching
        table_name : varchar(64) # name of the table
        attribute_name : varchar(64) # name of the attribute
        ---
        lineage : varchar(200) # "schema.table.attribute" or empty for no lineage
        """

    @property
    def table_name(self):
        """Fixed name of the hidden lineage table."""
        return "~lineage"

    def delete(self):
        """Bypass interactive prompts and dependencies."""
        self.delete_quick()

    def drop(self):
        """Bypass interactive prompts and dependencies."""
        self.drop_quick()

    def store_lineage(self, table_name, attribute_name, lineage):
        """
        Store lineage for an attribute, replacing any existing record.

        :param table_name: name of the table (without schema)
        :param attribute_name: name of the attribute
        :param lineage: lineage string "schema.table.attribute" or None
        """
        self.insert1(
            dict(
                table_name=table_name,
                attribute_name=attribute_name,
                lineage=lineage or "",  # None is stored as an empty string
            ),
            replace=True,
        )

    def get_lineage(self, table_name, attribute_name):
        """
        Get lineage for an attribute.

        :param table_name: name of the table (without schema)
        :param attribute_name: name of the attribute
        :return: lineage string, or None when there is no record or the
            stored lineage is empty
        """
        result = (
            self & dict(table_name=table_name, attribute_name=attribute_name)
        ).fetch("lineage")
        if len(result) == 0:
            return None
        return result[0] or None  # empty string means "no lineage"

    def get_table_lineage(self, table_name):
        """
        Get lineage for all attributes in a table.

        :param table_name: name of the table (without schema)
        :return: dict mapping attribute_name -> lineage (or None); empty when
            the table has no records
        """
        attribute_names, lineages = (self & dict(table_name=table_name)).fetch(
            "attribute_name", "lineage"
        )
        # zip over empty results already yields {}, so no separate guard is needed
        return {
            attr: (lin or None) for attr, lin in zip(attribute_names, lineages)
        }

    def delete_table_lineage(self, table_name):
        """
        Delete all lineage records for a table.

        :param table_name: name of the table (without schema)
        """
        (self & dict(table_name=table_name)).delete_quick()
124+
125+
126+
def parse_full_table_name(full_name):
    """
    Parse a full table name like `schema`.`table` into (schema, table).

    Any character other than a backtick is accepted inside the quotes, so
    names that are not purely word characters (for example a table name
    starting with ``#`` or a schema name containing ``-``) are parsed too.

    :param full_name: full table name in format `schema`.`table`
    :return: tuple (schema, table)
    :raises DataJointError: if full_name does not start with that format
    """
    match = re.match(r"`([^`]+)`\.`([^`]+)`", full_name)
    if not match:
        raise DataJointError(f"Invalid table name format: {full_name}")
    return match.group(1), match.group(2)
137+
138+
139+
def compute_lineage_from_dependencies(connection, schema, table_name, attribute_name):
    """
    Compute lineage by traversing the FK graph.

    Uses connection.dependencies which loads FK info from INFORMATION_SCHEMA.
    This is the fallback when the ~lineage table doesn't exist.

    :param connection: database connection
    :param schema: schema name
    :param table_name: table name
    :param attribute_name: attribute name
    :return: lineage string "schema.table.attribute" or None for native secondary attrs
    :raises DataJointError: if an alias node has no non-alias parent, or a
        parent node name cannot be parsed
    """
    dependencies = connection.dependencies
    dependencies.load(force=False)

    full_table_name = f"`{schema}`.`{table_name}`"

    if full_table_name not in dependencies:
        # Table not in graph: query the database to learn whether the
        # attribute belongs to the primary key.
        pk_attrs = _get_primary_key_attrs(connection, schema, table_name)
        if attribute_name in pk_attrs:
            return f"{schema}.{table_name}.{attribute_name}"
        return None

    # Inspect the edges arriving from this table's parents (its foreign keys).
    for parent, props in dependencies.parents(full_table_name).items():
        attr_map = props.get("attr_map", {})
        if attribute_name not in attr_map:
            continue

        # The attribute is inherited from this parent: follow it to its origin.
        parent_attr = attr_map[attribute_name]

        # Alias nodes are numeric-string nodes in the graph; step through to
        # the real table behind the alias.
        if parent.isdigit():
            alias_node = parent
            for grandparent, gprops in dependencies.parents(alias_node).items():
                if not grandparent.isdigit():
                    parent = grandparent
                    parent_attr = gprops.get("attr_map", {}).get(
                        attribute_name, parent_attr
                    )
                    break
            else:
                # Fail explicitly rather than with "Invalid table name format: <n>"
                raise DataJointError(
                    f"Cannot resolve alias node {alias_node} "
                    f"referenced by {full_table_name}"
                )

        parent_schema, parent_table = parse_full_table_name(parent)
        return compute_lineage_from_dependencies(
            connection, parent_schema, parent_table, parent_attr
        )

    # Not inherited: lineage exists only for primary key attributes.
    node_data = dependencies.nodes.get(full_table_name, {})
    pk_attrs = node_data.get("primary_key", set())

    if attribute_name in pk_attrs:
        # Native primary key attribute - has lineage to itself
        return f"{schema}.{table_name}.{attribute_name}"
    # Native secondary attribute - no lineage
    return None
195+
196+
197+
def _get_primary_key_attrs(connection, schema, table_name):
    """
    Get the primary key attributes for a table by querying the database.

    Reads information_schema.key_column_usage for the rows of the table
    whose constraint is named 'PRIMARY'.

    :param connection: database connection
    :param schema: schema name
    :param table_name: table name
    :return: set of primary key attribute names
    """
    rows = connection.query(
        """
        SELECT column_name
        FROM information_schema.key_column_usage
        WHERE table_schema = %s
        AND table_name = %s
        AND constraint_name = 'PRIMARY'
        """,
        args=(schema, table_name),
    )
    return {row[0] for row in rows}
217+
218+
219+
def compute_all_lineage_for_table(connection, schema, table_name):
    """
    Compute lineage for all attributes in a table.

    Column names are read from information_schema.columns in ordinal order,
    and each one is resolved with compute_lineage_from_dependencies().

    :param connection: database connection
    :param schema: schema name
    :param table_name: table name
    :return: dict mapping attribute_name -> lineage (or None)
    """
    rows = connection.query(
        """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
        """,
        args=(schema, table_name),
    )
    return {
        row[0]: compute_lineage_from_dependencies(
            connection, schema, table_name, row[0]
        )
        for row in rows
    }
248+
249+
250+
def migrate_schema_lineage(connection, schema):
    """
    Compute and populate the ~lineage table for an existing schema.

    Analyzes foreign key relationships to determine attribute origins.
    Existing records for the same (table, attribute) are replaced.

    :param connection: database connection
    :param schema: schema object or schema name
    """
    # Imported here rather than at module level: schemas.py imports this module.
    from .schemas import Schema

    schema_name = schema.database if isinstance(schema, Schema) else schema

    # Create or get the lineage table
    lineage_table = LineageTable(connection, schema_name)

    # Get all user tables in the schema (excluding hidden tables)
    result = connection.query(
        """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = %s
        AND table_name NOT LIKE '~%%'
        AND table_type = 'BASE TABLE'
        """,
        args=(schema_name,),
    )
    tables = [row[0] for row in result]

    # Ensure dependencies are loaded
    connection.dependencies.load(force=True)

    # Compute and store lineage for each table
    for table_name in tables:
        lineage_map = compute_all_lineage_for_table(connection, schema_name, table_name)
        for attr_name, lineage in lineage_map.items():
            lineage_table.store_lineage(table_name, attr_name, lineage)

    logger.info(
        "Migrated lineage for schema `%s`: %d tables", schema_name, len(tables)
    )
292+
293+
294+
def get_lineage_for_heading(connection, schema, table_name, heading):
    """
    Get lineage information for all attributes of a table.

    First tries to load from the ~lineage table; falls back to FK graph
    computation when that table does not exist or holds no records for
    ``table_name``.

    :param connection: database connection
    :param schema: schema name
    :param table_name: table name
    :param heading: Heading object; accepted for interface compatibility but
        not read or modified by this function
    :return: dict mapping attribute_name -> lineage (or None)
    """
    # Check if ~lineage table exists
    lineage_table_exists = connection.query(
        """
        SELECT COUNT(*) FROM information_schema.tables
        WHERE table_schema = %s AND table_name = '~lineage'
        """,
        args=(schema,),
    ).fetchone()[0] > 0

    if lineage_table_exists:
        stored = LineageTable(connection, schema).get_table_lineage(table_name)
        if stored:
            return stored
        # No rows recorded for this table: fall through to the FK graph
        # instead of reporting that no attribute has lineage.

    # Compute from FK graph
    return compute_all_lineage_for_table(connection, schema, table_name)

datajoint/schemas.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from .external import ExternalMapping
1212
from .heading import Heading
1313
from .jobs import JobTable
14+
from .lineage import LineageTable, migrate_schema_lineage
1415
from .settings import config
1516
from .table import FreeTable, Log, lookup_class_name
1617
from .user_tables import Computed, Imported, Lookup, Manual, Part, _get_tier
@@ -71,6 +72,7 @@ def __init__(
7172
self.create_schema = create_schema
7273
self.create_tables = create_tables
7374
self._jobs = None
75+
self._lineage = None
7476
self.external = ExternalMapping(self)
7577
self.add_objects = add_objects
7678
self.declare_list = []
@@ -401,6 +403,30 @@ def jobs(self):
401403
self._jobs = JobTable(self.connection, self.database)
402404
return self._jobs
403405

406+
@property
def lineage(self):
    """
    schema.lineage provides a view of the lineage tracking table for the schema.

    The lineage table stores attribute origin information for semantic matching in joins.
    The LineageTable object is created on first access and cached afterwards.

    :return: lineage table
    """
    self._assert_exists()
    if self._lineage is None:
        self._lineage = LineageTable(self.connection, self.database)
    return self._lineage
419+
420+
def migrate_lineage(self):
    """
    Compute and populate the ~lineage table for this schema.

    Analyzes foreign key relationships to determine attribute origins.
    Use this to migrate an existing schema to support semantic matching.
    Delegates to migrate_schema_lineage() with this schema's connection
    and database name.
    """
    self._assert_exists()
    migrate_schema_lineage(self.connection, self.database)
429+
404430
@property
405431
def code(self):
406432
self._assert_exists()

0 commit comments

Comments
 (0)