Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 69 additions & 2 deletions docs/en/engines/database-engines/datalake.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ The following settings are supported:
| `auth_header` | Custom HTTP header for authentication with the catalog service |
| `auth_scope` | OAuth2 scope for authentication (if using OAuth) |
| `storage_endpoint` | Endpoint URL for the underlying storage |
| `default_base_location` | Base URI for new tables when the catalog does not report `default-base-location`. New tables are placed under `<default_base_location>/<namespace>/<table>` (e.g. `s3://warehouse/data`) |
| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication |
| `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) |
| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) |
Expand All @@ -63,6 +64,72 @@ The following settings are supported:
| `dlf_access_key_secret` | Access key Secret for DLF access |
| `namespaces` | Comma-separated list of namespaces, implemented for catalog types: `rest`, `glue` and `unity` |

## Creating tables {#creating-tables}

An Iceberg table in a `DataLakeCatalog` database can be created directly from ClickHouse.
The table name must be quoted with backticks and include the namespace separated by a dot:

```sql
CREATE TABLE catalog_db.`namespace.table_name`
(
id Int64,
name String,
value Float64
)
PARTITION BY id
ORDER BY name
SETTINGS allow_database_iceberg = 1;
```

Iceberg accepts only a fixed set of partition transforms, so `PARTITION BY`
must use one of the following expressions:

| Expression | Iceberg transform |
|-------------------------------|-------------------|
| `<column>` | `identity` |
| `toYearNumSinceEpoch(<col>)` | `year` |
| `toMonthNumSinceEpoch(<col>)` | `month` |
| `toRelativeDayNum(<col>)` | `day` |
| `toRelativeHourNum(<col>)` | `hour` |
| `icebergTruncate(N, <col>)` | `truncate[N]` |
| `icebergBucket(N, <col>)` | `bucket[N]` |

Composite partitioning is supported via `PARTITION BY (expr1, expr2, ...)`.
Other expressions (e.g. `toYYYYMM`, `intDiv`) are rejected at `CREATE TABLE`.

You can also create an Iceberg table that inherits the schema of an existing table:

```sql
CREATE TABLE catalog_db.`namespace.table_name`
AS other_db.source_table
SETTINGS allow_database_iceberg = 1;
```

If the source table's `PARTITION BY` and `ORDER BY` use only the expressions
listed above, they are copied into the new Iceberg table.

## Dropping tables {#dropping-tables}

Tables can be dropped from a `DataLakeCatalog` database.
`DROP TABLE` sends a delete request to the remote catalog, which removes
the table entry from the catalog.

```sql
DROP TABLE catalog_db.`namespace.table_name`
```

By default, ClickHouse does not request the catalog to delete the underlying data. In order to do it, use the `data_lake_delete_data_on_drop` setting:

```sql
DROP TABLE catalog_db.`namespace.table_name`
SETTINGS data_lake_delete_data_on_drop = 1
```

:::note
Whether data files are actually deleted depends on the catalog itself.
The `purgeRequested` flag is sent to the catalog, but the catalog may choose to ignore it.
:::

## Examples {#examples}

See below sections for examples of using the `DataLakeCatalog` engine:
Expand All @@ -79,8 +146,8 @@ SETTINGS
warehouse = warehouse,
onelake_tenant_id = tenant_id,
oauth_server_uri = server_uri,
auth_scope = auth_scope,
onelake_client_id = client_id,
auth_scope = auth_scope,
onelake_client_id = client_id,
onelake_client_secret = client_secret;
SHOW TABLES IN databse_name;
SELECT count() from database_name.table_name;
Expand Down
2 changes: 1 addition & 1 deletion src/Client/BuzzHouse/Generator/SessionSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ std::unordered_map<String, CHSetting> serverSettings = {
{"http_skip_not_found_url_for_globs", trueOrFalseSettingNoOracle},
{"http_wait_end_of_query", trueOrFalseSettingNoOracle},
{"http_write_exception_in_output_format", trueOrFalseSettingNoOracle},
{"iceberg_delete_data_on_drop", trueOrFalseSettingNoOracle},
{"data_lake_delete_data_on_drop", trueOrFalseSettingNoOracle},
{"iceberg_metadata_compression_method",
CHSetting([](RandomGenerator & rg, FuzzConfig &) { return "'" + rg.pickRandomly(compressionMethods) + "'"; }, {}, false)},
{"iceberg_metadata_log_level",
Expand Down
6 changes: 3 additions & 3 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5278,9 +5278,9 @@ Possible values:
- manifest_file_entry - Everything above + traversed avro manifest files entries.
)", 0) \
\
DECLARE(Bool, iceberg_delete_data_on_drop, false, R"(
Whether to delete all iceberg files on drop or not.
)", 0) \
DECLARE_WITH_ALIAS(Bool, data_lake_delete_data_on_drop, false, R"(
Whether to delete the underlying data files when dropping a data lake table. For catalog databases the catalog is asked to purge the data (`purgeRequested=true`); for self-managed tables ClickHouse removes the files directly.
)", 0, iceberg_delete_data_on_drop) \
DECLARE(Int64, iceberg_expire_default_min_snapshots_to_keep, 1, R"(
Default value for Iceberg table property `history.expire.min-snapshots-to-keep` used by `expire_snapshots` when that property is absent.
)", 0) \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"export_merge_tree_partition_task_timeout_seconds", "3600", "86400", "Increase default value to make it more realistic"},
{"data_lake_delete_data_on_drop", false, false, "New setting that unifies dropping of data lake data; the released `iceberg_delete_data_on_drop` is kept as an alias for it."},
});
addSettingsChanges(settings_changes_history, "26.3",
{
Expand Down
80 changes: 80 additions & 0 deletions src/Databases/DataLake/Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>

#include <fmt/format.h>
#include <Poco/URI.h>

namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
Expand Down Expand Up @@ -121,4 +124,81 @@ std::pair<std::string, std::string> parseTableName(const std::string & name)
return {namespace_name, table_name};
}

String constructTableLocation(
const String & location_scheme,
const String & storage_endpoint,
const String & namespace_name,
const String & table_name)
{
Poco::URI uri(storage_endpoint);
auto path = uri.getPath();
while (path.starts_with('/'))
path.erase(0, 1);
while (path.ends_with('/'))
path.pop_back();

if (location_scheme == "abfss")
{
/// Azure: `abfss://<container>@<host>/<path>`. `storage_endpoint` is
/// `https://<host>/<container>/<extra>` or `abfss://<container>@<host>/<extra>`
String container = uri.getUserInfo();
String account_host = uri.getHost();
String extra_path = path;

