-
Notifications
You must be signed in to change notification settings - Fork 499
Remove continual tasks (CT) feature #35967
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
This file was deleted.
This file was deleted.
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems unintentional! |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,145 +10,68 @@ | |
|
|
||
| from materialize.checks.actions import Testdrive | ||
| from materialize.checks.checks import Check | ||
| from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD | ||
| from materialize.checks.executors import Executor | ||
| from materialize.mz_version import MzVersion | ||
|
|
||
| # CTs were introduced in v0.127.0 and removed in v26.21.0. | ||
| CT_MIN_VERSION = MzVersion.parse_mz("v0.127.0-dev") | ||
| CT_REMOVED_VERSION = MzVersion.parse_mz("v26.21.0-dev") | ||
|
|
||
| def schemas() -> str: | ||
| return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD) | ||
|
|
||
| class ContinualTaskMigration(Check): | ||
| """Test that continual task removal is handled on upgrade. | ||
|
|
||
| class AuditLogCT(Check): | ||
| """Continual Task for audit logging""" | ||
| When upgrading from a version that had CTs to one that removed them, | ||
| verify that CT syntax is rejected and no CT objects remain in the catalog. | ||
|
|
||
| def _can_run(self, e: Executor) -> bool: | ||
| return self.base_version > MzVersion.parse_mz("v0.127.0-dev") | ||
|
|
||
| def initialize(self) -> Testdrive: | ||
| return Testdrive(schemas() + dedent(""" | ||
| > CREATE TABLE t_input (key INT); | ||
| > INSERT INTO t_input VALUES (1); | ||
| > CREATE MATERIALIZED VIEW anomalies AS SELECT sum(key)::INT FROM t_input; | ||
| > CREATE CONTINUAL TASK audit_log (count INT) ON INPUT anomalies AS ( | ||
| INSERT INTO audit_log SELECT * FROM anomalies WHERE sum IS NOT NULL; | ||
| ) | ||
| """)) | ||
|
|
||
| def manipulate(self) -> list[Testdrive]: | ||
| return [ | ||
| Testdrive(schemas() + dedent(s)) | ||
| for s in [ | ||
| """ | ||
| > INSERT INTO t_input VALUES (2), (3); | ||
| """, | ||
| """ | ||
| > INSERT INTO t_input VALUES (4), (5), (6); | ||
| """, | ||
| ] | ||
| ] | ||
|
|
||
| def validate(self) -> Testdrive: | ||
| return Testdrive(dedent(""" | ||
| > SELECT * FROM audit_log | ||
| 1 | ||
| 6 | ||
| 21 | ||
| """)) | ||
|
|
||
|
|
||
| class StreamTableJoinCT(Check): | ||
| """Continual Task for stream table join""" | ||
| Note: we cannot create CTs from this check because the new testdrive binary | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh wow, when did that start?! How can we test compatibility against old versions then?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had no idea, we always had that. This means the test, and all tests in continual_task.py are now useless. They should instead use |
||
| parses SQL locally and rejects CT syntax. The catalog migration that removes | ||
| existing CTs is tested in v81_to_v82.rs unit tests instead. | ||
| """ | ||
|
|
||
| def _can_run(self, e: Executor) -> bool: | ||
| return self.base_version > MzVersion.parse_mz("v0.127.0-dev") | ||
| # Only meaningful when upgrading from a version that had CTs | ||
| # to a version that removed them. | ||
| return ( | ||
| self.base_version >= CT_MIN_VERSION | ||
| and self.base_version < CT_REMOVED_VERSION | ||
| ) | ||
|
|
||
| def initialize(self) -> Testdrive: | ||
| return Testdrive(schemas() + dedent(""" | ||
| > CREATE TABLE big (key INT); | ||
| > CREATE TABLE small (key INT, val STRING); | ||
| > INSERT INTO small VALUES (1, 'v1'); | ||
| > INSERT INTO small VALUES (2, 'v2'); | ||
| > INSERT INTO small VALUES (3, 'v3'); | ||
| > INSERT INTO small VALUES (4, 'v4'); | ||
| > INSERT INTO small VALUES (5, 'v5'); | ||
| > CREATE CONTINUAL TASK stj (key INT, val STRING) ON INPUT big AS ( | ||
| INSERT INTO stj SELECT b.key, s.val FROM big b JOIN small s ON b.key = s.key; | ||
| ) | ||
| > INSERT INTO big VALUES (1), (2), (3), (4), (5) | ||
| """)) | ||
|
|
||
| def manipulate(self) -> list[Testdrive]: | ||
| return [ | ||
| Testdrive(schemas() + dedent(s)) | ||
| for s in [ | ||
| """ | ||
| > UPDATE small SET val = 'v' || val; | ||
| > INSERT INTO big VALUES (1), (2), (3), (4), (5) | ||
| """, | ||
| """ | ||
| > UPDATE small SET val = 'v' || val; | ||
| > INSERT INTO big VALUES (1), (2), (3), (4), (5) | ||
| """, | ||
| ] | ||
| ] | ||
|
|
||
| def validate(self) -> Testdrive: | ||
| return Testdrive(dedent(""" | ||
| > SELECT * FROM stj | ||
| 1 v1 | ||
| 2 v2 | ||
| 3 v3 | ||
| 4 v4 | ||
| 5 v5 | ||
| 1 vv1 | ||
| 2 vv2 | ||
| 3 vv3 | ||
| 4 vv4 | ||
| 5 vv5 | ||
| 1 vvv1 | ||
| 2 vvv2 | ||
| 3 vvv3 | ||
| 4 vvv4 | ||
| 5 vvv5 | ||
| """)) | ||
|
|
||
|
|
||
| class UpsertCT(Check): | ||
| """Continual Task for upserts""" | ||
|
|
||
| def _can_run(self, e: Executor) -> bool: | ||
| return self.base_version > MzVersion.parse_mz("v0.127.0-dev") | ||
|
|
||
| def initialize(self) -> Testdrive: | ||
| return Testdrive(schemas() + dedent(""" | ||
| > CREATE TABLE append_only (key INT, val INT); | ||
| > CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS ( | ||
| DELETE FROM upsert WHERE key IN (SELECT key FROM append_only); | ||
| INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key; | ||
| ) | ||
| > INSERT INTO append_only VALUES (1, 2), (1, 1) | ||
| """)) | ||
| > CREATE TABLE ct_migration_input (key INT); | ||
| > INSERT INTO ct_migration_input VALUES (1); | ||
| > CREATE MATERIALIZED VIEW ct_migration_mv AS SELECT sum(key)::INT AS s FROM ct_migration_input; | ||
| """)) | ||
|
|
||
| def manipulate(self) -> list[Testdrive]: | ||
| return [ | ||
| Testdrive(schemas() + dedent(s)) | ||
| for s in [ | ||
| """ | ||
| > INSERT INTO append_only VALUES (1, 3), (2, 4) | ||
| """, | ||
| """ | ||
| > INSERT INTO append_only VALUES (1, 5), (2, 6), (3, 7); | ||
| """, | ||
| ] | ||
| Testdrive(dedent(""" | ||
| > INSERT INTO ct_migration_input VALUES (2), (3); | ||
| """)), | ||
| Testdrive(dedent(""" | ||
| > INSERT INTO ct_migration_input VALUES (4), (5); | ||
| """)), | ||
| ] | ||
|
|
||
| def validate(self) -> Testdrive: | ||
| return Testdrive(dedent(""" | ||
| > INSERT INTO append_only VALUES (3, 8); | ||
|
|
||
| > SELECT * FROM upsert | ||
| 1 5 | ||
| 2 6 | ||
| 3 8 | ||
| """)) | ||
| if self.current_version >= CT_REMOVED_VERSION: | ||
| # After upgrade: no CT objects should remain, CT syntax rejected. | ||
| return Testdrive(dedent(""" | ||
| > SELECT count(*) FROM mz_objects WHERE type = 'continual-task'; | ||
| 0 | ||
|
|
||
| ! CREATE CONTINUAL TASK ct_should_fail (x INT) ON INPUT ct_migration_mv AS ( | ||
| INSERT INTO ct_should_fail SELECT * FROM ct_migration_mv; | ||
| ) | ||
| contains:Expected DATABASE | ||
|
|
||
| > SELECT s FROM ct_migration_mv; | ||
| 15 | ||
| """)) | ||
| else: | ||
| # Still on old version: basic objects should work. | ||
| return Testdrive(dedent(""" | ||
| > SELECT s FROM ct_migration_mv; | ||
| 15 | ||
| """)) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merge skew