Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 54 additions & 14 deletions src/iceberg/catalog/memory/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
#include <iterator>

#include "iceberg/table.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -120,6 +123,13 @@ class ICEBERG_EXPORT InMemoryNamespace {
/// \return The metadata location if the table exists; error otherwise.
Result<std::string> GetTableMetadataLocation(const TableIdentifier& table_ident) const;

/// \brief Updates the metadata location for the specified table.
///
/// \param table_ident The identifier of the table.
/// \param metadata_location The new metadata location.
Status UpdateTableMetadataLocation(const TableIdentifier& table_ident,
const std::string& metadata_location);

/// \brief Internal utility for retrieving a namespace node pointer from the tree.
///
/// \tparam NamespacePtr The type of the namespace node pointer.
Expand Down Expand Up @@ -278,7 +288,7 @@ Result<std::vector<std::string>> InMemoryNamespace::ListTables(
return table_names;
}

Status InMemoryNamespace::RegisterTable(TableIdentifier const& table_ident,
Status InMemoryNamespace::RegisterTable(const TableIdentifier& table_ident,
const std::string& metadata_location) {
const auto ns = GetNamespace(this, table_ident.ns);
ICEBERG_RETURN_UNEXPECTED(ns);
Expand All @@ -289,21 +299,21 @@ Status InMemoryNamespace::RegisterTable(TableIdentifier const& table_ident,
return {};
}

Status InMemoryNamespace::UnregisterTable(TableIdentifier const& table_ident) {
Status InMemoryNamespace::UnregisterTable(const TableIdentifier& table_ident) {
const auto ns = GetNamespace(this, table_ident.ns);
ICEBERG_RETURN_UNEXPECTED(ns);
ns.value()->table_metadata_locations_.erase(table_ident.name);
return {};
}

Result<bool> InMemoryNamespace::TableExists(TableIdentifier const& table_ident) const {
Result<bool> InMemoryNamespace::TableExists(const TableIdentifier& table_ident) const {
const auto ns = GetNamespace(this, table_ident.ns);
ICEBERG_RETURN_UNEXPECTED(ns);
return ns.value()->table_metadata_locations_.contains(table_ident.name);
}

Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
TableIdentifier const& table_ident) const {
const TableIdentifier& table_ident) const {
const auto ns = GetNamespace(this, table_ident.ns);
ICEBERG_RETURN_UNEXPECTED(ns);
const auto it = ns.value()->table_metadata_locations_.find(table_ident.name);
Expand All @@ -313,17 +323,24 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
return it->second;
}

Status InMemoryNamespace::UpdateTableMetadataLocation(
const TableIdentifier& table_ident, const std::string& metadata_location) {
ICEBERG_ASSIGN_OR_RAISE(auto ns, GetNamespace(this, table_ident.ns));
ns->table_metadata_locations_[table_ident.name] = metadata_location;
return {};
}

std::shared_ptr<InMemoryCatalog> InMemoryCatalog::Make(
std::string const& name, std::shared_ptr<FileIO> const& file_io,
std::string const& warehouse_location,
std::unordered_map<std::string, std::string> const& properties) {
const std::string& name, const std::shared_ptr<FileIO>& file_io,
const std::string& warehouse_location,
const std::unordered_map<std::string, std::string>& properties) {
return std::make_shared<InMemoryCatalog>(name, file_io, warehouse_location, properties);
}

InMemoryCatalog::InMemoryCatalog(
std::string const& name, std::shared_ptr<FileIO> const& file_io,
std::string const& warehouse_location,
std::unordered_map<std::string, std::string> const& properties)
const std::string& name, const std::shared_ptr<FileIO>& file_io,
const std::string& warehouse_location,
const std::unordered_map<std::string, std::string>& properties)
: catalog_name_(std::move(name)),
properties_(std::move(properties)),
file_io_(std::move(file_io)),
Expand Down Expand Up @@ -395,7 +412,31 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
std::unique_lock lock(mutex_);
return NotImplemented("update table");
ICEBERG_ASSIGN_OR_RAISE(auto base_metadata_location,
root_namespace_->GetTableMetadataLocation(identifier));

ICEBERG_ASSIGN_OR_RAISE(auto base,
TableMetadataUtil::Read(*file_io_, base_metadata_location));

for (const auto& requirement : requirements) {
ICEBERG_RETURN_UNEXPECTED(requirement->Validate(base.get()));
}

auto builder = TableMetadataBuilder::BuildFrom(base.get());
for (const auto& update : updates) {
update->ApplyTo(*builder);
}
ICEBERG_ASSIGN_OR_RAISE(auto updated, builder->Build());
ICEBERG_ASSIGN_OR_RAISE(
auto new_metadata_location,
TableMetadataUtil::Write(*file_io_, base.get(), base_metadata_location, *updated));
ICEBERG_RETURN_UNEXPECTED(
root_namespace_->UpdateTableMetadataLocation(identifier, new_metadata_location));
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated);

return std::make_unique<Table>(identifier, std::move(updated),
std::move(new_metadata_location), file_io_,
std::static_pointer_cast<Catalog>(shared_from_this()));
}

Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
Expand Down Expand Up @@ -438,9 +479,8 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::LoadTable(

ICEBERG_ASSIGN_OR_RAISE(auto metadata,
TableMetadataUtil::Read(*file_io_, metadata_location));

return std::make_unique<Table>(identifier, std::move(metadata), metadata_location,
file_io_,
return std::make_unique<Table>(identifier, std::move(metadata),
std::move(metadata_location), file_io_,
std::static_pointer_cast<Catalog>(shared_from_this()));
}

Expand Down
4 changes: 1 addition & 3 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -797,9 +797,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
json[kSortOrders] = ToJsonList(table_metadata.sort_orders);

// write properties map
if (table_metadata.properties) {
json[kProperties] = table_metadata.properties->configs();
}
json[kProperties] = table_metadata.properties.configs();

if (std::ranges::find_if(table_metadata.snapshots, [&](const auto& snapshot) {
return snapshot->snapshot_id == table_metadata.current_snapshot_id;
Expand Down
11 changes: 6 additions & 5 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ Status Table::Refresh() {
}

ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_));
if (metadata_location_ != refreshed_table->metadata_location_) {
if (metadata_location_ != refreshed_table->metadata_file_location()) {
metadata_ = std::move(refreshed_table->metadata_);
metadata_location_ = std::move(refreshed_table->metadata_location_);
io_ = std::move(refreshed_table->io_);
metadata_cache_ = std::make_unique<TableMetadataCache>(metadata_.get());
}
Expand Down Expand Up @@ -87,12 +86,14 @@ Table::sort_orders() const {
return metadata_cache_->GetSortOrdersById();
}

const std::shared_ptr<TableProperties>& Table::properties() const {
return metadata_->properties;
}
const TableProperties& Table::properties() const { return metadata_->properties; }

const std::string& Table::metadata_file_location() const { return metadata_location_; }

const std::string& Table::location() const { return metadata_->location; }

const TimePointMs& Table::last_updated_ms() const { return metadata_->last_updated_ms; }

Result<std::shared_ptr<Snapshot>> Table::current_snapshot() const {
return metadata_->Snapshot();
}
Expand Down
11 changes: 10 additions & 1 deletion src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "iceberg/snapshot.h"
#include "iceberg/table_identifier.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/timepoint.h"

namespace iceberg {

Expand Down Expand Up @@ -82,11 +83,19 @@ class ICEBERG_EXPORT Table {
sort_orders() const;

/// \brief Return a map of string properties for this table
const std::shared_ptr<TableProperties>& properties() const;
const TableProperties& properties() const;

/// \brief Return the table's metadata file location
const std::string& metadata_file_location() const;

/// \brief Return the table's base location
const std::string& location() const;

/// \brief Get the time when this table was last updated
///
/// \return the time when this table was last updated
const TimePointMs& last_updated_ms() const;

/// \brief Return the table's current snapshot, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> current_snapshot() const;

Expand Down
Loading
Loading