Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 197 additions & 6 deletions src/iceberg/metrics_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,201 @@

#include "iceberg/metrics_config.h"

#include <charconv>
#include <string>
#include <unordered_map>

#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/table.h"
#include "iceberg/table_properties.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/type_util.h"

namespace iceberg {

namespace {

// Textual names recognised by MetricsMode::FromString (compared case-insensitively
// there).
constexpr std::string_view kNoneName = "none";
constexpr std::string_view kCountsName = "counts";
constexpr std::string_view kFullName = "full";
// Leading part of the parameterised form "truncate(N)".
constexpr std::string_view kTruncatePrefix = "truncate(";
// Truncate length carried by the built-in default mode.
constexpr int32_t kDefaultTruncateLength = 16;
// Built-in default metrics mode, i.e. truncate(16). Used when no default is configured
// and as the fallback when a configured default cannot be parsed.
constexpr MetricsMode kDefaultMetricsMode = {.kind = MetricsMode::Kind::kTruncate,
.length = kDefaultTruncateLength};

/// \brief Choose the mode that sort columns receive before user overrides are applied.
///
/// \param default_mode the table-wide default mode
/// \return kDefaultMetricsMode when `default_mode` is `kNone` or `kCounts`; otherwise
///         `default_mode` unchanged
MetricsMode SortedColumnDefaultMode(MetricsMode default_mode) {
  switch (default_mode.kind) {
    case MetricsMode::Kind::kNone:
    case MetricsMode::Kind::kCounts:
      // These two kinds are replaced by the built-in default for sort columns.
      return kDefaultMetricsMode;
    default:
      return default_mode;
  }
}

/// \brief Read the configured limit on columns that get inferred default metrics.
///
/// \param properties table properties to read the limit from
/// \return the configured value, or the property's own default when the configured
///         value is negative
int32_t MaxInferredColumns(const TableProperties& properties) {
  const int32_t configured =
      properties.Get(TableProperties::kMetricsMaxInferredColumnDefaults);
  if (configured >= 0) {
    return configured;
  }
  // A negative limit is treated as invalid: use the property's default instead.
  return TableProperties::kMetricsMaxInferredColumnDefaults.value();
}

/// \brief Parse `mode`, substituting `fallback` when it is not a valid metrics mode.
///
/// Any error reported by MetricsMode::FromString is discarded, so the returned Result
/// always holds a value.
Result<MetricsMode> ParseMode(std::string_view mode, MetricsMode fallback) {
  auto parsed = MetricsMode::FromString(mode);
  if (parsed.has_value()) {
    return parsed.value();
  }
  return fallback;
}

} // namespace

/// \brief Build a mode of kind `kNone`; `length` is left value-initialized.
MetricsMode MetricsMode::None() {
  MetricsMode mode{};
  mode.kind = Kind::kNone;
  return mode;
}

/// \brief Build a mode of kind `kCounts`; `length` is left value-initialized.
MetricsMode MetricsMode::Counts() {
  MetricsMode mode{};
  mode.kind = Kind::kCounts;
  return mode;
}

/// \brief Build a mode of kind `kFull`; `length` is left value-initialized.
MetricsMode MetricsMode::Full() {
  MetricsMode mode{};
  mode.kind = Kind::kFull;
  return mode;
}

/// \brief Parse a metrics mode from its textual form.
///
/// Recognised forms (keywords compared case-insensitively): "none", "counts", "full"
/// and "truncate(N)", where N must be a decimal integer that fills the whole space
/// between the parentheses.
///
/// \param mode the text to parse
/// \return the parsed mode; an InvalidArgument error when the text matches none of the
///         forms or N is not a clean integer; a non-positive N is rejected through
///         ICEBERG_PRECHECK
Result<MetricsMode> MetricsMode::FromString(std::string_view mode) {
  if (StringUtils::EqualsIgnoreCase(mode, kNoneName)) {
    return MetricsMode::None();
  }
  if (StringUtils::EqualsIgnoreCase(mode, kCountsName)) {
    return MetricsMode::Counts();
  }
  if (StringUtils::EqualsIgnoreCase(mode, kFullName)) {
    return MetricsMode::Full();
  }

  if (StringUtils::StartsWithIgnoreCase(mode, kTruncatePrefix) && mode.ends_with(")")) {
    // The length argument sits between the "truncate(" prefix and the closing ')'.
    // The prefix itself ends in '(', so a match here implies first <= last.
    const char* const first = mode.data() + kTruncatePrefix.size();
    const char* const last = mode.data() + mode.size() - 1;
    int32_t length = 0;
    const auto [ptr, ec] = std::from_chars(first, last, length);
    // from_chars stops at the first character that is not part of the number, so the
    // end pointer must be checked too; otherwise input such as "truncate(16abc)" would
    // be silently accepted as truncate(16).
    if (ec != std::errc{} || ptr != last) {
      return InvalidArgument("Invalid truncate mode: {}", mode);
    }
    if (length == kDefaultTruncateLength) {
      return kDefaultMetricsMode;
    }
    ICEBERG_PRECHECK(length > 0, "Truncate length should be positive.");
    return MetricsMode{.kind = Kind::kTruncate, .length = length};
  }
  return InvalidArgument("Invalid metrics mode: {}", mode);
}

/// \brief Store the per-column modes and the mode used for every other column.
///
/// \param column_modes map from column name to its metrics mode; moved into the config
/// \param default_mode mode reported by ColumnMode() for names absent from the map
MetricsConfig::MetricsConfig(ColumnModeMap column_modes, MetricsMode default_mode)
: column_modes_(std::move(column_modes)), default_mode_(default_mode) {}

/// \brief Return the shared default configuration: no per-column entries and
/// kDefaultMetricsMode as the default mode.
const std::shared_ptr<MetricsConfig>& MetricsConfig::Default() {
  // The constructor is private, so the instance is created with `new` from inside the
  // class rather than through std::make_shared.
  static const auto kDefaultConfig = std::shared_ptr<MetricsConfig>(
      new MetricsConfig(ColumnModeMap{}, kDefaultMetricsMode));
  return kDefaultConfig;
}

/// \brief Build a metrics config from a table's properties, schema and sort order.
///
/// Fails with the table's error when its schema cannot be obtained. When the sort order
/// cannot be obtained, SortOrder::Unsorted() is used instead.
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(const Table& table) {
  ICEBERG_ASSIGN_OR_RAISE(auto schema, table.schema());
  const auto effective_order = table.sort_order().value_or(SortOrder::Unsorted());
  return MakeInternal(table.properties(), *schema, *effective_order);
}

/// \brief Assemble a MetricsConfig from table properties, schema and sort order.
///
/// Steps, in order:
///  1. Determine the default mode: the configured default if the property is present
///     (falling back to kDefaultMetricsMode when it does not parse); otherwise
///     kDefaultMetricsMode, unless the schema projects more ids than the inferred-column
///     limit, in which case only the ids returned by LimitFieldIds get
///     kDefaultMetricsMode and the default becomes None.
///  2. Give every order-preserving sort column SortedColumnDefaultMode(default).
///  3. Apply per-column property overrides; an unparsable override falls back to the
///     default mode.
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
    const TableProperties& props, const Schema& schema, const SortOrder& order) {
  ColumnModeMap modes_by_column;
  MetricsMode default_mode = kDefaultMetricsMode;

  const bool has_configured_default =
      props.configs().contains(TableProperties::kDefaultWriteMetricsMode.key());
  if (has_configured_default) {
    std::string configured = props.Get(TableProperties::kDefaultWriteMetricsMode);
    ICEBERG_ASSIGN_OR_RAISE(default_mode, ParseMode(configured, kDefaultMetricsMode));
  } else {
    const int32_t inferred_limit = MaxInferredColumns(props);
    GetProjectedIdsVisitor id_collector(/*include_struct_ids=*/true);
    ICEBERG_RETURN_UNEXPECTED(id_collector.Visit(schema));
    const auto projected_count = static_cast<int32_t>(id_collector.Finish().size());
    if (projected_count > inferred_limit) {
      ICEBERG_ASSIGN_OR_RAISE(auto inferred_ids, LimitFieldIds(schema, inferred_limit));
      for (auto field_id : inferred_ids) {
        ICEBERG_ASSIGN_OR_RAISE(auto name, schema.FindColumnNameById(field_id));
        ICEBERG_CHECK(name.has_value(), "Field id {} not found in schema", field_id);
        modes_by_column[std::string(name.value())] = kDefaultMetricsMode;
      }
      // Columns outside the inferred set collect no metrics.
      default_mode = MetricsMode::None();
    }
  }

  // Sort columns come next so that the user overrides below can still replace them.
  const MetricsMode sorted_mode = SortedColumnDefaultMode(default_mode);
  for (std::string_view sorted_name :
       SortOrder::OrderPreservingSortedColumns(schema, order)) {
    modes_by_column[std::string(sorted_name)] = sorted_mode;
  }

  // Per-column overrides: the property key minus its prefix is the column name.
  for (const auto& [key, value] : props.configs()) {
    if (!key.starts_with(TableProperties::kMetricModeColumnConfPrefix)) {
      continue;
    }
    std::string column_alias =
        key.substr(TableProperties::kMetricModeColumnConfPrefix.size());
    ICEBERG_ASSIGN_OR_RAISE(auto mode, ParseMode(value, default_mode));
    modes_by_column[std::move(column_alias)] = mode;
  }

  return std::shared_ptr<MetricsConfig>(
      new MetricsConfig(std::move(modes_by_column), default_mode));
}

