Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,053 changes: 583 additions & 470 deletions be/src/exec/scan/file_scanner.cpp

Large diffs are not rendered by default.

50 changes: 34 additions & 16 deletions be/src/exec/scan/file_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ class FileScanner : public Scanner {
: Scanner(state, profile),
_params(params),
_col_name_to_slot_id(colname_to_slot_id),
_real_tuple_desc(tuple_desc) {};
_real_tuple_desc(tuple_desc) {
_configure_file_scan_handlers();
};

Status read_lines_from_range(const TFileRangeDesc& range, const std::list<int64_t>& row_ids,
Block* result_block, const ExternalFileMappingInfo& external_info,
Expand All @@ -107,6 +109,9 @@ class FileScanner : public Scanner {

Status _get_next_reader();

// Build a ReaderInitContext with shared fields from FileScanner members.
void _fill_base_init_context(ReaderInitContext* ctx);

// TODO: cast input block columns type to string.
Status _cast_src_block(Block* block) { return Status::OK(); }

Expand All @@ -128,10 +133,10 @@ class FileScanner : public Scanner {
std::vector<SlotDescriptor*> _file_slot_descs;
// col names from _file_slot_descs
std::vector<std::string> _file_col_names;
// Unified column descriptors for init_reader (includes file, partition, missing, synthesized cols)
std::vector<ColumnDescriptor> _column_descs;

// Partition source slot descriptors
std::vector<SlotDescriptor*> _partition_slot_descs;
// Partition slot id to index in _partition_slot_descs
// Partition slot id to partition key index (for matching columns_from_path)
std::unordered_map<SlotId, int> _partition_slot_index_map;
// created from param.expr_of_dest_slot
// For query, it saves default value expr of all dest columns, or nullptr for NULL.
Expand All @@ -152,8 +157,6 @@ class FileScanner : public Scanner {
// Get from GenericReader, save the existing columns in file to their type.
std::unordered_map<std::string, DataTypePtr> _slot_lower_name_to_col_type;
// Get from GenericReader, save columns that required by scan but not exist in file.
// These columns will be filled by default value or null.
std::unordered_set<std::string> _missing_cols;

// The col lowercase name of source file to type of source file.
std::map<std::string, DataTypePtr> _source_file_col_name_types;
Expand Down Expand Up @@ -192,7 +195,6 @@ class FileScanner : public Scanner {
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
_partition_col_descs;
std::unordered_map<std::string, bool> _partition_value_is_null;
std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;

// idx of skip_bitmap_col in _input_tuple_desc
int32_t _skip_bitmap_col_idx {-1};
Expand Down Expand Up @@ -232,32 +234,43 @@ class FileScanner : public Scanner {

std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> _row_id_column_iterator_pair = {nullptr,
-1};
bool _need_iceberg_rowid_column = false;
int _iceberg_rowid_column_pos = -1;
// for iceberg row lineage
RowLineageColumns _row_lineage_columns;
int64_t _last_bytes_read_from_local = 0;
int64_t _last_bytes_read_from_remote = 0;

Status (FileScanner::*_init_src_block_handler)(Block* block) = nullptr;
Status (FileScanner::*_process_src_block_after_read_handler)(Block* block) = nullptr;
bool (FileScanner::*_should_push_down_predicates_handler)(
TFileFormatType::type format_type) const = nullptr;
bool (FileScanner::*_should_enable_condition_cache_handler)() const = nullptr;

// Condition cache for external tables
uint64_t _condition_cache_digest = 0;
segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key;
std::shared_ptr<std::vector<bool>> _condition_cache;
std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
int64_t _condition_cache_hit_count = 0;

void _configure_file_scan_handlers();

Status _init_expr_ctxes();
Status _init_src_block(Block* block);
Status _check_output_block_types();
Status _cast_to_input_block(Block* block);
Status _init_src_block_for_load(Block* block);
Status _init_src_block_for_query(Block* block);
Status _process_src_block_after_read(Block* block);
Status _process_src_block_after_read_for_load(Block* block);
Status _process_src_block_after_read_for_query(Block* block);
Status _fill_columns_from_path(size_t rows);
Status _fill_missing_columns(size_t rows);
Status _check_output_block_types();
Status _cast_to_input_block(Block* block);
Status _pre_filter_src_block();
Status _convert_to_output_block(Block* block);
Status _truncate_char_or_varchar_columns(Block* block);
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
Status _generate_partition_columns();
Status _generate_missing_columns();

bool _check_partition_prune_expr(const VExprSPtr& expr);
void _init_runtime_filter_partition_prune_ctxs();
void _init_runtime_filter_partition_prune_block();
Expand All @@ -267,10 +280,10 @@ class FileScanner : public Scanner {
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
Status _init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
FileMetaCache* file_meta_cache_ptr);
Status _init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
FileMetaCache* file_meta_cache_ptr);
Status _init_orc_reader(FileMetaCache* file_meta_cache_ptr,
std::unique_ptr<OrcReader> orc_reader = nullptr);
Status _init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
std::unique_ptr<ParquetReader> parquet_reader = nullptr);
Status _create_row_id_column_iterator();

TFileFormatType::type _get_current_format_type() {
Expand All @@ -291,6 +304,11 @@ class FileScanner : public Scanner {
}

bool _should_enable_condition_cache();
bool _should_enable_condition_cache_for_load() const;
bool _should_enable_condition_cache_for_query() const;
bool _should_push_down_predicates(TFileFormatType::type format_type) const;
bool _should_push_down_predicates_for_load(TFileFormatType::type format_type) const;
bool _should_push_down_predicates_for_query(TFileFormatType::type format_type) const;
void _init_reader_condition_cache();
void _finalize_reader_condition_cache();

Expand Down
6 changes: 3 additions & 3 deletions be/src/format/arrow/arrow_stream_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Status ArrowStreamReader::init_reader() {
return Status::OK();
}

Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
Status ArrowStreamReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
bool has_next = false;
RETURN_IF_ERROR(_pip_stream->HasNext(&has_next));
if (!has_next) {
Expand Down Expand Up @@ -126,8 +126,8 @@ Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool*
return Status::OK();
}

Status ArrowStreamReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
Status ArrowStreamReader::_get_columns_impl(
std::unordered_map<std::string, DataTypePtr>* name_to_type) {
for (const auto& slot : _file_slot_descs) {
name_to_type->emplace(slot->col_name(), slot->type());
}
Expand Down
8 changes: 5 additions & 3 deletions be/src/format/arrow/arrow_stream_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ class ArrowStreamReader : public GenericReader {

Status init_reader();

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override;

Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) override;

protected:
Status _do_init_reader(ReaderInitContext* /*ctx*/) override { return init_reader(); }

private:
RuntimeState* _state;
Expand Down
56 changes: 56 additions & 0 deletions be/src/format/column_descriptor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>

#include "exprs/vexpr_fwd.h"

namespace doris {
class SlotDescriptor;

/// Column categories for table format reading.
///
/// Each column requested by the query is classified into one of these categories.
/// The category determines how the column's value is obtained:
/// - REGULAR: Read directly from the data file (Parquet/ORC).
/// If the column is absent from a file (schema evolution),
/// its default_expr is used to produce a default value.
/// - PARTITION_KEY: Filled from partition metadata (e.g. Hive path partitions).
/// - SYNTHESIZED: Never in the data file; fully computed at runtime
/// (e.g. Doris V2 __DORIS_ICEBERG_ROWID_COL__).
/// - GENERATED: May or may not exist in the data file. If present but null,
/// the value is backfilled at runtime (e.g. Iceberg V3 _row_id).
enum class ColumnCategory {
REGULAR,
PARTITION_KEY,
SYNTHESIZED,
GENERATED,
};

/// Describes a column requested by the query, along with its category.
struct ColumnDescriptor {
std::string name;
const SlotDescriptor* slot_desc = nullptr;
ColumnCategory category = ColumnCategory::REGULAR;
/// Default value expression when this column is missing from the data file.
/// nullptr means fill with NULL. Built once per table scan in FileScanner.
VExprContextSPtr default_expr;
};

} // namespace doris
106 changes: 106 additions & 0 deletions be/src/format/count_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstddef>
#include <cstdint>
#include <memory>

#include "common/status.h"
#include "core/block/block.h"
#include "format/generic_reader.h"

namespace doris {
#include "common/compile_check_begin.h"

/// A lightweight reader that emits row counts without reading any actual data.
/// Used as a decorator to replace the real reader when COUNT(*) push down is active.
///
/// Instead of duplicating the COUNT short-circuit logic in every format reader
/// (ORC, Parquet, etc.), FileScanner creates a CountReader after the real reader
/// is initialized and the total row count is known. The CountReader then serves
/// all subsequent get_next_block calls by simply resizing columns.
///
/// This cleanly separates the "how many rows" concern from the actual data reading,
/// eliminating duplicated COUNT blocks across format readers.
class CountReader : public GenericReader {
public:
/// @param total_rows Total number of rows to emit (post-filter).
/// @param batch_size Maximum rows per batch.
/// @param inner_reader The original reader, kept alive for profile collection
/// and lifecycle management. Ownership is transferred.
CountReader(int64_t total_rows, size_t batch_size,
std::unique_ptr<GenericReader> inner_reader = nullptr)
: _remaining_rows(total_rows),
_batch_size(batch_size),
_inner_reader(std::move(inner_reader)) {
set_push_down_agg_type(TPushAggOp::type::COUNT);
}

~CountReader() override = default;

Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
auto rows = std::min(_remaining_rows, static_cast<int64_t>(_batch_size));
_remaining_rows -= rows;

auto mutate_columns = block->mutate_columns();
for (auto& col : mutate_columns) {
col->resize(rows);
}
block->set_columns(std::move(mutate_columns));

*read_rows = rows;
*eof = (_remaining_rows == 0);
return Status::OK();
}

/// CountReader counts rows by definition.
bool count_read_rows() override { return true; }

/// Delegate to inner reader if available, otherwise return our total.
int64_t get_total_rows() const override {
return _inner_reader ? _inner_reader->get_total_rows() : _initial_total_rows();
}

Status close() override {
if (_inner_reader) {
return _inner_reader->close();
}
return Status::OK();
}

/// Access the inner reader for profile collection or other lifecycle needs.
GenericReader* inner_reader() const { return _inner_reader.get(); }

protected:
void _collect_profile_before_close() override {
if (_inner_reader) {
_inner_reader->collect_profile_before_close();
}
}

private:
int64_t _initial_total_rows() const { return _remaining_rows; }

int64_t _remaining_rows;
size_t _batch_size;
std::unique_ptr<GenericReader> _inner_reader;
};

#include "common/compile_check_end.h"
} // namespace doris
Loading
Loading