Merged
64 changes: 46 additions & 18 deletions docs/en/engines/table-engines/integrations/iceberg.md
@@ -10,7 +10,7 @@ doc_type: 'reference'

# Iceberg table engine {#iceberg-table-engine}

:::warning
We recommend using the [Iceberg Table Function](/sql-reference/table-functions/iceberg.md) for working with Iceberg data in ClickHouse. The Iceberg Table Function currently provides sufficient functionality, offering a partial read-only interface for Iceberg tables.

The Iceberg Table Engine is available but may have limitations. ClickHouse wasn't originally designed to support tables with externally changing schemas, which can affect the functionality of the Iceberg Table Engine. As a result, some features that work with regular tables may be unavailable or may not function correctly, especially when using the old analyzer.
@@ -78,7 +78,7 @@ Table engine `Iceberg` is an alias to `IcebergS3` now.
At the moment, ClickHouse can read Iceberg tables whose schema has changed over time. We currently support reading tables where columns have been added or removed and where the column order has changed. You can also change a column from requiring a value to allowing NULL. Additionally, we support the permitted type casts for simple types, namely:
* int -> long
* float -> double
* decimal(P, S) -> decimal(P', S) where P' > P.

Currently, it is not possible to change nested structures or the types of elements within arrays and maps.
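For example, a permitted promotion can be applied on the writer side (Spark SQL here; the table and column names are illustrative), and the table remains readable from ClickHouse afterwards:

```sql
-- Spark SQL: widen a column using one of the permitted promotions (int -> long)
ALTER TABLE spark_catalog.db.orders ALTER COLUMN order_number TYPE bigint;

-- ClickHouse: the table is still readable with the evolved schema
SELECT * FROM example_table;
```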

@@ -94,20 +94,20 @@ ClickHouse supports time travel for Iceberg tables, allowing you to query histor

## Processing of tables with deleted rows {#deleted-rows}

Currently, only Iceberg tables with [position deletes](https://iceberg.apache.org/spec/#position-delete-files) are supported.

The following deletion methods are **not supported**:
- [Equality deletes](https://iceberg.apache.org/spec/#equality-delete-files)
- [Deletion vectors](https://iceberg.apache.org/spec/#deletion-vectors) (introduced in v3)

### Basic usage {#basic-usage}
```sql
SELECT * FROM example_table ORDER BY 1
SETTINGS iceberg_timestamp_ms = 1714636800000
```

```sql
SELECT * FROM example_table ORDER BY 1
SETTINGS iceberg_snapshot_id = 3547395809148285433
```

@@ -132,21 +132,21 @@ Consider this sequence of operations:
```sql
-- Create a table with two columns
CREATE TABLE IF NOT EXISTS spark_catalog.db.time_travel_example (
order_number int,
product_code string
)
USING iceberg
OPTIONS ('format-version'='2')

-- Insert data into the table
INSERT INTO spark_catalog.db.time_travel_example VALUES
(1, 'Mars')

ts1 = now() // A piece of pseudo code

-- Alter table to add a new column
ALTER TABLE spark_catalog.db.time_travel_example ADD COLUMN (price double)

ts2 = now()

-- Insert data into the table
@@ -192,10 +192,10 @@ A time travel query at a current moment might show a different schema than the c
```sql
-- Create a table
CREATE TABLE IF NOT EXISTS spark_catalog.db.time_travel_example_2 (
order_number int,
product_code string
)
USING iceberg
OPTIONS ('format-version'='2')

-- Insert initial data into the table
@@ -234,10 +234,10 @@ The second one is that while doing time travel you can't get state of table befo
```sql
-- Create a table
CREATE TABLE IF NOT EXISTS spark_catalog.db.time_travel_example_3 (
order_number int,
product_code string
)
USING iceberg
OPTIONS ('format-version'='2');

ts = now();
@@ -275,9 +275,9 @@ After identifying candidate files using the above rules, the system determines w
* The file with the highest version number is selected
* (Version appears as `V` in filenames formatted as `V.metadata.json` or `V-uuid.metadata.json`)

**Note**: All mentioned settings (unless explicitly specified otherwise) are engine-level settings and must be specified during table creation as shown below:

```sql
CREATE TABLE example_table ENGINE = Iceberg(
's3://bucket/path/to/iceberg_table'
) SETTINGS iceberg_metadata_table_uuid = '6f6f6407-c6a5-465f-a808-ea8900e35a38';
@@ -293,6 +293,34 @@ CREATE TABLE example_table ENGINE = Iceberg(

The `Iceberg` table engine and table function support a metadata cache storing information from manifest files, manifest lists, and metadata JSON files. The cache is stored in memory. This feature is controlled by the setting `use_iceberg_metadata_files_cache`, which is enabled by default.
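The cache can also be turned off for an individual query (the table name below is illustrative):

```sql
SELECT count() FROM example_table
SETTINGS use_iceberg_metadata_files_cache = 0
```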

## Asynchronous metadata prefetching {#async-metadata-prefetch}

Asynchronous metadata prefetching can be enabled at `Iceberg` table creation by setting `iceberg_metadata_async_prefetch_period_ms` to a non-zero number of milliseconds, which is the interval between prefetching cycles. If the setting is 0 (the default), or if metadata caching is not enabled, asynchronous prefetching is disabled.

When enabled, the server runs a recurring background operation that lists the remote catalog to detect a new metadata version. It then parses the new version and recursively walks the snapshot, fetching the active manifest list files and manifest files. Files already present in the metadata cache are not downloaded again. At the end of each prefetching cycle, the latest metadata snapshot is available in the metadata cache.

```sql
CREATE TABLE example_table ENGINE = Iceberg(
's3://bucket/path/to/iceberg_table'
) SETTINGS
iceberg_metadata_async_prefetch_period_ms = 60000;
```

To make the most of asynchronous metadata prefetching during read operations, specify the `iceberg_metadata_staleness_ms` setting at the query or session level. By default (0), the server fetches the latest metadata from the remote catalog for each query. A non-zero staleness tolerance allows the server to use the cached metadata snapshot without calling the remote catalog: if a cached metadata version exists and was downloaded within the given staleness window, it is used to process the query; otherwise the latest version is fetched from the remote catalog.

```sql
SELECT count() FROM icebench_table WHERE ...
SETTINGS iceberg_metadata_staleness_ms=120000
```
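Whether queries are being served from the cache or falling back to (remote) reads can be observed via the corresponding profile events, for example:

```sql
SELECT event, value
FROM system.events
WHERE event LIKE 'IcebergMetadataFilesCache%'
```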

**Note**: Asynchronous metadata prefetching runs on the `ICEBERG_SCHEDULE_POOL` thread pool, a server-side thread pool for background operations on active `Iceberg` tables. The size of this thread pool is controlled by the `iceberg_background_schedule_pool_size` server configuration parameter (default: 10).

**Note**: When asynchronous prefetching is enabled, the metadata cache is expected to be large enough to hold the full latest metadata snapshot for all active tables.
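The effective pool size on a running server can be inspected, for example, with:

```sql
SELECT name, value
FROM system.server_settings
WHERE name = 'iceberg_background_schedule_pool_size'
```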

## Altinity Antalya branch

### Specify storage type in arguments
2 changes: 2 additions & 0 deletions src/Common/CurrentMetrics.cpp
@@ -208,6 +208,8 @@
M(IcebergCatalogThreads, "Number of threads in the IcebergCatalog thread pool.") \
M(IcebergCatalogThreadsActive, "Number of threads in the IcebergCatalog thread pool running a task.") \
M(IcebergCatalogThreadsScheduled, "Number of queued or active jobs in the IcebergCatalog thread pool.") \
M(IcebergSchedulePoolTask, "Number of tasks in the background schedule pool for Iceberg tables.") \
M(IcebergSchedulePoolSize, "Limit on number of tasks in the background schedule pool for Iceberg tables.") \
M(ParallelWithQueryThreads, "Number of threads in the threadpool for processing PARALLEL WITH queries.") \
M(ParallelWithQueryActiveThreads, "Number of active threads in the threadpool for processing PARALLEL WITH queries.") \
M(ParallelWithQueryScheduledThreads, "Number of queued or active jobs in the threadpool for processing PARALLEL WITH queries.") \
1 change: 1 addition & 0 deletions src/Common/ProfileEvents.cpp
@@ -100,6 +100,7 @@
M(PrimaryIndexCacheMisses, "Number of times an entry has not been found in the primary index cache, so we had to load a index file in memory, which is a costly operation, adding to query latency.", ValueType::Number) \
M(IcebergMetadataFilesCacheHits, "Number of times iceberg metadata files have been found in the cache.", ValueType::Number) \
M(IcebergMetadataFilesCacheMisses, "Number of times iceberg metadata files have not been found in the iceberg metadata cache and had to be read from (remote) disk.", ValueType::Number) \
M(IcebergMetadataFilesCacheStaleMisses, "Number of times iceberg metadata files have been found in the cache, but were considered stale and had to be read from (remote) disk.", ValueType::Number) \
M(IcebergMetadataFilesCacheWeightLost, "Approximate number of bytes evicted from the iceberg metadata cache.", ValueType::Number) \
M(IcebergMetadataReadWaitTimeMicroseconds, "Total time data readers spend waiting for iceberg metadata files to be read and parsed, summed across all reader threads.", ValueType::Microseconds) \
M(IcebergIteratorInitializationMicroseconds, "Total time spent on synchronous initialization of iceberg data iterators.", ValueType::Microseconds) \
1 change: 1 addition & 0 deletions src/Common/setThreadName.h
@@ -66,6 +66,7 @@ namespace DB
M(HASHED_DICT_LOAD, "HashedDictLoad") \
M(HTTP_HANDLER, "HTTPHandler") \
M(ICEBERG_ITERATOR, "IcebergIter") \
M(ICEBERG_SCHEDULE_POOL, "IcebergSchPool") \
M(INTERSERVER_HANDLER, "IntersrvHandler") \
M(IO_URING_MONITOR, "IoUringMonitr") \
M(KEEPER_HANDLER, "KeeperHandler") \
1 change: 1 addition & 0 deletions src/Core/ServerSettings.cpp
@@ -1138,6 +1138,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
DECLARE(UInt64, threadpool_writer_queue_size, 10000, R"(Number of tasks which is possible to push into background pool for write requests to object storages)", 0) \
DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, R"(Size of background pool for iceberg catalog)", 0) \
DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 10000, R"(Number of tasks which is possible to push into iceberg catalog pool)", 0) \
DECLARE(UInt64, iceberg_background_schedule_pool_size, 10, "Size of thread pool to asynchronously fetch the latest metadata from a remote iceberg catalog; the pool is shared by all the active tables.", 0) \
DECLARE(UInt64, drop_distributed_cache_pool_size, 8, R"(The size of the threadpool used for dropping distributed cache.)", 0) \
DECLARE(UInt64, drop_distributed_cache_queue_size, 1000, R"(The queue size of the threadpool used for dropping distributed cache.)", 0) \
DECLARE(Bool, distributed_cache_apply_throttling_settings_from_client, true, R"(Whether cache server should apply throttling settings received from client.)", 0) \
4 changes: 3 additions & 1 deletion src/Core/Settings.cpp
@@ -5164,7 +5164,9 @@ Possible values:
- 0 - Disabled
- 1 - Enabled
)", 0) \
\
DECLARE(UInt64, iceberg_metadata_staleness_ms, 0, R"(
If non-zero, skip fetching Iceberg metadata from the remote catalog when there is a cached metadata snapshot more recent than the given staleness window. Zero means always fetch the latest metadata version from the remote catalog. Setting this to a non-zero value trades staleness for lower latency of read operations.
)", 0) \
DECLARE(Bool, use_query_cache, false, R"(
If turned on, `SELECT` queries may utilize the [query cache](../query-cache.md). Parameters [enable_reads_from_query_cache](#enable_reads_from_query_cache)
and [enable_writes_to_query_cache](#enable_writes_to_query_cache) control in more detail how the cache is used.
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -45,6 +45,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
// {"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
{"s3_propagate_credentials_to_other_storages", false, false, "New setting"},
{"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"},
{"iceberg_metadata_staleness_ms", 0, 0, "New setting that allows using a cached metadata version for read operations to avoid fetching from the remote catalog"},
});
addSettingsChanges(settings_changes_history, "26.1",
{
22 changes: 22 additions & 0 deletions src/Interpreters/Context.cpp
@@ -213,6 +213,8 @@ namespace CurrentMetrics
extern const Metric BackgroundFetchesPoolSize;
extern const Metric BackgroundCommonPoolTask;
extern const Metric BackgroundCommonPoolSize;
extern const Metric IcebergSchedulePoolTask;
extern const Metric IcebergSchedulePoolSize;
extern const Metric MarksLoaderThreads;
extern const Metric MarksLoaderThreadsActive;
extern const Metric MarksLoaderThreadsScheduled;
@@ -342,6 +344,7 @@ namespace ServerSetting
extern const ServerSettingsFloat background_merges_mutations_concurrency_ratio;
extern const ServerSettingsString background_merges_mutations_scheduling_policy;
extern const ServerSettingsUInt64 background_message_broker_schedule_pool_size;
extern const ServerSettingsUInt64 iceberg_background_schedule_pool_size;
extern const ServerSettingsUInt64 background_move_pool_size;
extern const ServerSettingsUInt64 background_pool_size;
extern const ServerSettingsUInt64 background_schedule_pool_size;
@@ -557,6 +560,8 @@ struct ContextSharedPart : boost::noncopyable
mutable BackgroundSchedulePoolPtr distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
OnceFlag message_broker_schedule_pool_initialized;
mutable BackgroundSchedulePoolPtr message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
OnceFlag iceberg_schedule_pool_initialized;
mutable BackgroundSchedulePoolPtr iceberg_schedule_pool; /// A thread pool that runs background metadata refresh for all active Iceberg tables

mutable OnceFlag readers_initialized;
mutable std::unique_ptr<IAsynchronousReader> asynchronous_remote_fs_reader;
@@ -909,6 +914,7 @@ struct ContextSharedPart : boost::noncopyable
BackgroundSchedulePoolPtr delete_schedule_pool;
BackgroundSchedulePoolPtr delete_distributed_schedule_pool;
BackgroundSchedulePoolPtr delete_message_broker_schedule_pool;
BackgroundSchedulePoolPtr delete_iceberg_schedule_pool;

std::unique_ptr<AccessControl> delete_access_control;

@@ -987,6 +993,7 @@ struct ContextSharedPart : boost::noncopyable
delete_schedule_pool = std::move(schedule_pool);
delete_distributed_schedule_pool = std::move(distributed_schedule_pool);
delete_message_broker_schedule_pool = std::move(message_broker_schedule_pool);
delete_iceberg_schedule_pool = std::move(iceberg_schedule_pool);

delete_access_control = std::move(access_control);

@@ -1035,6 +1042,7 @@ struct ContextSharedPart : boost::noncopyable
join_background_pool(std::move(delete_schedule_pool));
join_background_pool(std::move(delete_distributed_schedule_pool));
join_background_pool(std::move(delete_message_broker_schedule_pool));
join_background_pool(std::move(delete_iceberg_schedule_pool));

delete_access_control.reset();

@@ -4374,6 +4382,20 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
return *shared->message_broker_schedule_pool;
}

BackgroundSchedulePool & Context::getIcebergSchedulePool() const
{
callOnce(shared->iceberg_schedule_pool_initialized, [&] {
shared->iceberg_schedule_pool = BackgroundSchedulePool::create(
shared->server_settings[ServerSetting::iceberg_background_schedule_pool_size],
/*max_parallel_tasks_per_type*/ 0,
CurrentMetrics::IcebergSchedulePoolTask,
CurrentMetrics::IcebergSchedulePoolSize,
DB::ThreadName::ICEBERG_SCHEDULE_POOL);
});

return *shared->iceberg_schedule_pool;
}

void Context::configureServerWideThrottling()
{
if (shared->application_type == ApplicationType::LOCAL || shared->application_type == ApplicationType::SERVER || shared->application_type == ApplicationType::DISKS)
1 change: 1 addition & 0 deletions src/Interpreters/Context.h
@@ -1415,6 +1415,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
BackgroundSchedulePool & getSchedulePool() const;
BackgroundSchedulePool & getMessageBrokerSchedulePool() const;
BackgroundSchedulePool & getDistributedSchedulePool() const;
BackgroundSchedulePool & getIcebergSchedulePool() const;

/// Has distributed_ddl configuration or not.
bool hasDistributedDDL() const;
4 changes: 2 additions & 2 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -428,10 +428,10 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
}

private:
DataLakeMetadataPtr current_metadata;
LoggerPtr log = getLogger("DataLakeConfiguration");
const DataLakeStorageSettingsPtr settings;
ObjectStoragePtr ready_object_storage;
DataLakeMetadataPtr current_metadata;
LoggerPtr log = getLogger("DataLakeConfiguration");

void assertInitializedDL() const
{
Expand Down
@@ -54,6 +54,9 @@ Explicit table UUID to read metadata for. Ignored if iceberg_metadata_file_path
)", 0) \
DECLARE(Bool, iceberg_recent_metadata_file_by_last_updated_ms_field, false, R"(
If enabled, the engine would use the metadata file with the most recent last_updated_ms json field. Does not make sense to use with iceberg_metadata_file_path.
)", 0) \
DECLARE(UInt32, iceberg_metadata_async_prefetch_period_ms, 0, R"(
The period in milliseconds to asynchronously prefetch the latest metadata snapshot from a remote iceberg catalog. Default is 0 - disabled.
)", 0) \
DECLARE(Bool, iceberg_use_version_hint, false, R"(
Get latest metadata path from version-hint.text file.