diff --git a/.github/workflows/ci-fork.yml b/.github/workflows/ci-fork.yml new file mode 100644 index 00000000000..da6f132f725 --- /dev/null +++ b/.github/workflows/ci-fork.yml @@ -0,0 +1,73 @@ +name: CI (fork) + +on: + pull_request: { } + +permissions: + contents: read + +env: + CARGO_TERM_COLOR: auto + +jobs: + rust-fmt: + name: Rust (fmt) + runs-on: ubuntu-latest + timeout-minutes: 60 + steps: + - uses: actions/checkout@v6 + - uses: ./.github/actions/setup-rust + with: + toolchain: nightly + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Rust format + run: cargo +nightly fmt --all --check + + rust-clippy: + name: Rust (clippy) + runs-on: ubuntu-latest + timeout-minutes: 120 + steps: + - uses: actions/checkout@v6 + - uses: ./.github/actions/setup-rust + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Rust clippy (workspace) + run: > + cargo clippy --locked --workspace --all-features --all-targets + --exclude vortex-bench + --exclude vortex-python + --exclude vortex-duckdb + --exclude vortex-fuzz + --exclude duckdb-bench + --exclude lance-bench + --exclude datafusion-bench + --exclude random-access-bench + --exclude compress-bench + -- -D warnings + + rust-tests: + name: Rust (tests) + runs-on: ubuntu-latest + timeout-minutes: 240 + steps: + - uses: actions/checkout@v6 + - uses: ./.github/actions/setup-rust + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - uses: taiki-e/install-action@v2 + with: + tool: nextest + - name: Cargo nextest + run: > + cargo nextest run --locked --workspace --all-features --no-fail-fast + --exclude vortex-cuda + --exclude vortex-bench + --exclude vortex-python + --exclude vortex-duckdb + --exclude vortex-fuzz + --exclude duckdb-bench + --exclude lance-bench + --exclude datafusion-bench + --exclude random-access-bench + --exclude compress-bench diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cf6b918a445..198da826f02 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,9 +9,6 @@ concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: ${{ github.ref != 'refs/heads/develop' }} on: - push: - branches: [develop] - pull_request: { } workflow_dispatch: { } permissions: diff --git a/vortex-array/src/expr/analysis/immediate_access.rs b/vortex-array/src/expr/analysis/immediate_access.rs index 6dc26a7c950..0e1e3ac73aa 100644 --- a/vortex-array/src/expr/analysis/immediate_access.rs +++ b/vortex-array/src/expr/analysis/immediate_access.rs @@ -24,11 +24,13 @@ pub fn annotate_scope_access(scope: &StructFields) -> impl AnnotationFn() { - if expr.child(0).is::() { - return vec![field_name.clone()]; - } - } else if expr.is::() { + if let Some(field_name) = expr.as_opt::() + && expr.child(0).is::() + { + return vec![field_name.clone()]; + } + + if expr.is::() { return scope.names().iter().cloned().collect(); } diff --git a/vortex-array/src/expr/exprs/get_item.rs b/vortex-array/src/expr/exprs/get_item.rs index ecd8c211939..23d64b1e62b 100644 --- a/vortex-array/src/expr/exprs/get_item.rs +++ b/vortex-array/src/expr/exprs/get_item.rs @@ -31,9 +31,11 @@ use crate::expr::Pack; use crate::expr::ReduceCtx; use crate::expr::ReduceNode; use crate::expr::ReduceNodeRef; +use crate::expr::SimplifyCtx; use crate::expr::StatsCatalog; use crate::expr::VTable; use crate::expr::VTableExt; +use crate::expr::exprs::get_item_list::get_item_list; use crate::expr::exprs::root::root; use crate::expr::lit; use crate::expr::stats::Stat; @@ -95,15 +97,7 @@ impl VTable for GetItem { vortex_err!("Couldn't find the {} field 
in the input scope", field_name) })?; - // Match here to avoid cloning the dtype if nullability doesn't need to change - if matches!( - (struct_dtype.nullability(), field_dtype.nullability()), - (Nullability::Nullable, Nullability::NonNullable) - ) { - return Ok(field_dtype.with_nullability(Nullability::Nullable)); - } - - Ok(field_dtype) + Ok(field_dtype.union_nullability(struct_dtype.nullability())) } fn execute( @@ -125,6 +119,30 @@ impl VTable for GetItem { .execute(args.ctx) } + fn simplify( + &self, + field_name: &FieldName, + expr: &Expression, + ctx: &dyn SimplifyCtx, + ) -> VortexResult> { + let child = expr.child(0); + let child_dtype = ctx.return_dtype(child)?; + + let element_dtype = match child_dtype { + DType::List(element_dtype, _) => Some(element_dtype), + DType::FixedSizeList(element_dtype, ..) => Some(element_dtype), + _ => None, + }; + + if let Some(element_dtype) = element_dtype + && element_dtype.as_struct_fields_opt().is_some() + { + Ok(Some(get_item_list(field_name.clone(), child.clone()))) + } else { + Ok(None) + } + } + fn reduce( &self, field_name: &FieldName, @@ -245,6 +263,8 @@ pub fn get_item(field: impl Into, child: Expression) -> Expression { #[cfg(test)] mod tests { + use std::sync::Arc; + use vortex_buffer::buffer; use vortex_dtype::DType; use vortex_dtype::FieldNames; @@ -252,12 +272,16 @@ mod tests { use vortex_dtype::Nullability::NonNullable; use vortex_dtype::PType; use vortex_dtype::StructFields; + use vortex_scalar::Scalar; use crate::Array; use crate::IntoArray; + use crate::arrays::FixedSizeListArray; + use crate::arrays::ListArray; use crate::arrays::StructArray; use crate::expr::exprs::binary::checked_add; use crate::expr::exprs::get_item::get_item; + use crate::expr::exprs::get_item_list::get_item_list; use crate::expr::exprs::literal::lit; use crate::expr::exprs::pack::pack; use crate::expr::exprs::root::root; @@ -307,6 +331,182 @@ mod tests { ); } + #[test] + fn get_item_list_of_struct() { + let element_dtype = Arc::new(DType::Struct( + [ + ("a", DType::Primitive(PType::I32, NonNullable)), + ("b", DType::Utf8(NonNullable)), + ] + .into_iter() + .collect(), + NonNullable, + )); + + let row_count = 4; + let items = ListArray::from_iter_opt_slow::( + [ + Some(vec![ + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(1i32, NonNullable), + Scalar::utf8("x", NonNullable), + ], + ), + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(2i32, NonNullable), + Scalar::utf8("y", NonNullable), + ], + ), + ]), + Some(Vec::new()), + None, + Some(vec![Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(3i32, NonNullable), + Scalar::utf8("z", NonNullable), + ], + )]), + ], + element_dtype, + ) + .unwrap(); + + let ids = buffer![0i32, 1, 2, 3].into_array(); + + let data = StructArray::new( + FieldNames::from(["id", "items"]), + vec![ids, items], + row_count, + Validity::NonNullable, + ) + .into_array(); + + // Regression for nested field projection on list-of-struct: `items.a`. 
+ let projection = get_item_list("a", get_item("items", root())); + let out = data.apply(&projection).expect("apply"); + + assert_eq!( + out.dtype(), + &DType::List( + Arc::new(DType::Primitive(PType::I32, NonNullable)), + Nullability::Nullable + ) + ); + + assert_eq!( + out.scalar_at(0) + .unwrap() + .as_list() + .elements() + .unwrap() + .to_vec(), + vec![ + Scalar::primitive(1i32, NonNullable), + Scalar::primitive(2i32, NonNullable), + ] + ); + assert!( + out.scalar_at(1) + .unwrap() + .as_list() + .elements() + .unwrap() + .is_empty() + ); + assert!(out.scalar_at(2).unwrap().is_null()); + assert_eq!( + out.scalar_at(3) + .unwrap() + .as_list() + .elements() + .unwrap() + .to_vec(), + vec![Scalar::primitive(3i32, NonNullable)] + ); + } + + #[test] + fn get_item_fixed_size_list_of_struct() { + let n_lists: usize = 3; + let list_size: u32 = 2; + let n_elements = n_lists * list_size as usize; + + let struct_elems = StructArray::try_new( + FieldNames::from(["a", "b"]), + vec![ + buffer![1i32, 2, 3, 4, 5, 6].into_array(), + buffer![10i64, 20, 30, 40, 50, 60].into_array(), + ], + n_elements, + Validity::from_iter([true, false, true, true, false, true]), + ) + .unwrap() + .into_array(); + + let items = FixedSizeListArray::try_new( + struct_elems, + list_size, + Validity::from_iter([true, false, true]), + n_lists, + ) + .unwrap() + .into_array(); + + let ids = buffer![0i32, 1, 2].into_array(); + + let data = StructArray::new( + FieldNames::from(["id", "items"]), + vec![ids, items], + n_lists, + Validity::NonNullable, + ) + .into_array(); + + // FixedSizeList-of-struct projection: `items.a`, including struct-level nulls inside the list. + let projection = get_item_list("a", get_item("items", root())); + let out = data.apply(&projection).expect("apply"); + + assert_eq!( + out.dtype(), + &DType::FixedSizeList( + Arc::new(DType::Primitive(PType::I32, Nullability::Nullable)), + list_size, + Nullability::Nullable + ) + ); + + assert_eq!( + out.scalar_at(0) + .unwrap() + .as_list() + .elements() + .unwrap() + .to_vec(), + vec![ + Scalar::primitive(1i32, Nullability::Nullable), + Scalar::null(DType::Primitive(PType::I32, Nullability::Nullable)), + ] + ); + assert!(out.scalar_at(1).unwrap().is_null()); + assert_eq!( + out.scalar_at(2) + .unwrap() + .as_list() + .elements() + .unwrap() + .to_vec(), + vec![ + Scalar::null(DType::Primitive(PType::I32, Nullability::Nullable)), + Scalar::primitive(6i32, Nullability::Nullable), + ] + ); + } + #[test] fn test_pack_get_item_rule() { // Create: pack(a: lit(1), b: lit(2)).get_item("b") diff --git a/vortex-array/src/expr/exprs/get_item_list.rs b/vortex-array/src/expr/exprs/get_item_list.rs new file mode 100644 index 00000000000..7f30965c711 --- /dev/null +++ b/vortex-array/src/expr/exprs/get_item_list.rs @@ -0,0 +1,201 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Formatter; +use std::ops::Not; +use std::sync::Arc; + +use vortex_dtype::DType; +use vortex_dtype::FieldName; +use vortex_dtype::Nullability; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_session::VortexSession; + +use crate::ArrayRef; +use crate::IntoArray; +use crate::arrays::FixedSizeListArray; +use crate::arrays::ListViewArray; +use crate::arrays::StructArray; +use crate::compute::mask; +use crate::expr::Arity; +use crate::expr::ChildName; +use crate::expr::ExecutionArgs; +use crate::expr::ExecutionResult; +use crate::expr::ExprId; 
+use crate::expr::Expression; +use crate::expr::VTable; +use crate::expr::VTableExt; +use crate::vtable::ValidityHelper; + +fn project_field(struct_elems: &StructArray, field_name: &FieldName) -> VortexResult { + let field = struct_elems.unmasked_field_by_name(field_name)?.clone(); + + match struct_elems.dtype().nullability() { + Nullability::NonNullable => Ok(field), + Nullability::Nullable => mask(&field, &struct_elems.validity_mask()?.not()), + } +} + +/// UNSTABLE: project a struct field from each element of a list. +/// +/// Semantics: +/// `get_item_list(field, list) == map(lambda x: get_item(field, x), list)`. +/// +/// This is a temporary internal expression used to support nested projections like `items.a` on +/// `list` and `fixed_size_list` without a general `map` expression. +/// +/// Do not serialize or persist this expression. It is not a stable part of the expression wire +/// format and may be removed or replaced by a proper `map`. +pub struct GetItemList; + +impl VTable for GetItemList { + type Options = FieldName; + + fn id(&self) -> ExprId { + ExprId::from("vortex.get_item_list") + } + + fn serialize(&self, _field_name: &FieldName) -> VortexResult>> { + vortex_bail!("UNSTABLE expression {} must not be serialized", self.id()) + } + + fn deserialize( + &self, + metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + _ = metadata; + vortex_bail!("UNSTABLE expression {} must not be deserialized", self.id()) + } + + fn arity(&self, _field_name: &FieldName) -> Arity { + Arity::Exact(1) + } + + fn child_name(&self, _field_name: &FieldName, child_idx: usize) -> ChildName { + match child_idx { + 0 => ChildName::from("list"), + _ => unreachable!( + "Invalid child index {} for GetItemList expression", + child_idx + ), + } + } + + fn fmt_sql( + &self, + field_name: &FieldName, + expr: &Expression, + f: &mut Formatter<'_>, + ) -> std::fmt::Result { + expr.child(0).fmt_sql(f)?; + write!(f, ".{}", field_name) + } + + fn return_dtype(&self, field_name: &FieldName, arg_dtypes: &[DType]) -> VortexResult { + let list_dtype = &arg_dtypes[0]; + + let (element_dtype, list_nullability, list_size) = match list_dtype { + DType::List(element_dtype, list_nullability) => { + (element_dtype.as_ref(), *list_nullability, None) + } + DType::FixedSizeList(element_dtype, list_size, list_nullability) => { + (element_dtype.as_ref(), *list_nullability, Some(*list_size)) + } + _ => { + return Err(vortex_err!( + "Expected list dtype for child of GetItemList expression, got {}", + list_dtype + )); + } + }; + + let struct_fields = element_dtype.as_struct_fields_opt().ok_or_else(|| { + vortex_err!( + "Expected list element struct dtype for GetItemList, got {}", + element_dtype + ) + })?; + + let field_dtype = struct_fields.field(field_name).ok_or_else(|| { + vortex_err!( + "Couldn't find the {} field in the list element struct dtype", + field_name + ) + })?; + + let projected = field_dtype.union_nullability(element_dtype.nullability()); + + Ok(match list_size { + Some(list_size) => { + DType::FixedSizeList(Arc::new(projected), list_size, list_nullability) + } + None => DType::List(Arc::new(projected), list_nullability), + }) + } + + fn execute( + &self, + field_name: &FieldName, + mut args: ExecutionArgs, + ) -> VortexResult { + let input = args + .inputs + .pop() + .vortex_expect("missing list for GetItemList expression"); + + match input.dtype() { + DType::List(..) 
=> { + let list = input.execute::(args.ctx)?; + let struct_elems = list.elements().clone().execute::(args.ctx)?; + + let field = project_field(&struct_elems, field_name)?; + + ListViewArray::try_new( + field, + list.offsets().clone(), + list.sizes().clone(), + list.validity().clone(), + )? + .into_array() + .execute(args.ctx) + } + DType::FixedSizeList(..) => { + let list = input.execute::(args.ctx)?; + let struct_elems = list.elements().clone().execute::(args.ctx)?; + + let field = project_field(&struct_elems, field_name)?; + + FixedSizeListArray::try_new( + field, + list.list_size(), + list.validity().clone(), + list.len(), + )? + .into_array() + .execute(args.ctx) + } + _ => Err(vortex_err!( + "Expected list scope for GetItemList execution, got {}", + input.dtype() + )), + } + } + + fn is_null_sensitive(&self, _field_name: &FieldName) -> bool { + true + } + + fn is_fallible(&self, _field_name: &FieldName) -> bool { + false + } +} + +/// Creates an expression that projects a struct field from each element of a list. +#[doc(hidden)] +pub fn get_item_list(field: impl Into, list: Expression) -> Expression { + GetItemList.new_expr(field.into(), vec![list]) +} diff --git a/vortex-array/src/expr/exprs/mod.rs b/vortex-array/src/expr/exprs/mod.rs index 145d225bcae..3bb0b001c8e 100644 --- a/vortex-array/src/expr/exprs/mod.rs +++ b/vortex-array/src/expr/exprs/mod.rs @@ -6,6 +6,7 @@ pub(crate) mod binary; pub(crate) mod cast; pub(crate) mod dynamic; pub(crate) mod get_item; +pub(crate) mod get_item_list; pub(crate) mod is_null; pub(crate) mod like; pub(crate) mod list_contains; @@ -22,6 +23,7 @@ pub use binary::*; pub use cast::*; pub use dynamic::*; pub use get_item::*; +pub use get_item_list::*; pub use is_null::*; pub use like::*; pub use list_contains::*; diff --git a/vortex-array/src/expr/session.rs b/vortex-array/src/expr/session.rs index 13106c354c4..19ac2c7a24e 100644 --- a/vortex-array/src/expr/session.rs +++ b/vortex-array/src/expr/session.rs @@ -10,6 +10,7 @@ use crate::expr::exprs::between::Between; use crate::expr::exprs::binary::Binary; use crate::expr::exprs::cast::Cast; use crate::expr::exprs::get_item::GetItem; +use crate::expr::exprs::get_item_list::GetItemList; use crate::expr::exprs::is_null::IsNull; use crate::expr::exprs::like::Like; use crate::expr::exprs::list_contains::ListContains; @@ -57,6 +58,7 @@ impl Default for ExprSession { ExprVTable::new_static(&Binary), ExprVTable::new_static(&Cast), ExprVTable::new_static(&GetItem), + ExprVTable::new_static(&GetItemList), ExprVTable::new_static(&IsNull), ExprVTable::new_static(&Like), ExprVTable::new_static(&ListContains), diff --git a/vortex-array/src/expr/vtable.rs b/vortex-array/src/expr/vtable.rs index 168442781db..e225b28c999 100644 --- a/vortex-array/src/expr/vtable.rs +++ b/vortex-array/src/expr/vtable.rs @@ -748,6 +748,7 @@ mod tests { use crate::expr::exprs::cast::cast; use crate::expr::exprs::get_item::col; use crate::expr::exprs::get_item::get_item; + use crate::expr::exprs::get_item_list::get_item_list; use crate::expr::exprs::is_null::is_null; use crate::expr::exprs::list_contains::list_contains; use crate::expr::exprs::literal::lit; @@ -818,4 +819,16 @@ mod tests { Ok(()) } + + #[test] + fn get_item_list_is_not_serializable() { + let expr = get_item_list("field", root()); + let err = expr + .serialize_proto() + .expect_err("get_item_list must not be serializable"); + assert!( + err.to_string().contains("must not be serialized"), + "unexpected error: {err}" + ); + } } diff --git 
a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 6f555a982fb..1394937763c 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -224,8 +224,8 @@ impl FileOpener for VortexOpener { // The schema of the stream returned from the vortex scan. // We use the physical_file_schema as reference for types that don't roundtrip. - let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|_e| { - exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan") + let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|e| { + exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan: {e}") })?; let stream_schema = calculate_physical_schema(&scan_dtype, &projected_physical_schema)?; diff --git a/vortex-layout/src/layouts/list/mod.rs b/vortex-layout/src/layouts/list/mod.rs new file mode 100644 index 00000000000..f6a3a61fd9f --- /dev/null +++ b/vortex-layout/src/layouts/list/mod.rs @@ -0,0 +1,223 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod reader; +pub mod writer; + +use std::sync::Arc; + +use reader::ListReader; +use vortex_array::ArrayContext; +use vortex_array::DeserializeMetadata; +use vortex_array::EmptyMetadata; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_session::SessionExt; +use vortex_session::VortexSession; + +use crate::LayoutChildType; +use crate::LayoutEncodingRef; +use crate::LayoutId; +use crate::LayoutReaderRef; +use crate::LayoutRef; +use crate::VTable; +use crate::children::LayoutChildren; +use crate::children::OwnedLayoutChildren; +use crate::segments::SegmentId; +use crate::segments::SegmentSource; +use crate::vtable; + +vtable!(List); + +impl VTable for ListVTable { + type Layout = ListLayout; + type Encoding = ListLayoutEncoding; + type Metadata = EmptyMetadata; + + fn id(_encoding: &Self::Encoding) -> LayoutId { + LayoutId::new_ref("vortex.list") + } + + fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { + LayoutEncodingRef::new_ref(ListLayoutEncoding.as_ref()) + } + + fn row_count(layout: &Self::Layout) -> u64 { + layout.row_count + } + + fn dtype(layout: &Self::Layout) -> &DType { + &layout.dtype + } + + fn metadata(_layout: &Self::Layout) -> Self::Metadata { + EmptyMetadata + } + + fn segment_ids(_layout: &Self::Layout) -> Vec { + vec![] + } + + fn nchildren(layout: &Self::Layout) -> usize { + let validity_children = layout.dtype.is_nullable() as usize; + match &layout.dtype { + DType::List(..) => 2 + validity_children, // offsets + elements + DType::FixedSizeList(..) 
=> 1 + validity_children, // elements + _ => 0, + } + } + + fn child(layout: &Self::Layout, index: usize) -> VortexResult { + let is_nullable = layout.dtype.is_nullable(); + let offsets_dtype = DType::Primitive(PType::U64, Nullability::NonNullable); + + let child_dtype = match (&layout.dtype, is_nullable, index) { + // validity + (_, true, 0) => DType::Bool(Nullability::NonNullable), + + // variable-size list + (DType::List(..), false, 0) => offsets_dtype, + (DType::List(element_dtype, _), false, 1) => (*element_dtype.as_ref()).clone(), + (DType::List(..), true, 1) => offsets_dtype, + (DType::List(element_dtype, _), true, 2) => (*element_dtype.as_ref()).clone(), + + // fixed-size list + (DType::FixedSizeList(element_dtype, ..), false, 0) => { + (*element_dtype.as_ref()).clone() + } + (DType::FixedSizeList(element_dtype, ..), true, 1) => (*element_dtype.as_ref()).clone(), + + _ => return Err(vortex_err!("Invalid child index {index} for list layout")), + }; + + layout.children.child(index, &child_dtype) + } + + fn child_type(layout: &Self::Layout, idx: usize) -> LayoutChildType { + let is_nullable = layout.dtype.is_nullable(); + + if is_nullable && idx == 0 { + return LayoutChildType::Auxiliary("validity".into()); + } + + match &layout.dtype { + DType::List(..) => { + let offsets_idx = if is_nullable { 1 } else { 0 }; + if idx == offsets_idx { + LayoutChildType::Auxiliary("offsets".into()) + } else { + LayoutChildType::Auxiliary("elements".into()) + } + } + DType::FixedSizeList(..) => LayoutChildType::Auxiliary("elements".into()), + _ => unreachable!( + "ListLayout only supports List and FixedSizeList dtypes, got {}", + layout.dtype() + ), + } + } + + fn new_reader( + layout: &Self::Layout, + name: Arc, + segment_source: Arc, + session: &VortexSession, + ) -> VortexResult { + Ok(Arc::new(ListReader::try_new( + layout.clone(), + name, + segment_source, + session.session(), + )?)) + } + + fn build( + _encoding: &Self::Encoding, + dtype: &DType, + row_count: u64, + _metadata: &::Output, + _segment_ids: Vec, + children: &dyn LayoutChildren, + _ctx: &ArrayContext, + ) -> VortexResult { + vortex_ensure!( + matches!(dtype, DType::List(..) | DType::FixedSizeList(..)), + "Expected list dtype, got {}", + dtype + ); + + let expected_children = match dtype { + DType::List(..) => 2 + (dtype.is_nullable() as usize), + DType::FixedSizeList(..) => 1 + (dtype.is_nullable() as usize), + _ => unreachable!(), + }; + vortex_ensure!( + children.nchildren() == expected_children, + "List layout has {} children, expected {}", + children.nchildren(), + expected_children + ); + + Ok(ListLayout { + row_count, + dtype: dtype.clone(), + children: children.to_arc(), + }) + } + + fn with_children(layout: &mut Self::Layout, children: Vec) -> VortexResult<()> { + let expected_children = match layout.dtype { + DType::List(..) => 2 + (layout.dtype.is_nullable() as usize), + DType::FixedSizeList(..) 
=> 1 + (layout.dtype.is_nullable() as usize), + _ => vortex_bail!("Expected list dtype, got {}", layout.dtype), + }; + vortex_ensure!( + children.len() == expected_children, + "ListLayout expects {} children, got {}", + expected_children, + children.len() + ); + layout.children = OwnedLayoutChildren::layout_children(children); + Ok(()) + } +} + +#[derive(Debug)] +pub struct ListLayoutEncoding; + +#[derive(Clone, Debug)] +pub struct ListLayout { + row_count: u64, + dtype: DType, + children: Arc, +} + +impl ListLayout { + pub fn new(row_count: u64, dtype: DType, children: Vec) -> Self { + Self { + row_count, + dtype, + children: OwnedLayoutChildren::layout_children(children), + } + } + + #[inline] + pub fn row_count(&self) -> u64 { + self.row_count + } + + #[inline] + pub fn dtype(&self) -> &DType { + &self.dtype + } + + #[inline] + pub fn children(&self) -> &Arc { + &self.children + } +} diff --git a/vortex-layout/src/layouts/list/reader.rs b/vortex-layout/src/layouts/list/reader.rs new file mode 100644 index 00000000000..0254090bfb1 --- /dev/null +++ b/vortex-layout/src/layouts/list/reader.rs @@ -0,0 +1,602 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::collections::BTreeSet; +use std::ops::BitAnd; +use std::ops::Range; +use std::sync::Arc; + +use futures::try_join; +use vortex_array::Array; +use vortex_array::IntoArray; +use vortex_array::MaskFuture; +use vortex_array::ToCanonical; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::FixedSizeListArray; +use vortex_array::arrays::ListArray; +use vortex_array::expr::Expression; +use vortex_array::expr::get_item; +use vortex_array::expr::root; +use vortex_dtype::DType; +use vortex_dtype::FieldMask; +use vortex_dtype::FieldName; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_mask::Mask; +use vortex_session::VortexSession; + +use crate::ArrayFuture; +use crate::LayoutReader; +use crate::LayoutReaderRef; +use crate::LazyReaderChildren; +use crate::layouts::list::ListLayout; +use crate::segments::SegmentSource; + +pub struct ListReader { + layout: ListLayout, + name: Arc, + lazy_children: LazyReaderChildren, + session: VortexSession, +} + +impl ListReader { + pub(super) fn try_new( + layout: ListLayout, + name: Arc, + segment_source: Arc, + session: VortexSession, + ) -> VortexResult { + let mut dtypes: Vec = Vec::new(); + let mut names: Vec> = Vec::new(); + + if layout.dtype().is_nullable() { + dtypes.push(DType::Bool(Nullability::NonNullable)); + names.push(Arc::from("validity")); + } + + match layout.dtype() { + DType::List(element_dtype, _) => { + dtypes.push(DType::Primitive(PType::U64, Nullability::NonNullable)); + names.push(Arc::from("offsets")); + dtypes.push((**element_dtype).clone()); + names.push(Arc::from("elements")); + } + DType::FixedSizeList(element_dtype, ..) 
=> { + dtypes.push((**element_dtype).clone()); + names.push(Arc::from("elements")); + } + _ => vortex_bail!("Expected list dtype, got {}", layout.dtype()), + } + + let lazy_children = LazyReaderChildren::new( + layout.children().clone(), + dtypes, + names, + segment_source, + session.clone(), + ); + + Ok(Self { + layout, + name, + lazy_children, + session, + }) + } + + fn validity(&self) -> VortexResult> { + self.dtype() + .is_nullable() + .then(|| self.lazy_children.get(0)) + .transpose() + } + + fn offsets(&self) -> VortexResult<&LayoutReaderRef> { + let idx = if self.dtype().is_nullable() { 1 } else { 0 }; + self.lazy_children.get(idx) + } + + fn elements(&self) -> VortexResult<&LayoutReaderRef> { + let idx = match self.dtype() { + DType::List(..) => { + if self.dtype().is_nullable() { + 2 + } else { + 1 + } + } + DType::FixedSizeList(..) => { + if self.dtype().is_nullable() { + 1 + } else { + 0 + } + } + _ => return Err(vortex_err!("Expected list dtype, got {}", self.dtype())), + }; + self.lazy_children.get(idx) + } + + /// Creates a future that will produce a slice of this list array. + /// + /// The produced slice may have a projection applied to its elements. + fn list_slice_future( + &self, + row_range: Range, + element_expr: &Expression, + ) -> VortexResult { + let dtype = self.dtype().clone(); + let validity_fut = self + .validity()? + .map(|reader| { + let len = usize::try_from(row_range.end - row_range.start) + .vortex_expect("row range must fit in usize"); + reader.projection_evaluation(&row_range, &root(), MaskFuture::new_true(len)) + }) + .transpose()?; + + match dtype { + DType::List(_, list_nullability) => { + let offsets_reader = self.offsets()?.clone(); + let elements_reader = self.elements()?.clone(); + let row_range_clone = row_range.clone(); + let element_expr = element_expr.clone(); + + Ok(Box::pin(async move { + let row_len = usize::try_from(row_range_clone.end - row_range_clone.start) + .vortex_expect("row range must fit in usize"); + + let offsets_row_range = row_range_clone.start..row_range_clone.end + 1; + let offsets_len = row_len + 1; + let offsets_fut = offsets_reader.projection_evaluation( + &offsets_row_range, + &root(), + MaskFuture::new_true(offsets_len), + )?; + + let (offsets, validity) = try_join!(offsets_fut, async move { + match validity_fut { + Some(v) => v.await.map(Some), + None => Ok(None), + } + })?; + + let offsets = offsets.to_primitive(); + let offsets_slice = offsets.as_slice::(); + let base = *offsets_slice.first().unwrap_or(&0u64); + let end = *offsets_slice.last().unwrap_or(&base); + + let elements_row_range = base..end; + let elements_len = usize::try_from(end - base) + .vortex_expect("element range must fit in usize"); + let elements = elements_reader.projection_evaluation( + &elements_row_range, + &element_expr, + MaskFuture::new_true(elements_len), + )?; + + let elements = elements.await?; + + let normalized_offsets = vortex_array::arrays::PrimitiveArray::from_iter( + offsets_slice.iter().map(|v| *v - base), + ) + .into_array(); + + let validity = match (list_nullability, validity) { + (Nullability::NonNullable, _) => { + vortex_array::validity::Validity::NonNullable + } + (Nullability::Nullable, Some(v)) => { + vortex_array::validity::Validity::Array(v) + } + (Nullability::Nullable, None) => vortex_array::validity::Validity::AllValid, + }; + + Ok(ListArray::try_new(elements, normalized_offsets, validity)?.into_array()) + })) + } + DType::FixedSizeList(_, list_size, list_nullability) => { + let elements_reader = 
self.elements()?.clone(); + let row_range_clone = row_range.clone(); + let element_expr = element_expr.clone(); + + Ok(Box::pin(async move { + let row_len_u64 = row_range_clone.end - row_range_clone.start; + let row_len = + usize::try_from(row_len_u64).vortex_expect("row range must fit in usize"); + + let list_size_u64 = u64::from(list_size); + let element_start = row_range_clone + .start + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element start overflow"))?; + let element_end = row_range_clone + .end + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element end overflow"))?; + + let elements_row_range = element_start..element_end; + let elements_len = usize::try_from(element_end - element_start) + .vortex_expect("element range must fit in usize"); + let elements_fut = elements_reader.projection_evaluation( + &elements_row_range, + &element_expr, + MaskFuture::new_true(elements_len), + )?; + + let (elements, validity) = try_join!(elements_fut, async move { + match validity_fut { + Some(v) => v.await.map(Some), + None => Ok(None), + } + })?; + + let validity = match (list_nullability, validity) { + (Nullability::NonNullable, _) => { + vortex_array::validity::Validity::NonNullable + } + (Nullability::Nullable, Some(v)) => { + vortex_array::validity::Validity::Array(v) + } + (Nullability::Nullable, None) => vortex_array::validity::Validity::AllValid, + }; + + Ok( + FixedSizeListArray::try_new(elements, list_size, validity, row_len)? + .into_array(), + ) + })) + } + _ => Err(vortex_err!("Expected list dtype, got {}", dtype)), + } + } +} + +impl LayoutReader for ListReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + self.layout.dtype() + } + + fn row_count(&self) -> u64 { + self.layout.row_count() + } + + fn register_splits( + &self, + field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + splits.insert(row_range.end); + + match self.dtype() { + DType::FixedSizeList(_, list_size, _) => { + let list_size_u64 = u64::from(*list_size); + + let element_start = row_range + .start + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element start overflow"))?; + let element_end = row_range + .end + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element end overflow"))?; + + let element_range = element_start..element_end; + let mut element_splits = BTreeSet::new(); + self.elements()?.register_splits( + field_mask, + &element_range, + &mut element_splits, + )?; + + // Convert element splits back to row splits, but only when the element split + // is aligned to a row boundary. + for element_split in element_splits { + if element_split % list_size_u64 != 0 { + continue; + } + + let row_split = element_split / list_size_u64; + if row_split > row_range.start && row_split < row_range.end { + splits.insert(row_split); + } + } + } + DType::List(..) => { + let offsets_end = row_range + .end + .checked_add(1) + .ok_or_else(|| vortex_err!("List offsets end overflow"))?; + let offsets_range = row_range.start..offsets_end; + + let mut offsets_splits = BTreeSet::new(); + self.offsets()? + .register_splits(field_mask, &offsets_range, &mut offsets_splits)?; + + // Convert splits in the offsets array back to row splits. + // + // The offsets array has length = rows + 1, so a split at offset index `i` + // corresponds to a split in rows at `i - 1`. 
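+                // For example, with 4 rows the offsets child holds 5 entries; a chunk boundary at
+                // offsets index 3 maps back to a row split at 3 - 1 = 2 (see the tests below).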
+ for offsets_split in offsets_splits { + let Some(row_split) = offsets_split.checked_sub(1) else { + continue; + }; + if row_split > row_range.start && row_split < row_range.end { + splits.insert(row_split); + } + } + } + _ => {} + } + + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let row_range = row_range.clone(); + let expr = expr.clone(); + let session = self.session.clone(); + + let list_fut = self.list_slice_future(row_range.clone(), &root())?; + + Ok(MaskFuture::new( + usize::try_from(row_range.end - row_range.start) + .vortex_expect("row range must fit in usize"), + async move { + let (array, mask) = try_join!(list_fut, mask)?; + if mask.all_false() { + return Ok(mask); + } + + let array = array.apply(&expr)?; + let mut ctx = session.create_execution_ctx(); + let array_mask = array.execute::(&mut ctx)?; + + Ok(mask.bitand(&array_mask)) + }, + )) + } + + fn projection_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + // If the expression is a simple element projection, we can push it down to the elements. + // + // NOTE: `vortex.get_item_list` is a temporary list-of-struct projection expression; + // when pushing down we construct the element projection and pass it into the elements reader. + let (is_pushdown, element_expr) = if expr.id().as_ref() == "vortex.get_item_list" + && expr.child(0).id().as_ref() == "vortex.root" + { + let field_name = expr + .options() + .as_any() + .downcast_ref::() + .vortex_expect("vortex.get_item_list options must be a FieldName"); + (true, get_item(field_name.clone(), root())) + } else if expr.id().as_ref() == "vortex.select" { + (true, expr.clone()) + } else { + (false, root()) + }; + + let row_range = row_range.clone(); + let expr = expr.clone(); + let list_fut = self.list_slice_future(row_range, &element_expr)?; + + Ok(Box::pin(async move { + let (mut array, mask) = try_join!(list_fut, mask)?; + + // Apply the selection mask before applying the expression, matching `FlatReader`. 
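+            // i.e. rows excluded by the mask are dropped first, so the expression below only runs
+            // over the selected rows.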
+ if !mask.all_true() { + array = array.filter(mask)?; + } + + if is_pushdown { + Ok(array) + } else { + array.apply(&expr) + } + })) + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + use std::sync::Arc; + + use futures::stream; + use vortex_array::Array; + use vortex_array::ArrayContext; + use vortex_array::IntoArray; + use vortex_array::arrays::FixedSizeListArray; + use vortex_array::arrays::ListArray; + use vortex_buffer::buffer; + use vortex_dtype::Nullability::NonNullable; + use vortex_dtype::PType; + use vortex_io::runtime::single::block_on; + + use crate::LayoutStrategy; + use crate::layouts::chunked::writer::ChunkedLayoutStrategy; + use crate::layouts::flat::writer::FlatLayoutStrategy; + use crate::layouts::list::writer::ListStrategy; + use crate::segments::TestSegments; + use crate::sequence::SequenceId; + use crate::sequence::SequentialStreamAdapter; + use crate::sequence::SequentialStreamExt as _; + use crate::test::SESSION; + + #[test] + fn register_splits_fixed_size_list_maps_element_splits_to_rows() { + let ctx = ArrayContext::empty(); + + let segments = Arc::new(TestSegments::default()); + + let list_size: u32 = 2; + + let chunk1_elements = buffer![1i32, 2, 3, 4].into_array(); + let chunk1 = FixedSizeListArray::try_new( + chunk1_elements, + list_size, + vortex_array::validity::Validity::NonNullable, + 2, + ) + .unwrap() + .into_array(); + + let chunk2_elements = buffer![5i32, 6, 7, 8].into_array(); + let chunk2 = FixedSizeListArray::try_new( + chunk2_elements, + list_size, + vortex_array::validity::Validity::NonNullable, + 2, + ) + .unwrap() + .into_array(); + + let list_dtype = chunk1.dtype().clone(); + + let elements_strategy = Arc::new(ChunkedLayoutStrategy::new(FlatLayoutStrategy::default())); + let strategy = ListStrategy::new( + Arc::new(FlatLayoutStrategy::default()), + Arc::new(FlatLayoutStrategy::default()), + elements_strategy, + ); + + let (mut sequence_id, eof) = SequenceId::root().split(); + let layout = block_on(|handle| { + strategy.write_stream( + ctx, + segments.clone(), + SequentialStreamAdapter::new( + vortex_dtype::DType::FixedSizeList( + Arc::new(vortex_dtype::DType::Primitive(PType::I32, NonNullable)), + list_size, + NonNullable, + ), + stream::iter([ + Ok((sequence_id.advance(), chunk1)), + Ok((sequence_id.advance(), chunk2)), + ]), + ) + .sendable(), + eof, + handle, + ) + }) + .unwrap(); + + // Sanity check we produced the expected fixed-size list shape. + assert_eq!(layout.row_count(), 4); + assert_eq!(layout.dtype(), &list_dtype); + + // The elements child is chunked with a split at element index 4, which should map to row 2. 
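+        // (Each chunk holds 2 rows of list_size = 2, so the elements boundary falls at element
+        // index 4, and 4 / list_size = row 2.)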
+ let reader = layout.new_reader("".into(), segments, &SESSION).unwrap(); + let mut splits = BTreeSet::new(); + reader + .register_splits(&[], &(0..layout.row_count()), &mut splits) + .unwrap(); + + assert!(splits.contains(&2), "splits = {splits:?}"); + assert!(splits.contains(&layout.row_count())); + } + + #[test] + fn register_splits_list_maps_offset_splits_to_rows() { + let ctx = ArrayContext::empty(); + + let segments = Arc::new(TestSegments::default()); + + let chunk1_elements = buffer![1i32, 2, 3, 4].into_array(); + let chunk1_offsets = buffer![0u64, 2, 4].into_array(); + let chunk1 = ListArray::try_new( + chunk1_elements, + chunk1_offsets, + vortex_array::validity::Validity::NonNullable, + ) + .unwrap() + .into_array(); + + let chunk2_elements = buffer![5i32, 6, 7, 8].into_array(); + let chunk2_offsets = buffer![0u64, 2, 4].into_array(); + let chunk2 = ListArray::try_new( + chunk2_elements, + chunk2_offsets, + vortex_array::validity::Validity::NonNullable, + ) + .unwrap() + .into_array(); + + let list_dtype = chunk1.dtype().clone(); + + let offsets_strategy = Arc::new(ChunkedLayoutStrategy::new(FlatLayoutStrategy::default())); + let elements_strategy = Arc::new(ChunkedLayoutStrategy::new(FlatLayoutStrategy::default())); + let strategy = ListStrategy::new( + Arc::new(FlatLayoutStrategy::default()), + offsets_strategy, + elements_strategy, + ); + + let (mut sequence_id, eof) = SequenceId::root().split(); + let layout = block_on(|handle| { + strategy.write_stream( + ctx, + segments.clone(), + SequentialStreamAdapter::new( + vortex_dtype::DType::List( + Arc::new(vortex_dtype::DType::Primitive(PType::I32, NonNullable)), + NonNullable, + ), + stream::iter([ + Ok((sequence_id.advance(), chunk1)), + Ok((sequence_id.advance(), chunk2)), + ]), + ) + .sendable(), + eof, + handle, + ) + }) + .unwrap(); + + // Sanity check we produced the expected list shape. + assert_eq!(layout.row_count(), 4); + assert_eq!(layout.dtype(), &list_dtype); + + // The offsets child is chunked with a split at offsets index 3, which maps to row 2. 
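+        // (The first chunk writes offsets [0, 2, 4]; the second is rebased to [6, 8] with its
+        // duplicate leading zero dropped, so the offsets child splits at index 3, i.e. row 3 - 1 = 2.)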
+ let reader = layout.new_reader("".into(), segments, &SESSION).unwrap(); + let mut splits = BTreeSet::new(); + reader + .register_splits(&[], &(0..layout.row_count()), &mut splits) + .unwrap(); + + assert!(splits.contains(&2), "splits = {splits:?}"); + assert!(splits.contains(&layout.row_count())); + } +} diff --git a/vortex-layout/src/layouts/list/writer.rs b/vortex-layout/src/layouts/list/writer.rs new file mode 100644 index 00000000000..280552d39db --- /dev/null +++ b/vortex-layout/src/layouts/list/writer.rs @@ -0,0 +1,359 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +use async_trait::async_trait; +use futures::StreamExt; +use futures::future::try_join_all; +use futures::pin_mut; +use itertools::Itertools; +use vortex_array::Array; +use vortex_array::ArrayContext; +use vortex_array::IntoArray; +use vortex_array::ToCanonical; +use vortex_array::arrays::list_from_list_view; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_error::VortexError; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_io::kanal_ext::KanalExt; +use vortex_io::runtime::Handle; + +use crate::IntoLayout as _; +use crate::LayoutRef; +use crate::LayoutStrategy; +use crate::layouts::list::ListLayout; +use crate::segments::SegmentSinkRef; +use crate::sequence::SendableSequentialStream; +use crate::sequence::SequencePointer; +use crate::sequence::SequentialStreamAdapter; +use crate::sequence::SequentialStreamExt; + +trait ToU64 { + fn to_u64(self) -> u64; +} + +impl ToU64 for u8 { + fn to_u64(self) -> u64 { + u64::from(self) + } +} + +impl ToU64 for u16 { + fn to_u64(self) -> u64 { + u64::from(self) + } +} + +impl ToU64 for u32 { + fn to_u64(self) -> u64 { + u64::from(self) + } +} + +impl ToU64 for u64 { + fn to_u64(self) -> u64 { + self + } +} + +/// A write strategy that performs component shredding for list types. +/// +/// - Variable-size lists are written as: +/// - optional validity (is_valid: bool) +/// - offsets (u64, length = rows + 1) +/// - elements (concatenated) +/// - Fixed-size lists are written as: +/// - optional validity (is_valid: bool) +/// - elements (concatenated) +#[derive(Clone)] +pub struct ListStrategy { + validity: Arc, + offsets: Arc, + elements: Arc, +} + +impl ListStrategy { + pub fn new( + validity: Arc, + offsets: Arc, + elements: Arc, + ) -> Self { + Self { + validity, + offsets, + elements, + } + } +} + +#[async_trait] +impl LayoutStrategy for ListStrategy { + async fn write_stream( + &self, + ctx: ArrayContext, + segment_sink: SegmentSinkRef, + stream: SendableSequentialStream, + mut eof: SequencePointer, + handle: Handle, + ) -> VortexResult { + let dtype = stream.dtype().clone(); + + let is_nullable = dtype.is_nullable(); + let offsets_dtype = DType::Primitive(PType::U64, Nullability::NonNullable); + + let (stream_count, column_dtypes): (usize, Vec) = match &dtype { + DType::List(element_dtype, _) => { + let mut dtypes = Vec::new(); + if is_nullable { + dtypes.push(DType::Bool(Nullability::NonNullable)); + } + dtypes.push(offsets_dtype.clone()); + dtypes.push((**element_dtype).clone()); + (dtypes.len(), dtypes) + } + DType::FixedSizeList(element_dtype, ..) 
=> { + let mut dtypes = Vec::new(); + if is_nullable { + dtypes.push(DType::Bool(Nullability::NonNullable)); + } + dtypes.push((**element_dtype).clone()); + (dtypes.len(), dtypes) + } + _ => { + vortex_bail!("ListStrategy expected list dtype, got {}", dtype); + } + }; + + let (column_streams_tx, column_streams_rx): (Vec<_>, Vec<_>) = + (0..stream_count).map(|_| kanal::bounded_async(1)).unzip(); + + let total_rows = Arc::new(AtomicU64::new(0)); + + // Spawn a task to fan out chunk components to their respective transposed streams. + { + let total_rows = total_rows.clone(); + let dtype = dtype.clone(); + handle + .spawn(async move { + let mut base_elements: u64 = 0; + let mut first_offsets = true; + + pin_mut!(stream); + while let Some(result) = stream.next().await { + match result { + Ok((sequence_id, chunk)) => { + total_rows.fetch_add(chunk.len() as u64, Ordering::SeqCst); + + let mut sequence_pointer = sequence_id.descend(); + + // validity (optional) + if is_nullable { + let validity = match chunk.validity_mask() { + Ok(validity) => validity.into_array(), + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx + .send(Err(VortexError::from(e.clone()))) + .await; + } + break; + } + }; + let _ = column_streams_tx[0] + .send(Ok((sequence_pointer.advance(), validity))) + .await; + } + + match &dtype { + DType::List(..) => { + let list_view = chunk.to_listview(); + let list = match list_from_list_view(list_view) { + Ok(list) => list, + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx + .send(Err(VortexError::from(e.clone()))) + .await; + } + break; + } + }; + + // Build global u64 offsets, dropping the leading 0 for all but the first chunk. + let offsets = list.offsets().to_primitive(); + let offsets_slice_u64: VortexResult> = + match offsets.ptype() { + ptype if ptype.is_unsigned_int() => vortex_dtype::match_each_unsigned_integer_ptype!(ptype, |T| { + Ok(offsets + .as_slice::() + .iter() + .map(|&v| v.to_u64()) + .collect()) + }), + ptype if ptype.is_signed_int() => { + vortex_dtype::match_each_signed_integer_ptype!( + ptype, + |T| { + offsets + .as_slice::() + .iter() + .map(|&v| { + u64::try_from(v).map_err(|_| { + vortex_err!( + "List offsets must be convertible to u64" + ) + }) + }) + .collect() + } + ) + } + other => Err(vortex_err!( + "List offsets must be an integer type, got {other}" + )), + }; + let offsets_slice_u64 = match offsets_slice_u64 { + Ok(v) => v, + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx + .send(Err(VortexError::from(e.clone()))) + .await; + } + break; + } + }; + + let mut adjusted: Vec = Vec::with_capacity( + offsets_slice_u64 + .len() + .saturating_sub((!first_offsets) as usize), + ); + for (i, v) in offsets_slice_u64.into_iter().enumerate() { + if !first_offsets && i == 0 { + continue; + } + adjusted.push(v + base_elements); + } + + let offsets_arr = + vortex_array::arrays::PrimitiveArray::from_iter( + adjusted, + ) + .into_array(); + + // offsets index depends on nullable validity child + let offsets_idx = if is_nullable { 1 } else { 0 }; + let elements_idx = offsets_idx + 1; + + let _ = column_streams_tx[offsets_idx] + .send(Ok((sequence_pointer.advance(), offsets_arr))) + .await; + let _ = column_streams_tx[elements_idx] + .send(Ok(( + sequence_pointer.advance(), + list.elements().clone(), + ))) + .await; + + base_elements += list.elements().len() as u64; + first_offsets = false; + } + DType::FixedSizeList(..) 
=> { + let list = chunk.to_fixed_size_list(); + + let elements_idx = if is_nullable { 1 } else { 0 }; + let _ = column_streams_tx[elements_idx] + .send(Ok(( + sequence_pointer.advance(), + list.elements().clone(), + ))) + .await; + } + _ => unreachable!(), + } + } + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx.send(Err(VortexError::from(e.clone()))).await; + } + break; + } + } + } + }) + .detach(); + } + + let layout_futures: Vec<_> = column_dtypes + .into_iter() + .zip_eq(column_streams_rx) + .enumerate() + .map(|(index, (dtype, recv))| { + let column_stream = + SequentialStreamAdapter::new(dtype.clone(), recv.into_stream().boxed()) + .sendable(); + let child_eof = eof.split_off(); + handle.spawn_nested(|h| { + let validity = self.validity.clone(); + let offsets = self.offsets.clone(); + let elements = self.elements.clone(); + let ctx = ctx.clone(); + let segment_sink = segment_sink.clone(); + async move { + if is_nullable && index == 0 { + validity + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } else if matches!(dtype, DType::Primitive(PType::U64, _)) { + offsets + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } else { + elements + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } + } + }) + }) + .collect(); + + let children = try_join_all(layout_futures).await?; + + let row_count = total_rows.load(Ordering::SeqCst); + + // Basic invariant: for variable-size lists, offsets must have row_count + 1 entries. + if matches!(dtype, DType::List(..)) { + let offsets_layout = if is_nullable { + &children[1] + } else { + &children[0] + }; + vortex_ensure!( + offsets_layout.row_count() == row_count + 1, + "ListLayout offsets row_count {} does not match list row_count + 1 ({})", + offsets_layout.row_count(), + row_count + 1 + ); + } + + Ok(ListLayout::new(row_count, dtype, children).into_layout()) + } + + fn buffered_bytes(&self) -> u64 { + self.elements.buffered_bytes() + } +} diff --git a/vortex-layout/src/layouts/mod.rs b/vortex-layout/src/layouts/mod.rs index 61193c54b6e..e33d33cf7e8 100644 --- a/vortex-layout/src/layouts/mod.rs +++ b/vortex-layout/src/layouts/mod.rs @@ -17,6 +17,7 @@ pub mod compressed; pub mod dict; pub mod file_stats; pub mod flat; +pub mod list; pub(crate) mod partitioned; pub mod repartition; pub mod row_idx; diff --git a/vortex-layout/src/layouts/table.rs b/vortex-layout/src/layouts/table.rs index 13ad7f8f51d..b6b4fe7c60b 100644 --- a/vortex-layout/src/layouts/table.rs +++ b/vortex-layout/src/layouts/table.rs @@ -34,6 +34,7 @@ use crate::IntoLayout; use crate::LayoutRef; use crate::LayoutStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; +use crate::layouts::list::writer::ListStrategy; use crate::layouts::struct_::StructLayout; use crate::segments::SegmentSinkRef; use crate::sequence::SendableSequentialStream; @@ -97,8 +98,7 @@ impl TableStrategy { /// /// ```no_run /// # use std::sync::Arc; - /// # use vortex_dtype::{field_path, Field, FieldPath}; - /// # use vortex_layout::layouts::compact::CompactCompressor; + /// # use vortex_dtype::field_path; /// # use vortex_layout::layouts::compressed::CompressingStrategy; /// # use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; /// # use vortex_layout::layouts::table::TableStrategy; @@ -106,19 +106,15 @@ impl TableStrategy { /// // A strategy for compressing data using the balanced BtrBlocks compressor. 
/// let compress_btrblocks = CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), true); /// - /// // A strategy that compresses data using ZSTD - /// let compress_compact = CompressingStrategy::new_compact(FlatLayoutStrategy::default(), CompactCompressor::default()); - /// /// // Our combined strategy uses no compression for validity buffers, BtrBlocks compression - /// // for most columns, and will use ZSTD compression for a nested binary column that we know - /// // is never filtered in. + /// // for most columns, and will leave a nested binary column uncompressed. /// let strategy = TableStrategy::new( /// Arc::new(FlatLayoutStrategy::default()), /// Arc::new(compress_btrblocks), /// ) /// .with_field_writer( /// field_path!(request.body.bytes), - /// Arc::new(compress_compact), + /// Arc::new(FlatLayoutStrategy::default()), /// ); /// ``` pub fn with_field_writer( @@ -336,6 +332,15 @@ impl LayoutStrategy for TableStrategy { if dtype.is_struct() { // Step into the field path for struct columns Arc::new(self.descend(&field)) + } else if matches!(dtype, DType::List(..) | DType::FixedSizeList(..)) { + // Component shredding for lists: descend into the element type. + let elements = + Arc::new(self.descend(&field).descend(&Field::ElementType)); + Arc::new(ListStrategy::new( + self.validity.clone(), + self.fallback.clone(), + elements, + )) } else { // Use fallback for leaf columns self.fallback.clone() diff --git a/vortex-layout/src/session.rs b/vortex-layout/src/session.rs index 131115950e4..f7cfcefcc88 100644 --- a/vortex-layout/src/session.rs +++ b/vortex-layout/src/session.rs @@ -9,6 +9,7 @@ use crate::LayoutEncodingRef; use crate::layouts::chunked::ChunkedLayoutEncoding; use crate::layouts::dict::DictLayoutEncoding; use crate::layouts::flat::FlatLayoutEncoding; +use crate::layouts::list::ListLayoutEncoding; use crate::layouts::struct_::StructLayoutEncoding; use crate::layouts::zoned::ZonedLayoutEncoding; @@ -46,6 +47,7 @@ impl Default for LayoutSession { // Register the built-in layout encodings. layouts.register(ChunkedLayoutEncoding.id(), ChunkedLayoutEncoding.as_ref()); layouts.register(FlatLayoutEncoding.id(), FlatLayoutEncoding.as_ref()); + layouts.register(ListLayoutEncoding.id(), ListLayoutEncoding.as_ref()); layouts.register(StructLayoutEncoding.id(), StructLayoutEncoding.as_ref()); layouts.register(ZonedLayoutEncoding.id(), ZonedLayoutEncoding.as_ref()); layouts.register(DictLayoutEncoding.id(), DictLayoutEncoding.as_ref()); diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index a8b562d134b..f654158ea36 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -185,9 +185,31 @@ impl ScanBuilder { self } + fn prepare_projection_and_reader( + session: &VortexSession, + row_offset: u64, + layout_reader: LayoutReaderRef, + projection: &Expression, + ) -> VortexResult<(LayoutReaderRef, Expression, DType)> { + let layout_reader = Arc::new(RowIdxLayoutReader::new( + row_offset, + layout_reader, + session.clone(), + )); + let projection = projection.optimize_recursive(layout_reader.dtype())?; + let dtype = projection.return_dtype(layout_reader.dtype())?; + Ok((layout_reader, projection, dtype)) + } + /// The [`DType`] returned by the scan, after applying the projection. 
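+    /// The projection is normalized against the layout's dtype first (the same preparation that
+    /// `prepare` performs), so nested projections such as `items.a` over a list-of-struct column
+    /// resolve to their list-aware form before the return dtype is computed.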
pub fn dtype(&self) -> VortexResult { - self.projection.return_dtype(self.layout_reader.dtype()) + let (_, _, dtype) = Self::prepare_projection_and_reader( + &self.session, + self.row_offset, + self.layout_reader.clone(), + &self.projection, + )?; + Ok(dtype) } /// The session used by the scan. @@ -220,30 +242,34 @@ impl ScanBuilder { } pub fn prepare(self) -> VortexResult> { - let dtype = self.dtype()?; - if self.filter.is_some() && self.limit.is_some() { vortex_bail!("Vortex doesn't support scans with both a filter and a limit") } // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform // conjunction splitting if a filter is provided. - let mut layout_reader = self.layout_reader; - - // Enrich the layout reader to support RowIdx expressions. - // Note that this is applied below the filter layout reader since it can perform - // better over individual conjunctions. - layout_reader = Arc::new(RowIdxLayoutReader::new( - self.row_offset, + let ScanBuilder { + session, layout_reader, - self.session.clone(), - )); - - // Normalize and simplify the expressions. - let projection = self.projection.optimize_recursive(layout_reader.dtype())?; - - let filter = self - .filter + projection, + filter, + ordered, + row_range, + selection, + split_by, + concurrency, + map_fn, + limit, + row_offset, + .. + } = self; + + // Normalize and simplify the expressions, and enrich the layout reader to support + // RowIdx expressions. + let (layout_reader, projection, dtype) = + Self::prepare_projection_and_reader(&session, row_offset, layout_reader, &projection)?; + + let filter = filter .map(|f| f.optimize_recursive(layout_reader.dtype())) .transpose()?; @@ -252,33 +278,27 @@ impl ScanBuilder { filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?; let field_mask: Vec<_> = [filter_mask, projection_mask].concat(); - let splits = - if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) { - Splits::Ranges(ranges) - } else { - let split_range = self - .row_range - .clone() - .unwrap_or_else(|| 0..layout_reader.row_count()); - Splits::Natural(self.split_by.splits( - layout_reader.as_ref(), - &split_range, - &field_mask, - )?) - }; + let splits = if let Some(ranges) = attempt_split_ranges(&selection, row_range.as_ref()) { + Splits::Ranges(ranges) + } else { + let split_range = row_range + .clone() + .unwrap_or_else(|| 0..layout_reader.row_count()); + Splits::Natural(split_by.splits(layout_reader.as_ref(), &split_range, &field_mask)?) 
+ }; Ok(RepeatedScan::new( - self.session.clone(), + session.clone(), layout_reader, projection, filter, - self.ordered, - self.row_range, - self.selection, + ordered, + row_range, + selection, splits, - self.concurrency, - self.map_fn, - self.limit, + concurrency, + map_fn, + limit, dtype, )) } @@ -451,10 +471,14 @@ mod test { use vortex_array::ToCanonical; use vortex_array::arrays::PrimitiveArray; use vortex_array::expr::Expression; + use vortex_array::expr::get_item; + use vortex_array::expr::root; use vortex_dtype::DType; use vortex_dtype::FieldMask; + use vortex_dtype::FieldNames; use vortex_dtype::Nullability; use vortex_dtype::PType; + use vortex_dtype::StructFields; use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_io::runtime::BlockingRuntime; @@ -465,6 +489,76 @@ mod test { use super::ScanBuilder; + #[derive(Debug)] + struct DTypeOnlyLayoutReader { + name: Arc, + dtype: DType, + row_count: u64, + } + + impl DTypeOnlyLayoutReader { + fn new(dtype: DType) -> Self { + Self { + name: Arc::from("dtype-only"), + dtype, + row_count: 1, + } + } + } + + impl LayoutReader for DTypeOnlyLayoutReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + splits.insert(row_range.end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: Mask, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn filter_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn projection_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + Ok(Box::pin(async move { + unreachable!("scan should not be polled in this test") + })) + } + } + #[derive(Debug)] struct CountingLayoutReader { name: Arc, @@ -538,6 +632,88 @@ mod test { } } + #[test] + fn dtype_simplifies_list_of_struct_nested_projection() -> VortexResult<()> { + let element_dtype = DType::Struct( + StructFields::new( + FieldNames::from(["a", "b"]), + vec![ + DType::Primitive(PType::I32, Nullability::NonNullable), + DType::Utf8(Nullability::NonNullable), + ], + ), + Nullability::NonNullable, + ); + let dtype = DType::Struct( + StructFields::new( + FieldNames::from(["items"]), + vec![DType::List( + Arc::new(element_dtype), + Nullability::NonNullable, + )], + ), + Nullability::NonNullable, + ); + + let reader = Arc::new(DTypeOnlyLayoutReader::new(dtype)); + let session = crate::test::SCAN_SESSION.clone(); + + // users express `items.a` as `get_item("a", get_item("items", root()))`. + // the outer get_item must be simplified (typed) into a list-aware projection before `dtype` + // validation, otherwise it fails with "Couldn't find the a field in the input scope". 
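+        // (The rewrite comes from `GetItem::simplify` earlier in this change, which replaces
+        // `get_item("a", <list expr>)` with `get_item_list("a", <list expr>)` when the child is a
+        // list or fixed-size list of structs.)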
+ let projection = get_item("a", get_item("items", root())); + let builder = ScanBuilder::new(session, reader).with_projection(projection); + + let actual = builder.dtype()?; + let expected = DType::List( + Arc::new(DType::Primitive(PType::I32, Nullability::NonNullable)), + Nullability::NonNullable, + ); + assert_eq!(actual, expected); + Ok(()) + } + + #[test] + fn dtype_simplifies_fixed_size_list_of_struct_nested_projection() -> VortexResult<()> { + let element_dtype = DType::Struct( + StructFields::new( + FieldNames::from(["a", "b"]), + vec![ + DType::Primitive(PType::I32, Nullability::NonNullable), + DType::Utf8(Nullability::NonNullable), + ], + ), + Nullability::NonNullable, + ); + let list_size: u32 = 2; + let dtype = DType::Struct( + StructFields::new( + FieldNames::from(["items"]), + vec![DType::FixedSizeList( + Arc::new(element_dtype), + list_size, + Nullability::NonNullable, + )], + ), + Nullability::NonNullable, + ); + + let reader = Arc::new(DTypeOnlyLayoutReader::new(dtype)); + let session = crate::test::SCAN_SESSION.clone(); + + let projection = get_item("a", get_item("items", root())); + let builder = ScanBuilder::new(session, reader).with_projection(projection); + + let actual = builder.dtype()?; + let expected = DType::FixedSizeList( + Arc::new(DType::Primitive(PType::I32, Nullability::NonNullable)), + list_size, + Nullability::NonNullable, + ); + assert_eq!(actual, expected); + Ok(()) + } + #[test] fn into_stream_is_lazy() { let calls = Arc::new(AtomicUsize::new(0));