From 09d6906b4b3a6510fd6ed3fcf0d0d1c2123f9ae3 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Thu, 5 Feb 2026 16:09:20 +0000 Subject: [PATCH 1/5] gpu compatible write strategy Signed-off-by: Onur Satici --- Cargo.lock | 1 + vortex-file/Cargo.toml | 1 + vortex-file/src/strategy.rs | 82 +++++++++++++++++++++- vortex-file/src/strategy_gpu_zstd.rs | 100 +++++++++++++++++++++++++++ 4 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 vortex-file/src/strategy_gpu_zstd.rs diff --git a/Cargo.lock b/Cargo.lock index 7176b893f72..4d638e2d113 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10691,6 +10691,7 @@ dependencies = [ "uuid", "vortex-alp", "vortex-array", + "vortex-btrblocks", "vortex-buffer", "vortex-bytebool", "vortex-cuda", diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index d2285a6b1c0..8271cfd7af0 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -34,6 +34,7 @@ uuid = { workspace = true } # Needed to pi vortex-alp = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } +vortex-btrblocks = { workspace = true } vortex-bytebool = { workspace = true } vortex-datetime-parts = { workspace = true } diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 6f90864979f..5ac6df8e26c 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -6,10 +6,16 @@ use std::sync::Arc; use std::sync::LazyLock; +#[cfg(feature = "zstd")] +#[path = "strategy_gpu_zstd.rs"] +mod strategy_gpu_zstd; + // Compressed encodings from encoding crates +// Canonical array encodings from vortex-array +#[cfg(feature = "zstd")] +use strategy_gpu_zstd::GpuCompatibleCompressor; use vortex_alp::ALPRDVTable; use vortex_alp::ALPVTable; -// Canonical array encodings from vortex-array use vortex_array::arrays::BoolVTable; use vortex_array::arrays::ChunkedVTable; use vortex_array::arrays::ConstantVTable; @@ -26,6 +32,14 @@ use vortex_array::arrays::StructVTable; use vortex_array::arrays::VarBinVTable; use vortex_array::arrays::VarBinViewVTable; use vortex_array::session::ArrayRegistry; +#[cfg(feature = "zstd")] +use vortex_btrblocks::BtrBlocksCompressorBuilder; +#[cfg(feature = "zstd")] +use vortex_btrblocks::FloatCode; +#[cfg(feature = "zstd")] +use vortex_btrblocks::IntCode; +#[cfg(feature = "zstd")] +use vortex_btrblocks::StringCode; use vortex_bytebool::ByteBoolVTable; use vortex_datetime_parts::DateTimePartsVTable; use vortex_decimal_byte_parts::DecimalBytePartsVTable; @@ -106,6 +120,47 @@ pub static ALLOWED_ENCODINGS: LazyLock = LazyLock::new(|| { registry }); +/// Static registry of array encodings compatible with CUDA kernel execution. +/// +/// This includes all canonical encodings needed to represent nested array trees, plus +/// compressed encodings that have CUDA kernel implementations in `vortex-cuda`. +#[cfg(feature = "zstd")] +pub static GPU_ALLOWED_ENCODINGS: LazyLock = LazyLock::new(|| { + let registry = ArrayRegistry::default(); + + // Canonical encodings from vortex-array + registry.register(NullVTable::ID, NullVTable); + registry.register(BoolVTable::ID, BoolVTable); + registry.register(PrimitiveVTable::ID, PrimitiveVTable); + registry.register(DecimalVTable::ID, DecimalVTable); + registry.register(VarBinVTable::ID, VarBinVTable); + registry.register(VarBinViewVTable::ID, VarBinViewVTable); + registry.register(ListVTable::ID, ListVTable); + registry.register(ListViewVTable::ID, ListViewVTable); + registry.register(FixedSizeListVTable::ID, FixedSizeListVTable); + registry.register(StructVTable::ID, StructVTable); + registry.register(ExtensionVTable::ID, ExtensionVTable); + registry.register(ChunkedVTable::ID, ChunkedVTable); + registry.register(ConstantVTable::ID, ConstantVTable); + registry.register(MaskedVTable::ID, MaskedVTable); + registry.register(DictVTable::ID, DictVTable); + + // Compressed encodings with CUDA kernel support + registry.register(ALPVTable::ID, ALPVTable); + registry.register(BitPackedVTable::ID, BitPackedVTable); + registry.register(DateTimePartsVTable::ID, DateTimePartsVTable); + registry.register(DecimalBytePartsVTable::ID, DecimalBytePartsVTable); + registry.register(FoRVTable::ID, FoRVTable); + registry.register(RunEndVTable::ID, RunEndVTable); + registry.register(SequenceVTable::ID, SequenceVTable); + registry.register(ZigZagVTable::ID, ZigZagVTable); + + #[cfg(feature = "zstd")] + registry.register(ZstdVTable::ID, ZstdVTable); + + registry +}); + /// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file. /// /// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk, @@ -164,6 +219,31 @@ impl WriteStrategyBuilder { self } + /// Configure a write strategy that emits only CUDA-compatible encodings. + /// + /// This keeps the default write layout pipeline, but: + /// - Restricts flat-layout normalization to [`GPU_ALLOWED_ENCODINGS`] + /// - Configures BtrBlocks to exclude schemes without CUDA kernel support + #[cfg(feature = "zstd")] + pub fn with_cuda_compatible_encodings(mut self) -> Self { + let btrblocks = BtrBlocksCompressorBuilder::default() + .exclude_int([IntCode::Sparse, IntCode::Rle]) + .exclude_float([FloatCode::AlpRd, FloatCode::Rle, FloatCode::Sparse]) + // Keep string schemes disabled in btrblocks; when `zstd` feature is enabled, we + // separately encode string/binary leaves as Zstd (without dictionaries). + .exclude_string([ + StringCode::Dict, + StringCode::Fsst, + StringCode::Constant, + StringCode::Sparse, + ]) + .build(); + + self.compressor = Some(Arc::new(GpuCompatibleCompressor::new(btrblocks))); + self.allow_encodings = Some((*GPU_ALLOWED_ENCODINGS).clone()); + self + } + /// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides /// applied. pub fn build(self) -> Arc { diff --git a/vortex-file/src/strategy_gpu_zstd.rs b/vortex-file/src/strategy_gpu_zstd.rs new file mode 100644 index 00000000000..20589a1d6f1 --- /dev/null +++ b/vortex-file/src/strategy_gpu_zstd.rs @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::Array; +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::FixedSizeListArray; +use vortex_array::arrays::ListViewArray; +use vortex_array::arrays::StructArray; +use vortex_btrblocks::BtrBlocksCompressor; +use vortex_error::VortexResult; +use vortex_layout::layouts::compressed::CompressorPlugin; +use vortex_zstd::ZstdArray; + +const GPU_ZSTD_LEVEL: i32 = 3; +const GPU_ZSTD_VALUES_PER_PAGE: usize = 8192; + +#[derive(Clone)] +pub(super) struct GpuCompatibleCompressor { + btrblocks: BtrBlocksCompressor, +} + +impl GpuCompatibleCompressor { + pub(super) const fn new(btrblocks: BtrBlocksCompressor) -> Self { + Self { btrblocks } + } + + fn compress_canonical(&self, canonical: Canonical) -> VortexResult { + match canonical { + // Use nvcomp-compatible zstd (without dictionary) for string/binary leaves. + Canonical::VarBinView(vbv) => { + let zstd = ZstdArray::from_var_bin_view_without_dict( + &vbv, + GPU_ZSTD_LEVEL, + GPU_ZSTD_VALUES_PER_PAGE, + )? + .into_array(); + if zstd.nbytes() < vbv.nbytes() { + Ok(zstd) + } else { + Ok(vbv.into_array()) + } + } + Canonical::Struct(struct_array) => { + let fields = struct_array + .unmasked_fields() + .iter() + .map(|field| self.compress_canonical(field.to_canonical()?)) + .collect::>>()?; + + Ok(StructArray::try_new( + struct_array.names().clone(), + fields, + struct_array.len(), + struct_array.validity()?, + )? + .into_array()) + } + Canonical::List(list_view) => { + let compressed_elems = + self.compress_canonical(list_view.elements().to_canonical()?)?; + let compressed_offsets = + self.compress_canonical(list_view.offsets().to_canonical()?)?; + let compressed_sizes = + self.compress_canonical(list_view.sizes().to_canonical()?)?; + + Ok(ListViewArray::try_new( + compressed_elems, + compressed_offsets, + compressed_sizes, + list_view.validity()?, + )? + .into_array()) + } + Canonical::FixedSizeList(fsl) => { + let compressed_elems = self.compress_canonical(fsl.elements().to_canonical()?)?; + Ok(FixedSizeListArray::try_new( + compressed_elems, + fsl.list_size(), + fsl.validity()?, + fsl.len(), + )? + .into_array()) + } + Canonical::Extension(ext) => { + let compressed_storage = self.compress_canonical(ext.storage().to_canonical()?)?; + Ok(ExtensionArray::new(ext.ext_dtype().clone(), compressed_storage).into_array()) + } + other => self.btrblocks.compress(other.as_ref()), + } + } +} + +impl CompressorPlugin for GpuCompatibleCompressor { + fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult { + self.compress_canonical(chunk.to_canonical()?) + } +} From 143b0844ba7c239ed09fdb888f6055092defa136 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Fri, 6 Feb 2026 11:08:09 +0000 Subject: [PATCH 2/5] taplo Signed-off-by: Onur Satici --- vortex-file/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 8271cfd7af0..e859c515430 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -33,8 +33,8 @@ tracing = { workspace = true } uuid = { workspace = true } # Needed to pickup the "js" feature for wasm targets from the workspace configuration vortex-alp = { workspace = true } vortex-array = { workspace = true } -vortex-buffer = { workspace = true } vortex-btrblocks = { workspace = true } +vortex-buffer = { workspace = true } vortex-bytebool = { workspace = true } vortex-datetime-parts = { workspace = true } From 73c05cf613990fb22fd040e87a9bcc2e7d661510 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Fri, 6 Feb 2026 12:22:12 +0000 Subject: [PATCH 3/5] don't use custom compressor Signed-off-by: Onur Satici --- Cargo.lock | 6 +- fuzz/Cargo.toml | 2 +- fuzz/fuzz_targets/file_io.rs | 3 +- fuzz/src/array/mod.rs | 13 +- vortex-bench/src/lib.rs | 3 +- vortex-btrblocks/Cargo.toml | 5 + vortex-btrblocks/src/builder.rs | 18 +- vortex-btrblocks/src/compressor/float/mod.rs | 34 +++ .../src/compressor/integer/mod.rs | 52 ++++ vortex-btrblocks/src/compressor/string.rs | 35 +++ vortex-file/Cargo.toml | 2 +- vortex-file/src/strategy.rs | 82 ++--- vortex-file/src/strategy_gpu_zstd.rs | 100 ------- vortex-layout/Cargo.toml | 5 +- vortex-layout/src/layouts/compact.rs | 280 ------------------ vortex-layout/src/layouts/compressed.rs | 27 +- vortex-layout/src/layouts/mod.rs | 2 - vortex-layout/src/layouts/table.rs | 14 +- vortex-python/src/io.rs | 15 +- vortex-tui/src/convert.rs | 3 +- vortex/Cargo.toml | 2 +- vortex/examples/tracing_vortex.rs | 5 +- vortex/src/lib.rs | 12 +- 23 files changed, 199 insertions(+), 521 deletions(-) delete mode 100644 vortex-file/src/strategy_gpu_zstd.rs delete mode 100644 vortex-layout/src/layouts/compact.rs diff --git a/Cargo.lock b/Cargo.lock index 20f401e24a9..00278955a6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10327,6 +10327,7 @@ dependencies = [ "getrandom 0.3.4", "itertools 0.14.0", "num-traits", + "pco", "rand 0.9.2", "rustc-hash", "test-with", @@ -10342,12 +10343,14 @@ dependencies = [ "vortex-fastlanes", "vortex-fsst", "vortex-mask", + "vortex-pco", "vortex-runend", "vortex-scalar", "vortex-sequence", "vortex-sparse", "vortex-utils", "vortex-zigzag", + "vortex-zstd", ] [[package]] @@ -10860,7 +10863,6 @@ dependencies = [ "oneshot", "parking_lot", "paste", - "pco", "pin-project-lite", "prost 0.14.3", "rstest", @@ -10879,12 +10881,10 @@ dependencies = [ "vortex-io", "vortex-mask", "vortex-metrics", - "vortex-pco", "vortex-scalar", "vortex-sequence", "vortex-session", "vortex-utils", - "vortex-zstd", ] [[package]] diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index 20427f1ad23..05ff892d958 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -21,7 +21,7 @@ cargo-fuzz = true default = ["native"] native = ["libfuzzer-sys", "zstd", "vortex-file", "vortex/files"] wasmfuzz = [] -zstd = ["vortex/zstd"] +zstd = ["vortex/zstd", "vortex-btrblocks/zstd", "vortex-btrblocks/pco"] cuda = ["vortex-cuda", "vortex/cuda", "tokio"] [dependencies] diff --git a/fuzz/fuzz_targets/file_io.rs b/fuzz/fuzz_targets/file_io.rs index b508f6eccb2..8eb82c56a3e 100644 --- a/fuzz/fuzz_targets/file_io.rs +++ b/fuzz/fuzz_targets/file_io.rs @@ -28,7 +28,6 @@ use vortex_fuzz::CompressorStrategy; use vortex_fuzz::FuzzFileAction; use vortex_fuzz::RUNTIME; use vortex_fuzz::SESSION; -use vortex_layout::layouts::compact::CompactCompressor; use vortex_utils::aliases::DefaultHashBuilder; use vortex_utils::aliases::hash_set::HashSet; @@ -62,7 +61,7 @@ fuzz_target!(|fuzz: FuzzFileAction| -> Corpus { CompressorStrategy::Default => SESSION.write_options(), CompressorStrategy::Compact => { let strategy = WriteStrategyBuilder::default() - .with_compressor(CompactCompressor::default()) + .with_compact_encodings() .build(); SESSION.write_options().with_strategy(strategy) } diff --git a/fuzz/src/array/mod.rs b/fuzz/src/array/mod.rs index 513c347dd7c..feabc4f249a 100644 --- a/fuzz/src/array/mod.rs +++ b/fuzz/src/array/mod.rs @@ -517,15 +517,22 @@ fn random_action_from_list( /// Compress an array using the given strategy. #[cfg(feature = "zstd")] pub fn compress_array(array: &dyn Array, strategy: CompressorStrategy) -> ArrayRef { - use vortex_layout::layouts::compact::CompactCompressor; + use vortex_btrblocks::BtrBlocksCompressorBuilder; + use vortex_btrblocks::FloatCode; + use vortex_btrblocks::IntCode; + use vortex_btrblocks::StringCode; match strategy { CompressorStrategy::Default => BtrBlocksCompressor::default() .compress(array) .vortex_expect("BtrBlocksCompressor compress should succeed in fuzz test"), - CompressorStrategy::Compact => CompactCompressor::default() + CompressorStrategy::Compact => BtrBlocksCompressorBuilder::default() + .include_string([StringCode::Zstd]) + .include_int([IntCode::Pco]) + .include_float([FloatCode::Pco]) + .build() .compress(array) - .vortex_expect("CompactCompressor compress should succeed in fuzz test"), + .vortex_expect("Compact compress should succeed in fuzz test"), } } diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs index 5db8b34736e..b847d85a8b3 100644 --- a/vortex-bench/src/lib.rs +++ b/vortex-bench/src/lib.rs @@ -29,7 +29,6 @@ use vortex::error::VortexExpect; use vortex::error::vortex_err; use vortex::file::VortexWriteOptions; use vortex::file::WriteStrategyBuilder; -use vortex::layout::layouts::compact::CompactCompressor; use vortex::utils::aliases::hash_map::HashMap; pub mod benchmark; @@ -213,7 +212,7 @@ impl CompactionStrategy { match self { CompactionStrategy::Compact => options.with_strategy( WriteStrategyBuilder::default() - .with_compressor(CompactCompressor::default()) + .with_compact_encodings() .build(), ), CompactionStrategy::Default => options, diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 679a81f33da..821c5326feb 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -36,7 +36,10 @@ vortex-scalar = { workspace = true } vortex-sequence = { workspace = true } vortex-sparse = { workspace = true } vortex-utils = { workspace = true } +pco = { workspace = true, optional = true } +vortex-pco = { workspace = true, optional = true } vortex-zigzag = { workspace = true } +vortex-zstd = { workspace = true, optional = true } [dev-dependencies] divan = { workspace = true } @@ -47,6 +50,8 @@ vortex-array = { workspace = true, features = ["_test-harness"] } [features] # This feature enabled unstable encodings for which we don't guarantee stability. unstable_encodings = [] +pco = ["dep:pco", "dep:vortex-pco"] +zstd = ["dep:vortex-zstd"] [lints] workspace = true diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index 82829feeb4b..1ba5fd57f09 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -51,9 +51,21 @@ pub struct BtrBlocksCompressorBuilder { impl Default for BtrBlocksCompressorBuilder { fn default() -> Self { Self { - int_schemes: ALL_INT_SCHEMES.iter().copied().collect(), - float_schemes: ALL_FLOAT_SCHEMES.iter().copied().collect(), - string_schemes: ALL_STRING_SCHEMES.iter().copied().collect(), + int_schemes: ALL_INT_SCHEMES + .iter() + .copied() + .filter(|s| s.code() != IntCode::Pco) + .collect(), + float_schemes: ALL_FLOAT_SCHEMES + .iter() + .copied() + .filter(|s| s.code() != FloatCode::Pco) + .collect(), + string_schemes: ALL_STRING_SCHEMES + .iter() + .copied() + .filter(|s| s.code() != StringCode::Zstd) + .collect(), } } } diff --git a/vortex-btrblocks/src/compressor/float/mod.rs b/vortex-btrblocks/src/compressor/float/mod.rs index 9e9a48e70ea..63b986f7598 100644 --- a/vortex-btrblocks/src/compressor/float/mod.rs +++ b/vortex-btrblocks/src/compressor/float/mod.rs @@ -77,6 +77,8 @@ pub const ALL_FLOAT_SCHEMES: &[&dyn FloatScheme] = &[ &DictScheme, &NullDominated, &RLE_FLOAT_SCHEME, + #[cfg(feature = "pco")] + &PcoScheme, ]; /// [`Compressor`] for floating-point numbers. @@ -142,6 +144,8 @@ pub enum FloatCode { Rle, /// Sparse encoding for null-dominated arrays. Sparse, + /// Pco (pcodec) compression for floats. + Pco, } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -162,6 +166,11 @@ struct DictScheme; #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct NullDominated; +/// Pco (pcodec) compression for floats. +#[cfg(feature = "pco")] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct PcoScheme; + /// Configuration for float RLE compression. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct FloatRLEConfig; @@ -520,6 +529,31 @@ impl Scheme for NullDominated { } } +#[cfg(feature = "pco")] +impl Scheme for PcoScheme { + type StatsType = FloatStats; + type CodeType = FloatCode; + + fn code(&self) -> FloatCode { + FloatCode::Pco + } + + fn compress( + &self, + _compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + _ctx: CompressorContext, + _excludes: &[FloatCode], + ) -> VortexResult { + Ok(vortex_pco::PcoArray::from_primitive( + stats.source(), + pco::DEFAULT_COMPRESSION_LEVEL, + 8192, + )? + .into_array()) + } +} + #[cfg(test)] mod tests { diff --git a/vortex-btrblocks/src/compressor/integer/mod.rs b/vortex-btrblocks/src/compressor/integer/mod.rs index 0cd0b3e4544..deba5c600bf 100644 --- a/vortex-btrblocks/src/compressor/integer/mod.rs +++ b/vortex-btrblocks/src/compressor/integer/mod.rs @@ -62,6 +62,8 @@ pub const ALL_INT_SCHEMES: &[&dyn IntegerScheme] = &[ &RunEndScheme, &SequenceScheme, &RLE_INTEGER_SCHEME, + #[cfg(feature = "pco")] + &PcoScheme, ]; /// [`Compressor`] for signed and unsigned integers. @@ -156,6 +158,8 @@ pub enum IntCode { Sequence, /// RLE encoding - generic run-length encoding. Rle, + /// Pco (pcodec) compression for integers. + Pco, } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -188,6 +192,11 @@ pub struct RunEndScheme; #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct SequenceScheme; +/// Pco (pcodec) compression for integers. +#[cfg(feature = "pco")] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct PcoScheme; + /// Threshold for the average run length in an array before we consider run-end encoding. const RUN_END_THRESHOLD: u32 = 4; @@ -818,6 +827,49 @@ impl Scheme for SequenceScheme { } } +#[cfg(feature = "pco")] +impl Scheme for PcoScheme { + type StatsType = IntegerStats; + type CodeType = IntCode; + + fn code(&self) -> IntCode { + IntCode::Pco + } + + fn expected_compression_ratio( + &self, + compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + ctx: CompressorContext, + excludes: &[IntCode], + ) -> VortexResult { + // Pco does not support I8 or U8. + if matches!( + stats.src.ptype(), + vortex_dtype::PType::I8 | vortex_dtype::PType::U8 + ) { + return Ok(0.0); + } + + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + } + + fn compress( + &self, + _compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + _ctx: CompressorContext, + _excludes: &[IntCode], + ) -> VortexResult { + Ok(vortex_pco::PcoArray::from_primitive( + stats.source(), + pco::DEFAULT_COMPRESSION_LEVEL, + 8192, + )? + .into_array()) + } +} + #[cfg(test)] mod tests { use std::iter; diff --git a/vortex-btrblocks/src/compressor/string.rs b/vortex-btrblocks/src/compressor/string.rs index 8d3af78c414..bfaa68f6f60 100644 --- a/vortex-btrblocks/src/compressor/string.rs +++ b/vortex-btrblocks/src/compressor/string.rs @@ -124,6 +124,8 @@ pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[ &FSSTScheme, &ConstantScheme, &NullDominated, + #[cfg(feature = "zstd")] + &ZstdScheme, ]; /// [`Compressor`] for strings. @@ -209,6 +211,11 @@ pub struct ConstantScheme; #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct NullDominated; +/// Zstd compression without dictionaries (nvCOMP compatible). +#[cfg(feature = "zstd")] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct ZstdScheme; + /// Unique identifier for string compression schemes. #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)] pub enum StringCode { @@ -222,6 +229,8 @@ pub enum StringCode { Constant, /// Sparse encoding for null-dominated arrays. Sparse, + /// Zstd compression without dictionaries. + Zstd, } impl Scheme for UncompressedScheme { @@ -502,6 +511,32 @@ impl Scheme for NullDominated { } } +#[cfg(feature = "zstd")] +impl Scheme for ZstdScheme { + type StatsType = StringStats; + type CodeType = StringCode; + + fn code(&self) -> StringCode { + StringCode::Zstd + } + + fn compress( + &self, + _compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + _ctx: CompressorContext, + _excludes: &[StringCode], + ) -> VortexResult { + let zstd = vortex_zstd::ZstdArray::from_var_bin_view_without_dict(stats.source(), 3, 8192)? + .into_array(); + if zstd.nbytes() < stats.source().nbytes() { + Ok(zstd) + } else { + Ok(stats.source().to_array()) + } + } +} + #[cfg(test)] mod tests { use vortex_array::arrays::VarBinViewArray; diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index e859c515430..9d32f9412e0 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -79,7 +79,7 @@ tokio = [ "vortex-io/tokio", "vortex-layout/tokio", ] -zstd = ["dep:vortex-zstd", "vortex-layout/zstd"] +zstd = ["dep:vortex-zstd", "vortex-btrblocks/zstd", "vortex-btrblocks/pco"] [package.metadata.cargo-machete] ignored = ["getrandom_v03", "uuid"] diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 5ac6df8e26c..0803e7df5e8 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -6,14 +6,8 @@ use std::sync::Arc; use std::sync::LazyLock; -#[cfg(feature = "zstd")] -#[path = "strategy_gpu_zstd.rs"] -mod strategy_gpu_zstd; - // Compressed encodings from encoding crates // Canonical array encodings from vortex-array -#[cfg(feature = "zstd")] -use strategy_gpu_zstd::GpuCompatibleCompressor; use vortex_alp::ALPRDVTable; use vortex_alp::ALPVTable; use vortex_array::arrays::BoolVTable; @@ -120,47 +114,6 @@ pub static ALLOWED_ENCODINGS: LazyLock = LazyLock::new(|| { registry }); -/// Static registry of array encodings compatible with CUDA kernel execution. -/// -/// This includes all canonical encodings needed to represent nested array trees, plus -/// compressed encodings that have CUDA kernel implementations in `vortex-cuda`. -#[cfg(feature = "zstd")] -pub static GPU_ALLOWED_ENCODINGS: LazyLock = LazyLock::new(|| { - let registry = ArrayRegistry::default(); - - // Canonical encodings from vortex-array - registry.register(NullVTable::ID, NullVTable); - registry.register(BoolVTable::ID, BoolVTable); - registry.register(PrimitiveVTable::ID, PrimitiveVTable); - registry.register(DecimalVTable::ID, DecimalVTable); - registry.register(VarBinVTable::ID, VarBinVTable); - registry.register(VarBinViewVTable::ID, VarBinViewVTable); - registry.register(ListVTable::ID, ListVTable); - registry.register(ListViewVTable::ID, ListViewVTable); - registry.register(FixedSizeListVTable::ID, FixedSizeListVTable); - registry.register(StructVTable::ID, StructVTable); - registry.register(ExtensionVTable::ID, ExtensionVTable); - registry.register(ChunkedVTable::ID, ChunkedVTable); - registry.register(ConstantVTable::ID, ConstantVTable); - registry.register(MaskedVTable::ID, MaskedVTable); - registry.register(DictVTable::ID, DictVTable); - - // Compressed encodings with CUDA kernel support - registry.register(ALPVTable::ID, ALPVTable); - registry.register(BitPackedVTable::ID, BitPackedVTable); - registry.register(DateTimePartsVTable::ID, DateTimePartsVTable); - registry.register(DecimalBytePartsVTable::ID, DecimalBytePartsVTable); - registry.register(FoRVTable::ID, FoRVTable); - registry.register(RunEndVTable::ID, RunEndVTable); - registry.register(SequenceVTable::ID, SequenceVTable); - registry.register(ZigZagVTable::ID, ZigZagVTable); - - #[cfg(feature = "zstd")] - registry.register(ZstdVTable::ID, ZstdVTable); - - registry -}); - /// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file. /// /// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk, @@ -221,26 +174,35 @@ impl WriteStrategyBuilder { /// Configure a write strategy that emits only CUDA-compatible encodings. /// - /// This keeps the default write layout pipeline, but: - /// - Restricts flat-layout normalization to [`GPU_ALLOWED_ENCODINGS`] - /// - Configures BtrBlocks to exclude schemes without CUDA kernel support + /// This configures BtrBlocks to use Zstd for strings and exclude schemes + /// without CUDA kernel support. #[cfg(feature = "zstd")] pub fn with_cuda_compatible_encodings(mut self) -> Self { let btrblocks = BtrBlocksCompressorBuilder::default() + .include_string([StringCode::Zstd]) .exclude_int([IntCode::Sparse, IntCode::Rle]) .exclude_float([FloatCode::AlpRd, FloatCode::Rle, FloatCode::Sparse]) - // Keep string schemes disabled in btrblocks; when `zstd` feature is enabled, we - // separately encode string/binary leaves as Zstd (without dictionaries). - .exclude_string([ - StringCode::Dict, - StringCode::Fsst, - StringCode::Constant, - StringCode::Sparse, - ]) + .exclude_string([StringCode::Dict, StringCode::Fsst]) + .build(); + + self.compressor = Some(Arc::new(btrblocks)); + self + } + + /// Configure a write strategy that uses compact encodings (Pco for numerics, Zstd for + /// strings/binary). + /// + /// This provides better compression ratios than the default BtrBlocks strategy, + /// especially for floating-point heavy datasets. + #[cfg(feature = "zstd")] + pub fn with_compact_encodings(mut self) -> Self { + let btrblocks = BtrBlocksCompressorBuilder::default() + .include_string([StringCode::Zstd]) + .include_int([IntCode::Pco]) + .include_float([FloatCode::Pco]) .build(); - self.compressor = Some(Arc::new(GpuCompatibleCompressor::new(btrblocks))); - self.allow_encodings = Some((*GPU_ALLOWED_ENCODINGS).clone()); + self.compressor = Some(Arc::new(btrblocks)); self } diff --git a/vortex-file/src/strategy_gpu_zstd.rs b/vortex-file/src/strategy_gpu_zstd.rs deleted file mode 100644 index 20589a1d6f1..00000000000 --- a/vortex-file/src/strategy_gpu_zstd.rs +++ /dev/null @@ -1,100 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use vortex_array::Array; -use vortex_array::ArrayRef; -use vortex_array::Canonical; -use vortex_array::IntoArray; -use vortex_array::arrays::ExtensionArray; -use vortex_array::arrays::FixedSizeListArray; -use vortex_array::arrays::ListViewArray; -use vortex_array::arrays::StructArray; -use vortex_btrblocks::BtrBlocksCompressor; -use vortex_error::VortexResult; -use vortex_layout::layouts::compressed::CompressorPlugin; -use vortex_zstd::ZstdArray; - -const GPU_ZSTD_LEVEL: i32 = 3; -const GPU_ZSTD_VALUES_PER_PAGE: usize = 8192; - -#[derive(Clone)] -pub(super) struct GpuCompatibleCompressor { - btrblocks: BtrBlocksCompressor, -} - -impl GpuCompatibleCompressor { - pub(super) const fn new(btrblocks: BtrBlocksCompressor) -> Self { - Self { btrblocks } - } - - fn compress_canonical(&self, canonical: Canonical) -> VortexResult { - match canonical { - // Use nvcomp-compatible zstd (without dictionary) for string/binary leaves. - Canonical::VarBinView(vbv) => { - let zstd = ZstdArray::from_var_bin_view_without_dict( - &vbv, - GPU_ZSTD_LEVEL, - GPU_ZSTD_VALUES_PER_PAGE, - )? - .into_array(); - if zstd.nbytes() < vbv.nbytes() { - Ok(zstd) - } else { - Ok(vbv.into_array()) - } - } - Canonical::Struct(struct_array) => { - let fields = struct_array - .unmasked_fields() - .iter() - .map(|field| self.compress_canonical(field.to_canonical()?)) - .collect::>>()?; - - Ok(StructArray::try_new( - struct_array.names().clone(), - fields, - struct_array.len(), - struct_array.validity()?, - )? - .into_array()) - } - Canonical::List(list_view) => { - let compressed_elems = - self.compress_canonical(list_view.elements().to_canonical()?)?; - let compressed_offsets = - self.compress_canonical(list_view.offsets().to_canonical()?)?; - let compressed_sizes = - self.compress_canonical(list_view.sizes().to_canonical()?)?; - - Ok(ListViewArray::try_new( - compressed_elems, - compressed_offsets, - compressed_sizes, - list_view.validity()?, - )? - .into_array()) - } - Canonical::FixedSizeList(fsl) => { - let compressed_elems = self.compress_canonical(fsl.elements().to_canonical()?)?; - Ok(FixedSizeListArray::try_new( - compressed_elems, - fsl.list_size(), - fsl.validity()?, - fsl.len(), - )? - .into_array()) - } - Canonical::Extension(ext) => { - let compressed_storage = self.compress_canonical(ext.storage().to_canonical()?)?; - Ok(ExtensionArray::new(ext.ext_dtype().clone(), compressed_storage).into_array()) - } - other => self.btrblocks.compress(other.as_ref()), - } - } -} - -impl CompressorPlugin for GpuCompatibleCompressor { - fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult { - self.compress_canonical(chunk.to_canonical()?) - } -} diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml index b57815b84e9..54fbad5e38e 100644 --- a/vortex-layout/Cargo.toml +++ b/vortex-layout/Cargo.toml @@ -29,7 +29,6 @@ once_cell = { workspace = true, features = ["parking_lot"] } oneshot = { workspace = true } parking_lot = { workspace = true } paste = { workspace = true } -pco = { workspace = true } pin-project-lite = { workspace = true } prost = { workspace = true } rustc-hash = { workspace = true } @@ -47,12 +46,10 @@ vortex-flatbuffers = { workspace = true, features = ["layout"] } vortex-io = { workspace = true } vortex-mask = { workspace = true } vortex-metrics = { workspace = true } -vortex-pco = { workspace = true } vortex-scalar = { workspace = true } vortex-sequence = { workspace = true } vortex-session = { workspace = true } vortex-utils = { workspace = true, features = ["dashmap"] } -vortex-zstd = { workspace = true, optional = true } [dev-dependencies] futures = { workspace = true, features = ["executor"] } @@ -65,7 +62,7 @@ vortex-utils = { workspace = true, features = ["_test-harness"] } [features] _test-harness = [] tokio = ["dep:tokio", "vortex-error/tokio"] -zstd = ["dep:vortex-zstd"] + [lints] workspace = true diff --git a/vortex-layout/src/layouts/compact.rs b/vortex-layout/src/layouts/compact.rs deleted file mode 100644 index 8163a32e4ae..00000000000 --- a/vortex-layout/src/layouts/compact.rs +++ /dev/null @@ -1,280 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use vortex_array::Array; -use vortex_array::ArrayRef; -use vortex_array::Canonical; -use vortex_array::IntoArray; -use vortex_array::ToCanonical; -use vortex_array::arrays::ExtensionArray; -use vortex_array::arrays::FixedSizeListArray; -use vortex_array::arrays::ListViewArray; -use vortex_array::arrays::PrimitiveArray; -use vortex_array::arrays::StructArray; -use vortex_array::arrays::narrowed_decimal; -use vortex_array::vtable::ValidityHelper; -use vortex_decimal_byte_parts::DecimalBytePartsArray; -use vortex_dtype::PType; -use vortex_error::VortexResult; -use vortex_pco::PcoArray; -use vortex_scalar::DecimalType; -use vortex_zstd::ZstdArray; - -fn is_pco_number_type(ptype: PType) -> bool { - matches!( - ptype, - PType::F16 - | PType::F32 - | PType::F64 - | PType::I16 - | PType::I32 - | PType::I64 - | PType::U16 - | PType::U32 - | PType::U64 - ) -} - -/// A simple compressor that uses the "compact" strategy: -/// - Pco for supported numeric types (16, 32, and 64-bit floats and ints) -/// - Zstd for everything else (primitive arrays only) -#[derive(Debug, Clone)] -pub struct CompactCompressor { - pco_level: usize, - zstd_level: i32, - /// Dictionaries are trained from previously seen frames to improve compression ratio. - /// nvCOMP though doesn't support ZSTD dictionaries. Therefore, we need the option to - /// disable them for compatibility. - zstd_use_dicts: bool, - zstd_values_per_page: usize, -} - -impl CompactCompressor { - pub fn with_pco_level(mut self, level: usize) -> Self { - self.pco_level = level; - self - } - - pub fn with_zstd_level(mut self, level: i32) -> Self { - self.zstd_level = level; - self - } - - pub fn with_zstd_use_dicts(mut self, use_dicts: bool) -> Self { - self.zstd_use_dicts = use_dicts; - self - } - - /// Sets the number of non-null primitive values to store per - /// separately-decompressible page/frame. - /// - /// Fewer values per page can reduce the time to query a small slice of rows, but too - /// few can increase compressed size and (de)compression time. The default is 0, which - /// is used for maximally-large pages. - pub fn with_zstd_values_per_page(mut self, values_per_page: usize) -> Self { - self.zstd_values_per_page = values_per_page; - self - } - - pub fn compress(&self, array: &dyn Array) -> VortexResult { - self.compress_canonical(array.to_canonical()?) - } - - /// Compress a single array using the compact strategy - pub fn compress_canonical(&self, canonical: Canonical) -> VortexResult { - let uncompressed_nbytes = canonical.as_ref().nbytes(); - let compressed = match &canonical { - // TODO compress BoolArrays - Canonical::Primitive(primitive) => { - // pco for applicable numbers, zstd for everything else - let ptype = primitive.ptype(); - - if is_pco_number_type(ptype) { - let pco_array = PcoArray::from_primitive( - primitive, - self.pco_level, - self.zstd_values_per_page, - )?; - pco_array.into_array() - } else { - let zstd_array = if self.zstd_use_dicts { - ZstdArray::from_primitive( - primitive, - self.zstd_level, - self.zstd_values_per_page, - )? - } else { - ZstdArray::from_primitive_without_dict( - primitive, - self.zstd_level, - self.zstd_values_per_page, - )? - }; - zstd_array.into_array() - } - } - Canonical::Decimal(decimal) => { - let decimal = narrowed_decimal(decimal.clone()); - let validity = decimal.validity(); - let int_values = match decimal.values_type() { - DecimalType::I8 => { - PrimitiveArray::new(decimal.buffer::(), validity.clone()) - } - DecimalType::I16 => { - PrimitiveArray::new(decimal.buffer::(), validity.clone()) - } - DecimalType::I32 => { - PrimitiveArray::new(decimal.buffer::(), validity.clone()) - } - DecimalType::I64 => { - PrimitiveArray::new(decimal.buffer::(), validity.clone()) - } - _ => { - // Vortex lacks support for i128 and i256. - return Ok(canonical.into_array()); - } - }; - let compressed = self.compress_canonical(Canonical::Primitive(int_values))?; - DecimalBytePartsArray::try_new(compressed, decimal.decimal_dtype())?.to_array() - } - Canonical::VarBinView(vbv) => { - // always zstd - if self.zstd_use_dicts { - ZstdArray::from_var_bin_view(vbv, self.zstd_level, self.zstd_values_per_page)? - .into_array() - } else { - ZstdArray::from_var_bin_view_without_dict( - vbv, - self.zstd_level, - self.zstd_values_per_page, - )? - .into_array() - } - } - Canonical::Struct(struct_array) => { - // recurse - let fields = struct_array - .unmasked_fields() - .iter() - .map(|field| self.compress(field)) - .collect::>>()?; - - StructArray::try_new( - struct_array.names().clone(), - fields, - struct_array.len(), - struct_array.validity().clone(), - )? - .into_array() - } - Canonical::List(listview) => { - let compressed_elems = self.compress(listview.elements())?; - - // Note that since the type of our offsets and sizes is not encoded in our `DType`, - // we can narrow the widths. - let compressed_offsets = - self.compress(&listview.offsets().to_primitive().narrow()?.into_array())?; - let compressed_sizes = - self.compress(&listview.sizes().to_primitive().narrow()?.into_array())?; - - // SAFETY: Since compression does not change the logical values of arrays, this is - // effectively the same array but represented differently, so all invariants that - // were previously upheld by the valid `ListViewArray` are still upheld. - // If the original was zero-copyable to list, compression maintains that property. - unsafe { - ListViewArray::new_unchecked( - compressed_elems, - compressed_offsets, - compressed_sizes, - listview.validity().clone(), - ) - .with_zero_copy_to_list(listview.is_zero_copy_to_list()) - } - .into_array() - } - Canonical::FixedSizeList(fsl) => { - let compressed_elems = self.compress(fsl.elements())?; - - FixedSizeListArray::try_new( - compressed_elems, - fsl.list_size(), - fsl.validity().clone(), - fsl.len(), - )? - .into_array() - } - Canonical::Extension(ext_array) => { - let compressed_storage = self.compress(ext_array.storage())?; - - ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage).into_array() - } - _ => return Ok(canonical.into_array()), - }; - - if compressed.nbytes() >= uncompressed_nbytes { - return Ok(canonical.into_array()); - } - Ok(compressed) - } -} - -impl Default for CompactCompressor { - fn default() -> Self { - Self { - pco_level: pco::DEFAULT_COMPRESSION_LEVEL, - zstd_level: 3, - zstd_use_dicts: true, - // This is probably high enough to not hurt performance or - // compression. It also currently aligns with the default strategy's - // number of rows per statistic, which allows efficient pushdown - // (but nothing enforces this). - zstd_values_per_page: 8192, - } - } -} - -#[cfg(test)] -mod tests { - use vortex_array::IntoArray; - use vortex_array::arrays::PrimitiveArray; - use vortex_array::arrays::StructArray; - use vortex_array::assert_arrays_eq; - use vortex_array::validity::Validity; - use vortex_buffer::buffer; - use vortex_dtype::FieldName; - - use super::*; - - #[test] - fn test_compact_compressor_struct_with_mixed_types() { - let compressor = CompactCompressor::default(); - - // Create a struct array containing various types - let columns = [ - // Pco types - PrimitiveArray::new(buffer![1.0f64, 2.0, 3.0, 4.0, 5.0], Validity::NonNullable), - PrimitiveArray::new(buffer![10i32, 20, 30, 40, 50], Validity::NonNullable), - // Zstd types - PrimitiveArray::new(buffer![11u8, 22, 33, 44, 55], Validity::NonNullable), - ] - .iter() - .map(|a| a.clone().into_array()) - .collect::>(); - let field_names: Vec = - vec!["f64_field".into(), "i32_field".into(), "u8_field".into()]; - - let n_rows = columns[0].len(); - let struct_array = StructArray::try_new( - field_names.into(), - columns.clone(), - n_rows, - Validity::NonNullable, - ) - .unwrap(); - - // Compress the struct array - let compressed = compressor.compress(struct_array.as_ref()).unwrap(); - - assert_arrays_eq!(struct_array, compressed) - } -} diff --git a/vortex-layout/src/layouts/compressed.rs b/vortex-layout/src/layouts/compressed.rs index 9f45b5b25df..de672715d9e 100644 --- a/vortex-layout/src/layouts/compressed.rs +++ b/vortex-layout/src/layouts/compressed.rs @@ -25,10 +25,7 @@ use crate::sequence::SequentialStreamExt; /// A boxed compressor function from arrays into compressed arrays. /// -/// Both the balanced `BtrBlocksCompressor` and the size-optimized `CompactCompressor` -/// meet this interface. -/// -/// API consumers are also free to implement this trait to provide new plugin compressors. +/// API consumers are free to implement this trait to provide new plugin compressors. pub trait CompressorPlugin: Send + Sync + 'static { fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult; } @@ -54,13 +51,6 @@ impl CompressorPlugin for BtrBlocksCompressor { } } -#[cfg(feature = "zstd")] -impl CompressorPlugin for crate::layouts::compact::CompactCompressor { - fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult { - self.compress(chunk) - } -} - /// A layout writer that compresses chunks. #[derive(Clone)] pub struct CompressingStrategy { @@ -87,21 +77,6 @@ impl CompressingStrategy { Self::new(child, Arc::new(compressor)) } - /// Create a new writer that compresses using a `CompactCompressor` to compress chunks. - /// - /// This may create smaller files than the BtrBlocks writer, in exchange for some penalty - /// to decoding performance. This is only recommended for datasets that make heavy use of - /// floating point numbers. - /// - /// [`CompactCompressor`]: crate::layouts::compact::CompactCompressor - #[cfg(feature = "zstd")] - pub fn new_compact( - child: S, - compressor: crate::layouts::compact::CompactCompressor, - ) -> Self { - Self::new(child, Arc::new(compressor)) - } - /// Create a new compressor from a plugin interface. pub fn new_opaque(child: S, compressor: C) -> Self { Self::new(child, Arc::new(compressor)) diff --git a/vortex-layout/src/layouts/mod.rs b/vortex-layout/src/layouts/mod.rs index 61193c54b6e..03b29a97a8f 100644 --- a/vortex-layout/src/layouts/mod.rs +++ b/vortex-layout/src/layouts/mod.rs @@ -11,8 +11,6 @@ use vortex_error::SharedVortexResult; pub mod buffered; pub mod chunked; pub mod collect; -#[cfg(feature = "zstd")] -pub mod compact; pub mod compressed; pub mod dict; pub mod file_stats; diff --git a/vortex-layout/src/layouts/table.rs b/vortex-layout/src/layouts/table.rs index faff6190191..1d444be33b2 100644 --- a/vortex-layout/src/layouts/table.rs +++ b/vortex-layout/src/layouts/table.rs @@ -85,27 +85,23 @@ impl TableStrategy { /// ```ignore /// # use std::sync::Arc; /// # use vortex_dtype::{field_path, Field, FieldPath}; - /// # use vortex_layout::layouts::compact::CompactCompressor; /// # use vortex_layout::layouts::compressed::CompressingStrategy; /// # use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; /// # use vortex_layout::layouts::table::TableStrategy; /// /// // A strategy for compressing data using the balanced BtrBlocks compressor. - /// let compress_btrblocks = CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), true); - /// - /// // A strategy that compresses data using ZSTD - /// let compress_compact = CompressingStrategy::new_compact(FlatLayoutStrategy::default(), CompactCompressor::default()); + /// let compress = CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), true); /// /// // Our combined strategy uses no compression for validity buffers, BtrBlocks compression - /// // for most columns, and will use ZSTD compression for a nested binary column that we know - /// // is never filtered in. + /// // for most columns, and stores a nested binary column uncompressed (flat) because it + /// // is pre-compressed or never filtered on. /// let strategy = TableStrategy::new( /// Arc::new(FlatLayoutStrategy::default()), - /// Arc::new(compress_btrblocks), + /// Arc::new(compress), /// ) /// .with_field_writer( /// field_path!(request.body.bytes), - /// Arc::new(compress_compact), + /// Arc::new(FlatLayoutStrategy::default()), /// ); /// ``` pub fn with_field_writer( diff --git a/vortex-python/src/io.rs b/vortex-python/src/io.rs index 22743e4828d..3f2b9dc2e04 100644 --- a/vortex-python/src/io.rs +++ b/vortex-python/src/io.rs @@ -15,7 +15,6 @@ use vortex::array::arrow::FromArrowArray; use vortex::array::iter::ArrayIterator; use vortex::array::iter::ArrayIteratorAdapter; use vortex::array::iter::ArrayIteratorExt; -use vortex::compressor::CompactCompressor; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; use vortex::error::VortexError; @@ -246,9 +245,7 @@ pub fn write( /// :func:`vortex.io.write`. #[pyclass(name = "VortexWriteOptions", module = "io", frozen)] pub(crate) struct PyVortexWriteOptions { - // TODO(DK): This might need to be an Arc if we actually have multiple - // compressors. - compressor: Option, + use_compact_encodings: bool, } #[pymethods] @@ -256,7 +253,9 @@ impl PyVortexWriteOptions { /// Balance size, read-throughput, and read-latency. #[staticmethod] pub fn default() -> Self { - Self { compressor: None } + Self { + use_compact_encodings: false, + } } /// Prioritize small size over read-throughput and read-latency. @@ -297,7 +296,7 @@ impl PyVortexWriteOptions { #[staticmethod] pub fn compact() -> Self { Self { - compressor: Some(CompactCompressor::default()), + use_compact_encodings: true, } } @@ -350,8 +349,8 @@ impl PyVortexWriteOptions { ) -> PyVortexResult<()> { py.detach(|| { let mut strategy = WriteStrategyBuilder::default(); - if let Some(compressor) = self.compressor.as_ref() { - strategy = strategy.with_compressor(compressor.clone()) + if self.use_compact_encodings { + strategy = strategy.with_compact_encodings(); } let strategy = strategy.build(); TOKIO_RUNTIME.block_on(async move { diff --git a/vortex-tui/src/convert.rs b/vortex-tui/src/convert.rs index ce6858053a5..858e908ec2e 100644 --- a/vortex-tui/src/convert.rs +++ b/vortex-tui/src/convert.rs @@ -15,7 +15,6 @@ use tokio::io::AsyncWriteExt; use vortex::array::ArrayRef; use vortex::array::arrow::FromArrowArray; use vortex::array::stream::ArrayStreamAdapter; -use vortex::compressor::CompactCompressor; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; use vortex::error::VortexError; @@ -95,7 +94,7 @@ pub async fn exec_convert(session: &VortexSession, flags: ConvertArgs) -> anyhow let strategy = WriteStrategyBuilder::default(); let strategy = match flags.strategy { Strategy::Btrblocks => strategy, - Strategy::Compact => strategy.with_compressor(CompactCompressor::default()), + Strategy::Compact => strategy.with_compact_encodings(), }; let mut file = File::create(output_path).await?; diff --git a/vortex/Cargo.toml b/vortex/Cargo.toml index f37cc7a791a..745fb59d0d8 100644 --- a/vortex/Cargo.toml +++ b/vortex/Cargo.toml @@ -77,7 +77,7 @@ files = ["dep:vortex-file"] memmap2 = ["vortex-buffer/memmap2"] object_store = ["vortex-file/object_store", "vortex-io/object_store"] tokio = ["vortex-file/tokio"] -zstd = ["dep:vortex-zstd", "vortex-file/zstd", "vortex-layout/zstd"] +zstd = ["dep:vortex-zstd", "vortex-file/zstd"] pretty = ["vortex-array/table-display"] serde = [ "vortex-array/serde", diff --git a/vortex/examples/tracing_vortex.rs b/vortex/examples/tracing_vortex.rs index 8fc428e8de9..aa456a44f5f 100644 --- a/vortex/examples/tracing_vortex.rs +++ b/vortex/examples/tracing_vortex.rs @@ -38,7 +38,6 @@ use vortex::array::arrays::PrimitiveArray; use vortex::array::arrays::StructArray; use vortex::array::arrays::VarBinArray; use vortex::array::validity::Validity; -use vortex::compressor::CompactCompressor; use vortex::dtype::DType; use vortex::dtype::Nullability; use vortex::file::WriteStrategyBuilder; @@ -386,10 +385,10 @@ async fn write_batch_to_vortex( let file_path = output_dir.join(format!("traces_{:04}.vortex", file_index)); let mut file = tokio::fs::File::create(&file_path).await?; - // Use the write-optimized CompactCompressor for the telemetry files. + // Use compact encodings (Pco + Zstd) for the telemetry files. let write_opts = session.write_options().with_strategy( WriteStrategyBuilder::default() - .with_compressor(CompactCompressor::default()) + .with_compact_encodings() .build(), ); diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index f7b5a95ee25..28e03e1b4b8 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -33,8 +33,6 @@ pub mod compute2 { pub mod compressor { pub use vortex_btrblocks::BtrBlocksCompressor; - #[cfg(feature = "zstd")] - pub use vortex_layout::layouts::compact::CompactCompressor; } pub mod dtype { @@ -203,7 +201,6 @@ mod test { use vortex_file::OpenOptionsSessionExt; use vortex_file::WriteOptionsSessionExt; use vortex_file::WriteStrategyBuilder; - use vortex_layout::layouts::compact::CompactCompressor; use vortex_session::VortexSession; use crate as vortex; @@ -246,7 +243,6 @@ mod test { fn compress() -> VortexResult<()> { // [compress] use vortex::compressor::BtrBlocksCompressor; - use vortex::compressor::CompactCompressor; let array = PrimitiveArray::new(buffer![42u64; 100_000], Validity::NonNullable); @@ -257,12 +253,6 @@ mod test { compressed.nbytes(), array.nbytes() ); - - // Or apply generally stronger compression with the compact compressor - let compressed = CompactCompressor::default() - .with_zstd_values_per_page(8192) - .compress(array.as_ref())?; - println!("Compact size: {} / {}", compressed.nbytes(), array.nbytes()); // [compress] Ok(()) @@ -321,7 +311,7 @@ mod test { .write_options() .with_strategy( WriteStrategyBuilder::default() - .with_compressor(CompactCompressor::default()) + .with_compact_encodings() .build(), ) .write( From 735233c47fe03d84292bcd8a730bd5d7eb9233ac Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Fri, 6 Feb 2026 14:37:28 +0000 Subject: [PATCH 4/5] zstd Signed-off-by: Onur Satici --- vortex-btrblocks/src/compressor/string.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/vortex-btrblocks/src/compressor/string.rs b/vortex-btrblocks/src/compressor/string.rs index bfaa68f6f60..11da267f4fe 100644 --- a/vortex-btrblocks/src/compressor/string.rs +++ b/vortex-btrblocks/src/compressor/string.rs @@ -527,13 +527,10 @@ impl Scheme for ZstdScheme { _ctx: CompressorContext, _excludes: &[StringCode], ) -> VortexResult { - let zstd = vortex_zstd::ZstdArray::from_var_bin_view_without_dict(stats.source(), 3, 8192)? - .into_array(); - if zstd.nbytes() < stats.source().nbytes() { - Ok(zstd) - } else { - Ok(stats.source().to_array()) - } + Ok( + vortex_zstd::ZstdArray::from_var_bin_view_without_dict(stats.source(), 3, 8192)? + .into_array(), + ) } } From b1eb106877569c05989175ac9d52e3d42fedbaa4 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Fri, 6 Feb 2026 14:40:36 +0000 Subject: [PATCH 5/5] taplo Signed-off-by: Onur Satici --- vortex-btrblocks/Cargo.toml | 4 ++-- vortex-layout/Cargo.toml | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 821c5326feb..ccb457f00bc 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -18,6 +18,7 @@ enum-iterator = { workspace = true } getrandom_v03 = { workspace = true } itertools = { workspace = true } num-traits = { workspace = true } +pco = { workspace = true, optional = true } rand = { workspace = true } rustc-hash = { workspace = true } tracing = { workspace = true } @@ -31,13 +32,12 @@ vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } vortex-fsst = { workspace = true } vortex-mask = { workspace = true } +vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-scalar = { workspace = true } vortex-sequence = { workspace = true } vortex-sparse = { workspace = true } vortex-utils = { workspace = true } -pco = { workspace = true, optional = true } -vortex-pco = { workspace = true, optional = true } vortex-zigzag = { workspace = true } vortex-zstd = { workspace = true, optional = true } diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml index 54fbad5e38e..77f4c4a7ce9 100644 --- a/vortex-layout/Cargo.toml +++ b/vortex-layout/Cargo.toml @@ -63,7 +63,6 @@ vortex-utils = { workspace = true, features = ["_test-harness"] } _test-harness = [] tokio = ["dep:tokio", "vortex-error/tokio"] - [lints] workspace = true