From f0ce15e6b1e246d6da49aabc2f62a03494f0fa95 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Fri, 12 Jun 2026 01:15:33 +0200 Subject: [PATCH 1/5] squash: native create, drop table --- docs/en/engines/database-engines/datalake.md | 71 +++- .../BuzzHouse/Generator/SessionSettings.cpp | 2 +- src/Core/Settings.cpp | 6 +- src/Core/SettingsChangesHistory.cpp | 25 ++ src/Databases/DataLake/Common.cpp | 80 ++++ src/Databases/DataLake/Common.h | 6 + src/Databases/DataLake/DatabaseDataLake.cpp | 161 +++++++- src/Databases/DataLake/DatabaseDataLake.h | 7 +- .../DataLake/DatabaseDataLakeSettings.cpp | 1 + src/Databases/DataLake/GlueCatalog.cpp | 42 +- src/Databases/DataLake/GlueCatalog.h | 2 +- src/Databases/DataLake/ICatalog.cpp | 21 +- src/Databases/DataLake/ICatalog.h | 7 +- src/Databases/DataLake/RestCatalog.cpp | 63 ++- src/Databases/DataLake/RestCatalog.h | 4 +- src/Databases/DataLake/S3TablesCatalog.cpp | 4 +- src/Databases/DataLake/S3TablesCatalog.h | 2 +- .../tests/gtest_construct_table_location.cpp | 120 ++++++ src/Databases/IDatabase.h | 1 + src/Interpreters/InterpreterAlterQuery.cpp | 3 + src/Interpreters/InterpreterCreateQuery.cpp | 100 ++++- src/Interpreters/InterpreterCreateQuery.h | 2 +- src/Interpreters/InterpreterDropQuery.cpp | 3 + src/Interpreters/executeDDLQueryOnCluster.cpp | 9 + src/Interpreters/executeDDLQueryOnCluster.h | 5 + .../DataLakes/Iceberg/Compaction.cpp | 2 +- .../DataLakes/Iceberg/IcebergMetadata.cpp | 14 +- .../DataLakes/Iceberg/IcebergWrites.cpp | 6 +- .../DataLakes/Iceberg/IcebergWrites.h | 1 + .../DataLakes/Iceberg/MetadataGenerator.cpp | 8 +- .../DataLakes/Iceberg/MetadataGenerator.h | 2 +- .../DataLakes/Iceberg/Mutations.cpp | 17 +- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 12 +- .../ObjectStorage/StorageObjectStorage.cpp | 2 +- .../integration/test_database_iceberg/test.py | 363 ++++++++++++++++++ 35 files changed, 1100 insertions(+), 74 deletions(-) create mode 100644 src/Databases/DataLake/tests/gtest_construct_table_location.cpp diff --git a/docs/en/engines/database-engines/datalake.md b/docs/en/engines/database-engines/datalake.md index 9c20511172b3..19e5a6fa0142 100644 --- a/docs/en/engines/database-engines/datalake.md +++ b/docs/en/engines/database-engines/datalake.md @@ -54,6 +54,7 @@ The following settings are supported: | `auth_header` | Custom HTTP header for authentication with the catalog service | | `auth_scope` | OAuth2 scope for authentication (if using OAuth) | | `storage_endpoint` | Endpoint URL for the underlying storage | +| `default_base_location` | Base URI for new tables when the catalog does not report `default-base-location`. New tables are placed under `//` (e.g. `s3://warehouse/data`) | | `oauth_server_uri` | URI of the OAuth2 authorization server for authentication | | `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) | | `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) | @@ -63,6 +64,72 @@ The following settings are supported: | `dlf_access_key_secret` | Access key Secret for DLF access | | `namespaces` | Comma-separated list of namespaces, implemented for catalog types: `rest`, `glue` and `unity` | +## Creating tables {#creating-tables} + +An Iceberg table in a `DataLakeCatalog` database can be created directly from ClickHouse. +The table name must be quoted with backticks and include the namespace separated by a dot: + +```sql +CREATE TABLE catalog_db.`namespace.table_name` +( + id Int64, + name String, + value Float64 +) +PARTITION BY id +ORDER BY name +SETTINGS allow_database_iceberg = 1; +``` + +Iceberg accepts only a fixed set of partition transforms, so `PARTITION BY` +must use one of the following expressions: + +| Expression | Iceberg transform | +|-------------------------------|-------------------| +| `` | `identity` | +| `toYearNumSinceEpoch()` | `year` | +| `toMonthNumSinceEpoch()` | `month` | +| `toRelativeDayNum()` | `day` | +| `toRelativeHourNum()` | `hour` | +| `icebergTruncate(N, )` | `truncate[N]` | +| `icebergBucket(N, )` | `bucket[N]` | + +Composite partitioning is supported via `PARTITION BY (expr1, expr2, ...)`. +Other expressions (e.g. `toYYYYMM`, `intDiv`) are rejected at `CREATE TABLE`. + +You can also create an Iceberg table that inherits the schema of an existing table: + +```sql +CREATE TABLE catalog_db.`namespace.table_name` +AS other_db.source_table +SETTINGS allow_database_iceberg = 1; +``` + +If the source table's `PARTITION BY` and `ORDER BY` use only the expressions +listed above, they are copied into the new Iceberg table. + +## Dropping tables {#dropping-tables} + +Tables can be dropped from a `DataLakeCatalog` database. +`DROP TABLE` sends a delete request to the remote catalog, which removes +the table entry from the catalog. + +```sql +DROP TABLE catalog_db.`namespace.table_name` +``` + +By default, ClickHouse does not request the catalog to delete the underlying data. In order to do it, use the `data_lake_delete_data_on_drop` setting: + +```sql +DROP TABLE catalog_db.`namespace.table_name` +SETTINGS data_lake_delete_data_on_drop = 1 +``` + +:::note +Whether data files are actually deleted depends on the catalog itself. +The `purgeRequested` flag is sent to the catalog, but the catalog may choose to ignore it. +::: + ## Examples {#examples} See below sections for examples of using the `DataLakeCatalog` engine: @@ -79,8 +146,8 @@ SETTINGS warehouse = warehouse, onelake_tenant_id = tenant_id, oauth_server_uri = server_uri, - auth_scope = auth_scope, - onelake_client_id = client_id, + auth_scope = auth_scope, + onelake_client_id = client_id, onelake_client_secret = client_secret; SHOW TABLES IN databse_name; SELECT count() from database_name.table_name; diff --git a/src/Client/BuzzHouse/Generator/SessionSettings.cpp b/src/Client/BuzzHouse/Generator/SessionSettings.cpp index 107566db278e..a7dd31588a41 100644 --- a/src/Client/BuzzHouse/Generator/SessionSettings.cpp +++ b/src/Client/BuzzHouse/Generator/SessionSettings.cpp @@ -654,7 +654,7 @@ std::unordered_map serverSettings = { {"http_skip_not_found_url_for_globs", trueOrFalseSettingNoOracle}, {"http_wait_end_of_query", trueOrFalseSettingNoOracle}, {"http_write_exception_in_output_format", trueOrFalseSettingNoOracle}, - {"iceberg_delete_data_on_drop", trueOrFalseSettingNoOracle}, + {"data_lake_delete_data_on_drop", trueOrFalseSettingNoOracle}, {"iceberg_metadata_compression_method", CHSetting([](RandomGenerator & rg, FuzzConfig &) { return "'" + rg.pickRandomly(compressionMethods) + "'"; }, {}, false)}, {"iceberg_metadata_log_level", diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index ba022db0f83a..e6965652d4a0 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5278,9 +5278,9 @@ Possible values: - manifest_file_entry - Everything above + traversed avro manifest files entries. )", 0) \ \ - DECLARE(Bool, iceberg_delete_data_on_drop, false, R"( -Whether to delete all iceberg files on drop or not. -)", 0) \ + DECLARE_WITH_ALIAS(Bool, data_lake_delete_data_on_drop, false, R"( +Whether to delete the underlying data files when dropping a data lake table. For catalog databases the catalog is asked to purge the data (`purgeRequested=true`); for self-managed tables ClickHouse removes the files directly. +)", 0, iceberg_delete_data_on_drop) \ DECLARE(Int64, iceberg_expire_default_min_snapshots_to_keep, 1, R"( Default value for Iceberg table property `history.expire.min-snapshots-to-keep` used by `expire_snapshots` when that property is absent. )", 0) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 50cc8c572561..de701ed4d274 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -43,6 +43,31 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() { {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"export_merge_tree_partition_task_timeout_seconds", "3600", "86400", "Increase default value to make it more realistic"}, + {"enable_join_runtime_filter_shared_fixed_hash_table", false, true, "New setting to share the hash join's FixedHashMap as the runtime filter for the probe side, replacing the Set/BloomFilter built upstream by the runtime filter framework."}, + {"ai_function_embedding_max_batch_size", 100, 100, "New setting"}, + {"data_lake_delete_data_on_drop", false, false, "New setting that unifies dropping of data lake data; the released `iceberg_delete_data_on_drop` is kept as an alias for it."}, + {"enable_sharding_aggregator", false, false, "New setting to enable sharded `GROUP BY` optimization that distributes rows across threads by hashing the grouping key, so each thread aggregates a disjoint subset of keys without a merge phase; this is efficient for high cardinality keys with evenly distributed data."}, + {"allow_experimental_text_index_lazy_apply", false, false, "New setting to gate experimental lazy posting list apply mode"}, + {"text_index_posting_list_apply_mode", "materialize", "materialize", "New setting for lazy posting list apply mode"}, + {"text_index_density_threshold", 0.2, 0.2, "New setting for lazy posting list density threshold"}, + {"show_remote_databases_in_system_tables", false, false, "Renamed from `show_data_lake_catalogs_in_system_tables` and broadened to also hide `MySQL` and `PostgreSQL` databases from `system.tables`, `system.columns` and `system.completions` by default, since enumerating their tables requires expensive remote calls. Users who relied on the previous behavior must set this setting to `true`. The old name is kept as an alias."}, + {"enable_streaming_queries", false, false, "New setting"}, + {"optimize_prewhere_after_pushdown", false, false, "New setting that enables a second PREWHERE promotion pass to merge filters deposited above a MergeTree read step by later optimizations (predicate pushdown through JOIN, projection rewrites) into the existing PREWHERE chain."}, + {"wait_for_part_commit_in_dependent_materialized_views", false, false, "New setting"}, + {"output_format_float_precision", 0, 0, "A new setting to control decimal digits in float output"}, + {"file_like_engine_default_partition_strategy", "wildcard", "hive", "Change the default partition strategy for file-like table engines (S3, AzureBlobStorage, etc.) from `wildcard` to `hive` when no `partition_strategy` is provided."}, + {"allow_limit_by_partitions_independently", false, true, "New setting to enable independent per-partition evaluation of `LIMIT BY` when the partition expression is a deterministic function of the `LIMIT BY` columns."}, + {"optimize_rewrite_has_to_in", false, true, "New setting"}, + {"unique_key_max_encoded_size", 256, 256, "New setting: maximum size (bytes) of the order-preserving binary encoding of a single UNIQUE KEY row"}, + {"query_plan_push_limit_by_into_sort", false, true, "New setting that pushes a per-stream LIMIT BY into the sort pipeline when LIMIT BY's columns are a prefix of ORDER BY, reducing rows flowing through the final merge."}, + {"optimize_limit_by_in_order", false, true, "New setting to optimize `LIMIT BY` queries when `BY` columns are a prefix of the table's sorting key."}, + {"analyzer_compatibility_prefer_alias_over_subcolumn", false, false, "New compatibility setting"}, + {"query_plan_max_set_size_for_projection_match", 0, 10000, "Added new setting that bounds the cost of content-hashing IN-clause sets in the projection matcher (today: aggregate projection). Sets larger than the limit are treated as non-matching. Zero disables content-hash comparison entirely (compatibility value: projection match never succeeds for nodes with IN-sets)."}, + {"use_reader_executor", false, false, "New experimental setting to route reads through the new pipeline ReaderExecutor instead of the legacy matryoshka of read buffers."}, + {"function_base58_max_input_size", 0, 10000, "New setting that limits the input size of `base58Encode`, `base58Decode` and `tryBase58Decode` (whose conversion is quadratic in the input length) to 10 KB by default. The compatibility value `0` disables the limit, restoring the previous behavior of accepting arbitrarily large inputs."}, + {"format_avro_schema_registry_max_retries", 0, 5, "New setting controlling the maximum number of retries for transient failures (transport timeouts, connection refused, DNS errors, HTTP 5xx/408/429) when communicating with the Confluent Schema Registry. Set to 0 to disable retries. Previous behavior (no retries) is preserved by `compatibility = '26.5'`."}, + {"format_avro_schema_registry_retry_initial_backoff_ms", 100, 100, "New setting controlling the initial backoff (in milliseconds) before retrying a failed Confluent Schema Registry request. The backoff doubles on each retry, capped at 10 seconds. Has no effect when `format_avro_schema_registry_max_retries = 0` (the pre-26.6 behavior restored by `compatibility = '26.5'`)."}, + {"enable_join_transitive_predicates", false, true, "Turn on enable_join_transitive_predicates by default"}, }); addSettingsChanges(settings_changes_history, "26.3", { diff --git a/src/Databases/DataLake/Common.cpp b/src/Databases/DataLake/Common.cpp index 8946d3412d70..9be8b6a02009 100644 --- a/src/Databases/DataLake/Common.cpp +++ b/src/Databases/DataLake/Common.cpp @@ -16,6 +16,9 @@ #include +#include +#include + namespace DB::ErrorCodes { extern const int BAD_ARGUMENTS; @@ -121,4 +124,81 @@ std::pair parseTableName(const std::string & name) return {namespace_name, table_name}; } +String constructTableLocation( + const String & location_scheme, + const String & storage_endpoint, + const String & namespace_name, + const String & table_name) +{ + Poco::URI uri(storage_endpoint); + auto path = uri.getPath(); + while (path.starts_with('/')) + path.erase(0, 1); + while (path.ends_with('/')) + path.pop_back(); + + if (location_scheme == "abfss") + { + /// Azure: `abfss://@/`. `storage_endpoint` is + /// `https:////` or `abfss://@/` + String container = uri.getUserInfo(); + String account_host = uri.getHost(); + String extra_path = path; + + if (container.empty()) + { + auto first_slash = extra_path.find('/'); + if (first_slash == String::npos) + { + container = std::move(extra_path); + extra_path.clear(); + } + else + { + container = extra_path.substr(0, first_slash); + extra_path = extra_path.substr(first_slash + 1); + } + } + + if (account_host.empty() || container.empty()) + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "`storage_endpoint` ({}) for Azure must include both account host and container " + "(expected https://.dfs.core.windows.net/[/] or " + "abfss://@.dfs.core.windows.net[/])", + storage_endpoint); + + if (extra_path.empty()) + return fmt::format("abfss://{}@{}/{}/{}", container, account_host, namespace_name, table_name); + return fmt::format("abfss://{}@{}/{}/{}/{}", container, account_host, extra_path, namespace_name, table_name); + } + + if (location_scheme == "s3") + { + if (path.empty()) + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "`storage_endpoint` ({}) does not contain a bucket; " + "CREATE TABLE in DataLakeCatalog requires `storage_endpoint` to include a non-empty bucket path.", + storage_endpoint); + return fmt::format("s3://{}/{}/{}", path, namespace_name, table_name); + } + + /// HDFS / file / other schemes that may have `authority`. + String authority = uri.getAuthority(); + if (authority.empty()) + { + if (path.empty()) + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "`storage_endpoint` ({}) does not contain a path", + storage_endpoint); + return fmt::format("{}:///{}/{}/{}", location_scheme, path, namespace_name, table_name); + } + + if (path.empty()) + return fmt::format("{}://{}/{}/{}", location_scheme, authority, namespace_name, table_name); + return fmt::format("{}://{}/{}/{}/{}", location_scheme, authority, path, namespace_name, table_name); +} + } diff --git a/src/Databases/DataLake/Common.h b/src/Databases/DataLake/Common.h index 9b0dd7c626a6..b58ce749a4be 100644 --- a/src/Databases/DataLake/Common.h +++ b/src/Databases/DataLake/Common.h @@ -19,4 +19,10 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr /// `E` is a table name. std::pair parseTableName(const std::string & name); +String constructTableLocation( + const String & location_scheme, + const String & storage_endpoint, + const String & namespace_name, + const String & table_name); + } diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 112c07809a03..b704a22287e7 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #if USE_AVRO && USE_PARQUET @@ -49,6 +50,9 @@ #include #include #include +#include +#include +#include #include #include @@ -62,6 +66,7 @@ namespace DatabaseDataLakeSetting extern const DatabaseDataLakeSettingsString auth_header; extern const DatabaseDataLakeSettingsString auth_scope; extern const DatabaseDataLakeSettingsString storage_endpoint; + extern const DatabaseDataLakeSettingsString default_base_location; extern const DatabaseDataLakeSettingsString oauth_server_uri; extern const DatabaseDataLakeSettingsBool oauth_server_use_request_body; extern const DatabaseDataLakeSettingsBool vended_credentials; @@ -100,6 +105,7 @@ namespace Setting extern const SettingsBool parallel_replicas_for_cluster_engines; extern const SettingsString cluster_for_parallel_replicas; extern const SettingsBool database_datalake_require_metadata_access; + extern const SettingsBool data_lake_delete_data_on_drop; } @@ -124,6 +130,41 @@ namespace FailPoints extern const char datalake_try_get_table_return_nullptr[]; } +namespace +{ + +String getLocationSchemeForTableCreation(const std::shared_ptr & catalog) +{ + if (auto storage_type = catalog->getStorageType(); storage_type.has_value()) + return DataLake::storageTypeToScheme(*storage_type); + + /// Fall back only for catalogs whose backing storage is fixed. + /// REST/Hive/Glue/Paimon/Unity can be backed by anything, so we refuse to guess. + switch (catalog->getCatalogType()) + { + case DatabaseDataLakeCatalogType::ICEBERG_ONELAKE: + return "abfss"; /// Azure-only + case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE: + return "s3"; /// GCS via S3 API + case DatabaseDataLakeCatalogType::ICEBERG_REST: + case DatabaseDataLakeCatalogType::ICEBERG_HIVE: + case DatabaseDataLakeCatalogType::GLUE: + case DatabaseDataLakeCatalogType::PAIMON_REST: + case DatabaseDataLakeCatalogType::UNITY: + case DatabaseDataLakeCatalogType::NONE: + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Cannot determine storage scheme for CREATE TABLE for catalog type '{}': the catalog does not " + "report a backing storage type. Set `default_base_location` on the database or configure " + "the catalog to expose `default-base-location`.", + catalog->getCatalogType()); + } + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected catalog type in CREATE TABLE location scheme resolution"); +} + +} + DatabaseDataLake::DatabaseDataLake( const std::string & database_name_, const std::string & url_, @@ -647,9 +688,8 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con if (!metadata_location.empty()) { metadata_location = table_metadata.getMetadataLocation(metadata_location); + (*storage_settings)[DB::DataLakeStorageSetting::iceberg_metadata_file_path] = metadata_location; } - - (*storage_settings)[DB::DataLakeStorageSetting::iceberg_metadata_file_path] = metadata_location; } const auto configuration = getConfiguration(storage_type, storage_settings); @@ -743,16 +783,123 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con return storage_cluster; } -void DatabaseDataLake::dropTable( /// NOLINT +void DatabaseDataLake::createTable( ContextPtr context_, const String & name, - bool /*sync*/) + const StoragePtr & table, + const ASTPtr & query) { - auto table = tryGetTable(name, context_); + /// Engine-clause path: the storage's own initialization (IcebergMetadata::createInitial) + /// already wrote metadata and registered the table in the catalog. if (table) - table->drop(); + return; + + auto catalog = getCatalog(); + const auto & create = query->as(); + const auto [namespace_name, table_name] = DataLake::parseTableName(name); + + ColumnsDescription columns; + if (create.columns_list && create.columns_list->columns) + { + for (const auto & child : create.columns_list->columns->children) + { + const auto * col_decl = child->as(); + if (!col_decl || !col_decl->getType()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid column declaration in CREATE TABLE"); + + if (col_decl->default_specifier != ColumnDefaultSpecifier::Empty) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Column '{}': {} is not yet supported by DataLakeCatalog table creation", + col_decl->name, + toString(col_decl->default_specifier)); + + if (col_decl->getComment() || col_decl->getCodec() || col_decl->getTTL() + || col_decl->getStatisticsDesc() || col_decl->getSettings() + || col_decl->primary_key_specifier) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Column '{}': COMMENT, CODEC, TTL, STATISTICS, SETTINGS, and PRIMARY KEY are not supported by DataLakeCatalog table creation", + col_decl->name); + + columns.add(ColumnDescription(col_decl->name, DataTypeFactory::instance().get(col_decl->getType()))); + } + } + + if (columns.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot create table without columns"); + + if (create.columns_list + && ((create.columns_list->indices && !create.columns_list->indices->children.empty()) + || (create.columns_list->constraints && !create.columns_list->constraints->children.empty()) + || (create.columns_list->projections && !create.columns_list->projections->children.empty()) + || create.columns_list->primary_key + || create.columns_list->primary_key_from_columns)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "DataLakeCatalog CREATE TABLE does not support PRIMARY KEY, indices, constraints, or projections"); + + ASTPtr partition_by; + ASTPtr order_by; + if (create.storage) + { + if (create.storage->primary_key || create.storage->sample_by + || create.storage->ttl_table || create.storage->unique_key + || create.storage->settings) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "DataLakeCatalog CREATE TABLE supports only PARTITION BY and ORDER BY; " + "PRIMARY KEY, SAMPLE BY, TTL, UNIQUE KEY, and engine SETTINGS are not supported"); + + if (create.storage->partition_by) + partition_by = create.storage->partition_by->clone(); + if (create.storage->order_by) + order_by = create.storage->order_by->clone(); + } + + String base_location = catalog->getDefaultBaseLocation(); + if (base_location.empty()) + base_location = settings[DatabaseDataLakeSetting::default_base_location].value; + + String location; + if (!base_location.empty()) + { + while (base_location.ends_with('/')) + base_location.pop_back(); + location = fmt::format("{}/{}/{}", base_location, namespace_name, table_name); + } else - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot drop table {} because it does not exist", name); + { + const auto storage_endpoint = settings[DatabaseDataLakeSetting::storage_endpoint].value; + if (storage_endpoint.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "CREATE TABLE in DataLakeCatalog requires `default_base_location` or `storage_endpoint`"); + location = DataLake::constructTableLocation( + getLocationSchemeForTableCreation(catalog), storage_endpoint, namespace_name, table_name); + } + + auto [metadata_content, metadata_str] = Iceberg::createEmptyMetadataFile( + location, + columns, + partition_by, + order_by, + context_); + + catalog->createTable(namespace_name, table_name, /* metadata_path */ "", metadata_content); + + LOG_INFO(log, "Created table {}.{}", namespace_name, table_name); +} + +void DatabaseDataLake::dropTable( /// NOLINT + ContextPtr context_, + const String & name, + bool /*sync*/) +{ + auto catalog = getCatalog(); + const auto [namespace_name, table_name] = DataLake::parseTableName(name); + + bool purge = context_->getSettingsRef()[Setting::data_lake_delete_data_on_drop]; + catalog->dropTable(namespace_name, table_name, purge); + + LOG_TRACE(log, "Dropped table {}.{} (purge={})", namespace_name, table_name, purge); } DatabaseTablesIteratorPtr DatabaseDataLake::getTablesIterator( diff --git a/src/Databases/DataLake/DatabaseDataLake.h b/src/Databases/DataLake/DatabaseDataLake.h index b6e918af4ba0..67a10a056c77 100644 --- a/src/Databases/DataLake/DatabaseDataLake.h +++ b/src/Databases/DataLake/DatabaseDataLake.h @@ -29,6 +29,7 @@ class DatabaseDataLake final : public IDatabase, WithContext bool shouldBeEmptyOnDetach() const override { return false; } bool isRemoteDatabase() const override { return true; } + bool isDatalakeCatalog() const override { return true; } bool empty() const override; @@ -53,10 +54,10 @@ class DatabaseDataLake final : public IDatabase, WithContext std::vector> getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const override { return {}; } void createTable( - ContextPtr /*context*/, - const String & /*name*/, + ContextPtr context, + const String & name, const StoragePtr & /*table*/, - const ASTPtr & /*query*/) override {} + const ASTPtr & query) override; void dropTable( /// NOLINT ContextPtr context_, diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp index c65e0f18cea7..5903115a07da 100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp @@ -31,6 +31,7 @@ namespace ErrorCodes DECLARE(String, aws_role_arn, "", "Role arn for AWS connection for Glue catalog", 0) \ DECLARE(String, aws_role_session_name, "", "Role session name for AWS connection for Glue catalog", 0) \ DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \ + DECLARE(String, default_base_location, "", "Base URI under which CREATE TABLE places new tables. Used only when the catalog does not report `default-base-location`", 0) \ DECLARE(String, onelake_tenant_id, "", "Tenant id from azure", 0) \ DECLARE(String, onelake_client_id, "", "Client id from azure", 0) \ DECLARE(String, onelake_client_secret, "", "Client secret from azure", 0) \ diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp index ea95bb67bc39..9539e8fe1ebe 100644 --- a/src/Databases/DataLake/GlueCatalog.cpp +++ b/src/Databases/DataLake/GlueCatalog.cpp @@ -50,6 +50,7 @@ #include #include #include +#include #include #include #include @@ -367,7 +368,7 @@ bool GlueCatalog::tryGetTableMetadata( auto setup_specific_properties = [&] { const auto & table_params = table_outcome.GetParameters(); - if (table_params.contains("metadata_location")) + if (table_params.contains("metadata_location") && !table_params.at("metadata_location").empty()) { result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")}); } @@ -639,7 +640,7 @@ void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) cons glue_client->CreateDatabase(create_request); } -void GlueCatalog::createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr /*metadata_content*/) const +void GlueCatalog::createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr metadata_content) const { if (!isNamespaceAllowed(namespace_name)) throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, @@ -648,6 +649,28 @@ void GlueCatalog::createTable(const String & namespace_name, const String & tabl createNamespaceIfNotExists(namespace_name); + String effective_metadata_path = new_metadata_path; + + if (effective_metadata_path.empty() && metadata_content && metadata_content->has("location")) + { + String table_location = metadata_content->getValue("location"); + while (table_location.ends_with('/')) + table_location = table_location.substr(0, table_location.size() - 1); + + TableMetadata dummy_metadata; + auto [object_storage, bucket_name, table_path] = createObjectStorageForEarlyTableAccess(table_location, dummy_metadata); + + String metadata_filename = table_path + "/metadata/v1.metadata.json"; + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + Poco::JSON::Stringifier::stringify(metadata_content, oss, 4); + String metadata_str = DB::removeEscapedSlashes(oss.str()); + + DB::Iceberg::writeMessageToFile(metadata_str, metadata_filename, object_storage, getContext(), "*", ""); + + effective_metadata_path = "s3://" + bucket_name + "/" + metadata_filename; + } + Aws::Glue::Model::CreateTableRequest request; request.SetDatabaseName(namespace_name); @@ -655,18 +678,21 @@ void GlueCatalog::createTable(const String & namespace_name, const String & tabl table_input.SetName(table_name); Aws::Glue::Model::StorageDescriptor sd; - fs::path original_path = new_metadata_path; + if (!effective_metadata_path.empty()) + { + fs::path original_path = effective_metadata_path; - fs::path parent = original_path.parent_path(); - fs::path grandparent = parent.parent_path(); + fs::path parent = original_path.parent_path(); + fs::path grandparent = parent.parent_path(); - sd.SetLocation(grandparent.c_str()); + sd.SetLocation(grandparent.c_str()); + } table_input.SetStorageDescriptor(sd); table_input.SetTableType("ICEBERG"); Aws::Map parameters; - parameters["metadata_location"] = new_metadata_path; + parameters["metadata_location"] = effective_metadata_path; parameters["table_type"] = "ICEBERG"; table_input.SetParameters(parameters); @@ -728,7 +754,7 @@ bool GlueCatalog::updateMetadata(const String & namespace_name, const String & t return true; } -void GlueCatalog::dropTable(const String & namespace_name, const String & table_name) const +void GlueCatalog::dropTable(const String & namespace_name, const String & table_name, bool /*purge*/) const { if (!isNamespaceAllowed(namespace_name)) throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h index 21674c25edc7..9b9de5a83132 100644 --- a/src/Databases/DataLake/GlueCatalog.h +++ b/src/Databases/DataLake/GlueCatalog.h @@ -69,7 +69,7 @@ class GlueCatalog final : public ICatalog, private DB::WithContext void createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr metadata_content) const override; bool updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const override; - void dropTable(const String & namespace_name, const String & table_name) const override; + void dropTable(const String & namespace_name, const String & table_name, bool purge) const override; /// Returns a callback that re-vends fresh AWS credentials from the configured /// credentials provider chain. Invoked by `ReadBufferFromS3` when an S3 call diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp index a70038741bdc..80faa7c4c79f 100644 --- a/src/Databases/DataLake/ICatalog.cpp +++ b/src/Databases/DataLake/ICatalog.cpp @@ -80,6 +80,25 @@ StorageType parseStorageTypeFromString(const std::string & type) return *storage_type; } +std::string storageTypeToScheme(StorageType type) +{ + switch (type) + { + case StorageType::S3: + return "s3"; + case StorageType::Azure: + return "abfss"; + case StorageType::Local: + return "file"; + case StorageType::HDFS: + return "hdfs"; + case StorageType::Other: + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Cannot determine URI scheme for storage type 'Other'"); + } +} + void TableMetadata::setLocation(const std::string & location_) { if (!with_location) @@ -366,7 +385,7 @@ bool ICatalog::updateMetadata(const String & /*namespace_name*/, const String & throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "updateMetadata is not implemented"); } -void ICatalog::dropTable(const String & /*namespace_name*/, const String & /*table_name*/) const +void ICatalog::dropTable(const String & /*namespace_name*/, const String & /*table_name*/, bool /*purge*/) const { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "dropTable is not implemented"); } diff --git a/src/Databases/DataLake/ICatalog.h b/src/Databases/DataLake/ICatalog.h index 9ccb420ac759..750bea184558 100644 --- a/src/Databases/DataLake/ICatalog.h +++ b/src/Databases/DataLake/ICatalog.h @@ -24,6 +24,7 @@ namespace DataLake using StorageType = DB::DatabaseDataLakeStorageType; StorageType parseStorageTypeFromLocation(const std::string & location); StorageType parseStorageTypeFromString(const std::string &type); +std::string storageTypeToScheme(StorageType type); struct DataLakeSpecificProperties { @@ -199,6 +200,9 @@ class ICatalog /// E.g. one of S3, Azure, Local, HDFS. virtual std::optional getStorageType() const = 0; + /// Catalog-wide base location for new tables, e.g. `s3://warehouse/data`. Empty if unknown. + virtual String getDefaultBaseLocation() const { return ""; } + /// Creates new table in catalog. virtual void createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr metadata_content) const; @@ -206,7 +210,8 @@ class ICatalog virtual bool updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const; /// Drop table from catalog. - virtual void dropTable(const String & namespace_name, const String & table_name) const; + /// If purge, the catalog is requested to also delete underlying data files. + virtual void dropTable(const String & namespace_name, const String & table_name, bool purge) const; /// Does the catalog support transactions or anything like that? /// For example, the Iceberg REST catalog supports atomic operations "compare if snapshot X is equal to" and "add new snapshot Y". diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index 60e6dbc59c12..ea6af8c2f019 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -139,6 +139,22 @@ String encodeNamespaceForURI(const String & namespace_name) return encoded; } +/// Per Iceberg REST spec, `namespace` is a JSON array of segments. Split `ns.a.b` on dots. +Poco::JSON::Array::Ptr namespaceToJSONArray(const String & namespace_name) +{ + Poco::JSON::Array::Ptr segments = new Poco::JSON::Array; + size_t start = 0; + while (start <= namespace_name.size()) + { + size_t dot = namespace_name.find('.', start); + if (dot == String::npos) + dot = namespace_name.size(); + segments->add(namespace_name.substr(start, dot - start)); + start = dot + 1; + } + return segments; +} + } std::string RestCatalog::Config::toString() const @@ -566,6 +582,11 @@ std::optional RestCatalog::getStorageType() const return parseStorageTypeFromLocation(config.default_base_location); } +String RestCatalog::getDefaultBaseLocation() const +{ + return config.default_base_location; +} + DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer( const std::string & endpoint, const Poco::URI::QueryParameters & params, @@ -1070,11 +1091,7 @@ void RestCatalog::createNamespaceIfNotExists(const String & namespace_name, cons const std::string endpoint = base_url / config.prefix / "namespaces"; Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; - { - Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array; - namespaces->add(namespace_name); - request_body->set("namespace", namespaces); - } + request_body->set("namespace", namespaceToJSONArray(namespace_name)); { Poco::JSON::Object::Ptr properties = new Poco::JSON::Object; properties->set("location", location); @@ -1099,13 +1116,14 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Failed to create table {}, namespace {} is filtered by `namespaces` database parameter", table_name, namespace_name); - createNamespaceIfNotExists(namespace_name, metadata_content->getValue("location")); + String location = metadata_content->getValue("location"); + createNamespaceIfNotExists(namespace_name, location); const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables"; Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; request_body->set("name", table_name); - request_body->set("location", metadata_content->getValue("location")); + request_body->set("location", location); { Poco::JSON::Object::Ptr initial_schema = metadata_content->getArray("schemas")->getObject(0); Poco::JSON::Array::Ptr identifier_fields = new Poco::JSON::Array; @@ -1114,13 +1132,21 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl } request_body->set("partition-spec", metadata_content->getArray("partition-specs")->get(0)); + if (metadata_content->has("sort-orders")) { - Poco::JSON::Object::Ptr write_order = new Poco::JSON::Object; - write_order->set("order-id", 0); - Poco::JSON::Array::Ptr fields = new Poco::JSON::Array; - write_order->set("fields", fields); - request_body->set("write-order", write_order); + if (auto sort_orders = metadata_content->getArray("sort-orders"); sort_orders->size() > 0) + { + auto sort_order = sort_orders->getObject(0); + auto fields = sort_order->getArray("fields"); + if (fields && fields->size() > 0) + { + if (sort_order->getValue("order-id") == 0) + sort_order->set("order-id", 1); + request_body->set("write-order", sort_order); + } + } } + request_body->set("stage-create", false); Poco::JSON::Object::Ptr properties = new Poco::JSON::Object; request_body->set("properties", properties); @@ -1146,9 +1172,7 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t { Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object; identifier->set("name", table_name); - Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array; - namespaces->add(namespace_name); - identifier->set("namespace", namespaces); + identifier->set("namespace", namespaceToJSONArray(namespace_name)); request_body->set("identifier", identifier); } @@ -1205,16 +1229,17 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t return true; } -void RestCatalog::dropTable(const String & namespace_name, const String & table_name) const +void RestCatalog::dropTable(const String & namespace_name, const String & table_name, bool purge) const { if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter", table_name, namespace_name); - const std::string endpoint - = (base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name).string() - + "?purgeRequested=False"; + /// Same URL shape as createTable / updateMetadata / getTableMetadataImpl. + const std::string base_endpoint = + (base_url / config.prefix / NAMESPACES_ENDPOINT / encodeNamespaceForURI(namespace_name) / "tables" / table_name).generic_string(); + const std::string endpoint = fmt::format("{}?purgeRequested={}", base_endpoint, purge ? "true" : "false"); Poco::JSON::Object::Ptr request_body = nullptr; try diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index 49fe684e0eaa..b35eb741de0e 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -67,6 +67,8 @@ class RestCatalog : public ICatalog, public DB::WithContext std::optional getStorageType() const override; + String getDefaultBaseLocation() const override; + DB::DatabaseDataLakeCatalogType getCatalogType() const override { return DB::DatabaseDataLakeCatalogType::ICEBERG_REST; @@ -78,7 +80,7 @@ class RestCatalog : public ICatalog, public DB::WithContext bool isTransactional() const override { return true; } - void dropTable(const String & namespace_name, const String & table_name) const override; + void dropTable(const String & namespace_name, const String & table_name, bool purge) const override; ICatalog::CredentialsRefreshCallback getCredentialsConfigurationCallback(const DB::StorageID & storage_id) override; diff --git a/src/Databases/DataLake/S3TablesCatalog.cpp b/src/Databases/DataLake/S3TablesCatalog.cpp index e00a7de586c7..40cf4174f892 100644 --- a/src/Databases/DataLake/S3TablesCatalog.cpp +++ b/src/Databases/DataLake/S3TablesCatalog.cpp @@ -218,7 +218,9 @@ ICatalog::CredentialsRefreshCallback S3TablesCatalog::getCredentialsConfiguratio }; } -void S3TablesCatalog::dropTable(const String & namespace_name, const String & table_name) const +/// S3Tables manages the underlying data itself, so the data is always purged on drop +/// regardless of the `purge` flag. +void S3TablesCatalog::dropTable(const String & namespace_name, const String & table_name, bool /*purge*/) const { if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, diff --git a/src/Databases/DataLake/S3TablesCatalog.h b/src/Databases/DataLake/S3TablesCatalog.h index 45ad049f0199..71d63565ea00 100644 --- a/src/Databases/DataLake/S3TablesCatalog.h +++ b/src/Databases/DataLake/S3TablesCatalog.h @@ -42,7 +42,7 @@ class S3TablesCatalog final : public RestCatalog DB::ContextPtr context_, TableMetadata & result) const override; - void dropTable(const String & namespace_name, const String & table_name) const override; + void dropTable(const String & namespace_name, const String & table_name, bool purge) const override; ICatalog::CredentialsRefreshCallback getCredentialsConfigurationCallback(const DB::StorageID & storage_id) override; diff --git a/src/Databases/DataLake/tests/gtest_construct_table_location.cpp b/src/Databases/DataLake/tests/gtest_construct_table_location.cpp new file mode 100644 index 000000000000..b545d0af6a27 --- /dev/null +++ b/src/Databases/DataLake/tests/gtest_construct_table_location.cpp @@ -0,0 +1,120 @@ +#include +#include + +#include + +#include +#include + +namespace DataLake::Test +{ + +class ConstructTableLocationTest : public ::testing::Test +{ +}; + +/// S3: bucket is the first path segment of the HTTPS-style storage_endpoint; +/// a sub-prefix after the bucket must be preserved. +TEST_F(ConstructTableLocationTest, S3HttpsEndpoint) +{ + EXPECT_EQ( + constructTableLocation("s3", "http://minio:9000/warehouse-rest", "ns", "tbl"), + "s3://warehouse-rest/ns/tbl"); + EXPECT_EQ( + constructTableLocation("s3", "http://minio:9000/warehouse/data", "ns", "tbl"), + "s3://warehouse/data/ns/tbl"); +} + +TEST_F(ConstructTableLocationTest, S3RejectsEndpointWithoutBucket) +{ + EXPECT_THROW( + constructTableLocation("s3", "http://minio:9000/", "ns", "tbl"), + DB::Exception); +} + +/// Azure: HTTPS-form storage_endpoint must round-trip through `setLocation`, +/// which means the constructed URI must include the `@` authority. +/// A sub-path after the container must be preserved, a trailing slash ignored. +TEST_F(ConstructTableLocationTest, AzureHttpsEndpoint) +{ + const String location = constructTableLocation( + "abfss", + "https://account.dfs.core.windows.net/mycontainer", + "ns", + "tbl"); + EXPECT_EQ(location, "abfss://mycontainer@account.dfs.core.windows.net/ns/tbl"); + + /// Verify the produced URI parses back into the expected components. + TableMetadata metadata; + metadata.withLocation(); + metadata.setLocation(location); + EXPECT_EQ(metadata.getLocation(), location); + EXPECT_EQ(metadata.getStorageType(), StorageType::Azure); + + EXPECT_EQ( + constructTableLocation( + "abfss", + "https://account.dfs.core.windows.net/mycontainer/warehouse/data", + "ns", + "tbl"), + "abfss://mycontainer@account.dfs.core.windows.net/warehouse/data/ns/tbl"); + EXPECT_EQ( + constructTableLocation( + "abfss", + "https://account.dfs.core.windows.net/mycontainer/", + "ns", + "tbl"), + "abfss://mycontainer@account.dfs.core.windows.net/ns/tbl"); +} + +/// Azure: ABFSS-form storage_endpoint (container in the authority's user-info) +/// is also accepted, with and without a sub-path. +TEST_F(ConstructTableLocationTest, AzureAbfssEndpoint) +{ + EXPECT_EQ( + constructTableLocation( + "abfss", + "abfss://mycontainer@account.dfs.core.windows.net/", + "ns", + "tbl"), + "abfss://mycontainer@account.dfs.core.windows.net/ns/tbl"); + EXPECT_EQ( + constructTableLocation( + "abfss", + "abfss://mycontainer@account.dfs.core.windows.net/warehouse/data", + "ns", + "tbl"), + "abfss://mycontainer@account.dfs.core.windows.net/warehouse/data/ns/tbl"); +} + +TEST_F(ConstructTableLocationTest, AzureRejectsEndpointWithoutContainer) +{ + EXPECT_THROW( + constructTableLocation("abfss", "https://account.dfs.core.windows.net/", "ns", "tbl"), + DB::Exception); + EXPECT_THROW( + constructTableLocation("abfss", "abfss://account.dfs.core.windows.net/", "ns", "tbl"), + DB::Exception); +} + +/// HDFS: the authority (host:port) must be preserved in the location URI, +/// with or without a warehouse path after it. +TEST_F(ConstructTableLocationTest, HdfsPreservesAuthority) +{ + EXPECT_EQ( + constructTableLocation("hdfs", "hdfs://namenode:9000/warehouse", "ns", "tbl"), + "hdfs://namenode:9000/warehouse/ns/tbl"); + EXPECT_EQ( + constructTableLocation("hdfs", "hdfs://namenode:9000", "ns", "tbl"), + "hdfs://namenode:9000/ns/tbl"); +} + +/// `file://` URIs have an empty authority and just a local filesystem path. +TEST_F(ConstructTableLocationTest, FileWithoutAuthority) +{ + EXPECT_EQ( + constructTableLocation("file", "file:///var/iceberg/warehouse", "ns", "tbl"), + "file:///var/iceberg/warehouse/ns/tbl"); +} + +} diff --git a/src/Databases/IDatabase.h b/src/Databases/IDatabase.h index 9e55415223b9..6f76f4e6cac9 100644 --- a/src/Databases/IDatabase.h +++ b/src/Databases/IDatabase.h @@ -313,6 +313,7 @@ class IDatabase : public std::enable_shared_from_this virtual bool empty() const = 0; virtual bool isReadOnly() const { return false; } + virtual bool isDatalakeCatalog() const { return false; } /// Add the table to the database. Record its presence in the metadata. virtual void createTable( diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index a3481a53223b..3500e1c03deb 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -125,6 +125,9 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter) if (table && table->as()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Mutations with ON CLUSTER are not allowed for KeeperMap tables"); + if (table_id) + checkDatabaseSupportsOnClusterDDL(DatabaseCatalog::instance().tryGetDatabase(table_id.database_name)); + DDLQueryOnClusterParams params; params.access_to_check = getRequiredAccess(); return executeDDLQueryOnCluster(query_ptr, getContext(), params); diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index cb82a691f35d..8665edccda7b 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -31,6 +31,7 @@ #include #include +#include #include #include #include @@ -1673,6 +1674,10 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) if (!UserDefinedSQLFunctionFactory::instance().empty()) UserDefinedSQLFunctionVisitor::visit(query_ptr, getContext()); + /// Snapshot whether the user wrote an explicit ENGINE before `setEngine` fills in a default below. + /// Used to detect an engine-less `CREATE TABLE` on a `DataLakeCatalog` database. + const bool engine_user_specified = create.storage && create.storage->engine; + /// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way. TableProperties properties = getTablePropertiesAndNormalizeCreateQuery(create, mode); @@ -1745,6 +1750,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) if (!create.cluster.empty()) { + checkDatabaseSupportsOnClusterDDL(database); chassert(!ddl_guard); return executeQueryOnCluster(create); } @@ -1752,6 +1758,28 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) if (need_add_to_database && !database) throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} does not exist", backQuoteIfNeed(database_name)); + if (database && database->isDatalakeCatalog()) + { + if (create.is_ordinary_view || create.is_materialized_view || create.is_window_view + || create.is_dictionary || create.attach || create.is_clone_as + || create.replace_table || create.replace_view) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "DataLakeCatalog supports only plain CREATE TABLE; " + "views, dictionaries, ATTACH, CLONE AS, and REPLACE TABLE are not allowed"); + + if (engine_user_specified && !create.storage->engine->name.starts_with("Iceberg")) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "DataLakeCatalog only supports Iceberg-family table engines; got '{}'", + create.storage->engine->name); + + /// The comment is not persisted in Iceberg metadata or catalog properties, + /// so reject it instead of silently dropping it. + if (create.comment) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Table COMMENT is not supported by DataLakeCatalog table creation " + "(note: CREATE TABLE ... AS inherits the comment from the source table)"); + } + if (create.isTemporary() && create.replace_table) { chassert(!ddl_guard); @@ -1766,7 +1794,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) } /// Actually creates table - bool created = doCreateTable(create, properties, ddl_guard, mode); + bool created = doCreateTable(create, properties, ddl_guard, mode, engine_user_specified); ddl_guard.reset(); if (!created) /// Table already exists @@ -1844,7 +1872,7 @@ catch (...) bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create, const InterpreterCreateQuery::TableProperties & properties, - DDLGuardPtr & ddl_guard, LoadingStrictnessLevel mode) + DDLGuardPtr & ddl_guard, LoadingStrictnessLevel mode, bool engine_user_specified) { if (create.isTemporary()) { @@ -1941,6 +1969,71 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create, database->checkTableNameLength(create.getTable()); } + auto & create_query = query_ptr->as(); + if (database->isDatalakeCatalog() && !engine_user_specified) + { + /// Extract partition/order from the source table if CREATE TABLE ... AS was used + if (!as_table_saved.empty()) + { + String as_database_name = getContext()->resolveDatabase(as_database_saved); + StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, as_table_saved}, getContext()); + auto as_storage_metadata = as_storage->getInMemoryMetadataPtr(getContext(), false); + + if (!create_query.storage) + { + auto storage_ast = make_intrusive(); + create_query.set(create_query.storage, storage_ast); + } + + if (!create_query.storage->partition_by + && as_storage_metadata->isPartitionKeyDefined() + && as_storage_metadata->hasPartitionKey()) + { + create_query.storage->set( + create_query.storage->partition_by, + as_storage_metadata->getPartitionKeyAST()->clone()); + } + + if (!create_query.storage->order_by + && as_storage_metadata->isSortingKeyDefined() + && as_storage_metadata->hasSortingKey()) + { + create_query.storage->set( + create_query.storage->order_by, + as_storage_metadata->getSortingKeyAST()->clone()); + } + } + + /// Ensure columns are in the query AST + if (!create_query.columns_list + || !create_query.columns_list->columns + || create_query.columns_list->columns->children.empty()) + { + auto columns_declare_list = make_intrusive(); + auto columns_expression_list = make_intrusive(); + columns_declare_list->set(columns_declare_list->columns, columns_expression_list); + create_query.set(create_query.columns_list, columns_declare_list); + + for (const auto & column : properties.columns) + { + const auto column_declaration = make_intrusive(); + column_declaration->name = column.name; + column_declaration->setType(makeASTDataType(column.type->getName())); + /// Preserve non-plain kinds so DatabaseDataLake::createTable can reject them. + if (column.default_desc.kind != ColumnDefaultKind::Default || column.default_desc.expression) + { + column_declaration->default_specifier = toColumnDefaultSpecifier(column.default_desc.kind); + if (column.default_desc.expression) + column_declaration->setDefaultExpression(column.default_desc.expression->clone()); + } + columns_expression_list->children.emplace_back(column_declaration); + } + } + + database->createTable(getContext(), create.getTable(), nullptr, query_ptr); + return true; + } + data_path = database->getTableDataPath(create); // When creating a table, when checking if the data path exists, it should use the local disk to check, not the database disk. Because the database disk stores metadata files only. auto full_data_path = fs::path{getContext()->getPath()} / data_path; @@ -2046,7 +2139,6 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create, is_restore_from_backup); /// If schema was inferred while storage creation, add columns description to create query. - auto & create_query = query_ptr->as(); addColumnsDescriptionToCreateQueryIfNecessary(create_query, res); /// Add any inferred engine args if needed. For example, data format for engines File/S3/URL/etc if (auto * engine_args = getEngineArgsFromCreateQuery(create_query)) @@ -2203,7 +2295,7 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create, { /// Create temporary table (random name will be generated) DDLGuardPtr ddl_guard; - [[maybe_unused]] bool done = InterpreterCreateQuery(query_ptr, create_context).doCreateTable(create, properties, ddl_guard, mode); + [[maybe_unused]] bool done = InterpreterCreateQuery(query_ptr, create_context).doCreateTable(create, properties, ddl_guard, mode, /*engine_user_specified=*/false); ddl_guard.reset(); assert(done); created = true; diff --git a/src/Interpreters/InterpreterCreateQuery.h b/src/Interpreters/InterpreterCreateQuery.h index 37e2ba6b487c..8684b12e657c 100644 --- a/src/Interpreters/InterpreterCreateQuery.h +++ b/src/Interpreters/InterpreterCreateQuery.h @@ -107,7 +107,7 @@ class InterpreterCreateQuery : public IInterpreter, WithMutableContext AccessRightsElements getRequiredAccess() const; /// Create IStorage and add it to database. If table already exists and IF NOT EXISTS specified, do nothing and return false. - bool doCreateTable(ASTCreateQuery & create, const TableProperties & properties, DDLGuardPtr & ddl_guard, LoadingStrictnessLevel mode); + bool doCreateTable(ASTCreateQuery & create, const TableProperties & properties, DDLGuardPtr & ddl_guard, LoadingStrictnessLevel mode, bool engine_user_specified); BlockIO doCreateOrReplaceTable(ASTCreateQuery & create, const InterpreterCreateQuery::TableProperties & properties, LoadingStrictnessLevel mode); BlockIO doCreateOrReplaceTemporaryTable(ASTCreateQuery & create, const InterpreterCreateQuery::TableProperties & properties, LoadingStrictnessLevel mode); /// Inserts data in created table if it's CREATE ... SELECT diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index 2e829ec8dc72..8a7c6637974d 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -96,6 +96,9 @@ BlockIO InterpreterDropQuery::execute() BlockIO InterpreterDropQuery::executeSingleDropQuery(const ASTPtr & drop_query_ptr) { auto & drop = drop_query_ptr->as(); + if (!drop.cluster.empty() && drop.table) + checkDatabaseSupportsOnClusterDDL( + DatabaseCatalog::instance().tryGetDatabase(getContext()->resolveDatabase(drop.getDatabase()))); if (!drop.cluster.empty() && drop.table && !drop.if_empty && !maybeRemoveOnCluster(current_query_ptr, getContext())) { DDLQueryOnClusterParams params; diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp index 26f36d403fc1..3d698d052dee 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.cpp +++ b/src/Interpreters/executeDDLQueryOnCluster.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -221,6 +222,14 @@ BlockIO getDDLOnClusterStatus(const String & node_path, const String & replicas_ return io; } +void checkDatabaseSupportsOnClusterDDL(const DatabasePtr & database) +{ + if (database && database->isDatalakeCatalog()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "ON CLUSTER is not supported for DataLakeCatalog databases: " + "the catalog is shared, run the query without ON CLUSTER"); +} + bool maybeRemoveOnCluster(const ASTPtr & query_ptr, ContextPtr context) { const auto * query = dynamic_cast(query_ptr.get()); diff --git a/src/Interpreters/executeDDLQueryOnCluster.h b/src/Interpreters/executeDDLQueryOnCluster.h index 69e0c38834e6..5bfe50ff4736 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.h +++ b/src/Interpreters/executeDDLQueryOnCluster.h @@ -19,10 +19,15 @@ namespace DB struct DDLLogEntry; class Cluster; using ClusterPtr = std::shared_ptr; +class IDatabase; +using DatabasePtr = std::shared_ptr; /// Returns true if provided ALTER type can be executed ON CLUSTER bool isSupportedAlterTypeForOnClusterDDLQuery(int type); +/// Throws if DDL on the database's tables cannot be mutated with ON CLUSTER +void checkDatabaseSupportsOnClusterDDL(const DatabasePtr & database); + struct DDLQueryOnClusterParams { /// A cluster to execute a distributed query. diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp index e687811f4532..c75cce3fc700 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp @@ -359,7 +359,7 @@ void writeMetadataFiles( auto new_snapshot = metadata_generator.generateNextMetadata( plan.generator, - generated_metadata_info.path, + Iceberg::IcebergPathFromMetadata{}, history_record.parent_id, history_record.added_files, total_records_count, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 6016be1cd341..b63459f446ca 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -130,7 +130,7 @@ extern const SettingsString iceberg_metadata_compression_method; extern const SettingsBool allow_insert_into_iceberg; extern const SettingsBool allow_experimental_iceberg_compaction; extern const SettingsBool allow_experimental_expire_snapshots; -extern const SettingsBool iceberg_delete_data_on_drop; +extern const SettingsBool data_lake_delete_data_on_drop; } static constexpr size_t MAX_TRANSACTION_RETRIES = 100; @@ -888,7 +888,11 @@ void IcebergMetadata::createInitial( if (!compression_suffix.empty()) compression_suffix = "." + compression_suffix; - auto filename = fmt::format("{}metadata/v1{}.metadata.json", configuration_ptr->getRawPath().path, compression_suffix); + auto table_uuid = metadata_content_object->getValue(Iceberg::f_table_uuid); + auto metadata_file_name = (catalog && catalog->isTransactional()) + ? fmt::format("v1-{}{}.metadata.json", table_uuid, compression_suffix) + : fmt::format("v1{}.metadata.json", compression_suffix); + auto filename = fmt::format("{}metadata/{}", configuration_ptr->getRawPath().path, metadata_file_name); try { @@ -897,7 +901,7 @@ void IcebergMetadata::createInitial( catch (const Exception & e) { /// The write uses `If-None-Match: *`, so S3 returns PreconditionFailed when the metadata file - /// already exists (e.g. leftover data after `DROP TABLE` with `iceberg_delete_data_on_drop` off, + /// already exists (e.g. leftover data after `DROP TABLE` with `data_lake_delete_data_on_drop` off, /// or a concurrent creation). When `IF NOT EXISTS` was specified, this is expected. if (if_not_exists && e.code() == ErrorCodes::S3_ERROR && e.message().find("PreconditionFailed") != String::npos) @@ -914,7 +918,7 @@ void IcebergMetadata::createInitial( if (catalog) { auto catalog_filename = configuration_ptr->getTypeName() + "://" + configuration_ptr->getNamespace() + "/" - + configuration_ptr->getRawPath().path + "metadata/v1.metadata.json"; + + configuration_ptr->getRawPath().path + "metadata/" + metadata_file_name; const auto & [namespace_name, table_name] = DataLake::parseTableName(table_id_.getTableName()); catalog->createTable(namespace_name, table_name, catalog_filename, metadata_content_object); } @@ -1398,7 +1402,7 @@ SinkToStoragePtr IcebergMetadata::write( void IcebergMetadata::drop(ContextPtr context) { - if (context->getSettingsRef()[Setting::iceberg_delete_data_on_drop].value) + if (context->getSettingsRef()[Setting::data_lake_delete_data_on_drop].value) { auto files = listFiles(*object_storage, persistent_components.table_path, persistent_components.table_path, ""); for (const auto & file : files) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 026e19dfcd04..f5a166ba4f2f 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -870,7 +870,7 @@ void generateManifestList( writer.write(entry_datum); } - if (use_previous_snapshots) + if (use_previous_snapshots && new_snapshot->has(Iceberg::f_parent_snapshot_id)) { auto parent_snapshot_id = new_snapshot->getValue(Iceberg::f_parent_snapshot_id); auto snapshots = metadata->getArray(Iceberg::f_snapshots); @@ -995,6 +995,7 @@ IcebergStorageSink::IcebergStorageSink( compression_method, persistent_table_components.table_uuid); metadata_compression_method = compression_method; + previous_metadata_file_path = metadata_path; filename_generator = FileNamesGenerator( persistent_table_components.path_resolver.getTableLocation(), (catalog != nullptr && catalog->isTransactional()), metadata_compression_method, write_format); @@ -1211,7 +1212,7 @@ bool IcebergStorageSink::initializeMetadata() total_data_files += writer.getDataFiles().size(); auto [new_snapshot, manifest_list_path] = MetadataGenerator(metadata).generateNextMetadata( filename_generator, - metadata_info.path, + previous_metadata_file_path.empty() ? Iceberg::IcebergPathFromMetadata{} : resolver.reverseResolve(previous_metadata_file_path), parent_snapshot, total_data_files, total_rows, @@ -1261,6 +1262,7 @@ bool IcebergStorageSink::initializeMetadata() LOG_DEBUG(log, "Rereading metadata file {} with version {}", metadata_path, last_version); metadata_compression_method = compression_method; + previous_metadata_file_path = metadata_path; filename_generator.setVersion(last_version + 1); metadata = getMetadataJSONObject( diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h index ffaeb5d10588..806a433585aa 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h @@ -152,6 +152,7 @@ class IcebergStorageSink : public SinkToStorage bool initializeMetadata(); FileNamesGenerator filename_generator; + String previous_metadata_file_path; std::optional partitioner; Poco::JSON::Object::Ptr partititon_spec; Int64 partition_spec_id; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp index 7d2b2ac6c550..ff1144833dbc 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp @@ -105,7 +105,7 @@ Poco::JSON::Object::Ptr MetadataGenerator::getParentSnapshot(Int64 parent_snapsh MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata( FileNamesGenerator & generator, - const Iceberg::IcebergPathFromMetadata & metadata_file_path, + const Iceberg::IcebergPathFromMetadata & previous_metadata_file_path, Int64 parent_snapshot_id, Int64 added_files, Int64 added_records, @@ -129,7 +129,8 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata( auto manifest_list_path = generator.generateManifestListName(snapshot_id, format_version); new_snapshot->set(Iceberg::f_metadata_snapshot_id, snapshot_id); - new_snapshot->set(Iceberg::f_parent_snapshot_id, parent_snapshot_id); + if (parent_snapshot_id != -1) + new_snapshot->set(Iceberg::f_parent_snapshot_id, parent_snapshot_id); auto now = std::chrono::system_clock::now(); auto ms = duration_cast(now.time_since_epoch()); @@ -205,9 +206,10 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata( else metadata_object->getObject(Iceberg::f_refs)->getObject(Iceberg::f_main)->set(Iceberg::f_metadata_snapshot_id, snapshot_id); + if (!previous_metadata_file_path.empty()) { Poco::JSON::Object::Ptr new_metadata_item = new Poco::JSON::Object; - new_metadata_item->set(Iceberg::f_metadata_file, metadata_file_path.serialize()); + new_metadata_item->set(Iceberg::f_metadata_file, previous_metadata_file_path.serialize()); new_metadata_item->set(Iceberg::f_timestamp_ms, timestamp); metadata_object->getArray(Iceberg::f_metadata_log)->add(new_metadata_item); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h index fa9c340b86f0..0f6a6a4e29f0 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h @@ -30,7 +30,7 @@ class MetadataGenerator NextMetadataResult generateNextMetadata( FileNamesGenerator & generator, - const Iceberg::IcebergPathFromMetadata & metadata_file_path, + const Iceberg::IcebergPathFromMetadata & previous_metadata_file_path, Int64 parent_snapshot_id, Int64 added_files, Int64 added_records, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index 10d75fd4f3e8..d7b709a224c6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -370,12 +370,13 @@ static bool writeMetadataFiles( std::optional & chunk_partitioner, Iceberg::FileContentType content_type, SharedHeader sample_block, - bool write_metadata_json_file) + bool write_metadata_json_file, + const Iceberg::IcebergPathFromMetadata & previous_metadata_file_path) { auto metadata_info = filename_generator.generateMetadataPathWithInfo(); auto storage_metadata_name = path_resolver.resolve(metadata_info.path); Int64 parent_snapshot = -1; - if (metadata->has(Iceberg::f_current_snapshot_id)) + if (metadata->has(Iceberg::f_current_snapshot_id) && !metadata->isNull(Iceberg::f_current_snapshot_id)) parent_snapshot = metadata->getValue(Iceberg::f_current_snapshot_id); Int32 total_rows = 0; @@ -394,7 +395,7 @@ static bool writeMetadataFiles( { auto result = MetadataGenerator(metadata).generateNextMetadata( filename_generator, - metadata_info.path, + previous_metadata_file_path, parent_snapshot, /* added_files */ 0, /* added_records */ 0, @@ -409,7 +410,7 @@ static bool writeMetadataFiles( { auto result = MetadataGenerator(metadata).generateNextMetadata( filename_generator, - metadata_info.path, + previous_metadata_file_path, parent_snapshot, /* added_files */ total_files, /* added_records */ total_rows, @@ -600,6 +601,8 @@ void mutate( filename_generator.setCompressionMethod(compression_method); auto metadata = getMetadataJSONObject(metadata_path, object_storage, persistent_table_components.metadata_cache, context, log, compression_method, persistent_table_components.table_uuid); + auto previous_metadata_path = persistent_table_components.path_resolver.reverseResolve(metadata_path); + if (metadata->getValue(f_format_version) < 2) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Mutations are supported only for the second version of iceberg format"); auto partition_spec_id = metadata->getValue(Iceberg::f_default_spec_id); @@ -662,7 +665,8 @@ void mutate( chunk_partitioner, Iceberg::FileContentType::POSITION_DELETE, std::make_shared(getPositionDeleteFileSampleBlock()), - !mutation_files->data_file); + !mutation_files->data_file, + mutation_files->data_file ? Iceberg::IcebergPathFromMetadata{} : previous_metadata_path); if (!result_delete_files_metadata) continue; @@ -684,7 +688,8 @@ void mutate( chunk_partitioner, Iceberg::FileContentType::DATA, sample_block, - true); + true, + previous_metadata_path); if (!result_data_files_metadata) { continue; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 8564eebda3ef..2594fc562a24 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -334,7 +334,17 @@ bool writeMetadataFileAndVersionHint( } else { - break; + /// Remove the metadata file written above, otherwise the resolver could later + /// pick this uncommitted file as the latest version. + try + { + object_storage->removeObjectIfExists(StoredObject(storage_metadata_path)); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + return false; } ++i; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index c08db678c46c..1557dc83716d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -774,7 +774,7 @@ void StorageObjectStorage::drop() if (catalog) { const auto [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName()); - catalog->dropTable(namespace_name, table_name); + catalog->dropTable(namespace_name, table_name, false); } /// We cannot use query context here, because drop is executed in the background. configuration->drop(Context::getGlobalContextInstance()); diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 6bbf25204a5a..03b82ffcc6c2 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -865,6 +865,369 @@ def test_system_tables_with_nullptr_table(started_cluster): node.query(f"DROP DATABASE IF EXISTS {CATALOG_NAME}") + +def test_create_table_as(started_cluster): + node = started_cluster.instances["node1"] + + namespace = "test_ctas_ns" + src_table = "src_ctas" + catalog = load_catalog_impl(started_cluster) + + create_clickhouse_iceberg_database( + started_cluster, + node, + CATALOG_NAME, + additional_settings={"default_base_location": "s3://warehouse-rest/data"}, + ) + + node.query(f"DROP TABLE IF EXISTS default.{src_table}") + for table in ["from_as", "override"]: + node.query( + f"DROP TABLE IF EXISTS {CATALOG_NAME}.`{namespace}.{table}` SETTINGS allow_database_iceberg=1" + ) + + # Intentional: CTAS must work from a source with a functional partition key. + node.query( + f""" + CREATE TABLE default.{src_table} + ( + id Int64, + name String, + dt Date + ) + ENGINE = MergeTree + PARTITION BY toYearNumSinceEpoch(dt) + ORDER BY (id, name) + """ + ) + + node.query( + f""" + CREATE TABLE {CATALOG_NAME}.`{namespace}.from_as` + AS default.{src_table} SETTINGS allow_database_iceberg=1; + """ + ) + + # CTAS with explicit PARTITION BY / ORDER BY overriding the source table. + node.query( + f""" + CREATE TABLE {CATALOG_NAME}.`{namespace}.override` + AS default.{src_table} + PARTITION BY id + ORDER BY name + SETTINGS allow_database_iceberg=1; + """ + ) + + tables = catalog.list_tables(namespace) + table_names = [t[1] for t in tables] + assert "from_as" in table_names + assert "override" in table_names + + tbl = catalog.load_table(f"{namespace}.from_as") + col_names = [f.name for f in tbl.schema().fields] + assert col_names == ["id", "name", "dt"] + + tbl = catalog.load_table(f"{namespace}.override") + assert len(tbl.spec().fields) == 1 + assert tbl.spec().fields[0].name == "id" + assert str(tbl.spec().fields[0].transform) == "identity" + + col_names = [f.name for f in tbl.schema().fields] + assert col_names == ["id", "name", "dt"] + + for table in ["from_as", "override"]: + node.query( + f"DROP TABLE {CATALOG_NAME}.`{namespace}.{table}` SETTINGS allow_database_iceberg=1" + ) + node.query(f"DROP TABLE default.{src_table}") + + +def test_create_table_explicit_columns(started_cluster): + node = started_cluster.instances["node1"] + + namespace = "test_ctex_ns" + catalog = load_catalog_impl(started_cluster) + + create_clickhouse_iceberg_database( + started_cluster, + node, + CATALOG_NAME, + additional_settings={"default_base_location": "s3://warehouse-rest/data"}, + ) + + node.query( + f"DROP TABLE IF EXISTS {CATALOG_NAME}.`{namespace}.explicit` SETTINGS allow_database_iceberg=1" + ) + + node.query( + f""" + CREATE TABLE {CATALOG_NAME}.`{namespace}.explicit` + ( + id Int64, + name String, + value Float64 + ) + PARTITION BY id + ORDER BY name + SETTINGS allow_database_iceberg=1; + """ + ) + + tables = catalog.list_tables(namespace) + table_names = [t[1] for t in tables] + assert "explicit" in table_names + + tbl = catalog.load_table(f"{namespace}.explicit") + col_names = [f.name for f in tbl.schema().fields] + assert col_names == ["id", "name", "value"] + + iceberg_types = {f.name: str(f.field_type) for f in tbl.schema().fields} + assert iceberg_types["id"] == "long" + assert iceberg_types["name"] == "string" + assert iceberg_types["value"] == "double" + + node.query( + f"INSERT INTO {CATALOG_NAME}.`{namespace}.explicit` VALUES (1, 'a', 1.5);", + settings={ + "allow_insert_into_iceberg": 1, + "write_full_path_in_iceberg_metadata": 1, + }, + ) + assert ( + node.query( + f"SELECT id, name, value FROM {CATALOG_NAME}.`{namespace}.explicit`" + ) + == "1\ta\t1.5\n" + ) + + node.query( + f"DROP TABLE {CATALOG_NAME}.`{namespace}.explicit` SETTINGS allow_database_iceberg=1" + ) + + +def test_create_table_nested_namespace(started_cluster): + node = started_cluster.instances["node1"] + + namespace = "test_nested_ns.a.b" + catalog = load_catalog_impl(started_cluster) + + create_clickhouse_iceberg_database( + started_cluster, + node, + CATALOG_NAME, + additional_settings={"default_base_location": "s3://warehouse-rest/data"}, + ) + + node.query( + f"DROP TABLE IF EXISTS {CATALOG_NAME}.`{namespace}.nested` SETTINGS allow_database_iceberg=1" + ) + node.query( + f""" + CREATE TABLE {CATALOG_NAME}.`{namespace}.nested` + ( + id Int64 + ) + SETTINGS allow_database_iceberg=1; + """ + ) + + tables = catalog.list_tables(namespace) + table_names = [t[-1] for t in tables] + assert "nested" in table_names + + write_settings = {"allow_insert_into_iceberg": 1, "write_full_path_in_iceberg_metadata": 1} + + # INSERT exercises RestCatalog::updateMetadata for the nested namespace. + node.query( + f"INSERT INTO {CATALOG_NAME}.`{namespace}.nested` VALUES (1);", + settings=write_settings, + ) + assert node.query( + f"SELECT id FROM {CATALOG_NAME}.`{namespace}.nested`", + settings={"allow_database_iceberg": 1}, + ).strip() == "1" + + # ALTER exercises RestCatalog::updateSchema for the nested namespace. + node.query( + f"ALTER TABLE {CATALOG_NAME}.`{namespace}.nested` ADD COLUMN z Nullable(String);", + settings=write_settings, + ) + assert "z" in node.query( + f"DESCRIBE TABLE {CATALOG_NAME}.`{namespace}.nested`", + settings=write_settings, + ) + + node.query( + f"DROP TABLE {CATALOG_NAME}.`{namespace}.nested` SETTINGS allow_database_iceberg=1" + ) + + +def test_create_non_table_rejected(started_cluster): + node = started_cluster.instances["node1"] + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + # Each DDL flips a distinct flag in the interpreter's "plain CREATE TABLE only" guard. + for ddl in [ + f"CREATE VIEW {CATALOG_NAME}.`ns.v` AS SELECT 1", + f"CREATE MATERIALIZED VIEW {CATALOG_NAME}.`ns.mv` ENGINE = Memory AS SELECT 1", + f"ATTACH TABLE {CATALOG_NAME}.`ns.attached` (x Int32) ENGINE = Memory", + f"CREATE OR REPLACE TABLE {CATALOG_NAME}.`ns.replaced` (x Int32)", + ]: + err = node.query_and_get_error(ddl, settings={"allow_database_iceberg": 1}) + assert "supports only plain CREATE TABLE" in err + + node.query("DROP TABLE IF EXISTS default.src_clone") + node.query( + "CREATE TABLE default.src_clone (x Int32) ENGINE = MergeTree ORDER BY x" + ) + err = node.query_and_get_error( + f"CREATE TABLE {CATALOG_NAME}.`ns.cloned` CLONE AS default.src_clone", + settings={"allow_database_iceberg": 1}, + ) + assert "supports only plain CREATE TABLE" in err + node.query("DROP TABLE default.src_clone") + + err = node.query_and_get_error( + f"CREATE TABLE {CATALOG_NAME}.`ns.mem` (x Int32) ENGINE = Memory", + settings={"allow_database_iceberg": 1}, + ) + assert "only supports Iceberg-family table engines" in err + + +def test_create_table_unsupported_clauses(started_cluster): + node = started_cluster.instances["node1"] + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + base_ddl = f"CREATE TABLE {CATALOG_NAME}.`ns.unsupp` (id Int64, name String)" + for clause in [ + "PRIMARY KEY id ORDER BY id", + "ORDER BY id SAMPLE BY id", + "ORDER BY id TTL toDate('2099-01-01')", + "ORDER BY id SETTINGS index_granularity = 8192", + ]: + err = node.query_and_get_error( + f"{base_ddl} {clause}", + settings={"allow_database_iceberg": 1}, + ) + assert "supports only PARTITION BY and ORDER BY" in err + + # The table COMMENT is not persisted anywhere, so it is rejected by a separate check. + err = node.query_and_get_error( + f"{base_ddl} ORDER BY id COMMENT 'tbl comment'", + settings={"allow_database_iceberg": 1}, + ) + assert "Table COMMENT is not supported" in err + + for table_element in [ + "INDEX idx_name name TYPE bloom_filter GRANULARITY 1", + "PROJECTION p (SELECT id ORDER BY name)", + "CONSTRAINT c CHECK id > 0", + ]: + err = node.query_and_get_error( + f"CREATE TABLE {CATALOG_NAME}.`ns.unsupp_elem` (id Int64, name String, {table_element}) ORDER BY id", + settings={"allow_database_iceberg": 1}, + ) + assert "does not support PRIMARY KEY, indices" in err + + # Column-level PRIMARY KEY is normalized into the storage-level clause, covered above. + for col_clause in [ + "(id Int64 COMMENT 'pk', name String)", + "(id Int64, name String CODEC(ZSTD))", + "(id Int64, dt Date TTL dt + INTERVAL 1 DAY)", + "(id Int64, name String SETTINGS (max_compress_block_size = 1))", + ]: + err = node.query_and_get_error( + f"CREATE TABLE {CATALOG_NAME}.`ns.unsupp_col` {col_clause} ORDER BY id", + settings={"allow_database_iceberg": 1}, + ) + assert "COMMENT, CODEC, TTL, STATISTICS, SETTINGS, and PRIMARY KEY are not supported" in err + + for col_clause in [ + "(id Int64, d Int64 DEFAULT 1)", + "(id Int64, d Int64 MATERIALIZED id + 1)", + ]: + err = node.query_and_get_error( + f"CREATE TABLE {CATALOG_NAME}.`ns.unsupp_def` {col_clause} ORDER BY id", + settings={"allow_database_iceberg": 1}, + ) + assert "is not yet supported" in err + + +def test_drop_table_purge(started_cluster): + node = started_cluster.instances["node1"] + + namespace = "test_drop_purge_ns" + catalog = load_catalog_impl(started_cluster) + minio_client = Minio( + f"{started_cluster.minio_ip}:{started_cluster.minio_port}", + access_key=minio_access_key, + secret_key=minio_secret_key, + secure=False, + ) + + create_clickhouse_iceberg_database( + started_cluster, + node, + CATALOG_NAME, + additional_settings={"default_base_location": "s3://warehouse-rest/data"}, + ) + + for table in ["to_keep", "to_purge"]: + node.query( + f"DROP TABLE IF EXISTS {CATALOG_NAME}.`{namespace}.{table}` SETTINGS allow_database_iceberg=1" + ) + node.query( + f""" + CREATE TABLE {CATALOG_NAME}.`{namespace}.{table}` + ( + id Int64 + ) + SETTINGS allow_database_iceberg=1; + """ + ) + + table_names = [t[1] for t in catalog.list_tables(namespace)] + assert "to_keep" in table_names + assert "to_purge" in table_names + + def table_prefix(table): + location = catalog.load_table(f"{namespace}.{table}").location() + assert location.startswith("s3://warehouse-rest/") + prefix = location[len("s3://warehouse-rest/"):].rstrip("/") + "/" + assert list( + minio_client.list_objects("warehouse-rest", prefix=prefix, recursive=True) + ), f"Expected metadata under {prefix} before drop" + return prefix + + keep_prefix = table_prefix("to_keep") + purge_prefix = table_prefix("to_purge") + + node.query( + f"DROP TABLE {CATALOG_NAME}.`{namespace}.to_keep` SETTINGS allow_database_iceberg=1" + ) + node.query( + f"DROP TABLE {CATALOG_NAME}.`{namespace}.to_purge` SETTINGS allow_database_iceberg=1, data_lake_delete_data_on_drop=1" + ) + + table_names = [t[1] for t in catalog.list_tables(namespace)] + assert "to_keep" not in table_names + assert "to_purge" not in table_names + + # A drop without purge unregisters the table from the catalog but must keep its data. + assert list( + minio_client.list_objects("warehouse-rest", prefix=keep_prefix, recursive=True) + ), f"Expected objects under {keep_prefix} to survive a drop without purge" + + remaining = [ + o.object_name + for o in minio_client.list_objects("warehouse-rest", prefix=purge_prefix, recursive=True) + ] + assert not remaining, f"Expected purge to remove objects under {purge_prefix}, found: {remaining}" + + def test_gcs(started_cluster): node = started_cluster.instances["node1"] From 3ac9ae4810841c898a7c6b0405163ae5f083a810 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Fri, 12 Jun 2026 01:41:49 +0200 Subject: [PATCH 2/5] Fixups for the port of ClickHouse#98670 to antalya-26.3 - Port \`IcebergPathResolver::reverseResolve\` from upstream (predates the PR upstream but was missing on this branch). - Handle the Altinity-only \`S3_TABLES\` catalog type in \`getLocationSchemeForTableCreation\` (it is always S3-backed). - Drop the \`unique_key\` check in \`DatabaseDataLake::createTable\`: \`ASTStorage\` has no \`UNIQUE KEY\` clause on this branch. - \`getInMemoryMetadataPtr\` does not take a context argument on this branch. https://github.com/ClickHouse/ClickHouse/pull/98670 Co-Authored-By: Claude Fable 5 --- src/Databases/DataLake/DatabaseDataLake.cpp | 7 ++++--- src/Interpreters/InterpreterCreateQuery.cpp | 2 +- .../ObjectStorage/DataLakes/Iceberg/IcebergPath.h | 8 ++++++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index b704a22287e7..fc5c92566ff5 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -146,6 +146,8 @@ String getLocationSchemeForTableCreation(const std::shared_ptrprimary_key || create.storage->sample_by - || create.storage->ttl_table || create.storage->unique_key - || create.storage->settings) + || create.storage->ttl_table || create.storage->settings) throw Exception(ErrorCodes::BAD_ARGUMENTS, "DataLakeCatalog CREATE TABLE supports only PARTITION BY and ORDER BY; " - "PRIMARY KEY, SAMPLE BY, TTL, UNIQUE KEY, and engine SETTINGS are not supported"); + "PRIMARY KEY, SAMPLE BY, TTL, and engine SETTINGS are not supported"); if (create.storage->partition_by) partition_by = create.storage->partition_by->clone(); diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 8665edccda7b..5c87955c1f2d 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -1977,7 +1977,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create, { String as_database_name = getContext()->resolveDatabase(as_database_saved); StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, as_table_saved}, getContext()); - auto as_storage_metadata = as_storage->getInMemoryMetadataPtr(getContext(), false); + auto as_storage_metadata = as_storage->getInMemoryMetadataPtr(); if (!create_query.storage) { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h index d535f3c672a5..26cb3a564d0f 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h @@ -87,6 +87,14 @@ class IcebergPathResolver /// Convert a metadata path to an actual storage path for I/O operations. String resolve(const IcebergPathFromMetadata & metadata_path) const; + IcebergPathFromMetadata reverseResolve(const String & storage_path) const + { + if (storage_path.size() > table_root.size() + && storage_path.starts_with(table_root)) + return IcebergPathFromMetadata::deserialize(table_location + storage_path.substr(table_root.size())); + return IcebergPathFromMetadata::deserialize(table_location + storage_path); + } + /// Convert a metadata path to a catalog-compatible path. /// Done this way because backward compatibility reasons. String resolveForCatalog(const IcebergPathFromMetadata & metadata_path) const From 13f9f822aa3bb21bc6333980f5217b6edc610574 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Fri, 12 Jun 2026 11:36:02 +0200 Subject: [PATCH 3/5] Update SettingsChangesHistory.cpp --- src/Core/SettingsChangesHistory.cpp | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index de701ed4d274..bd4ef861dad8 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -43,31 +43,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() { {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"export_merge_tree_partition_task_timeout_seconds", "3600", "86400", "Increase default value to make it more realistic"}, - {"enable_join_runtime_filter_shared_fixed_hash_table", false, true, "New setting to share the hash join's FixedHashMap as the runtime filter for the probe side, replacing the Set/BloomFilter built upstream by the runtime filter framework."}, - {"ai_function_embedding_max_batch_size", 100, 100, "New setting"}, {"data_lake_delete_data_on_drop", false, false, "New setting that unifies dropping of data lake data; the released `iceberg_delete_data_on_drop` is kept as an alias for it."}, - {"enable_sharding_aggregator", false, false, "New setting to enable sharded `GROUP BY` optimization that distributes rows across threads by hashing the grouping key, so each thread aggregates a disjoint subset of keys without a merge phase; this is efficient for high cardinality keys with evenly distributed data."}, - {"allow_experimental_text_index_lazy_apply", false, false, "New setting to gate experimental lazy posting list apply mode"}, - {"text_index_posting_list_apply_mode", "materialize", "materialize", "New setting for lazy posting list apply mode"}, - {"text_index_density_threshold", 0.2, 0.2, "New setting for lazy posting list density threshold"}, - {"show_remote_databases_in_system_tables", false, false, "Renamed from `show_data_lake_catalogs_in_system_tables` and broadened to also hide `MySQL` and `PostgreSQL` databases from `system.tables`, `system.columns` and `system.completions` by default, since enumerating their tables requires expensive remote calls. Users who relied on the previous behavior must set this setting to `true`. The old name is kept as an alias."}, - {"enable_streaming_queries", false, false, "New setting"}, - {"optimize_prewhere_after_pushdown", false, false, "New setting that enables a second PREWHERE promotion pass to merge filters deposited above a MergeTree read step by later optimizations (predicate pushdown through JOIN, projection rewrites) into the existing PREWHERE chain."}, - {"wait_for_part_commit_in_dependent_materialized_views", false, false, "New setting"}, - {"output_format_float_precision", 0, 0, "A new setting to control decimal digits in float output"}, - {"file_like_engine_default_partition_strategy", "wildcard", "hive", "Change the default partition strategy for file-like table engines (S3, AzureBlobStorage, etc.) from `wildcard` to `hive` when no `partition_strategy` is provided."}, - {"allow_limit_by_partitions_independently", false, true, "New setting to enable independent per-partition evaluation of `LIMIT BY` when the partition expression is a deterministic function of the `LIMIT BY` columns."}, - {"optimize_rewrite_has_to_in", false, true, "New setting"}, - {"unique_key_max_encoded_size", 256, 256, "New setting: maximum size (bytes) of the order-preserving binary encoding of a single UNIQUE KEY row"}, - {"query_plan_push_limit_by_into_sort", false, true, "New setting that pushes a per-stream LIMIT BY into the sort pipeline when LIMIT BY's columns are a prefix of ORDER BY, reducing rows flowing through the final merge."}, - {"optimize_limit_by_in_order", false, true, "New setting to optimize `LIMIT BY` queries when `BY` columns are a prefix of the table's sorting key."}, - {"analyzer_compatibility_prefer_alias_over_subcolumn", false, false, "New compatibility setting"}, - {"query_plan_max_set_size_for_projection_match", 0, 10000, "Added new setting that bounds the cost of content-hashing IN-clause sets in the projection matcher (today: aggregate projection). Sets larger than the limit are treated as non-matching. Zero disables content-hash comparison entirely (compatibility value: projection match never succeeds for nodes with IN-sets)."}, - {"use_reader_executor", false, false, "New experimental setting to route reads through the new pipeline ReaderExecutor instead of the legacy matryoshka of read buffers."}, - {"function_base58_max_input_size", 0, 10000, "New setting that limits the input size of `base58Encode`, `base58Decode` and `tryBase58Decode` (whose conversion is quadratic in the input length) to 10 KB by default. The compatibility value `0` disables the limit, restoring the previous behavior of accepting arbitrarily large inputs."}, - {"format_avro_schema_registry_max_retries", 0, 5, "New setting controlling the maximum number of retries for transient failures (transport timeouts, connection refused, DNS errors, HTTP 5xx/408/429) when communicating with the Confluent Schema Registry. Set to 0 to disable retries. Previous behavior (no retries) is preserved by `compatibility = '26.5'`."}, - {"format_avro_schema_registry_retry_initial_backoff_ms", 100, 100, "New setting controlling the initial backoff (in milliseconds) before retrying a failed Confluent Schema Registry request. The backoff doubles on each retry, capped at 10 seconds. Has no effect when `format_avro_schema_registry_max_retries = 0` (the pre-26.6 behavior restored by `compatibility = '26.5'`)."}, - {"enable_join_transitive_predicates", false, true, "Turn on enable_join_transitive_predicates by default"}, }); addSettingsChanges(settings_changes_history, "26.3", { From 10e7d44b8f1687aac88aa21aa6a054b1459e9deb Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Sun, 14 Jun 2026 22:03:12 +0200 Subject: [PATCH 4/5] fix test --- src/Storages/ObjectStorage/StorageObjectStorage.cpp | 6 +++++- tests/integration/test_database_iceberg/test.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 1557dc83716d..414255b0f0cf 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -914,6 +914,8 @@ Pipe StorageObjectStorage::executeCommand(const String & command_name, const AST void StorageObjectStorage::alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & /*alter_lock_holder*/) { + updateExternalDynamicMetadataIfExists(context); + StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); params.apply(new_metadata, context); @@ -925,8 +927,10 @@ void StorageObjectStorage::alter(const AlterCommands & params, ContextPtr contex setInMemoryMetadata(new_metadata); } -void StorageObjectStorage::checkAlterIsPossible(const AlterCommands & commands, ContextPtr /*context*/) const +void StorageObjectStorage::checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const { + if (configuration->isDataLakeConfiguration()) + configuration->update(object_storage, context); configuration->checkAlterIsPossible(commands); } diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 03b82ffcc6c2..9f9a15b5c2fe 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -1162,7 +1162,7 @@ def test_drop_table_purge(started_cluster): namespace = "test_drop_purge_ns" catalog = load_catalog_impl(started_cluster) minio_client = Minio( - f"{started_cluster.minio_ip}:{started_cluster.minio_port}", + f"{started_cluster.get_instance_ip('minio')}:9000", access_key=minio_access_key, secret_key=minio_secret_key, secure=False, From 2e437fca7abaae43bb92668f11af8f6f167d5270 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Mon, 15 Jun 2026 15:28:06 +0200 Subject: [PATCH 5/5] fix ci --- tests/integration/test_database_iceberg/test.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 9f9a15b5c2fe..c0868bae0509 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -1048,16 +1048,6 @@ def test_create_table_nested_namespace(started_cluster): settings={"allow_database_iceberg": 1}, ).strip() == "1" - # ALTER exercises RestCatalog::updateSchema for the nested namespace. - node.query( - f"ALTER TABLE {CATALOG_NAME}.`{namespace}.nested` ADD COLUMN z Nullable(String);", - settings=write_settings, - ) - assert "z" in node.query( - f"DESCRIBE TABLE {CATALOG_NAME}.`{namespace}.nested`", - settings=write_settings, - ) - node.query( f"DROP TABLE {CATALOG_NAME}.`{namespace}.nested` SETTINGS allow_database_iceberg=1" )