2 changes: 2 additions & 0 deletions docs/en/sql-reference/statements/grant.md
@@ -200,6 +200,7 @@ The hierarchy of privileges in ClickHouse is shown below:
- `ALTER MODIFY STATISTICS`
- `ALTER TTL`
- `ALTER UPDATE`
- `ALTER EXECUTE`
- `ALTER VIEW`
- `ALTER VIEW MODIFY QUERY`
- `ALTER VIEW REFRESH`
@@ -438,6 +439,7 @@ Allows executing [ALTER](../../sql-reference/statements/alter/index.md) queries
- `ALTER MOVE PARTITION`. Level: `TABLE`. Aliases: `ALTER MOVE PART`, `MOVE PARTITION`, `MOVE PART`
- `ALTER FETCH PARTITION`. Level: `TABLE`. Aliases: `ALTER FETCH PART`, `FETCH PARTITION`, `FETCH PART`
- `ALTER FREEZE PARTITION`. Level: `TABLE`. Aliases: `FREEZE PARTITION`
- `ALTER EXECUTE`. Level: `TABLE`. Aliases: `ALTER TABLE EXECUTE`
- `ALTER VIEW`. Level: `GROUP`
- `ALTER VIEW REFRESH`. Level: `VIEW`. Aliases: `REFRESH VIEW`
- `ALTER VIEW MODIFY QUERY`. Level: `VIEW`. Aliases: `ALTER TABLE MODIFY QUERY`
95 changes: 95 additions & 0 deletions docs/en/sql-reference/table-functions/iceberg.md
@@ -502,6 +502,101 @@ x: Ivanov
y: 993
```

### Expire Snapshots {#iceberg-expire-snapshots}

Iceberg tables accumulate snapshots with each INSERT, DELETE, or UPDATE operation. Over time, this can lead to a large number of snapshots and associated data files. The `expire_snapshots` command removes old snapshots and cleans up data files that are no longer referenced by any retained snapshot.

**Syntax:**

```sql
ALTER TABLE iceberg_table EXECUTE expire_snapshots(['timestamp']);
```

Which snapshots to keep is determined by the [retention policy](#iceberg-snapshot-retention-policy): the table properties `history.expire.min-snapshots-to-keep` and `history.expire.max-snapshot-age-ms`, plus per-ref overrides. The retention policy is always evaluated, whether or not a timestamp argument is provided.

The optional `timestamp` argument is a datetime string (e.g., `'2024-06-01 00:00:00'`) interpreted in the **server's timezone**. It acts as a safety fuse: snapshots whose `timestamp-ms` is at or after this value are protected from expiration, even if the retention policy would otherwise expire them. This lets you guarantee that no snapshot newer than the given point in time is removed.

When no timestamp is provided, only the retention policy governs which snapshots are expired.

**Example:**

```sql
SET allow_insert_into_iceberg = 1;

-- Create some snapshots by inserting data
INSERT INTO iceberg_table VALUES (1);
INSERT INTO iceberg_table VALUES (2);
INSERT INTO iceberg_table VALUES (3);

-- Expire using retention policy; additionally protect snapshots newer than the timestamp
ALTER TABLE iceberg_table EXECUTE expire_snapshots('2025-01-01 00:00:00');

-- Expire using retention policy only (no additional fuse)
ALTER TABLE iceberg_table EXECUTE expire_snapshots();
```

**Output:**

The command returns a table with two columns (`metric_name String`, `metric_value Int64`) containing one row per metric. The metric names follow the output of the Iceberg Spark [`expire_snapshots` procedure](https://iceberg.apache.org/docs/latest/spark-procedures/#output):

| metric_name | Description |
|---|---|
| `deleted_data_files_count` | Number of data files deleted |
| `deleted_position_delete_files_count` | Number of position delete files deleted |
| `deleted_equality_delete_files_count` | Number of equality delete files deleted |
| `deleted_manifest_files_count` | Number of manifest files deleted |
| `deleted_manifest_lists_count` | Number of manifest list files deleted |
| `deleted_statistics_files_count` | Number of statistics files deleted (always 0 currently) |

The command performs the following steps:

1. Evaluates the retention policy (see below) to determine which snapshots must be preserved
2. If a timestamp argument was provided, additionally protects all snapshots at or newer than that timestamp
3. Expires snapshots that are neither retained by the policy nor protected by the timestamp fuse
4. Computes which files are exclusively associated with expired snapshots
5. Generates new metadata without the expired snapshots
6. Physically deletes unreachable manifest lists, manifest files, and data files
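
Steps 3 and 4 can be illustrated with a minimal standalone sketch. It is not the code from this change: `SnapshotFiles`, `ExpirePlan`, and `planExpiration` are hypothetical names, the set of snapshots retained by the policy is assumed to be computed elsewhere, and each snapshot is assumed to carry the flat set of files it references.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <set>
#include <string>
#include <vector>

// Hypothetical, simplified view of a snapshot and the files it references.
struct SnapshotFiles
{
    std::int64_t snapshot_id;
    std::int64_t timestamp_ms;
    std::set<std::string> files;
};

struct ExpirePlan
{
    std::set<std::int64_t> expired_snapshot_ids;
    std::set<std::string> files_to_delete;
};

ExpirePlan planExpiration(
    const std::vector<SnapshotFiles> & snapshots,
    const std::set<std::int64_t> & retained_by_policy,
    std::int64_t current_snapshot_id,
    std::optional<std::int64_t> protect_from_ms)
{
    // Assumption of this sketch: policy evaluation always keeps the current snapshot.
    assert(retained_by_policy.count(current_snapshot_id) > 0);

    ExpirePlan plan;
    std::set<std::string> still_referenced;

    // Step 3: a snapshot expires only if neither the policy nor the timestamp fuse keeps it.
    for (const auto & snapshot : snapshots)
    {
        const bool protected_by_fuse = protect_from_ms.has_value() && snapshot.timestamp_ms >= *protect_from_ms;
        if (retained_by_policy.count(snapshot.snapshot_id) > 0 || protected_by_fuse)
            still_referenced.insert(snapshot.files.begin(), snapshot.files.end());
        else
            plan.expired_snapshot_ids.insert(snapshot.snapshot_id);
    }

    // Step 4: a file is deletable only if no surviving snapshot references it.
    for (const auto & snapshot : snapshots)
    {
        if (plan.expired_snapshot_ids.count(snapshot.snapshot_id) == 0)
            continue;
        for (const auto & file : snapshot.files)
            if (still_referenced.count(file) == 0)
                plan.files_to_delete.insert(file);
    }
    return plan;
}
```

A file shared between an expired and a surviving snapshot stays in place, which is why the second pass checks `still_referenced` rather than deleting everything the expired snapshots list.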

#### Snapshot Retention Policy {#iceberg-snapshot-retention-policy}

The `expire_snapshots` command respects the [Iceberg snapshot retention policy](https://iceberg.apache.org/spec/#snapshot-retention-policy). Retention is configured via Iceberg table properties and per-reference overrides:

| Property | Scope | Default | Description |
|---|---|---|---|
| `history.expire.min-snapshots-to-keep` | Table | 1 | Minimum number of snapshots to keep in each branch's ancestor chain |
| `history.expire.max-snapshot-age-ms` | Table | 432000000 (5 days) | Maximum age (in ms) of snapshots to retain in a branch |
| `history.expire.max-ref-age-ms` | Table | ∞ (never) | Maximum age (in ms) for a snapshot reference (branch or tag) before the reference itself is removed |

Each snapshot reference (`refs` in the Iceberg metadata) can override these with per-ref fields: `min-snapshots-to-keep`, `max-snapshot-age-ms`, and `max-ref-age-ms`.
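
The precedence between per-ref fields, table properties, and the built-in defaults can be sketched as follows. This is an illustration only: `RetentionOverrides`, `EffectiveRetention`, and `resolveRetention` are hypothetical names, and the lookup order (per-ref value, then table property, then default) is an assumption based on the description above.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <optional>

// Fallback defaults, using the values from the table above.
constexpr std::int32_t default_min_snapshots_to_keep = 1;
constexpr std::int64_t default_max_snapshot_age_ms = 432000000; // 5 days
constexpr std::int64_t default_max_ref_age_ms = std::numeric_limits<std::int64_t>::max(); // never

// Hypothetical container: an empty field means "not set at this level".
struct RetentionOverrides
{
    std::optional<std::int32_t> min_snapshots_to_keep;
    std::optional<std::int64_t> max_snapshot_age_ms;
    std::optional<std::int64_t> max_ref_age_ms;
};

struct EffectiveRetention
{
    std::int32_t min_snapshots_to_keep;
    std::int64_t max_snapshot_age_ms;
    std::int64_t max_ref_age_ms;
};

// Resolve each setting as: per-ref override, else table property, else default.
EffectiveRetention resolveRetention(const RetentionOverrides & ref, const RetentionOverrides & table)
{
    EffectiveRetention result{
        ref.min_snapshots_to_keep.value_or(table.min_snapshots_to_keep.value_or(default_min_snapshots_to_keep)),
        ref.max_snapshot_age_ms.value_or(table.max_snapshot_age_ms.value_or(default_max_snapshot_age_ms)),
        ref.max_ref_age_ms.value_or(table.max_ref_age_ms.value_or(default_max_ref_age_ms))};

    // Sanity check for the sketch: a negative snapshot count would be meaningless.
    assert(result.min_snapshots_to_keep >= 0);
    return result;
}
```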

**Retention evaluation:**

- **For each branch** (including `main`): the ancestor chain is walked starting from the branch head. Snapshots are retained while either of these conditions is true:
  - The snapshot is one of the first `min-snapshots-to-keep` in the chain
  - The snapshot's age is within `max-snapshot-age-ms` (i.e., `now - timestamp-ms <= max-snapshot-age-ms`)
- **For tags**: the tagged snapshot is retained unless the tag has exceeded its `max-ref-age-ms`, in which case the tag reference is removed
- **Non-main references** whose age exceeds `max-ref-age-ms` are removed entirely (the `main` branch is never removed)
- **Dangling references** that point to non-existent snapshots are removed with a warning
- **The current snapshot is always preserved**, regardless of retention settings
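
The per-branch rule can be sketched as a walk over the ancestor chain. This is a simplified illustration, not the code from this change: `Snapshot` and `retainedInBranch` are hypothetical names, the chain is assumed to be ordered from the branch head towards older ancestors, and the walk is assumed to stop at the first snapshot that satisfies neither condition.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified snapshot record.
struct Snapshot
{
    std::int64_t snapshot_id;
    std::int64_t timestamp_ms;
};

// `chain` is ordered from the branch head towards older ancestors.
std::vector<std::int64_t> retainedInBranch(
    const std::vector<Snapshot> & chain,
    std::int32_t min_snapshots_to_keep,
    std::int64_t max_snapshot_age_ms,
    std::int64_t now_ms)
{
    assert(min_snapshots_to_keep >= 0);

    std::vector<std::int64_t> retained;
    for (std::size_t i = 0; i < chain.size(); ++i)
    {
        const bool within_min_count = i < static_cast<std::size_t>(min_snapshots_to_keep);
        const bool within_max_age = now_ms - chain[i].timestamp_ms <= max_snapshot_age_ms;

        // Stop at the first snapshot that satisfies neither condition.
        if (!within_min_count && !within_max_age)
            break;
        retained.push_back(chain[i].snapshot_id);
    }
    return retained;
}
```

With `min_snapshots_to_keep = 1`, the branch head is kept regardless of its age, which matches the rule that the current snapshot is always preserved on `main`.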

**Required privileges:**

Running `ALTER TABLE ... EXECUTE` requires the `ALTER TABLE EXECUTE` privilege, a child of `ALTER TABLE` in the ClickHouse access control hierarchy. Grant it on its own or through the parent:

```sql
-- Grant only EXECUTE permission
GRANT ALTER TABLE EXECUTE ON my_iceberg_table TO my_user;

-- Or grant all ALTER TABLE permissions (includes ALTER TABLE EXECUTE)
GRANT ALTER TABLE ON my_iceberg_table TO my_user;
```
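
Why granting the parent is enough can be illustrated with a small sketch of a hierarchical privilege check. It is not ClickHouse's access control code: `privilege_parent` and `isGranted` are hypothetical names, and only the single parent edge described above is modelled (`ALTER EXECUTE` is the privilege name whose alias is `ALTER TABLE EXECUTE`).

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Child -> parent edges. Only the edge described above is listed in this sketch.
static const std::map<std::string, std::string> privilege_parent = {
    {"ALTER EXECUTE", "ALTER TABLE"},
};

// A privilege counts as granted if it, or any ancestor in the hierarchy, was granted.
bool isGranted(const std::set<std::string> & granted, std::string privilege)
{
    assert(!privilege.empty());
    while (true)
    {
        if (granted.count(privilege) > 0)
            return true;
        const auto it = privilege_parent.find(privilege);
        if (it == privilege_parent.end())
            return false;
        privilege = it->second; // climb one level and check again
    }
}
```

The check only climbs upwards, so holding the child privilege does not imply the parent.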

:::note
- Only Iceberg format version 2 tables are supported (v1 snapshots do not guarantee `manifest-list`, which is required to safely identify files for cleanup)
- The current snapshot is always preserved, even if it is older than the specified timestamp
- Requires the `allow_insert_into_iceberg` setting to be enabled
- The catalog's own authorization (REST catalog auth, AWS Glue IAM, etc.) is enforced independently when ClickHouse updates the metadata
:::

## Altinity Antalya branch

1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
@@ -218,6 +218,7 @@ enum class AccessType : uint8_t
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \
M(ALTER_EXECUTE, "ALTER TABLE EXECUTE", TABLE, ALTER_TABLE) \
\
M(ALTER_DATABASE_SETTINGS, "ALTER DATABASE SETTING, ALTER MODIFY DATABASE SETTING, MODIFY DATABASE SETTING", DATABASE, ALTER_DATABASE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute ALTER NAMED COLLECTION */\
20 changes: 19 additions & 1 deletion src/Interpreters/InterpreterAlterQuery.cpp
@@ -179,11 +179,16 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
AlterCommands alter_commands;
PartitionCommands partition_commands;
MutationCommands mutation_commands;
std::vector<const ASTAlterCommand *> execute_commands;

for (const auto & child : alter.command_list->children)
{
auto * command_ast = child->as<ASTAlterCommand>();
- if (auto alter_command = AlterCommand::parse(command_ast))
+ if (command_ast->type == ASTAlterCommand::EXECUTE_COMMAND)
+ {
+ execute_commands.push_back(command_ast);
+ }
+ else if (auto alter_command = AlterCommand::parse(command_ast))
{
alter_commands.emplace_back(std::move(*alter_command));
}
@@ -303,6 +308,14 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
res.pipeline = QueryPipeline(std::move(partition_commands_pipe));
}

for (const auto * execute_command : execute_commands)
{
ASTPtr args_ast = execute_command->execute_args ? execute_command->execute_args->ptr() : nullptr;
auto execute_pipe = table->executeCommand(execute_command->execute_command_name, args_ast, getContext());
if (!execute_pipe.empty())
res.pipeline = QueryPipeline(std::move(execute_pipe));
}

return res;
}

@@ -642,6 +655,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_UPDATE, database, table);
break;
}
case ASTAlterCommand::EXECUTE_COMMAND:
{
required_access.emplace_back(AccessType::ALTER_EXECUTE, database, table);
break;
}
}

return required_access;
10 changes: 10 additions & 0 deletions src/Parsers/ASTAlterQuery.cpp
@@ -69,6 +69,8 @@ ASTPtr ASTAlterCommand::clone() const
res->to_table_function = res->children.emplace_back(to_table_function->clone()).get();
if (partition_by_expr)
res->partition_by_expr = res->children.emplace_back(partition_by_expr->clone()).get();
if (execute_args)
res->execute_args = res->children.emplace_back(execute_args->clone()).get();

return res;
}
@@ -592,6 +594,13 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
partition->format(ostr, settings, state, frame);
}
}
else if (type == ASTAlterCommand::EXECUTE_COMMAND)
{
ostr << "EXECUTE " << execute_command_name << "(";
if (execute_args)
execute_args->format(ostr, settings, state, frame);
ostr << ")";
}
else
throw Exception(ErrorCodes::UNEXPECTED_AST_STRUCTURE, "Unexpected type of ALTER");
}
@@ -621,6 +630,7 @@ void ASTAlterCommand::forEachPointerToChild(std::function<void(IAST **, boost::i
f(&rename_to, nullptr);
f(&to_table_function, nullptr);
f(&partition_by_expr, nullptr);
f(&execute_args, nullptr);
}


6 changes: 6 additions & 0 deletions src/Parsers/ASTAlterQuery.h
@@ -90,6 +90,8 @@ class ASTAlterCommand : public IAST
MODIFY_SQL_SECURITY,

UNLOCK_SNAPSHOT,

EXECUTE_COMMAND,
};

Type type = NO_TYPE;
@@ -228,6 +230,10 @@ class ASTAlterCommand : public IAST
String snapshot_name;
IAST * snapshot_desc;

/// For EXECUTE command (e.g. expire_snapshots)
String execute_command_name;
IAST * execute_args = nullptr;

/// Which property user want to remove
String remove_property;

28 changes: 28 additions & 0 deletions src/Parsers/ParserAlterQuery.cpp
@@ -130,6 +130,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_remove_sample_by(Keyword::REMOVE_SAMPLE_BY);
ParserKeyword s_apply_deleted_mask(Keyword::APPLY_DELETED_MASK);
ParserKeyword s_apply_patches(Keyword::APPLY_PATCHES);
ParserKeyword s_execute(Keyword::EXECUTE);
ParserKeyword s_all(Keyword::ALL);

ParserToken parser_opening_round_bracket(TokenType::OpeningRoundBracket);
@@ -1085,6 +1086,33 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;
}
}
else if (s_execute.ignore(pos, expected))
{
command->type = ASTAlterCommand::EXECUTE_COMMAND;

ParserIdentifier command_name_parser;
ASTPtr command_name_ast;
if (!command_name_parser.parse(pos, command_name_ast, expected))
return false;
command->execute_command_name = getIdentifierName(command_name_ast);

if (!parser_opening_round_bracket.ignore(pos, expected))
return false;

ASTPtr execute_args_list;
ParserList args_parser(
std::make_unique<ParserExpressionWithOptionalAlias>(false),
std::make_unique<ParserToken>(TokenType::Comma),
/* allow_empty = */ true);
if (!args_parser.parse(pos, execute_args_list, expected))
return false;

if (!parser_closing_round_bracket.ignore(pos, expected))
return false;

if (execute_args_list)
command->execute_args = command->children.emplace_back(std::move(execute_args_list)).get();
}
else
return false;
break;
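
The parsing order in the hunk above (keyword, command name, opening bracket, possibly empty comma-separated argument list, closing bracket) can be illustrated with a standalone, string-based sketch. It does not use ClickHouse's parser classes: `ExecuteCommandSketch`, `parseExecute`, `isSpace`, and `trim` are hypothetical names, and the sketch deliberately ignores nested brackets and escaped quotes.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

struct ExecuteCommandSketch
{
    std::string name;
    std::vector<std::string> args; // raw argument text, quotes included
};

static bool isSpace(char c) { return std::isspace(static_cast<unsigned char>(c)) != 0; }

static std::string trim(const std::string & s)
{
    std::size_t b = 0;
    std::size_t e = s.size();
    while (b < e && isSpace(s[b])) ++b;
    while (e > b && isSpace(s[e - 1])) --e;
    return s.substr(b, e - b);
}

std::optional<ExecuteCommandSketch> parseExecute(const std::string & input)
{
    const std::string s = trim(input);

    // 1. The EXECUTE keyword, followed by whitespace.
    const std::string keyword = "EXECUTE";
    if (s.compare(0, keyword.size(), keyword) != 0)
        return std::nullopt;
    std::size_t pos = keyword.size();
    if (pos >= s.size() || !isSpace(s[pos]))
        return std::nullopt;
    while (pos < s.size() && isSpace(s[pos])) ++pos;

    // 2. The command name (letters, digits, underscores).
    const std::size_t name_begin = pos;
    while (pos < s.size() && (std::isalnum(static_cast<unsigned char>(s[pos])) || s[pos] == '_')) ++pos;
    if (pos == name_begin)
        return std::nullopt;
    ExecuteCommandSketch cmd;
    cmd.name = s.substr(name_begin, pos - name_begin);
    while (pos < s.size() && isSpace(s[pos])) ++pos;

    // 3. Opening and closing round brackets around the argument list.
    if (pos >= s.size() || s[pos] != '(' || s.back() != ')')
        return std::nullopt;
    assert(pos + 1 < s.size());
    const std::string body = s.substr(pos + 1, s.size() - pos - 2);

    // 4. An empty argument list is allowed.
    if (trim(body).empty())
        return cmd;

    // 5. Split on commas that are not inside single quotes.
    std::string current;
    bool in_quotes = false;
    for (char c : body)
    {
        if (c == '\'')
            in_quotes = !in_quotes;
        if (c == ',' && !in_quotes)
        {
            if (trim(current).empty())
                return std::nullopt;
            cmd.args.push_back(trim(current));
            current.clear();
        }
        else
            current += c;
    }
    if (in_quotes || trim(current).empty())
        return std::nullopt;
    cmd.args.push_back(trim(current));
    return cmd;
}
```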
5 changes: 5 additions & 0 deletions src/Storages/IStorage.cpp
@@ -302,6 +302,11 @@ void IStorage::mutate(const MutationCommands &, ContextPtr)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not supported by storage {}", getName());
}

Pipe IStorage::executeCommand(const String & command_name, const ASTPtr & /*args*/, ContextPtr /*context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "EXECUTE command '{}' is not supported by storage {}", command_name, getName());
}

CancellationCode IStorage::killMutation(const String & /*mutation_id*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not supported by storage {}", getName());
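
The dispatch pattern introduced here (a base-class default that rejects every command, overridden by storages that support specific ones) can be sketched in isolation. The classes below are hypothetical stand-ins, not ClickHouse types: `StorageSketch` and `IcebergLikeSketch` return plain strings instead of a `Pipe`, and `std::logic_error` stands in for the `NOT_IMPLEMENTED` exception.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

class StorageSketch
{
public:
    virtual ~StorageSketch() = default;

    virtual std::string getName() const { return "Sketch"; }

    // Default: no EXECUTE commands are supported.
    virtual std::vector<std::string> executeCommand(const std::string & command_name, const std::vector<std::string> & /*args*/)
    {
        assert(!command_name.empty());
        throw std::logic_error("EXECUTE command '" + command_name + "' is not supported by storage " + getName());
    }
};

class IcebergLikeSketch : public StorageSketch
{
public:
    std::string getName() const override { return "IcebergLike"; }

    std::vector<std::string> executeCommand(const std::string & command_name, const std::vector<std::string> & args) override
    {
        if (command_name == "expire_snapshots")
            return {"expire_snapshots called with " + std::to_string(args.size()) + " argument(s)"};

        // Unknown commands fall back to the base-class error.
        return StorageSketch::executeCommand(command_name, args);
    }
};
```

Falling back to the base implementation keeps the error message for unsupported commands in one place.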
2 changes: 2 additions & 0 deletions src/Storages/IStorage.h
@@ -608,6 +608,8 @@ It is currently only implemented in StorageObjectStorage.
/// Mutate the table contents
virtual void mutate(const MutationCommands &, ContextPtr);

virtual Pipe executeCommand(const String & command_name, const ASTPtr & args, ContextPtr context);

/// Cancel a mutation.
virtual CancellationCode killMutation(const String & /*mutation_id*/);

14 changes: 14 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
@@ -1,5 +1,6 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <fmt/format.h>

#include <Core/NamesAndTypes.h>
#include <Core/Types.h>
@@ -196,6 +197,19 @@ class IDataLakeMetadata : boost::noncopyable
virtual void addDeleteTransformers(ObjectInfoPtr, QueryPipelineBuilder &, const std::optional<FormatSettings> &, FormatParserSharedResourcesPtr, ContextPtr) const { }
virtual void checkAlterIsPossible(const AlterCommands & /*commands*/) { throwNotImplemented("alter"); }
virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) { throwNotImplemented("alter"); }

virtual Pipe executeCommand(
const String & command_name,
const ASTPtr & /*args*/,
ObjectStoragePtr /*object_storage*/,
StorageObjectStorageConfigurationPtr /*configuration*/,
std::shared_ptr<DataLake::ICatalog> /*catalog*/,
ContextPtr /*context*/,
const StorageID & /*storage_id*/)
{
throwNotImplemented(fmt::format("EXECUTE {}", command_name));
}

virtual void drop(ContextPtr) { }

virtual std::optional<String> partitionKey(ContextPtr) const { return {}; }
18 changes: 18 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
@@ -1,6 +1,7 @@
#pragma once

#include <base/types.h>
#include <limits>
namespace DB::Iceberg
{
/// This file define the field name appearing in Iceberg files.
@@ -62,6 +63,7 @@ DEFINE_ICEBERG_FIELD(file_format);
DEFINE_ICEBERG_FIELD(file_size_in_bytes);
DEFINE_ICEBERG_FIELD(refs);
DEFINE_ICEBERG_FIELD(branch);
DEFINE_ICEBERG_FIELD(tag);
DEFINE_ICEBERG_FIELD(main);
DEFINE_ICEBERG_FIELD(operation);
DEFINE_ICEBERG_FIELD(append);
@@ -126,6 +128,13 @@ DEFINE_ICEBERG_FIELD_ALIAS(last_sequence_number, last-sequence-number);
DEFINE_ICEBERG_FIELD_ALIAS(metadata_file, metadata-file);
DEFINE_ICEBERG_FIELD_ALIAS(metadata_log, metadata-log);
DEFINE_ICEBERG_FIELD_ALIAS(metadata_sequence_number, sequence-number);
DEFINE_ICEBERG_FIELD_ALIAS(min_snapshots_to_keep, history.expire.min-snapshots-to-keep);
DEFINE_ICEBERG_FIELD_ALIAS(max_snapshot_age_ms, history.expire.max-snapshot-age-ms);
DEFINE_ICEBERG_FIELD_ALIAS(max_ref_age_ms, history.expire.max-ref-age-ms);
/// Ref-level override fields (short names inside the "refs" map per Iceberg spec).
DEFINE_ICEBERG_FIELD_ALIAS(ref_min_snapshots_to_keep, min-snapshots-to-keep);
DEFINE_ICEBERG_FIELD_ALIAS(ref_max_snapshot_age_ms, max-snapshot-age-ms);
DEFINE_ICEBERG_FIELD_ALIAS(ref_max_ref_age_ms, max-ref-age-ms);
/// These are compound fields like `data_file.file_path`, we use prefix 'c_' to distinguish them.
DEFINE_ICEBERG_FIELD_COMPOUND(data_file, file_path);
DEFINE_ICEBERG_FIELD_COMPOUND(data_file, file_format);
@@ -139,4 +148,13 @@ DEFINE_ICEBERG_FIELD_COMPOUND(data_file, lower_bounds);
DEFINE_ICEBERG_FIELD_COMPOUND(data_file, upper_bounds);
DEFINE_ICEBERG_FIELD_COMPOUND(data_file, referenced_data_file);
DEFINE_ICEBERG_FIELD_COMPOUND(data_file, sort_order_id);

/// Fallback defaults for snapshot retention policy when table properties are absent.
/// These values follow the Java reference implementation; the Iceberg spec does not
/// mandate specific defaults.
/// TODO: consider exposing these as ClickHouse server-level settings so users can
/// change the global defaults without patching every table's properties.
constexpr Int32 default_min_snapshots_to_keep = 1;
constexpr Int64 default_max_snapshot_age_ms = 432000000; // 5 days
constexpr Int64 default_max_ref_age_ms = std::numeric_limits<Int64>::max(); // forever, main branch never expires
}