Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 11 additions & 34 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ Table::Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata
metadata_location_(std::move(metadata_location)),
io_(std::move(io)),
catalog_(std::move(catalog)),
properties_(TableProperties::FromMap(metadata_->properties)) {}
properties_(TableProperties::FromMap(metadata_->properties)),
metadata_cache_(std::make_unique<TableMetadataCache>(metadata_.get())) {}

// Returns a reference to the table UUID string stored in the table's metadata.
const std::string& Table::uuid() const {
  auto& current_metadata = *metadata_;
  return current_metadata.table_uuid;
}

Expand All @@ -57,60 +58,36 @@ Status Table::Refresh() {
metadata_location_ = std::move(refreshed_table->metadata_location_);
io_ = std::move(refreshed_table->io_);
properties_ = std::move(refreshed_table->properties_);

schemas_map_.reset();
partition_spec_map_.reset();
sort_orders_map_.reset();
metadata_cache_ = std::make_unique<TableMetadataCache>(metadata_.get());
}
return {};
}

// Looks up the table's schema through the metadata object; whatever Result the
// metadata lookup produces (value or error) is returned unchanged.
Result<std::shared_ptr<Schema>> Table::schema() const {
  auto& current_metadata = *metadata_;
  return current_metadata.Schema();
}

const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>&
Result<std::reference_wrapper<const std::unordered_map<int32_t, std::shared_ptr<Schema>>>>
Table::schemas() const {
if (!schemas_map_) {
schemas_map_ =
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<Schema>>>();
for (const auto& schema : metadata_->schemas) {
if (schema->schema_id()) {
schemas_map_->emplace(schema->schema_id().value(), schema);
}
}
}
return schemas_map_;
return metadata_cache_->GetSchemasById();
}

// Looks up the table's partition spec through the metadata object; whatever Result
// the metadata lookup produces (value or error) is returned unchanged.
Result<std::shared_ptr<PartitionSpec>> Table::spec() const {
  auto& current_metadata = *metadata_;
  return current_metadata.PartitionSpec();
}

const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>&
Result<std::reference_wrapper<
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>>
Table::specs() const {
if (!partition_spec_map_) {
partition_spec_map_ =
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>();
for (const auto& spec : metadata_->partition_specs) {
partition_spec_map_->emplace(spec->spec_id(), spec);
}
}
return partition_spec_map_;
return metadata_cache_->GetPartitionSpecsById();
}

// Looks up the table's sort order through the metadata object; whatever Result the
// metadata lookup produces (value or error) is returned unchanged.
Result<std::shared_ptr<SortOrder>> Table::sort_order() const {
  auto& current_metadata = *metadata_;
  return current_metadata.SortOrder();
}

const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>&
Result<
std::reference_wrapper<const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>>
Table::sort_orders() const {
if (!sort_orders_map_) {
sort_orders_map_ =
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>();
for (const auto& order : metadata_->sort_orders) {
sort_orders_map_->emplace(order->order_id(), order);
}
}
return sort_orders_map_;
return metadata_cache_->GetSortOrdersById();
}

// Returns a reference to the TableProperties object owned by this table.
const TableProperties& Table::properties() const {
  const TableProperties& table_properties = *properties_;
  return table_properties;
}
Expand Down
24 changes: 9 additions & 15 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -60,24 +61,24 @@ class ICEBERG_EXPORT Table {
Result<std::shared_ptr<Schema>> schema() const;

/// \brief Return a map of schema for this table
/// \note This method is **not** thread-safe in the current implementation.
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>& schemas()
const;
Result<
std::reference_wrapper<const std::unordered_map<int32_t, std::shared_ptr<Schema>>>>
schemas() const;

/// \brief Return the partition spec for this table, return NotFoundError if not found
Result<std::shared_ptr<PartitionSpec>> spec() const;

/// \brief Return a map of partition specs for this table
/// \note This method is **not** thread-safe in the current implementation.
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>&
Result<std::reference_wrapper<
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>>
specs() const;

/// \brief Return the sort order for this table, return NotFoundError if not found
Result<std::shared_ptr<SortOrder>> sort_order() const;

/// \brief Return a map of sort order IDs to sort orders for this table
/// \note This method is **not** thread-safe in the current implementation.
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>&
Result<std::reference_wrapper<
const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>>
sort_orders() const;

/// \brief Return a map of string properties for this table
Expand Down Expand Up @@ -124,14 +125,7 @@ class ICEBERG_EXPORT Table {
std::shared_ptr<FileIO> io_;
std::shared_ptr<Catalog> catalog_;
std::unique_ptr<TableProperties> properties_;

// Cache lazy-initialized maps.
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>
schemas_map_;
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>
partition_spec_map_;
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>
sort_orders_map_;
std::unique_ptr<class TableMetadataCache> metadata_cache_;
};

} // namespace iceberg
64 changes: 64 additions & 0 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,70 @@ bool operator==(const TableMetadata& lhs, const TableMetadata& rhs) {
lhs.next_row_id == rhs.next_row_id;
}

// TableMetadataCache implementation