/// \brief Collect at most `limit` primitive field ids from `schema`.
///
/// At each nested type the ids of its directly contained primitive fields are taken
/// first, then its fields are visited recursively, so shallower primitive fields are
/// preferred over deeper ones. Collection stops once `limit` ids were gathered.
///
/// \param schema schema to collect field ids from
/// \param limit maximum number of ids to collect; zero or a negative value yields an
///        empty set
/// \return the collected ids, or the error of a failed nested visit
Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& schema,
                                                                 int32_t limit) {
  class Visitor {
   public:
    explicit Visitor(int32_t limit) : limit_(limit) {}

    Status Visit(const Type& type) {
      if (type.is_nested()) {
        return VisitNested(internal::checked_cast<const NestedType&>(type));
      }
      return VisitPrimitive(internal::checked_cast<const PrimitiveType&>(type));
    }

    Status VisitNested(const NestedType& type) {
      // First pass: take the primitive fields of this level while room remains.
      for (const auto& field : type.fields()) {
        if (!ShouldContinue()) {
          break;
        }
        // TODO(zhuo.wang): variant type should also be handled here
        if (field.type()->is_primitive()) {
          ids_.insert(field.field_id());
        }
      }

      // Second pass: descend into the fields of this level while room remains.
      for (const auto& field : type.fields()) {
        if (!ShouldContinue()) {
          break;
        }
        ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
      }
      return {};
    }

    // Primitive ids are recorded by the enclosing nested type; nothing to do here.
    Status VisitPrimitive(const PrimitiveType& /*type*/) { return {}; }

    // Hands the collected ids to the caller; the visitor must not be reused afterwards.
    std::unordered_set<int32_t> Finish() { return std::move(ids_); }

   private:
    // Compare in a signed 64-bit domain: comparing size_t with a negative int32_t
    // directly would convert the limit to a huge unsigned value and disable it.
    bool ShouldContinue() const {
      return static_cast<int64_t>(ids_.size()) < static_cast<int64_t>(limit_);
    }

    std::unordered_set<int32_t> ids_;
    int32_t limit_;
  };

  Visitor visitor(limit);
  ICEBERG_RETURN_UNEXPECTED(visitor.Visit(schema));
  return visitor.Finish();
}

