Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions LICENSE
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge skew

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Business Source License 1.1

Licensor: Materialize, Inc.

Licensed Work: Materialize Version 20260416
Licensed Work: Materialize Version 20260415
The Licensed Work is © 2026 Materialize, Inc.

Additional Use Grant: Within a single installation of the Licensed Work,
Expand All @@ -39,7 +39,7 @@ Additional Use Grant: Within a single installation of the Licensed Work,
whose definitions are controlled by such third
parties.

Change Date: April 16, 2030
Change Date: April 15, 2030

Change License: Apache License, Version 2.0

Expand Down
24 changes: 0 additions & 24 deletions console/types/materialize.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1047,19 +1047,6 @@ export interface MzConsoleClusterUtilizationOverview {
size: string | null;
}

export interface MzContinualTasks {
cluster_id: Generated<string>;
create_sql: Generated<string>;
definition: Generated<string>;
id: Generated<string>;
name: Generated<string>;
oid: Generated<number>;
owner_id: Generated<string>;
privileges: Generated<string[]>;
redacted_create_sql: Generated<string>;
schema_id: Generated<string>;
}

export interface MzDatabases {
/**
* Materialize's unique ID for the database.
Expand Down Expand Up @@ -3007,15 +2994,6 @@ export interface MzShowConnections {
type: string;
}

export interface MzShowContinualTasks {
cluster: string;
cluster_id: string;
comment: string;
id: string;
name: string;
schema_id: string;
}

export interface MzShowDatabasePrivileges {
/**
* The role that the privilege was granted to.
Expand Down Expand Up @@ -4302,7 +4280,6 @@ export interface DB {
mz_compute_operator_hydration_statuses_per_worker: MzComputeOperatorHydrationStatusesPerWorker;
mz_connections: MzConnections;
mz_console_cluster_utilization_overview: MzConsoleClusterUtilizationOverview;
mz_continual_tasks: MzContinualTasks;
mz_databases: MzDatabases;
mz_dataflow_addresses: MzDataflowAddresses;
mz_dataflow_addresses_per_worker: MzDataflowAddressesPerWorker;
Expand Down Expand Up @@ -4415,7 +4392,6 @@ export interface DB {
mz_show_clusters: MzShowClusters;
mz_show_columns: MzShowColumns;
mz_show_connections: MzShowConnections;
mz_show_continual_tasks: MzShowContinualTasks;
mz_show_database_privileges: MzShowDatabasePrivileges;
mz_show_databases: MzShowDatabases;
mz_show_default_privileges: MzShowDefaultPrivileges;
Expand Down
9 changes: 0 additions & 9 deletions doc/developer/generated/adapter/continual_task.md

This file was deleted.

This file was deleted.

10 changes: 0 additions & 10 deletions doc/developer/generated/compute/render/continual_task.md

This file was deleted.

2 changes: 0 additions & 2 deletions doc/user/content/reference/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,6 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_catalog_raw -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_cluster_workload_classes -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_compute_error_counts_raw_unified -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_continual_tasks -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_activity_log_redacted -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_activity_log_thinned -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_aggregates -->
Expand All @@ -1403,7 +1402,6 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_cluster_replicas -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_columns -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_connections -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_continual_tasks -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_databases -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_indexes -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_materialized_views -->
Expand Down
3 changes: 2 additions & 1 deletion doc/user/content/releases/v26.20.md
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unintentional!

Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
---
title: Materialize v26.20
date: 2026-04-15
released: true
released: false
patch: 0
rc: 1
publish_helm_chart: true
build:
render: never
Expand Down
171 changes: 47 additions & 124 deletions misc/python/materialize/checks/all_checks/continual_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,145 +10,68 @@

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion

# CTs were introduced in v0.127.0 and removed in v26.21.0.
CT_MIN_VERSION = MzVersion.parse_mz("v0.127.0-dev")
CT_REMOVED_VERSION = MzVersion.parse_mz("v26.21.0-dev")

def schemas() -> str:
return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)

class ContinualTaskMigration(Check):
"""Test that continual task removal is handled on upgrade.

class AuditLogCT(Check):
"""Continual Task for audit logging"""
When upgrading from a version that had CTs to one that removed them,
verify that CT syntax is rejected and no CT objects remain in the catalog.

def _can_run(self, e: Executor) -> bool:
return self.base_version > MzVersion.parse_mz("v0.127.0-dev")

def initialize(self) -> Testdrive:
return Testdrive(schemas() + dedent("""
> CREATE TABLE t_input (key INT);
> INSERT INTO t_input VALUES (1);
> CREATE MATERIALIZED VIEW anomalies AS SELECT sum(key)::INT FROM t_input;
> CREATE CONTINUAL TASK audit_log (count INT) ON INPUT anomalies AS (
INSERT INTO audit_log SELECT * FROM anomalies WHERE sum IS NOT NULL;
)
"""))

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
for s in [
"""
> INSERT INTO t_input VALUES (2), (3);
""",
"""
> INSERT INTO t_input VALUES (4), (5), (6);
""",
]
]

def validate(self) -> Testdrive:
return Testdrive(dedent("""
> SELECT * FROM audit_log
1
6
21
"""))


class StreamTableJoinCT(Check):
"""Continual Task for stream table join"""
Note: we cannot create CTs from this check because the new testdrive binary
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wow, when did that start?! How can we test compatibility against old versions then?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had no idea, we always had that. This means the test, and all tests in continual_task.py are now useless. They should instead use $ postgres-execute at least, but I guess with the intentional found continual task "ct_audit_log" in catalog; drop all continual tasks before upgrading panic they can't do much, so we can remove them after all.

parses SQL locally and rejects CT syntax. The catalog migration that removes
existing CTs is tested in v81_to_v82.rs unit tests instead.
"""

def _can_run(self, e: Executor) -> bool:
return self.base_version > MzVersion.parse_mz("v0.127.0-dev")
# Only meaningful when upgrading from a version that had CTs
# to a version that removed them.
return (
self.base_version >= CT_MIN_VERSION
and self.base_version < CT_REMOVED_VERSION
)

def initialize(self) -> Testdrive:
return Testdrive(schemas() + dedent("""
> CREATE TABLE big (key INT);
> CREATE TABLE small (key INT, val STRING);
> INSERT INTO small VALUES (1, 'v1');
> INSERT INTO small VALUES (2, 'v2');
> INSERT INTO small VALUES (3, 'v3');
> INSERT INTO small VALUES (4, 'v4');
> INSERT INTO small VALUES (5, 'v5');
> CREATE CONTINUAL TASK stj (key INT, val STRING) ON INPUT big AS (
INSERT INTO stj SELECT b.key, s.val FROM big b JOIN small s ON b.key = s.key;
)
> INSERT INTO big VALUES (1), (2), (3), (4), (5)
"""))

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
for s in [
"""
> UPDATE small SET val = 'v' || val;
> INSERT INTO big VALUES (1), (2), (3), (4), (5)
""",
"""
> UPDATE small SET val = 'v' || val;
> INSERT INTO big VALUES (1), (2), (3), (4), (5)
""",
]
]

def validate(self) -> Testdrive:
return Testdrive(dedent("""
> SELECT * FROM stj
1 v1
2 v2
3 v3
4 v4
5 v5
1 vv1
2 vv2
3 vv3
4 vv4
5 vv5
1 vvv1
2 vvv2
3 vvv3
4 vvv4
5 vvv5
"""))


class UpsertCT(Check):
"""Continual Task for upserts"""

def _can_run(self, e: Executor) -> bool:
return self.base_version > MzVersion.parse_mz("v0.127.0-dev")

def initialize(self) -> Testdrive:
return Testdrive(schemas() + dedent("""
> CREATE TABLE append_only (key INT, val INT);
> CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
)
> INSERT INTO append_only VALUES (1, 2), (1, 1)
"""))
> CREATE TABLE ct_migration_input (key INT);
> INSERT INTO ct_migration_input VALUES (1);
> CREATE MATERIALIZED VIEW ct_migration_mv AS SELECT sum(key)::INT AS s FROM ct_migration_input;
"""))

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
for s in [
"""
> INSERT INTO append_only VALUES (1, 3), (2, 4)
""",
"""
> INSERT INTO append_only VALUES (1, 5), (2, 6), (3, 7);
""",
]
Testdrive(dedent("""
> INSERT INTO ct_migration_input VALUES (2), (3);
""")),
Testdrive(dedent("""
> INSERT INTO ct_migration_input VALUES (4), (5);
""")),
]

def validate(self) -> Testdrive:
return Testdrive(dedent("""
> INSERT INTO append_only VALUES (3, 8);

> SELECT * FROM upsert
1 5
2 6
3 8
"""))
if self.current_version >= CT_REMOVED_VERSION:
# After upgrade: no CT objects should remain, CT syntax rejected.
return Testdrive(dedent("""
> SELECT count(*) FROM mz_objects WHERE type = 'continual-task';
0

! CREATE CONTINUAL TASK ct_should_fail (x INT) ON INPUT ct_migration_mv AS (
INSERT INTO ct_should_fail SELECT * FROM ct_migration_mv;
)
contains:Expected DATABASE

> SELECT s FROM ct_migration_mv;
15
"""))
else:
# Still on old version: basic objects should work.
return Testdrive(dedent("""
> SELECT s FROM ct_migration_mv;
15
"""))
4 changes: 0 additions & 4 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ def get_minimal_system_parameters(
"enable_compute_correction_v2": "true",
"enable_compute_logical_backpressure": "true",
"enable_connection_validation_syntax": "true",
"enable_continual_task_create": "true",
"enable_continual_task_retain": "true",
"enable_continual_task_transform": "true",
"enable_copy_to_expr": "true",
"enable_copy_from_remote": "true",
"enable_create_table_from_source": "true",
Expand Down Expand Up @@ -618,7 +615,6 @@ def get_default_system_parameters(
"with_0dt_caught_up_check_cutoff",
"enable_0dt_caught_up_replica_status_check",
"plan_insights_notice_fast_path_clusters_optimize_duration",
"enable_continual_task_builtins",
"enable_expression_cache",
"mz_metrics_lgalloc_map_refresh_interval",
"mz_metrics_lgalloc_refresh_interval",
Expand Down
1 change: 0 additions & 1 deletion misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,6 @@ def __init__(
"enable_statement_lifecycle_logging",
"enable_introspection_subscribes",
"plan_insights_notice_fast_path_clusters_optimize_duration",
"enable_continual_task_builtins",
"enable_expression_cache",
"enable_multi_replica_sources",
"enable_password_auth",
Expand Down
1 change: 0 additions & 1 deletion misc/wasm/src/sql-parser-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ fn in_cluster(stmt: &Statement<mz_sql_parser::ast::Raw>) -> Option<String> {
Statement::CreateSource(src) => src.in_cluster.clone(),
Statement::CreateWebhookSource(src) => src.in_cluster.clone(),
Statement::CreateSink(src) => src.in_cluster.clone(),
Statement::CreateContinualTask(ct) => ct.in_cluster.clone(),
_ => None,
};

Expand Down
8 changes: 0 additions & 8 deletions src/adapter-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,6 @@ pub const PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION: Config<Dura
"Enable plan insights fast path clusters calculation if the optimize step took less than this duration.",
);

/// Whether to create system builtin continual tasks on boot.
pub const ENABLE_CONTINUAL_TASK_BUILTINS: Config<bool> = Config::new(
"enable_continual_task_builtins",
false,
"Create system builtin continual tasks on boot.",
);

/// Whether to use an expression cache on boot.
pub const ENABLE_EXPRESSION_CACHE: Config<bool> = Config::new(
"enable_expression_cache",
Expand Down Expand Up @@ -229,7 +222,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&ENABLE_INTROSPECTION_SUBSCRIBES)
.add(&ENABLE_FRONTEND_SUBSCRIBES)
.add(&PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION)
.add(&ENABLE_CONTINUAL_TASK_BUILTINS)
.add(&ENABLE_EXPRESSION_CACHE)
.add(&ENABLE_MULTI_REPLICA_SOURCES)
.add(&ENABLE_PASSWORD_AUTH)
Expand Down
Loading
Loading