// Returns the schema-id -> schema map for the bound metadata.
// The call is forwarded to the Lazy<InitSchemasMap> member together with the stored
// metadata pointer, and its Result is returned as-is.
// NOTE(review): presumably the map is built once on first access and reused after
// that -- confirm against iceberg/util/lazy.h.
Result<TableMetadataCache::SchemasMapRef> TableMetadataCache::GetSchemasById() const {
return schemas_map_.Get(metadata_);
}

// Returns the spec-id -> partition spec map for the bound metadata.
// The call is forwarded to the Lazy<InitPartitionSpecsMap> member together with the
// stored metadata pointer, and its Result is returned as-is.
// NOTE(review): presumably the map is built once on first access and reused after
// that -- confirm against iceberg/util/lazy.h.
Result<TableMetadataCache::PartitionSpecsMapRef>
TableMetadataCache::GetPartitionSpecsById() const {
return partition_specs_map_.Get(metadata_);
}

// Returns the order-id -> sort order map for the bound metadata.
// The call is forwarded to the Lazy<InitSortOrdersMap> member together with the
// stored metadata pointer, and its Result is returned as-is.
// NOTE(review): presumably the map is built once on first access and reused after
// that -- confirm against iceberg/util/lazy.h.
Result<TableMetadataCache::SortOrdersMapRef> TableMetadataCache::GetSortOrdersById()
const {
return sort_orders_map_.Get(metadata_);
}

// Returns the snapshot-id -> snapshot map for the bound metadata.
// The call is forwarded to the Lazy<InitSnapshotMap> member together with the stored
// metadata pointer, and its Result is returned as-is.
// NOTE(review): presumably the map is built once on first access and reused after
// that -- confirm against iceberg/util/lazy.h.
Result<TableMetadataCache::SnapshotsMapRef> TableMetadataCache::GetSnapshotsById() const {
return snapshot_map_.Get(metadata_);
}

// Builds the schema-id -> schema lookup table from `metadata->schemas`.
// Schemas that report no schema id are left out. When two schemas share an id the
// first one encountered is kept, because emplace() does not overwrite existing keys.
// `metadata` is dereferenced unconditionally and therefore must not be null.
Result<TableMetadataCache::SchemasMap> TableMetadataCache::InitSchemasMap(
    const TableMetadata* metadata) {
  const auto& all_schemas = metadata->schemas;

  SchemasMap by_id;
  by_id.reserve(all_schemas.size());
  for (const auto& candidate : all_schemas) {
    if (!candidate->schema_id()) {
      continue;  // no id to key this schema on
    }
    by_id.emplace(candidate->schema_id().value(), candidate);
  }
  return by_id;
}

// Builds the spec-id -> partition spec lookup table from `metadata->partition_specs`.
// When two specs share an id the first one encountered is kept, because emplace()
// does not overwrite existing keys.
// `metadata` is dereferenced unconditionally and therefore must not be null.
Result<TableMetadataCache::PartitionSpecsMap> TableMetadataCache::InitPartitionSpecsMap(
    const TableMetadata* metadata) {
  const auto& all_specs = metadata->partition_specs;

  PartitionSpecsMap by_id;
  by_id.reserve(all_specs.size());
  for (const auto& candidate : all_specs) {
    by_id.emplace(candidate->spec_id(), candidate);
  }
  return by_id;
}

// Builds the order-id -> sort order lookup table from `metadata->sort_orders`.
// When two sort orders share an id the first one encountered is kept, because
// emplace() does not overwrite existing keys.
// `metadata` is dereferenced unconditionally and therefore must not be null.
Result<TableMetadataCache::SortOrdersMap> TableMetadataCache::InitSortOrdersMap(
    const TableMetadata* metadata) {
  const auto& all_orders = metadata->sort_orders;

  SortOrdersMap by_id;
  by_id.reserve(all_orders.size());
  for (const auto& candidate : all_orders) {
    by_id.emplace(candidate->order_id(), candidate);
  }
  return by_id;
}

// Builds the snapshot-id -> snapshot lookup table from `metadata->snapshots`.
// When two snapshots share an id the first one encountered is kept, because
// emplace() does not overwrite existing keys.
// `metadata` is dereferenced unconditionally and therefore must not be null.
Result<TableMetadataCache::SnapshotsMap> TableMetadataCache::InitSnapshotMap(
    const TableMetadata* metadata) {
  const auto& all_snapshots = metadata->snapshots;

  SnapshotsMap by_id;
  by_id.reserve(all_snapshots.size());
  for (const auto& candidate : all_snapshots) {
    by_id.emplace(candidate->snapshot_id, candidate);
  }
  return by_id;
}