Status MetricsConfig::VerifyReferencedColumns(
const std::unordered_map<std::string, std::string>& updates, const Schema& schema) {
for (const auto& [key, value] : updates) {
Expand All @@ -37,14 +223,19 @@ Status MetricsConfig::VerifyReferencedColumns(
auto field_name =
std::string_view(key).substr(TableProperties::kMetricModeColumnConfPrefix.size());
ICEBERG_ASSIGN_OR_RAISE(auto field, schema.FindFieldByName(field_name));
if (!field.has_value()) {
return ValidationFailed(
"Invalid metrics config, could not find column {} from table prop {} in "
"schema {}",
field_name, key, schema.ToString());
}
ICEBERG_CHECK(field.has_value(),
"Invalid metrics config, could not find column {} from table prop {} "
"in schema {}",
field_name, key, schema.ToString());
}
return {};
}

/// \brief Look up the metrics mode for `column_name`.
///
/// \return the mode stored for that exact name, or the default mode when the name has
///         no entry
MetricsMode MetricsConfig::ColumnMode(std::string_view column_name) const {
  const auto entry = column_modes_.find(column_name);
  return entry == column_modes_.end() ? default_mode_ : entry->second;
}

} // namespace iceberg
61 changes: 60 additions & 1 deletion src/iceberg/metrics_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,83 @@
/// \file iceberg/metrics_config.h
/// \brief Metrics configuration for Iceberg tables