if (container.empty())
{
auto first_slash = extra_path.find('/');
if (first_slash == String::npos)
{
container = std::move(extra_path);
extra_path.clear();
}
else
{
container = extra_path.substr(0, first_slash);
extra_path = extra_path.substr(first_slash + 1);
}
}

if (account_host.empty() || container.empty())
throw DB::Exception(
DB::ErrorCodes::BAD_ARGUMENTS,
"`storage_endpoint` ({}) for Azure must include both account host and container "
"(expected https://<account>.dfs.core.windows.net/<container>[/<sub-path>] or "
"abfss://<container>@<account>.dfs.core.windows.net[/<sub-path>])",
storage_endpoint);

if (extra_path.empty())
return fmt::format("abfss://{}@{}/{}/{}", container, account_host, namespace_name, table_name);
return fmt::format("abfss://{}@{}/{}/{}/{}", container, account_host, extra_path, namespace_name, table_name);
}

if (location_scheme == "s3")
{
if (path.empty())
throw DB::Exception(
DB::ErrorCodes::BAD_ARGUMENTS,
"`storage_endpoint` ({}) does not contain a bucket; "
"CREATE TABLE in DataLakeCatalog requires `storage_endpoint` to include a non-empty bucket path.",
storage_endpoint);
return fmt::format("s3://{}/{}/{}", path, namespace_name, table_name);
}

/// HDFS / file / other schemes that may have `authority`.
String authority = uri.getAuthority();
if (authority.empty())
{
if (path.empty())
throw DB::Exception(
DB::ErrorCodes::BAD_ARGUMENTS,
"`storage_endpoint` ({}) does not contain a path",
storage_endpoint);
return fmt::format("{}:///{}/{}/{}", location_scheme, path, namespace_name, table_name);
}

if (path.empty())
return fmt::format("{}://{}/{}/{}", location_scheme, authority, namespace_name, table_name);
return fmt::format("{}://{}/{}/{}/{}", location_scheme, authority, path, namespace_name, table_name);
}

}
6 changes: 6 additions & 0 deletions src/Databases/DataLake/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,10 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr
/// `E` is a table name.
std::pair<std::string, std::string> parseTableName(const std::string & name);

String constructTableLocation(
const String & location_scheme,
const String & storage_endpoint,
const String & namespace_name,
const String & table_name);

}
Loading
Loading