Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion docs/en/sql-reference/table-functions/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ y: 993

### Schema evolution {#iceberg-writes-schema-evolution}

ClickHouse allows you to add, drop, or modify columns with simple types (non-tuple, non-array, non-map).
ClickHouse allows you to add, drop, modify, or rename columns with simple types (non-tuple, non-array, non-map).

### Example {#example-iceberg-writes-evolution}

Expand Down Expand Up @@ -479,6 +479,27 @@ Row 1:
──────
x: Ivanov
y: 993

ALTER TABLE iceberg_writes_example RENAME COLUMN y TO value;
SHOW CREATE TABLE iceberg_writes_example;

┌─statement─────────────────────────────────────────────────┐
1. │ CREATE TABLE default.iceberg_writes_example ↴│
│↳( ↴│
│↳ `x` Nullable(String), ↴│
│↳ `value` Nullable(Int64) ↴│
│↳) ↴│
│↳ENGINE = IcebergLocal('/home/scanhex12/iceberg_example/') │
└───────────────────────────────────────────────────────────┘

SELECT *
FROM iceberg_writes_example
FORMAT VERTICAL;

Row 1:
──────
x: Ivanov
value: 993
```

### Compaction {#iceberg-writes-compaction}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ void IcebergMetadata::checkAlterIsPossible(const AlterCommands & commands)
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::DROP_COLUMN
&& command.type != AlterCommand::Type::MODIFY_COLUMN)
&& command.type != AlterCommand::Type::MODIFY_COLUMN && command.type != AlterCommand::Type::RENAME_COLUMN)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by Iceberg storage", command.type);
}
}
Expand Down
47 changes: 47 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,53 @@ void MetadataGenerator::generateModifyColumnMetadata(const String & column_name,
metadata_object->getArray(Iceberg::f_schemas)->add(current_schema);
}

void MetadataGenerator::generateRenameColumnMetadata(const String & column_name, const String & new_column_name)
{
auto current_schema_id = metadata_object->getValue<Int32>(Iceberg::f_current_schema_id);

Poco::JSON::Object::Ptr current_schema;
auto schemas = metadata_object->getArray(Iceberg::f_schemas);
for (UInt32 i = 0; i < schemas->size(); ++i)
{
if (schemas->getObject(i)->getValue<Int32>(Iceberg::f_schema_id) == current_schema_id)
{
current_schema = schemas->getObject(i);
break;
}
}

if (!current_schema)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found schema with id {}", current_schema_id);
current_schema = deepCopy(current_schema);

auto schema_fields = current_schema->getArray(Iceberg::f_fields);

for (UInt32 i = 0; i < schema_fields->size(); ++i)
{
if (schema_fields->getObject(i)->getValue<String>(Iceberg::f_name) == new_column_name)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Column {} already exists", new_column_name);
}

bool found = false;
for (UInt32 i = 0; i < schema_fields->size(); ++i)
{
auto current_field = schema_fields->getObject(i);
if (current_field->getValue<String>(Iceberg::f_name) == column_name)
{
current_field->set(Iceberg::f_name, new_column_name);
found = true;
break;
}
}

if (!found)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found column {}", column_name);

metadata_object->set(Iceberg::f_current_schema_id, current_schema_id + 1);
current_schema->set(Iceberg::f_schema_id, current_schema_id + 1);
metadata_object->getArray(Iceberg::f_schemas)->add(current_schema);
}

}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class MetadataGenerator
void generateAddColumnMetadata(const String & column_name, DataTypePtr type);
void generateDropColumnMetadata(const String & column_name);
void generateModifyColumnMetadata(const String & column_name, DataTypePtr type);
void generateRenameColumnMetadata(const String & column_name, const String & new_column_name);

private:
Poco::JSON::Object::Ptr metadata_object;
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,9 @@ void alter(
case AlterCommand::Type::MODIFY_COLUMN:
metadata_json_generator.generateModifyColumnMetadata(params[0].column_name, params[0].data_type);
break;
case AlterCommand::Type::RENAME_COLUMN:
metadata_json_generator.generateRenameColumnMetadata(params[0].column_name, params[0].rename_to);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown type of alter {}", params[0].type);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import pytest

from helpers.iceberg_utils import (
create_iceberg_table,
get_uuid_str,
)

# Per-query settings that set allow_experimental_insert_into_iceberg to 1.
# The tests in this file pass them to their INSERT and ALTER statements.
INSERT_SETTINGS = {"allow_experimental_insert_into_iceberg": 1}


@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize("storage_type", ["local", "s3"])
def test_rename_column_basic(started_cluster_iceberg_no_spark, format_version, storage_type):
    """After RENAME COLUMN, old rows are served under the new name and inserts keep working."""
    cluster = started_cluster_iceberg_no_spark
    node = cluster.instances["node1"]
    table_name = f"test_rename_column_basic_{storage_type}_{get_uuid_str()}"

    create_iceberg_table(
        storage_type,
        node,
        table_name,
        cluster,
        "(id Int32, value Nullable(String))",
        format_version,
    )

    def select_ordered(second_column):
        # Read `id` together with the given column, ordered by id.
        return node.query(f"SELECT id, {second_column} FROM {table_name} ORDER BY id")

    initial_rows = "1\thello\n2\tworld\n"

    node.query(
        f"INSERT INTO {table_name} VALUES (1, 'hello'), (2, 'world');",
        settings=INSERT_SETTINGS,
    )
    assert select_ordered("value") == initial_rows

    node.query(
        f"ALTER TABLE {table_name} RENAME COLUMN value TO label;",
        settings=INSERT_SETTINGS,
    )

    # Rows written before the rename must be visible through the new column name.
    assert select_ordered("label") == initial_rows

    # Writing after the rename must succeed and land in the renamed column.
    node.query(
        f"INSERT INTO {table_name} VALUES (3, 'foo');",
        settings=INSERT_SETTINGS,
    )
    assert select_ordered("label") == initial_rows + "3\tfoo\n"


@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize("storage_type", ["local", "s3"])
def test_rename_column_errors(started_cluster_iceberg_no_spark, format_version, storage_type):
    """Invalid renames (unknown source column, already-taken target name) fail and change nothing."""
    cluster = started_cluster_iceberg_no_spark
    node = cluster.instances["node1"]
    table_name = f"test_rename_column_errors_{storage_type}_{get_uuid_str()}"

    create_iceberg_table(
        storage_type,
        node,
        table_name,
        cluster,
        "(id Int32, value Nullable(String))",
        format_version,
    )

    def rename_error(old_name, new_name):
        # Run a RENAME COLUMN that is expected to fail and return the error text.
        return node.query_and_get_error(
            f"ALTER TABLE {table_name} RENAME COLUMN {old_name} TO {new_name};",
            settings=INSERT_SETTINGS,
        )

    # Source column does not exist — rejected by AlterCommands::validate (NOT_FOUND_COLUMN_IN_BLOCK).
    missing_column_error = rename_error("nonexistent", "other")
    assert "nonexistent" in missing_column_error

    # Target name is already taken by another column — rejected by AlterCommands::validate (DUPLICATE_COLUMN).
    duplicate_name_error = rename_error("value", "id")
    assert "DUPLICATE_COLUMN" in duplicate_name_error
    assert "id" in duplicate_name_error

    # Neither failed ALTER may have modified the set of columns.
    column_names = node.query(
        f"SELECT name FROM system.columns WHERE database = currentDatabase() AND table = '{table_name}' ORDER BY name"
    )
    assert column_names == "id\nvalue\n"
Loading