Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ set(ICEBERG_SOURCES
table_requirements.cc
table_scan.cc
table_update.cc
update/update_properties.cc
transform.cc
transform_function.cc
type.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ iceberg_sources = files(
'transform.cc',
'transform_function.cc',
'type.cc',
'update/update_properties.cc',
'util/bucket_util.cc',
'util/conversions.cc',
'util/decimal.cc',
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_scan.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -114,6 +115,10 @@ std::unique_ptr<Transaction> Table::NewTransaction() const {
throw NotImplemented("Table::NewTransaction is not implemented");
}

std::unique_ptr<PropertiesUpdate> Table::UpdateProperties() {
return std::make_unique<PropertiesUpdate>(this);
}

const std::shared_ptr<FileIO>& Table::io() const { return io_; }

std::unique_ptr<TableScanBuilder> Table::NewScan() const {
Expand Down
7 changes: 7 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

namespace iceberg {

class PropertiesUpdate;

/// \brief Represents an Iceberg table
class ICEBERG_EXPORT Table {
public:
Expand Down Expand Up @@ -115,10 +117,15 @@ class ICEBERG_EXPORT Table {
/// \return a pointer to the new Transaction
virtual std::unique_ptr<Transaction> NewTransaction() const;

/// \brief Create a pending update to modify table properties
std::unique_ptr<PropertiesUpdate> UpdateProperties();

/// \brief Returns a FileIO to read and write table data and metadata files
const std::shared_ptr<FileIO>& io() const;

private:
friend class PropertiesUpdate;

const TableIdentifier identifier_;
std::shared_ptr<TableMetadata> metadata_;
std::string metadata_location_;
Expand Down
24 changes: 22 additions & 2 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -476,12 +476,32 @@ TableMetadataBuilder& TableMetadataBuilder::RemovePartitionStatistics(

TableMetadataBuilder& TableMetadataBuilder::SetProperties(
const std::unordered_map<std::string, std::string>& updated) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
if (updated.empty()) {
return *this;
}

for (const auto& [key, value] : updated) {
impl_->metadata.properties[key] = value;
}

impl_->changes.push_back(std::make_unique<table::SetProperties>(updated));

return *this;
}

TableMetadataBuilder& TableMetadataBuilder::RemoveProperties(
const std::vector<std::string>& removed) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
if (removed.empty()) {
return *this;
}

for (const auto& key : removed) {
impl_->metadata.properties.erase(key);
}

impl_->changes.push_back(std::make_unique<table::RemoveProperties>(removed));

return *this;
}

TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view location) {
Expand Down
8 changes: 4 additions & 4 deletions src/iceberg/table_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,21 @@ Status SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
// SetProperties

void SetProperties::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.SetProperties(updated_);
}

Status SetProperties::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("SetTableProperties::GenerateRequirements not implemented");
return {};
}

// RemoveProperties

void RemoveProperties::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.RemoveProperties(removed_);
}

Status RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("RemoveTableProperties::GenerateRequirements not implemented");
return {};
}

// SetLocation
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ add_iceberg_test(table_test
table_metadata_builder_test.cc
table_requirement_test.cc
table_update_test.cc
update_properties_test.cc
test_common.cc)

add_iceberg_test(expression_test
Expand Down
111 changes: 111 additions & 0 deletions src/iceberg/test/update_properties_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/update/update_properties.h"

#include <memory>
#include <string>
#include <unordered_map>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/file_format.h"
#include "iceberg/partition_spec.h"
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/mock_catalog.h"

namespace iceberg {

namespace {

std::shared_ptr<TableMetadata> MakeBaseMetadata(
std::unordered_map<std::string, std::string> properties) {
auto metadata = std::make_shared<TableMetadata>();
metadata->format_version = 2;
metadata->table_uuid = "test-uuid";
metadata->location = "s3://bucket/table";
metadata->last_sequence_number = TableMetadata::kInitialSequenceNumber;
metadata->last_updated_ms = TimePointMs{};
metadata->last_column_id = 0;
metadata->default_spec_id = PartitionSpec::kInitialSpecId;
metadata->last_partition_id = 0;
metadata->default_sort_order_id = SortOrder::kInitialSortOrderId;
metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId;
metadata->next_row_id = TableMetadata::kInitialRowId;
metadata->properties = std::move(properties);
return metadata;
}

TableIdentifier MakeIdentifier() {
return TableIdentifier{.ns = Namespace{{"ns"}}, .name = "tbl"};
}

} // namespace

using ::testing::_;
using ::testing::ByMove;
using ::testing::Return;

TEST(UpdatePropertiesTest, ApplyMergesUpdatesAndRemovals) {
auto metadata =
MakeBaseMetadata({{"foo", "bar"}, {"keep", "yes"}, {"format-version", "2"}});
Table table(MakeIdentifier(), metadata, "loc", /*io=*/nullptr, /*catalog=*/nullptr);

auto updater = table.UpdateProperties();
updater->Set("foo", "baz").Remove("keep").DefaultFormat(FileFormatType::kOrc);

auto applied = updater->Apply();
ASSERT_THAT(applied, IsOk());

const auto& props = *applied;
EXPECT_EQ(props.at("foo"), "baz");
EXPECT_FALSE(props.contains("keep"));
EXPECT_EQ(props.at(TableProperties::kDefaultFileFormat.key()), "orc");
}

TEST(UpdatePropertiesTest, CommitUsesCatalogAndRefreshesTable) {
auto catalog = std::make_shared<MockCatalog>();
auto base_metadata = MakeBaseMetadata({{"foo", "bar"}});
Table table(MakeIdentifier(), base_metadata, "loc", /*io=*/nullptr, catalog);

auto updated_metadata = MakeBaseMetadata({{"foo", "new"}}); // response metadata

EXPECT_CALL(*catalog, LoadTable(table.name()))
.WillOnce(Return(ByMove(Result<std::unique_ptr<Table>>{std::make_unique<Table>(
table.name(), MakeBaseMetadata({{"foo", "bar"}}), "loc", nullptr, catalog)})));

EXPECT_CALL(*catalog, UpdateTable(table.name(), _, _))
.WillOnce(Return(ByMove(Result<std::unique_ptr<Table>>{std::make_unique<Table>(
table.name(), updated_metadata, "loc2", nullptr, catalog)})));

auto updater = table.UpdateProperties();
updater->Set("foo", "new");

EXPECT_THAT(updater->Commit(), IsOk());
EXPECT_EQ(table.properties().configs().at("foo"), "new");
EXPECT_EQ(table.location(), "s3://bucket/table");
}

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class Table;
class TableProperties;
class FileIO;
class Transaction;
class PropertiesUpdate;
class Transform;
class TransformFunction;

Expand Down
130 changes: 130 additions & 0 deletions src/iceberg/update/update_properties.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/update/update_properties.h"

#include <format>
#include <vector>

#include "iceberg/catalog.h"
#include "iceberg/exception.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/util/macros.h"

namespace iceberg {

PropertiesUpdate::PropertiesUpdate(Table* table) : table_(table) {}

PropertiesUpdate& PropertiesUpdate::Set(std::string key, std::string value) {
if (key.empty()) {
throw IcebergError("Property key cannot be empty");
}
if (removals_.contains(key)) {
throw IcebergError(std::format("Cannot remove and update the same key: {}", key));
}

updates_[std::move(key)] = std::move(value);
return *this;
}

PropertiesUpdate& PropertiesUpdate::Remove(std::string key) {
if (key.empty()) {
throw IcebergError("Property key cannot be empty");
}
if (updates_.contains(key)) {
throw IcebergError(std::format("Cannot remove and update the same key: {}", key));
}

removals_.insert(std::move(key));
return *this;
}

PropertiesUpdate& PropertiesUpdate::DefaultFormat(FileFormatType format) {
return Set(std::string(TableProperties::kDefaultFileFormat.key()),
std::string(ToString(format)));
}

Result<std::unordered_map<std::string, std::string>> PropertiesUpdate::Apply() {
if (table_ == nullptr) {
return InvalidArgument("Cannot apply updates on a null table");
}

if (table_->catalog_) {
if (auto status = table_->Refresh(); !status) {
return std::unexpected(status.error());
}
}

std::unordered_map<std::string, std::string> new_properties =
table_->properties().configs();

for (const auto& key : removals_) {
new_properties.erase(key);
}
for (const auto& [key, value] : updates_) {
new_properties[key] = value;
}

return new_properties;
}

Status PropertiesUpdate::Commit() {
if (table_ == nullptr) {
return InvalidArgument("Cannot commit updates on a null table");
}

if (updates_.empty() && removals_.empty()) {
return {};
}

ICEBERG_ASSIGN_OR_RAISE(auto applied, Apply());
(void)applied; // apply for validation

if (!table_->catalog_) {
return NotSupported("Commit requires a catalog-backed table");
}

std::vector<std::unique_ptr<TableRequirement>> requirements;
std::vector<std::unique_ptr<TableUpdate>> updates;

if (!updates_.empty()) {
updates.push_back(std::make_unique<table::SetProperties>(updates_));
}
if (!removals_.empty()) {
std::vector<std::string> removed(removals_.begin(), removals_.end());
updates.push_back(std::make_unique<table::RemoveProperties>(std::move(removed)));
}

ICEBERG_ASSIGN_OR_RAISE(auto updated_table, table_->catalog_->UpdateTable(
table_->name(), requirements, updates));

table_->metadata_ = std::move(updated_table->metadata_);
table_->metadata_location_ = std::move(updated_table->metadata_location_);
table_->io_ = std::move(updated_table->io_);
table_->properties_ = std::move(updated_table->properties_);
table_->metadata_cache_ = std::make_unique<TableMetadataCache>(table_->metadata_.get());

return {};
}

} // namespace iceberg
Loading
Loading