Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 76 additions & 1 deletion datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
// under the License.

use crate::sort::reverse_row_selection;
use datafusion_common::{Result, assert_eq_or_internal_err};
use datafusion_common::{Result, assert_eq_or_internal_err, internal_datafusion_err};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use std::collections::VecDeque;

/// A selection of rows and row groups within a ParquetFile to decode.
///
Expand Down Expand Up @@ -396,6 +397,80 @@ impl PreparedAccessPlan {

Ok(self)
}

/// Split this access plan into one plan per selected row group.
///
/// The returned plans preserve the current row-group ordering. If
/// `row_selection` is present, it is partitioned so each returned plan
/// contains only the selection entries for its single row group.
pub(crate) fn into_single_row_group_plans(
self,
file_metadata: &ParquetMetaData,
) -> Result<Vec<Self>> {
let Self {
row_group_indexes,
row_selection,
} = self;

let Some(row_selection) = row_selection else {
return Ok(row_group_indexes
.into_iter()
.map(|row_group_index| Self {
row_group_indexes: vec![row_group_index],
row_selection: None,
})
.collect());
};

let mut selectors: VecDeque<RowSelector> =
Vec::<RowSelector>::from(row_selection).into();
let mut plans = Vec::with_capacity(row_group_indexes.len());

for row_group_index in row_group_indexes {
let mut remaining_rows =
file_metadata.row_groups()[row_group_index].num_rows() as usize;
let mut row_group_selectors = Vec::new();

while remaining_rows > 0 {
let selector = selectors.pop_front().ok_or_else(|| {
internal_datafusion_err!(
"PreparedAccessPlan row selection ended before row group {row_group_index} was fully described"
)
})?;

let rows_for_group = selector.row_count.min(remaining_rows);
row_group_selectors.push(if selector.skip {
RowSelector::skip(rows_for_group)
} else {
RowSelector::select(rows_for_group)
});

if selector.row_count > rows_for_group {
let remaining_selector_rows = selector.row_count - rows_for_group;
selectors.push_front(if selector.skip {
RowSelector::skip(remaining_selector_rows)
} else {
RowSelector::select(remaining_selector_rows)
});
}

remaining_rows -= rows_for_group;
}

plans.push(Self {
row_group_indexes: vec![row_group_index],
row_selection: Some(row_group_selectors.into()),
});
}

if !selectors.is_empty() {
return Err(internal_datafusion_err!(
"PreparedAccessPlan row selection had leftover selectors after splitting by row group"
));
}

Ok(plans)
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions datafusion/datasource-parquet/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod writer;
pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use file_format::*;
pub use metrics::ParquetFileMetrics;
pub use opener::ParquetMorselizer;
pub use page_filter::PagePruningAccessPlanFilter;
pub use reader::*; // Expose so downstream crates can use it
pub use row_filter::build_row_filter;
Expand Down
Loading
Loading