diff --git a/docs/en/engines/table-engines/integrations/iceberg.md b/docs/en/engines/table-engines/integrations/iceberg.md index 83c912f1a488..c57178052b0d 100644 --- a/docs/en/engines/table-engines/integrations/iceberg.md +++ b/docs/en/engines/table-engines/integrations/iceberg.md @@ -10,7 +10,7 @@ doc_type: 'reference' # Iceberg table engine {#iceberg-table-engine} -:::warning +:::warning We recommend using the [Iceberg Table Function](/sql-reference/table-functions/iceberg.md) for working with Iceberg data in ClickHouse. The Iceberg Table Function currently provides sufficient functionality, offering a partial read-only interface for Iceberg tables. The Iceberg Table Engine is available but may have limitations. ClickHouse wasn't originally designed to support tables with externally changing schemas, which can affect the functionality of the Iceberg Table Engine. As a result, some features that work with regular tables may be unavailable or may not function correctly, especially when using the old analyzer. @@ -78,7 +78,7 @@ Table engine `Iceberg` is an alias to `IcebergS3` now. At the moment, with the help of CH, you can read iceberg tables, the schema of which has changed over time. We currently support reading tables where columns have been added and removed, and their order has changed. You can also change a column where a value is required to one where NULL is allowed. Additionally, we support permitted type casting for simple types, namely:   * int -> long * float -> double -* decimal(P, S) -> decimal(P', S) where P' > P. +* decimal(P, S) -> decimal(P', S) where P' > P. Currently, it is not possible to change nested structures or the types of elements within arrays and maps. @@ -94,7 +94,7 @@ ClickHouse supports time travel for Iceberg tables, allowing you to query histor ## Processing of tables with deleted rows {#deleted-rows} -Currently, only Iceberg tables with [position deletes](https://iceberg.apache.org/spec/#position-delete-files) are supported. 
+Currently, only Iceberg tables with [position deletes](https://iceberg.apache.org/spec/#position-delete-files) are supported. The following deletion methods are **not supported**: - [Equality deletes](https://iceberg.apache.org/spec/#equality-delete-files) @@ -102,12 +102,12 @@ The following deletion methods are **not supported**: ### Basic usage {#basic-usage} ```sql - SELECT * FROM example_table ORDER BY 1 + SELECT * FROM example_table ORDER BY 1 SETTINGS iceberg_timestamp_ms = 1714636800000 ``` ```sql - SELECT * FROM example_table ORDER BY 1 + SELECT * FROM example_table ORDER BY 1 SETTINGS iceberg_snapshot_id = 3547395809148285433 ``` @@ -132,21 +132,21 @@ Consider this sequence of operations: ```sql -- Create a table with two columns CREATE TABLE IF NOT EXISTS spark_catalog.db.time_travel_example ( - order_number int, + order_number int, product_code string - ) - USING iceberg + ) + USING iceberg OPTIONS ('format-version'='2') -- Insert data into the table - INSERT INTO spark_catalog.db.time_travel_example VALUES + INSERT INTO spark_catalog.db.time_travel_example VALUES (1, 'Mars') ts1 = now() // A piece of pseudo code -- Alter table to add a new column ALTER TABLE spark_catalog.db.time_travel_example ADD COLUMN (price double) - + ts2 = now() -- Insert data into the table @@ -192,10 +192,10 @@ A time travel query at a current moment might show a different schema than the c ```sql -- Create a table CREATE TABLE IF NOT EXISTS spark_catalog.db.time_travel_example_2 ( - order_number int, + order_number int, product_code string - ) - USING iceberg + ) + USING iceberg OPTIONS ('format-version'='2') -- Insert initial data into the table @@ -234,10 +234,10 @@ The second one is that while doing time travel you can't get state of table befo ```sql -- Create a table CREATE TABLE IF NOT EXISTS spark_catalog.db.time_travel_example_3 ( - order_number int, + order_number int, product_code string - ) - USING iceberg + ) + USING iceberg OPTIONS ('format-version'='2'); ts = 
now(); @@ -275,9 +275,9 @@ After identifying candidate files using the above rules, the system determines w * The file with the highest version number is selected * (Version appears as `V` in filenames formatted as `V.metadata.json` or `V-uuid.metadata.json`) -**Note**: All mentioned settings are engine-level settings and must be specified during table creation as shown below: +**Note**: All mentioned settings (unless explicitly specified otherwise) are engine-level settings and must be specified during table creation as shown below: -```sql +```sql CREATE TABLE example_table ENGINE = Iceberg( 's3://bucket/path/to/iceberg_table' ) SETTINGS iceberg_metadata_table_uuid = '6f6f6407-c6a5-465f-a808-ea8900e35a38'; @@ -293,6 +293,34 @@ CREATE TABLE example_table ENGINE = Iceberg( `Iceberg` table engine and table function support metadata cache storing the information of manifest files, manifest list and metadata json. The cache is stored in memory. This feature is controlled by setting `use_iceberg_metadata_files_cache`, which is enabled by default. +## Asynchronous metadata prefetching {#async-metadata-prefetch} + +Asynchronous metadata prefetching can be enabled at `Iceberg` table creation by setting `iceberg_metadata_async_prefetch_period_ms`. If set to 0 (default) or if metadata caching is not enabled, the asynchronous prefetching is disabled. +In order to enable this feature, a non-zero value of milliseconds should be given. It represents the interval between prefetching cycles. + +If enabled, the server will run a recurring background operation to list the remote catalog and to detect a new metadata version. It will then parse it and recursively walk the snapshot, fetching active manifest list files and manifest files. +The files already available in the metadata cache won't be downloaded again. At the end of each prefetching cycle, the latest metadata snapshot is available in the metadata cache. 
+ +```sql +CREATE TABLE example_table ENGINE = Iceberg( + 's3://bucket/path/to/iceberg_table' +) SETTINGS + iceberg_metadata_async_prefetch_period_ms = 60000; +``` + +In order to make the most of asynchronous metadata prefetching at read operations, the `iceberg_metadata_staleness_ms` parameter should be specified as a Query or Session parameter. By default (0 - not specified) in the context of each query, the server will fetch the latest metadata from the remote catalog. +By specifying a tolerance to metadata staleness, the server is allowed to use the cached version of the metadata snapshot without calling the remote catalog. If there's a metadata version in the cache, and it has been downloaded within the given window of staleness, it will be used to process the query. +Otherwise the latest version will be fetched from the remote catalog. + +```sql +SELECT count() FROM icebench_table WHERE ... +SETTINGS iceberg_metadata_staleness_ms=120000 +``` + +**Note**: Asynchronous metadata prefetching runs at `ICEBERG_SCHEDULE_POOL`, which is a server-side threadpool for background operations on active `Iceberg` tables. The size of this threadpool is controlled by the `iceberg_background_schedule_pool_size` server configuration parameter (default is 10). + +**Note**: The current expectation is that the metadata cache size is sufficient to hold the latest metadata snapshot in full for all active tables, if asynchronous prefetching is enabled. 
+ ## Altinity Antalya branch ### Specify storage type in arguments diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index d32293c0de6f..d8a3c4df0699 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -208,6 +208,8 @@ M(IcebergCatalogThreads, "Number of threads in the IcebergCatalog thread pool.") \ M(IcebergCatalogThreadsActive, "Number of threads in the IcebergCatalog thread pool running a task.") \ M(IcebergCatalogThreadsScheduled, "Number of queued or active jobs in the IcebergCatalog thread pool.") \ + M(IcebergSchedulePoolTask, "Number of tasks in the background schedule pool for Iceberg tables.") \ + M(IcebergSchedulePoolSize, "Limit on number of tasks in the background schedule pool for Iceberg tables.") \ M(ParallelWithQueryThreads, "Number of threads in the threadpool for processing PARALLEL WITH queries.") \ M(ParallelWithQueryActiveThreads, "Number of active threads in the threadpool for processing PARALLEL WITH queries.") \ M(ParallelWithQueryScheduledThreads, "Number of queued or active jobs in the threadpool for processing PARALLEL WITH queries.") \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 8aba8a63b392..1c9f2d9b9ac0 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -100,6 +100,7 @@ M(PrimaryIndexCacheMisses, "Number of times an entry has not been found in the primary index cache, so we had to load a index file in memory, which is a costly operation, adding to query latency.", ValueType::Number) \ M(IcebergMetadataFilesCacheHits, "Number of times iceberg metadata files have been found in the cache.", ValueType::Number) \ M(IcebergMetadataFilesCacheMisses, "Number of times iceberg metadata files have not been found in the iceberg metadata cache and had to be read from (remote) disk.", ValueType::Number) \ + M(IcebergMetadataFilesCacheStaleMisses, "Number of times iceberg metadata files have been found in the cache, but were 
considered stale and had to be read from (remote) disk.", ValueType::Number) \ M(IcebergMetadataFilesCacheWeightLost, "Approximate number of bytes evicted from the iceberg metadata cache.", ValueType::Number) \ M(IcebergMetadataReadWaitTimeMicroseconds, "Total time data readers spend waiting for iceberg metadata files to be read and parsed, summed across all reader threads.", ValueType::Microseconds) \ M(IcebergIteratorInitializationMicroseconds, "Total time spent on synchronous initialization of iceberg data iterators.", ValueType::Microseconds) \ diff --git a/src/Common/setThreadName.h b/src/Common/setThreadName.h index 9bfa3e407737..381d6a456545 100644 --- a/src/Common/setThreadName.h +++ b/src/Common/setThreadName.h @@ -66,6 +66,7 @@ namespace DB M(HASHED_DICT_LOAD, "HashedDictLoad") \ M(HTTP_HANDLER, "HTTPHandler") \ M(ICEBERG_ITERATOR, "IcebergIter") \ + M(ICEBERG_SCHEDULE_POOL, "IcebergSchPool") \ M(INTERSERVER_HANDLER, "IntersrvHandler") \ M(IO_URING_MONITOR, "IoUringMonitr") \ M(KEEPER_HANDLER, "KeeperHandler") \ diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index bfab31f62819..fa2dc98c303e 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -1138,6 +1138,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_ DECLARE(UInt64, threadpool_writer_queue_size, 10000, R"(Number of tasks which is possible to push into background pool for write requests to object storages)", 0) \ DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, R"(Size of background pool for iceberg catalog)", 0) \ DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 10000, R"(Number of tasks which is possible to push into iceberg catalog pool)", 0) \ + DECLARE(UInt64, iceberg_background_schedule_pool_size, 10, "Size of thread pool to asynchronously fetch the latest metadata from a remote iceberg catalog; the pool is shared by all the active tables.", 0) \ DECLARE(UInt64, drop_distributed_cache_pool_size, 
8, R"(The size of the threadpool used for dropping distributed cache.)", 0) \ DECLARE(UInt64, drop_distributed_cache_queue_size, 1000, R"(The queue size of the threadpool used for dropping distributed cache.)", 0) \ DECLARE(Bool, distributed_cache_apply_throttling_settings_from_client, true, R"(Whether cache server should apply throttling settings received from client.)", 0) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 6e9d3ad9b276..8882874422e6 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5164,7 +5164,9 @@ Possible values: - 0 - Disabled - 1 - Enabled )", 0) \ - \ + DECLARE(UInt64, iceberg_metadata_staleness_ms, 0, R"( +If non-zero, skip fetching iceberg metadata from remote catalog if there is a cached metadata snapshot more recent than the given staleness window. Zero means to always fetch the latest metadata version from the remote catalog. Setting this to a non-zero value trades staleness for a lower latency of read operations.
+)", 0) \ DECLARE(Bool, use_query_cache, false, R"( If turned on, `SELECT` queries may utilize the [query cache](../query-cache.md). Parameters [enable_reads_from_query_cache](#enable_reads_from_query_cache) and [enable_writes_to_query_cache](#enable_writes_to_query_cache) control in more detail how the cache is used. 
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 1088a8163b0b..7c212510482c 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -45,6 +45,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() // {"object_storage_max_nodes", 0, 0, "Antalya: New setting"}, {"s3_propagate_credentials_to_other_storages", false, false, "New setting"}, {"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"}, + {"iceberg_metadata_staleness_ms", 0, 0, "New setting allowing using cached metadata version at READ operations to prevent fetching from remote catalog"}, }); addSettingsChanges(settings_changes_history, "26.1", { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index bc75c583fc30..876180dad66d 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -213,6 +213,8 @@ namespace CurrentMetrics extern const Metric BackgroundFetchesPoolSize; extern const Metric BackgroundCommonPoolTask; extern const Metric BackgroundCommonPoolSize; + extern const Metric IcebergSchedulePoolTask; + extern const Metric IcebergSchedulePoolSize; extern const Metric MarksLoaderThreads; extern const Metric MarksLoaderThreadsActive; extern const Metric MarksLoaderThreadsScheduled; @@ -342,6 +344,7 @@ namespace ServerSetting extern const ServerSettingsFloat background_merges_mutations_concurrency_ratio; extern const ServerSettingsString background_merges_mutations_scheduling_policy; extern const ServerSettingsUInt64 background_message_broker_schedule_pool_size; + extern const ServerSettingsUInt64 iceberg_background_schedule_pool_size; extern const ServerSettingsUInt64 background_move_pool_size; extern const ServerSettingsUInt64 background_pool_size; extern const ServerSettingsUInt64 background_schedule_pool_size; @@ -557,6 +560,8 @@ struct ContextSharedPart : boost::noncopyable mutable BackgroundSchedulePoolPtr 
distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends) OnceFlag message_broker_schedule_pool_initialized; mutable BackgroundSchedulePoolPtr message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka) + OnceFlag iceberg_schedule_pool_initialized; + mutable BackgroundSchedulePoolPtr iceberg_schedule_pool; /// A thread pool that runs background metadata refresh for all active Iceberg tables mutable OnceFlag readers_initialized; mutable std::unique_ptr asynchronous_remote_fs_reader; @@ -909,6 +914,7 @@ struct ContextSharedPart : boost::noncopyable BackgroundSchedulePoolPtr delete_schedule_pool; BackgroundSchedulePoolPtr delete_distributed_schedule_pool; BackgroundSchedulePoolPtr delete_message_broker_schedule_pool; + BackgroundSchedulePoolPtr delete_iceberg_schedule_pool; std::unique_ptr delete_access_control; @@ -987,6 +993,7 @@ struct ContextSharedPart : boost::noncopyable delete_schedule_pool = std::move(schedule_pool); delete_distributed_schedule_pool = std::move(distributed_schedule_pool); delete_message_broker_schedule_pool = std::move(message_broker_schedule_pool); + delete_iceberg_schedule_pool = std::move(iceberg_schedule_pool); delete_access_control = std::move(access_control); @@ -1035,6 +1042,7 @@ struct ContextSharedPart : boost::noncopyable join_background_pool(std::move(delete_schedule_pool)); join_background_pool(std::move(delete_distributed_schedule_pool)); join_background_pool(std::move(delete_message_broker_schedule_pool)); + join_background_pool(std::move(delete_iceberg_schedule_pool)); delete_access_control.reset(); @@ -4374,6 +4382,20 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const return *shared->message_broker_schedule_pool; } +BackgroundSchedulePool & Context::getIcebergSchedulePool() const +{ + callOnce(shared->iceberg_schedule_pool_initialized, [&] { + 
shared->iceberg_schedule_pool = BackgroundSchedulePool::create( + shared->server_settings[ServerSetting::iceberg_background_schedule_pool_size], + /*max_parallel_tasks_per_type*/ 0, + CurrentMetrics::IcebergSchedulePoolTask, + CurrentMetrics::IcebergSchedulePoolSize, + DB::ThreadName::ICEBERG_SCHEDULE_POOL); + }); + + return *shared->iceberg_schedule_pool; +} + void Context::configureServerWideThrottling() { if (shared->application_type == ApplicationType::LOCAL || shared->application_type == ApplicationType::SERVER || shared->application_type == ApplicationType::DISKS) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 00b4d8b9fb85..a67d236151c0 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1415,6 +1415,7 @@ class Context: public ContextData, public std::enable_shared_from_this BackgroundSchedulePool & getSchedulePool() const; BackgroundSchedulePool & getMessageBrokerSchedulePool() const; BackgroundSchedulePool & getDistributedSchedulePool() const; + BackgroundSchedulePool & getIcebergSchedulePool() const; /// Has distributed_ddl configuration or not. 
bool hasDistributedDDL() const; diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 0da0b12d7a1a..b53ff9d8dcf8 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -428,10 +428,10 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl } private: - DataLakeMetadataPtr current_metadata; - LoggerPtr log = getLogger("DataLakeConfiguration"); const DataLakeStorageSettingsPtr settings; ObjectStoragePtr ready_object_storage; + DataLakeMetadataPtr current_metadata; + LoggerPtr log = getLogger("DataLakeConfiguration"); void assertInitializedDL() const { diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h index fb305b0eb3f9..59ece70828ae 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h @@ -54,6 +54,9 @@ Explicit table UUID to read metadata for. Ignored if iceberg_metadata_file_path )", 0) \ DECLARE(Bool, iceberg_recent_metadata_file_by_last_updated_ms_field, false, R"( If enabled, the engine would use the metadata file with the most recent last_updated_ms json field. Does not make sense to use with iceberg_metadata_file_path. +)", 0) \ + DECLARE(UInt32, iceberg_metadata_async_prefetch_period_ms, 0, R"( +The period in milliseconds to asynchronously prefetch the latest metadata snapshot from a remote iceberg catalog. Default is 0 - disabled. )", 0) \ DECLARE(Bool, iceberg_use_version_hint, false, R"( Get latest metadata path from version-hint.text file. 
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 58f2926cc07e..b78739deb826 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -1,3 +1,5 @@ +#include +#include #include "config.h" #if USE_AVRO @@ -31,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -87,6 +90,7 @@ namespace DataLakeStorageSetting { extern const DataLakeStorageSettingsString iceberg_metadata_file_path; extern const DataLakeStorageSettingsString iceberg_metadata_table_uuid; +extern const DataLakeStorageSettingsUInt32 iceberg_metadata_async_prefetch_period_ms; extern const DataLakeStorageSettingsBool iceberg_recent_metadata_file_by_last_updated_ms_field; extern const DataLakeStorageSettingsBool iceberg_use_version_hint; extern const DataLakeStorageSettingsNonZeroUInt64 iceberg_format_version; @@ -148,7 +152,7 @@ Iceberg::PersistentTableComponents IcebergMetadata::initializePersistentTableCom StorageObjectStorageConfigurationPtr configuration, IcebergMetadataFilesCachePtr cache_ptr, ContextPtr context_) { const auto [metadata_version, metadata_file_path, compression_method] - = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration->getPathForRead().path, configuration->getDataLakeSettings(), cache_ptr, context_, log.get(), std::nullopt); + = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration->getPathForRead().path, configuration->getDataLakeSettings(), cache_ptr, context_, log.get(), std::nullopt, true); LOG_DEBUG(log, "Latest metadata file path is {}, version {}", metadata_file_path, metadata_version); auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, cache_ptr, context_, log, compression_method, std::nullopt); @@ -182,7 +186,7 @@ Iceberg::PersistentTableComponents 
IcebergMetadata::initializePersistentTableCom }; } -std::pair IcebergMetadata::getRelevantState(const ContextPtr & context) const +std::pair IcebergMetadata::getRelevantState(const ContextPtr & context, bool force_fetch_latest_metadata) const { const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion( object_storage, @@ -191,7 +195,8 @@ std::pair IcebergMetadata::getReleva persistent_components.metadata_cache, context, log.get(), - persistent_components.table_uuid); + persistent_components.table_uuid, + force_fetch_latest_metadata); return getState(context, metadata_file_path, metadata_version); } @@ -207,6 +212,80 @@ IcebergMetadata::IcebergMetadata( , data_lake_settings(configuration_->getDataLakeSettings()) , write_format(configuration_->getFormat()) { + /// TODO: for now it's okay to start/stop the task via constructor/destructor. Once refactored, we'd need to plumb startup/shutdown and schedule the task from there + if (cache_ptr && data_lake_settings[DataLakeStorageSetting::iceberg_metadata_async_prefetch_period_ms] != 0) + { + background_metadata_prefetch_task = context_->getIcebergSchedulePool().createTask( + StorageID("", persistent_components.table_uuid ? 
*persistent_components.table_uuid : persistent_components.table_path), + "backgroundMetadataPrefetcherThread", + [this] + { + this->backgroundMetadataPrefetcherThread(); + } + ); + background_metadata_prefetch_task->activateAndSchedule(); + } +} + +IcebergMetadata::~IcebergMetadata() +{ + if (background_metadata_prefetch_task) + background_metadata_prefetch_task->deactivate(); +} + +void IcebergMetadata::backgroundMetadataPrefetcherThread() +{ + size_t interval = data_lake_settings[DataLakeStorageSetting::iceberg_metadata_async_prefetch_period_ms]; + SCOPE_EXIT({ + background_metadata_prefetch_task->scheduleAfter(interval); + }); + + try + { + if (!Context::getGlobalContextInstance()) + { + /// Should never happen, but if seen, this is clear indicator that the task should be started/stopped via startup/shutdown mechanism (check TODOs above) + LOG_DEBUG(log, "backgroundMetadataPrefetcherThread: no global context - skipping"); + return; + } + + Stopwatch watch; + + /// TODO: also we'd want to run all these download operations as separate scheduled tasks - to parallelize it and + /// to prevent running a heavy multi-step operation as: IO > deserialization > parsing > IO > deserialization > parsing > ... 
+ /// We'll be able to achieve that after getting asyncIterator refactoring + + /// first, we fetch the latest metadata version and cache it; + /// as a part of the same method, we download metadata.json of the latest metadata version + /// and after parsing it, we fetch manifest lists, parse and cache them + auto ctx = Context::createCopy(Context::getGlobalContextInstance()); + auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(ctx, true); + if (actual_data_snapshot) + { + for (const auto & entry : actual_data_snapshot->manifest_list_entries) + { + /// second, we fetch, parse and cache each manifest file + auto manifest_file_ptr = Iceberg::getManifestFile( + object_storage, persistent_components, ctx, log, + entry.manifest_file_absolute_path, + entry.added_sequence_number, + entry.added_snapshot_id, + *secondary_storages); + } + } + + LOG_TRACE(log, "backgroundMetadataPrefetcherThread: interval={} prefetch_time_ms={} table_path={}/{} latest_metadata={}/{}", + interval, + watch.elapsedMilliseconds(), + persistent_components.table_path, + persistent_components.table_uuid ? *(persistent_components.table_uuid) : "no_uuid", + actual_table_state_snapshot.metadata_version, + actual_table_state_snapshot.metadata_file_path); + } + catch (...) + { + DB::tryLogCurrentException(log); + } } Int32 IcebergMetadata::parseTableSchema( diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index dcb2b91131bd..169e93a11d4a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -48,6 +48,8 @@ class IcebergMetadata : public IDataLakeMetadata const ContextPtr & context_, IcebergMetadataFilesCachePtr cache_ptr); + ~IcebergMetadata() override; + /// Get table schema parsed from metadata. 
NamesAndTypesList getTableSchema(ContextPtr local_context) const override; @@ -158,7 +160,7 @@ class IcebergMetadata : public IDataLakeMetadata getState(const ContextPtr & local_context, const String & metadata_path, Int32 metadata_version) const; Iceberg::IcebergDataSnapshotPtr getRelevantDataSnapshotFromTableStateSnapshot(Iceberg::TableStateSnapshot table_state_snapshot, ContextPtr local_context) const; - std::pair getRelevantState(const ContextPtr & context) const; + std::pair getRelevantState(const ContextPtr & context, bool force_fetch_latest_metadata = false) const; std::optional getPartitionKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const; KeyDescription getSortingKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const; @@ -169,6 +171,10 @@ class IcebergMetadata : public IDataLakeMetadata DB::Iceberg::PersistentTableComponents persistent_components; const DataLakeStorageSettings & data_lake_settings; const String write_format; + BackgroundSchedulePoolTaskHolder background_metadata_prefetch_task; + + + void backgroundMetadataPrefetcherThread(); }; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h index 2eb02960a044..403d26502f42 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h @@ -1,4 +1,8 @@ #pragma once +#include +#include +#include +#include #include "config.h" #if USE_AVRO @@ -14,6 +18,7 @@ namespace ProfileEvents { extern const Event IcebergMetadataFilesCacheMisses; + extern const Event IcebergMetadataFilesCacheStaleMisses; extern const Event IcebergMetadataFilesCacheHits; extern const Event IcebergMetadataFilesCacheWeightLost; } @@ -27,6 +32,25 @@ namespace CurrentMetrics namespace DB { +namespace Iceberg +{ +struct MetadataFileWithInfo +{ + Int32 
version; + String path; + CompressionMethod compression_method; +}; +} + +struct LatestMetadataVersion +{ + /// time when it's been received from the remote catalog and cached + std::chrono::time_point cached_at; + /// the actual metadata version reference + Iceberg::MetadataFileWithInfo latest_metadata; +}; +using LatestMetadataVersionPtr = std::shared_ptr; + /// The structure that can identify a manifest file. We store it in cache. /// And we can get `ManifestFileContent` from cache by ManifestFileEntry. struct ManifestFileCacheKey @@ -43,11 +67,12 @@ using ManifestFileCacheKeys = std::vector; /// For simplicity, we keep them in the same cache. struct IcebergMetadataFilesCacheCell : private boost::noncopyable { - /// The cached element could be - /// - metadata.json content - /// - manifest list consists of cache keys which will retrieve the manifest file from cache - /// - manifest file - std::variant cached_element; + /// The cached entities can be: + /// - reference to the latest metadata version (metadata.json path) [table_path --> LatestMetadataVersionPtr] + /// - metadata.json content [file_path --> String] + /// - manifest list consists of cache keys which will retrieve the manifest file from cache [file_path --> ManifestFileCacheKeys] + /// - manifest file [file_path --> Iceberg::ManifestFileCacheableInfo] + std::variant cached_element; Int64 memory_bytes; explicit IcebergMetadataFilesCacheCell(String && metadata_json_str) @@ -55,6 +80,11 @@ struct IcebergMetadataFilesCacheCell : private boost::noncopyable , memory_bytes(std::get(cached_element).capacity() + SIZE_IN_MEMORY_OVERHEAD) { } + explicit IcebergMetadataFilesCacheCell(LatestMetadataVersionPtr latest_metadata_version) + : cached_element(latest_metadata_version) + , memory_bytes(getMemorySizeOfMetadataVersion(latest_metadata_version) + SIZE_IN_MEMORY_OVERHEAD) + { + } explicit IcebergMetadataFilesCacheCell(ManifestFileCacheKeys && manifest_file_cache_keys_) : 
cached_element(std::move(manifest_file_cache_keys_)) , memory_bytes(getMemorySizeOfManifestCacheKeys(std::get(cached_element)) + SIZE_IN_MEMORY_OVERHEAD) @@ -68,6 +98,12 @@ struct IcebergMetadataFilesCacheCell : private boost::noncopyable private: static constexpr size_t SIZE_IN_MEMORY_OVERHEAD = 200; /// we always underestimate the size of an object; + static size_t getMemorySizeOfMetadataVersion(const LatestMetadataVersionPtr & metadata_version) + { + chassert(metadata_version); + return sizeof(LatestMetadataVersion) + metadata_version->latest_metadata.path.size(); + } + static size_t getMemorySizeOfManifestCacheKeys(const ManifestFileCacheKeys & manifest_file_cache_keys) { size_t total_size = 0; @@ -147,6 +183,63 @@ class IcebergMetadataFilesCache : public CacheBase(result.first->cached_element); } + template + LatestMetadataVersionPtr getOrSetLatestMetadataVersion(const String & table_path, const std::optional & table_uuid, LoadFunc && load_fn, time_t tolerated_staleness_ms) + { + /// This caching method for latest metadata version: + /// 1. Takes two keys to reference a table - path and [optional] uuid + /// 2. Probes the cache only if stale values are tolerated - sometimes we just have to force the latest value from the remote catalog and to cache it + /// 2.1. Lookup in cache is performed using table uuid if it's provided, or using table path + /// 3. 
If the value needs to be forced via the remote catalog, we first load it, then we place it in cache under both keys + + /// NOTE: There're couple of caveats and nuances regarding current implementation + /// (a) can't use getOrSet (as should've) at the moment - because load_fn accesses the same cache to extract uuid from the metadata.json file + /// (b) also can't use getOrSet - because there's no mechanism to disregard the existing value because of its staleness + /// (c) single value is referenced using two keys + /// For now the potential impact of these nuances is considered low for the potential gain - in the worst case, we'll call remote catalog more than once, which is still better than now + /// TODO: + /// (a) and (c) will be solved by moving to AsyncIterator for Iceberg - the loading process should be decoupled, refactored and broken down into independent pieces; + /// and later by refactoring caching to create a new logical layer of snapshot caching + /// (b) #97410 will address several design flaws of CacheBase + will introduce custom predicates for get/set operations + + /// tolerated_staleness_ms=0 would mean that a non-cached value is required + if (tolerated_staleness_ms > 0) + { + const String & data_path = table_uuid ? 
*table_uuid : table_path; + auto found = Base::get(data_path); + + if (found) + { + LatestMetadataVersionPtr cell = std::get(found->cached_element); + if (std::chrono::duration_cast(std::chrono::system_clock::now() - cell->cached_at).count() <= tolerated_staleness_ms) + { + /// the cached element is found and it's not stale accurding to our expectation + ProfileEvents::increment(ProfileEvents::IcebergMetadataFilesCacheHits); + return cell; + } + else + { + ProfileEvents::increment(ProfileEvents::IcebergMetadataFilesCacheStaleMisses); + } + } + else + { + ProfileEvents::increment(ProfileEvents::IcebergMetadataFilesCacheMisses); + } + } + + LatestMetadataVersionPtr cell = std::make_shared(); + cell->latest_metadata = load_fn(); + cell->cached_at = std::chrono::system_clock::now(); + + Base::set(table_path, std::make_shared(cell)); + if (table_uuid) + Base::set(*table_uuid, std::make_shared(cell)); + + return cell; + } + + private: /// Called for each individual entry being evicted from cache void onEntryRemoval(const size_t weight_loss, const MappedPtr & mapped_ptr) override diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 3809dfcd38c2..fe30deb08ba7 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -643,7 +643,8 @@ IcebergStorageSink::IcebergStorageSink( persistent_table_components.metadata_cache, context_, log.get(), - persistent_table_components.table_uuid); + persistent_table_components.table_uuid, + true); metadata = getMetadataJSONObject( metadata_path, @@ -828,6 +829,7 @@ void IcebergStorageSink::finalizeBuffers() if (writer_per_partition_key.empty()) return; + /// TODO: there's a chance that initializeMetadata() doesn't succeed within MAX_TRANSACTION_RETRIES without throwing, perhaps we should fail in this case size_t i = 0; while (i < MAX_TRANSACTION_RETRIES) { @@ 
-894,7 +896,8 @@ bool IcebergStorageSink::initializeMetadata() persistent_table_components.metadata_cache, context, getLogger("IcebergWrites").get(), - persistent_table_components.table_uuid); + persistent_table_components.table_uuid, + true); LOG_DEBUG(log, "Rereading metadata file {} with version {}", metadata_path, last_version); @@ -1035,6 +1038,16 @@ bool IcebergStorageSink::initializeMetadata() } } } + + if (persistent_table_components.metadata_cache) + { + /// If there's an active metadata cache + /// We can't just cache 'our' written version as latest, because it could've been overwritten by a concurrent catalog update + /// This is why, we are safely invalidating the cache, and the very next reader will get the most up-to-date latest version + persistent_table_components.metadata_cache->remove(persistent_table_components.table_path); + if (persistent_table_components.table_uuid) + persistent_table_components.metadata_cache->remove(*persistent_table_components.table_uuid); + } } catch (...) 
{ diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index db170fb7fca7..050d4f70b7c1 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -87,6 +87,7 @@ namespace ProfileEvents namespace DB::Setting { + extern const SettingsUInt64 iceberg_metadata_staleness_ms; extern const SettingsUInt64 output_format_compression_level; extern const SettingsTimezone iceberg_partition_timezone; } @@ -913,7 +914,6 @@ std::pair createEmptyMetadataFile( return {new_metadata_file_content, removeEscapedSlashes(oss.str())}; } - /** * Each version of table metadata is stored in a `metadata` directory and * has one of 2 formats: @@ -930,99 +930,120 @@ static MetadataFileWithInfo getLatestMetadataFileAndVersion( IcebergMetadataFilesCachePtr metadata_cache, const ContextPtr & local_context, std::optional table_uuid, - bool use_table_uuid_for_metadata_file_selection) + bool use_table_uuid_for_metadata_file_selection, + bool force_fetch_latest_metadata) { - auto log = getLogger("IcebergMetadataFileResolver"); - MostRecentMetadataFileSelectionWay selection_way - = data_lake_settings[DataLakeStorageSetting::iceberg_recent_metadata_file_by_last_updated_ms_field].value - ? 
MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD - : MostRecentMetadataFileSelectionWay::BY_METADATA_FILE_VERSION; - bool need_all_metadata_files_parsing = (selection_way == MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD) - || (table_uuid.has_value() && use_table_uuid_for_metadata_file_selection); - const auto metadata_files = listFiles(*object_storage, table_path, "metadata", ".metadata.json"); - if (metadata_files.empty()) - { - throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", table_path); - } - std::vector metadata_files_with_versions; - metadata_files_with_versions.reserve(metadata_files.size()); - for (const auto & path : metadata_files) + auto load_fn = [&]() { - String filename = std::filesystem::path(path).filename(); - if (isTemporaryMetadataFile(filename)) - continue; - auto [version, metadata_file_path, compression_method] = getMetadataFileAndVersion(path); - - if (need_all_metadata_files_parsing) + auto log = getLogger("IcebergMetadataFileResolver"); + MostRecentMetadataFileSelectionWay selection_way + = data_lake_settings[DataLakeStorageSetting::iceberg_recent_metadata_file_by_last_updated_ms_field].value + ? 
MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD + : MostRecentMetadataFileSelectionWay::BY_METADATA_FILE_VERSION; + bool need_all_metadata_files_parsing = (selection_way == MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD) + || (table_uuid.has_value() && use_table_uuid_for_metadata_file_selection); + const auto metadata_files = listFiles(*object_storage, table_path, "metadata", ".metadata.json"); + if (metadata_files.empty()) { - auto metadata_file_object = getMetadataJSONObject( - metadata_file_path, object_storage, metadata_cache, local_context, log, compression_method, table_uuid); - if (table_uuid.has_value() && use_table_uuid_for_metadata_file_selection) + throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", table_path); + } + std::vector metadata_files_with_versions; + metadata_files_with_versions.reserve(metadata_files.size()); + for (const auto & path : metadata_files) + { + String filename = std::filesystem::path(path).filename(); + if (isTemporaryMetadataFile(filename)) + continue; + auto [version, metadata_file_path, compression_method] = getMetadataFileAndVersion(path); + + if (need_all_metadata_files_parsing) { - if (metadata_file_object->has(Iceberg::f_table_uuid)) + auto metadata_file_object = getMetadataJSONObject( + metadata_file_path, object_storage, metadata_cache, local_context, log, compression_method, table_uuid); + if (table_uuid.has_value() && use_table_uuid_for_metadata_file_selection) { - auto current_table_uuid = metadata_file_object->getValue(Iceberg::f_table_uuid); - if (normalizeUuid(table_uuid.value()) == normalizeUuid(current_table_uuid)) + if (metadata_file_object->has(Iceberg::f_table_uuid)) + { + auto current_table_uuid = metadata_file_object->getValue(Iceberg::f_table_uuid); + if (normalizeUuid(table_uuid.value()) == normalizeUuid(current_table_uuid)) + { + metadata_files_with_versions.emplace_back( + version, 
metadata_file_object->getValue(Iceberg::f_last_updated_ms), metadata_file_path); + } + } + else { - metadata_files_with_versions.emplace_back( - version, metadata_file_object->getValue(Iceberg::f_last_updated_ms), metadata_file_path); + Int64 format_version = metadata_file_object->getValue(Iceberg::f_format_version); + throw Exception( + format_version == 1 ? ErrorCodes::BAD_ARGUMENTS : ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, + "Table UUID is not specified in some metadata files for table by path {}", + metadata_file_path); } } else { - Int64 format_version = metadata_file_object->getValue(Iceberg::f_format_version); - throw Exception( - format_version == 1 ? ErrorCodes::BAD_ARGUMENTS : ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, - "Table UUID is not specified in some metadata files for table by path {}", - metadata_file_path); + metadata_files_with_versions.emplace_back(version, metadata_file_object->getValue(Iceberg::f_last_updated_ms), metadata_file_path); } } else { - metadata_files_with_versions.emplace_back(version, metadata_file_object->getValue(Iceberg::f_last_updated_ms), metadata_file_path); + metadata_files_with_versions.emplace_back(version, 0, metadata_file_path); } } - else - { - metadata_files_with_versions.emplace_back(version, 0, metadata_file_path); - } - } - if (metadata_files_with_versions.empty()) - { - if (table_uuid.has_value() && use_table_uuid_for_metadata_file_selection) + if (metadata_files_with_versions.empty()) { + if (table_uuid.has_value() && use_table_uuid_for_metadata_file_selection) + { + throw Exception( + ErrorCodes::FILE_DOESNT_EXIST, + "The metadata file for Iceberg table with path {} and table UUID {} doesn't exist", + table_path, + table_uuid.value()); + } throw Exception( ErrorCodes::FILE_DOESNT_EXIST, - "The metadata file for Iceberg table with path {} and table UUID {} doesn't exist", - table_path, - table_uuid.value()); + "The metadata file for Iceberg table with path {} doesn't exist", + table_path); } - throw 
Exception( - ErrorCodes::FILE_DOESNT_EXIST, - "The metadata file for Iceberg table with path {} doesn't exist", - table_path); - } - /// Get the latest version of metadata file: v.metadata.json - const ShortMetadataFileInfo & latest_metadata_file_info = [&]() - { - if (selection_way == MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD) + /// Get the latest version of metadata file: v.metadata.json + const ShortMetadataFileInfo & latest_metadata_file_info = [&]() { - return *std::max_element( - metadata_files_with_versions.begin(), - metadata_files_with_versions.end(), - [](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.last_updated_ms < b.last_updated_ms; }); - } - else - { - return *std::max_element( - metadata_files_with_versions.begin(), - metadata_files_with_versions.end(), - [](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.version < b.version; }); - } - }(); - return {latest_metadata_file_info.version, latest_metadata_file_info.path, getCompressionMethodFromMetadataFile(latest_metadata_file_info.path)}; + if (selection_way == MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD) + { + return *std::max_element( + metadata_files_with_versions.begin(), + metadata_files_with_versions.end(), + [](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.last_updated_ms < b.last_updated_ms; }); + } + else + { + return *std::max_element( + metadata_files_with_versions.begin(), + metadata_files_with_versions.end(), + [](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.version < b.version; }); + } + }(); + return MetadataFileWithInfo{latest_metadata_file_info.version, latest_metadata_file_info.path, getCompressionMethodFromMetadataFile(latest_metadata_file_info.path)}; + }; + + /// We'll query latest metadata from either cache or the actual remote catalog with a certain configured tolerance of staleness + size_t tolerated_staleness_ms = 
static_cast(local_context->getSettingsRef()[Setting::iceberg_metadata_staleness_ms]); + /// If the force is been requested (normally it's required by certain types of operations), the tolerance will be 0 and we'll go to the remote catalog + if (force_fetch_latest_metadata) + tolerated_staleness_ms = 0; + + if (metadata_cache) + { + return metadata_cache->getOrSetLatestMetadataVersion( + table_path, + table_uuid, + load_fn, + tolerated_staleness_ms + )->latest_metadata; + } + return load_fn(); } MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion( @@ -1032,7 +1053,8 @@ MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion( IcebergMetadataFilesCachePtr metadata_cache, const ContextPtr & local_context, Poco::Logger * log, - const std::optional & table_uuid) + const std::optional & table_uuid, + bool force_fetch_latest_metadata) { if (data_lake_settings[DataLakeStorageSetting::iceberg_metadata_file_path].changed) { @@ -1076,7 +1098,7 @@ MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion( explicit_table_uuid, table_path); return getLatestMetadataFileAndVersion( - object_storage, table_path, data_lake_settings, metadata_cache, local_context, normalizeUuid(explicit_table_uuid), true); + object_storage, table_path, data_lake_settings, metadata_cache, local_context, normalizeUuid(explicit_table_uuid), true, force_fetch_latest_metadata); } else if (data_lake_settings[DataLakeStorageSetting::iceberg_use_version_hint].value) { @@ -1100,7 +1122,7 @@ MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion( else { return getLatestMetadataFileAndVersion( - object_storage, table_path, data_lake_settings, metadata_cache, local_context, table_uuid, false); + object_storage, table_path, data_lake_settings, metadata_cache, local_context, table_uuid, false, force_fetch_latest_metadata); } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index 3c69fe05d523..6aa8013a4214 100644 
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -84,12 +84,6 @@ Poco::JSON::Object::Ptr getMetadataJSONObject( CompressionMethod compression_method, const std::optional & table_uuid); -struct MetadataFileWithInfo -{ - Int32 version; - String path; - CompressionMethod compression_method; -}; std::pair getIcebergType(DataTypePtr type, Int32 & iter); Poco::Dynamic::Var getAvroType(DataTypePtr type); @@ -110,7 +104,8 @@ MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion( IcebergMetadataFilesCachePtr metadata_cache, const ContextPtr & local_context, Poco::Logger * log, - const std::optional & table_uuid); + const std::optional & table_uuid, + bool force_fetch_latest_metadata = true); std::pair parseTableSchemaV1Method(const Poco::JSON::Object::Ptr & metadata_object); std::pair parseTableSchemaV2Method(const Poco::JSON::Object::Ptr & metadata_object); diff --git a/tests/integration/test_storage_iceberg_with_spark/test_async_metadata_refresh.py b/tests/integration/test_storage_iceberg_with_spark/test_async_metadata_refresh.py new file mode 100644 index 000000000000..52683eb256bd --- /dev/null +++ b/tests/integration/test_storage_iceberg_with_spark/test_async_metadata_refresh.py @@ -0,0 +1,324 @@ +import time +import pytest +from helpers.iceberg_utils import ( + create_iceberg_table, + generate_data, + get_uuid_str, + write_iceberg_from_df, + default_upload_directory, +) + +_ASYNC_CACHE_REFRESH_CONFIG_PATH = "/etc/clickhouse-server/config.d/iceberg_async_cache_refresh.xml" + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_selecting_with_stale_vs_latest_metadata(started_cluster_iceberg_with_spark, storage_type): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = ( + "test_selecting_with_stale_vs_latest_metadata" + + "_" + + storage_type + + "_" + + get_uuid_str() + ) + + 
write_iceberg_from_df( + spark, + generate_data(spark, 0, 100), + TABLE_NAME, + mode="overwrite", + ) + default_upload_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", + ) + + # disabling async refresher to validate that the latest metadata will be pulled at SELECT + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, + additional_settings = [ + f"iceberg_metadata_async_prefetch_period_ms=0" + ]) + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 + + # 1. Validating that SELECT will pull the latest metadata by default + write_iceberg_from_df( + spark, + generate_data(spark, 100, 200), + TABLE_NAME, + mode="append", + ) + default_upload_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", + ) + + # Now we will SELECT data with accepting a stale metadata - the expectation is that no call to remote catalog will occur and the cached metadata to be used + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}", settings={ + "iceberg_metadata_staleness_ms":600_000, + "log_comment":f"{TABLE_NAME}_01" + })) == 100 + + # no reads from S3 should be performed as a part of this operation + instance.query("SYSTEM FLUSH LOGS query_log") + s3_read, s3_get, s3_head, s3_list, cache_hit, cache_miss, cache_stale_miss = instance.query(f""" + SELECT + ProfileEvents['S3ReadRequestsCount'], + ProfileEvents['S3GetObject'], + ProfileEvents['S3HeadObject'], + ProfileEvents['S3ListObjects'], + ProfileEvents['IcebergMetadataFilesCacheHits'], + ProfileEvents['IcebergMetadataFilesCacheMisses'], + ProfileEvents['IcebergMetadataFilesCacheStaleMisses'] + FROM system.query_log + WHERE type = 'QueryFinish' AND log_comment = '{TABLE_NAME}_01' + """).strip().split('\t') + # nothing has been requested from the remote catalog (s3) + assert 0 == int(s3_read) + assert 0 == int(s3_get) + assert 0 == int(s3_head) + 
assert 0 == int(s3_list) + # cache hits only, no misses + assert 0 < int(cache_hit) + assert 0 == int(cache_miss) + assert 0 == int(cache_stale_miss) + + + # by default, SELECT will query remote catalog for the latest metadata + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}", settings={ + "log_comment":f"{TABLE_NAME}_02" + })) == 200 + + # some reads should occur as a part of this operation + instance.query("SYSTEM FLUSH LOGS query_log") + s3_read, s3_get, s3_head, s3_list, cache_hit, cache_miss, cache_stale_miss = instance.query(f""" + SELECT + ProfileEvents['S3ReadRequestsCount'], + ProfileEvents['S3GetObject'], + ProfileEvents['S3HeadObject'], + ProfileEvents['S3ListObjects'], + ProfileEvents['IcebergMetadataFilesCacheHits'], + ProfileEvents['IcebergMetadataFilesCacheMisses'], + ProfileEvents['IcebergMetadataFilesCacheStaleMisses'] + FROM system.query_log + WHERE type = 'QueryFinish' AND log_comment = '{TABLE_NAME}_02' + """).strip().split('\t') + assert 0 < int(s3_read) + assert 0 < int(s3_get) + assert 0 < int(s3_head) + assert 0 < int(s3_list) + assert 0 < int(cache_hit) # old manifest lists & files are found in cache + assert 0 < int(cache_miss) # new manifest lists & files are not found in local cache + assert 0 == int(cache_stale_miss) + + # 2. 
Validating that SELECT will pull the latest metadata if the cached version is stale + write_iceberg_from_df( + spark, + generate_data(spark, 200, 300), + TABLE_NAME, + mode="append", + ) + default_upload_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", + ) + + # first, we sleep to make the cached metadata to be measurably stale + time.sleep(5) + + # then, we accept really stale metadata at SELECT - no call to remote catalog + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}", settings={ + "iceberg_metadata_staleness_ms":600_000, + "log_comment":f"{TABLE_NAME}_03" + })) == 200 + + instance.query("SYSTEM FLUSH LOGS query_log") + # nothing has been queried from s3 + assert 0 == int(instance.query(f"""SELECT ProfileEvents['S3ReadRequestsCount'] + FROM system.query_log + WHERE type = 'QueryFinish' AND log_comment = '{TABLE_NAME}_03' + """).strip()) + + # lastly, we SELECT with tiny tolerance to stale metadata - latest metadata will be fetched from s3 + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}", settings={ + "iceberg_metadata_staleness_ms":4_000, + "log_comment":f"{TABLE_NAME}_04" + })) == 300 + # latest metadata was fetched from s3 + instance.query("SYSTEM FLUSH LOGS query_log") + s3_read, s3_get, s3_head, s3_list, cache_hit, cache_miss, cache_stale_miss = instance.query(f""" + SELECT + ProfileEvents['S3ReadRequestsCount'], + ProfileEvents['S3GetObject'], + ProfileEvents['S3HeadObject'], + ProfileEvents['S3ListObjects'], + ProfileEvents['IcebergMetadataFilesCacheHits'], + ProfileEvents['IcebergMetadataFilesCacheMisses'], + ProfileEvents['IcebergMetadataFilesCacheStaleMisses'] + FROM system.query_log + WHERE type = 'QueryFinish' AND log_comment = '{TABLE_NAME}_04' + """).strip().split('\t') + assert 0 < int(s3_read) + assert 0 < int(s3_get) + assert 0 < int(s3_head) + assert 0 < int(s3_list) + assert 0 < int(cache_hit) # old manifest lists & files are found in cache + assert 0 
< int(cache_miss) # new manifest lists & files are not found in local cache + assert 0 < int(cache_stale_miss) # the cached metadata has been considered stale + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_default_async_metadata_refresh(started_cluster_iceberg_with_spark, storage_type): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = ( + "test_default_async_metadata_refresh" + + "_" + + storage_type + + "_" + + get_uuid_str() + ) + + write_iceberg_from_df( + spark, + generate_data(spark, 0, 100), + TABLE_NAME, + mode="overwrite", + ) + default_upload_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", + ) + + # The expectation is that async metadata fetcher is disabled by default + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark) + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 + + write_iceberg_from_df( + spark, + generate_data(spark, 100, 200), + TABLE_NAME, + mode="append", + ) + default_upload_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", + ) + + # the fresh metadata won't get pulled at SELECT, so we see stale data + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME} SETTINGS iceberg_metadata_staleness_ms=600000")) == 100 + # sleeping a little bit + time.sleep(10) + # the metadata is not updated after sleep + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME} SETTINGS iceberg_metadata_staleness_ms=600000")) == 100 + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_async_metadata_refresh(started_cluster_iceberg_with_spark, storage_type): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = ( + "test_async_metadata_refresh" + + "_" + + 
storage_type + + "_" + + get_uuid_str() + ) + + write_iceberg_from_df( + spark, + generate_data(spark, 0, 100), + TABLE_NAME, + mode="overwrite", + ) + default_upload_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", + ) + + ASYNC_METADATA_REFRESH_PERIOD_MS=5_000 + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, + additional_settings = [ + f"iceberg_metadata_async_prefetch_period_ms={ASYNC_METADATA_REFRESH_PERIOD_MS}" + ]) + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 + + # In order to track background activity against S3, let's remember current metric + s3_reads_before = int(instance.query( + "SELECT value FROM system.events WHERE name = 'S3ReadRequestsCount' SETTINGS system_events_show_zero_values=1" + )) + + write_iceberg_from_df( + spark, + generate_data(spark, 100, 200), + TABLE_NAME, + mode="append", + ) + default_upload_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", + ) + # the fresh metadata won't get pulled at SELECT, so we see stale data + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME} SETTINGS iceberg_metadata_staleness_ms=600000")) == 100 + # Wait for the background async refresher to pick up the new metadata (2 periods of ASYNC_METADATA_REFRESH_PERIOD_MS) + time.sleep(ASYNC_METADATA_REFRESH_PERIOD_MS/1000 * 2) + + # we expect some background activity against S3 + s3_reads_after = int(instance.query( + "SELECT value FROM system.events WHERE name = 'S3ReadRequestsCount' SETTINGS system_events_show_zero_values=1" + )) + assert s3_reads_after > s3_reads_before + # we don't pull fresh metadata at SELECT, but the data is up to date because of the async refresh + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME} SETTINGS iceberg_metadata_staleness_ms=600000")) == 200 + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def 
test_insert_updates_metadata_cache(started_cluster_iceberg_with_spark, storage_type): + instance = started_cluster_iceberg_with_spark.instances["node1"] + TABLE_NAME = ( + "test_insert_updates_metadata_cache" + + "_" + + storage_type + + "_" + + get_uuid_str() + ) + + schema = "(a Int64)" + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, schema, + additional_settings = [ + f"iceberg_metadata_async_prefetch_period_ms=0" + ]) + + instance.query( + f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)", + settings={"allow_experimental_insert_into_iceberg": 1}, + ) + + assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}", settings={ + "iceberg_metadata_staleness_ms":600_000, + "log_comment":f"{TABLE_NAME}_40" + })) == 100 + + instance.query("SYSTEM FLUSH LOGS query_log") + s3_reads, iceberg_cache_stale_misses, iceberg_cache_misses = instance.query(f""" + SELECT ProfileEvents['S3ReadRequestsCount'], ProfileEvents['IcebergMetadataFilesCacheStaleMisses'], ProfileEvents['IcebergMetadataFilesCacheMisses'] + FROM system.query_log + WHERE type = 'QueryFinish' AND log_comment = '{TABLE_NAME}_40' + """).strip().split('\t') + assert int(s3_reads) > 0 + assert int(iceberg_cache_misses) > 0 + assert int(iceberg_cache_stale_misses) == 0 diff --git a/tests/integration/test_storage_iceberg_with_spark/test_writes_create_table.py b/tests/integration/test_storage_iceberg_with_spark/test_writes_create_table.py index 560f43fc87de..402be0f8f9c3 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_writes_create_table.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_writes_create_table.py @@ -20,7 +20,7 @@ def test_writes_create_table(started_cluster_iceberg_with_spark, format_version, with pytest.raises(Exception): create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, "(x String)", format_version) - create_iceberg_table(storage_type, instance, TABLE_NAME, 
started_cluster_iceberg_with_spark, "(x String)", format_version, "", True) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, "(x String)", format_version, "", True) assert '`x` String' in instance.query(f"SHOW CREATE TABLE {TABLE_NAME}") @@ -54,18 +54,18 @@ def test_writes_create_table_order_by(started_cluster_iceberg_with_spark, format TABLE_NAME = "test_writes_create_table_" + storage_type + "_" + get_uuid_str() create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, "(x Int32, y String)", order_by="(x)", format_version=format_version) - instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'abc'), (4, 'bc'), (2, 'd');", settings={"allow_experimental_insert_into_iceberg": 1}) - assert instance.query(f"SELECT x FROM {TABLE_NAME}") == '1\n2\n4\n' + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'abc'), (8, 'bc'), (4, 'd');", settings={"allow_experimental_insert_into_iceberg": 1}) + assert instance.query(f"SELECT x FROM {TABLE_NAME}") == '1\n4\n8\n' TABLE_NAME = "test_writes_create_table_" + storage_type + "_" + get_uuid_str() create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, "(x Int32, y String)", order_by="(identity(x))", format_version=format_version) - instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'abc'), (4, 'bc'), (2, 'd');", settings={"allow_experimental_insert_into_iceberg": 1}) - assert instance.query(f"SELECT x FROM {TABLE_NAME}") == '1\n2\n4\n' + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'abc'), (8, 'bc'), (4, 'd');", settings={"allow_experimental_insert_into_iceberg": 1}) + assert instance.query(f"SELECT x FROM {TABLE_NAME}") == '1\n4\n8\n' TABLE_NAME = "test_writes_create_table_" + storage_type + "_" + get_uuid_str() create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, "(x Int32, y String)", order_by="(icebergBucket(16, x))", format_version=format_version) - 
instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'abc'), (4, 'bc'), (2, 'd');", settings={"allow_experimental_insert_into_iceberg": 1}) - assert instance.query(f"SELECT x FROM {TABLE_NAME}") == '1\n2\n4\n' + instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'abc'), (8, 'bc'), (4, 'd');", settings={"allow_experimental_insert_into_iceberg": 1}) + assert instance.query(f"SELECT x FROM {TABLE_NAME}") == '1\n4\n8\n' TABLE_NAME = "test_writes_create_table_" + storage_type + "_" + get_uuid_str() create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, "(x UInt32, y String)", order_by="(icebergTruncate(16, x))", format_version=format_version) @@ -171,14 +171,14 @@ def test_order_by_bad_arguments_ast_parsing( format_version=format_version, order_by=order_by, ) - + error = instance.query_and_get_error(query) assert ( "Code: 36" in error or "BAD_ARGUMENTS" in error ), f"Expected BAD_ARGUMENTS error (Code: 36), got: {error}" assert ( - "Invalid iceberg sort order" in error - or "Unsupported function" in error + "Invalid iceberg sort order" in error + or "Unsupported function" in error or "expected a column identifier" in error.lower() or "expected (integer_literal, column_identifier)" in error.lower() or "expected 1 or 2 arguments" in error.lower() @@ -210,7 +210,7 @@ def test_order_by_bad_arguments_wrong_count( format_version=format_version, order_by=order_by, ) - + error = instance.query_and_get_error(query) assert ( "Code: 42" in error or "NUMBER_OF_ARGUMENTS_DOESNT_MATCH" in error @@ -242,7 +242,7 @@ def test_order_by_bad_arguments_wrong_type( format_version=format_version, order_by=order_by, ) - + error = instance.query_and_get_error(query) assert ( "Code: 43" in error or "ILLEGAL_TYPE_OF_ARGUMENT" in error