Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vortex-datafusion/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, bool) ->

pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, usize) -> Self

pub fn vortex_datafusion::VortexSource::with_segment_cache(self, segment_cache: alloc::sync::Arc<dyn vortex_layout::segments::cache::SegmentCache>) -> Self

pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, alloc::sync::Arc<dyn vortex_datafusion::reader::VortexReaderFactory>) -> Self

impl core::clone::Clone for vortex_datafusion::VortexSource
Expand Down
13 changes: 13 additions & 0 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use vortex::file::OpenOptionsSessionExt;
use vortex::io::InstrumentedReadAt;
use vortex::layout::LayoutReader;
use vortex::layout::scan::scan_builder::ScanBuilder;
use vortex::layout::scan::split_by::SplitBy;
use vortex::layout::segments::SegmentCache;
use vortex::metrics::Label;
use vortex::metrics::MetricsRegistry;
Expand Down Expand Up @@ -97,6 +98,7 @@ pub(crate) struct VortexOpener {

pub expression_convertor: Arc<dyn ExpressionConvertor>,
pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
pub segment_cache: Option<Arc<dyn SegmentCache>>,
/// Whether to enable expression pushdown into the underlying Vortex scan.
pub projection_pushdown: bool,
pub scan_concurrency: Option<usize>,
Expand All @@ -122,6 +124,7 @@ impl FileOpener for VortexOpener {
let file_pruning_predicate = self.file_pruning_predicate.clone();
let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
let file_metadata_cache = self.file_metadata_cache.clone();
let segment_cache = self.segment_cache.clone();

let unified_file_schema = Arc::clone(self.table_schema.file_schema());
let batch_size = self.batch_size;
Expand Down Expand Up @@ -200,6 +203,10 @@ impl FileOpener for VortexOpener {
open_opts = open_opts.with_footer(vortex_metadata.footer().clone());
}

if let Some(segment_cache) = segment_cache {
open_opts = open_opts.with_segment_cache(segment_cache);
}

let vxf = open_opts
.open_read(reader)
.await
Expand Down Expand Up @@ -680,6 +687,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache: None,
projection_pushdown: false,
scan_concurrency: None,
}
Expand Down Expand Up @@ -812,6 +820,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache: None,
projection_pushdown: false,
scan_concurrency: None,
};
Expand Down Expand Up @@ -899,6 +908,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache: None,
projection_pushdown: false,
scan_concurrency: None,
};
Expand Down Expand Up @@ -1056,6 +1066,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache: None,
projection_pushdown: false,
scan_concurrency: None,
};
Expand Down Expand Up @@ -1116,6 +1127,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache: None,
projection_pushdown: false,
scan_concurrency: None,
}
Expand Down Expand Up @@ -1320,6 +1332,7 @@ mod tests {
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
segment_cache: None,
projection_pushdown: false,
scan_concurrency: None,
};
Expand Down
14 changes: 14 additions & 0 deletions vortex-datafusion/src/persistent/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use vortex::layout::segments::SegmentCache;
Expand Down Expand Up @@ -192,6 +193,7 @@ pub struct VortexSource {
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
vx_metrics_registry: Arc<dyn MetricsRegistry>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
segment_cache: Option<Arc<dyn SegmentCache>>,
/// Whether to enable expression pushdown into the underlying Vortex scan.
options: VortexTableOptions,
}
Expand Down Expand Up @@ -224,6 +226,7 @@ impl VortexSource {
vortex_reader_factory: None,
vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
file_metadata_cache: None,
segment_cache: None,
options: VortexTableOptions::default(),
}
}
Expand Down Expand Up @@ -283,6 +286,16 @@ impl VortexSource {
self
}

/// Installs a [`SegmentCache`] so segment bytes fetched for one scan can be
/// served from memory on subsequent scans of the same file.
///
/// Without a cache, each query re-fetches zone map and data segments from
/// object storage. Sharing a cache (e.g. a
/// [`MokaSegmentCache`](vortex::layout::segments::MokaSegmentCache)) across
/// scans removes those repeated reads for queries over the same files.
pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
    // Builder-style setter: store the cache and hand the source back.
    self.segment_cache = Some(segment_cache);
    self
}

/// Sets the per-file Vortex scan concurrency.
///
/// This is separate from DataFusion's partition-level parallelism.
Expand Down Expand Up @@ -339,6 +352,7 @@ impl VortexSource {
has_output_ordering: !base_config.output_ordering.is_empty(),
expression_convertor: Arc::clone(&self.expression_convertor),
file_metadata_cache: self.file_metadata_cache.clone(),
segment_cache: self.segment_cache.clone(),
projection_pushdown: self.options.projection_pushdown,
scan_concurrency: self.options.scan_concurrency,
};
Expand Down
Loading