Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions datafusion/datasource-parquet/src/opener/early_stop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`EarlyStoppingStream`] terminates a Parquet file scan when a dynamic
//! filter narrows after the scan has already started.

use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::array::RecordBatch;
use datafusion_common::Result;
use datafusion_physical_plan::metrics::PruningMetrics;
use datafusion_pruning::FilePruner;
use futures::{Stream, StreamExt, ready};

/// Wraps an inner RecordBatchStream and a [`FilePruner`]
///
/// This can terminate the scan early when some dynamic filters is updated after
/// the scan starts, so we discover after the scan starts that the file can be
/// pruned (can't have matching rows).
pub(super) struct EarlyStoppingStream<S> {
/// Has the stream finished processing? All subsequent polls will return
/// None
done: bool,
file_pruner: FilePruner,
files_ranges_pruned_statistics: PruningMetrics,
/// The inner stream
inner: S,
}

impl<S> EarlyStoppingStream<S> {
pub(super) fn new(
stream: S,
file_pruner: FilePruner,
files_ranges_pruned_statistics: PruningMetrics,
) -> Self {
Self {
done: false,
inner: stream,
file_pruner,
files_ranges_pruned_statistics,
}
}
}

impl<S> EarlyStoppingStream<S>
where
S: Stream<Item = Result<RecordBatch>> + Unpin,
{
fn check_prune(&mut self, input: Result<RecordBatch>) -> Result<Option<RecordBatch>> {
let batch = input?;

// Since dynamic filters may have been updated, see if we can stop
// reading this stream entirely.
if self.file_pruner.should_prune()? {
self.files_ranges_pruned_statistics.add_pruned(1);
// Previously this file range has been counted as matched
self.files_ranges_pruned_statistics.subtract_matched(1);
self.done = true;
Ok(None)
} else {
// Return the adapted batch
Ok(Some(batch))
}
}
}

impl<S> Stream for EarlyStoppingStream<S>
where
S: Stream<Item = Result<RecordBatch>> + Unpin,
{
type Item = Result<RecordBatch>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}
match ready!(self.inner.poll_next_unpin(cx)) {
None => {
// input done
self.done = true;
Poll::Ready(None)
}
Some(input_batch) => {
let output = self.check_prune(input_batch);
Poll::Ready(output.transpose())
}
}
}
}
104 changes: 104 additions & 0 deletions datafusion/datasource-parquet/src/opener/encryption.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Encryption context used during Parquet file open.
//!
//! Isolated here so the `#[cfg(feature = "parquet_encryption")]` gating does
//! not pollute the rest of the opener module.

#[cfg(feature = "parquet_encryption")]
use std::sync::Arc;

use datafusion_common::Result;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::encryption::FileDecryptionProperties;
#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;

use super::ParquetMorselizer;

#[derive(Default)]
pub(super) struct EncryptionContext {
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
#[cfg(feature = "parquet_encryption")]
encryption_factory: Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
}

#[cfg(feature = "parquet_encryption")]
impl EncryptionContext {
fn new(
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
encryption_factory: Option<(
Arc<dyn EncryptionFactory>,
EncryptionFactoryOptions,
)>,
) -> Self {
Self {
file_decryption_properties,
encryption_factory,
}
}

pub(super) async fn get_file_decryption_properties(
&self,
file_location: &object_store::path::Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
match &self.file_decryption_properties {
Some(file_decryption_properties) => {
Ok(Some(Arc::clone(file_decryption_properties)))
}
None => match &self.encryption_factory {
Some((encryption_factory, encryption_config)) => Ok(encryption_factory
.get_file_decryption_properties(encryption_config, file_location)
.await?),
None => Ok(None),
},
}
}
}

#[cfg(not(feature = "parquet_encryption"))]
#[expect(dead_code)]
impl EncryptionContext {
pub(super) async fn get_file_decryption_properties(
&self,
_file_location: &object_store::path::Path,
) -> Result<
Option<std::sync::Arc<datafusion_common::encryption::FileDecryptionProperties>>,
> {
Ok(None)
}
}

impl ParquetMorselizer {
#[cfg(feature = "parquet_encryption")]
pub(super) fn get_encryption_context(&self) -> EncryptionContext {
EncryptionContext::new(
self.file_decryption_properties.clone(),
self.encryption_factory.clone(),
)
}

#[cfg(not(feature = "parquet_encryption"))]
#[expect(dead_code)]
pub(super) fn get_encryption_context(&self) -> EncryptionContext {
EncryptionContext::default()
}
}
Loading
Loading