diff --git a/Cargo.lock b/Cargo.lock index 6bf31b33898..8fea287b5f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11031,7 +11031,6 @@ dependencies = [ "vortex-buffer", "vortex-dtype", "vortex-error", - "vortex-mask", "vortex-proto", "vortex-session", "vortex-utils", diff --git a/vortex-array/src/array_future.rs b/vortex-array/src/array_future.rs new file mode 100644 index 00000000000..94ee4a7b754 --- /dev/null +++ b/vortex-array/src/array_future.rs @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::future::Future; +use std::ops::Range; +use std::sync::Arc; + +use futures::FutureExt; +use futures::TryFutureExt; +use futures::future::BoxFuture; +use futures::future::Shared; +use vortex_error::SharedVortexResult; +use vortex_error::VortexError; +use vortex_error::VortexResult; +use vortex_error::vortex_panic; + +use crate::ArrayRef; + +/// A future that resolves to an array with a known length. +#[derive(Clone)] +pub struct ArrayFuture { + inner: Shared>>, + len: usize, + estimated_bytes: usize, +} + +impl ArrayFuture { + /// Create a new `ArrayFuture` from a future that returns an array. + pub fn new(len: usize, estimated_bytes: usize, fut: F) -> Self + where + F: Future> + Send + 'static, + { + Self { + inner: fut + .inspect(move |r| { + if let Ok(array) = r + && array.len() != len { + vortex_panic!("ArrayFuture created with future that returned array of incorrect length (expected {}, got {})", len, array.len()); + } + }) + .map_err(Arc::new) + .boxed() + .shared(), + len, + estimated_bytes, + } + } + + /// Create an `ArrayFuture` from an already-resolved array. + pub fn ready(array: ArrayRef) -> Self { + let len = array.len(); + Self::new(len, 0, async move { Ok(array) }) + } + + /// Returns the length of the array. + pub fn len(&self) -> usize { + self.len + } + + /// Returns true if the array is empty. 
+ pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Returns the estimated decoded byte size of the array. + pub fn estimated_bytes(&self) -> usize { + self.estimated_bytes + } + + /// Create an `ArrayFuture` that resolves to a slice of the original array. + pub fn slice(&self, range: Range) -> Self { + let inner = self.inner.clone(); + let parent_len = self.len; + let slice_estimated = if self.len > 0 { + usize::try_from(self.estimated_bytes as u128 * range.len() as u128 / self.len as u128) + .unwrap_or(usize::MAX) + } else { + 0 + }; + Self::new(range.len(), slice_estimated, async move { + let array = inner.await?; + debug_assert!(range.end <= parent_len, "slice range out of bounds"); + let _ = parent_len; + array.slice(range) + }) + } +} + +impl Future for ArrayFuture { + type Output = VortexResult; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.inner.poll_unpin(cx).map_err(VortexError::from) + } +} diff --git a/vortex-array/src/expr/exprs/mod.rs b/vortex-array/src/expr/exprs/mod.rs index 145d225bcae..c8a5776a131 100644 --- a/vortex-array/src/expr/exprs/mod.rs +++ b/vortex-array/src/expr/exprs/mod.rs @@ -17,6 +17,8 @@ pub(crate) mod operators; pub(crate) mod pack; pub(crate) mod root; pub(crate) mod select; +pub(crate) mod stats; + pub use between::*; pub use binary::*; pub use cast::*; @@ -33,3 +35,4 @@ pub use operators::*; pub use pack::*; pub use root::*; pub use select::*; +pub use stats::*; diff --git a/vortex-array/src/expr/exprs/stats.rs b/vortex-array/src/expr/exprs/stats.rs new file mode 100644 index 00000000000..05763e5ee69 --- /dev/null +++ b/vortex-array/src/expr/exprs/stats.rs @@ -0,0 +1,96 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_scalar::Scalar; + +use crate::Array; +use 
crate::ArrayRef; +use crate::IntoArray; +use crate::arrays::ConstantArray; +use crate::expr::Arity; +use crate::expr::ChildName; +use crate::expr::ExecutionArgs; +use crate::expr::ExprId; +use crate::expr::Expression; +use crate::expr::SimplifyCtx; +use crate::expr::VTable; +use crate::expr::VTableExt; +use crate::expr::stats::Precision; +use crate::expr::stats::Stat; +use crate::expr::stats::StatsProvider; + +/// Creates a new expression that returns a minimum bound of its input. +pub fn statistic(stat: Stat, child: Expression) -> Expression { + Statistic.new_expr(stat, vec![child]) +} + +pub struct Statistic; + +impl VTable for Statistic { + type Options = Stat; + + fn id(&self) -> ExprId { + ExprId::from("statistic") + } + + fn arity(&self, _options: &Self::Options) -> Arity { + Arity::Exact(1) + } + + fn child_name(&self, _options: &Self::Options, _child_idx: usize) -> ChildName { + ChildName::from("input") + } + + fn return_dtype(&self, stat: &Stat, arg_dtypes: &[DType]) -> VortexResult { + stat.dtype(&arg_dtypes[0]) + .ok_or_else(|| { + vortex_err!( + "statistic {:?} not supported for dtype {:?}", + stat, + arg_dtypes[0] + ) + }) + // We make all statistics types nullable in case there is no reduction rule to handle + // the statistic expression. + .map(|dt| dt.as_nullable()) + } + + fn execute(&self, stat: &Stat, args: ExecutionArgs) -> VortexResult { + // FIXME(ngates): we should implement this as a reduction rule instead? + let Some(stat_dtype) = stat.dtype(args.inputs[0].dtype()) else { + vortex_bail!( + "Statistic {:?} not supported for dtype {:?}", + stat, + args.inputs[0].dtype() + ); + }; + + Ok(match args.inputs[0].statistics().get(*stat) { + // TODO(ngates): do we care about precision here? Possibly we should configure in the + // options of the expression. + Some(Precision::Exact(v)) => { + // We have an exact value for the statistic; so we return a constant array + // with that value. 
+ ConstantArray::new(v, args.row_count).into_array() + } + None | Some(Precision::Inexact(_)) => { + ConstantArray::new(Scalar::null(stat_dtype), args.row_count).into_array() + } + }) + } + + fn simplify( + &self, + _options: &Self::Options, + _expr: &Expression, + _ctx: &dyn SimplifyCtx, + ) -> VortexResult> { + // FIXME(ngates): we really want to implement a reduction rule for all arrays? But it's an array. + // And it's a reduction rule. How do we do this without reduce_parent on everything..? + Ok(None) + } +} diff --git a/vortex-array/src/expr/stats/mod.rs b/vortex-array/src/expr/stats/mod.rs index cba33e2743c..e01f807e21d 100644 --- a/vortex-array/src/expr/stats/mod.rs +++ b/vortex-array/src/expr/stats/mod.rs @@ -216,7 +216,7 @@ impl Stat { }) } - pub fn name(&self) -> &str { + pub const fn name(&self) -> &'static str { match self { Self::IsConstant => "is_constant", Self::IsSorted => "is_sorted", diff --git a/vortex-array/src/expr/vtable.rs b/vortex-array/src/expr/vtable.rs index 5176ff31109..4f65d4dcc7a 100644 --- a/vortex-array/src/expr/vtable.rs +++ b/vortex-array/src/expr/vtable.rs @@ -76,7 +76,20 @@ pub trait VTable: 'static + Sized + Send + Sync { options: &Self::Options, expr: &Expression, f: &mut Formatter<'_>, - ) -> fmt::Result; + ) -> fmt::Result { + write!(f, "{}(", expr.id())?; + for (i, child) in expr.children().iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + child.fmt_sql(f)?; + } + let options = format!("{}", options); + if !options.is_empty() { + write!(f, ", options={}", options)?; + } + write!(f, ")") + } /// Compute the return [`DType`] of the expression if evaluated over the given input types. fn return_dtype(&self, options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult; @@ -136,6 +149,14 @@ pub trait VTable: 'static + Sized + Send + Sync { Ok(None) } + /// Falsify the expression, returning a new expression that is true whenever the original + /// expression is guaranteed to be false via stats. 
+ fn falsify(&self, options: &Self::Options, expr: &Expression) -> Option { + _ = options; + _ = expr; + None + } + /// See [`Expression::stat_falsification`]. fn stat_falsification( &self, diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index d4b2bd5e6fd..ea1ca2f54e3 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -16,6 +16,7 @@ use std::sync::LazyLock; pub use array::*; +pub use array_future::*; pub use canonical::*; pub use columnar::*; pub use context::*; @@ -31,6 +32,7 @@ pub mod accessor; #[doc(hidden)] pub mod aliases; mod array; +mod array_future; pub mod arrays; pub mod arrow; pub mod buffer; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 15a2a81887a..5f8da13830c 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -35,12 +35,15 @@ use vortex::array::ArrayRef; use vortex::array::VortexSessionExecute; use vortex::array::arrow::ArrowArrayExecutor; use vortex::error::VortexError; +use vortex::error::VortexExpect; use vortex::file::OpenOptionsSessionExt; use vortex::io::InstrumentedReadAt; +use vortex::io::session::RuntimeSessionExt; use vortex::layout::LayoutReader; use vortex::metrics::Label; use vortex::metrics::MetricsRegistry; use vortex::scan::ScanBuilder; +use vortex::scan::v2::scan::ScanBuilder2; use vortex::session::VortexSession; use vortex_utils::aliases::dash_map::DashMap; use vortex_utils::aliases::dash_map::Entry; @@ -270,21 +273,28 @@ impl FileOpener for VortexOpener { } }; - let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader); + // let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader); + let mut scan_builder = ScanBuilder2::new( + vxf.footer() + .layout() + .new_reader2(&vxf.segment_source(), &session) + .vortex_expect("Failed to create footer"), + session.clone(), + ); - if let Some(extensions) = file.extensions - && let Some(vortex_plan) = 
extensions.downcast_ref::() - { - scan_builder = vortex_plan.apply_to_builder(scan_builder); - } + // if let Some(extensions) = file.extensions + // && let Some(vortex_plan) = extensions.downcast_ref::() + // { + // scan_builder = vortex_plan.apply_to_builder(scan_builder); + // } if let Some(file_range) = file.range { - scan_builder = apply_byte_range( - file_range, - file.object_meta.size, - vxf.row_count(), - scan_builder, - ); + // scan_builder = apply_byte_range( + // file_range, + // file.object_meta.size, + // vxf.row_count(), + // scan_builder, + // ); } let filter = filter @@ -326,17 +336,24 @@ impl FileOpener for VortexOpener { scan_builder = scan_builder.with_limit(limit); } + let handle = session.handle().clone(); let stream = scan_builder - .with_metrics_registry(metrics_registry) + // .with_metrics_registry(metrics_registry) .with_projection(scan_projection) .with_some_filter(filter) - .with_ordered(has_output_ordering) - .map(move |chunk| { + // .with_ordered(has_output_ordering) + .into_array_stream() + .vortex_expect("Failed to execute Vortex scan") + .then(move |chunk| { let mut ctx = session.create_execution_ctx(); - chunk.execute_record_batch(&stream_schema, &mut ctx) + let stream_schema = stream_schema.clone(); + handle.spawn_cpu(move || { + chunk + .vortex_expect("failed") + .execute_record_batch(&stream_schema, &mut ctx) + }) }) - .into_stream() - .map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))? + // .map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))? .map_ok(move |rb| { // We try and slice the stream into respecting datafusion's configured batch size. 
stream::iter( diff --git a/vortex-file/src/file.rs b/vortex-file/src/file.rs index 65d77a43aa7..a37b600e74d 100644 --- a/vortex-file/src/file.rs +++ b/vortex-file/src/file.rs @@ -26,6 +26,7 @@ use vortex_layout::LayoutReader; use vortex_layout::segments::SegmentSource; use vortex_scan::ScanBuilder; use vortex_scan::SplitBy; +use vortex_scan::v2::scan::ScanBuilder2; use vortex_session::VortexSession; use vortex_utils::aliases::hash_map::HashMap; @@ -95,6 +96,14 @@ impl VortexFile { )) } + pub fn scan2(&self) -> VortexResult { + let reader_ref = self + .footer + .layout() + .new_reader2(&self.segment_source, &self.session)?; + Ok(ScanBuilder2::new(reader_ref, self.session.clone())) + } + /// Returns true if the expression will never match any rows in the file. pub fn can_prune(&self, filter: &Expression) -> VortexResult { let Some((stats, fields)) = self diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 5a1eac809ba..1f14e5bcb87 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -65,7 +65,7 @@ use vortex_error::VortexResult; use vortex_io::session::RuntimeSession; use vortex_layout::session::LayoutSession; use vortex_scalar::Scalar; -use vortex_scan::ScanBuilder; +use vortex_scan::v2::scan::ScanBuilder2; use vortex_session::VortexSession; use crate::OpenOptionsSessionExt; @@ -121,7 +121,7 @@ async fn test_read_simple() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap(); @@ -201,7 +201,7 @@ async fn test_round_trip_many_types() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap() @@ -287,7 +287,7 @@ async fn test_read_projection() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let array = file - .scan() + .scan2() .unwrap() .with_projection(select(["strings"], root())) .into_array_stream() @@ -309,7 +309,7 @@ async fn test_read_projection() { assert_arrays_eq!(actual.as_ref(), expected.as_ref()); let 
array = file - .scan() + .scan2() .unwrap() .with_projection(select(["numbers"], root())) .into_array_stream() @@ -358,7 +358,7 @@ async fn unequal_batches() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap(); @@ -418,7 +418,7 @@ async fn write_chunked() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap(); @@ -448,7 +448,7 @@ async fn test_empty_varbin_array_roundtrip() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let result = file - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap() @@ -478,7 +478,7 @@ async fn issue_5385_filter_casted_column() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(eq( cast( @@ -528,7 +528,7 @@ async fn filter_string() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(eq(get_item("name", root()), lit("Joseph"))) .into_array_stream() @@ -577,7 +577,7 @@ async fn filter_or() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(or( eq(get_item("name", root()), lit("Angela")), @@ -634,7 +634,7 @@ async fn filter_and() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(and( gt(get_item("age", root()), lit(21)), @@ -688,7 +688,7 @@ async fn test_with_indices_simple() { // test no indices let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_row_indices(Buffer::::empty()) .into_array_stream() @@ -704,7 +704,7 @@ async fn test_with_indices_simple() { let kept_indices = [0_u64, 3, 99, 100, 101, 399, 400, 401, 499]; let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_row_indices(Buffer::from_iter(kept_indices)) .into_array_stream() @@ -724,7 +724,7 @@ async fn test_with_indices_simple() { // test all indices let actual_array = file - .scan() + .scan2() .unwrap() .with_row_indices((0u64..500).collect::>()) .into_array_stream() @@ 
-767,7 +767,7 @@ async fn test_with_indices_on_two_columns() { let kept_indices = [0_u64, 3, 7]; let array = file - .scan() + .scan2() .unwrap() .with_row_indices(Buffer::from_iter(kept_indices)) .into_array_stream() @@ -822,7 +822,7 @@ async fn test_with_indices_and_with_row_filter_simple() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_filter(gt(get_item("numbers", root()), lit(50_i16))) .with_row_indices(Buffer::empty()) @@ -839,7 +839,7 @@ async fn test_with_indices_and_with_row_filter_simple() { let kept_indices = [0u64, 3, 99, 100, 101, 399, 400, 401, 499]; let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_filter(gt(get_item("numbers", root()), lit(50_i16))) .with_row_indices(Buffer::from_iter(kept_indices)) @@ -862,7 +862,7 @@ async fn test_with_indices_and_with_row_filter_simple() { // test all indices let actual_array = file - .scan() + .scan2() .unwrap() .with_filter(gt(get_item("numbers", root()), lit(50_i16))) .with_row_indices((0..500).collect::>()) @@ -925,7 +925,7 @@ async fn filter_string_chunked() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual_array = file - .scan() + .scan2() .unwrap() .with_filter(eq(get_item("name", root()), lit("Joseph"))) .into_array_stream() @@ -1013,7 +1013,7 @@ async fn test_pruning_with_or() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual_array = file - .scan() + .scan2() .unwrap() .with_filter(or( lt_eq(get_item("letter", root()), lit("J")), @@ -1086,7 +1086,7 @@ async fn test_repeated_projection() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual = file - .scan() + .scan2() .unwrap() .with_projection(select(["strings", "strings"], root())) .into_array_stream() @@ -1119,7 +1119,7 @@ async fn chunked_file() -> VortexResult { #[tokio::test] async fn basic_file_roundtrip() -> VortexResult<()> { let vxf = chunked_file().await?; - let result = 
vxf.scan()?.into_array_stream()?.read_all().await?; + let result = vxf.scan2()?.into_array_stream()?.read_all().await?; let expected = buffer![0i32, 1, 2, 3, 4, 5, 6, 7, 8].into_array(); assert_arrays_eq!(result.as_ref(), expected.as_ref()); @@ -1163,7 +1163,7 @@ async fn file_excluding_dtype() -> VortexResult<()> { async fn file_take() -> VortexResult<()> { let vxf = chunked_file().await?; let result = vxf - .scan()? + .scan2()? .with_row_indices(buffer![0, 1, 8]) .into_array_stream()? .read_all() @@ -1201,7 +1201,7 @@ async fn write_nullable_top_level_struct() { async fn round_trip( array: &dyn Array, - f: impl Fn(ScanBuilder) -> VortexResult>, + f: impl Fn(ScanBuilder2) -> VortexResult, ) -> VortexResult { let mut writer = vec![]; SESSION @@ -1218,7 +1218,7 @@ async fn round_trip( assert_eq!(vxf.dtype(), array.dtype()); assert_eq!(vxf.row_count(), array.len() as u64); - f(vxf.scan()?)?.into_array_stream()?.read_all().await + f(vxf.scan2()?)?.into_array_stream()?.read_all().await } #[tokio::test] @@ -1297,7 +1297,7 @@ async fn test_into_tokio_array_stream() -> VortexResult<()> { .await?; let file = SESSION.open_options().open_buffer(buf)?; - let stream = file.scan().unwrap().into_array_stream()?; + let stream = file.scan2().unwrap().into_array_stream()?; let array = stream.read_all().await?; assert_eq!(array.len(), 8); @@ -1319,7 +1319,7 @@ async fn test_array_stream_no_double_dict_encode() -> VortexResult<()> { .write(&mut buf, array.to_array_stream()) .await?; let file = SESSION.open_options().open_buffer(buf)?; - let read_array = file.scan()?.into_array_stream()?.read_all().await?; + let read_array = file.scan2()?.into_array_stream()?.read_all().await?; let dict = read_array .as_opt::() @@ -1347,7 +1347,7 @@ async fn test_writer_basic_push() -> VortexResult<()> { assert_eq!(summary.row_count(), 4); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = 
file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 4); assert_eq!(result.dtype(), &dtype); @@ -1377,7 +1377,7 @@ async fn test_writer_multiple_pushes() -> VortexResult<()> { assert_eq!(summary.row_count(), 9); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 9); let numbers = result @@ -1411,7 +1411,7 @@ async fn test_writer_push_stream() -> VortexResult<()> { assert_eq!(summary.row_count(), 6); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 6); let numbers = result @@ -1475,7 +1475,7 @@ async fn test_writer_empty_chunks() -> VortexResult<()> { assert_eq!(summary.row_count(), 2); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 2); let numbers = result @@ -1513,7 +1513,7 @@ async fn test_writer_mixed_push_and_stream() -> VortexResult<()> { assert_eq!(summary.row_count(), 6); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 6); let numbers = result @@ -1553,7 +1553,7 @@ async fn test_writer_with_complex_types() -> VortexResult<()> { assert_eq!(footer.row_count(), 3); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 3); assert_eq!(result.dtype(), &dtype); diff --git a/vortex-layout/src/layout.rs 
b/vortex-layout/src/layout.rs index b47b64abe37..1e3f7595a9d 100644 --- a/vortex-layout/src/layout.rs +++ b/vortex-layout/src/layout.rs @@ -25,6 +25,8 @@ use crate::display::DisplayLayoutTree; use crate::display::display_tree_with_segment_sizes; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2::reader::ReaderRef; pub type LayoutId = ArcRef; @@ -68,6 +70,12 @@ pub trait Layout: 'static + Send + Sync + Debug + private::Sealed { segment_source: Arc, session: &VortexSession, ) -> VortexResult; + + fn new_reader2( + &self, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult; } pub trait IntoLayout { @@ -313,6 +321,14 @@ impl Layout for LayoutAdapter { ) -> VortexResult { V::new_reader(&self.0, name, segment_source, session) } + + fn new_reader2( + &self, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + V::new_reader2(&self.0, segment_source, session) + } } mod private { diff --git a/vortex-layout/src/layouts/chunked/mod.rs b/vortex-layout/src/layouts/chunked/mod.rs index 8c5c77b6f78..53a700aa640 100644 --- a/vortex-layout/src/layouts/chunked/mod.rs +++ b/vortex-layout/src/layouts/chunked/mod.rs @@ -6,6 +6,7 @@ pub mod writer; use std::sync::Arc; +use itertools::Itertools; use vortex_array::ArrayContext; use vortex_array::DeserializeMetadata; use vortex_array::EmptyMetadata; @@ -24,6 +25,9 @@ use crate::children::OwnedLayoutChildren; use crate::layouts::chunked::reader::ChunkedReader; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2; +use crate::v2::reader::ReaderRef; use crate::vtable; vtable!(Chunked); @@ -83,6 +87,21 @@ impl VTable for ChunkedVTable { ))) } + fn new_reader2( + layout: &Self::Layout, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + let chunks = (0..Self::nchildren(layout)) + .map(|i| 
Self::child(layout, i)?.new_reader2(segment_source, session)) + .try_collect()?; + Ok(Arc::new(v2::readers::chunked::ChunkedReader::new( + layout.row_count, + layout.dtype.clone(), + chunks, + ))) + } + fn build( _encoding: &Self::Encoding, dtype: &DType, diff --git a/vortex-layout/src/layouts/dict/mod.rs b/vortex-layout/src/layouts/dict/mod.rs index e24336e7d9a..7b5ff688b45 100644 --- a/vortex-layout/src/layouts/dict/mod.rs +++ b/vortex-layout/src/layouts/dict/mod.rs @@ -30,6 +30,9 @@ use crate::VTable; use crate::children::LayoutChildren; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2; +use crate::v2::reader::ReaderRef; use crate::vtable; vtable!(Dict); @@ -101,6 +104,20 @@ impl VTable for DictVTable { )?)) } + fn new_reader2( + layout: &Self::Layout, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + let values = layout.values.new_reader2(segment_source, session)?; + let codes = layout.codes.new_reader2(segment_source, session)?; + Ok(Arc::new(v2::readers::dict::DictReader::new( + Self::dtype(layout).clone(), + values, + codes, + ))) + } + fn build( _encoding: &Self::Encoding, dtype: &DType, diff --git a/vortex-layout/src/layouts/flat/mod.rs b/vortex-layout/src/layouts/flat/mod.rs index 3826fc5dc7d..c6e792b2972 100644 --- a/vortex-layout/src/layouts/flat/mod.rs +++ b/vortex-layout/src/layouts/flat/mod.rs @@ -27,6 +27,9 @@ use crate::children::LayoutChildren; use crate::layouts::flat::reader::FlatReader; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2; +use crate::v2::reader::ReaderRef; use crate::vtable; /// Check if inline array node is enabled. 
@@ -94,6 +97,23 @@ impl VTable for FlatVTable { ))) } + fn new_reader2( + layout: &Self::Layout, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + let len = usize::try_from(layout.row_count).unwrap_or(usize::MAX); + Ok(Arc::new(v2::readers::flat::FlatReader::new( + len, + layout.dtype.clone(), + layout.array_tree.clone(), + layout.segment_id, + segment_source.clone(), + layout.ctx.clone(), + session.clone(), + ))) + } + fn build( _encoding: &Self::Encoding, dtype: &DType, diff --git a/vortex-layout/src/layouts/struct_/mod.rs b/vortex-layout/src/layouts/struct_/mod.rs index 2594e6723c0..a587e3b7a54 100644 --- a/vortex-layout/src/layouts/struct_/mod.rs +++ b/vortex-layout/src/layouts/struct_/mod.rs @@ -6,6 +6,7 @@ pub mod writer; use std::sync::Arc; +use itertools::Itertools; use reader::StructReader; use vortex_array::ArrayContext; use vortex_array::DeserializeMetadata; @@ -33,6 +34,9 @@ use crate::children::LayoutChildren; use crate::children::OwnedLayoutChildren; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2; +use crate::v2::reader::ReaderRef; use crate::vtable; vtable!(Struct); @@ -124,6 +128,30 @@ impl VTable for StructVTable { )?)) } + fn new_reader2( + layout: &Self::Layout, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + // Create validity reader from child 0 when nullable. 
+ let validity = layout + .dtype + .is_nullable() + .then(|| Self::child(layout, 0)?.new_reader2(segment_source, session)) + .transpose()?; + let start_idx = if layout.dtype.is_nullable() { 1 } else { 0 }; + let nchildren = Self::nchildren(layout); + let fields = (start_idx..nchildren) + .map(|i| Self::child(layout, i)?.new_reader2(segment_source, session)) + .try_collect()?; + Ok(Arc::new(v2::readers::struct_::StructReader::new( + layout.row_count, + layout.dtype.clone(), + validity, + fields, + ))) + } + fn build( _encoding: &Self::Encoding, dtype: &DType, diff --git a/vortex-layout/src/layouts/zoned/mod.rs b/vortex-layout/src/layouts/zoned/mod.rs index 8cd63e3e429..d98ca79d8ee 100644 --- a/vortex-layout/src/layouts/zoned/mod.rs +++ b/vortex-layout/src/layouts/zoned/mod.rs @@ -38,6 +38,9 @@ use crate::layouts::zoned::reader::ZonedReader; use crate::layouts::zoned::zone_map::ZoneMap; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2; +use crate::v2::reader::ReaderRef; use crate::vtable; vtable!(Zoned); @@ -111,6 +114,21 @@ impl VTable for ZonedVTable { )?)) } + fn new_reader2( + layout: &Self::Layout, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + let data = layout.child(0)?.new_reader2(segment_source, session)?; + let zones = layout.child(1)?.new_reader2(segment_source, session)?; + Ok(Arc::new(v2::readers::zoned::ZonedReader::new( + data, + zones, + layout.zone_len, + layout.present_stats.clone(), + ))) + } + fn build( _encoding: &Self::Encoding, dtype: &DType, diff --git a/vortex-layout/src/lib.rs b/vortex-layout/src/lib.rs index f7f3787da79..6e438291820 100644 --- a/vortex-layout/src/lib.rs +++ b/vortex-layout/src/lib.rs @@ -24,6 +24,7 @@ pub mod session; mod strategy; #[cfg(test)] mod test; +pub mod v2; pub mod vtable; pub type LayoutContext = Context; diff --git a/vortex-layout/src/segments/source.rs b/vortex-layout/src/segments/source.rs index 
a48a79b2889..3a5b3f84b99 100644 --- a/vortex-layout/src/segments/source.rs +++ b/vortex-layout/src/segments/source.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::sync::Arc; + use futures::future::BoxFuture; use vortex_array::buffer::BufferHandle; use vortex_error::VortexResult; @@ -9,6 +11,9 @@ use crate::segments::SegmentId; /// Static future resolving to a segment byte buffer. pub type SegmentFuture = BoxFuture<'static, VortexResult>; +/// A reference-counted segment source. +pub type SegmentSourceRef = Arc; + /// A trait for providing segment data to a [`crate::LayoutReader`]. pub trait SegmentSource: 'static + Send + Sync { /// Request a segment, returning a future that will eventually resolve to the segment data. diff --git a/vortex-layout/src/v2/expression.rs b/vortex-layout/src/v2/expression.rs new file mode 100644 index 00000000000..76f639f987f --- /dev/null +++ b/vortex-layout/src/v2/expression.rs @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use itertools::Itertools; +use vortex_array::expr::Expression; +use vortex_array::expr::Literal; +use vortex_array::expr::Root; +use vortex_error::VortexResult; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::readers::constant::ConstantReader; +use crate::v2::readers::scalar_fn::ScalarFnReader; + +impl dyn Reader + '_ { + /// Apply the expression to this reader, producing a new reader in constant time. + pub fn apply(self: Arc, expr: &Expression) -> VortexResult { + // If the expression is a root, return self. + if expr.is::() { + return Ok(self); + } + + // Manually convert literals to ConstantArray. 
+ if let Some(scalar) = expr.as_opt::() { + return Ok(Arc::new(ConstantReader::new( + scalar.clone(), + self.row_count(), + ))); + } + + let row_count = self.row_count(); + + // Otherwise, collect the child readers. + let children: Vec<_> = expr + .children() + .iter() + .map(|e| self.clone().apply(e)) + .try_collect()?; + + // And wrap the scalar function up in an array. + let reader: ReaderRef = Arc::new(ScalarFnReader::try_new( + expr.scalar_fn().clone(), + children, + row_count, + )?); + + // Optimize the resulting reader. + reader.optimize() + } +} diff --git a/vortex-layout/src/v2/expressions/falsify.rs b/vortex-layout/src/v2/expressions/falsify.rs new file mode 100644 index 00000000000..23235ca7971 --- /dev/null +++ b/vortex-layout/src/v2/expressions/falsify.rs @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Display; +use std::fmt::Formatter; + +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::ConstantArray; +use vortex_array::expr::Arity; +use vortex_array::expr::ChildName; +use vortex_array::expr::ExecutionArgs; +use vortex_array::expr::ExprId; +use vortex_array::expr::Expression; +use vortex_array::expr::VTable; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; + +/// An expression that evaluates to true when the predicate is provably false, without evaluating +/// it. +/// +/// Falsify typically reduces to operations over statistics expressions. For example, +/// the expression `falsify(col > 5)` may reduce to `col.max() <= 5`. +/// +/// If a falsify expression cannot be reduced, it evaluates to `false` for all inputs. 
+pub struct Falsify; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct FalsifyOptions { + predicate: Expression, +} + +impl Display for FalsifyOptions { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "predicate={}", self.predicate) + } +} + +impl VTable for Falsify { + // FIXME(ngates): should the predicate be a child expression, or live like this in the options. + // It's a bit weird? Maybe it makes implementing the optimizer rules a little more fiddly? + // But it's weird to have a child expression that we know is never executed. + type Options = FalsifyOptions; + + fn id(&self) -> ExprId { + ExprId::from("falsify") + } + + fn arity(&self, _options: &Self::Options) -> Arity { + Arity::Exact(0) + } + + fn child_name(&self, _options: &Self::Options, _child_idx: usize) -> ChildName { + ChildName::from("predicate") + } + + fn fmt_sql( + &self, + _options: &Self::Options, + expr: &Expression, + f: &mut Formatter<'_>, + ) -> std::fmt::Result { + write!(f, "falsify(")?; + expr.child(0).fmt_sql(f)?; + write!(f, ")") + } + + fn return_dtype(&self, _options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult { + if !arg_dtypes[0].is_boolean() { + vortex_bail!("falsify() requires a boolean argument"); + } + Ok(DType::Bool(Nullability::NonNullable)) + } + + // NOTE(ngates): do we prefer evaluate or execute semantics??? + fn evaluate( + &self, + _options: &Self::Options, + _expr: &Expression, + scope: &ArrayRef, + ) -> VortexResult { + // Unless falsify has been reduced by another expression, we cannot prove the predicate + // is false. Therefore, we return a constant false array. 
+ Ok(ConstantArray::new(false, scope.len()).into_array()) + } + + fn execute(&self, _options: &Self::Options, _args: ExecutionArgs) -> VortexResult { + Ok(Datum::Scalar(Scalar::Bool(BoolScalar::new(Some(false))))) + } +} diff --git a/vortex-layout/src/v2/expressions/mod.rs b/vortex-layout/src/v2/expressions/mod.rs new file mode 100644 index 00000000000..41b859483a0 --- /dev/null +++ b/vortex-layout/src/v2/expressions/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod falsify; diff --git a/vortex-layout/src/v2/matcher.rs b/vortex-layout/src/v2/matcher.rs new file mode 100644 index 00000000000..dcd68f2dc1d --- /dev/null +++ b/vortex-layout/src/v2/matcher.rs @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::expr; + +use crate::v2::reader::Reader; +use crate::v2::readers::scalar_fn::ScalarFnReader; + +impl dyn Reader + '_ { + /// If this reader is a [`ScalarFnReader`], return its scalar function options + pub fn as_scalar_fn(&self) -> Option<&V::Options> { + self.as_any() + .downcast_ref::() + .and_then(|r| r.scalar_fn().as_opt::()) + } +} diff --git a/vortex-layout/src/v2/mod.rs b/vortex-layout/src/v2/mod.rs new file mode 100644 index 00000000000..6b4185aaf5c --- /dev/null +++ b/vortex-layout/src/v2/mod.rs @@ -0,0 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub mod reader; +pub mod readers; diff --git a/vortex-layout/src/v2/optimizer.rs b/vortex-layout/src/v2/optimizer.rs new file mode 100644 index 00000000000..343179037ac --- /dev/null +++ b/vortex-layout/src/v2/optimizer.rs @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use vortex_error::VortexResult; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; + +impl dyn 
Reader + '_ { + pub fn optimize(self: Arc) -> VortexResult { + // TODO(ngates): run the reduce rules + Ok(self) + } +} diff --git a/vortex-layout/src/v2/reader.rs b/vortex-layout/src/v2/reader.rs new file mode 100644 index 00000000000..bf84cc14163 --- /dev/null +++ b/vortex-layout/src/v2/reader.rs @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::fmt; +use std::fmt::Display; +use std::ops::Range; +use std::sync::Arc; + +use termtree::Tree; +use vortex_array::ArrayFuture; +use vortex_array::expr::Expression; +use vortex_dtype::DType; +use vortex_error::VortexResult; + +pub type ReaderRef = Arc; + +/// A reader provides an interface for loading data from row-indexed layouts. +/// +/// Readers have a concrete row count allowing fixed partitions over a known set of rows. Readers +/// are driven by calling `next_chunk()` which returns an [`ArrayFuture`] with a known length. +pub trait Reader: 'static + Send + Sync { + /// Downcast the reader to a concrete type. + fn as_any(&self) -> &dyn Any; + + /// Get the data type of the layout being read. + fn dtype(&self) -> &DType; + + /// Returns the number of rows in the reader. + fn row_count(&self) -> u64; + + /// Apply an expression to the reader, returning a new reader that will execute the expression + /// on top of the current reader. + fn apply(&self, expression: &Expression) -> VortexResult; + + /// Creates a scan over the given row range of the reader. + fn execute(&self, row_range: Range) -> VortexResult; + + /// Build a tree representation of this reader for display purposes. + fn display_tree(&self) -> Tree; +} + +/// A convenience wrapper for displaying a reader tree. 
+pub struct DisplayReaderTree<'a>(pub &'a dyn Reader); + +impl Display for DisplayReaderTree<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0.display_tree()) + } +} + +pub type ReaderStreamRef = Box; + +/// A stream of array chunks from a reader. +/// +/// Each call to `next_chunk()` returns an [`ArrayFuture`] whose length is determined by +/// the reader (not the caller). Returns `None` when the stream is exhausted. +pub trait ReaderStream: 'static + Send + Sync { + /// The data type of the returned data. + fn dtype(&self) -> &DType; + + /// Skip the next `n` rows of the stream. + /// + /// # Panics + /// + /// Panics if `n` is greater than the number of rows remaining in the stream. + fn skip(&mut self, n: usize); + + /// Returns the next chunk of data as an [`ArrayFuture`], or `None` if no more chunks. + fn next_chunk(&mut self) -> VortexResult>; +} diff --git a/vortex-layout/src/v2/readers/chunked.rs b/vortex-layout/src/v2/readers/chunked.rs new file mode 100644 index 00000000000..d08a8a4ff15 --- /dev/null +++ b/vortex-layout/src/v2/readers/chunked.rs @@ -0,0 +1,211 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::collections::VecDeque; +use std::ops::Range; +use std::sync::Arc; + +use termtree::Tree; +use vortex_array::ArrayFuture; +use vortex_array::expr::Expression; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_panic; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; + +/// A reader over a sequence of chunk readers with the same dtype. 
+pub struct ChunkedReader { + row_count: u64, + dtype: DType, + chunks: Vec, +} + +impl ChunkedReader { + pub fn new(row_count: u64, dtype: DType, chunks: Vec) -> Self { + Self { + row_count, + dtype, + chunks, + } + } +} + +impl Reader for ChunkedReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn apply(&self, expression: &Expression) -> VortexResult { + let new_chunks: Vec = self + .chunks + .iter() + .map(|chunk| chunk.apply(expression)) + .collect::>()?; + let new_dtype = expression.return_dtype(&self.dtype)?; + Ok(Arc::new(Self { + row_count: self.row_count, + dtype: new_dtype, + chunks: new_chunks, + })) + } + + fn execute(&self, row_range: Range) -> VortexResult { + // Collect overlapping chunk readers (not streams — lazy construction). + let mut remaining_start = row_range.start; + let mut remaining_end = row_range.end; + let mut pending_chunks = VecDeque::new(); + + for chunk in &self.chunks { + let chunk_row_count = chunk.row_count(); + + if remaining_start >= chunk_row_count { + remaining_start -= chunk_row_count; + remaining_end -= chunk_row_count; + continue; + } + + let start_in_chunk = remaining_start; + let end_in_chunk = remaining_end.min(chunk_row_count); + + pending_chunks.push_back(PendingChunk { + reader: chunk.clone(), + row_range: start_in_chunk..end_in_chunk, + }); + + remaining_start = 0; + if remaining_end <= chunk_row_count { + break; + } else { + remaining_end -= chunk_row_count; + } + } + + Ok(Box::new(ChunkedReaderStream { + dtype: self.dtype.clone(), + pending_chunks, + active_stream: None, + })) + } + + fn display_tree(&self) -> Tree { + let label = format!( + "Chunked({}, rows={}, chunks={})", + self.dtype, + self.row_count, + self.chunks.len() + ); + let mut tree = Tree::new(label); + for (i, chunk) in self.chunks.iter().enumerate() { + let child = chunk.display_tree(); + tree.push(Tree::new(format!("[{}]: {}", i, 
child.root)).with_leaves(child.leaves)); + } + tree + } +} + +struct PendingChunk { + reader: ReaderRef, + row_range: Range, +} + +/// A stream that lazily constructs child streams as it advances through chunks. +struct ChunkedReaderStream { + dtype: DType, + pending_chunks: VecDeque, + active_stream: Option, +} + +impl ChunkedReaderStream { + /// Ensure we have an active stream pointing at a non-exhausted chunk. + /// Returns `Ok(true)` if a stream is ready, `Ok(false)` if no more chunks. + fn ensure_active_stream(&mut self) -> VortexResult { + loop { + if self.active_stream.is_some() { + return Ok(true); + } + + // Try to activate the next pending chunk. + let Some(pending) = self.pending_chunks.pop_front() else { + return Ok(false); + }; + self.active_stream = Some(pending.reader.execute(pending.row_range)?); + } + } +} + +impl ReaderStream for ChunkedReaderStream { + fn dtype(&self) -> &DType { + &self.dtype + } + + fn skip(&mut self, mut n: usize) { + while n > 0 { + // Try to skip within the active stream. + if let Some(ref mut stream) = self.active_stream { + // We don't know the stream's remaining length without next_chunk_len, + // so we try to skip and if there's no more data, move on. + // For skip, the contract says n must not exceed remaining, so we delegate. + stream.skip(n); + return; + } + + // Skip entire pending chunks without constructing streams. + let Some(pending) = self.pending_chunks.front() else { + vortex_panic!("Cannot skip {} more rows, no chunks remaining", n); + }; + let chunk_rows = usize::try_from(pending.row_range.end - pending.row_range.start) + .unwrap_or(usize::MAX); + if n >= chunk_rows { + self.pending_chunks.pop_front(); + n -= chunk_rows; + } else { + // Partial skip — construct the stream and skip within it. 
+ match self.pending_chunks.pop_front() { + Some(pending) => { + let mut stream = match pending.reader.execute(pending.row_range) { + Ok(s) => s, + Err(e) => vortex_panic!("failed to execute chunk reader: {e}"), + }; + stream.skip(n); + self.active_stream = Some(stream); + } + None => vortex_panic!("pending chunk disappeared during skip"), + } + return; + } + } + } + + fn next_chunk(&mut self) -> VortexResult> { + loop { + if !self.ensure_active_stream()? { + return Ok(None); + } + + let Some(stream) = self.active_stream.as_mut() else { + return Ok(None); + }; + + match stream.next_chunk()? { + Some(future) => return Ok(Some(future)), + None => { + // Current stream is exhausted, try next chunk. + self.active_stream = None; + } + } + } + } +} diff --git a/vortex-layout/src/v2/readers/constant.rs b/vortex-layout/src/v2/readers/constant.rs new file mode 100644 index 00000000000..f729a50c4a1 --- /dev/null +++ b/vortex-layout/src/v2/readers/constant.rs @@ -0,0 +1,126 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; + +use termtree::Tree; +use vortex_array::ArrayFuture; +use vortex_array::IntoArray; +use vortex_array::arrays::ConstantArray; +use vortex_array::expr::Expression; +use vortex_array::expr::root; +use vortex_array::expr::transform::replace; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_panic; +use vortex_scalar::Scalar; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; + +/// A reader that produces constant values. 
+pub struct ConstantReader {
+    scalar: Scalar,
+    row_count: u64,
+    expression: Option<Expression>,
+}
+
+impl ConstantReader {
+    pub fn new(scalar: Scalar, row_count: u64) -> Self {
+        Self {
+            scalar,
+            row_count,
+            expression: None,
+        }
+    }
+}
+
+impl Reader for ConstantReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn dtype(&self) -> &DType {
+        self.scalar.dtype()
+    }
+
+    fn row_count(&self) -> u64 {
+        self.row_count
+    }
+
+    fn apply(&self, expression: &Expression) -> VortexResult<ReaderRef> {
+        // Compose the new expression with any previously accumulated one.
+        // NOTE(review): assumes `replace(existing, &root(), e)` substitutes `e` at the root
+        //  of `existing` — confirm the argument order of `transform::replace`.
+        let new_expr = match &self.expression {
+            None => expression.clone(),
+            Some(existing) => replace(existing.clone(), &root(), expression.clone()),
+        };
+        Ok(Arc::new(Self {
+            scalar: self.scalar.clone(),
+            row_count: self.row_count,
+            expression: Some(new_expr),
+        }))
+    }
+
+    fn execute(&self, row_range: Range<u64>) -> VortexResult<ReaderStreamRef> {
+        let remaining = row_range.end.saturating_sub(row_range.start);
+        Ok(Box::new(ConstantReaderStream {
+            scalar: self.scalar.clone(),
+            remaining,
+            expression: self.expression.clone(),
+            estimated_bytes_per_row: super::estimated_decoded_bytes(self.scalar.dtype(), 1),
+        }))
+    }
+
+    fn display_tree(&self) -> Tree<String> {
+        Tree::new(format!(
+            "Constant({}, rows={}, value={})",
+            self.scalar.dtype(),
+            self.row_count,
+            self.scalar
+        ))
+    }
+}
+
+struct ConstantReaderStream {
+    scalar: Scalar,
+    remaining: u64,
+    expression: Option<Expression>,
+    estimated_bytes_per_row: usize,
+}
+
+impl ReaderStream for ConstantReaderStream {
+    fn dtype(&self) -> &DType {
+        self.scalar.dtype()
+    }
+
+    fn skip(&mut self, n: usize) {
+        let n = n as u64;
+        if n > self.remaining {
+            vortex_panic!("Cannot skip {} rows, only {} remaining", n, self.remaining);
+        }
+        self.remaining -= n;
+    }
+
+    fn next_chunk(&mut self) -> VortexResult<Option<ArrayFuture>> {
+        if self.remaining == 0 {
+            return Ok(None);
+        }
+
+        // Emit at most usize::MAX rows per chunk. Decrement by the emitted length rather
+        // than zeroing `remaining`, so a (theoretical) > usize::MAX row count yields the
+        // leftover rows on the next call instead of silently dropping them.
+        let len = usize::try_from(self.remaining).unwrap_or(usize::MAX);
+        let scalar = self.scalar.clone();
+        let expression = self.expression.clone();
+        let estimated_bytes = len.saturating_mul(self.estimated_bytes_per_row);
+        self.remaining -= len as u64;
+
+        Ok(Some(ArrayFuture::new(len, estimated_bytes, async move {
+            let mut array = ConstantArray::new(scalar, len).into_array();
+            if let Some(e) = expression {
+                array = array.apply(&e)?;
+            }
+            Ok(array)
+        })))
+    }
+}
diff --git a/vortex-layout/src/v2/readers/dict.rs b/vortex-layout/src/v2/readers/dict.rs new file mode 100644 index 00000000000..5efdc3616c3 --- /dev/null +++ b/vortex-layout/src/v2/readers/dict.rs
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use std::any::Any;
+use std::ops::Range;
+use std::sync::Arc;
+
+use futures::FutureExt;
+use futures::future::BoxFuture;
+use futures::future::Shared;
+use termtree::Tree;
+use vortex_array::ArrayFuture;
+use vortex_array::ArrayRef;
+use vortex_array::IntoArray;
+use vortex_array::arrays::DictArray;
+use vortex_array::arrays::SharedArray;
+use vortex_array::expr::Expression;
+use vortex_array::expr::root;
+use vortex_array::expr::transform::replace;
+use vortex_dtype::DType;
+use vortex_error::VortexError;
+use vortex_error::VortexResult;
+use vortex_error::vortex_err;
+
+use crate::v2::reader::Reader;
+use crate::v2::reader::ReaderRef;
+use crate::v2::reader::ReaderStream;
+use crate::v2::reader::ReaderStreamRef;
+
+type SharedVortexResult = Result<ArrayRef, Arc<VortexError>>;
+
+/// A reader that reconstructs dict-encoded arrays from separate values and codes readers.
+///
+/// The values reader holds the dictionary (typically a small flat array), while the codes
+/// reader holds integer indices into the dictionary (row-aligned with the parent layout).
+pub struct DictReader {
+    dtype: DType,
+    values: ReaderRef,
+    codes: ReaderRef,
+    expression: Option<Expression>,
+}
+
+impl DictReader {
+    pub fn new(dtype: DType, values: ReaderRef, codes: ReaderRef) -> Self {
+        Self {
+            dtype,
+            values,
+            codes,
+            expression: None,
+        }
+    }
+}
+
+impl Reader for DictReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn dtype(&self) -> &DType {
+        &self.dtype
+    }
+
+    fn row_count(&self) -> u64 {
+        // Codes are row-aligned with the parent layout; values hold only the dictionary.
+        self.codes.row_count()
+    }
+
+    fn apply(&self, expression: &Expression) -> VortexResult<ReaderRef> {
+        // Dict encoding is transparent — expressions are applied after reconstruction.
+        // We compose the expression and apply it after creating the DictArray.
+        let new_expr = match &self.expression {
+            None => expression.clone(),
+            Some(existing) => replace(existing.clone(), &root(), expression.clone()),
+        };
+        let new_dtype = new_expr.return_dtype(&self.dtype)?;
+        Ok(Arc::new(Self {
+            dtype: new_dtype,
+            values: self.values.clone(),
+            codes: self.codes.clone(),
+            expression: Some(new_expr),
+        }))
+    }
+
+    fn execute(&self, row_range: Range<u64>) -> VortexResult<ReaderStreamRef> {
+        // Read the full dictionary values once and share across all chunks.
+        let values_row_count = self.values.row_count();
+        let mut values_stream = self.values.execute(0..values_row_count)?;
+        let values_array_future = values_stream
+            .next_chunk()?
+            .ok_or_else(|| vortex_err!("Dict values stream is empty"))?;
+        // The dictionary must arrive as a single chunk; any further chunk would be
+        // silently ignored below, so fail loudly instead.
+        if values_stream.next_chunk()?.is_some() {
+            return Err(vortex_err!("Dict values stream returned more than one chunk"));
+        }
+        let values_fut: Shared<BoxFuture<'static, SharedVortexResult>> = async move {
+            let array = values_array_future.await?;
+            // Wrap in SharedArray so clones of the dictionary are cheap.
+            Ok(SharedArray::new(array).into_array())
+        }
+        .map(|r: VortexResult<ArrayRef>| r.map_err(Arc::new))
+        .boxed()
+        .shared();
+
+        let codes_stream = self.codes.execute(row_range)?;
+
+        Ok(Box::new(DictReaderStream {
+            dtype: self.dtype.clone(),
+            codes_stream,
+            values_fut,
+            expression: self.expression.clone(),
+            estimated_bytes_per_row: super::estimated_decoded_bytes(&self.dtype, 1),
+        }))
+    }
+
+    fn display_tree(&self) -> Tree<String> {
+        let mut label = format!("Dict({}, rows={}", self.dtype, self.codes.row_count());
+        if let Some(expr) = &self.expression {
+            label.push_str(&format!(", expr={}", expr));
+        }
+        label.push(')');
+
+        let mut tree = Tree::new(label);
+
+        let values_child = self.values.display_tree();
+        tree.push(
+            Tree::new(format!("values: {}", values_child.root)).with_leaves(values_child.leaves),
+        );
+
+        let codes_child = self.codes.display_tree();
+        tree.push(
+            Tree::new(format!("codes: {}", codes_child.root)).with_leaves(codes_child.leaves),
+        );
+
+        tree
+    }
+}
+
+struct DictReaderStream {
+    dtype: DType,
+    codes_stream: ReaderStreamRef,
+    values_fut: Shared<BoxFuture<'static, SharedVortexResult>>,
+    expression: Option<Expression>,
+    estimated_bytes_per_row: usize,
+}
+
+impl ReaderStream for DictReaderStream {
+    fn dtype(&self) -> &DType {
+        &self.dtype
+    }
+
+    fn skip(&mut self, n: usize) {
+        // Only the codes are row-aligned; the dictionary is never skipped.
+        self.codes_stream.skip(n);
+    }
+
+    fn next_chunk(&mut self) -> VortexResult<Option<ArrayFuture>> {
+        let Some(codes_future) = self.codes_stream.next_chunk()? else {
+            return Ok(None);
+        };
+        let values_fut = self.values_fut.clone();
+        let expression = self.expression.clone();
+        let len = codes_future.len();
+        let estimated_bytes = len.saturating_mul(self.estimated_bytes_per_row);
+
+        Ok(Some(ArrayFuture::new(len, estimated_bytes, async move {
+            // Preserve the original error instead of flattening it into a new message.
+            let values = values_fut.await.map_err(VortexError::from)?;
+            let codes = codes_future.await?;
+            let mut array = DictArray::try_new(codes, values)?.into_array();
+            if let Some(expr) = expression {
+                array = array.apply(&expr)?;
+            }
+            Ok(array)
+        })))
+    }
+}
diff --git a/vortex-layout/src/v2/readers/flat.rs b/vortex-layout/src/v2/readers/flat.rs new file mode 100644 index 00000000000..fd8a14a3d75 --- /dev/null +++ b/vortex-layout/src/v2/readers/flat.rs
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use std::any::Any;
+use std::ops::Range;
+use std::sync::Arc;
+
+use futures::FutureExt;
+use futures::future::BoxFuture;
+use futures::future::Shared;
+use termtree::Tree;
+use vortex_array::ArrayContext;
+use vortex_array::ArrayFuture;
+use vortex_array::ArrayRef;
+use vortex_array::expr::Expression;
+use vortex_array::expr::root;
+use vortex_array::expr::transform::replace;
+use vortex_array::serde::ArrayParts;
+use vortex_buffer::ByteBuffer;
+use vortex_dtype::DType;
+use vortex_error::SharedVortexResult;
+use vortex_error::VortexResult;
+use vortex_error::vortex_bail;
+use vortex_error::vortex_err;
+use vortex_error::vortex_panic;
+use vortex_session::VortexSession;
+
+use crate::segments::SegmentId;
+use crate::segments::SegmentSourceRef;
+use crate::v2::reader::Reader;
+use crate::v2::reader::ReaderRef;
+use crate::v2::reader::ReaderStream;
+use crate::v2::reader::ReaderStreamRef;
+
+type SharedArrayFuture = Shared<BoxFuture<'static, SharedVortexResult<ArrayRef>>>;
+
+/// A leaf reader that reads a single flat segment.
+/// +/// The segment source handles caching — multiple streams reading the same segment will share +/// the underlying I/O through the segment source's cache. +pub struct FlatReader { + len: usize, + dtype: DType, + decode_dtype: DType, + array_tree: Option, + segment_id: SegmentId, + segment_source: SegmentSourceRef, + ctx: ArrayContext, + session: VortexSession, + expression: Option, +} + +impl FlatReader { + pub fn new( + len: usize, + dtype: DType, + array_tree: Option, + segment_id: SegmentId, + segment_source: SegmentSourceRef, + ctx: ArrayContext, + session: VortexSession, + ) -> Self { + Self { + len, + decode_dtype: dtype.clone(), + dtype, + array_tree, + segment_id, + segment_source, + ctx, + session, + expression: None, + } + } +} + +impl Reader for FlatReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.len as u64 + } + + fn apply(&self, expression: &Expression) -> VortexResult { + let new_expr = match &self.expression { + None => expression.clone(), + Some(existing) => replace(existing.clone(), &root(), expression.clone()), + }; + let new_dtype = new_expr.return_dtype(&self.dtype)?; + Ok(Arc::new(Self { + len: self.len, + dtype: new_dtype, + decode_dtype: self.decode_dtype.clone(), + array_tree: self.array_tree.clone(), + segment_id: self.segment_id, + segment_source: self.segment_source.clone(), + ctx: self.ctx.clone(), + session: self.session.clone(), + expression: Some(new_expr), + })) + } + + fn execute(&self, row_range: Range) -> VortexResult { + let start = usize::try_from(row_range.start) + .map_err(|_| vortex_err!("row range start too large for usize"))?; + let end = usize::try_from(row_range.end) + .map_err(|_| vortex_err!("row range end too large for usize"))?; + + if start > self.len || end > self.len || start > end { + vortex_bail!( + "Row range {}..{} out of bounds for flat reader of length {}", + start, + end, + self.len + ); + } + + // Decode the 
array once and share the result across all next_chunk calls. + let segment_source = self.segment_source.clone(); + let segment_id = self.segment_id; + let array_tree = self.array_tree.clone(); + let decode_dtype = self.decode_dtype.clone(); + let row_count = self.len; + let ctx = self.ctx.clone(); + let session = self.session.clone(); + let array_fut = async move { + let segment = segment_source.request(segment_id).await?; + let parts = if let Some(array_tree) = array_tree { + ArrayParts::from_flatbuffer_and_segment(array_tree, segment)? + } else { + ArrayParts::try_from(segment)? + }; + parts.decode(&decode_dtype, row_count, &ctx, &session) + } + .map(|r| r.map_err(Arc::new)) + .boxed() + .shared(); + + Ok(Box::new(FlatReaderStream { + dtype: self.dtype.clone(), + array_fut, + expression: self.expression.clone(), + row_count: self.len, + offset: start, + remaining: end - start, + estimated_bytes_per_row: super::estimated_decoded_bytes(&self.dtype, 1), + })) + } + + fn display_tree(&self) -> Tree { + let mut label = format!( + "Flat({}, rows={}, segment={}", + self.dtype, self.len, self.segment_id + ); + if let Some(expr) = &self.expression { + label.push_str(&format!(", expr={}", expr)); + } + label.push(')'); + Tree::new(label) + } +} + +struct FlatReaderStream { + dtype: DType, + array_fut: SharedArrayFuture, + expression: Option, + row_count: usize, + offset: usize, + remaining: usize, + estimated_bytes_per_row: usize, +} + +impl ReaderStream for FlatReaderStream { + fn dtype(&self) -> &DType { + &self.dtype + } + + fn skip(&mut self, n: usize) { + if n > self.remaining { + vortex_panic!("Cannot skip {} rows, only {} remaining", n, self.remaining); + } + self.offset += n; + self.remaining -= n; + } + + fn next_chunk(&mut self) -> VortexResult> { + if self.remaining == 0 { + return Ok(None); + } + + let array_fut = self.array_fut.clone(); + let row_count = self.row_count; + let offset = self.offset; + let len = self.remaining; + let expression = 
self.expression.clone(); + let estimated_bytes = len * self.estimated_bytes_per_row; + + self.offset += len; + self.remaining = 0; + + Ok(Some(ArrayFuture::new(len, estimated_bytes, async move { + // Await the shared array future (decoded once, shared across chunks). + let mut array: ArrayRef = array_fut.await?; + + // Slice to the requested row range within the segment. + if offset > 0 || len < row_count { + array = array.slice(offset..offset + len)?; + } + + // Apply any accumulated expression. + if let Some(expr) = expression { + array = array.apply(&expr)?; + } + + Ok(array) + }))) + } +} diff --git a/vortex-layout/src/v2/readers/mod.rs b/vortex-layout/src/v2/readers/mod.rs new file mode 100644 index 00000000000..7bc328832c6 --- /dev/null +++ b/vortex-layout/src/v2/readers/mod.rs @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_dtype::DType; +use vortex_dtype::DecimalType; + +pub mod chunked; +pub mod constant; +pub mod dict; +pub mod flat; +pub mod scalar_fn; +pub mod struct_; +pub mod zoned; + +/// Estimates the decoded in-memory byte size for `n` elements of the given type. +/// +/// For fixed-width types this is exact. For variable-width types (strings, binary, lists) +/// a heuristic estimate is used. 
+fn estimated_decoded_bytes(dtype: &DType, n: usize) -> usize {
+    // Assumed average byte size for a variable-width element (heuristic only).
+    const VARIABLE_ELEMENT_ESTIMATE: usize = 64;
+    // Use saturating arithmetic throughout: for very large `n` an estimate of
+    // usize::MAX is more useful than an overflow panic (debug) or wrap (release).
+    match dtype {
+        DType::Null => 0,
+        DType::Bool(_) => n,
+        DType::Primitive(ptype, _) => n.saturating_mul(ptype.byte_width()),
+        DType::Decimal(decimal, _) => {
+            n.saturating_mul(DecimalType::smallest_decimal_value_type(decimal).byte_width())
+        }
+        DType::Utf8(_) | DType::Binary(_) => n.saturating_mul(VARIABLE_ELEMENT_ESTIMATE),
+        // Assume roughly 10 elements per list — heuristic.
+        DType::List(elem_dtype, _) => n
+            .saturating_mul(10)
+            .saturating_mul(estimated_decoded_bytes(elem_dtype, 1)),
+        DType::FixedSizeList(elem_dtype, list_size, _) => {
+            n.saturating_mul(estimated_decoded_bytes(elem_dtype, *list_size as usize))
+        }
+        DType::Struct(fields, _) => fields
+            .fields()
+            .fold(0usize, |acc, f| {
+                acc.saturating_add(estimated_decoded_bytes(&f, n))
+            }),
+        DType::Extension(ext) => estimated_decoded_bytes(ext.storage_dtype(), n),
+    }
+}
diff --git a/vortex-layout/src/v2/readers/scalar_fn.rs b/vortex-layout/src/v2/readers/scalar_fn.rs new file mode 100644 index 00000000000..7387f4dd29e --- /dev/null +++ b/vortex-layout/src/v2/readers/scalar_fn.rs
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use std::any::Any;
+use std::ops::Range;
+use std::sync::Arc;
+
+use futures::future::try_join_all;
+use itertools::Itertools;
+use termtree::Tree;
+use vortex_array::ArrayFuture;
+use vortex_array::IntoArray;
+use vortex_array::arrays::ScalarFnArray;
+use vortex_array::expr::Expression;
+use vortex_array::expr::Literal;
+use vortex_array::expr::Root;
+use vortex_array::expr::ScalarFn;
+use vortex_array::expr::VTable;
+use vortex_array::expr::VTableExt;
+use vortex_array::optimizer::ArrayOptimizer;
+use vortex_dtype::DType;
+use vortex_error::VortexResult;
+
+use crate::v2::reader::Reader;
+use crate::v2::reader::ReaderRef;
+use crate::v2::reader::ReaderStream;
+use crate::v2::reader::ReaderStreamRef;
+use crate::v2::readers::constant::ConstantReader;
+
+/// A [`Reader`] for applying a scalar function to child readers.
+pub struct ScalarFnReader { + scalar_fn: ScalarFn, + dtype: DType, + row_count: u64, + children: Vec, +} + +impl ScalarFnReader { + pub fn try_new( + scalar_fn: ScalarFn, + children: Vec, + row_count: u64, + ) -> VortexResult { + let dtype = scalar_fn.return_dtype( + &children + .iter() + .map(|c| c.dtype().clone()) + .collect::>(), + )?; + + Ok(Self { + scalar_fn, + dtype, + row_count, + children, + }) + } + + pub fn scalar_fn(&self) -> &ScalarFn { + &self.scalar_fn + } +} + +impl Reader for ScalarFnReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn apply(&self, expression: &Expression) -> VortexResult { + // Treat this ScalarFnReader as a data source. Resolve Root to a clone of self, + // literals to constants, and wrap everything else in a new ScalarFnReader. + if expression.is::() { + return Ok(Arc::new(Self { + scalar_fn: self.scalar_fn.clone(), + dtype: self.dtype.clone(), + row_count: self.row_count, + children: self.children.clone(), + })); + } + + if let Some(scalar) = expression.as_opt::() { + return Ok(Arc::new(ConstantReader::new( + scalar.clone(), + self.row_count, + ))); + } + + let resolved_children: Vec = expression + .children() + .iter() + .map(|child| self.apply(child)) + .try_collect()?; + + Ok(Arc::new(Self::try_new( + expression.scalar_fn().clone(), + resolved_children, + self.row_count, + )?)) + } + + fn execute(&self, row_range: Range) -> VortexResult { + let input_streams = self + .children + .iter() + .map(|child| child.execute(row_range.clone())) + .collect::>>()?; + + let num_inputs = input_streams.len(); + Ok(Box::new(ScalarFnArrayStream { + dtype: self.dtype.clone(), + scalar_fn: self.scalar_fn.clone(), + input_streams, + input_buffers: vec![None; num_inputs], + })) + } + + fn display_tree(&self) -> Tree { + let label = format!( + "ScalarFn({}, rows={}, fn={})", + self.dtype, self.row_count, self.scalar_fn + ); + let mut 
tree = Tree::new(label); + for (i, child) in self.children.iter().enumerate() { + let child_tree = child.display_tree(); + tree.push( + Tree::new(format!("[{}]: {}", i, child_tree.root)).with_leaves(child_tree.leaves), + ); + } + tree + } +} + +struct ScalarFnArrayStream { + dtype: DType, + scalar_fn: ScalarFn, + input_streams: Vec, + input_buffers: Vec>, +} + +impl ScalarFnArrayStream { + /// Get the next ArrayFuture for an input stream, taking from the buffer first. + fn next_for_input( + stream: &mut ReaderStreamRef, + buffer: &mut Option, + ) -> VortexResult> { + if let Some(buffered) = buffer.take() { + return Ok(Some(buffered)); + } + stream.next_chunk() + } +} + +impl ReaderStream for ScalarFnArrayStream { + fn dtype(&self) -> &DType { + &self.dtype + } + + fn skip(&mut self, n: usize) { + for (stream, buffer) in self + .input_streams + .iter_mut() + .zip(self.input_buffers.iter_mut()) + { + let mut remaining = n; + if let Some(buf) = buffer.take() { + if remaining < buf.len() { + *buffer = Some(buf.slice(remaining..buf.len())); + continue; + } + remaining -= buf.len(); + } + if remaining > 0 { + stream.skip(remaining); + } + } + } + + fn next_chunk(&mut self) -> VortexResult> { + // Collect an ArrayFuture for each input. + let mut all_futures: Vec = Vec::with_capacity(self.input_streams.len()); + + for (stream, buffer) in self + .input_streams + .iter_mut() + .zip(self.input_buffers.iter_mut()) + { + let Some(future) = Self::next_for_input(stream, buffer)? else { + return Ok(None); + }; + all_futures.push(future); + } + + if all_futures.is_empty() { + return Ok(None); + } + + // Find the minimum length. + let min_len = all_futures.iter().map(|f| f.len()).min().unwrap_or(0); + if min_len == 0 { + return Ok(None); + } + + // For inputs with len > min_len, buffer the remainder and slice. 
+ let mut chunk_futures: Vec = Vec::with_capacity(all_futures.len()); + for (idx, future) in all_futures.into_iter().enumerate() { + if future.len() > min_len { + let remainder = future.slice(min_len..future.len()); + let chunk = future.slice(0..min_len); + self.input_buffers[idx] = Some(remainder); + chunk_futures.push(chunk); + } else { + chunk_futures.push(future); + } + } + + let scalar_fn = self.scalar_fn.clone(); + let estimated_bytes: usize = chunk_futures.iter().map(|f| f.estimated_bytes()).sum(); + Ok(Some(ArrayFuture::new( + min_len, + estimated_bytes, + async move { + let input_arrays = try_join_all(chunk_futures).await?; + let array = ScalarFnArray::try_new(scalar_fn, input_arrays, min_len)?.into_array(); + let array = array.optimize()?; + Ok(array) + }, + ))) + } +} + +pub trait ScalarFnReaderExt: VTable { + /// Creates a [`ScalarFnReader`] applying this scalar function to the given children. + fn new_reader( + &'static self, + options: Self::Options, + children: Vec, + row_count: u64, + ) -> VortexResult { + Ok(Arc::new(ScalarFnReader::try_new( + self.bind(options), + children, + row_count, + )?)) + } +} +impl ScalarFnReaderExt for V {} diff --git a/vortex-layout/src/v2/readers/struct_.rs b/vortex-layout/src/v2/readers/struct_.rs new file mode 100644 index 00000000000..3e1840d6418 --- /dev/null +++ b/vortex-layout/src/v2/readers/struct_.rs @@ -0,0 +1,308 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; + +use futures::future::try_join_all; +use itertools::Itertools; +use termtree::Tree; +use vortex_array::ArrayFuture; +use vortex_array::IntoArray; +use vortex_array::arrays::StructArray; +use vortex_array::expr::Expression; +use vortex_array::expr::GetItem; +use vortex_array::expr::Literal; +use vortex_array::expr::Root; +use vortex_array::validity::Validity; +use vortex_dtype::DType; +use vortex_error::VortexResult; + +use 
crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; +use crate::v2::readers::constant::ConstantReader; +use crate::v2::readers::scalar_fn::ScalarFnReader; + +/// A reader over a struct with named fields. +pub struct StructReader { + row_count: u64, + dtype: DType, + validity: Option, + fields: Vec, +} + +impl StructReader { + pub fn new( + row_count: u64, + dtype: DType, + validity: Option, + fields: Vec, + ) -> Self { + Self { + row_count, + dtype, + validity, + fields, + } + } + + /// Recursively resolve an expression through this struct, extracting fields where possible. + fn resolve_expr(&self, expr: &Expression) -> VortexResult { + // Root references this struct. + if expr.is::() { + return Ok(Arc::new(Self { + row_count: self.row_count, + dtype: self.dtype.clone(), + validity: self.validity.clone(), + fields: self.fields.clone(), + })); + } + + // Literals become constant readers. + if let Some(scalar) = expr.as_opt::() { + return Ok(Arc::new(ConstantReader::new( + scalar.clone(), + self.row_count, + ))); + } + + // Recursively resolve all children through this struct. + let resolved_children: Vec = expr + .children() + .iter() + .map(|child| self.resolve_expr(child)) + .try_collect()?; + + // If this is GetItem and the resolved child is a StructReader, extract the field directly. 
+ if let Some(field_name) = expr.as_opt::() { + debug_assert_eq!(resolved_children.len(), 1); + let child_reader = &resolved_children[0]; + if let Some(struct_reader) = child_reader.as_any().downcast_ref::() { + let struct_fields = struct_reader + .dtype + .as_struct_fields_opt() + .ok_or_else(|| vortex_error::vortex_err!("Expected struct dtype"))?; + let field_idx = struct_fields.find(field_name).ok_or_else(|| { + vortex_error::vortex_err!("Field '{}' not found in struct", field_name) + })?; + return Ok(struct_reader.fields[field_idx].clone()); + } + // Otherwise, the child is some other reader (e.g., ChunkedReader). Delegate apply. + return child_reader.apply(expr); + } + + // For any other scalar function, wrap the resolved children. + Ok(Arc::new(ScalarFnReader::try_new( + expr.scalar_fn().clone(), + resolved_children, + self.row_count, + )?)) + } +} + +impl Reader for StructReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn apply(&self, expression: &Expression) -> VortexResult { + self.resolve_expr(expression) + } + + fn execute(&self, row_range: Range) -> VortexResult { + let validity_stream = self + .validity + .as_ref() + .map(|v| v.execute(row_range.clone())) + .transpose()?; + let field_streams = self + .fields + .iter() + .map(|field| field.execute(row_range.clone())) + .collect::>>()?; + + let num_fields = field_streams.len(); + Ok(Box::new(StructReaderStream { + dtype: self.dtype.clone(), + validity: validity_stream, + fields: field_streams, + validity_buffer: None, + field_buffers: vec![None; num_fields], + })) + } + + fn display_tree(&self) -> Tree { + let label = format!("Struct({}, rows={})", self.dtype, self.row_count); + let mut tree = Tree::new(label); + + // Add field children with names from the struct dtype. 
+ if let Some(struct_fields) = self.dtype.as_struct_fields_opt() { + for (name, field_reader) in struct_fields.names().iter().zip(self.fields.iter()) { + let child = field_reader.display_tree(); + tree.push(Tree::new(format!("{}: {}", name, child.root)).with_leaves(child.leaves)); + } + } else { + for (i, field_reader) in self.fields.iter().enumerate() { + let child = field_reader.display_tree(); + tree.push(Tree::new(format!("[{}]: {}", i, child.root)).with_leaves(child.leaves)); + } + } + + // Add validity child if present. + if let Some(validity) = &self.validity { + let child = validity.display_tree(); + tree.push(Tree::new(format!("validity: {}", child.root)).with_leaves(child.leaves)); + } + + tree + } +} + +struct StructReaderStream { + dtype: DType, + validity: Option, + fields: Vec, + validity_buffer: Option, + field_buffers: Vec>, +} + +impl StructReaderStream { + /// Get the next ArrayFuture for a child stream, taking from the buffer first. + fn next_for_child( + stream: &mut ReaderStreamRef, + buffer: &mut Option, + ) -> VortexResult> { + if let Some(buffered) = buffer.take() { + return Ok(Some(buffered)); + } + stream.next_chunk() + } +} + +/// Skip `n` rows from an optional stream, consuming from the buffer first. +fn skip_child(stream: Option<&mut ReaderStreamRef>, buffer: &mut Option, n: usize) { + let mut remaining = n; + if let Some(buf) = buffer.take() { + if remaining < buf.len() { + *buffer = Some(buf.slice(remaining..buf.len())); + return; + } + remaining -= buf.len(); + } + if remaining > 0 + && let Some(stream) = stream + { + stream.skip(remaining); + } +} + +impl ReaderStream for StructReaderStream { + fn dtype(&self) -> &DType { + &self.dtype + } + + fn skip(&mut self, n: usize) { + // Skip n rows from each child (validity + fields), consuming from buffers first. 
+ skip_child(self.validity.as_mut(), &mut self.validity_buffer, n); + for (field_stream, field_buf) in self.fields.iter_mut().zip(self.field_buffers.iter_mut()) { + skip_child(Some(field_stream), field_buf, n); + } + } + + fn next_chunk(&mut self) -> VortexResult> { + // Collect an ArrayFuture for each child (validity + fields). + let mut all_futures: Vec = Vec::with_capacity(1 + self.fields.len()); + + // Validity + let has_validity = self.validity.is_some(); + if let Some(ref mut validity_stream) = self.validity { + let Some(validity_future) = + Self::next_for_child(validity_stream, &mut self.validity_buffer)? + else { + return Ok(None); + }; + all_futures.push(validity_future); + } + + // Fields + for (field_stream, field_buf) in self.fields.iter_mut().zip(self.field_buffers.iter_mut()) { + let Some(field_future) = Self::next_for_child(field_stream, field_buf)? else { + return Ok(None); + }; + all_futures.push(field_future); + } + + if all_futures.is_empty() { + return Ok(None); + } + + // Find the minimum length. + let min_len = all_futures.iter().map(|f| f.len()).min().unwrap_or(0); + if min_len == 0 { + return Ok(None); + } + + // For children with len > min_len, buffer the remainder and slice. + let mut chunk_futures: Vec = Vec::with_capacity(all_futures.len()); + + for (buf_idx, future) in all_futures.into_iter().enumerate() { + if future.len() > min_len { + // Buffer the remainder. + let remainder = future.slice(min_len..future.len()); + let chunk = future.slice(0..min_len); + + if buf_idx == 0 && has_validity { + self.validity_buffer = Some(remainder); + } else { + let field_idx = if has_validity { buf_idx - 1 } else { buf_idx }; + self.field_buffers[field_idx] = Some(remainder); + } + + chunk_futures.push(chunk); + } else { + chunk_futures.push(future); + } + } + + let struct_fields = self + .dtype + .as_struct_fields_opt() + .ok_or_else(|| vortex_error::vortex_err!("Expected struct dtype"))? 
+ .clone(); + let nullability = self.dtype.nullability(); + let estimated_bytes: usize = chunk_futures.iter().map(|f| f.estimated_bytes()).sum(); + + Ok(Some(ArrayFuture::new( + min_len, + estimated_bytes, + async move { + // Split off validity future from field futures. + let arrays = try_join_all(chunk_futures).await?; + + let (validity, fields) = if has_validity { + let validity_array = arrays[0].clone(); + let fields = arrays[1..].to_vec(); + (Validity::Array(validity_array), fields) + } else { + (nullability.into(), arrays) + }; + + Ok( + StructArray::try_new_with_dtype(fields, struct_fields, min_len, validity)? + .into_array(), + ) + }, + ))) + } +} diff --git a/vortex-layout/src/v2/readers/zoned.rs b/vortex-layout/src/v2/readers/zoned.rs new file mode 100644 index 00000000000..56a4b04c84e --- /dev/null +++ b/vortex-layout/src/v2/readers/zoned.rs @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; + +use termtree::Tree; +use vortex_array::expr::Expression; +use vortex_array::expr::stats::Stat; +use vortex_dtype::DType; +use vortex_error::VortexResult; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::reader::ReaderStreamRef; + +/// A reader that wraps a data reader with per-zone statistics for pruning. +/// +/// The zone map reader holds aggregate statistics (min, max, null_count) for each zone +/// of `zone_len` rows. During execute, zone statistics can be used to skip entire zones +/// that cannot match a filter expression. 
+pub struct ZonedReader { + data: ReaderRef, + zone_map: ReaderRef, + zone_len: usize, + present_stats: Arc<[Stat]>, +} + +impl ZonedReader { + pub fn new( + data: ReaderRef, + zone_map: ReaderRef, + zone_len: usize, + present_stats: Arc<[Stat]>, + ) -> Self { + Self { + data, + zone_map, + zone_len, + present_stats, + } + } +} + +impl Reader for ZonedReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + self.data.dtype() + } + + fn row_count(&self) -> u64 { + self.data.row_count() + } + + fn apply(&self, expression: &Expression) -> VortexResult { + // Apply the expression to the data reader. + let new_data = self.data.apply(expression)?; + + // TODO(ngates): transform the expression into zone-map-compatible pruning checks + // (e.g., `x > 5` becomes `max >= 5`). For now, we just pass through the data + // and keep the zone map as-is. + Ok(Arc::new(ZonedReader { + data: new_data, + zone_map: self.zone_map.clone(), + zone_len: self.zone_len, + present_stats: self.present_stats.clone(), + })) + } + + fn execute(&self, row_range: Range) -> VortexResult { + // TODO(ngates): use zone map to produce a pruning mask, then drive the data stream + // with zones that can't match the filter skipped. 
+ self.data.execute(row_range) + } + + fn display_tree(&self) -> Tree { + let label = format!( + "Zoned({}, rows={}, zone_len={})", + self.data.dtype(), + self.data.row_count(), + self.zone_len + ); + let mut tree = Tree::new(label); + + let data_child = self.data.display_tree(); + tree.push(Tree::new(format!("data: {}", data_child.root)).with_leaves(data_child.leaves)); + + let zone_map_child = self.zone_map.display_tree(); + tree.push( + Tree::new(format!("zone_map: {}", zone_map_child.root)) + .with_leaves(zone_map_child.leaves), + ); + + tree + } +} diff --git a/vortex-layout/src/vtable.rs b/vortex-layout/src/vtable.rs index d51ee61c3c4..54cd6772c7a 100644 --- a/vortex-layout/src/vtable.rs +++ b/vortex-layout/src/vtable.rs @@ -24,6 +24,8 @@ use crate::LayoutRef; use crate::children::LayoutChildren; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2::reader::ReaderRef; pub trait VTable: 'static + Sized + Send + Sync + Debug { type Layout: 'static + Send + Sync + Clone + Debug + Deref + IntoLayout; @@ -65,6 +67,15 @@ pub trait VTable: 'static + Sized + Send + Sync + Debug { session: &VortexSession, ) -> VortexResult; + fn new_reader2( + layout: &Self::Layout, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + let _ = (layout, segment_source, session); + vortex_bail!("new_reader2 not implemented for layout: {:?}", layout) + } + /// Construct a new [`Layout`] from the provided parts. 
fn build( encoding: &Self::Encoding, diff --git a/vortex-scalar/Cargo.toml b/vortex-scalar/Cargo.toml index d69c03f8679..d838c6bfdc4 100644 --- a/vortex-scalar/Cargo.toml +++ b/vortex-scalar/Cargo.toml @@ -27,7 +27,6 @@ prost = { workspace = true } vortex-buffer = { workspace = true } vortex-dtype = { workspace = true } vortex-error = { workspace = true } -vortex-mask = { workspace = true } vortex-proto = { workspace = true, features = ["scalar"] } vortex-session = { workspace = true } vortex-utils = { workspace = true } diff --git a/vortex-scan/src/api.rs b/vortex-scan/src/api.rs index bbcfccba526..bc6115b5933 100644 --- a/vortex-scan/src/api.rs +++ b/vortex-scan/src/api.rs @@ -20,6 +20,7 @@ //! example which encodings it knows about. use std::any::Any; +use std::ops::Range; use std::sync::Arc; use async_trait::async_trait; @@ -30,6 +31,8 @@ use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_session::VortexSession; +use crate::Selection; + /// Create a Vortex source from serialized configuration. /// /// Providers can be registered with Vortex under a specific @@ -81,6 +84,10 @@ pub struct ScanRequest { pub filter: Option, /// Optional limit on the number of rows to scan. pub limit: Option, + /// Optional row range to scan within the data source. + pub row_range: Option>, + /// Row selection to apply within the row range. + pub row_selection: Selection, } /// A boxed data source scan. @@ -123,7 +130,7 @@ pub trait Split: 'static + Send { } /// An estimate that can be exact, an upper bound, or unknown. -#[derive(Default)] +#[derive(Debug, Clone, Default)] pub enum Estimate { /// The exact value. 
Exact(T), diff --git a/vortex-scan/src/layout.rs b/vortex-scan/src/layout.rs index 183a023bd2d..50ce37598ae 100644 --- a/vortex-scan/src/layout.rs +++ b/vortex-scan/src/layout.rs @@ -65,6 +65,12 @@ impl DataSource for LayoutReaderDataSource { builder = builder.with_limit(limit); } + if let Some(row_range) = scan_request.row_range { + builder = builder.with_row_range(row_range); + } + + builder = builder.with_selection(scan_request.row_selection); + let scan = builder.prepare()?; let dtype = scan.dtype().clone(); let splits = scan.execute(None)?; diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index d5d56e68e25..8ebe4f59ef3 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -26,5 +26,6 @@ pub mod layout; mod repeated_scan; #[cfg(test)] mod test; +pub mod v2; pub use repeated_scan::RepeatedScan; diff --git a/vortex-scan/src/selection.rs b/vortex-scan/src/selection.rs index d2fb46d4bb0..7aee9b71c9d 100644 --- a/vortex-scan/src/selection.rs +++ b/vortex-scan/src/selection.rs @@ -12,7 +12,7 @@ use crate::row_mask::RowMask; /// A selection identifies a set of rows to include in the scan (in addition to applying any /// filter predicates). -#[derive(Default, Clone)] +#[derive(Debug, Default, Clone)] pub enum Selection { /// No selection, all rows are included. 
#[default] diff --git a/vortex-scan/src/v2/mod.rs b/vortex-scan/src/v2/mod.rs new file mode 100644 index 00000000000..4f05171404d --- /dev/null +++ b/vortex-scan/src/v2/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub mod scan; diff --git a/vortex-scan/src/v2/scan.rs b/vortex-scan/src/v2/scan.rs new file mode 100644 index 00000000000..13b1959d4f2 --- /dev/null +++ b/vortex-scan/src/v2/scan.rs @@ -0,0 +1,401 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::collections::VecDeque; +use std::ops::BitAnd; +use std::ops::Range; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use futures::Stream; +use futures::StreamExt; +use futures::future::BoxFuture; +use futures::stream::FuturesOrdered; +use vortex_array::ArrayFuture; +use vortex_array::ArrayRef; +use vortex_array::expr::Expression; +use vortex_array::expr::root; +use vortex_array::stream::ArrayStream; +use vortex_buffer::Buffer; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_layout::v2::reader::ReaderRef; +use vortex_layout::v2::reader::ReaderStreamRef; +use vortex_mask::Mask; +use vortex_session::VortexSession; + +use crate::Selection; + +pub struct ScanBuilder2 { + reader: ReaderRef, + projection: Expression, + filter: Option, + limit: Option, + row_range: Range, + row_selection: Selection, // NOTE: applies to the selected row range. 
+ session: VortexSession, + byte_budget: usize, +} + +impl ScanBuilder2 { + pub fn new(reader: ReaderRef, session: VortexSession) -> Self { + let row_range = 0..reader.row_count(); + Self { + reader, + projection: root(), + filter: None, + limit: None, + row_range, + row_selection: Selection::All, + session, + byte_budget: 0, + } + } + + pub fn with_filter(mut self, filter: Expression) -> Self { + self.filter = Some(filter); + self + } + + pub fn with_some_filter(mut self, filter: Option) -> Self { + self.filter = filter; + self + } + + pub fn with_projection(mut self, projection: Expression) -> Self { + self.projection = projection; + self + } + + pub fn with_limit(mut self, limit: u64) -> Self { + self.limit = Some(limit); + self + } + + pub fn with_row_range(mut self, row_range: Range) -> Self { + self.row_range = row_range; + self + } + + /// Sets the row selection to use the given selection (relative to the row range). + pub fn with_row_selection(mut self, row_selection: Selection) -> Self { + self.row_selection = row_selection; + self + } + + /// Sets the row selection to include only the given row indices (relative to the row range). + pub fn with_row_indices(mut self, row_indices: Buffer) -> Self { + self.row_selection = Selection::IncludeByIndex(row_indices); + self + } + + /// Sets the byte budget for pipelining. When set to a non-zero value, the scan will + /// enqueue multiple chunks of work up to the given estimated byte budget, allowing I/O + /// and compute to overlap. + pub fn with_byte_budget(mut self, byte_budget: usize) -> Self { + self.byte_budget = byte_budget; + self + } + + pub fn into_array_stream(self) -> VortexResult { + let projection = self.projection.optimize_recursive(self.reader.dtype())?; + let filter = self + .filter + .map(|f| f.optimize_recursive(self.reader.dtype())) + .transpose()?; + + let dtype = projection.return_dtype(self.reader.dtype())?; + + // Apply expressions to the reader tree. 
+ let projection_reader = self.reader.apply(&projection)?; + let filter_reader = filter.as_ref().map(|f| self.reader.apply(f)).transpose()?; + + tracing::debug!( + "Executing scan with:\nProjection:\n{}\nFilter:\n{}", + projection_reader.display_tree(), + filter_reader + .as_ref() + .map_or("None".to_string(), |f| f.display_tree().to_string()) + ); + + // Execute both readers over the row range to produce streams. + // TODO(ngates): we could partition this? + let row_offset = self.row_range.start; + let filter_stream = filter_reader + .map(|r| r.execute(self.row_range.clone())) + .transpose()?; + let projection_stream = projection_reader.execute(self.row_range)?; + + Ok(Scan { + dtype, + filter_stream, + projection_stream, + pipeline: FuturesOrdered::new(), + pipeline_bytes: VecDeque::new(), + bytes_in_flight: 0, + byte_budget: self.byte_budget, + limit: self.limit, + rows_produced: 0, + rows_enqueued: 0, + row_selection: self.row_selection, + row_offset, + filter_buffer: None, + projection_buffer: None, + exhausted: false, + }) + } +} + +struct Scan { + dtype: DType, + filter_stream: Option, + projection_stream: ReaderStreamRef, + pipeline: FuturesOrdered>>, + pipeline_bytes: VecDeque, + bytes_in_flight: usize, + byte_budget: usize, + limit: Option, + rows_produced: u64, + rows_enqueued: u64, + row_selection: Selection, + row_offset: u64, + filter_buffer: Option, + projection_buffer: Option, + exhausted: bool, +} + +impl Scan { + /// Get the next projection chunk, consuming from the buffer first. + fn next_projection_chunk(&mut self) -> VortexResult> { + if let Some(buffered) = self.projection_buffer.take() { + return Ok(Some(buffered)); + } + self.projection_stream.next_chunk() + } + + /// Get the next filter chunk, consuming from the buffer first. 
+ fn next_filter_chunk( + stream: &mut ReaderStreamRef, + buffer: &mut Option, + ) -> VortexResult> { + if let Some(buffered) = buffer.take() { + return Ok(Some(buffered)); + } + stream.next_chunk() + } + + /// Collect filter chunks covering exactly `n` rows. + /// Returns a vec of ArrayFutures whose total len == n. + fn collect_filter_chunks(&mut self, n: usize) -> Option>> { + let filter_stream = self.filter_stream.as_mut()?; + let mut chunks = Vec::new(); + let mut remaining = n; + + while remaining > 0 { + let chunk = match Self::next_filter_chunk(filter_stream, &mut self.filter_buffer) { + Ok(Some(f)) => f, + Ok(None) => { + return Some(Err(vortex_error::vortex_err!( + "Filter stream exhausted before covering {} rows", + n + ))); + } + Err(e) => return Some(Err(e)), + }; + + if chunk.len() <= remaining { + remaining -= chunk.len(); + chunks.push(chunk); + } else { + // Buffer the remainder. + let used = chunk.slice(0..remaining); + self.filter_buffer = Some(chunk.slice(remaining..chunk.len())); + remaining = 0; + chunks.push(used); + } + } + + Some(Ok(chunks)) + } + + /// Skip `n` rows in the filter stream, consuming from the buffer first. + fn skip_filter(&mut self, n: usize) { + let mut remaining = n; + if let Some(buf) = self.filter_buffer.take() { + if remaining < buf.len() { + self.filter_buffer = Some(buf.slice(remaining..buf.len())); + return; + } + remaining -= buf.len(); + } + if remaining > 0 + && let Some(filter_stream) = &mut self.filter_stream + { + filter_stream.skip(remaining); + } + } +} + +impl ArrayStream for Scan { + fn dtype(&self) -> &DType { + &self.dtype + } +} + +impl Stream for Scan { + type Item = VortexResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + // === Fill phase === + // Enqueue chunks into the pipeline until we hit the byte budget or exhaust input. 
+ while !this.exhausted + && (this.bytes_in_flight == 0 || this.bytes_in_flight < this.byte_budget) + { + // Check limit on rows enqueued. + if this.limit.is_some_and(|limit| this.rows_enqueued >= limit) { + this.exhausted = true; + break; + } + + // Get the next projection chunk. + let proj_future = match this.next_projection_chunk() { + Ok(Some(f)) => f, + Ok(None) => { + this.exhausted = true; + break; + } + Err(e) => return Poll::Ready(Some(Err(e))), + }; + + let mut chunk_len = proj_future.len(); + + // Apply limit: if remaining < chunk_len, slice the projection future. + if let Some(limit) = this.limit { + let remaining = usize::try_from(limit - this.rows_enqueued).unwrap_or(usize::MAX); + if remaining < chunk_len { + // Buffer the remainder and use only what we need. + this.projection_buffer = Some(proj_future.slice(remaining..chunk_len)); + chunk_len = remaining; + } + } + + let proj_future = if chunk_len < proj_future.len() { + proj_future.slice(0..chunk_len) + } else { + proj_future + }; + + if chunk_len == 0 { + this.exhausted = true; + break; + } + + // Compute the selection mask for this chunk's row range. + let chunk_row_range = this.row_offset..this.row_offset + chunk_len as u64; + let selection_mask = this.row_selection.row_mask(&chunk_row_range).mask().clone(); + + // If all rows are excluded by selection, skip this chunk entirely (no I/O). + if selection_mask.all_false() { + this.skip_filter(chunk_len); + this.row_offset += chunk_len as u64; + continue; + } + + this.row_offset += chunk_len as u64; + + // Estimate bytes for this chunk. + let proj_estimated = proj_future.estimated_bytes(); + let mut chunk_estimated = proj_estimated; + + // Build the future: await projection, apply filter + selection. 
+ let fut: BoxFuture<'static, VortexResult> = if this.filter_stream.is_some() { + let filter_chunks = match this.collect_filter_chunks(chunk_len) { + Some(Ok(chunks)) => chunks, + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => { + return Poll::Ready(Some(Err(vortex_error::vortex_err!( + "Filter stream missing" + )))); + } + }; + + chunk_estimated += filter_chunks + .iter() + .map(|f| f.estimated_bytes()) + .sum::(); + + Box::pin(async move { + // Await filter chunks and combine into a single mask. + let mut filter_masks: Vec = Vec::with_capacity(filter_chunks.len()); + for filter_chunk in filter_chunks { + let filter_array = filter_chunk.await?; + filter_masks.push(filter_array.try_to_mask_fill_null_false()?); + } + + let filter_mask = if filter_masks.len() == 1 { + filter_masks + .into_iter() + .next() + .unwrap_or_else(|| Mask::new_true(0)) + } else { + Mask::concat(filter_masks.iter())? + }; + + // AND with selection mask. + let mask = if selection_mask.all_true() { + filter_mask + } else { + (&filter_mask).bitand(&selection_mask) + }; + + // Await projection. + let array = proj_future.await?; + + // Filter the projection array. 
+ if mask.all_true() { + Ok(array) + } else { + array.filter(mask) + } + }) + } else if selection_mask.all_true() { + Box::pin(proj_future) + } else { + Box::pin(async move { + let array = proj_future.await?; + array.filter(selection_mask) + }) + }; + + this.pipeline.push_back(fut); + this.pipeline_bytes.push_back(chunk_estimated); + this.bytes_in_flight += chunk_estimated; + this.rows_enqueued += chunk_len as u64; + } + + // === Drain phase === + if this.pipeline.is_empty() { + return Poll::Ready(None); + } + + match this.pipeline.poll_next_unpin(cx) { + Poll::Ready(Some(result)) => { + if let Some(estimated) = this.pipeline_bytes.pop_front() { + this.bytes_in_flight = this.bytes_in_flight.saturating_sub(estimated); + } + match result { + Ok(array) => { + this.rows_produced += array.len() as u64; + Poll::Ready(Some(Ok(array))) + } + Err(e) => Poll::Ready(Some(Err(e))), + } + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +}