From 5e642393a3aa6aed5ba88f79c5b48623cfb8384e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 18 May 2026 13:37:06 -0700 Subject: [PATCH 1/2] refactor(parquet-datasource): split opener.rs into an opener/ module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pure code motion, no behavior change and no public API change (`opener` was already a private module). Splits the ~2,700 LOC `opener.rs` into a directory module: - `opener/early_stop.rs` — `EarlyStoppingStream`, the dynamic-filter early-termination wrapper applied at the end of `build_stream`. - `opener/encryption.rs` — `EncryptionContext` and the `ParquetMorselizer::get_encryption_context` helpers, isolating the `#[cfg(feature = "parquet_encryption")]` gating that previously bled through the main file. `opener.rs` becomes `opener/mod.rs`. Split out of #22156, which originally also extracted an `opener/push_decoder_stream.rs`; that move is now obsolete since #22289 already extracted `PushDecoderStreamState` into `push_decoder.rs`. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/opener/early_stop.rs | 107 ++++++++++++ .../src/opener/encryption.rs | 104 ++++++++++++ .../src/{opener.rs => opener/mod.rs} | 160 +----------------- 3 files changed, 218 insertions(+), 153 deletions(-) create mode 100644 datafusion/datasource-parquet/src/opener/early_stop.rs create mode 100644 datafusion/datasource-parquet/src/opener/encryption.rs rename datafusion/datasource-parquet/src/{opener.rs => opener/mod.rs} (96%) diff --git a/datafusion/datasource-parquet/src/opener/early_stop.rs b/datafusion/datasource-parquet/src/opener/early_stop.rs new file mode 100644 index 0000000000000..75749d284068b --- /dev/null +++ b/datafusion/datasource-parquet/src/opener/early_stop.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`EarlyStoppingStream`] terminates a Parquet file scan when a dynamic +//! filter narrows after the scan has already started. + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use arrow::array::RecordBatch; +use datafusion_common::Result; +use datafusion_physical_plan::metrics::PruningMetrics; +use datafusion_pruning::FilePruner; +use futures::{Stream, StreamExt, ready}; + +/// Wraps an inner RecordBatchStream and a [`FilePruner`] +/// +/// This can terminate the scan early when some dynamic filters is updated after +/// the scan starts, so we discover after the scan starts that the file can be +/// pruned (can't have matching rows). +pub(super) struct EarlyStoppingStream { + /// Has the stream finished processing? All subsequent polls will return + /// None + done: bool, + file_pruner: FilePruner, + files_ranges_pruned_statistics: PruningMetrics, + /// The inner stream + inner: S, +} + +impl EarlyStoppingStream { + pub(super) fn new( + stream: S, + file_pruner: FilePruner, + files_ranges_pruned_statistics: PruningMetrics, + ) -> Self { + Self { + done: false, + inner: stream, + file_pruner, + files_ranges_pruned_statistics, + } + } +} + +impl EarlyStoppingStream +where + S: Stream> + Unpin, +{ + fn check_prune(&mut self, input: Result) -> Result> { + let batch = input?; + + // Since dynamic filters may have been updated, see if we can stop + // reading this stream entirely. + if self.file_pruner.should_prune()? { + self.files_ranges_pruned_statistics.add_pruned(1); + // Previously this file range has been counted as matched + self.files_ranges_pruned_statistics.subtract_matched(1); + self.done = true; + Ok(None) + } else { + // Return the adapted batch + Ok(Some(batch)) + } + } +} + +impl Stream for EarlyStoppingStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + if self.done { + return Poll::Ready(None); + } + match ready!(self.inner.poll_next_unpin(cx)) { + None => { + // input done + self.done = true; + Poll::Ready(None) + } + Some(input_batch) => { + let output = self.check_prune(input_batch); + Poll::Ready(output.transpose()) + } + } + } +} diff --git a/datafusion/datasource-parquet/src/opener/encryption.rs b/datafusion/datasource-parquet/src/opener/encryption.rs new file mode 100644 index 0000000000000..b725198237bbf --- /dev/null +++ b/datafusion/datasource-parquet/src/opener/encryption.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encryption context used during Parquet file open. +//! +//! Isolated here so the `#[cfg(feature = "parquet_encryption")]` gating does +//! not pollute the rest of the opener module. + +#[cfg(feature = "parquet_encryption")] +use std::sync::Arc; + +use datafusion_common::Result; +#[cfg(feature = "parquet_encryption")] +use datafusion_common::config::EncryptionFactoryOptions; +#[cfg(feature = "parquet_encryption")] +use datafusion_common::encryption::FileDecryptionProperties; +#[cfg(feature = "parquet_encryption")] +use datafusion_execution::parquet_encryption::EncryptionFactory; + +use super::ParquetMorselizer; + +#[derive(Default)] +pub(super) struct EncryptionContext { + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: Option>, + #[cfg(feature = "parquet_encryption")] + encryption_factory: Option<(Arc, EncryptionFactoryOptions)>, +} + +#[cfg(feature = "parquet_encryption")] +impl EncryptionContext { + fn new( + file_decryption_properties: Option>, + encryption_factory: Option<( + Arc, + EncryptionFactoryOptions, + )>, + ) -> Self { + Self { + file_decryption_properties, + encryption_factory, + } + } + + pub(super) async fn get_file_decryption_properties( + &self, + file_location: &object_store::path::Path, + ) -> Result>> { + match &self.file_decryption_properties { + Some(file_decryption_properties) => { + Ok(Some(Arc::clone(file_decryption_properties))) + } + None => match &self.encryption_factory { + Some((encryption_factory, encryption_config)) => Ok(encryption_factory + .get_file_decryption_properties(encryption_config, file_location) + .await?), + None => Ok(None), + }, + } + } +} + +#[cfg(not(feature = "parquet_encryption"))] +#[expect(dead_code)] +impl EncryptionContext { + pub(super) async fn get_file_decryption_properties( + &self, + _file_location: &object_store::path::Path, + ) -> Result< + Option>, + > { + Ok(None) + } +} + +impl ParquetMorselizer { + #[cfg(feature = "parquet_encryption")] + pub(super) fn get_encryption_context(&self) -> EncryptionContext { + EncryptionContext::new( + self.file_decryption_properties.clone(), + self.encryption_factory.clone(), + ) + } + + #[cfg(not(feature = "parquet_encryption"))] + #[expect(dead_code)] + pub(super) fn get_encryption_context(&self) -> EncryptionContext { + EncryptionContext::default() + } +} diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener/mod.rs similarity index 96% rename from datafusion/datasource-parquet/src/opener.rs rename to datafusion/datasource-parquet/src/opener/mod.rs index 11cf786a3d6b7..78eefbe76cabc 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -17,6 +17,12 @@ //! [`ParquetMorselizer`] state machines for opening Parquet files +mod early_stop; +mod encryption; + +use self::early_stop::EarlyStoppingStream; +#[cfg(feature = "parquet_encryption")] +use self::encryption::EncryptionContext; use crate::access_plan::PreparedAccessPlan; use crate::page_filter::PagePruningAccessPlanFilter; use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState}; @@ -36,9 +42,7 @@ use std::collections::{HashMap, VecDeque}; use std::fmt; use std::future::Future; use std::mem; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use arrow::datatypes::{SchemaRef, TimeUnit}; use datafusion_common::encryption::FileDecryptionProperties; @@ -53,7 +57,6 @@ use datafusion_physical_expr_common::physical_expr::{ use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, - PruningMetrics, }; use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; @@ -61,9 +64,7 @@ use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use futures::{ - FutureExt, Stream, StreamExt, future::BoxFuture, ready, stream::BoxStream, -}; +use futures::{FutureExt, StreamExt, future::BoxFuture, stream::BoxStream}; use log::debug; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; @@ -1322,153 +1323,6 @@ fn constant_value_from_stats( None } -/// Wraps an inner RecordBatchStream and a [`FilePruner`] -/// -/// This can terminate the scan early when some dynamic filters is updated after -/// the scan starts, so we discover after the scan starts that the file can be -/// pruned (can't have matching rows). -struct EarlyStoppingStream { - /// Has the stream finished processing? All subsequent polls will return - /// None - done: bool, - file_pruner: FilePruner, - files_ranges_pruned_statistics: PruningMetrics, - /// The inner stream - inner: S, -} - -impl EarlyStoppingStream { - pub fn new( - stream: S, - file_pruner: FilePruner, - files_ranges_pruned_statistics: PruningMetrics, - ) -> Self { - Self { - done: false, - inner: stream, - file_pruner, - files_ranges_pruned_statistics, - } - } -} - -impl EarlyStoppingStream -where - S: Stream> + Unpin, -{ - fn check_prune(&mut self, input: Result) -> Result> { - let batch = input?; - - // Since dynamic filters may have been updated, see if we can stop - // reading this stream entirely. - if self.file_pruner.should_prune()? { - self.files_ranges_pruned_statistics.add_pruned(1); - // Previously this file range has been counted as matched - self.files_ranges_pruned_statistics.subtract_matched(1); - self.done = true; - Ok(None) - } else { - // Return the adapted batch - Ok(Some(batch)) - } - } -} - -impl Stream for EarlyStoppingStream -where - S: Stream> + Unpin, -{ - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - if self.done { - return Poll::Ready(None); - } - match ready!(self.inner.poll_next_unpin(cx)) { - None => { - // input done - self.done = true; - Poll::Ready(None) - } - Some(input_batch) => { - let output = self.check_prune(input_batch); - Poll::Ready(output.transpose()) - } - } - } -} - -#[derive(Default)] -struct EncryptionContext { - #[cfg(feature = "parquet_encryption")] - file_decryption_properties: Option>, - #[cfg(feature = "parquet_encryption")] - encryption_factory: Option<(Arc, EncryptionFactoryOptions)>, -} - -#[cfg(feature = "parquet_encryption")] -impl EncryptionContext { - fn new( - file_decryption_properties: Option>, - encryption_factory: Option<( - Arc, - EncryptionFactoryOptions, - )>, - ) -> Self { - Self { - file_decryption_properties, - encryption_factory, - } - } - - async fn get_file_decryption_properties( - &self, - file_location: &object_store::path::Path, - ) -> Result>> { - match &self.file_decryption_properties { - Some(file_decryption_properties) => { - Ok(Some(Arc::clone(file_decryption_properties))) - } - None => match &self.encryption_factory { - Some((encryption_factory, encryption_config)) => Ok(encryption_factory - .get_file_decryption_properties(encryption_config, file_location) - .await?), - None => Ok(None), - }, - } - } -} - -#[cfg(not(feature = "parquet_encryption"))] -#[expect(dead_code)] -impl EncryptionContext { - async fn get_file_decryption_properties( - &self, - _file_location: &object_store::path::Path, - ) -> Result>> { - Ok(None) - } -} - -impl ParquetMorselizer { - #[cfg(feature = "parquet_encryption")] - fn get_encryption_context(&self) -> EncryptionContext { - EncryptionContext::new( - self.file_decryption_properties.clone(), - self.encryption_factory.clone(), - ) - } - - #[cfg(not(feature = "parquet_encryption"))] - #[expect(dead_code)] - fn get_encryption_context(&self) -> EncryptionContext { - EncryptionContext::default() - } -} - /// Return the initial [`ParquetAccessPlan`] /// /// If the user has supplied one as an extension, use that From 5bb94e08d6d5d4545ae2683dc6be1f3893496620 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 18 May 2026 20:41:57 -0700 Subject: [PATCH 2/2] fix: gate FileDecryptionProperties import behind parquet_encryption Both uses in opener/mod.rs are cfg-gated on the feature; with default features the import was unused after the module split. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/datasource-parquet/src/opener/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 78eefbe76cabc..e5929fd43f11a 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -45,6 +45,7 @@ use std::mem; use std::sync::Arc; use arrow::datatypes::{SchemaRef, TimeUnit}; +#[cfg(feature = "parquet_encryption")] use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_err};