#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <variant>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/string_util.h"

namespace iceberg {

/// \brief A metrics mode: a kind plus, for the truncate kind, a length.
///
/// This is an aggregate; it is initialised with designated initializers in
/// declaration order (`kind`, then `length`).
struct ICEBERG_EXPORT MetricsMode {
public:
/// \brief The kind of metrics mode.
///
/// FromString() maps "none", "counts" and "full" to kNone, kCounts and kFull, and
/// "truncate(N)" to kTruncate.
enum class Kind : uint8_t {
kNone,
kCounts,
kTruncate,
kFull,
};

/// \brief Parse a mode from text: "none", "counts", "full" or "truncate(N)".
/// \param mode the text to parse (keywords are compared case-insensitively)
/// \return the parsed mode, or an error when the text is not a valid mode
static Result<MetricsMode> FromString(std::string_view mode);
/// \brief Create a mode of kind kNone (no length set).
static MetricsMode None();
/// \brief Create a mode of kind kCounts (no length set).
static MetricsMode Counts();
/// \brief Create a mode of kind kFull (no length set).
static MetricsMode Full();

/// The kind of this mode.
Kind kind;
/// Holds the int32_t truncate length for modes produced from "truncate(N)";
/// std::monostate for modes created by None(), Counts() and Full().
std::variant<std::monostate, int32_t> length;
};

/// \brief Configuration for collecting column metrics for an Iceberg table.
///
/// Holds a map from column name to metrics mode plus a default mode for every column
/// without an entry. Instances are created through Default() or Make(); the
/// constructor is private.
class ICEBERG_EXPORT MetricsConfig {
public:
/// \brief Get the default metrics config.
///
/// The returned config has no per-column entries and uses truncate(16) as its
/// default mode. Every call returns a reference to the same shared instance.
static const std::shared_ptr<MetricsConfig>& Default();

/// \brief Creates a metrics config from a table.
///
/// Uses the table's properties, schema and sort order; fails when the table's schema
/// cannot be obtained.
static Result<std::shared_ptr<MetricsConfig>> Make(const Table& table);

/// \brief Get `limit` num of primitive field ids from schema
/// \param schema The schema to collect field ids from
/// \param limit The maximum number of field ids to collect
/// \return The collected field ids
static Result<std::unordered_set<int32_t>> LimitFieldIds(const Schema& schema,
int32_t limit);

/// \brief Verify that all referenced columns are valid
/// \param updates The updates to verify
/// \param schema The schema to verify against
/// \return OK if all referenced columns are valid
static Status VerifyReferencedColumns(
const std::unordered_map<std::string, std::string>& updates, const Schema& schema);

/// \brief Get the metrics mode for a specific column
/// \param column_name The full name of the column
/// \return The metrics mode for the column, or the default mode when the column has
/// no entry of its own
MetricsMode ColumnMode(std::string_view column_name) const;

private:
/// Column name -> mode. StringHash/StringEqual are supplied so that lookups can be
/// made with a std::string_view (see ColumnMode()).
using ColumnModeMap =
std::unordered_map<std::string, MetricsMode, StringHash, StringEqual>;

/// \brief Construct from per-column modes and the default mode for all other columns.
MetricsConfig(ColumnModeMap column_modes, MetricsMode default_mode);

/// \brief Generate a MetricsConfig for all columns based on overrides, schema, and sort
/// order.
///
/// \param props will be read for metrics overrides (write.metadata.metrics.column.*)
/// and default(write.metadata.metrics.default)
/// \param schema table schema
/// \param order sort order; columns sorted with an order-preserving transform get the
/// default mode, promoted to truncate(16) when that default is none or counts, unless
/// a column override replaces it
/// \return metrics configuration
static Result<std::shared_ptr<MetricsConfig>> MakeInternal(const TableProperties& props,
const Schema& schema,
const SortOrder& order);

/// Modes for columns that have an explicit entry.
ColumnModeMap column_modes_;
/// Mode returned for every column without an entry in column_modes_.
MetricsMode default_mode_;
};

} // namespace iceberg
14 changes: 14 additions & 0 deletions src/iceberg/sort_order.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,18 @@ Result<std::unique_ptr<SortOrder>> SortOrder::Make(int32_t sort_id,
return std::unique_ptr<SortOrder>(new SortOrder(sort_id, std::move(fields)));
}

