From 1faea460644e257da77e98d1fc8f3ada3f35ba30 Mon Sep 17 00:00:00 2001 From: murphy-4o Date: Mon, 9 Mar 2026 13:56:36 +0000 Subject: [PATCH] Merge pull request #97904 from murphy-4o/murphy_issue_97420 Support ALTER TABLE ... EXECUTE expire_snapshots for Iceberg tables --- docs/en/sql-reference/statements/grant.md | 2 + .../sql-reference/table-functions/iceberg.md | 95 +++ src/Access/Common/AccessType.h | 1 + src/Interpreters/InterpreterAlterQuery.cpp | 20 +- src/Parsers/ASTAlterQuery.cpp | 10 + src/Parsers/ASTAlterQuery.h | 6 + src/Parsers/ParserAlterQuery.cpp | 28 + src/Storages/IStorage.cpp | 5 + src/Storages/IStorage.h | 2 + .../DataLakes/IDataLakeMetadata.h | 14 + .../DataLakes/Iceberg/Constant.h | 18 + .../DataLakes/Iceberg/IcebergMetadata.cpp | 85 +++ .../DataLakes/Iceberg/IcebergMetadata.h | 9 + .../DataLakes/Iceberg/Mutations.cpp | 544 +++++++++++++++ .../DataLakes/Iceberg/Mutations.h | 22 + .../ObjectStorage/StorageObjectStorage.cpp | 9 + .../ObjectStorage/StorageObjectStorage.h | 2 + .../test_expire_snapshots.py | 630 ++++++++++++++++++ .../01271_show_privileges.reference | 1 + ...e_execute_expire_snapshots_parse.reference | 7 + ...r_table_execute_expire_snapshots_parse.sql | 23 + 21 files changed, 1532 insertions(+), 1 deletion(-) create mode 100644 tests/integration/test_storage_iceberg_with_spark/test_expire_snapshots.py create mode 100644 tests/queries/0_stateless/03978_alter_table_execute_expire_snapshots_parse.reference create mode 100644 tests/queries/0_stateless/03978_alter_table_execute_expire_snapshots_parse.sql diff --git a/docs/en/sql-reference/statements/grant.md b/docs/en/sql-reference/statements/grant.md index 1e4d5b6a99cd..da84cf6b2d71 100644 --- a/docs/en/sql-reference/statements/grant.md +++ b/docs/en/sql-reference/statements/grant.md @@ -200,6 +200,7 @@ The hierarchy of privileges in ClickHouse is shown below: - `ALTER MODIFY STATISTICS` - `ALTER TTL` - `ALTER UPDATE` + - `ALTER TABLE EXECUTE` - `ALTER VIEW` - `ALTER VIEW MODIFY QUERY` 
- `ALTER VIEW REFRESH` @@ -438,6 +439,7 @@ Allows executing [ALTER](../../sql-reference/statements/alter/index.md) queries - `ALTER MOVE PARTITION`. Level: `TABLE`. Aliases: `ALTER MOVE PART`, `MOVE PARTITION`, `MOVE PART` - `ALTER FETCH PARTITION`. Level: `TABLE`. Aliases: `ALTER FETCH PART`, `FETCH PARTITION`, `FETCH PART` - `ALTER FREEZE PARTITION`. Level: `TABLE`. Aliases: `FREEZE PARTITION` + - `ALTER EXECUTE`. Level: `TABLE`. Aliases: `ALTER TABLE EXECUTE` - `ALTER VIEW`. Level: `GROUP` - `ALTER VIEW REFRESH`. Level: `VIEW`. Aliases: `REFRESH VIEW` - `ALTER VIEW MODIFY QUERY`. Level: `VIEW`. Aliases: `ALTER TABLE MODIFY QUERY` diff --git a/docs/en/sql-reference/table-functions/iceberg.md b/docs/en/sql-reference/table-functions/iceberg.md index 9fd78c5e17d7..4abadbc3ad40 100644 --- a/docs/en/sql-reference/table-functions/iceberg.md +++ b/docs/en/sql-reference/table-functions/iceberg.md @@ -502,6 +502,101 @@ x: Ivanov y: 993 ``` +### Expire Snapshots {#iceberg-expire-snapshots} + +Iceberg tables accumulate snapshots with each INSERT, DELETE, or UPDATE operation. Over time, this can lead to a large number of snapshots and associated data files. The `expire_snapshots` command removes old snapshots and cleans up data files that are no longer referenced by any retained snapshot. + +**Syntax:** + +```sql +ALTER TABLE iceberg_table EXECUTE expire_snapshots(['timestamp']); +``` + +Which snapshots to keep is determined by the [retention policy](#iceberg-snapshot-retention-policy) (table properties `min-snapshots-to-keep`, `max-snapshot-age-ms`, and per-ref overrides). The retention policy is always evaluated regardless of whether a timestamp argument is provided. + +The optional `timestamp` argument is a datetime string (e.g., `'2024-06-01 00:00:00'`) interpreted in the **server's timezone**. It acts as a safety fuse: snapshots whose `timestamp-ms` is at or after this value are protected from expiration, even if the retention policy would otherwise expire them. 
This lets you guarantee that no snapshot newer than the given point in time is removed. + +When no timestamp is provided, only the retention policy governs which snapshots are expired. + +**Example:** + +```sql +SET allow_insert_into_iceberg = 1; + +-- Create some snapshots by inserting data +INSERT INTO iceberg_table VALUES (1); +INSERT INTO iceberg_table VALUES (2); +INSERT INTO iceberg_table VALUES (3); + +-- Expire using retention policy; additionally protect snapshots newer than the timestamp +ALTER TABLE iceberg_table EXECUTE expire_snapshots('2025-01-01 00:00:00'); + +-- Expire using retention policy only (no additional fuse) +ALTER TABLE iceberg_table EXECUTE expire_snapshots(); +``` + +**Output:** + +The command returns a table with two columns (`metric_name String`, `metric_value Int64`) containing one row per metric. The metric names follow the [Iceberg spec](https://iceberg.apache.org/docs/latest/spark-procedures/#output): + +| metric_name | Description | +|---|---| +| `deleted_data_files_count` | Number of data files deleted | +| `deleted_position_delete_files_count` | Number of position delete files deleted | +| `deleted_equality_delete_files_count` | Number of equality delete files deleted | +| `deleted_manifest_files_count` | Number of manifest files deleted | +| `deleted_manifest_lists_count` | Number of manifest list files deleted | +| `deleted_statistics_files_count` | Number of statistics files deleted (always 0 currently) | + +The command performs the following steps: + +1. Evaluates the retention policy (see below) to determine which snapshots must be preserved +2. If a timestamp argument was provided, additionally protects all snapshots at or newer than that timestamp +3. Expires snapshots that are neither retained by the policy nor protected by the timestamp fuse +4. Computes which files are exclusively associated with expired snapshots +5. Generates new metadata without the expired snapshots +6. 
Physically deletes unreachable manifest lists, manifest files, and data files + +#### Snapshot Retention Policy {#iceberg-snapshot-retention-policy} + +The `expire_snapshots` command respects the [Iceberg snapshot retention policy](https://iceberg.apache.org/spec/#snapshot-retention-policy). Retention is configured via Iceberg table properties and per-reference overrides: + +| Property | Scope | Default | Description | +|---|---|---|---| +| `history.expire.min-snapshots-to-keep` | Table | 1 | Minimum number of snapshots to keep in each branch's ancestor chain | +| `history.expire.max-snapshot-age-ms` | Table | 432000000 (5 days) | Maximum age (in ms) of snapshots to retain in a branch | +| `history.expire.max-ref-age-ms` | Table | ∞ (never) | Maximum age (in ms) for a snapshot reference (branch or tag) before the reference itself is removed | + +Each snapshot reference (`refs` in the Iceberg metadata) can override these with per-ref fields: `min-snapshots-to-keep`, `max-snapshot-age-ms`, and `max-ref-age-ms`. + +**Retention evaluation:** + +- **For each branch** (including `main`): the ancestor chain is walked starting from the branch head. Snapshots are retained while either of these conditions is true: + - The snapshot is one of the first `min-snapshots-to-keep` in the chain + - The snapshot's age is within `max-snapshot-age-ms` (i.e., `now - timestamp-ms <= max-snapshot-age-ms`) +- **For tags**: the tagged snapshot is retained unless the tag has exceeded its `max-ref-age-ms`, in which case the tag reference is removed +- **Non-main references** whose age exceeds `max-ref-age-ms` are removed entirely (the `main` branch is never removed) +- **Dangling references** that point to non-existent snapshots are removed with a warning +- **The current snapshot is always preserved**, regardless of retention settings + +**Required privileges:** + +The `ALTER TABLE EXECUTE` privilege is required, which is a child of `ALTER TABLE` in the ClickHouse access control hierarchy. 
You can grant it specifically or via the parent: + +```sql +-- Grant only EXECUTE permission +GRANT ALTER TABLE EXECUTE ON my_iceberg_table TO my_user; + +-- Or grant all ALTER TABLE permissions (includes ALTER TABLE EXECUTE) +GRANT ALTER TABLE ON my_iceberg_table TO my_user; +``` + +:::note +- Only Iceberg format version 2 tables are supported (v1 snapshots do not guarantee `manifest-list`, which is required to safely identify files for cleanup) +- The current snapshot is always preserved, even if it is older than the specified timestamp +- Requires the `allow_insert_into_iceberg` setting to be enabled +- The catalog's own authorization (REST catalog auth, AWS Glue IAM, etc.) is enforced independently when ClickHouse updates the metadata + ::: ## Altinity Antalya branch diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 3c4c177bcff5..3e1bbdb18ddf 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -218,6 +218,7 @@ enum class AccessType : uint8_t M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \ M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \ M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \ + M(ALTER_EXECUTE, "ALTER TABLE EXECUTE", TABLE, ALTER_TABLE) \ \ M(ALTER_DATABASE_SETTINGS, "ALTER DATABASE SETTING, ALTER MODIFY DATABASE SETTING, MODIFY DATABASE SETTING", DATABASE, ALTER_DATABASE) /* allows to execute ALTER MODIFY SETTING */\ M(ALTER_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute ALTER NAMED COLLECTION */\ diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index ffa5134a45d2..49ba30c0178a 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -179,11 +179,16 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter) AlterCommands alter_commands; 
PartitionCommands partition_commands; MutationCommands mutation_commands; + std::vector execute_commands; for (const auto & child : alter.command_list->children) { auto * command_ast = child->as(); - if (auto alter_command = AlterCommand::parse(command_ast)) + if (command_ast->type == ASTAlterCommand::EXECUTE_COMMAND) + { + execute_commands.push_back(command_ast); + } + else if (auto alter_command = AlterCommand::parse(command_ast)) { alter_commands.emplace_back(std::move(*alter_command)); } @@ -303,6 +308,14 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter) res.pipeline = QueryPipeline(std::move(partition_commands_pipe)); } + for (const auto * execute_command : execute_commands) + { + ASTPtr args_ast = execute_command->execute_args ? execute_command->execute_args->ptr() : nullptr; + auto execute_pipe = table->executeCommand(execute_command->execute_command_name, args_ast, getContext()); + if (!execute_pipe.empty()) + res.pipeline = QueryPipeline(std::move(execute_pipe)); + } + return res; } @@ -642,6 +655,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS required_access.emplace_back(AccessType::ALTER_UPDATE, database, table); break; } + case ASTAlterCommand::EXECUTE_COMMAND: + { + required_access.emplace_back(AccessType::ALTER_EXECUTE, database, table); + break; + } } return required_access; diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index cd3713ab2551..82181fba863a 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -69,6 +69,8 @@ ASTPtr ASTAlterCommand::clone() const res->to_table_function = res->children.emplace_back(to_table_function->clone()).get(); if (partition_by_expr) res->partition_by_expr = res->children.emplace_back(partition_by_expr->clone()).get(); + if (execute_args) + res->execute_args = res->children.emplace_back(execute_args->clone()).get(); return res; } @@ -592,6 +594,13 @@ void ASTAlterCommand::formatImpl(WriteBuffer & 
ostr, const FormatSettings & sett partition->format(ostr, settings, state, frame); } } + else if (type == ASTAlterCommand::EXECUTE_COMMAND) + { + ostr << "EXECUTE " << execute_command_name << "("; + if (execute_args) + execute_args->format(ostr, settings, state, frame); + ostr << ")"; + } else throw Exception(ErrorCodes::UNEXPECTED_AST_STRUCTURE, "Unexpected type of ALTER"); } @@ -621,6 +630,7 @@ void ASTAlterCommand::forEachPointerToChild(std::functiontype = ASTAlterCommand::EXECUTE_COMMAND; + + ParserIdentifier command_name_parser; + ASTPtr command_name_ast; + if (!command_name_parser.parse(pos, command_name_ast, expected)) + return false; + command->execute_command_name = getIdentifierName(command_name_ast); + + if (!parser_opening_round_bracket.ignore(pos, expected)) + return false; + + ASTPtr execute_args_list; + ParserList args_parser( + std::make_unique(false), + std::make_unique(TokenType::Comma), + /* allow_empty = */ true); + if (!args_parser.parse(pos, execute_args_list, expected)) + return false; + + if (!parser_closing_round_bracket.ignore(pos, expected)) + return false; + + if (execute_args_list) + command->execute_args = command->children.emplace_back(std::move(execute_args_list)).get(); + } else return false; break; diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index 46d2fb9154b2..7a64dda0e2d9 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -302,6 +302,11 @@ void IStorage::mutate(const MutationCommands &, ContextPtr) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not supported by storage {}", getName()); } +Pipe IStorage::executeCommand(const String & command_name, const ASTPtr & /*args*/, ContextPtr /*context*/) +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "EXECUTE command '{}' is not supported by storage {}", command_name, getName()); +} + CancellationCode IStorage::killMutation(const String & /*mutation_id*/) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not 
supported by storage {}", getName()); diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index e8b7ce0adbfc..8845ba1b1ad9 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -608,6 +608,8 @@ It is currently only implemented in StorageObjectStorage. /// Mutate the table contents virtual void mutate(const MutationCommands &, ContextPtr); + virtual Pipe executeCommand(const String & command_name, const ASTPtr & args, ContextPtr context); + /// Cancel a mutation. virtual CancellationCode killMutation(const String & /*mutation_id*/); diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 8f3542165eb7..c7c559346fa1 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include @@ -196,6 +197,19 @@ class IDataLakeMetadata : boost::noncopyable virtual void addDeleteTransformers(ObjectInfoPtr, QueryPipelineBuilder &, const std::optional &, FormatParserSharedResourcesPtr, ContextPtr) const { } virtual void checkAlterIsPossible(const AlterCommands & /*commands*/) { throwNotImplemented("alter"); } virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) { throwNotImplemented("alter"); } + + virtual Pipe executeCommand( + const String & command_name, + const ASTPtr & /*args*/, + ObjectStoragePtr /*object_storage*/, + StorageObjectStorageConfigurationPtr /*configuration*/, + std::shared_ptr /*catalog*/, + ContextPtr /*context*/, + const StorageID & /*storage_id*/) + { + throwNotImplemented(fmt::format("EXECUTE {}", command_name)); + } + virtual void drop(ContextPtr) { } virtual std::optional partitionKey(ContextPtr) const { return {}; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h index 3bc4747a5f18..e84c2d3ceded 100644 --- 
a/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB::Iceberg { /// This file define the field name appearing in Iceberg files. @@ -62,6 +63,7 @@ DEFINE_ICEBERG_FIELD(file_format); DEFINE_ICEBERG_FIELD(file_size_in_bytes); DEFINE_ICEBERG_FIELD(refs); DEFINE_ICEBERG_FIELD(branch); +DEFINE_ICEBERG_FIELD(tag); DEFINE_ICEBERG_FIELD(main); DEFINE_ICEBERG_FIELD(operation); DEFINE_ICEBERG_FIELD(append); @@ -126,6 +128,13 @@ DEFINE_ICEBERG_FIELD_ALIAS(last_sequence_number, last-sequence-number); DEFINE_ICEBERG_FIELD_ALIAS(metadata_file, metadata-file); DEFINE_ICEBERG_FIELD_ALIAS(metadata_log, metadata-log); DEFINE_ICEBERG_FIELD_ALIAS(metadata_sequence_number, sequence-number); +DEFINE_ICEBERG_FIELD_ALIAS(min_snapshots_to_keep, history.expire.min-snapshots-to-keep); +DEFINE_ICEBERG_FIELD_ALIAS(max_snapshot_age_ms, history.expire.max-snapshot-age-ms); +DEFINE_ICEBERG_FIELD_ALIAS(max_ref_age_ms, history.expire.max-ref-age-ms); +/// Ref-level override fields (short names inside the "refs" map per Iceberg spec). +DEFINE_ICEBERG_FIELD_ALIAS(ref_min_snapshots_to_keep, min-snapshots-to-keep); +DEFINE_ICEBERG_FIELD_ALIAS(ref_max_snapshot_age_ms, max-snapshot-age-ms); +DEFINE_ICEBERG_FIELD_ALIAS(ref_max_ref_age_ms, max-ref-age-ms); /// These are compound fields like `data_file.file_path`, we use prefix 'c_' to distinguish them. DEFINE_ICEBERG_FIELD_COMPOUND(data_file, file_path); DEFINE_ICEBERG_FIELD_COMPOUND(data_file, file_format); @@ -139,4 +148,13 @@ DEFINE_ICEBERG_FIELD_COMPOUND(data_file, lower_bounds); DEFINE_ICEBERG_FIELD_COMPOUND(data_file, upper_bounds); DEFINE_ICEBERG_FIELD_COMPOUND(data_file, referenced_data_file); DEFINE_ICEBERG_FIELD_COMPOUND(data_file, sort_order_id); + +/// Fallback defaults for snapshot retention policy when table properties are absent. 
+/// These values follow the Java reference implementation; the Iceberg spec does not +/// mandate specific defaults. +/// TODO: consider exposing these as ClickHouse server-level settings so users can +/// change the global defaults without patching every table's properties. +constexpr Int32 default_min_snapshots_to_keep = 1; +constexpr Int64 default_max_snapshot_age_ms = 432000000; // 5 days +constexpr Int64 default_max_ref_age_ms = std::numeric_limits::max(); // forever, main branch never expires } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 58f2926cc07e..5369437a2270 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -15,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -39,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -561,6 +564,88 @@ void IcebergMetadata::alter(const AlterCommands & params, ContextPtr context) Iceberg::alter(params, context, object_storage, data_lake_settings, persistent_components, write_format); } +static Pipe expireSnapshotsResultToPipe(const Iceberg::ExpireSnapshotsResult & result) +{ + Block header{ + ColumnWithTypeAndName(std::make_shared(), "metric_name"), + ColumnWithTypeAndName(std::make_shared(), "metric_value"), + }; + + MutableColumns columns = header.cloneEmptyColumns(); + + auto add = [&](const char * name, Int64 value) + { + columns[0]->insert(String(name)); + columns[1]->insert(value); + }; + + add("deleted_data_files_count", result.deleted_data_files_count); + add("deleted_position_delete_files_count", result.deleted_position_delete_files_count); + add("deleted_equality_delete_files_count", result.deleted_equality_delete_files_count); + add("deleted_manifest_files_count", 
result.deleted_manifest_files_count); + add("deleted_manifest_lists_count", result.deleted_manifest_lists_count); + add("deleted_statistics_files_count", result.deleted_statistics_files_count); + + Chunk chunk(std::move(columns), 6); + return Pipe(std::make_shared(std::make_shared(std::move(header)), std::move(chunk))); +} + +Pipe IcebergMetadata::executeCommand( + const String & command_name, + const ASTPtr & args, + ObjectStoragePtr object_storage_, + StorageObjectStorageConfigurationPtr configuration_, + std::shared_ptr catalog_, + ContextPtr context, + const StorageID & storage_id) +{ + if (!context->getSettingsRef()[Setting::allow_insert_into_iceberg].value) + { + throw Exception( + ErrorCodes::SUPPORT_IS_DISABLED, + "Iceberg EXECUTE commands are experimental. " + "To allow their usage, enable setting allow_insert_into_iceberg"); + } + + if (command_name == "expire_snapshots") + { + if (args && args->children.size() > 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "expire_snapshots expects zero or one argument (timestamp), got {}", args->children.size()); + + std::optional expire_before_ms; + if (args && args->children.size() == 1) + { + const auto * literal = args->children[0]->as(); + if (!literal || literal->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "expire_snapshots expects a string timestamp argument like '2024-06-01 00:00:00'"); + + const String & timestamp_str = literal->value.safeGet(); + ReadBufferFromString buf(timestamp_str); + time_t expire_time; + readDateTimeText(expire_time, buf); + expire_before_ms = static_cast(expire_time) * 1000; + } + + auto result = Iceberg::expireSnapshots( + expire_before_ms, + context, + object_storage_, + data_lake_settings, + persistent_components, + write_format, + catalog_, + configuration_->getTypeName(), + configuration_->getNamespace(), + storage_id.getTableName()); + + return expireSnapshotsResultToPipe(result); + } + else + { + throw 
Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown EXECUTE command '{}' for Iceberg table", command_name); + } +} + void IcebergMetadata::createInitial( const ObjectStoragePtr & object_storage, const StorageObjectStorageConfigurationWeakPtr & configuration, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index dcb2b91131bd..f4e65ef1c912 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -126,6 +126,15 @@ class IcebergMetadata : public IDataLakeMetadata void checkAlterIsPossible(const AlterCommands & commands) override; void alter(const AlterCommands & params, ContextPtr context) override; + Pipe executeCommand( + const String & command_name, + const ASTPtr & args, + ObjectStoragePtr object_storage, + StorageObjectStorageConfigurationPtr configuration, + std::shared_ptr catalog, + ContextPtr context, + const StorageID & storage_id) override; + ObjectIterator iterate( const ActionsDAG * filter_dag, FileProgressCallback callback, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index deeb05a49102..2b1faf391b77 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -744,6 +745,549 @@ void alter( throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Too many unsuccessed retries to alter iceberg table"); } +/// Table-level snapshot retention policy read from Iceberg table properties. 
+struct RetentionPolicy +{ + Int32 min_snapshots_to_keep = Iceberg::default_min_snapshots_to_keep; + Int64 max_snapshot_age_ms = Iceberg::default_max_snapshot_age_ms; + Int64 max_ref_age_ms = Iceberg::default_max_ref_age_ms; +}; + +static RetentionPolicy readRetentionPolicy(const Poco::JSON::Object::Ptr & metadata) +{ + RetentionPolicy policy; + if (!metadata->has(Iceberg::f_properties)) + return policy; + + auto props = metadata->getObject(Iceberg::f_properties); + if (props->has(Iceberg::f_min_snapshots_to_keep)) + policy.min_snapshots_to_keep = std::stoi(props->getValue(Iceberg::f_min_snapshots_to_keep)); + if (props->has(Iceberg::f_max_snapshot_age_ms)) + policy.max_snapshot_age_ms = std::stoll(props->getValue(Iceberg::f_max_snapshot_age_ms)); + if (props->has(Iceberg::f_max_ref_age_ms)) + policy.max_ref_age_ms = std::stoll(props->getValue(Iceberg::f_max_ref_age_ms)); + return policy; +} + +/// Snapshot parent graph built from metadata, used for branch ancestor traversal. +class SnapshotGraph +{ +public: + explicit SnapshotGraph(const Poco::JSON::Array::Ptr & snapshots) + { + for (UInt32 i = 0; i < snapshots->size(); ++i) + { + auto snapshot = snapshots->getObject(i); + Int64 snap_id = snapshot->getValue(Iceberg::f_metadata_snapshot_id); + timestamps[snap_id] = snapshot->getValue(Iceberg::f_timestamp_ms); + if (snapshot->has(Iceberg::f_parent_snapshot_id) && !snapshot->isNull(Iceberg::f_parent_snapshot_id)) + parent_chain[snap_id] = snapshot->getValue(Iceberg::f_parent_snapshot_id); + } + } + + bool hasSnapshot(Int64 snap_id) const { return timestamps.contains(snap_id); } + + Int64 getTimestamp(Int64 snap_id) const { return timestamps.at(snap_id); } + + std::optional getParent(Int64 snap_id) const + { + auto it = parent_chain.find(snap_id); + return it != parent_chain.end() ? std::optional(it->second) : std::nullopt; + } + + /// Retain ancestors from head_id while min-keep or max-age is satisfied. 
+ void walkBranchAncestors(Int64 now_ms, Int64 head_id, Int32 min_keep, Int64 max_age_ms, std::set & retained) const + { + Int64 walk_id = head_id; + Int32 count = 0; + while (hasSnapshot(walk_id)) + { + bool within_min_keep = (count < min_keep); + bool within_max_age = (now_ms - getTimestamp(walk_id) <= max_age_ms); + if (!within_min_keep && !within_max_age) + break; + retained.insert(walk_id); + ++count; + auto parent = getParent(walk_id); + if (!parent) + break; + walk_id = *parent; + } + } + +private: + std::unordered_map parent_chain; + std::unordered_map timestamps; +}; + +/// Apply Iceberg Snapshot Retention Policy. Returns (retained IDs, expired ref names). +static std::pair, Strings> applyRetentionPolicy( + const Poco::JSON::Object::Ptr & metadata, + Int64 current_snapshot_id, + const SnapshotGraph & graph, + const RetentionPolicy & policy, + Int64 now_ms) +{ + std::set retained; + Strings expired_ref_names; + bool main_branch_walked = false; + if (metadata->has(Iceberg::f_refs)) + { + auto refs = metadata->getObject(Iceberg::f_refs); + for (const auto & ref_name : refs->getNames()) + { + auto ref_obj = refs->getObject(ref_name); + Int64 ref_snap_id = ref_obj->getValue(Iceberg::f_metadata_snapshot_id); + String ref_type = ref_obj->getValue(Iceberg::f_type); + + Int64 ref_max_ref_age = ref_obj->has(Iceberg::f_ref_max_ref_age_ms) + ? 
ref_obj->getValue(Iceberg::f_ref_max_ref_age_ms) + : policy.max_ref_age_ms; + + bool is_main = (ref_name == Iceberg::f_main); + + if (!is_main && !graph.hasSnapshot(ref_snap_id)) + { + LOG_WARNING(getLogger("IcebergExpireSnapshots"), + "Removing invalid ref {}: snapshot {} does not exist", ref_name, ref_snap_id); + expired_ref_names.push_back(ref_name); + continue; + } + + bool ref_expired = !is_main && (now_ms - graph.getTimestamp(ref_snap_id)) > ref_max_ref_age; + + if (ref_expired) + { + expired_ref_names.push_back(ref_name); + continue; + } + + if (ref_type == Iceberg::f_branch) + { + Int32 min_keep = ref_obj->has(Iceberg::f_ref_min_snapshots_to_keep) + ? ref_obj->getValue(Iceberg::f_ref_min_snapshots_to_keep) + : policy.min_snapshots_to_keep; + Int64 max_age = ref_obj->has(Iceberg::f_ref_max_snapshot_age_ms) + ? ref_obj->getValue(Iceberg::f_ref_max_snapshot_age_ms) + : policy.max_snapshot_age_ms; + graph.walkBranchAncestors(now_ms, ref_snap_id, min_keep, max_age, retained); + if (is_main) + main_branch_walked = true; + } + else if (ref_type == Iceberg::f_tag) + { + retained.insert(ref_snap_id); + } + else + { + UNREACHABLE(); + } + } + } + + if (!main_branch_walked) + graph.walkBranchAncestors(now_ms, current_snapshot_id, policy.min_snapshots_to_keep, policy.max_snapshot_age_ms, retained); + + return {retained, expired_ref_names}; +} + +static void collectAllFilePaths( + const Iceberg::ManifestFileIterator::ManifestFileEntriesHandle & entries_handle, + std::set & out) +{ + for (const auto & entry : entries_handle.getFilesWithoutDeleted(FileContentType::DATA)) + out.insert(entry->file_path); + for (const auto & entry : entries_handle.getFilesWithoutDeleted(FileContentType::POSITION_DELETE)) + out.insert(entry->file_path); + for (const auto & entry : entries_handle.getFilesWithoutDeleted(FileContentType::EQUALITY_DELETE)) + out.insert(entry->file_path); +} + +/// Collect all file paths (manifest lists, manifests, data/delete files) +/// referenced by retained 
snapshots. +/// +/// NOTE: We only collect files with status ADDED/EXISTING (via getFilesWithoutDeleted). +/// Files with status DELETED are being removed by that snapshot and don't need retention +/// from it. A DELETED entry's data file was ADDED in an earlier snapshot — if that snapshot +/// is retained, the file is in the retained set from there; if expired, it will be collected +/// for cleanup from that snapshot's ADDED/EXISTING entries. +/// +/// TODO: To handle partially-failed prior expire_snapshots (where the ADDED snapshot +/// was removed but its data files were not cleaned up), we could also traverse DELETED +/// entries in expired manifests. This requires extending ManifestFileIterator to expose +/// DELETED entries. +static void collectRetainedFiles( + const Poco::JSON::Array::Ptr & retained_snapshots, + ObjectStoragePtr object_storage, + PersistentTableComponents & persistent_table_components, + ContextPtr context, + LoggerPtr log, + Int32 current_schema_id, + std::set & retained_manifest_paths, + std::set & retained_data_file_paths, + std::set & retained_manifest_list_paths) +{ + for (UInt32 i = 0; i < retained_snapshots->size(); ++i) + { + auto snapshot = retained_snapshots->getObject(i); + if (!snapshot->has(Iceberg::f_manifest_list)) + continue; + + String manifest_list_path = snapshot->getValue(Iceberg::f_manifest_list); + retained_manifest_list_paths.insert(manifest_list_path); + + String storage_manifest_list_path = getProperFilePathFromMetadataInfo( + manifest_list_path, persistent_table_components.table_path, persistent_table_components.table_location); + + auto manifest_keys = getManifestList( + object_storage, persistent_table_components, context, storage_manifest_list_path, log); + + for (const auto & mf_key : manifest_keys) + { + retained_manifest_paths.insert(mf_key.manifest_file_path); + auto entries_handle = getManifestFileEntriesHandle( + object_storage, persistent_table_components, context, log, + mf_key, current_schema_id); + 
collectAllFilePaths(entries_handle, retained_data_file_paths); + } + } +} + +struct ExpiredFiles +{ + Strings all_paths; + Int64 data_files = 0; + Int64 position_delete_files = 0; + Int64 equality_delete_files = 0; + Int64 manifest_files = 0; + Int64 manifest_lists = 0; +}; + +/// Collect files from expired snapshots that are not referenced by any retained snapshot. +static ExpiredFiles collectExpiredFiles( + const std::vector & expired_manifest_list_paths, + const std::set & retained_manifest_list_paths, + const std::set & retained_manifest_paths, + const std::set & retained_data_file_paths, + ObjectStoragePtr object_storage, + PersistentTableComponents & persistent_table_components, + ContextPtr context, + LoggerPtr log, + Int32 current_schema_id) +{ + ExpiredFiles result; + for (const auto & ml_path : expired_manifest_list_paths) + { + if (retained_manifest_list_paths.contains(ml_path)) + continue; + + String storage_ml_path = getProperFilePathFromMetadataInfo( + ml_path, persistent_table_components.table_path, persistent_table_components.table_location); + + ManifestFileCacheKeys manifest_keys; + try + { + manifest_keys = getManifestList( + object_storage, persistent_table_components, context, storage_ml_path, log); + } + catch (...) 
+ { + LOG_WARNING(log, "Failed to read manifest list {}, skipping", storage_ml_path); + continue; + } + + for (const auto & mf_key : manifest_keys) + { + if (retained_manifest_paths.contains(mf_key.manifest_file_path)) + continue; + + try + { + auto entries_handle = getManifestFileEntriesHandle( + object_storage, persistent_table_components, context, log, + mf_key, current_schema_id); + + for (const auto & entry : entries_handle.getFilesWithoutDeleted(FileContentType::DATA)) + if (!retained_data_file_paths.contains(entry->file_path)) + { + result.all_paths.push_back(entry->file_path); + ++result.data_files; + } + for (const auto & entry : entries_handle.getFilesWithoutDeleted(FileContentType::POSITION_DELETE)) + if (!retained_data_file_paths.contains(entry->file_path)) + { + result.all_paths.push_back(entry->file_path); + ++result.position_delete_files; + } + for (const auto & entry : entries_handle.getFilesWithoutDeleted(FileContentType::EQUALITY_DELETE)) + if (!retained_data_file_paths.contains(entry->file_path)) + { + result.all_paths.push_back(entry->file_path); + ++result.equality_delete_files; + } + } + catch (...) + { + LOG_WARNING(log, "Failed to read manifest file {}, skipping", mf_key.manifest_file_path); + continue; + } + + result.all_paths.push_back(mf_key.manifest_file_path); + ++result.manifest_files; + } + + result.all_paths.push_back(storage_ml_path); + ++result.manifest_lists; + } + return result; +} + +/// Trim snapshot-log to the suffix of entries referencing only retained snapshots. 
+static void trimSnapshotLog( + Poco::JSON::Object::Ptr metadata, + const std::set & expired_snapshot_ids) +{ + if (!metadata->has(Iceberg::f_snapshot_log)) + return; + + auto snapshot_log = metadata->get(Iceberg::f_snapshot_log).extract(); + Int32 suffix_start = static_cast(snapshot_log->size()); + for (Int32 j = static_cast(snapshot_log->size()) - 1; j >= 0; --j) + { + auto entry = snapshot_log->getObject(static_cast(j)); + Int64 snap_id = entry->getValue(Iceberg::f_metadata_snapshot_id); + if (expired_snapshot_ids.contains(snap_id)) + break; + suffix_start = j; + } + Poco::JSON::Array::Ptr retained_log = new Poco::JSON::Array; + for (UInt32 j = static_cast(suffix_start); j < snapshot_log->size(); ++j) + retained_log->add(snapshot_log->getObject(j)); + metadata->set(Iceberg::f_snapshot_log, retained_log); +} + +struct SnapshotPartition +{ + Poco::JSON::Array::Ptr retained_snapshots = new Poco::JSON::Array; + std::set expired_snapshot_ids; + std::vector expired_manifest_list_paths; +}; + +/// Split snapshots into retained and expired. +/// A snapshot is retained if the retention policy selected it, or if the +/// user-provided fuse timestamp protects it (snapshot newer than fuse). 
+static SnapshotPartition partitionSnapshots( + const Poco::JSON::Array::Ptr & snapshots, + const std::set & retention_retained_ids, + std::optional expire_before_ms) +{ + SnapshotPartition result; + for (UInt32 i = 0; i < snapshots->size(); ++i) + { + auto snapshot = snapshots->getObject(i); + Int64 snap_id = snapshot->getValue(Iceberg::f_metadata_snapshot_id); + Int64 snap_ts = snapshot->getValue(Iceberg::f_timestamp_ms); + + bool is_retained_by_policy = retention_retained_ids.contains(snap_id); + bool is_protected_by_fuse = expire_before_ms.has_value() && (snap_ts >= *expire_before_ms); + + if (is_retained_by_policy || is_protected_by_fuse) + { + result.retained_snapshots->add(snapshot); + } + else + { + result.expired_snapshot_ids.insert(snap_id); + if (snapshot->has(Iceberg::f_manifest_list)) + result.expired_manifest_list_paths.push_back(snapshot->getValue(Iceberg::f_manifest_list)); + } + } + return result; +} + +/// Mutate metadata: remove expired refs, update snapshots, trim log, bump timestamp. +static void updateMetadataForExpiration( + Poco::JSON::Object::Ptr metadata, + const Strings & expired_ref_names, + const Poco::JSON::Array::Ptr & retained_snapshots, + const std::set & expired_snapshot_ids) +{ + for (const auto & ref_name : expired_ref_names) + metadata->getObject(Iceberg::f_refs)->remove(ref_name); + + metadata->set(Iceberg::f_snapshots, retained_snapshots); + trimSnapshotLog(metadata, expired_snapshot_ids); + + auto now = std::chrono::system_clock::now(); + auto ms = duration_cast(now.time_since_epoch()); + metadata->set(Iceberg::f_last_updated_ms, ms.count()); +} + +static void deleteExpiredFiles( + const Strings & files_to_delete, + ObjectStoragePtr object_storage, + LoggerPtr log) +{ + for (const auto & file_path : files_to_delete) + { + try + { + object_storage->removeObjectIfExists(StoredObject(file_path)); + LOG_DEBUG(log, "Deleted expired file {}", file_path); + } + catch (...) 
+ { + LOG_WARNING(log, "Failed to delete file {}: {}", file_path, getCurrentExceptionMessage(false)); + } + } +} + +/// Expire old Iceberg snapshots following the spec's Snapshot Retention Policy. +/// +/// The process: +/// 1. Read retention policy from table properties (with spec defaults). +/// 2. Build the snapshot parent graph and determine which snapshots to retain +/// based on branch/tag refs and their min-snapshots-to-keep / max-snapshot-age-ms. +/// 3. If the caller provided expire_before_ms, it acts as an additional safety +/// fuse — snapshots newer than this timestamp are never expired regardless +/// of retention policy. +/// 4. Collect files exclusively owned by expired snapshots and delete them. +/// 5. Write updated metadata with optimistic concurrency (retry on conflict). +ExpireSnapshotsResult expireSnapshots( + std::optional expire_before_ms, + ContextPtr context, + ObjectStoragePtr object_storage, + const DataLakeStorageSettings & data_lake_settings, + PersistentTableComponents & persistent_table_components, + const String & write_format, + std::shared_ptr catalog, + const String & blob_storage_type_name, + const String & blob_storage_namespace_name, + const String & table_name) +{ + auto common_path = persistent_table_components.table_path; + if (!common_path.starts_with('/')) + common_path = "/" + common_path; + + int max_retries = MAX_TRANSACTION_RETRIES; + while (--max_retries > 0) + { + FileNamesGenerator filename_generator(common_path, common_path, false, CompressionMethod::None, write_format); + auto log = getLogger("IcebergExpireSnapshots"); + auto [last_version, metadata_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion( + object_storage, + persistent_table_components.table_path, + data_lake_settings, + persistent_table_components.metadata_cache, + context, + log.get(), + persistent_table_components.table_uuid); + + filename_generator.setVersion(last_version + 1); + 
filename_generator.setCompressionMethod(compression_method); + + auto metadata = getMetadataJSONObject( + metadata_path, object_storage, persistent_table_components.metadata_cache, context, log, compression_method, persistent_table_components.table_uuid); + + if (metadata->getValue(f_format_version) < 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "expire_snapshots is supported only for the second version of iceberg format"); + + if (!metadata->has(Iceberg::f_current_snapshot_id)) + { + LOG_INFO(log, "No snapshots to expire (table has no current snapshot)"); + return {}; + } + + Int64 current_snapshot_id = metadata->getValue(Iceberg::f_current_snapshot_id); + if (current_snapshot_id < 0) + { + LOG_INFO(log, "No snapshots to expire (table has no current snapshot)"); + return {}; + } + + auto snapshots = metadata->get(Iceberg::f_snapshots).extract(); + auto now_ms = duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + + auto policy = readRetentionPolicy(metadata); + SnapshotGraph graph(snapshots); + auto [retention_retained_ids, expired_ref_names] = applyRetentionPolicy(metadata, current_snapshot_id, graph, policy, now_ms); + auto partition = partitionSnapshots(snapshots, retention_retained_ids, expire_before_ms); + + if (partition.expired_snapshot_ids.empty()) + { + LOG_INFO(log, "No snapshots to expire"); + return {}; + } + LOG_INFO(log, "Expiring {} snapshots", partition.expired_snapshot_ids.size()); + + Int32 current_schema_id = metadata->getValue(Iceberg::f_current_schema_id); + + std::set retained_manifest_paths; + std::set retained_data_file_paths; + std::set retained_manifest_list_paths; + collectRetainedFiles( + partition.retained_snapshots, object_storage, persistent_table_components, context, log, + current_schema_id, retained_manifest_paths, retained_data_file_paths, retained_manifest_list_paths); + auto expired_files = collectExpiredFiles( + partition.expired_manifest_list_paths, retained_manifest_list_paths, 
retained_manifest_paths, retained_data_file_paths, + object_storage, persistent_table_components, context, log, current_schema_id); + + updateMetadataForExpiration(metadata, expired_ref_names, partition.retained_snapshots, partition.expired_snapshot_ids); + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + Poco::JSON::Stringifier::stringify(metadata, oss, 4); + std::string json_representation = removeEscapedSlashes(oss.str()); + auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName(); + auto hint = filename_generator.generateVersionHint(); + if (!writeMetadataFileAndVersionHint( + storage_metadata_name, + json_representation, + hint.path_in_storage, + storage_metadata_name, + object_storage, + context, + compression_method, + data_lake_settings[DataLakeStorageSetting::iceberg_use_version_hint])) + { + LOG_WARNING(log, "Metadata commit conflict during expire_snapshots, retrying ({} retries left)", max_retries); + continue; + } + + if (catalog) + { + String catalog_filename = metadata_name; + if (!catalog_filename.starts_with(blob_storage_type_name)) + catalog_filename = blob_storage_type_name + "://" + blob_storage_namespace_name + "/" + metadata_name; + + const auto & [namespace_name, parsed_table_name] = DataLake::parseTableName(table_name); + if (!catalog->updateMetadata(namespace_name, parsed_table_name, catalog_filename, nullptr)) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Failed to update catalog metadata after writing new metadata file. 
" + "The table metadata may be in an inconsistent state"); + } + } + + LOG_INFO(log, "Deleting {} expired files for {} expired snapshots", expired_files.all_paths.size(), partition.expired_snapshot_ids.size()); + deleteExpiredFiles(expired_files.all_paths, object_storage, log); + LOG_INFO(log, "Expired {} snapshots, deleted {} files", partition.expired_snapshot_ids.size(), expired_files.all_paths.size()); + + return ExpireSnapshotsResult{ + .deleted_data_files_count = expired_files.data_files, + .deleted_position_delete_files_count = expired_files.position_delete_files, + .deleted_equality_delete_files_count = expired_files.equality_delete_files, + .deleted_manifest_files_count = expired_files.manifest_files, + .deleted_manifest_lists_count = expired_files.manifest_lists, + }; + } + + if (max_retries == 0) + throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Too many unsuccessful retries to expire iceberg snapshots"); + + UNREACHABLE(); +} + #endif } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h index c19282318319..abdbfeea842a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h @@ -39,6 +39,28 @@ void alter( const DataLakeStorageSettings & data_lake_settings, PersistentTableComponents & persistent_table_components, const String & write_format); + +struct ExpireSnapshotsResult +{ + Int64 deleted_data_files_count = 0; + Int64 deleted_position_delete_files_count = 0; + Int64 deleted_equality_delete_files_count = 0; + Int64 deleted_manifest_files_count = 0; + Int64 deleted_manifest_lists_count = 0; + Int64 deleted_statistics_files_count = 0; +}; + +ExpireSnapshotsResult expireSnapshots( + std::optional expire_before_ms, + ContextPtr context, + ObjectStoragePtr object_storage, + const DataLakeStorageSettings & data_lake_settings, + PersistentTableComponents & persistent_table_components, + const String & 
write_format, + std::shared_ptr catalog, + const String & blob_storage_type_name, + const String & blob_storage_namespace_name, + const String & table_name); } #endif diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index f04be5d5f946..8e9ec991eea1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -786,6 +787,14 @@ void StorageObjectStorage::checkMutationIsPossible(const MutationCommands & comm configuration->checkMutationIsPossible(commands); } +Pipe StorageObjectStorage::executeCommand(const String & command_name, const ASTPtr & args, ContextPtr context) +{ + auto * metadata = getExternalMetadata(context); + if (!metadata) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "EXECUTE command '{}' is not supported by this storage", command_name); + return metadata->executeCommand(command_name, args, object_storage, configuration, catalog, context, storage_id); +} + void StorageObjectStorage::alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & /*alter_lock_holder*/) { StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 61e223f969b3..5b1080716c5c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -177,6 +177,8 @@ class StorageObjectStorage : public IStorage void mutate(const MutationCommands &, ContextPtr) override; void checkMutationIsPossible(const MutationCommands & commands, const Settings & /* settings */) const override; + Pipe executeCommand(const String & command_name, const ASTPtr & args, ContextPtr context) override; + void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & alter_lock_holder) 
override; void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override; diff --git a/tests/integration/test_storage_iceberg_with_spark/test_expire_snapshots.py b/tests/integration/test_storage_iceberg_with_spark/test_expire_snapshots.py new file mode 100644 index 000000000000..958b48422c33 --- /dev/null +++ b/tests/integration/test_storage_iceberg_with_spark/test_expire_snapshots.py @@ -0,0 +1,630 @@ +import glob +import json +import os +import re +import time + +import pytest + +from helpers.iceberg_utils import ( + create_iceberg_table, + default_download_directory, + default_upload_directory, + get_uuid_str, +) + + +ICEBERG_SETTINGS = {"allow_insert_into_iceberg": 1} +FAR_FUTURE = "2099-12-31 23:59:59" +AGGRESSIVE_RETENTION = { + "history.expire.max-snapshot-age-ms": "1", + "history.expire.min-snapshots-to-keep": "1", +} + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _read_iceberg_metadata(instance, table_name): + metadata_dir = f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}/metadata" + latest = instance.exec_in_container( + ["bash", "-c", f"ls -v {metadata_dir}/v*.metadata.json | tail -1"] + ).strip() + raw = instance.exec_in_container(["cat", latest]) + return json.loads(raw), latest + + +def _write_iceberg_metadata(instance, table_name, meta, prev_path): + metadata_dir = f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}/metadata" + meta["last-updated-ms"] = int(time.time() * 1000) + version_match = re.search(r"/v(\d+)[^/]*\.metadata\.json$", prev_path) + new_version = int(version_match.group(1)) + 1 + new_path = f"{metadata_dir}/v{new_version}.metadata.json" + new_content = json.dumps(meta, indent=4) + instance.exec_in_container( + ["bash", "-c", f"cat > {new_path} << 'JSONEOF'\n{new_content}\nJSONEOF"] + ) + + +def read_iceberg_metadata(instance, table_name): 
def read_iceberg_metadata(instance, table_name):
    """Return the parsed latest metadata JSON for a local Iceberg table."""
    meta, _ = _read_iceberg_metadata(instance, table_name)
    return meta


def update_iceberg_metadata(instance, table_name, updater_fn):
    """Apply `updater_fn(meta)` in place and persist it as a new metadata version."""
    meta, prev_path = _read_iceberg_metadata(instance, table_name)
    updater_fn(meta)
    _write_iceberg_metadata(instance, table_name, meta, prev_path)


def _fix_version_hint_for_spark(table_name):
    """Rewrite version-hint.text as a plain version number.
    ClickHouse writes the full filename (e.g. 'v3.metadata.json');
    Spark's Hadoop catalog expects just the number (e.g. '3').
    """
    metadata_dir = f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}/metadata"
    latest = 0
    for f in glob.glob(os.path.join(metadata_dir, "*.metadata.json")):
        m = re.search(r"v(\d+)", os.path.basename(f))
        if m:
            latest = max(latest, int(m.group(1)))
    with open(os.path.join(metadata_dir, "version-hint.text"), "w") as f:
        f.write(str(latest))


def spark_alter_table(cluster, spark, storage_type, table_name, *sql_fragments):
    """Execute Spark SQL ALTER TABLE on a ClickHouse-created Iceberg table.

    Downloads the table from storage to the host (so Spark can see it),
    executes the SQL statements, then uploads the result back.

    Each sql_fragment is appended to 'ALTER TABLE {table_name} '.
    Example: spark_alter_table(..., "SET TBLPROPERTIES('key' = 'val')")
    """
    table_dir = f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}/"
    default_download_directory(cluster, storage_type, table_dir, table_dir)
    _fix_version_hint_for_spark(table_name)
    for fragment in sql_fragments:
        spark.sql(f"ALTER TABLE {table_name} {fragment}")
    default_upload_directory(cluster, storage_type, table_dir, table_dir)


def create_and_populate(cluster, instance, storage_type, table_name, n_rows, format_version=2):
    """Create an Iceberg table with one Int column and insert rows 1..n_rows,
    one INSERT per row so each row produces its own snapshot."""
    create_iceberg_table(
        storage_type, instance, table_name, cluster, "(x Int)", format_version
    )
    for val in range(1, n_rows + 1):
        instance.query(
            f"INSERT INTO {table_name} VALUES ({val});",
            settings=ICEBERG_SETTINGS,
        )


def expire_snapshots(instance, table_name, timestamp=None):
    """Run ALTER TABLE ... EXECUTE expire_snapshots, optionally with a cutoff timestamp."""
    if timestamp:
        result = instance.query(
            f"ALTER TABLE {table_name} EXECUTE expire_snapshots('{timestamp}');",
            settings=ICEBERG_SETTINGS,
        )
    else:
        result = instance.query(
            f"ALTER TABLE {table_name} EXECUTE expire_snapshots();",
            settings=ICEBERG_SETTINGS,
        )
    return result


def parse_expire_result(result):
    """Parse the metric_name/metric_value rows from expire_snapshots into a dict."""
    counts = {}
    for line in result.strip().split("\n"):
        if not line:
            continue
        parts = line.split("\t")
        if len(parts) == 2:
            counts[parts[0]] = int(parts[1])
    return counts


def get_snapshot_ids(instance, table_name):
    """Return snapshot IDs sorted by (timestamp, id) for stable ordering."""
    meta = read_iceberg_metadata(instance, table_name)
    pairs = [(s["snapshot-id"], s["timestamp-ms"]) for s in meta["snapshots"]]
    pairs.sort(key=lambda x: (x[1], x[0]))
    return [sid for sid, _ in pairs]


def get_retained_ids(instance, table_name):
    """Return the set of snapshot IDs still present in the latest metadata."""
    meta = read_iceberg_metadata(instance, table_name)
    return {s["snapshot-id"] for s in meta["snapshots"]}
def get_refs(instance, table_name):
    """Return the refs object from the latest metadata ({} when absent)."""
    meta = read_iceberg_metadata(instance, table_name)
    return meta.get("refs", {})


def get_history_count(instance, table_name):
    """Count snapshots visible via system.iceberg_history."""
    return int(instance.query(
        f"SELECT count() FROM system.iceberg_history "
        f"WHERE database = 'default' AND table = '{table_name}'"
    ).strip())


def assert_data_intact(instance, table_name, n_rows):
    """Verify rows 1..n_rows survived (data files were not over-deleted)."""
    expected = "".join(f"{i}\n" for i in range(1, n_rows + 1))
    assert instance.query(f"SELECT * FROM {table_name} ORDER BY x") == expected


def make_table_name(prefix, storage_type):
    return f"{prefix}_{storage_type}_{get_uuid_str()}"


# ---------------------------------------------------------------------------
# Basic / sanity tests
# ---------------------------------------------------------------------------

@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_expire_snapshots_basic(started_cluster_iceberg_with_spark, storage_type):
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    TABLE_NAME = make_table_name("test_expire_basic", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 3
    )

    assert instance.query(f"SELECT count() FROM {TABLE_NAME}") == "3\n"
    assert get_history_count(instance, TABLE_NAME) >= 3

    time.sleep(2)
    expire_timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

    instance.query(
        f"INSERT INTO {TABLE_NAME} VALUES (4);", settings=ICEBERG_SETTINGS
    )

    result = expire_snapshots(instance, TABLE_NAME, expire_timestamp)
    counts = parse_expire_result(result)
    assert len(counts) == 6, f"Expected 6 metrics, got {counts}"
    assert all(v >= 0 for v in counts.values()), f"All counts should be non-negative, got {counts}"
    assert_data_intact(instance, TABLE_NAME, 4)


@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_expire_snapshots_no_expirable(started_cluster_iceberg_with_spark, storage_type):
    """No-op when no snapshots match the criteria."""
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    TABLE_NAME = make_table_name("test_expire_no_expirable", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 1
    )
    result = expire_snapshots(instance, TABLE_NAME, "2020-01-01 00:00:00")
    counts = parse_expire_result(result)
    assert all(v == 0 for v in counts.values()), f"Expected all zeros for no-op, got {counts}"
    assert_data_intact(instance, TABLE_NAME, 1)


@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_expire_snapshots_format_v1_error(started_cluster_iceberg_with_spark, storage_type):
    """expire_snapshots rejects format version 1 with BAD_ARGUMENTS."""
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    TABLE_NAME = make_table_name("test_expire_v1_error", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 1,
        format_version=1,
    )

    error = instance.query_and_get_error(
        f"ALTER TABLE {TABLE_NAME} EXECUTE expire_snapshots('2020-01-01 00:00:00');",
        settings=ICEBERG_SETTINGS,
    )
    assert "BAD_ARGUMENTS" in error, f"Expected BAD_ARGUMENTS error for v1, got: {error}"
    assert "second version" in error, f"Expected v2-only message, got: {error}"


@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_expire_snapshots_preserves_current(started_cluster_iceberg_with_spark, storage_type):
    """Current snapshot is never expired, even with a far-future timestamp."""
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    TABLE_NAME = make_table_name("test_expire_preserves_current", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 2
    )
    result = expire_snapshots(instance, TABLE_NAME, FAR_FUTURE)
    counts = parse_expire_result(result)
    assert counts["deleted_data_files_count"] >= 0
    assert counts["deleted_manifest_lists_count"] >= 0
    assert_data_intact(instance, TABLE_NAME, 2)


@pytest.mark.parametrize("storage_type", ["local"])
def test_expire_snapshots_files_cleaned(started_cluster_iceberg_with_spark, storage_type):
    """Expired snapshot files are physically removed; retained snapshot files survive.
    Local-only: needs direct filesystem access to verify file deletion.
    """
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    spark = started_cluster_iceberg_with_spark.spark_session
    TABLE_NAME = make_table_name("test_expire_files_cleaned", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 2
    )

    time.sleep(2)
    expire_timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

    instance.query(
        f"INSERT INTO {TABLE_NAME} VALUES (3);", settings=ICEBERG_SETTINGS
    )

    spark_alter_table(
        started_cluster_iceberg_with_spark, spark, storage_type, TABLE_NAME,
        "SET TBLPROPERTIES('history.expire.max-snapshot-age-ms' = '1', "
        "'history.expire.min-snapshots-to-keep' = '1')",
    )

    meta_before = read_iceberg_metadata(instance, TABLE_NAME)
    current_id = meta_before["current-snapshot-id"]
    expired_manifest_lists = []
    retained_manifest_list = None
    for snap in meta_before["snapshots"]:
        ml = snap.get("manifest-list", "")
        if snap["snapshot-id"] == current_id:
            retained_manifest_list = ml
        else:
            expired_manifest_lists.append(ml)

    table_dir = f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/"
    files_before = default_download_directory(
        started_cluster_iceberg_with_spark, storage_type, table_dir, table_dir,
    )

    result = expire_snapshots(instance, TABLE_NAME, expire_timestamp)
    counts = parse_expire_result(result)

    files_after = default_download_directory(
        started_cluster_iceberg_with_spark, storage_type, table_dir, table_dir,
    )
    files_after_set = set(files_after)

    assert len(files_after) < len(files_before), \
        f"Expected fewer files after expire: {len(files_after)} >= {len(files_before)}"

    for ml in expired_manifest_lists:
        assert not any(ml in f for f in files_after_set), \
            f"Expired manifest-list should be deleted: {ml}"

    if retained_manifest_list:
        assert any(retained_manifest_list in f for f in files_after_set), \
            f"Current snapshot's manifest-list should survive: {retained_manifest_list}"

    assert counts["deleted_manifest_lists_count"] > 0, \
        f"Expected deleted manifest lists, got {counts}"
    assert sum(counts.values()) > 0, f"Expected some files to be deleted, got {counts}"

    assert_data_intact(instance, TABLE_NAME, 3)


# ---------------------------------------------------------------------------
# Retention policy tests (table-level properties via Spark SQL)
# ---------------------------------------------------------------------------

@pytest.mark.parametrize("storage_type", ["s3", "local"])
def test_expire_snapshots_retention_min_keep(started_cluster_iceberg_with_spark, storage_type):
    """min-snapshots-to-keep prevents expiring the N most recent ancestors."""
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    spark = started_cluster_iceberg_with_spark.spark_session
    TABLE_NAME = make_table_name("test_expire_min_keep", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 5
    )
    assert get_history_count(instance, TABLE_NAME) == 5

    spark_alter_table(
        started_cluster_iceberg_with_spark, spark, storage_type, TABLE_NAME,
        "SET TBLPROPERTIES('history.expire.min-snapshots-to-keep' = '3', "
        "'history.expire.max-snapshot-age-ms' = '1')",
    )
    expire_snapshots(instance, TABLE_NAME, FAR_FUTURE)

    assert get_history_count(instance, TABLE_NAME) == 3
    assert_data_intact(instance, TABLE_NAME, 5)
@pytest.mark.parametrize("storage_type", ["s3", "local"])
def test_expire_snapshots_retention_max_age(started_cluster_iceberg_with_spark, storage_type):
    """max-snapshot-age-ms preserves recent snapshots regardless of timestamp cutoff."""
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    spark = started_cluster_iceberg_with_spark.spark_session
    TABLE_NAME = make_table_name("test_expire_max_age", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 5
    )

    spark_alter_table(
        started_cluster_iceberg_with_spark, spark, storage_type, TABLE_NAME,
        "SET TBLPROPERTIES('history.expire.min-snapshots-to-keep' = '1', "
        f"'history.expire.max-snapshot-age-ms' = '{3600 * 1000}')",
    )
    expire_snapshots(instance, TABLE_NAME, FAR_FUTURE)

    assert get_history_count(instance, TABLE_NAME) == 5, \
        "All 5 snapshots are within 1 hour, all should be retained"
    assert_data_intact(instance, TABLE_NAME, 5)


@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_expire_snapshots_no_args_default_retention(started_cluster_iceberg_with_spark, storage_type):
    """No-arg expire uses default 5-day max-age; all recent snapshots survive."""
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    TABLE_NAME = make_table_name("test_expire_no_args_default", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 3
    )
    assert get_history_count(instance, TABLE_NAME) == 3

    expire_snapshots(instance, TABLE_NAME)

    assert get_history_count(instance, TABLE_NAME) == 3, \
        "All snapshots < 5 days old, none should be expired"
    assert_data_intact(instance, TABLE_NAME, 3)


@pytest.mark.parametrize("storage_type", ["s3", "local"])
def test_expire_snapshots_no_args_with_short_max_age(started_cluster_iceberg_with_spark, storage_type):
    """No-arg expire with very short max-age expires old snapshots."""
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    spark = started_cluster_iceberg_with_spark.spark_session
    TABLE_NAME = make_table_name("test_expire_no_args_short_age", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 5
    )
    time.sleep(2)

    spark_alter_table(
        started_cluster_iceberg_with_spark, spark, storage_type, TABLE_NAME,
        "SET TBLPROPERTIES('history.expire.max-snapshot-age-ms' = '1', "
        "'history.expire.min-snapshots-to-keep' = '1')",
    )
    expire_snapshots(instance, TABLE_NAME)

    assert get_history_count(instance, TABLE_NAME) == 1, \
        "With 1ms max-age and min-keep=1, only current snapshot should remain"
    assert_data_intact(instance, TABLE_NAME, 5)


@pytest.mark.parametrize("storage_type", ["local"])
def test_expire_snapshots_boundary_max_age(started_cluster_iceberg_with_spark, storage_type):
    """Two-phase boundary: generous max-age retains all, then 1ms max-age expires old.
    Local-only: needs read_iceberg_metadata for snapshot timestamps.
    """
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    spark = started_cluster_iceberg_with_spark.spark_session
    TABLE_NAME = make_table_name("test_expire_boundary_age", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 3
    )

    snap_ids = get_snapshot_ids(instance, TABLE_NAME)
    oldest_id = snap_ids[0]

    spark_alter_table(
        started_cluster_iceberg_with_spark, spark, storage_type, TABLE_NAME,
        f"SET TBLPROPERTIES('history.expire.max-snapshot-age-ms' = '{3600 * 1000}', "
        "'history.expire.min-snapshots-to-keep' = '1')",
    )

    expire_snapshots(instance, TABLE_NAME, FAR_FUTURE)

    retained = get_retained_ids(instance, TABLE_NAME)
    assert oldest_id in retained, \
        f"Snapshot within max-age should be retained, but {oldest_id} expired"
    assert len(retained) == 3, f"All 3 within max-age, got {len(retained)}"

    spark_alter_table(
        started_cluster_iceberg_with_spark, spark, storage_type, TABLE_NAME,
        "SET TBLPROPERTIES('history.expire.max-snapshot-age-ms' = '1', "
        "'history.expire.min-snapshots-to-keep' = '1')",
    )
    expire_snapshots(instance, TABLE_NAME, FAR_FUTURE)

    retained = get_retained_ids(instance, TABLE_NAME)
    assert oldest_id not in retained, "Snapshot outside 1ms max-age should be expired"
    assert len(retained) == 1, f"Only current should remain, got {len(retained)}"
    assert_data_intact(instance, TABLE_NAME, 3)


# ---------------------------------------------------------------------------
# Ref behavior tests (branches, tags, dangling refs)
# ---------------------------------------------------------------------------

@pytest.mark.parametrize("storage_type", ["local"])
def test_expire_snapshots_tag_retained(started_cluster_iceberg_with_spark, storage_type):
    """A tag's snapshot is retained even when age/min-keep would otherwise expire it.
    Local-only: needs get_snapshot_ids / get_retained_ids which read container metadata.
    """
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    spark = started_cluster_iceberg_with_spark.spark_session
    TABLE_NAME = make_table_name("test_expire_tag_retained", storage_type)

    create_and_populate(
        started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 5
    )

    snap_ids = get_snapshot_ids(instance, TABLE_NAME)
    current_id = snap_ids[-1]
    tag_id = snap_ids[1]

    spark_alter_table(
        started_cluster_iceberg_with_spark, spark, storage_type, TABLE_NAME,
        "SET TBLPROPERTIES('history.expire.max-snapshot-age-ms' = '1', "
        "'history.expire.min-snapshots-to-keep' = '1')",
        f"CREATE TAG `release_v1` AS OF VERSION {tag_id}",
    )

    expire_snapshots(instance, TABLE_NAME, FAR_FUTURE)

    assert get_retained_ids(instance, TABLE_NAME) == {current_id, tag_id}
    assert_data_intact(instance, TABLE_NAME, 5)
+ """ + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = make_table_name("test_expire_branch_min_keep", storage_type) + + create_and_populate( + started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 5 + ) + + snap_ids = get_snapshot_ids(instance, TABLE_NAME) + current_id = snap_ids[-1] + branch_head_id = snap_ids[2] + + spark_alter_table( + started_cluster_iceberg_with_spark, spark, storage_type, TABLE_NAME, + "SET TBLPROPERTIES('history.expire.max-snapshot-age-ms' = '1', " + "'history.expire.min-snapshots-to-keep' = '1')", + f"CREATE BRANCH `feature` AS OF VERSION {branch_head_id} " + f"WITH SNAPSHOT RETENTION 2 SNAPSHOTS", + ) + + expire_snapshots(instance, TABLE_NAME, FAR_FUTURE) + + # main: min-keep=1 → current (snap5) + # feature: min-keep=2 → snap3, snap2 + expected = {current_id, branch_head_id, snap_ids[1]} + assert get_retained_ids(instance, TABLE_NAME) == expected, \ + f"Expected main(1) + feature(2)" + assert_data_intact(instance, TABLE_NAME, 5) + + +# --------------------------------------------------------------------------- +# Tests requiring manual metadata patching +# These cannot use Spark SQL because: +# - Dangling refs: Spark validates snapshot existence on CREATE BRANCH/TAG +# - 1ms max-ref-age-ms: Spark RETAIN clause minimum granularity is minutes +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize("storage_type", ["local"]) +def test_expire_snapshots_max_ref_age(started_cluster_iceberg_with_spark, storage_type): + """Non-main branch ref with age > max-ref-age-ms is expired and removed. 
+ Manual patching: Spark RETAIN has minute granularity; we need 1ms.""" + instance = started_cluster_iceberg_with_spark.instances["node1"] + TABLE_NAME = make_table_name("test_expire_max_ref_age", storage_type) + + create_and_populate( + started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 3 + ) + + snap_ids = get_snapshot_ids(instance, TABLE_NAME) + current_id = snap_ids[-1] + oldest_id = snap_ids[0] + + def add_short_lived_branch(meta): + meta["refs"] = { + "main": {"snapshot-id": current_id, "type": "branch"}, + "feature": { + "snapshot-id": oldest_id, + "type": "branch", + "max-ref-age-ms": 1, + }, + } + meta.setdefault("properties", {}).update(AGGRESSIVE_RETENTION) + update_iceberg_metadata(instance, TABLE_NAME, add_short_lived_branch) + time.sleep(2) + + expire_snapshots(instance, TABLE_NAME, FAR_FUTURE) + + assert "feature" not in get_refs(instance, TABLE_NAME), \ + "Feature ref should be expired (age > 1ms)" + assert get_retained_ids(instance, TABLE_NAME) == {current_id} + assert_data_intact(instance, TABLE_NAME, 3) + + +@pytest.mark.parametrize("storage_type", ["local"]) +def test_expire_snapshots_dangling_ref_removed(started_cluster_iceberg_with_spark, storage_type): + """Refs pointing to non-existent snapshots are removed during expiration. 
+ Manual patching: Spark validates snapshot existence on CREATE BRANCH/TAG.""" + instance = started_cluster_iceberg_with_spark.instances["node1"] + TABLE_NAME = make_table_name("test_expire_dangling_ref", storage_type) + + create_and_populate( + started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 3 + ) + + snap_ids = get_snapshot_ids(instance, TABLE_NAME) + current_id = snap_ids[-1] + + def add_dangling_ref(meta): + meta["refs"] = { + "main": {"snapshot-id": current_id, "type": "branch"}, + "orphan-branch": {"snapshot-id": 999999999, "type": "branch"}, + } + meta.setdefault("properties", {}).update(AGGRESSIVE_RETENTION) + update_iceberg_metadata(instance, TABLE_NAME, add_dangling_ref) + + expire_snapshots(instance, TABLE_NAME, FAR_FUTURE) + + refs = get_refs(instance, TABLE_NAME) + assert "orphan-branch" not in refs, "Dangling ref should be removed" + assert "main" in refs, "Main branch should always be preserved" + assert_data_intact(instance, TABLE_NAME, 3) + + +@pytest.mark.parametrize("storage_type", ["local"]) +def test_expire_snapshots_boundary_max_ref_age(started_cluster_iceberg_with_spark, storage_type): + """Two-phase boundary: generous max-ref-age retains ref, then 1ms expires it. 
+ Manual patching: Spark RETAIN has minute granularity; we need 1ms.""" + instance = started_cluster_iceberg_with_spark.instances["node1"] + TABLE_NAME = make_table_name("test_expire_boundary_ref_age", storage_type) + + create_and_populate( + started_cluster_iceberg_with_spark, instance, storage_type, TABLE_NAME, 3 + ) + + meta = read_iceberg_metadata(instance, TABLE_NAME) + snap_ts = {s["snapshot-id"]: s["timestamp-ms"] for s in meta["snapshots"]} + snap_ids = get_snapshot_ids(instance, TABLE_NAME) + current_id = snap_ids[-1] + oldest_id = snap_ids[0] + + now_ms = int(time.time() * 1000) + generous_age = (now_ms - snap_ts[oldest_id]) + 10000 + + def add_retained_ref(m): + m["refs"] = { + "main": {"snapshot-id": current_id, "type": "branch"}, + "feature": { + "snapshot-id": oldest_id, + "type": "branch", + "max-ref-age-ms": generous_age, + }, + } + m.setdefault("properties", {}).update(AGGRESSIVE_RETENTION) + update_iceberg_metadata(instance, TABLE_NAME, add_retained_ref) + + expire_snapshots(instance, TABLE_NAME, FAR_FUTURE) + assert "feature" in get_refs(instance, TABLE_NAME), \ + "Ref within max-ref-age should NOT be expired" + + def expire_ref(m): + m["refs"]["feature"]["max-ref-age-ms"] = 1 + update_iceberg_metadata(instance, TABLE_NAME, expire_ref) + + expire_snapshots(instance, TABLE_NAME, FAR_FUTURE) + assert "feature" not in get_refs(instance, TABLE_NAME), \ + "Ref with 1ms max-ref-age should be expired" diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index 7c157dc83556..d7641833bf72 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -49,6 +49,7 @@ ALTER EXPORT PARTITION ['ALTER EXPORT PARTITION','EXPORT PARTITION'] TABLE ALTER ALTER FETCH PARTITION ['ALTER FETCH PART','FETCH PARTITION'] TABLE ALTER TABLE ALTER FREEZE PARTITION ['FREEZE PARTITION','UNFREEZE'] TABLE ALTER TABLE ALTER UNLOCK 
SNAPSHOT ['UNLOCK SNAPSHOT'] TABLE ALTER TABLE +ALTER EXECUTE ['ALTER TABLE EXECUTE'] TABLE ALTER TABLE ALTER DATABASE SETTINGS ['ALTER DATABASE SETTING','ALTER MODIFY DATABASE SETTING','MODIFY DATABASE SETTING'] DATABASE ALTER DATABASE ALTER NAMED COLLECTION [] NAMED_COLLECTION NAMED COLLECTION ADMIN ALTER TABLE [] \N ALTER diff --git a/tests/queries/0_stateless/03978_alter_table_execute_expire_snapshots_parse.reference b/tests/queries/0_stateless/03978_alter_table_execute_expire_snapshots_parse.reference new file mode 100644 index 000000000000..eda9a2ca200a --- /dev/null +++ b/tests/queries/0_stateless/03978_alter_table_execute_expire_snapshots_parse.reference @@ -0,0 +1,7 @@ +ALTER TABLE t (EXECUTE expire_snapshots( \'2024-06-01 00:00:00\')) +ALTER TABLE db.t (EXECUTE expire_snapshots( \'2024-06-01 00:00:00\')) +ALTER TABLE t (EXECUTE expire_snapshots( )) +ALTER TABLE t (EXECUTE compact( )) +ALTER TABLE t (EXECUTE optimize_manifests( )) +ALTER TABLE t (EXECUTE some_future_command( \'arg1\', 42)) +ALTER TABLE t (EXECUTE no_args_command( )) diff --git a/tests/queries/0_stateless/03978_alter_table_execute_expire_snapshots_parse.sql b/tests/queries/0_stateless/03978_alter_table_execute_expire_snapshots_parse.sql new file mode 100644 index 000000000000..dee43733c7fe --- /dev/null +++ b/tests/queries/0_stateless/03978_alter_table_execute_expire_snapshots_parse.sql @@ -0,0 +1,23 @@ +-- Parsing: expire_snapshots with timestamp +SELECT formatQuerySingleLine('ALTER TABLE t EXECUTE expire_snapshots(\'2024-06-01 00:00:00\')'); +SELECT formatQuerySingleLine('ALTER TABLE db.t EXECUTE expire_snapshots(\'2024-06-01 00:00:00\')'); + +-- Parsing: expire_snapshots without arguments (uses default retention) +SELECT formatQuerySingleLine('ALTER TABLE t EXECUTE expire_snapshots()'); + +-- Parsing: other command names should parse successfully (generic EXECUTE syntax) +SELECT formatQuerySingleLine('ALTER TABLE t EXECUTE compact()'); +SELECT formatQuerySingleLine('ALTER TABLE t EXECUTE 
optimize_manifests()'); +SELECT formatQuerySingleLine('ALTER TABLE t EXECUTE some_future_command(\'arg1\', 42)'); +SELECT formatQuerySingleLine('ALTER TABLE t EXECUTE no_args_command()'); + +-- Runtime: EXECUTE on MergeTree should fail with NOT_IMPLEMENTED +DROP TABLE IF EXISTS test_execute_03978; +CREATE TABLE test_execute_03978 (x UInt32) ENGINE = MergeTree ORDER BY x; +ALTER TABLE test_execute_03978 EXECUTE expire_snapshots('2024-06-01 00:00:00'); -- { serverError NOT_IMPLEMENTED } +ALTER TABLE test_execute_03978 EXECUTE expire_snapshots(); -- { serverError NOT_IMPLEMENTED } +ALTER TABLE test_execute_03978 EXECUTE compact(); -- { serverError NOT_IMPLEMENTED } +ALTER TABLE test_execute_03978 EXECUTE unknown_command(); -- { serverError NOT_IMPLEMENTED } +DROP TABLE test_execute_03978; + +-- Privilege hierarchy is verified by 01271_show_privileges (ALTER EXECUTE listed under ALTER TABLE)