diff --git a/Cargo.lock b/Cargo.lock
index de836acc39c..bb79f83f4e5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1602,9 +1602,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.2.55"
+version = "1.2.54"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29"
+checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583"
 dependencies = [
  "find-msvc-tools",
  "jobserver",
@@ -1719,9 +1719,9 @@ dependencies = [
 
 [[package]]
 name = "clap"
-version = "4.5.56"
+version = "4.5.54"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a75ca66430e33a14957acc24c5077b503e7d374151b2b4b3a10c83b4ceb4be0e"
+checksum = "c6e6ff9dcd79cff5cd969a17a545d79e84ab086e444102a591e288a8aa3ce394"
 dependencies = [
  "clap_builder",
  "clap_derive",
@@ -1729,9 +1729,9 @@ dependencies = [
 
 [[package]]
 name = "clap_builder"
-version = "4.5.56"
+version = "4.5.54"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "793207c7fa6300a0608d1080b858e5fdbe713cdc1c8db9fb17777d8a13e63df0"
+checksum = "fa42cf4d2b7a41bc8f663a7cab4031ebafa1bf3875705bfaf8466dc60ab52c00"
 dependencies = [
  "anstream",
  "anstyle",
@@ -1742,9 +1742,9 @@ dependencies = [
 
 [[package]]
 name = "clap_derive"
-version = "4.5.55"
+version = "4.5.49"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5"
+checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671"
 dependencies = [
  "heck",
  "proc-macro2",
@@ -2489,10 +2489,20 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "arrow-ipc 57.2.0",
+ "arrow-schema 57.2.0",
+ "async-trait",
  "clap",
  "datafusion 52.1.0",
+ "datafusion-catalog 52.1.0",
  "datafusion-common 52.1.0",
+ "datafusion-datasource 52.1.0",
+ "datafusion-execution 52.1.0",
+ "datafusion-expr 52.1.0",
+ "datafusion-physical-expr 52.1.0",
+ "datafusion-physical-expr-adapter 52.1.0",
+ "datafusion-physical-expr-common 52.1.0",
 "datafusion-physical-plan 52.1.0",
+ "datafusion-pruning 52.1.0",
 "futures",
 "get_dir",
 "itertools 0.14.0",
@@ -2502,10 +2512,13 @@ dependencies = [
 "opentelemetry_sdk",
 "parking_lot",
 "tokio",
+ "tracing",
 "url",
+ "vortex",
 "vortex-bench",
 "vortex-cuda",
 "vortex-datafusion",
+ "vortex-utils",
 ]
 
 [[package]]
@@ -3747,7 +3760,7 @@ dependencies = [
  "libc",
  "option-ext",
  "redox_users",
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -3914,7 +3927,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
 dependencies = [
  "libc",
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -4057,9 +4070,9 @@ dependencies = [
 
 [[package]]
 name = "find-msvc-tools"
-version = "0.1.9"
+version = "0.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
+checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db"
 
 [[package]]
 name = "finl_unicode"
@@ -4330,7 +4343,7 @@ dependencies = [
  "libc",
  "log",
  "rustversion",
- "windows-link 0.1.3",
+ "windows-link 0.2.1",
  "windows-result 0.4.1",
 ]
 
@@ -4897,9 +4910,9 @@ dependencies = [
 
 [[package]]
 name = "insta"
-version = "1.46.2"
+version = "1.46.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38c91d64f9ad425e80200a50a0e8b8a641680b44e33ce832efe5b8bc65161b07"
+checksum = "248b42847813a1550dafd15296fd9748c651d0c32194559dbc05d804d54b21e8"
 dependencies = [
  "console 0.15.11",
  "once_cell",
@@ -4959,7 +4972,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46"
 dependencies = [
  "hermit-abi",
  "libc",
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -5031,7 +5044,7 @@ dependencies = [
  "portable-atomic",
  "portable-atomic-util",
  "serde_core",
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -6346,7 +6359,7 @@ version = "0.50.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
 dependencies = [
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -8387,7 +8400,7 @@ dependencies = [
  "errno",
  "libc",
  "linux-raw-sys 0.11.0",
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -9353,7 +9366,7 @@ dependencies = [
  "getrandom 0.3.4",
  "once_cell",
  "rustix 1.1.3",
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -11406,7 +11419,7 @@ version = "0.1.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
 dependencies = [
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
diff --git a/benchmarks/datafusion-bench/Cargo.toml b/benchmarks/datafusion-bench/Cargo.toml
index 4133cad2fae..c3382bbf1c3 100644
--- a/benchmarks/datafusion-bench/Cargo.toml
+++ b/benchmarks/datafusion-bench/Cargo.toml
@@ -17,6 +17,8 @@ publish = false
 [dependencies]
 anyhow = { workspace = true }
 arrow-ipc.workspace = true
+arrow-schema = { workspace = true }
+async-trait = { workspace = true }
 clap = { workspace = true, features = ["derive"] }
 datafusion = { workspace = true, features = [
     "parquet",
@@ -24,8 +26,16 @@ datafusion = { workspace = true, features = [
     "nested_expressions",
     "unicode_expressions",
 ] }
+datafusion-catalog = { workspace = true }
 datafusion-common = { workspace = true }
+datafusion-datasource = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-adapter = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
+datafusion-pruning = { workspace = true }
 futures.workspace = true
 itertools.workspace = true
 object_store = { workspace = true, features = ["aws", "gcp"] }
@@ -35,9 +45,12 @@ opentelemetry_sdk.workspace = true
 parking_lot = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 url = { workspace = true }
+tracing = { workspace = true }
+vortex = { workspace = true }
 vortex-bench = { workspace = true }
 vortex-cuda = { workspace = true, optional = true }
 vortex-datafusion = { workspace = true }
+vortex-utils = { workspace = true }
 
 [build-dependencies]
 get_dir = { workspace = true }
diff --git a/benchmarks/datafusion-bench/src/cuda/format.rs b/benchmarks/datafusion-bench/src/cuda/format.rs
new file mode 100644
index 00000000000..c2ba3e229bc
--- /dev/null
+++ b/benchmarks/datafusion-bench/src/cuda/format.rs
@@ -0,0 +1,147 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! CUDA-accelerated Vortex file format for DataFusion.
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_common::DataFusionError;
+use datafusion_common::Result as DFResult;
+use datafusion_common::Statistics;
+use datafusion_common::internal_datafusion_err;
+use datafusion_common::not_impl_err;
+use datafusion_common::parsers::CompressionTypeVariant;
+use datafusion_datasource::TableSchema;
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_compression_type::FileCompressionType;
+use datafusion_datasource::file_format::FileFormat;
+use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_physical_expr::LexRequirement;
+use datafusion_physical_plan::ExecutionPlan;
+use object_store::ObjectMeta;
+use object_store::ObjectStore;
+use vortex::file::VORTEX_FILE_EXTENSION;
+use vortex::session::VortexSession;
+use vortex_datafusion::VortexFormat;
+
+use super::source::CudaVortexSource;
+
+/// CUDA-accelerated Vortex file format for DataFusion.
+///
+/// This wraps the standard `VortexFormat` but uses `CudaVortexSource` for execution.
+pub struct CudaVortexFormat {
+    /// The underlying VortexFormat for schema inference and statistics.
+    inner: VortexFormat,
+    session: VortexSession,
+}
+
+impl Debug for CudaVortexFormat {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CudaVortexFormat").finish()
+    }
+}
+
+impl CudaVortexFormat {
+    /// Create a new CUDA-accelerated Vortex format.
+    pub fn new(session: VortexSession) -> Self {
+        Self {
+            inner: VortexFormat::new(session.clone()),
+            session,
+        }
+    }
+}
+
+#[async_trait]
+impl FileFormat for CudaVortexFormat {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        None
+    }
+
+    fn get_ext(&self) -> String {
+        VORTEX_FILE_EXTENSION.to_string()
+    }
+
+    fn get_ext_with_compression(
+        &self,
+        file_compression_type: &FileCompressionType,
+    ) -> DFResult<String> {
+        match file_compression_type.get_variant() {
+            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
+            _ => Err(DataFusionError::Internal(
+                "Vortex does not support file level compression.".into(),
+            )),
+        }
+    }
+
+    async fn infer_schema(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        objects: &[ObjectMeta],
+    ) -> DFResult<SchemaRef> {
+        // Delegate to inner VortexFormat
+        self.inner.infer_schema(state, store, objects).await
+    }
+
+    async fn infer_stats(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        table_schema: SchemaRef,
+        object: &ObjectMeta,
+    ) -> DFResult<Statistics> {
+        // Delegate to inner VortexFormat
+        self.inner
+            .infer_stats(state, store, table_schema, object)
+            .await
+    }
+
+    async fn create_physical_plan(
+        &self,
+        state: &dyn Session,
+        file_scan_config: FileScanConfig,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        // Get the source from the config and replace with our CUDA source
+        let mut source = file_scan_config
+            .file_source()
+            .as_any()
+            .downcast_ref::<CudaVortexSource>()
+            .cloned()
+            .ok_or_else(|| internal_datafusion_err!("Expected CudaVortexSource"))?;
+
+        source = source
+            .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());
+
+        let conf = FileScanConfigBuilder::from(file_scan_config)
+            .with_source(Arc::new(source))
+            .build();
+
+        Ok(DataSourceExec::from_data_source(conf))
+    }
+
+    async fn create_writer_physical_plan(
+        &self,
+        _input: Arc<dyn ExecutionPlan>,
+        _state: &dyn Session,
+        _conf: datafusion_datasource::file_sink_config::FileSinkConfig,
+        _order_requirements: Option<LexRequirement>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        not_impl_err!("CudaVortexFormat does not support writing")
+    }
+
+    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
+        Arc::new(CudaVortexSource::new(table_schema, self.session.clone()))
+    }
+}
diff --git a/benchmarks/datafusion-bench/src/cuda/mod.rs b/benchmarks/datafusion-bench/src/cuda/mod.rs
new file mode 100644
index 00000000000..67f13766906
--- /dev/null
+++ b/benchmarks/datafusion-bench/src/cuda/mod.rs
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! CUDA-accelerated execution for the DataFusion benchmark.
+//!
+//! This module provides CUDA-accelerated projection execution for TPC-H benchmarks.
+//! It duplicates the opener logic from vortex-datafusion but uses CUDA execution
+//! instead of CPU execution.
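+//!
+//! A minimal registration sketch (not compiled here; it assumes a DataFusion
+//! `SessionContext` named `ctx`, a table URL string `url`, and the crate's
+//! `SESSION` — the wiring is the standard `ListingTable` setup):
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//!
+//! use datafusion::datasource::listing::{
+//!     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+//! };
+//!
+//! use crate::cuda::CudaVortexFormat;
+//!
+//! // Register CUDA kernels once, then point a listing table at the CUDA format.
+//! datafusion_bench::initialize_cuda_session();
+//! let options = ListingOptions::new(Arc::new(CudaVortexFormat::new(SESSION.clone())));
+//! let config = ListingTableConfig::new(ListingTableUrl::parse(url)?)
+//!     .with_listing_options(options)
+//!     .infer_schema(&ctx.state())
+//!     .await?;
+//! ctx.register_table("lineitem", Arc::new(ListingTable::try_new(config)?))?;
+//! ```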
+
+mod format;
+mod opener;
+mod source;
+
+pub use format::CudaVortexFormat;
+pub use source::CudaVortexSource;
diff --git a/benchmarks/datafusion-bench/src/cuda/opener.rs b/benchmarks/datafusion-bench/src/cuda/opener.rs
new file mode 100644
index 00000000000..e454d976eed
--- /dev/null
+++ b/benchmarks/datafusion-bench/src/cuda/opener.rs
@@ -0,0 +1,391 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! CUDA-accelerated file opener for Vortex files.
+//!
+//! This is a modified version of `vortex_datafusion::persistent::opener::VortexOpener`
+//! that uses CUDA execution instead of CPU execution for array processing.
+
+use std::ops::Range;
+use std::sync::Arc;
+use std::sync::Weak;
+
+use datafusion_common::DataFusionError;
+use datafusion_common::Result as DFResult;
+use datafusion_common::ScalarValue;
+use datafusion_common::exec_datafusion_err;
+use datafusion_datasource::FileRange;
+use datafusion_datasource::PartitionedFile;
+use datafusion_datasource::TableSchema;
+use datafusion_datasource::file_stream::FileOpenFuture;
+use datafusion_datasource::file_stream::FileOpener;
+use datafusion_execution::cache::cache_manager::FileMetadataCache;
+use datafusion_physical_expr::PhysicalExprRef;
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
+use datafusion_physical_expr::split_conjunction;
+use datafusion_physical_expr::utils::reassign_expr_columns;
+use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
+use datafusion_physical_expr_adapter::replace_columns_with_literals;
+use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
+use datafusion_physical_plan::metrics::Count;
+use datafusion_pruning::FilePruner;
+use futures::FutureExt;
+use futures::StreamExt;
+use futures::TryStreamExt;
+use futures::stream;
+use object_store::path::Path;
+use tracing::Instrument;
+use vortex::array::ArrayRef;
+use vortex::array::IntoArray;
+use vortex::array::VortexSessionExecute;
+use vortex::array::arrow::ArrowArrayExecutor;
+use vortex::error::VortexError;
+use vortex::error::vortex_err;
+use vortex::file::OpenOptionsSessionExt;
+use vortex::io::InstrumentedReadAt;
+use vortex::layout::LayoutReader;
+use vortex::metrics::VortexMetrics;
+use vortex::scan::ScanBuilder;
+use vortex::session::VortexSession;
+use vortex_cuda::CanonicalCudaExt;
+use vortex_cuda::CudaSession;
+use vortex_cuda::executor::CudaArrayExt;
+use vortex_datafusion::ExpressionConvertor;
+use vortex_datafusion::ProcessedProjection;
+use vortex_datafusion::VortexAccessPlan;
+use vortex_datafusion::VortexReaderFactory;
+use vortex_datafusion::calculate_physical_schema;
+use vortex_datafusion::make_vortex_predicate;
+use vortex_utils::aliases::dash_map::DashMap;
+use vortex_utils::aliases::dash_map::Entry;
+
+// Note: We avoid using CachedVortexMetadata and PrunableStream from vortex_datafusion
+// as they are not public. Instead, we simplify the implementation.
+
+/// File opener that uses CUDA for array execution.
+///
+/// This is similar to `VortexOpener` but uses `execute_cuda()` instead of CPU execution.
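+///
+/// Per scanned chunk, [`FileOpener::open`] below runs the Vortex scan with the
+/// pushed-down projection and filter, executes the resulting array on the GPU via
+/// `execute_cuda()`, copies the canonical result back to host memory with
+/// `into_host()`, and only then converts it into an Arrow `RecordBatch`, which is
+/// re-sliced to respect DataFusion's configured batch size.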
+#[derive(Clone)]
+pub(crate) struct CudaVortexOpener {
+    pub session: VortexSession,
+    pub vortex_reader_factory: Arc<dyn VortexReaderFactory>,
+    pub projection: ProjectionExprs,
+    pub filter: Option<PhysicalExprRef>,
+    pub file_pruning_predicate: Option<PhysicalExprRef>,
+    pub expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
+    pub table_schema: TableSchema,
+    pub batch_size: usize,
+    pub limit: Option<u64>,
+    pub metrics: VortexMetrics,
+    pub layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
+    pub has_output_ordering: bool,
+    pub expression_convertor: Arc<dyn ExpressionConvertor>,
+    pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+}
+
+impl FileOpener for CudaVortexOpener {
+    fn open(&self, file: PartitionedFile) -> DFResult<FileOpenFuture> {
+        let session = self.session.clone();
+        let metrics = self
+            .metrics
+            .child_with_tags([("file_path", file.path().to_string())]);
+
+        let mut projection = self.projection.clone();
+        let mut filter = self.filter.clone();
+
+        let reader = self
+            .vortex_reader_factory
+            .create_reader(file.path().as_ref(), &session)?;
+
+        let reader = InstrumentedReadAt::new(reader, &metrics);
+
+        let file_pruning_predicate = self.file_pruning_predicate.clone();
+        let expr_adapter_factory = self.expr_adapter_factory.clone();
+        let file_metadata_cache = self.file_metadata_cache.clone();
+
+        let unified_file_schema = self.table_schema.file_schema().clone();
+        let batch_size = self.batch_size;
+        let limit = self.limit;
+        let layout_reader = self.layout_readers.clone();
+        let has_output_ordering = self.has_output_ordering;
+
+        let expr_convertor = self.expression_convertor.clone();
+
+        // Replace column access for partition columns with literals
+        #[allow(clippy::disallowed_types)]
+        let literal_value_cols = self
+            .table_schema
+            .table_partition_cols()
+            .iter()
+            .map(|f| f.name())
+            .cloned()
+            .zip(file.partition_values.clone())
+            .collect::<std::collections::HashMap<String, ScalarValue>>();
+
+        if !literal_value_cols.is_empty() {
+            projection = projection.try_map_exprs(|expr| {
+                replace_columns_with_literals(Arc::clone(&expr), &literal_value_cols)
+            })?;
+            filter = filter
+                .map(|p| replace_columns_with_literals(p, &literal_value_cols))
+                .transpose()?;
+        }
+
+        Ok(async move {
+            // Create FilePruner when we have a predicate and either dynamic expressions
+            // or file statistics available.
+            let mut file_pruner = file_pruning_predicate
+                .filter(|p| is_dynamic_physical_expr(p) || file.has_statistics())
+                .and_then(|predicate| {
+                    FilePruner::try_new(
+                        predicate.clone(),
+                        &unified_file_schema,
+                        &file,
+                        Count::default(),
+                    )
+                });
+
+            // Check if this file should be pruned based on statistics/partition values.
+            if let Some(file_pruner) = file_pruner.as_mut()
+                && file_pruner.should_prune()?
+            {
+                return Ok(stream::empty().boxed());
+            }
+
+            let open_opts = session
+                .open_options()
+                .with_file_size(file.object_meta.size)
+                .with_metrics(metrics.clone());
+
+            // Note: We skip using CachedVortexMetadata here to avoid depending on
+            // private vortex-datafusion internals. The footer will be re-read from the file.
+            drop(file_metadata_cache);
+
+            let vxf = open_opts
+                .open_read(reader)
+                .await
+                .map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?;
+
+            let this_file_schema = Arc::new(calculate_physical_schema(
+                vxf.dtype(),
+                &unified_file_schema,
+            )?);
+
+            let projected_physical_schema = projection.project_schema(&unified_file_schema)?;
+
+            let expr_adapter = expr_adapter_factory.create(
+                Arc::clone(&unified_file_schema),
+                Arc::clone(&this_file_schema),
+            );
+
+            let simplifier = PhysicalExprSimplifier::new(&this_file_schema);
+
+            let filter = filter
+                .map(|filter| simplifier.simplify(expr_adapter.rewrite(filter)?))
+                .transpose()?;
+            let projection =
+                projection.try_map_exprs(|p| simplifier.simplify(expr_adapter.rewrite(p)?))?;
+
+            let ProcessedProjection {
+                scan_projection,
+                leftover_projection,
+            } = expr_convertor.split_projection(
+                projection,
+                &this_file_schema,
+                &projected_physical_schema,
+            )?;
+
+            let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|_e| {
+                exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan")
+            })?;
+            let stream_schema = calculate_physical_schema(&scan_dtype, &projected_physical_schema)?;
+
+            let leftover_projection = leftover_projection
+                .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
+            let projector = leftover_projection.make_projector(&stream_schema)?;
+
+            // Share layout readers with other partitions
+            let layout_reader = match layout_reader.entry(file.object_meta.location.clone()) {
+                Entry::Occupied(mut occupied_entry) => {
+                    if let Some(reader) = occupied_entry.get().upgrade() {
+                        tracing::trace!("reusing layout reader for {}", occupied_entry.key());
+                        reader
+                    } else {
+                        tracing::trace!("creating layout reader for {}", occupied_entry.key());
+                        let reader = vxf.layout_reader().map_err(|e| {
+                            DataFusionError::Execution(format!(
+                                "Failed to create layout reader: {e}"
+                            ))
+                        })?;
+                        occupied_entry.insert(Arc::downgrade(&reader));
+                        reader
+                    }
+                }
+                Entry::Vacant(vacant_entry) => {
+                    tracing::trace!("creating layout reader for {}", vacant_entry.key());
+                    let reader = vxf.layout_reader().map_err(|e| {
+                        DataFusionError::Execution(format!("Failed to create layout reader: {e}"))
+                    })?;
+                    vacant_entry.insert(Arc::downgrade(&reader));
+
+                    reader
+                }
+            };
+
+            let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
+
+            if let Some(extensions) = file.extensions
+                && let Some(vortex_plan) = extensions.downcast_ref::<VortexAccessPlan>()
+            {
+                scan_builder = vortex_plan.apply_to_builder(scan_builder);
+            }
+
+            if let Some(file_range) = file.range {
+                scan_builder = apply_byte_range(
+                    file_range,
+                    file.object_meta.size,
+                    vxf.row_count(),
+                    scan_builder,
+                );
+            }
+
+            let filter = filter
+                .and_then(|f| {
+                    let (pushed, unpushed): (Vec<_>, Vec<_>) = split_conjunction(&f)
+                        .into_iter()
+                        .cloned()
+                        .partition(|expr| {
+                            expr_convertor.can_be_pushed_down(expr, &this_file_schema)
+                        });
+
+                    if !unpushed.is_empty() {
+                        return Some(Err(exec_datafusion_err!(
+                            r#"VortexSource accepted but failed to push {} filters.
+                            This should never happen if a properly configured
+                            PhysicalExprAdapterFactory is set on the source.
+
+                            Failed filters:
+
+                            {unpushed:#?}
+                            "#,
+                            unpushed.len()
+                        )));
+                    }
+
+                    make_vortex_predicate(expr_convertor.as_ref(), &pushed).transpose()
+                })
+                .transpose()?;
+
+            if let Some(limit) = limit
+                && filter.is_none()
+            {
+                scan_builder = scan_builder.with_limit(limit);
+            }
+
+            // CUDA EXECUTION PATH
+            // This is the key difference from the standard VortexOpener:
+            // We use execute_cuda() instead of execute_record_batch()
+            let stream = scan_builder
+                .with_metrics(metrics)
+                .with_projection(scan_projection)
+                .with_some_filter(filter)
+                .with_ordered(has_output_ordering)
+                .into_stream()
+                .map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))?
+                .then(move |chunk_result| {
+                    let session = session.clone();
+                    let stream_schema = stream_schema.clone();
+                    async move {
+                        let chunk = chunk_result?;
+
+                        // Execute on CUDA - this is the main difference from the CPU path
+                        let mut cuda_ctx =
+                            CudaSession::create_execution_ctx(&session).map_err(|e| {
+                                vortex_err!("Failed to create CUDA execution context: {e}")
+                            })?;
+
+                        tracing::info!("Executing array {} on CUDA device", chunk.encoding_id());
+                        let canonical = chunk.execute_cuda(&mut cuda_ctx).await?;
+
+                        // Copy result from GPU back to host memory
+                        let host_canonical = canonical.into_host().await?;
+
+                        // Convert canonical result to ArrayRef and then to RecordBatch
+                        let array: ArrayRef = host_canonical.into_array();
+                        let mut cpu_ctx = session.create_execution_ctx();
+                        array.execute_record_batch(&stream_schema, &mut cpu_ctx)
+                    }
+                })
+                .map_ok(move |rb| {
+                    // Slice the stream into batches respecting datafusion's configured batch size
+                    stream::iter(
+                        (0..rb.num_rows().div_ceil(batch_size * 2))
+                            .flat_map(move |block_idx| {
+                                let offset = block_idx * batch_size * 2;
+
+                                if rb.num_rows() - offset < 2 * batch_size {
+                                    let length = rb.num_rows() - offset;
+                                    [Some(rb.slice(offset, length)), None].into_iter()
+                                } else {
+                                    let first = rb.slice(offset, batch_size);
+                                    let second = rb.slice(offset + batch_size, batch_size);
+                                    [Some(first), Some(second)].into_iter()
+                                }
+                            })
+                            .flatten()
+                            .map(Ok),
+                    )
+                })
+                .map_err(move |e: VortexError| {
+                    DataFusionError::External(Box::new(e.with_context(format!(
+                        "Failed to read Vortex file: {}",
+                        file.object_meta.location
+                    ))))
+                })
+                .try_flatten()
+                .map(move |batch| {
+                    if projector.projection().as_ref().is_empty() {
+                        batch
+                    } else {
+                        batch.and_then(|b| projector.project_batch(&b))
+                    }
+                })
+                .boxed();
+
+            // Note: We skip using PrunableStream here to avoid depending on private
+            // vortex-datafusion internals. File pruning still happens at the scan level.
+            drop(file_pruner);
+            Ok(stream)
+        }
+        .in_current_span()
+        .boxed())
+    }
+}
+
+/// If the file has a [`FileRange`], we translate it into a row range in the file for the scan.
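+///
+/// The mapping assumes a fixed average row width of `total_size / row_count` bytes.
+/// A hypothetical example: a 10,000-byte file holding 1,000 rows has an average row
+/// width of 10 bytes, so the byte range `2500..5000` maps to the row range
+/// `250..500` (with the end clamped to `row_count`).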
+fn apply_byte_range(
+    file_range: FileRange,
+    total_size: u64,
+    row_count: u64,
+    scan_builder: ScanBuilder<ArrayRef>,
+) -> ScanBuilder<ArrayRef> {
+    let row_range = byte_range_to_row_range(
+        file_range.start as u64..file_range.end as u64,
+        row_count,
+        total_size,
+    );
+
+    scan_builder.with_row_range(row_range)
+}
+
+fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u64) -> Range<u64> {
+    let average_row = total_size / row_count;
+    assert!(average_row > 0, "A row must always have at least one byte");
+
+    let start_row = byte_range.start / average_row;
+    let end_row = byte_range.end / average_row;
+
+    start_row..u64::min(row_count, end_row)
+}
diff --git a/benchmarks/datafusion-bench/src/cuda/source.rs b/benchmarks/datafusion-bench/src/cuda/source.rs
new file mode 100644
index 00000000000..d4b20591b96
--- /dev/null
+++ b/benchmarks/datafusion-bench/src/cuda/source.rs
@@ -0,0 +1,268 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! CUDA-accelerated Vortex file source for DataFusion.
+
+use std::any::Any;
+use std::fmt::Formatter;
+use std::sync::Arc;
+use std::sync::Weak;
+
+use datafusion_common::Result as DFResult;
+use datafusion_common::config::ConfigOptions;
+use datafusion_datasource::TableSchema;
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::file_stream::FileOpener;
+use datafusion_execution::cache::cache_manager::FileMetadataCache;
+use datafusion_physical_expr::PhysicalExprRef;
+use datafusion_physical_expr::conjunction;
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_plan::DisplayFormatType;
+use datafusion_physical_plan::PhysicalExpr;
+use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
+use datafusion_physical_plan::filter_pushdown::PushedDown;
+use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use object_store::ObjectStore;
+use object_store::path::Path;
+use vortex::error::VortexExpect as _;
+use vortex::file::VORTEX_FILE_EXTENSION;
+use vortex::layout::LayoutReader;
+use vortex::metrics::VortexMetrics;
+use vortex::session::VortexSession;
+use vortex_datafusion::DefaultExpressionConvertor;
+use vortex_datafusion::DefaultVortexReaderFactory;
+use vortex_datafusion::ExpressionConvertor;
+use vortex_datafusion::VortexReaderFactory;
+use vortex_utils::aliases::dash_map::DashMap;
+
+use super::opener::CudaVortexOpener;
+
+const PARTITION_LABEL: &str = "partition";
+
+/// CUDA-accelerated Vortex file source for DataFusion.
+///
+/// This is similar to `VortexSource` but creates `CudaVortexOpener` instances
+/// that use CUDA for array execution.
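+///
+/// Instances are normally created through [`CudaVortexFormat::file_source`]; a direct
+/// construction sketch (assumes a `TableSchema` and a `VortexSession` in scope):
+///
+/// ```ignore
+/// let source = CudaVortexSource::new(table_schema, session.clone());
+/// let source = source.with_batch_size(8192); // yields an Arc<dyn FileSource>
+/// ```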
+#[derive(Clone)]
+pub struct CudaVortexSource {
+    pub(crate) session: VortexSession,
+    pub(crate) table_schema: TableSchema,
+    pub(crate) projection: ProjectionExprs,
+    pub(crate) full_predicate: Option<PhysicalExprRef>,
+    pub(crate) vortex_predicate: Option<PhysicalExprRef>,
+    pub(crate) batch_size: Option<usize>,
+    _unused_df_metrics: ExecutionPlanMetricsSet,
+    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
+    expression_convertor: Arc<dyn ExpressionConvertor>,
+    pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
+    vx_metrics: VortexMetrics,
+    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+}
+
+impl CudaVortexSource {
+    pub(crate) fn new(table_schema: TableSchema, session: VortexSession) -> Self {
+        let full_schema = table_schema.table_schema();
+        let indices = (0..full_schema.fields().len()).collect::<Vec<_>>();
+        let projection = ProjectionExprs::from_indices(&indices, full_schema);
+
+        Self {
+            session,
+            table_schema,
+            projection,
+            full_predicate: None,
+            vortex_predicate: None,
+            batch_size: None,
+            _unused_df_metrics: Default::default(),
+            layout_readers: Arc::new(DashMap::default()),
+            expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
+            vortex_reader_factory: None,
+            vx_metrics: VortexMetrics::default(),
+            file_metadata_cache: None,
+        }
+    }
+
+    /// Override the file metadata cache
+    pub fn with_file_metadata_cache(
+        mut self,
+        file_metadata_cache: Arc<dyn FileMetadataCache>,
+    ) -> Self {
+        self.file_metadata_cache = Some(file_metadata_cache);
+        self
+    }
+}
+
+impl FileSource for CudaVortexSource {
+    fn create_file_opener(
+        &self,
+        object_store: Arc<dyn ObjectStore>,
+        base_config: &FileScanConfig,
+        partition: usize,
+    ) -> DFResult<Arc<dyn FileOpener>> {
+        let partition_metrics = self
+            .vx_metrics
+            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
+
+        let batch_size = self
+            .batch_size
+            .vortex_expect("batch_size must be supplied to CudaVortexSource");
+
+        let expr_adapter_factory = base_config
+            .expr_adapter_factory
+            .clone()
+            .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory));
+
+        let vortex_reader_factory = self
+            .vortex_reader_factory
+            .clone()
+            .unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store)));
+
+        let opener = CudaVortexOpener {
+            session: self.session.clone(),
+            vortex_reader_factory,
+            projection: self.projection.clone(),
+            filter: self.vortex_predicate.clone(),
+            file_pruning_predicate: self.full_predicate.clone(),
+            expr_adapter_factory,
+            table_schema: self.table_schema.clone(),
+            batch_size,
+            limit: base_config.limit.map(|l| l as u64),
+            metrics: partition_metrics,
+            layout_readers: self.layout_readers.clone(),
+            has_output_ordering: !base_config.output_ordering.is_empty(),
+            expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
+            file_metadata_cache: self.file_metadata_cache.clone(),
+        };
+
+        Ok(Arc::new(opener))
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
+        let mut source = self.clone();
+        source.batch_size = Some(batch_size);
+        Arc::new(source)
+    }
+
+    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
+        self.vortex_predicate.clone()
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self._unused_df_metrics
+    }
+
+    fn file_type(&self) -> &str {
+        VORTEX_FILE_EXTENSION
+    }
+
+    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                if let Some(ref predicate) = self.vortex_predicate {
+                    write!(f, ", predicate: {predicate}")?;
+                }
+            }
+            DisplayFormatType::TreeRender => {
+                if let Some(ref predicate) = self.vortex_predicate {
+                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+                };
+            }
+        }
+        Ok(())
+    }
+
+    fn supports_repartitioning(&self) -> bool {
+        true
+    }
+
+    fn try_pushdown_filters(
+        &self,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+        if filters.is_empty() {
+            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
+                vec![],
+            ));
+        }
+
+        let mut source = self.clone();
+
+        source.full_predicate = match source.full_predicate {
+            Some(predicate) => Some(conjunction(
+                std::iter::once(predicate).chain(filters.clone()),
+            )),
+            None => Some(conjunction(filters.clone())),
+        };
+
+        let supported_filters = filters
+            .into_iter()
+            .map(|expr| {
+                if self
+                    .expression_convertor
+                    .can_be_pushed_down(&expr, self.table_schema.file_schema())
+                {
+                    PushedDownPredicate::supported(expr)
+                } else {
+                    PushedDownPredicate::unsupported(expr)
+                }
+            })
+            .collect::<Vec<_>>();
+
+        if supported_filters
+            .iter()
+            .all(|p| matches!(p.discriminant, PushedDown::No))
+        {
+            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
+                vec![PushedDown::No; supported_filters.len()],
+            )
+            .with_updated_node(Arc::new(source) as _));
+        }
+
+        let supported = supported_filters
+            .iter()
+            .filter_map(|p| match p.discriminant {
+                PushedDown::Yes => Some(&p.predicate),
+                PushedDown::No => None,
+            })
+            .cloned();
+
+        let predicate = match source.vortex_predicate {
+            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
+            None => conjunction(supported),
+        };
+
+        tracing::debug!(%predicate, "Saving predicate");
+
+        source.vortex_predicate = Some(predicate);
+
+        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
+            supported_filters.iter().map(|f| f.discriminant).collect(),
+        )
+        .with_updated_node(Arc::new(source) as _))
+    }
+
+    fn try_pushdown_projection(
+        &self,
+        projection: &ProjectionExprs,
+    ) -> DFResult<Option<Arc<dyn FileSource>>> {
+        let mut source = self.clone();
+        source.projection = self.projection.try_merge(projection)?;
+        Ok(Some(Arc::new(source)))
+    }
+
+    fn projection(&self) -> Option<&ProjectionExprs> {
+        Some(&self.projection)
+    }
+
+    fn table_schema(&self) -> &TableSchema {
+        &self.table_schema
+    }
+}
diff --git a/benchmarks/datafusion-bench/src/lib.rs b/benchmarks/datafusion-bench/src/lib.rs
index 8869f8cbe99..b43160ddfa5 100644
--- a/benchmarks/datafusion-bench/src/lib.rs
+++ b/benchmarks/datafusion-bench/src/lib.rs
@@ -3,6 +3,9 @@
 
 pub mod metrics;
 
+#[cfg(feature = "cuda")]
+pub mod cuda;
+
 use std::sync::Arc;
 
 use datafusion::datasource::file_format::FileFormat;
@@ -112,3 +115,39 @@ pub fn format_to_df_format(format: Format) -> Arc<dyn FileFormat> {
         }
     }
 }
+
+/// Initialize CUDA session and register kernels.
+///
+/// This must be called before using CUDA execution. It initializes the CudaSession
+/// and registers all CUDA kernels for supported encodings.
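+///
+/// A usage sketch (mirrors what `main.rs` does when `--cuda` is passed):
+///
+/// ```ignore
+/// // Call once at startup, before any CUDA-backed scans run.
+/// datafusion_bench::initialize_cuda_session();
+/// ```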
+#[cfg(feature = "cuda")]
+pub fn initialize_cuda_session() {
+    use vortex_cuda::CudaSessionExt;
+    use vortex_cuda::initialize_cuda;
+
+    // Access the CudaSession via the extension trait - this lazily initializes it
+    let cuda_session = SESSION.cuda_session();
+    // Register all CUDA kernels
+    initialize_cuda(&cuda_session);
+    tracing::info!("CUDA session initialized and kernels registered");
+}
+
+/// Returns a CUDA-accelerated Vortex format for DataFusion.
+///
+/// This format uses CUDA for array execution during scans.
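+///
+/// A selection sketch (mirrors `register_benchmark_tables` in `main.rs`; `use_cuda`
+/// is assumed to come from the `--cuda` CLI flag):
+///
+/// ```ignore
+/// let file_format = if use_cuda {
+///     datafusion_bench::format_to_df_format_cuda(format)
+/// } else {
+///     datafusion_bench::format_to_df_format(format)
+/// };
+/// ```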
+#[cfg(feature = "cuda")]
+pub fn format_to_df_format_cuda(format: Format) -> Arc<dyn FileFormat> {
+    use cuda::CudaVortexFormat;
+
+    match format {
+        Format::Csv => Arc::new(CsvFormat::default()) as _,
+        Format::Arrow => Arc::new(ArrowFormat),
+        Format::Parquet => Arc::new(ParquetFormat::new()),
+        Format::OnDiskVortex | Format::VortexCompact => {
+            Arc::new(CudaVortexFormat::new(SESSION.clone()))
+        }
+        Format::OnDiskDuckDB | Format::Lance => {
+            unimplemented!("Format {format} cannot be turned into a DataFusion `FileFormat`")
+        }
+    }
+}
diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs
index 6be64e5bbad..6fc4e064857 100644
--- a/benchmarks/datafusion-bench/src/main.rs
+++ b/benchmarks/datafusion-bench/src/main.rs
@@ -96,6 +96,14 @@ struct Args {
 
     #[arg(long = "opt", value_delimiter = ',', value_parser = value_parser!(Opt))]
    options: Vec<Opt>,
+
+    /// Enable CUDA-accelerated execution for Vortex scans.
+    ///
+    /// When enabled, array projections will be executed on the GPU using CUDA.
+    /// Requires the 'cuda' feature to be enabled at compile time.
+    #[cfg(feature = "cuda")]
+    #[arg(long, default_value_t = false)]
+    cuda: bool,
 }
 
 #[tokio::main]
@@ -105,6 +113,18 @@ async fn main() -> anyhow::Result<()> {
 
     setup_logging_and_tracing(args.verbose, args.tracing)?;
 
+    // Initialize CUDA if requested
+    #[cfg(feature = "cuda")]
+    let use_cuda = args.cuda;
+    #[cfg(not(feature = "cuda"))]
+    let use_cuda = false;
+
+    #[cfg(feature = "cuda")]
+    if use_cuda {
+        datafusion_bench::initialize_cuda_session();
+        tracing::info!("CUDA execution enabled for Vortex scans");
+    }
+
     let benchmark = create_benchmark(args.benchmark, &opts)?;
 
     let filtered_queries = filter_queries(
@@ -161,7 +181,7 @@ async fn main() -> anyhow::Result<()> {
             async move {
                 let session = datafusion_bench::get_session_context();
                 datafusion_bench::make_object_store(&session, benchmark.data_url())?;
-                register_benchmark_tables(&session, benchmark, format).await?;
+                register_benchmark_tables(&session, benchmark, format, use_cuda).await?;
                 Ok((session, format))
             }
         },
@@ -209,12 +229,25 @@ async fn register_benchmark_tables(
     session: &SessionContext,
     benchmark: &B,
     format: Format,
+    use_cuda: bool,
 ) -> anyhow::Result<()> {
     match format {
         Format::Arrow => register_arrow_tables(session, benchmark).await,
         _ => {
             let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?;
-            let file_format = format_to_df_format(format);
+
+            // Use CUDA format if requested and available
+            #[cfg(feature = "cuda")]
+            let file_format = if use_cuda {
+                datafusion_bench::format_to_df_format_cuda(format)
+            } else {
+                format_to_df_format(format)
+            };
+            #[cfg(not(feature = "cuda"))]
+            let file_format = {
+                let _ = use_cuda; // suppress unused warning
+                format_to_df_format(format)
+            };
 
             for table in benchmark.table_specs().iter() {
                 let pattern = benchmark.pattern(table.name, format);
diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs
index 4ac557f00fd..ae73c035d0b 100644
--- a/vortex-datafusion/src/convert/exprs.rs
+++ b/vortex-datafusion/src/convert/exprs.rs
@@ -43,12 +43,14 @@ use crate::convert::FromDataFusion;
 
 /// Result of splitting a projection into Vortex expressions and leftover DataFusion projections.
 pub struct ProcessedProjection {
+    /// The projection expressions that can be pushed down into the Vortex scan.
     pub scan_projection: Expression,
+    /// The leftover projection expressions that must be evaluated by DataFusion after the scan.
     pub leftover_projection: ProjectionExprs,
 }
 
 /// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty.
-pub(crate) fn make_vortex_predicate(
+pub fn make_vortex_predicate(
     expr_convertor: &dyn ExpressionConvertor,
     predicate: &[Arc<dyn PhysicalExpr>],
 ) -> DFResult<Option<Expression>> {
diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs
index 46a964e667a..bf17021b924 100644
--- a/vortex-datafusion/src/lib.rs
+++ b/vortex-datafusion/src/lib.rs
@@ -14,7 +14,11 @@ mod persistent;
 #[cfg(test)]
 mod tests;
 
+pub use convert::exprs::DefaultExpressionConvertor;
 pub use convert::exprs::ExpressionConvertor;
+pub use convert::exprs::ProcessedProjection;
+pub use convert::exprs::make_vortex_predicate;
+pub use convert::schema::calculate_physical_schema;
 pub use persistent::*;
 
 /// Extension trait to convert our [`Precision`](vortex::stats::Precision) to Datafusion's [`Precision`](datafusion_common::stats::Precision)