Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <list>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <vector>
Expand All @@ -30,9 +31,11 @@

#include "arrow/adapters/orc/util.h"
#include "arrow/builder.h"
#include "arrow/compute/expression.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/table_builder.h"
Expand Down Expand Up @@ -100,6 +103,119 @@ constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024;

using internal::checked_cast;

// Min/max summary of an integer column, as taken from ORC stripe statistics.
//
// Holds the smallest and largest value reported for the column together with
// a flag telling whether the statistics reported any null entries.
struct MinMaxStats {
  int64_t min;    // lower bound reported by the statistics
  int64_t max;    // upper bound reported by the statistics
  bool has_null;  // whether the statistics reported null values

  // Stores the three values verbatim; no validation (e.g. min <= max) is done.
  MinMaxStats(int64_t lower, int64_t upper, bool contains_null)
      : min{lower}, max{upper}, has_null{contains_null} {}
};

// Extract stripe-level min/max statistics for a single ORC column.
//
// \param stripe_stats   statistics of one stripe; may be null
// \param orc_column_id  id of the column to look up inside stripe_stats
// \param field_type     Arrow type of the field; may be null. Only INT64 is
//                       supported in this initial implementation.
// \return the column's minimum, maximum and has-null flag, or std::nullopt
//         when the type is missing/unsupported, the statistics are absent,
//         the column id is out of range, the statistics are not integer
//         statistics, min or max is not recorded, or min > max.
std::optional<MinMaxStats> ExtractStripeStatistics(
    const std::unique_ptr<liborc::StripeStatistics>& stripe_stats,
    uint32_t orc_column_id, const std::shared_ptr<DataType>& field_type) {
  // Cheapest test first: a missing or non-INT64 type can never produce
  // statistics here, so bail out before touching the ORC objects at all.
  if (!field_type || field_type->id() != Type::INT64) {
    return std::nullopt;  // Missing or unsupported type
  }

  if (!stripe_stats) {
    return std::nullopt;  // No statistics available
  }

  // NOTE(review): getColumnStatistics() is assumed not to range-check the id,
  // so reject ids beyond the number of columns covered by the statistics.
  if (orc_column_id >= stripe_stats->getNumberOfColumns()) {
    return std::nullopt;  // Column id out of range
  }

  const liborc::ColumnStatistics* col_stats =
      stripe_stats->getColumnStatistics(orc_column_id);
  if (!col_stats) {
    return std::nullopt;  // Column statistics missing
  }

  // The generic statistics object only exposes min/max through its
  // integer-specific subclass.
  const auto* int_stats =
      dynamic_cast<const liborc::IntegerColumnStatistics*>(col_stats);
  if (!int_stats) {
    return std::nullopt;  // Wrong statistics type
  }

  if (!int_stats->hasMinimum() || !int_stats->hasMaximum()) {
    return std::nullopt;  // Statistics incomplete
  }

  const int64_t min_value = int_stats->getMinimum();
  const int64_t max_value = int_stats->getMaximum();
  const bool has_null = col_stats->hasNull();

  // Sanity check: an inverted range means the statistics cannot be trusted.
  if (min_value > max_value) {
    return std::nullopt;  // Invalid statistics
  }

  return MinMaxStats(min_value, max_value, has_null);
}

// Build the Arrow Expression that expresses a stripe's min/max statistics as
// a guarantee about the values of one field.
//
// The resulting expression is
//   has_null == false:  (field >= min AND field <= max)
//   has_null == true:   (field >= min AND field <= max) OR is_null(field)
//
// It describes which values COULD exist in the stripe. It is intended as the
// guarantee argument of Arrow's SimplifyWithGuarantee(), which can then tell
// whether a predicate could be satisfied by the stripe.
//
// Example: for a stripe with min=0, max=100 and nulls present the guarantee is
//   (field >= 0 AND field <= 100) OR is_null(field)
// and a predicate such as "field > 200" is expected to simplify to
// literal(false), meaning the stripe can be skipped.
//
// \param field_ref   reference to the field the statistics belong to
// \param field_type  Arrow type of the field (currently not used by the body)
// \param min_value   lower bound, inclusive
// \param max_value   upper bound, inclusive
// \param has_null    whether the is_null(field) alternative must be included
compute::Expression BuildMinMaxExpression(const FieldRef& field_ref,
                                          const std::shared_ptr<DataType>& field_type,
                                          const Scalar& min_value,
                                          const Scalar& max_value, bool has_null) {
  const compute::Expression column = compute::field_ref(field_ref);

  // Inclusive range check: min <= column <= max.
  auto lower_bound = compute::greater_equal(column, compute::literal(min_value));
  auto upper_bound = compute::less_equal(column, compute::literal(max_value));
  auto in_range = compute::and_(std::move(lower_bound), std::move(upper_bound));

  if (!has_null) {
    return in_range;
  }

  // Nulls are present: widen the guarantee so that predicates which could
  // match null values do not cause the stripe to be skipped.
  return compute::or_(std::move(in_range), compute::is_null(column));
}

// Convenience overload taking a MinMaxStats directly.
//
// Wraps stats.min and stats.max in Int64Scalar objects and forwards them,
// together with stats.has_null, to the Scalar-based BuildMinMaxExpression.
compute::Expression BuildMinMaxExpression(const FieldRef& field_ref,
                                          const std::shared_ptr<DataType>& field_type,
                                          const MinMaxStats& stats) {
  // The scalars stay shared_ptr-owned for the duration of the forwarded call.
  const std::shared_ptr<Int64Scalar> lower = std::make_shared<Int64Scalar>(stats.min);
  const std::shared_ptr<Int64Scalar> upper = std::make_shared<Int64Scalar>(stats.max);

  return BuildMinMaxExpression(field_ref, field_type, *lower, *upper, stats.has_null);
}