std::unordered_set<std::string_view> SortOrder::OrderPreservingSortedColumns(
const Schema& schema, const SortOrder& order) {
return order.fields() | std::views::filter([&schema](const SortField& field) {
return field.transform()->PreservesOrder();
}) |
std::views::transform([&schema](const SortField& field) {
return schema.FindColumnNameById(field.source_id())
.value_or(std::nullopt)
.value_or("");
}) |
std::views::filter([](std::string_view name) { return !name.empty(); }) |
std::ranges::to<std::unordered_set<std::string_view>>();
}

} // namespace iceberg
4 changes: 4 additions & 0 deletions src/iceberg/sort_order.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cstdint>
#include <memory>
#include <span>
#include <unordered_set>
#include <vector>

#include "iceberg/iceberg_export.h"
Expand Down Expand Up @@ -91,6 +92,9 @@ class ICEBERG_EXPORT SortOrder : public util::Formattable {
static Result<std::unique_ptr<SortOrder>> Make(int32_t sort_id,
std::vector<SortField> fields);

/// \brief Get the names of the columns sorted with an order-preserving transform.
///
/// Sort fields whose source id has no column name in the schema are left out.
///
/// \param schema The schema used to resolve sort field source ids to column names
/// \param order The sort order whose fields are inspected
/// \return The column names as string views
/// NOTE(review): the views presumably refer to names owned by `schema`, so it would
/// have to outlive the result - confirm against Schema::FindColumnNameById.
static std::unordered_set<std::string_view> OrderPreservingSortedColumns(
const Schema& schema, const SortOrder& order);

private:
/// \brief Constructs a SortOrder instance.
/// \param order_id The sort order id.
Expand Down
Loading
Loading