// TableMetadataUtil implementation

Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
std::string_view file_name) {
if (file_name.find(".metadata.json") == std::string::npos) {
Expand Down
35 changes: 35 additions & 0 deletions src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "iceberg/iceberg_export.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/lazy.h"
#include "iceberg/util/timepoint.h"

namespace iceberg {
Expand Down Expand Up @@ -139,6 +140,40 @@ struct ICEBERG_EXPORT TableMetadata {
const TableMetadata& rhs);
};

/// \brief Cache for table metadata mappings to facilitate fast lookups.
///
/// Holds a non-owning pointer to a TableMetadata and exposes id-keyed maps of its
/// schemas, partition specs, sort orders and snapshots. Each map is backed by a
/// Lazy<> member bound to the matching Init*Map helper.
/// NOTE(review): presumably each map is built on first access and reused afterwards;
/// confirm against iceberg/util/lazy.h.
class ICEBERG_EXPORT TableMetadataCache {
public:
/// \brief Bind the cache to `metadata` without taking ownership.
///
/// The raw pointer is stored as-is and later handed to the Init*Map helpers, which
/// dereference it, so it has to be non-null and remain valid while the cache is used.
explicit TableMetadataCache(const TableMetadata* metadata) : metadata_(metadata) {}

/// Map from a 32-bit id to a shared pointer to T.
template <typename T>
using ByIdMap = std::unordered_map<int32_t, std::shared_ptr<T>>;
using SchemasMap = ByIdMap<Schema>;
using PartitionSpecsMap = ByIdMap<PartitionSpec>;
using SortOrdersMap = ByIdMap<SortOrder>;
/// Snapshots are keyed by a 64-bit id, unlike the other maps.
using SnapshotsMap = std::unordered_map<int64_t, std::shared_ptr<Snapshot>>;
/// Read-only reference wrappers handed out by the getters below.
using SchemasMapRef = std::reference_wrapper<const SchemasMap>;
using PartitionSpecsMapRef = std::reference_wrapper<const PartitionSpecsMap>;
using SortOrdersMapRef = std::reference_wrapper<const SortOrdersMap>;
using SnapshotsMapRef = std::reference_wrapper<const SnapshotsMap>;

/// \brief Return the schema-id -> schema map.
Result<SchemasMapRef> GetSchemasById() const;
/// \brief Return the spec-id -> partition spec map.
Result<PartitionSpecsMapRef> GetPartitionSpecsById() const;
/// \brief Return the order-id -> sort order map.
Result<SortOrdersMapRef> GetSortOrdersById() const;
/// \brief Return the snapshot-id -> snapshot map.
Result<SnapshotsMapRef> GetSnapshotsById() const;

private:
// Builders used as the Lazy<> initializers; each one constructs a fresh map from
// the corresponding collection of `metadata`.
static Result<SchemasMap> InitSchemasMap(const TableMetadata* metadata);
static Result<PartitionSpecsMap> InitPartitionSpecsMap(const TableMetadata* metadata);
static Result<SortOrdersMap> InitSortOrdersMap(const TableMetadata* metadata);
static Result<SnapshotsMap> InitSnapshotMap(const TableMetadata* metadata);

// Non-owning; lifetime is managed by whoever created the cache.
const TableMetadata* metadata_;
Lazy<InitSchemasMap> schemas_map_;
Lazy<InitPartitionSpecsMap> partition_specs_map_;
Lazy<InitSortOrdersMap> sort_orders_map_;
Lazy<InitSnapshotMap> snapshot_map_;
};

/// \brief Returns a string representation of a SnapshotLogEntry
ICEBERG_EXPORT std::string ToString(const SnapshotLogEntry& entry);

Expand Down
12 changes: 6 additions & 6 deletions src/iceberg/test/table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@ TEST(Table, TableV1) {
ASSERT_TRUE(schema.has_value());
ASSERT_EQ(schema.value()->fields().size(), 3);
auto schemas = table.schemas();
ASSERT_TRUE(schemas->empty());
ASSERT_TRUE(schemas->get().empty());

// Check table spec
auto spec = table.spec();
ASSERT_TRUE(spec.has_value());
auto specs = table.specs();
ASSERT_EQ(1UL, specs->size());
ASSERT_EQ(1UL, specs->get().size());

// Check table sort_order
auto sort_order = table.sort_order();
ASSERT_TRUE(sort_order.has_value());
auto sort_orders = table.sort_orders();
ASSERT_EQ(1UL, sort_orders->size());
ASSERT_EQ(1UL, sort_orders->get().size());

// Check table location
auto location = table.location();
Expand Down Expand Up @@ -89,19 +89,19 @@ TEST(Table, TableV2) {
ASSERT_TRUE(schema.has_value());
ASSERT_EQ(schema.value()->fields().size(), 3);
auto schemas = table.schemas();
ASSERT_FALSE(schemas->empty());
ASSERT_FALSE(schemas->get().empty());

// Check partition spec
auto spec = table.spec();
ASSERT_TRUE(spec.has_value());
auto specs = table.specs();
ASSERT_EQ(1UL, specs->size());
ASSERT_EQ(1UL, specs->get().size());

// Check sort order
auto sort_order = table.sort_order();
ASSERT_TRUE(sort_order.has_value());
auto sort_orders = table.sort_orders();
ASSERT_EQ(1UL, sort_orders->size());
ASSERT_EQ(1UL, sort_orders->get().size());

// Check table location
auto location = table.location();
Expand Down
Loading