@@ -22,8 +22,15 @@ class LineageTable(Table):
2222 Hidden table for storing attribute lineage information.
2323
2424 Each row maps (table_name, attribute_name) -> lineage string.
25- Lineage is "schema.table.attribute" tracing the attribute to its origin,
26- or NULL for native secondary attributes (which have no lineage).
25+ Only attributes with lineage are stored; absence means no lineage.
26+ """
27+
28+ definition = """
29+ # Attribute lineage tracking for semantic matching
30+ table_name : varchar(64) # name of the table
31+ attribute_name : varchar(64) # name of the attribute
32+ ---
33+ lineage : varchar(200) # "schema.table.attribute"
2734 """
2835
2936 def __init__ (self , connection , database ):
@@ -42,16 +49,6 @@ def __init__(self, connection, database):
4249 if not self .is_declared :
4350 self .declare ()
4451
45- @property
46- def definition (self ):
47- return """
48- # Attribute lineage tracking for semantic matching
49- table_name : varchar(64) # name of the table
50- attribute_name : varchar(64) # name of the attribute
51- ---
52- lineage : varchar(200) # "schema.table.attribute" or empty for no lineage
53- """
54-
@property
def table_name(self):
    """
    Name of the table that holds the lineage rows.

    :return: the fixed string "~lineage"
    """
    return "~lineage"
@@ -66,48 +63,51 @@ def drop(self):
6663
def store_lineage(self, table_name, attribute_name, lineage):
    """
    Store lineage for an attribute, or clear it when there is none.

    Only attributes that have lineage are kept in the table. ``None`` and the
    empty string both mean "no lineage": an earlier encoding wrote ``""`` in
    place of ``None``, so the two are treated alike and any existing row for
    the attribute is removed rather than storing an empty value.

    :param table_name: name of the table (without schema)
    :param attribute_name: name of the attribute
    :param lineage: lineage string "schema.table.attribute", or None / ""
        when the attribute has no lineage
    """
    key = dict(table_name=table_name, attribute_name=attribute_name)
    if not lineage:
        # No lineage - delete any existing entry so that absence of a row
        # is the only representation of "no lineage".
        (self & key).delete_quick()
        return
    # replace=True is kept from the original call so that storing lineage
    # for an attribute that already has a row does not fail on the key.
    self.insert1(dict(key, lineage=lineage), replace=True)
8386
def get_lineage(self, table_name, attribute_name):
    """
    Get lineage for an attribute.

    :param table_name: name of the table (without schema)
    :param attribute_name: name of the attribute
    :return: lineage string, or None if the attribute has no lineage
        (no row stored, or a row holding an empty lineage value)
    """
    key = dict(table_name=table_name, attribute_name=attribute_name)
    stored = (self & key).fetch("lineage")
    if len(stored) == 0:
        return None
    # Rows written by the earlier encoding hold "" for "no lineage";
    # report those as None so callers see a single "absent" value.
    return stored[0] or None
9999
def get_table_lineage(self, table_name):
    """
    Get lineage for all attributes in a table.

    :param table_name: name of the table (without schema)
    :return: dict mapping attribute_name -> lineage, containing only the
        attributes that have a non-empty lineage; empty dict if none
    """
    names, lineages = (self & dict(table_name=table_name)).fetch(
        "attribute_name", "lineage"
    )
    # Skip empty lineage values: rows from the earlier encoding stored ""
    # for "no lineage" and must not be reported as having lineage.
    return {attr: lin for attr, lin in zip(names, lineages) if lin}
111111
112112 def delete_table_lineage (self , table_name ):
113113 """
@@ -285,7 +285,8 @@ def migrate_schema_lineage(connection, schema):
285285 for table_name in tables :
286286 lineage_map = compute_all_lineage_for_table (connection , schema_name , table_name )
287287 for attr_name , lineage in lineage_map .items ():
288- lineage_table .store_lineage (table_name , attr_name , lineage )
288+ if lineage is not None :
289+ lineage_table .store_lineage (table_name , attr_name , lineage )
289290
290291 logger .info (f"Migrated lineage for schema `{ schema_name } `: { len (tables )} tables" )
291292
0 commit comments