From 6dbc5a1993f4e264ccd8faad6c882e757424891d Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Wed, 6 May 2026 13:15:01 +0200 Subject: [PATCH] Vortex-datafusion segment cache Signed-off-by: Christoph Schulze --- vortex-datafusion/public-api.lock | 2 ++ vortex-datafusion/src/persistent/opener.rs | 13 +++++++++++++ vortex-datafusion/src/persistent/source.rs | 14 ++++++++++++++ 3 files changed, 29 insertions(+) diff --git a/vortex-datafusion/public-api.lock b/vortex-datafusion/public-api.lock index 9d558c1de48..be04e369343 100644 --- a/vortex-datafusion/public-api.lock +++ b/vortex-datafusion/public-api.lock @@ -200,6 +200,8 @@ pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, enabled: pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, scan_concurrency: usize) -> Self +pub fn vortex_datafusion::VortexSource::with_segment_cache(self, segment_cache: alloc::sync::Arc) -> Self + pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, vortex_reader_factory: alloc::sync::Arc) -> Self impl core::clone::Clone for vortex_datafusion::VortexSource diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 0719b023881..1b6f18cf907 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -41,6 +41,7 @@ use vortex::file::OpenOptionsSessionExt; use vortex::io::InstrumentedReadAt; use vortex::layout::LayoutReader; use vortex::layout::scan::scan_builder::ScanBuilder; use vortex::layout::scan::split_by::SplitBy; +use vortex::layout::segments::SegmentCache; use vortex::metrics::Label; use vortex::metrics::MetricsRegistry; @@ -97,6 +98,7 @@ pub(crate) struct VortexOpener { pub expression_convertor: Arc, pub file_metadata_cache: Option>, + pub segment_cache: Option<Arc<dyn SegmentCache>>, /// Whether to enable expression pushdown into the underlying Vortex scan.
pub projection_pushdown: bool, pub scan_concurrency: Option, @@ -122,6 +124,7 @@ impl FileOpener for VortexOpener { let file_pruning_predicate = self.file_pruning_predicate.clone(); let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory); let file_metadata_cache = self.file_metadata_cache.clone(); + let segment_cache = self.segment_cache.clone(); let unified_file_schema = Arc::clone(self.table_schema.file_schema()); let batch_size = self.batch_size; @@ -200,6 +203,10 @@ impl FileOpener for VortexOpener { open_opts = open_opts.with_footer(vortex_metadata.footer().clone()); } + if let Some(segment_cache) = segment_cache { + open_opts = open_opts.with_segment_cache(segment_cache); + } + let vxf = open_opts .open_read(reader) .await @@ -656,6 +663,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, } @@ -788,6 +796,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, }; @@ -875,6 +884,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, }; @@ -1032,6 +1042,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, }; @@ -1092,6 +1103,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, } @@ -1296,6 +1308,7 @@ mod tests { has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, }; diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 211170c6607..c3fe481574e 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -15,6 +15,7 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; use datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_expr::PhysicalExprRef; +use vortex::layout::segments::SegmentCache; use datafusion_physical_expr::conjunction; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; @@ -192,6 +193,7 @@ pub struct VortexSource { pub(crate) vortex_reader_factory: Option>, vx_metrics_registry: Arc, file_metadata_cache: Option>, + segment_cache: Option<Arc<dyn SegmentCache>>, /// Whether to enable expression pushdown into the underlying Vortex scan. options: VortexTableOptions, } @@ -224,6 +226,7 @@ impl VortexSource { vortex_reader_factory: None, vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()), file_metadata_cache: None, + segment_cache: None, options: VortexTableOptions::default(), } } @@ -283,6 +286,16 @@ impl VortexSource { self } + /// Sets a [`SegmentCache`] to reuse decoded segment bytes across scans of the same file. + /// + /// Without a cache every query re-reads zone map and data segments from object storage. + /// Providing a shared [`MokaSegmentCache`](vortex::layout::segments::MokaSegmentCache) + /// eliminates those redundant reads for repeated queries on the same files. + pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self { + self.segment_cache = Some(segment_cache); + self + } + /// Sets the per-file Vortex scan concurrency.
/// /// This is separate from DataFusion's partition-level parallelism. @@ -341,6 +354,7 @@ impl FileSource for VortexSource { has_output_ordering: !base_config.output_ordering.is_empty(), expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: self.file_metadata_cache.clone(), + segment_cache: self.segment_cache.clone(), projection_pushdown: self.options.projection_pushdown, scan_concurrency: self.options.scan_concurrency, };