From 73122a0d29b52324ac33eda6fc11e72457a9e6c3 Mon Sep 17 00:00:00 2001 From: brucearctor <5032356+brucearctor@users.noreply.github.com.> Date: Fri, 13 Mar 2026 19:42:57 -0700 Subject: [PATCH] Fix: Surface destructive schema changes on forward-only models during dev plans (#5719) Extend _check_destructive_additive_changes in PlanBuilder to also inspect indirectly modified forward-only snapshots for destructive/additive schema changes. Previously, only directly modified snapshots were checked, and the evaluator-level check (MigrateSchemasStage) is skipped for dev plans. Changes: - Refactor schema diffing into reusable _check_schema_change helper - Pass indirectly_modified mapping to _check_destructive_additive_changes - Add loop to check indirectly modified forward-only snapshots - Add 5 new tests for is_dev=True scenarios (ERROR/WARN/ALLOW modes, indirect modification, --forward-only flag) --- sqlmesh/core/plan/builder.py | 121 ++++++++++++------ tests/core/test_plan.py | 236 +++++++++++++++++++++++++++++++++++ 2 files changed, 317 insertions(+), 40 deletions(-) diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 7d753cc330..89fe63c1c6 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -291,7 +291,7 @@ def build(self) -> Plan: dag = self._build_dag() directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag) - self._check_destructive_additive_changes(directly_modified) + self._check_destructive_additive_changes(directly_modified, indirectly_modified) self._categorize_snapshots(dag, indirectly_modified) self._adjust_snapshot_intervals() @@ -547,7 +547,11 @@ def _adjust_snapshot_intervals(self) -> None: if new.is_forward_only: new.dev_intervals = new.intervals.copy() - def _check_destructive_additive_changes(self, directly_modified: t.Set[SnapshotId]) -> None: + def _check_destructive_additive_changes( + self, + directly_modified: t.Set[SnapshotId], + indirectly_modified: SnapshotMapping, + ) -> None: for s_id in sorted(directly_modified): if s_id.name not in self._context_diff.modified_snapshots: continue @@ -566,49 +570,86 @@ def _check_destructive_additive_changes(self, directly_modified: t.Set[SnapshotI continue new, old = self._context_diff.modified_snapshots[snapshot.name] + self._check_schema_change( + snapshot, new, old, needs_destructive_check, needs_additive_check + ) - # we must know all columns_to_types to determine whether a change is destructive - old_columns_to_types = old.model.columns_to_types or {} - new_columns_to_types = new.model.columns_to_types or {} + # Also check indirectly modified forward-only snapshots. + # These inherit schema changes from upstream and the evaluator-level check + # (in MigrateSchemasStage) is skipped for dev plans. + for _parent_id, downstream_ids in indirectly_modified.items(): + for s_id in sorted(downstream_ids): + if s_id.name not in self._context_diff.modified_snapshots: + continue - if columns_to_types_all_known(old_columns_to_types) and columns_to_types_all_known( - new_columns_to_types - ): - alter_operations = t.cast( - t.List[TableAlterOperation], - get_schema_differ(snapshot.model.dialect).compare_columns( - new.name, - old_columns_to_types, - new_columns_to_types, - ignore_destructive=new.model.on_destructive_change.is_ignore, - ignore_additive=new.model.on_additive_change.is_ignore, - ), + snapshot = self._context_diff.snapshots[s_id] + if not snapshot.is_model: + continue + + is_forward_only = self._is_forward_only_change(s_id) or self._forward_only + if not is_forward_only: + continue + + needs_destructive_check = snapshot.needs_destructive_check( + self._allow_destructive_models + ) + needs_additive_check = snapshot.needs_additive_check(self._allow_additive_models) + if not needs_destructive_check and not needs_additive_check: + continue + + new, old = self._context_diff.modified_snapshots[snapshot.name] + self._check_schema_change( + snapshot, new, old, needs_destructive_check, needs_additive_check ) - snapshot_name = snapshot.name - model_dialect = snapshot.model.dialect + def _check_schema_change( + self, + snapshot: Snapshot, + new: Snapshot, + old: Snapshot, + needs_destructive_check: bool, + needs_additive_check: bool, + ) -> None: + # we must know all columns_to_types to determine whether a change is destructive + old_columns_to_types = old.model.columns_to_types or {} + new_columns_to_types = new.model.columns_to_types or {} - if needs_destructive_check and has_drop_alteration(alter_operations): - self._console.log_destructive_change( - snapshot_name, - alter_operations, - model_dialect, - error=not snapshot.model.on_destructive_change.is_warn, - ) - if snapshot.model.on_destructive_change.is_error: - raise PlanError( - "Plan requires a destructive change to a forward-only model." - ) - - if needs_additive_check and has_additive_alteration(alter_operations): - self._console.log_additive_change( - snapshot_name, - alter_operations, - model_dialect, - error=not snapshot.model.on_additive_change.is_warn, - ) - if snapshot.model.on_additive_change.is_error: - raise PlanError("Plan requires an additive change to a forward-only model.") + if columns_to_types_all_known(old_columns_to_types) and columns_to_types_all_known( + new_columns_to_types + ): + alter_operations = t.cast( + t.List[TableAlterOperation], + get_schema_differ(snapshot.model.dialect).compare_columns( + new.name, + old_columns_to_types, + new_columns_to_types, + ignore_destructive=new.model.on_destructive_change.is_ignore, + ignore_additive=new.model.on_additive_change.is_ignore, + ), + ) + + snapshot_name = snapshot.name + model_dialect = snapshot.model.dialect + + if needs_destructive_check and has_drop_alteration(alter_operations): + self._console.log_destructive_change( + snapshot_name, + alter_operations, + model_dialect, + error=not snapshot.model.on_destructive_change.is_warn, + ) + if snapshot.model.on_destructive_change.is_error: + raise PlanError("Plan requires a destructive change to a forward-only model.") + + if needs_additive_check and has_additive_alteration(alter_operations): + self._console.log_additive_change( + snapshot_name, + alter_operations, + model_dialect, + error=not snapshot.model.on_additive_change.is_warn, + ) + if snapshot.model.on_additive_change.is_error: + raise PlanError("Plan requires an additive change to a forward-only model.") def _categorize_snapshots( self, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index 4b330c376f..2853dc26f5 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -774,6 +774,242 @@ def test_forward_only_model_on_destructive_change_no_column_types( assert mock_logger.call_count == 0 +def test_forward_only_destructive_change_dev_plan( + make_snapshot_on_destructive_change, +): + """Issue #5719: Destructive schema changes on forward-only models must be + caught during dev plans, not just prod plans.""" + snapshot_a_old, snapshot_a = make_snapshot_on_destructive_change() + + context_diff = ContextDiff( + environment="dev", + is_new_environment=True, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={snapshot_a.name: (snapshot_a, snapshot_a_old)}, + snapshots={snapshot_a.snapshot_id: snapshot_a, snapshot_a_old.snapshot_id: snapshot_a_old}, + new_snapshots={snapshot_a.snapshot_id: snapshot_a}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + # Should raise PlanError during dev plan too + with pytest.raises( + PlanError, match="Plan requires a destructive change to a forward-only model" + ): + PlanBuilder(context_diff, forward_only=False, is_dev=True).build() + + +def test_forward_only_destructive_change_dev_plan_warn( + mocker, + make_snapshot_on_destructive_change, +): + """Issue #5719: Destructive schema changes with on_destructive_change=WARN + should log a warning during dev plans.""" + snapshot_a_old, snapshot_a = make_snapshot_on_destructive_change( + on_destructive_change=OnDestructiveChange.WARN + ) + + context_diff = ContextDiff( + environment="dev", + is_new_environment=True, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={snapshot_a.name: (snapshot_a, snapshot_a_old)}, + snapshots={snapshot_a.snapshot_id: snapshot_a, snapshot_a_old.snapshot_id: snapshot_a_old}, + new_snapshots={snapshot_a.snapshot_id: snapshot_a}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + console = TerminalConsole() + log_destructive_spy = mocker.spy(console, "log_destructive_change") + + # Should NOT raise, but should log a warning + PlanBuilder(context_diff, forward_only=False, is_dev=True, console=console).build() + assert log_destructive_spy.call_count == 1 + + +def test_forward_only_destructive_change_dev_plan_allow( + make_snapshot_on_destructive_change, +): + """Issue #5719: Destructive schema changes with on_destructive_change=ALLOW + should pass silently during dev plans.""" + snapshot_a_old, snapshot_a = make_snapshot_on_destructive_change( + on_destructive_change=OnDestructiveChange.ALLOW + ) + + context_diff = ContextDiff( + environment="dev", + is_new_environment=True, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={snapshot_a.name: (snapshot_a, snapshot_a_old)}, + snapshots={snapshot_a.snapshot_id: snapshot_a, snapshot_a_old.snapshot_id: snapshot_a_old}, + new_snapshots={snapshot_a.snapshot_id: snapshot_a}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + # Should build successfully + PlanBuilder(context_diff, forward_only=False, is_dev=True).build() + + +def test_forward_only_indirect_destructive_change_dev_plan( + make_snapshot, + make_snapshot_on_destructive_change, +): + """Issue #5719: Indirectly modified downstream forward-only models should + also be checked for destructive schema changes during dev plans. + In this case, A has ALLOW so its check passes, and B is only indirectly + modified with no schema change of its own, so the plan should succeed.""" + # A is directly modified with ALLOW so it passes its own check + snapshot_a_old, snapshot_a = make_snapshot_on_destructive_change( + on_destructive_change=OnDestructiveChange.ALLOW, + old_query="select '1' as one, '2' as two, '2022-01-01' ds", + new_query="select '1' as one, '2022-01-01' ds", + ) + + # B is a downstream forward-only model — same query in both versions + b_query = "select one, '2022-01-01' ds from a" + + snapshot_b_old = make_snapshot( + SqlModel( + name="b", + dialect="duckdb", + query=parse_one(b_query), + kind=IncrementalByTimeRangeKind(time_column="ds", forward_only=True), + ), + nodes={'"a"': snapshot_a_old.model}, + ) + + snapshot_b = make_snapshot( + SqlModel( + name="b", + dialect="duckdb", + query=parse_one(b_query), + kind=IncrementalByTimeRangeKind(time_column="ds", forward_only=True), + ), + nodes={'"a"': snapshot_a.model}, + ) + snapshot_b.previous_versions = ( + SnapshotDataVersion( + fingerprint=SnapshotFingerprint( + data_hash="test_data_hash_b", + metadata_hash="test_metadata_hash_b", + ), + version="test_version_b", + dev_table_suffix="dev", + ), + ) + + context_diff = ContextDiff( + environment="dev", + is_new_environment=True, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={ + snapshot_a.name: (snapshot_a, snapshot_a_old), + snapshot_b.name: (snapshot_b, snapshot_b_old), + }, + snapshots={ + snapshot_a.snapshot_id: snapshot_a, + snapshot_b.snapshot_id: snapshot_b, + }, + new_snapshots={ + snapshot_a.snapshot_id: snapshot_a, + snapshot_b.snapshot_id: snapshot_b, + }, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + # B's schema didn't change from its own query perspective, so plan succeeds + PlanBuilder(context_diff, forward_only=False, is_dev=True).build() + + +def test_forward_only_flag_destructive_change_dev_plan( + make_snapshot, +): + """Issue #5719: When using the --forward-only flag on a dev plan, destructive + schema changes should be caught at plan-build time.""" + snapshot_old = make_snapshot( + SqlModel( + name="a", + dialect="duckdb", + query=parse_one("select '1' as one, '2' as two, '2022-01-01' ds"), + kind=IncrementalByTimeRangeKind(time_column="ds"), + ) + ) + + snapshot = make_snapshot( + SqlModel( + name="a", + dialect="duckdb", + query=parse_one("select 1 as one, 2 as two, '2022-01-01' ds"), + kind=IncrementalByTimeRangeKind(time_column="ds"), + ) + ) + + context_diff = ContextDiff( + environment="dev", + is_new_environment=True, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={snapshot.name: (snapshot, snapshot_old)}, + snapshots={snapshot.snapshot_id: snapshot, snapshot_old.snapshot_id: snapshot_old}, + new_snapshots={snapshot.snapshot_id: snapshot}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + # forward_only=True flag + is_dev=True should still catch destructive type changes + with pytest.raises( + PlanError, match="Plan requires a destructive change to a forward-only model" + ): + PlanBuilder(context_diff, forward_only=True, is_dev=True).build() + + def test_missing_intervals_lookback(make_snapshot, mocker: MockerFixture): snapshot_a = make_snapshot( SqlModel(