class ArrowInputFile : public liborc::InputStream {
public:
explicit ArrowInputFile(const std::shared_ptr<io::RandomAccessFile>& file)
Expand Down
53 changes: 53 additions & 0 deletions cpp/src/arrow/dataset/file_orc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>

#include "arrow/adapters/orc/adapter.h"
#include "arrow/compute/expression.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/scanner.h"
Expand Down Expand Up @@ -58,6 +59,18 @@ Result<std::unique_ptr<arrow::adapters::orc::ORCFileReader>> OpenORCReader(
return reader;
}

// AND-fold `right` into the accumulator expression pointed to by `left`.
//
// An accumulator that still equals literal(true) is replaced by `right`
// outright instead of being wrapped as and_(true, right); this avoids
// building needlessly large expression trees. Otherwise the accumulator
// becomes and_(accumulator, right).
//
// \param left   accumulator, updated in place; must not be null
// \param right  expression to fold in (consumed)
void FoldingAnd(compute::Expression* left, compute::Expression right) {
  const bool accumulator_is_untouched = left->Equals(compute::literal(true));
  if (accumulator_is_untouched) {
    *left = std::move(right);
    return;
  }
  *left = compute::and_(std::move(*left), std::move(right));
}

/// \brief A ScanTask backed by an ORC file.
class OrcScanTask {
public:
Expand Down Expand Up @@ -212,6 +225,46 @@ Future<std::optional<int64_t>> OrcFileFormat::CountRows(
}));
}

// //
// // OrcFileFragment
// //

// Construct an ORC fragment by forwarding every argument, moved, to the
// FileFragment base class. The constructor itself performs no other work.
OrcFileFragment::OrcFileFragment(FileSource source,
std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema)
: FileFragment(std::move(source), std::move(format),
std::move(partition_expression), std::move(physical_schema)) {}

// Load and cache the file metadata used for predicate pushdown.
//
// The first successful call opens an ORC reader on the fragment's source,
// caches the schema it reports, and initializes the lazy-evaluation state:
// one literal(true) statistics expression per stripe ("unprocessed") and one
// `false` completion flag per schema field ("not processed"). Subsequent
// calls return immediately. The whole method runs under metadata_mutex_.
//
// \return Status::OK() on success; otherwise the error produced while
//         opening the reader or reading the schema, in which case
//         metadata_cached_ stays false so a later call retries.
Status OrcFileFragment::EnsureMetadataCached() {
  auto lock = metadata_mutex_.Lock();

  if (metadata_cached_) {
    return Status::OK();
  }

  // Open reader to get schema and stripe information
  ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(source()));
  ARROW_ASSIGN_OR_RAISE(cached_schema_, reader->ReadSchema());

  // Keep the reader's own integer width for the stripe count rather than
  // narrowing it to int, and convert exactly once for the container size.
  const auto num_stripes = reader->NumberOfStripes();

  // assign() replaces any previous contents, so every stripe starts from
  // literal(true) and every field from "not processed".
  statistics_expressions_.assign(static_cast<size_t>(num_stripes),
                                 compute::literal(true));
  statistics_expressions_complete_.assign(
      static_cast<size_t>(cached_schema_->num_fields()), false);

  metadata_cached_ = true;
  return Status::OK();
}

// //
// // OrcFileWriter, OrcFileWriteOptions
// //
Expand Down
28 changes: 28 additions & 0 deletions cpp/src/arrow/dataset/file_orc.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
#include <memory>
#include <string>

#include "arrow/compute/type_fwd.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/type_fwd.h"
#include "arrow/result.h"
#include "arrow/util/mutex.h"

namespace arrow {
namespace dataset {
Expand Down Expand Up @@ -69,6 +71,32 @@ class ARROW_DS_EXPORT OrcFileFormat : public FileFormat {
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
};

/// \brief A FileFragment implementation for ORC files with predicate pushdown
///
/// Instances carry lazily populated, mutex-guarded metadata (the file schema
/// plus per-stripe statistics expressions). The constructor is private;
/// OrcFileFormat is declared a friend and can therefore create fragments.
class ARROW_DS_EXPORT OrcFileFragment : public FileFragment {
public:
/// \brief Ensure metadata is cached
///
/// Populates the cached schema and the per-stripe / per-field bookkeeping
/// below if that has not happened yet.
/// \return a non-OK Status if the metadata could not be loaded
Status EnsureMetadataCached();

private:
// Forwards all arguments to the FileFragment base class.
OrcFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema);

// Cached metadata to avoid repeated I/O
// metadata_mutex_ is held by EnsureMetadataCached() while it fills the
// members below; metadata_cached_ records that this has been done.
mutable util::Mutex metadata_mutex_;
mutable std::shared_ptr<Schema> cached_schema_;
mutable bool metadata_cached_ = false;

// Lazy evaluation structures for predicate pushdown
// Each stripe starts with literal(true) and gets refined as fields are processed
// (one entry per stripe of the file).
mutable std::vector<compute::Expression> statistics_expressions_;

// Track which fields have been processed to avoid duplicate work
// (one flag per field of the cached schema, initially false).
mutable std::vector<bool> statistics_expressions_complete_;

// Grants OrcFileFormat access to the private constructor and cached state.
friend class OrcFileFormat;
};

/// @}

} // namespace dataset
Expand Down
Loading