Skip to content

Commit dcf4830

Browse files
committed
Populate lineage table at table declaration time
- Add populate_table_lineage() function in lineage.py - Call it from Table.declare() after successful table creation - For FK attributes: copy lineage from parent's lineage table - For native PK attributes: set lineage to "schema.table.attribute" - Native secondary attributes get no lineage entry
1 parent 1df7e8a commit dcf4830

File tree

2 files changed

+91
-0
lines changed

2 files changed

+91
-0
lines changed

datajoint/lineage.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,86 @@ def delete_table_lineage(self, table_name):
125125
(self & dict(table_name=table_name)).delete_quick()
126126

127127

128+
def populate_table_lineage(connection, schema, table_name, heading):
129+
"""
130+
Populate lineage for a newly declared table.
131+
132+
Called at table declaration time to store lineage for all attributes:
133+
- Native PK attributes: lineage = "schema.table.attribute"
134+
- FK attributes: copy lineage from parent's lineage table
135+
- Native secondary attributes: no entry (no lineage)
136+
137+
:param connection: database connection
138+
:param schema: schema name
139+
:param table_name: table name (without schema prefix)
140+
:param heading: Heading object for the declared table
141+
"""
142+
# Get or create the lineage table
143+
lineage_table = LineageTable(connection, schema)
144+
145+
# Load FK info from dependencies
146+
connection.dependencies.load(force=True)
147+
full_table_name = f"`{schema}`.`{table_name}`"
148+
149+
# Build map of FK attributes -> (parent_schema, parent_table, parent_attr)
150+
fk_sources = {}
151+
if full_table_name in connection.dependencies:
152+
for parent, props in connection.dependencies.parents(full_table_name).items():
153+
# Skip alias nodes
154+
if parent.isdigit():
155+
continue
156+
attr_map = props.get("attr_map", {})
157+
parent_schema, parent_table = parse_full_table_name(parent)
158+
for child_attr, parent_attr in attr_map.items():
159+
fk_sources[child_attr] = (parent_schema, parent_table, parent_attr)
160+
161+
# Store lineage for each attribute
162+
for attr_name in heading.names:
163+
if attr_name in fk_sources:
164+
# FK attribute - copy lineage from parent's lineage table
165+
parent_schema, parent_table, parent_attr = fk_sources[attr_name]
166+
parent_lineage = _get_parent_lineage(
167+
connection, parent_schema, parent_table, parent_attr
168+
)
169+
if parent_lineage:
170+
lineage_table.store_lineage(table_name, attr_name, parent_lineage)
171+
elif attr_name in heading.primary_key:
172+
# Native PK attribute - lineage to self
173+
lineage = f"{schema}.{table_name}.{attr_name}"
174+
lineage_table.store_lineage(table_name, attr_name, lineage)
175+
# Native secondary attributes: no entry needed
176+
177+
178+
def _get_parent_lineage(connection, parent_schema, parent_table, parent_attr):
179+
"""
180+
Get lineage for a parent attribute from its lineage table.
181+
182+
:param connection: database connection
183+
:param parent_schema: parent schema name
184+
:param parent_table: parent table name
185+
:param parent_attr: parent attribute name
186+
:return: lineage string or None
187+
"""
188+
# Check if parent schema has a lineage table
189+
lineage_table_exists = (
190+
connection.query(
191+
"""
192+
SELECT COUNT(*) FROM information_schema.tables
193+
WHERE table_schema = %s AND table_name = '~lineage'
194+
""",
195+
args=(parent_schema,),
196+
).fetchone()[0]
197+
> 0
198+
)
199+
200+
if lineage_table_exists:
201+
parent_lineage_table = LineageTable(connection, parent_schema)
202+
return parent_lineage_table.get_lineage(parent_table, parent_attr)
203+
else:
204+
# Parent schema has no lineage table - attribute has no tracked lineage
205+
return None
206+
207+
128208
# =============================================================================
129209
# Dependency Graph Method (fallback for non-DataJoint schemas)
130210
# =============================================================================

datajoint/table.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,17 @@ def declare(self, context=None):
119119
pass
120120
else:
121121
self._log("Declared " + self.full_table_name)
122+
# Populate lineage table for semantic matching
123+
from .lineage import populate_table_lineage
124+
125+
try:
126+
populate_table_lineage(
127+
self.connection, self.database, self.table_name, self.heading
128+
)
129+
except Exception as e:
130+
logger.debug(
131+
f"Could not populate lineage for {self.full_table_name}: {e}"
132+
)
122133

123134
def alter(self, prompt=True, context=None):
124135
"""

0 commit comments

Comments
 (0)