Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 81 additions & 40 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def build(self) -> Plan:
dag = self._build_dag()
directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag)

self._check_destructive_additive_changes(directly_modified)
self._check_destructive_additive_changes(directly_modified, indirectly_modified)
self._categorize_snapshots(dag, indirectly_modified)
self._adjust_snapshot_intervals()

Expand Down Expand Up @@ -547,7 +547,11 @@ def _adjust_snapshot_intervals(self) -> None:
if new.is_forward_only:
new.dev_intervals = new.intervals.copy()

def _check_destructive_additive_changes(self, directly_modified: t.Set[SnapshotId]) -> None:
def _check_destructive_additive_changes(
self,
directly_modified: t.Set[SnapshotId],
indirectly_modified: SnapshotMapping,
) -> None:
for s_id in sorted(directly_modified):
if s_id.name not in self._context_diff.modified_snapshots:
continue
Expand All @@ -566,49 +570,86 @@ def _check_destructive_additive_changes(self, directly_modified: t.Set[SnapshotI
continue

new, old = self._context_diff.modified_snapshots[snapshot.name]
self._check_schema_change(
snapshot, new, old, needs_destructive_check, needs_additive_check
)

# we must know all columns_to_types to determine whether a change is destructive
old_columns_to_types = old.model.columns_to_types or {}
new_columns_to_types = new.model.columns_to_types or {}
# Also check indirectly modified forward-only snapshots.
# These inherit schema changes from upstream and the evaluator-level check
# (in MigrateSchemasStage) is skipped for dev plans.
for _parent_id, downstream_ids in indirectly_modified.items():
for s_id in sorted(downstream_ids):
if s_id.name not in self._context_diff.modified_snapshots:
continue

if columns_to_types_all_known(old_columns_to_types) and columns_to_types_all_known(
new_columns_to_types
):
alter_operations = t.cast(
t.List[TableAlterOperation],
get_schema_differ(snapshot.model.dialect).compare_columns(
new.name,
old_columns_to_types,
new_columns_to_types,
ignore_destructive=new.model.on_destructive_change.is_ignore,
ignore_additive=new.model.on_additive_change.is_ignore,
),
snapshot = self._context_diff.snapshots[s_id]
if not snapshot.is_model:
continue

is_forward_only = self._is_forward_only_change(s_id) or self._forward_only
if not is_forward_only:
continue

needs_destructive_check = snapshot.needs_destructive_check(
self._allow_destructive_models
)
needs_additive_check = snapshot.needs_additive_check(self._allow_additive_models)
if not needs_destructive_check and not needs_additive_check:
continue

new, old = self._context_diff.modified_snapshots[snapshot.name]
self._check_schema_change(
snapshot, new, old, needs_destructive_check, needs_additive_check
)

snapshot_name = snapshot.name
model_dialect = snapshot.model.dialect
def _check_schema_change(
self,
snapshot: Snapshot,
new: Snapshot,
old: Snapshot,
needs_destructive_check: bool,
needs_additive_check: bool,
) -> None:
# we must know all columns_to_types to determine whether a change is destructive
old_columns_to_types = old.model.columns_to_types or {}
new_columns_to_types = new.model.columns_to_types or {}

if needs_destructive_check and has_drop_alteration(alter_operations):
self._console.log_destructive_change(
snapshot_name,
alter_operations,
model_dialect,
error=not snapshot.model.on_destructive_change.is_warn,
)
if snapshot.model.on_destructive_change.is_error:
raise PlanError(
"Plan requires a destructive change to a forward-only model."
)

if needs_additive_check and has_additive_alteration(alter_operations):
self._console.log_additive_change(
snapshot_name,
alter_operations,
model_dialect,
error=not snapshot.model.on_additive_change.is_warn,
)
if snapshot.model.on_additive_change.is_error:
raise PlanError("Plan requires an additive change to a forward-only model.")
if columns_to_types_all_known(old_columns_to_types) and columns_to_types_all_known(
new_columns_to_types
):
alter_operations = t.cast(
t.List[TableAlterOperation],
get_schema_differ(snapshot.model.dialect).compare_columns(
new.name,
old_columns_to_types,
new_columns_to_types,
ignore_destructive=new.model.on_destructive_change.is_ignore,
ignore_additive=new.model.on_additive_change.is_ignore,
),
)

snapshot_name = snapshot.name
model_dialect = snapshot.model.dialect

if needs_destructive_check and has_drop_alteration(alter_operations):
self._console.log_destructive_change(
snapshot_name,
alter_operations,
model_dialect,
error=not snapshot.model.on_destructive_change.is_warn,
)
if snapshot.model.on_destructive_change.is_error:
raise PlanError("Plan requires a destructive change to a forward-only model.")

if needs_additive_check and has_additive_alteration(alter_operations):
self._console.log_additive_change(
snapshot_name,
alter_operations,
model_dialect,
error=not snapshot.model.on_additive_change.is_warn,
)
if snapshot.model.on_additive_change.is_error:
raise PlanError("Plan requires an additive change to a forward-only model.")

def _categorize_snapshots(
self, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping
Expand Down
236 changes: 236 additions & 0 deletions tests/core/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,242 @@ def test_forward_only_model_on_destructive_change_no_column_types(
assert mock_logger.call_count == 0


def test_forward_only_destructive_change_dev_plan(
make_snapshot_on_destructive_change,
):
"""Issue #5719: Destructive schema changes on forward-only models must be
caught during dev plans, not just prod plans."""
snapshot_a_old, snapshot_a = make_snapshot_on_destructive_change()

context_diff = ContextDiff(
environment="dev",
is_new_environment=True,
is_unfinalized_environment=False,
normalize_environment_name=True,
create_from="prod",
create_from_env_exists=True,
added=set(),
removed_snapshots={},
modified_snapshots={snapshot_a.name: (snapshot_a, snapshot_a_old)},
snapshots={snapshot_a.snapshot_id: snapshot_a, snapshot_a_old.snapshot_id: snapshot_a_old},
new_snapshots={snapshot_a.snapshot_id: snapshot_a},
previous_plan_id=None,
previously_promoted_snapshot_ids=set(),
previous_finalized_snapshots=None,
previous_gateway_managed_virtual_layer=False,
gateway_managed_virtual_layer=False,
environment_statements=[],
)

# Should raise PlanError during dev plan too
with pytest.raises(
PlanError, match="Plan requires a destructive change to a forward-only model"
):
PlanBuilder(context_diff, forward_only=False, is_dev=True).build()


def test_forward_only_destructive_change_dev_plan_warn(
mocker,
make_snapshot_on_destructive_change,
):
"""Issue #5719: Destructive schema changes with on_destructive_change=WARN
should log a warning during dev plans."""
snapshot_a_old, snapshot_a = make_snapshot_on_destructive_change(
on_destructive_change=OnDestructiveChange.WARN
)

context_diff = ContextDiff(
environment="dev",
is_new_environment=True,
is_unfinalized_environment=False,
normalize_environment_name=True,
create_from="prod",
create_from_env_exists=True,
added=set(),
removed_snapshots={},
modified_snapshots={snapshot_a.name: (snapshot_a, snapshot_a_old)},
snapshots={snapshot_a.snapshot_id: snapshot_a, snapshot_a_old.snapshot_id: snapshot_a_old},
new_snapshots={snapshot_a.snapshot_id: snapshot_a},
previous_plan_id=None,
previously_promoted_snapshot_ids=set(),
previous_finalized_snapshots=None,
previous_gateway_managed_virtual_layer=False,
gateway_managed_virtual_layer=False,
environment_statements=[],
)

console = TerminalConsole()
log_destructive_spy = mocker.spy(console, "log_destructive_change")

# Should NOT raise, but should log a warning
PlanBuilder(context_diff, forward_only=False, is_dev=True, console=console).build()
assert log_destructive_spy.call_count == 1


def test_forward_only_destructive_change_dev_plan_allow(
make_snapshot_on_destructive_change,
):
"""Issue #5719: Destructive schema changes with on_destructive_change=ALLOW
should pass silently during dev plans."""
snapshot_a_old, snapshot_a = make_snapshot_on_destructive_change(
on_destructive_change=OnDestructiveChange.ALLOW
)

context_diff = ContextDiff(
environment="dev",
is_new_environment=True,
is_unfinalized_environment=False,
normalize_environment_name=True,
create_from="prod",
create_from_env_exists=True,
added=set(),
removed_snapshots={},
modified_snapshots={snapshot_a.name: (snapshot_a, snapshot_a_old)},
snapshots={snapshot_a.snapshot_id: snapshot_a, snapshot_a_old.snapshot_id: snapshot_a_old},
new_snapshots={snapshot_a.snapshot_id: snapshot_a},
previous_plan_id=None,
previously_promoted_snapshot_ids=set(),
previous_finalized_snapshots=None,
previous_gateway_managed_virtual_layer=False,
gateway_managed_virtual_layer=False,
environment_statements=[],
)

# Should build successfully
PlanBuilder(context_diff, forward_only=False, is_dev=True).build()


def test_forward_only_indirect_destructive_change_dev_plan(
make_snapshot,
make_snapshot_on_destructive_change,
):
"""Issue #5719: Indirectly modified downstream forward-only models should
also be checked for destructive schema changes during dev plans.
In this case, A has ALLOW so its check passes, and B is only indirectly
modified with no schema change of its own, so the plan should succeed."""
# A is directly modified with ALLOW so it passes its own check
snapshot_a_old, snapshot_a = make_snapshot_on_destructive_change(
on_destructive_change=OnDestructiveChange.ALLOW,
old_query="select '1' as one, '2' as two, '2022-01-01' ds",
new_query="select '1' as one, '2022-01-01' ds",
)

# B is a downstream forward-only model — same query in both versions
b_query = "select one, '2022-01-01' ds from a"

snapshot_b_old = make_snapshot(
SqlModel(
name="b",
dialect="duckdb",
query=parse_one(b_query),
kind=IncrementalByTimeRangeKind(time_column="ds", forward_only=True),
),
nodes={'"a"': snapshot_a_old.model},
)

snapshot_b = make_snapshot(
SqlModel(
name="b",
dialect="duckdb",
query=parse_one(b_query),
kind=IncrementalByTimeRangeKind(time_column="ds", forward_only=True),
),
nodes={'"a"': snapshot_a.model},
)
snapshot_b.previous_versions = (
SnapshotDataVersion(
fingerprint=SnapshotFingerprint(
data_hash="test_data_hash_b",
metadata_hash="test_metadata_hash_b",
),
version="test_version_b",
dev_table_suffix="dev",
),
)

context_diff = ContextDiff(
environment="dev",
is_new_environment=True,
is_unfinalized_environment=False,
normalize_environment_name=True,
create_from="prod",
create_from_env_exists=True,
added=set(),
removed_snapshots={},
modified_snapshots={
snapshot_a.name: (snapshot_a, snapshot_a_old),
snapshot_b.name: (snapshot_b, snapshot_b_old),
},
snapshots={
snapshot_a.snapshot_id: snapshot_a,
snapshot_b.snapshot_id: snapshot_b,
},
new_snapshots={
snapshot_a.snapshot_id: snapshot_a,
snapshot_b.snapshot_id: snapshot_b,
},
previous_plan_id=None,
previously_promoted_snapshot_ids=set(),
previous_finalized_snapshots=None,
previous_gateway_managed_virtual_layer=False,
gateway_managed_virtual_layer=False,
environment_statements=[],
)

# B's schema didn't change from its own query perspective, so plan succeeds
PlanBuilder(context_diff, forward_only=False, is_dev=True).build()


def test_forward_only_flag_destructive_change_dev_plan(
make_snapshot,
):
"""Issue #5719: When using the --forward-only flag on a dev plan, destructive
schema changes should be caught at plan-build time."""
snapshot_old = make_snapshot(
SqlModel(
name="a",
dialect="duckdb",
query=parse_one("select '1' as one, '2' as two, '2022-01-01' ds"),
kind=IncrementalByTimeRangeKind(time_column="ds"),
)
)

snapshot = make_snapshot(
SqlModel(
name="a",
dialect="duckdb",
query=parse_one("select 1 as one, 2 as two, '2022-01-01' ds"),
kind=IncrementalByTimeRangeKind(time_column="ds"),
)
)

context_diff = ContextDiff(
environment="dev",
is_new_environment=True,
is_unfinalized_environment=False,
normalize_environment_name=True,
create_from="prod",
create_from_env_exists=True,
added=set(),
removed_snapshots={},
modified_snapshots={snapshot.name: (snapshot, snapshot_old)},
snapshots={snapshot.snapshot_id: snapshot, snapshot_old.snapshot_id: snapshot_old},
new_snapshots={snapshot.snapshot_id: snapshot},
previous_plan_id=None,
previously_promoted_snapshot_ids=set(),
previous_finalized_snapshots=None,
previous_gateway_managed_virtual_layer=False,
gateway_managed_virtual_layer=False,
environment_statements=[],
)

# forward_only=True flag + is_dev=True should still catch destructive type changes
with pytest.raises(
PlanError, match="Plan requires a destructive change to a forward-only model"
):
PlanBuilder(context_diff, forward_only=True, is_dev=True).build()


def test_missing_intervals_lookback(make_snapshot, mocker: MockerFixture):
snapshot_a = make_snapshot(
SqlModel(
Expand Down