|
15 | 15 | from sqlmesh.utils.errors import SQLMeshError |
16 | 16 | from sqlmesh.utils.connection_pool import ConnectionPool |
17 | 17 | from sqlmesh.core.schema_diff import TableAlterOperation |
| 18 | +from sqlmesh.utils import random_id |
18 | 19 |
|
19 | 20 |
|
20 | 21 | logger = logging.getLogger(__name__) |
@@ -156,15 +157,111 @@ def set_current_catalog(self, catalog_name: str) -> None: |
156 | 157 | ) |
157 | 158 |
|
158 | 159 | def alter_table( |
159 | | - self, |
160 | | - alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]], |
| 160 | + self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]] |
161 | 161 | ) -> None: |
162 | 162 | """ |
163 | | - Disables ALTER TABLE for Fabric since it has limited support. |
164 | | - By making this a no-op, we signal to the caller to fall back to a |
165 | | - more reliable drop/add strategy for columns to apply schema changes. |
| 163 | + Applies alter expressions to a table. Fabric has limited support for ALTER TABLE, |
| 164 | + so this method implements a workaround for column type changes. |
| 165 | + This method is self-contained and sets its own catalog context. |
166 | 166 | """ |
167 | | - return |
| 167 | + if not alter_expressions: |
| 168 | + return |
| 169 | + |
| 170 | + # Get the target table from the first expression to determine the correct catalog. |
| 171 | + first_op = alter_expressions[0] |
| 172 | + expression = first_op.expression if isinstance(first_op, TableAlterOperation) else first_op |
| 173 | + if not isinstance(expression, exp.Alter) or not expression.this.catalog: |
| 174 | + # Fallback for unexpected scenarios |
| 175 | + logger.warning( |
| 176 | + "Could not determine catalog from alter expression, executing with current context." |
| 177 | + ) |
| 178 | + super().alter_table(alter_expressions) |
| 179 | + return |
| 180 | + |
| 181 | + target_catalog = expression.this.catalog |
| 182 | + self.set_current_catalog(target_catalog) |
| 183 | + |
| 184 | + with self.transaction(): |
| 185 | + for op in alter_expressions: |
| 186 | + expression = op.expression if isinstance(op, TableAlterOperation) else op |
| 187 | + |
| 188 | + if not isinstance(expression, exp.Alter): |
| 189 | + self.execute(expression) |
| 190 | + continue |
| 191 | + |
| 192 | + for action in expression.actions: |
| 193 | + table_name = expression.this |
| 194 | + |
| 195 | + table_name_without_catalog = table_name.copy() |
| 196 | + table_name_without_catalog.set("catalog", None) |
| 197 | + |
| 198 | + is_type_change = isinstance(action, exp.AlterColumn) and action.args.get( |
| 199 | + "dtype" |
| 200 | + ) |
| 201 | + |
| 202 | + if is_type_change: |
| 203 | + column_to_alter = action.this |
| 204 | + new_type = action.args["dtype"] |
| 205 | + temp_column_name_str = f"{column_to_alter.name}__{random_id(short=True)}" |
| 206 | + temp_column_name = exp.to_identifier(temp_column_name_str) |
| 207 | + |
| 208 | + logger.info( |
| 209 | + "Applying workaround for column '%s' on table '%s' to change type to '%s'.", |
| 210 | + column_to_alter.sql(), |
| 211 | + table_name.sql(), |
| 212 | + new_type.sql(), |
| 213 | + ) |
| 214 | + |
| 215 | + # Step 1: Add a temporary column. |
| 216 | + add_column_expr = exp.Alter( |
| 217 | + this=table_name_without_catalog.copy(), |
| 218 | + kind="TABLE", |
| 219 | + actions=[ |
| 220 | + exp.ColumnDef(this=temp_column_name.copy(), kind=new_type.copy()) |
| 221 | + ], |
| 222 | + ) |
| 223 | + add_sql = self._to_sql(add_column_expr) |
| 224 | + self.execute(add_sql) |
| 225 | + |
| 226 | + # Step 2: Copy and cast data. |
| 227 | + update_sql = self._to_sql( |
| 228 | + exp.Update( |
| 229 | + this=table_name_without_catalog.copy(), |
| 230 | + expressions=[ |
| 231 | + exp.EQ( |
| 232 | + this=temp_column_name.copy(), |
| 233 | + expression=exp.Cast( |
| 234 | + this=column_to_alter.copy(), to=new_type.copy() |
| 235 | + ), |
| 236 | + ) |
| 237 | + ], |
| 238 | + ) |
| 239 | + ) |
| 240 | + self.execute(update_sql) |
| 241 | + |
| 242 | + # Step 3: Drop the original column. |
| 243 | + drop_sql = self._to_sql( |
| 244 | + exp.Alter( |
| 245 | + this=table_name_without_catalog.copy(), |
| 246 | + kind="TABLE", |
| 247 | + actions=[exp.Drop(this=column_to_alter.copy(), kind="COLUMN")], |
| 248 | + ) |
| 249 | + ) |
| 250 | + self.execute(drop_sql) |
| 251 | + |
| 252 | + # Step 4: Rename the temporary column. |
| 253 | + old_name_qualified = f"{table_name_without_catalog.sql(dialect=self.dialect)}.{temp_column_name.sql(dialect=self.dialect)}" |
| 254 | + new_name_unquoted = column_to_alter.sql( |
| 255 | + dialect=self.dialect, identify=False |
| 256 | + ) |
| 257 | + rename_sql = f"EXEC sp_rename '{old_name_qualified}', '{new_name_unquoted}', 'COLUMN'" |
| 258 | + self.execute(rename_sql) |
| 259 | + else: |
| 260 | + # For other alterations, execute directly. |
| 261 | + direct_alter_expr = exp.Alter( |
| 262 | + this=table_name_without_catalog.copy(), kind="TABLE", actions=[action] |
| 263 | + ) |
| 264 | + self.execute(direct_alter_expr) |
168 | 265 |
|
169 | 266 |
|
170 | 267 | class FabricHttpClient: |
|
0 commit comments