|
| 1 | +# Copyright Materialize, Inc. and contributors. All rights reserved. |
| 2 | +# |
| 3 | +# Use of this software is governed by the Business Source License |
| 4 | +# included in the LICENSE file at the root of this repository. |
| 5 | +# |
| 6 | +# As of the Change Date specified in that file, in accordance with |
| 7 | +# the Business Source License, use of this software will be governed |
| 8 | +# by the Apache License, Version 2.0. |
| 9 | + |
| 10 | +# Test PARTITION BY syntax for Iceberg sinks. |
| 11 | + |
| 12 | +> CREATE SECRET IF NOT EXISTS access_key_secret AS '${arg.s3-access-key}' |
| 13 | + |
| 14 | +> CREATE CONNECTION IF NOT EXISTS aws_conn TO AWS ( |
| 15 | + ACCESS KEY ID = 'tduser', |
| 16 | + SECRET ACCESS KEY = SECRET access_key_secret, |
| 17 | + ENDPOINT = '${arg.aws-endpoint}', |
| 18 | + REGION = 'us-east-1' |
| 19 | + ); |
| 20 | + |
| 21 | +> CREATE CONNECTION IF NOT EXISTS polaris TO ICEBERG CATALOG ( |
| 22 | + CATALOG TYPE = 'REST', |
| 23 | + URL = 'http://polaris:8181/api/catalog', |
| 24 | + CREDENTIAL = 'root:root', |
| 25 | + WAREHOUSE = 'default_catalog', |
| 26 | + SCOPE = 'PRINCIPAL_ROLE:ALL' |
| 27 | + ); |
| 28 | + |
| 29 | +# ---- Positive: identity partition ---- |
| 30 | + |
| 31 | +> CREATE TABLE part_identity (region text, v int); |
| 32 | + |
| 33 | +> CREATE SINK part_identity_sink |
| 34 | + FROM part_identity |
| 35 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 36 | + NAMESPACE 'default_namespace', |
| 37 | + TABLE 'part_identity_table' |
| 38 | + ) |
| 39 | + USING AWS CONNECTION aws_conn |
| 40 | + PARTITION BY (region) |
| 41 | + MODE APPEND |
| 42 | + WITH (COMMIT INTERVAL '1s'); |
| 43 | + |
| 44 | +> DROP SINK part_identity_sink; |
| 45 | +> DROP TABLE part_identity; |
| 46 | + |
| 47 | +# ---- Positive: temporal partition ---- |
| 48 | + |
| 49 | +> CREATE TABLE part_temporal (ts timestamp, v int); |
| 50 | + |
| 51 | +> CREATE SINK part_temporal_sink |
| 52 | + FROM part_temporal |
| 53 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 54 | + NAMESPACE 'default_namespace', |
| 55 | + TABLE 'part_temporal_table' |
| 56 | + ) |
| 57 | + USING AWS CONNECTION aws_conn |
| 58 | + PARTITION BY (day(ts)) |
| 59 | + MODE APPEND |
| 60 | + WITH (COMMIT INTERVAL '1s'); |
| 61 | + |
| 62 | +> DROP SINK part_temporal_sink; |
| 63 | +> DROP TABLE part_temporal; |
| 64 | + |
| 65 | +# ---- Positive: bucket partition ---- |
| 66 | + |
| 67 | +> CREATE TABLE part_bucket (user_id int, v text); |
| 68 | + |
| 69 | +> CREATE SINK part_bucket_sink |
| 70 | + FROM part_bucket |
| 71 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 72 | + NAMESPACE 'default_namespace', |
| 73 | + TABLE 'part_bucket_table' |
| 74 | + ) |
| 75 | + USING AWS CONNECTION aws_conn |
| 76 | + KEY (user_id) NOT ENFORCED |
| 77 | + PARTITION BY (bucket(16, user_id)) |
| 78 | + MODE UPSERT |
| 79 | + WITH (COMMIT INTERVAL '1s'); |
| 80 | + |
| 81 | +> DROP SINK part_bucket_sink; |
| 82 | +> DROP TABLE part_bucket; |
| 83 | + |
| 84 | +# ---- Positive: multiple partition fields ---- |
| 85 | + |
| 86 | +> CREATE TABLE part_multi (region text, ts timestamp, v int); |
| 87 | + |
| 88 | +> CREATE SINK part_multi_sink |
| 89 | + FROM part_multi |
| 90 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 91 | + NAMESPACE 'default_namespace', |
| 92 | + TABLE 'part_multi_table' |
| 93 | + ) |
| 94 | + USING AWS CONNECTION aws_conn |
| 95 | + PARTITION BY (region, month(ts)) |
| 96 | + MODE APPEND |
| 97 | + WITH (COMMIT INTERVAL '1s'); |
| 98 | + |
| 99 | +> DROP SINK part_multi_sink; |
| 100 | +> DROP TABLE part_multi; |
| 101 | + |
| 102 | +# ---- Negative: nonexistent column ---- |
| 103 | + |
| 104 | +> CREATE TABLE part_bad_col (a int, b text); |
| 105 | + |
| 106 | +! CREATE SINK part_bad_col_sink |
| 107 | + FROM part_bad_col |
| 108 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 109 | + NAMESPACE 'default_namespace', |
| 110 | + TABLE 'part_bad_col_table' |
| 111 | + ) |
| 112 | + USING AWS CONNECTION aws_conn |
| 113 | + PARTITION BY (no_such_col) |
| 114 | + MODE APPEND |
| 115 | + WITH (COMMIT INTERVAL '1s'); |
| 116 | +contains:column referenced in PARTITION BY does not exist |
| 117 | + |
| 118 | +> DROP TABLE part_bad_col; |
| 119 | + |
| 120 | +# ---- Negative: type incompatibility (day on text) ---- |
| 121 | + |
| 122 | +> CREATE TABLE part_bad_type (name text, v int); |
| 123 | + |
| 124 | +! CREATE SINK part_bad_type_sink |
| 125 | + FROM part_bad_type |
| 126 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 127 | + NAMESPACE 'default_namespace', |
| 128 | + TABLE 'part_bad_type_table' |
| 129 | + ) |
| 130 | + USING AWS CONNECTION aws_conn |
| 131 | + PARTITION BY (day(name)) |
| 132 | + MODE APPEND |
| 133 | + WITH (COMMIT INTERVAL '1s'); |
| 134 | +contains:PARTITION BY transform |
| 135 | + |
| 136 | +> DROP TABLE part_bad_type; |
| 137 | + |
| 138 | +# ---- Negative: partition field name collides with schema column ---- |
| 139 | +# day(foo) generates partition field name "foo_day", which collides |
| 140 | +# with the schema column "foo_day". |
| 141 | + |
| 142 | +> CREATE TABLE part_collision (foo date, foo_day text); |
| 143 | + |
| 144 | +! CREATE SINK part_collision_sink |
| 145 | + FROM part_collision |
| 146 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 147 | + NAMESPACE 'default_namespace', |
| 148 | + TABLE 'part_collision_table' |
| 149 | + ) |
| 150 | + USING AWS CONNECTION aws_conn |
| 151 | + PARTITION BY (day(foo)) |
| 152 | + MODE APPEND |
| 153 | + WITH (COMMIT INTERVAL '1s'); |
| 154 | +contains:conflicts with schema field |
| 155 | + |
| 156 | +> DROP TABLE part_collision; |
| 157 | + |
| 158 | +# ========================================================================== |
| 159 | +# Existing-table validation (table_matches_partitions) |
| 160 | +# |
| 161 | +# These tests create a sink to materialize the Iceberg table, drop the sink, |
| 162 | +# then create a second sink against the same table with a different partition |
| 163 | +# spec. The mismatch is detected at runtime by the storage worker. |
| 164 | +# ========================================================================== |
| 165 | + |
| 166 | +# ---- Positive: matching partition spec on existing table ---- |
| 167 | + |
| 168 | +> CREATE TABLE match_ok (ts timestamp, v int); |
| 169 | + |
| 170 | +> CREATE SINK match_ok_sink_1 |
| 171 | + FROM match_ok |
| 172 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 173 | + NAMESPACE 'default_namespace', |
| 174 | + TABLE 'match_ok_table' |
| 175 | + ) |
| 176 | + USING AWS CONNECTION aws_conn |
| 177 | + PARTITION BY (day(ts)) |
| 178 | + MODE APPEND |
| 179 | + WITH (COMMIT INTERVAL '1s'); |
| 180 | + |
| 181 | +> SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'match_ok_sink_1' |
| 182 | +running |
| 183 | + |
| 184 | +> DROP SINK match_ok_sink_1; |
| 185 | + |
| 186 | +# Same partition spec against the same table — should succeed. |
| 187 | +> CREATE SINK match_ok_sink_2 |
| 188 | + FROM match_ok |
| 189 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 190 | + NAMESPACE 'default_namespace', |
| 191 | + TABLE 'match_ok_table' |
| 192 | + ) |
| 193 | + USING AWS CONNECTION aws_conn |
| 194 | + PARTITION BY (day(ts)) |
| 195 | + MODE APPEND |
| 196 | + WITH (COMMIT INTERVAL '1s'); |
| 197 | + |
| 198 | +> SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'match_ok_sink_2' |
| 199 | +running |
| 200 | + |
| 201 | +> DROP SINK match_ok_sink_2; |
| 202 | +> DROP TABLE match_ok; |
| 203 | + |
| 204 | +# ---- Negative: partitioned table, no PARTITION BY ---- |
| 205 | + |
| 206 | +> CREATE TABLE mismatch_partitioned (region text, v int); |
| 207 | + |
| 208 | +> CREATE SINK mismatch_partitioned_sink_1 |
| 209 | + FROM mismatch_partitioned |
| 210 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 211 | + NAMESPACE 'default_namespace', |
| 212 | + TABLE 'mismatch_partitioned_table' |
| 213 | + ) |
| 214 | + USING AWS CONNECTION aws_conn |
| 215 | + PARTITION BY (region) |
| 216 | + MODE APPEND |
| 217 | + WITH (COMMIT INTERVAL '1s'); |
| 218 | + |
| 219 | +> SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'mismatch_partitioned_sink_1' |
| 220 | +running |
| 221 | + |
| 222 | +> DROP SINK mismatch_partitioned_sink_1; |
| 223 | + |
| 224 | +# No PARTITION BY against a partitioned table. |
| 225 | +> CREATE SINK mismatch_partitioned_sink_2 |
| 226 | + FROM mismatch_partitioned |
| 227 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 228 | + NAMESPACE 'default_namespace', |
| 229 | + TABLE 'mismatch_partitioned_table' |
| 230 | + ) |
| 231 | + USING AWS CONNECTION aws_conn |
| 232 | + MODE APPEND |
| 233 | + WITH (COMMIT INTERVAL '1s'); |
| 234 | + |
| 235 | +> SELECT st.error LIKE '%partitioned but sink has no PARTITION BY%' |
| 236 | + FROM mz_sinks s JOIN mz_internal.mz_sink_statuses st ON s.id = st.id |
| 237 | + WHERE s.name = 'mismatch_partitioned_sink_2' |
| 238 | +true |
| 239 | + |
| 240 | +> DROP SINK mismatch_partitioned_sink_2; |
| 241 | +> DROP TABLE mismatch_partitioned; |
| 242 | + |
| 243 | +# ---- Negative: unpartitioned table, with PARTITION BY ---- |
| 244 | + |
| 245 | +> CREATE TABLE mismatch_unpart (region text, v int); |
| 246 | + |
| 247 | +> CREATE SINK mismatch_unpart_sink_1 |
| 248 | + FROM mismatch_unpart |
| 249 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 250 | + NAMESPACE 'default_namespace', |
| 251 | + TABLE 'mismatch_unpart_table' |
| 252 | + ) |
| 253 | + USING AWS CONNECTION aws_conn |
| 254 | + MODE APPEND |
| 255 | + WITH (COMMIT INTERVAL '1s'); |
| 256 | + |
| 257 | +> SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'mismatch_unpart_sink_1' |
| 258 | +running |
| 259 | + |
| 260 | +> DROP SINK mismatch_unpart_sink_1; |
| 261 | + |
| 262 | +# PARTITION BY against an unpartitioned table. |
| 263 | +> CREATE SINK mismatch_unpart_sink_2 |
| 264 | + FROM mismatch_unpart |
| 265 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 266 | + NAMESPACE 'default_namespace', |
| 267 | + TABLE 'mismatch_unpart_table' |
| 268 | + ) |
| 269 | + USING AWS CONNECTION aws_conn |
| 270 | + PARTITION BY (region) |
| 271 | + MODE APPEND |
| 272 | + WITH (COMMIT INTERVAL '1s'); |
| 273 | + |
| 274 | +> SELECT st.error LIKE '%unpartitioned but sink specifies PARTITION BY%' |
| 275 | + FROM mz_sinks s JOIN mz_internal.mz_sink_statuses st ON s.id = st.id |
| 276 | + WHERE s.name = 'mismatch_unpart_sink_2' |
| 277 | +true |
| 278 | + |
| 279 | +> DROP SINK mismatch_unpart_sink_2; |
| 280 | +> DROP TABLE mismatch_unpart; |
| 281 | + |
| 282 | +# ---- Negative: different transforms ---- |
| 283 | + |
| 284 | +> CREATE TABLE mismatch_transform (ts timestamp, v int); |
| 285 | + |
| 286 | +> CREATE SINK mismatch_transform_sink_1 |
| 287 | + FROM mismatch_transform |
| 288 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 289 | + NAMESPACE 'default_namespace', |
| 290 | + TABLE 'mismatch_transform_table' |
| 291 | + ) |
| 292 | + USING AWS CONNECTION aws_conn |
| 293 | + PARTITION BY (day(ts)) |
| 294 | + MODE APPEND |
| 295 | + WITH (COMMIT INTERVAL '1s'); |
| 296 | + |
| 297 | +> SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'mismatch_transform_sink_1' |
| 298 | +running |
| 299 | + |
| 300 | +> DROP SINK mismatch_transform_sink_1; |
| 301 | + |
| 302 | +# month(ts) against a table partitioned by day(ts). |
| 303 | +> CREATE SINK mismatch_transform_sink_2 |
| 304 | + FROM mismatch_transform |
| 305 | + INTO ICEBERG CATALOG CONNECTION polaris ( |
| 306 | + NAMESPACE 'default_namespace', |
| 307 | + TABLE 'mismatch_transform_table' |
| 308 | + ) |
| 309 | + USING AWS CONNECTION aws_conn |
| 310 | + PARTITION BY (month(ts)) |
| 311 | + MODE APPEND |
| 312 | + WITH (COMMIT INTERVAL '1s'); |
| 313 | + |
| 314 | +> SELECT st.error LIKE '%partition spec mismatch%' |
| 315 | + FROM mz_sinks s JOIN mz_internal.mz_sink_statuses st ON s.id = st.id |
| 316 | + WHERE s.name = 'mismatch_transform_sink_2' |
| 317 | +true |
| 318 | + |
| 319 | +> DROP SINK mismatch_transform_sink_2; |
| 320 | +> DROP TABLE mismatch_transform; |
0 commit comments