diff --git a/Cargo.lock b/Cargo.lock index d44bf39ab22..ce9f9ad3db8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10346,6 +10346,7 @@ dependencies = [ "getrandom 0.3.4", "itertools 0.14.0", "num-traits", + "pco", "rand 0.9.2", "rustc-hash 2.1.1", "test-with", @@ -10361,12 +10362,14 @@ dependencies = [ "vortex-fastlanes", "vortex-fsst", "vortex-mask", + "vortex-pco", "vortex-runend", "vortex-scalar", "vortex-sequence", "vortex-sparse", "vortex-utils", "vortex-zigzag", + "vortex-zstd", ] [[package]] @@ -10715,6 +10718,7 @@ dependencies = [ "uuid", "vortex-alp", "vortex-array", + "vortex-btrblocks", "vortex-buffer", "vortex-bytebool", "vortex-cuda", @@ -10882,7 +10886,6 @@ dependencies = [ "oneshot", "parking_lot", "paste", - "pco", "pin-project-lite", "prost 0.14.3", "rstest", @@ -10899,15 +10902,12 @@ dependencies = [ "vortex-error", "vortex-flatbuffers", "vortex-io", - "vortex-layout", "vortex-mask", "vortex-metrics", - "vortex-pco", "vortex-scalar", "vortex-sequence", "vortex-session", "vortex-utils", - "vortex-zstd", ] [[package]] diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index 20427f1ad23..05ff892d958 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -21,7 +21,7 @@ cargo-fuzz = true default = ["native"] native = ["libfuzzer-sys", "zstd", "vortex-file", "vortex/files"] wasmfuzz = [] -zstd = ["vortex/zstd"] +zstd = ["vortex/zstd", "vortex-btrblocks/zstd", "vortex-btrblocks/pco"] cuda = ["vortex-cuda", "vortex/cuda", "tokio"] [dependencies] diff --git a/fuzz/fuzz_targets/file_io.rs b/fuzz/fuzz_targets/file_io.rs index b508f6eccb2..8eb82c56a3e 100644 --- a/fuzz/fuzz_targets/file_io.rs +++ b/fuzz/fuzz_targets/file_io.rs @@ -28,7 +28,6 @@ use vortex_fuzz::CompressorStrategy; use vortex_fuzz::FuzzFileAction; use vortex_fuzz::RUNTIME; use vortex_fuzz::SESSION; -use vortex_layout::layouts::compact::CompactCompressor; use vortex_utils::aliases::DefaultHashBuilder; use vortex_utils::aliases::hash_set::HashSet; @@ -62,7 +61,7 @@ fuzz_target!(|fuzz: FuzzFileAction| -> Corpus { CompressorStrategy::Default => SESSION.write_options(), CompressorStrategy::Compact => { let strategy = WriteStrategyBuilder::default() - .with_compressor(CompactCompressor::default()) + .with_compact_encodings() .build(); SESSION.write_options().with_strategy(strategy) } diff --git a/fuzz/src/array/mod.rs b/fuzz/src/array/mod.rs index 513c347dd7c..feabc4f249a 100644 --- a/fuzz/src/array/mod.rs +++ b/fuzz/src/array/mod.rs @@ -517,15 +517,22 @@ fn random_action_from_list( /// Compress an array using the given strategy. #[cfg(feature = "zstd")] pub fn compress_array(array: &dyn Array, strategy: CompressorStrategy) -> ArrayRef { - use vortex_layout::layouts::compact::CompactCompressor; + use vortex_btrblocks::BtrBlocksCompressorBuilder; + use vortex_btrblocks::FloatCode; + use vortex_btrblocks::IntCode; + use vortex_btrblocks::StringCode; match strategy { CompressorStrategy::Default => BtrBlocksCompressor::default() .compress(array) .vortex_expect("BtrBlocksCompressor compress should succeed in fuzz test"), - CompressorStrategy::Compact => CompactCompressor::default() + CompressorStrategy::Compact => BtrBlocksCompressorBuilder::default() + .include_string([StringCode::Zstd]) + .include_int([IntCode::Pco]) + .include_float([FloatCode::Pco]) + .build() .compress(array) - .vortex_expect("CompactCompressor compress should succeed in fuzz test"), + .vortex_expect("Compact compress should succeed in fuzz test"), } } diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs index b3fd471ccf2..eed44ebe5e4 100644 --- a/vortex-bench/src/lib.rs +++ b/vortex-bench/src/lib.rs @@ -30,7 +30,6 @@ use vortex::error::VortexExpect; use vortex::error::vortex_err; use vortex::file::VortexWriteOptions; use vortex::file::WriteStrategyBuilder; -use vortex::layout::layouts::compact::CompactCompressor; use vortex::utils::aliases::hash_map::HashMap; pub mod benchmark; @@ -215,7 +214,7 @@ impl CompactionStrategy { match self { CompactionStrategy::Compact => options.with_strategy( WriteStrategyBuilder::default() - .with_compressor(CompactCompressor::default()) + .with_compact_encodings() .build(), ), CompactionStrategy::Default => options, diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 679a81f33da..ccb457f00bc 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -18,6 +18,7 @@ enum-iterator = { workspace = true } getrandom_v03 = { workspace = true } itertools = { workspace = true } num-traits = { workspace = true } +pco = { workspace = true, optional = true } rand = { workspace = true } rustc-hash = { workspace = true } tracing = { workspace = true } @@ -31,12 +32,14 @@ vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } vortex-fsst = { workspace = true } vortex-mask = { workspace = true } +vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-scalar = { workspace = true } vortex-sequence = { workspace = true } vortex-sparse = { workspace = true } vortex-utils = { workspace = true } vortex-zigzag = { workspace = true } +vortex-zstd = { workspace = true, optional = true } [dev-dependencies] divan = { workspace = true } @@ -47,6 +50,8 @@ vortex-array = { workspace = true, features = ["_test-harness"] } [features] # This feature enabled unstable encodings for which we don't guarantee stability. unstable_encodings = [] +pco = ["dep:pco", "dep:vortex-pco"] +zstd = ["dep:vortex-zstd"] [lints] workspace = true diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index 82829feeb4b..1ba5fd57f09 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -51,9 +51,21 @@ pub struct BtrBlocksCompressorBuilder { impl Default for BtrBlocksCompressorBuilder { fn default() -> Self { Self { - int_schemes: ALL_INT_SCHEMES.iter().copied().collect(), - float_schemes: ALL_FLOAT_SCHEMES.iter().copied().collect(), - string_schemes: ALL_STRING_SCHEMES.iter().copied().collect(), + int_schemes: ALL_INT_SCHEMES + .iter() + .copied() + .filter(|s| s.code() != IntCode::Pco) + .collect(), + float_schemes: ALL_FLOAT_SCHEMES + .iter() + .copied() + .filter(|s| s.code() != FloatCode::Pco) + .collect(), + string_schemes: ALL_STRING_SCHEMES + .iter() + .copied() + .filter(|s| s.code() != StringCode::Zstd) + .collect(), } } } diff --git a/vortex-btrblocks/src/compressor/float/mod.rs b/vortex-btrblocks/src/compressor/float/mod.rs index 9e9a48e70ea..63b986f7598 100644 --- a/vortex-btrblocks/src/compressor/float/mod.rs +++ b/vortex-btrblocks/src/compressor/float/mod.rs @@ -77,6 +77,8 @@ pub const ALL_FLOAT_SCHEMES: &[&dyn FloatScheme] = &[ &DictScheme, &NullDominated, &RLE_FLOAT_SCHEME, + #[cfg(feature = "pco")] + &PcoScheme, ]; /// [`Compressor`] for floating-point numbers. @@ -142,6 +144,8 @@ pub enum FloatCode { Rle, /// Sparse encoding for null-dominated arrays. Sparse, + /// Pco (pcodec) compression for floats. + Pco, } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -162,6 +166,11 @@ struct DictScheme; #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct NullDominated; +/// Pco (pcodec) compression for floats. +#[cfg(feature = "pco")] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct PcoScheme; + /// Configuration for float RLE compression. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct FloatRLEConfig; @@ -520,6 +529,31 @@ impl Scheme for NullDominated { } } +#[cfg(feature = "pco")] +impl Scheme for PcoScheme { + type StatsType = FloatStats; + type CodeType = FloatCode; + + fn code(&self) -> FloatCode { + FloatCode::Pco + } + + fn compress( + &self, + _compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + _ctx: CompressorContext, + _excludes: &[FloatCode], + ) -> VortexResult { + Ok(vortex_pco::PcoArray::from_primitive( + stats.source(), + pco::DEFAULT_COMPRESSION_LEVEL, + 8192, + )? + .into_array()) + } +} + #[cfg(test)] mod tests { diff --git a/vortex-btrblocks/src/compressor/integer/mod.rs b/vortex-btrblocks/src/compressor/integer/mod.rs index 0cd0b3e4544..deba5c600bf 100644 --- a/vortex-btrblocks/src/compressor/integer/mod.rs +++ b/vortex-btrblocks/src/compressor/integer/mod.rs @@ -62,6 +62,8 @@ pub const ALL_INT_SCHEMES: &[&dyn IntegerScheme] = &[ &RunEndScheme, &SequenceScheme, &RLE_INTEGER_SCHEME, + #[cfg(feature = "pco")] + &PcoScheme, ]; /// [`Compressor`] for signed and unsigned integers. @@ -156,6 +158,8 @@ pub enum IntCode { Sequence, /// RLE encoding - generic run-length encoding. Rle, + /// Pco (pcodec) compression for integers. + Pco, } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -188,6 +192,11 @@ pub struct RunEndScheme; #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct SequenceScheme; +/// Pco (pcodec) compression for integers. +#[cfg(feature = "pco")] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct PcoScheme; + /// Threshold for the average run length in an array before we consider run-end encoding. const RUN_END_THRESHOLD: u32 = 4; @@ -818,6 +827,49 @@ impl Scheme for SequenceScheme { } } +#[cfg(feature = "pco")] +impl Scheme for PcoScheme { + type StatsType = IntegerStats; + type CodeType = IntCode; + + fn code(&self) -> IntCode { + IntCode::Pco + } + + fn expected_compression_ratio( + &self, + compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + ctx: CompressorContext, + excludes: &[IntCode], + ) -> VortexResult { + // Pco does not support I8 or U8. + if matches!( + stats.src.ptype(), + vortex_dtype::PType::I8 | vortex_dtype::PType::U8 + ) { + return Ok(0.0); + } + + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + } + + fn compress( + &self, + _compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + _ctx: CompressorContext, + _excludes: &[IntCode], + ) -> VortexResult { + Ok(vortex_pco::PcoArray::from_primitive( + stats.source(), + pco::DEFAULT_COMPRESSION_LEVEL, + 8192, + )? + .into_array()) + } +} + #[cfg(test)] mod tests { use std::iter; diff --git a/vortex-btrblocks/src/compressor/string.rs b/vortex-btrblocks/src/compressor/string.rs index 8d3af78c414..7738e826169 100644 --- a/vortex-btrblocks/src/compressor/string.rs +++ b/vortex-btrblocks/src/compressor/string.rs @@ -124,6 +124,8 @@ pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[ &FSSTScheme, &ConstantScheme, &NullDominated, + #[cfg(feature = "zstd")] + &ZstdScheme, ]; /// [`Compressor`] for strings. @@ -209,6 +211,11 @@ pub struct ConstantScheme; #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct NullDominated; +/// Zstd compression without dictionaries (nvCOMP compatible). +#[cfg(feature = "zstd")] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct ZstdScheme; + /// Unique identifier for string compression schemes. #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)] pub enum StringCode { @@ -222,6 +229,8 @@ pub enum StringCode { Constant, /// Sparse encoding for null-dominated arrays. Sparse, + /// Zstd compression without dictionaries. + Zstd, } impl Scheme for UncompressedScheme { @@ -502,6 +511,30 @@ impl Scheme for NullDominated { } } +#[cfg(feature = "zstd")] +impl Scheme for ZstdScheme { + type StatsType = StringStats; + type CodeType = StringCode; + + fn code(&self) -> StringCode { + StringCode::Zstd + } + + fn compress( + &self, + _compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + _ctx: CompressorContext, + _excludes: &[StringCode], + ) -> VortexResult { + let compacted = stats.source().compact_buffers()?; + Ok( + vortex_zstd::ZstdArray::from_var_bin_view_without_dict(&compacted, 3, 8192)? + .into_array(), + ) + } +} + #[cfg(test)] mod tests { use vortex_array::arrays::VarBinViewArray; diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index d2285a6b1c0..9d32f9412e0 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -33,6 +33,7 @@ tracing = { workspace = true } uuid = { workspace = true } # Needed to pickup the "js" feature for wasm targets from the workspace configuration vortex-alp = { workspace = true } vortex-array = { workspace = true } +vortex-btrblocks = { workspace = true } vortex-buffer = { workspace = true } vortex-bytebool = { workspace = true } @@ -78,7 +79,7 @@ tokio = [ "vortex-io/tokio", "vortex-layout/tokio", ] -zstd = ["dep:vortex-zstd", "vortex-layout/zstd"] +zstd = ["dep:vortex-zstd", "vortex-btrblocks/zstd", "vortex-btrblocks/pco"] [package.metadata.cargo-machete] ignored = ["getrandom_v03", "uuid"] diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 6f90864979f..0803e7df5e8 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -7,9 +7,9 @@ use std::sync::Arc; use std::sync::LazyLock; // Compressed encodings from encoding crates +// Canonical array encodings from vortex-array use vortex_alp::ALPRDVTable; use vortex_alp::ALPVTable; -// Canonical array encodings from vortex-array use vortex_array::arrays::BoolVTable; use vortex_array::arrays::ChunkedVTable; use vortex_array::arrays::ConstantVTable; @@ -26,6 +26,14 @@ use vortex_array::arrays::StructVTable; use vortex_array::arrays::VarBinVTable; use vortex_array::arrays::VarBinViewVTable; use vortex_array::session::ArrayRegistry; +#[cfg(feature = "zstd")] +use vortex_btrblocks::BtrBlocksCompressorBuilder; +#[cfg(feature = "zstd")] +use vortex_btrblocks::FloatCode; +#[cfg(feature = "zstd")] +use vortex_btrblocks::IntCode; +#[cfg(feature = "zstd")] +use vortex_btrblocks::StringCode; use vortex_bytebool::ByteBoolVTable; use vortex_datetime_parts::DateTimePartsVTable; use vortex_decimal_byte_parts::DecimalBytePartsVTable; @@ -164,6 +172,40 @@ impl WriteStrategyBuilder { self } + /// Configure a write strategy that emits only CUDA-compatible encodings. + /// + /// This configures BtrBlocks to use Zstd for strings and exclude schemes + /// without CUDA kernel support. + #[cfg(feature = "zstd")] + pub fn with_cuda_compatible_encodings(mut self) -> Self { + let btrblocks = BtrBlocksCompressorBuilder::default() + .include_string([StringCode::Zstd]) + .exclude_int([IntCode::Sparse, IntCode::Rle]) + .exclude_float([FloatCode::AlpRd, FloatCode::Rle, FloatCode::Sparse]) + .exclude_string([StringCode::Dict, StringCode::Fsst]) + .build(); + + self.compressor = Some(Arc::new(btrblocks)); + self + } + + /// Configure a write strategy that uses compact encodings (Pco for numerics, Zstd for + /// strings/binary). + /// + /// This provides better compression ratios than the default BtrBlocks strategy, + /// especially for floating-point heavy datasets. + #[cfg(feature = "zstd")] + pub fn with_compact_encodings(mut self) -> Self { + let btrblocks = BtrBlocksCompressorBuilder::default() + .include_string([StringCode::Zstd]) + .include_int([IntCode::Pco]) + .include_float([FloatCode::Pco]) + .build(); + + self.compressor = Some(Arc::new(btrblocks)); + self + } + /// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides /// applied. pub fn build(self) -> Arc { diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml index 40277b84262..77f4c4a7ce9 100644 --- a/vortex-layout/Cargo.toml +++ b/vortex-layout/Cargo.toml @@ -29,7 +29,6 @@ once_cell = { workspace = true, features = ["parking_lot"] } oneshot = { workspace = true } parking_lot = { workspace = true } paste = { workspace = true } -pco = { workspace = true } pin-project-lite = { workspace = true } prost = { workspace = true } rustc-hash = { workspace = true } @@ -47,12 +46,10 @@ vortex-flatbuffers = { workspace = true, features = ["layout"] } vortex-io = { workspace = true } vortex-mask = { workspace = true } vortex-metrics = { workspace = true } -vortex-pco = { workspace = true } vortex-scalar = { workspace = true } vortex-sequence = { workspace = true } vortex-session = { workspace = true } vortex-utils = { workspace = true, features = ["dashmap"] } -vortex-zstd = { workspace = true, optional = true } [dev-dependencies] futures = { workspace = true, features = ["executor"] } @@ -60,13 +57,11 @@ rstest = { workspace = true } tokio = { workspace = true, features = ["rt", "macros"] } vortex-array = { path = "../vortex-array", features = ["_test-harness"] } vortex-io = { path = "../vortex-io", features = ["tokio"] } -vortex-layout = { path = ".", features = ["zstd"] } vortex-utils = { workspace = true, features = ["_test-harness"] } [features] _test-harness = [] tokio = ["dep:tokio", "vortex-error/tokio"] -zstd = ["dep:vortex-zstd"] [lints] workspace = true diff --git a/vortex-layout/src/layouts/compact.rs b/vortex-layout/src/layouts/compact.rs deleted file mode 100644 index 8163a32e4ae..00000000000 --- a/vortex-layout/src/layouts/compact.rs +++ /dev/null @@ -1,280 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use vortex_array::Array; -use vortex_array::ArrayRef; -use vortex_array::Canonical; -use vortex_array::IntoArray; -use vortex_array::ToCanonical; -use vortex_array::arrays::ExtensionArray; -use vortex_array::arrays::FixedSizeListArray; -use vortex_array::arrays::ListViewArray; -use vortex_array::arrays::PrimitiveArray; -use vortex_array::arrays::StructArray; -use vortex_array::arrays::narrowed_decimal; -use vortex_array::vtable::ValidityHelper; -use vortex_decimal_byte_parts::DecimalBytePartsArray; -use vortex_dtype::PType; -use vortex_error::VortexResult; -use vortex_pco::PcoArray; -use vortex_scalar::DecimalType; -use vortex_zstd::ZstdArray; - -fn is_pco_number_type(ptype: PType) -> bool { - matches!( - ptype, - PType::F16 - | PType::F32 - | PType::F64 - | PType::I16 - | PType::I32 - | PType::I64 - | PType::U16 - | PType::U32 - | PType::U64 - ) -} - -/// A simple compressor that uses the "compact" strategy: -/// - Pco for supported numeric types (16, 32, and 64-bit floats and ints) -/// - Zstd for everything else (primitive arrays only) -#[derive(Debug, Clone)] -pub struct CompactCompressor { - pco_level: usize, - zstd_level: i32, - /// Dictionaries are trained from previously seen frames to improve compression ratio. - /// nvCOMP though doesn't support ZSTD dictionaries. Therefore, we need the option to - /// disable them for compatibility. - zstd_use_dicts: bool, - zstd_values_per_page: usize, -} - -impl CompactCompressor { - pub fn with_pco_level(mut self, level: usize) -> Self { - self.pco_level = level; - self - } - - pub fn with_zstd_level(mut self, level: i32) -> Self { - self.zstd_level = level; - self - } - - pub fn with_zstd_use_dicts(mut self, use_dicts: bool) -> Self { - self.zstd_use_dicts = use_dicts; - self - } - - /// Sets the number of non-null primitive values to store per - /// separately-decompressible page/frame. - /// - /// Fewer values per page can reduce the time to query a small slice of rows, but too - /// few can increase compressed size and (de)compression time. The default is 0, which - /// is used for maximally-large pages. - pub fn with_zstd_values_per_page(mut self, values_per_page: usize) -> Self { - self.zstd_values_per_page = values_per_page; - self - } - - pub fn compress(&self, array: &dyn Array) -> VortexResult { - self.compress_canonical(array.to_canonical()?) - } - - /// Compress a single array using the compact strategy - pub fn compress_canonical(&self, canonical: Canonical) -> VortexResult { - let uncompressed_nbytes = canonical.as_ref().nbytes(); - let compressed = match &canonical { - // TODO compress BoolArrays - Canonical::Primitive(primitive) => { - // pco for applicable numbers, zstd for everything else - let ptype = primitive.ptype(); - - if is_pco_number_type(ptype) { - let pco_array = PcoArray::from_primitive( - primitive, - self.pco_level, - self.zstd_values_per_page, - )?; - pco_array.into_array() - } else { - let zstd_array = if self.zstd_use_dicts { - ZstdArray::from_primitive( - primitive, - self.zstd_level, - self.zstd_values_per_page, - )? - } else { - ZstdArray::from_primitive_without_dict( - primitive, - self.zstd_level, - self.zstd_values_per_page, - )? - }; - zstd_array.into_array() - } - } - Canonical::Decimal(decimal) => { - let decimal = narrowed_decimal(decimal.clone()); - let validity = decimal.validity(); - let int_values = match decimal.values_type() { - DecimalType::I8 => { - PrimitiveArray::new(decimal.buffer::(), validity.clone()) - } - DecimalType::I16 => { - PrimitiveArray::new(decimal.buffer::(), validity.clone()) - } - DecimalType::I32 => { - PrimitiveArray::new(decimal.buffer::(), validity.clone()) - } - DecimalType::I64 => { - PrimitiveArray::new(decimal.buffer::(), validity.clone()) - } - _ => { - // Vortex lacks support for i128 and i256. - return Ok(canonical.into_array()); - } - }; - let compressed = self.compress_canonical(Canonical::Primitive(int_values))?; - DecimalBytePartsArray::try_new(compressed, decimal.decimal_dtype())?.to_array() - } - Canonical::VarBinView(vbv) => { - // always zstd - if self.zstd_use_dicts { - ZstdArray::from_var_bin_view(vbv, self.zstd_level, self.zstd_values_per_page)? - .into_array() - } else { - ZstdArray::from_var_bin_view_without_dict( - vbv, - self.zstd_level, - self.zstd_values_per_page, - )? - .into_array() - } - } - Canonical::Struct(struct_array) => { - // recurse - let fields = struct_array - .unmasked_fields() - .iter() - .map(|field| self.compress(field)) - .collect::>>()?; - - StructArray::try_new( - struct_array.names().clone(), - fields, - struct_array.len(), - struct_array.validity().clone(), - )? - .into_array() - } - Canonical::List(listview) => { - let compressed_elems = self.compress(listview.elements())?; - - // Note that since the type of our offsets and sizes is not encoded in our `DType`, - // we can narrow the widths. - let compressed_offsets = - self.compress(&listview.offsets().to_primitive().narrow()?.into_array())?; - let compressed_sizes = - self.compress(&listview.sizes().to_primitive().narrow()?.into_array())?; - - // SAFETY: Since compression does not change the logical values of arrays, this is - // effectively the same array but represented differently, so all invariants that - // were previously upheld by the valid `ListViewArray` are still upheld. - // If the original was zero-copyable to list, compression maintains that property. - unsafe { - ListViewArray::new_unchecked( - compressed_elems, - compressed_offsets, - compressed_sizes, - listview.validity().clone(), - ) - .with_zero_copy_to_list(listview.is_zero_copy_to_list()) - } - .into_array() - } - Canonical::FixedSizeList(fsl) => { - let compressed_elems = self.compress(fsl.elements())?; - - FixedSizeListArray::try_new( - compressed_elems, - fsl.list_size(), - fsl.validity().clone(), - fsl.len(), - )? - .into_array() - } - Canonical::Extension(ext_array) => { - let compressed_storage = self.compress(ext_array.storage())?; - - ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage).into_array() - } - _ => return Ok(canonical.into_array()), - }; - - if compressed.nbytes() >= uncompressed_nbytes { - return Ok(canonical.into_array()); - } - Ok(compressed) - } -} - -impl Default for CompactCompressor { - fn default() -> Self { - Self { - pco_level: pco::DEFAULT_COMPRESSION_LEVEL, - zstd_level: 3, - zstd_use_dicts: true, - // This is probably high enough to not hurt performance or - // compression. It also currently aligns with the default strategy's - // number of rows per statistic, which allows efficient pushdown - // (but nothing enforces this). - zstd_values_per_page: 8192, - } - } -} - -#[cfg(test)] -mod tests { - use vortex_array::IntoArray; - use vortex_array::arrays::PrimitiveArray; - use vortex_array::arrays::StructArray; - use vortex_array::assert_arrays_eq; - use vortex_array::validity::Validity; - use vortex_buffer::buffer; - use vortex_dtype::FieldName; - - use super::*; - - #[test] - fn test_compact_compressor_struct_with_mixed_types() { - let compressor = CompactCompressor::default(); - - // Create a struct array containing various types - let columns = [ - // Pco types - PrimitiveArray::new(buffer![1.0f64, 2.0, 3.0, 4.0, 5.0], Validity::NonNullable), - PrimitiveArray::new(buffer![10i32, 20, 30, 40, 50], Validity::NonNullable), - // Zstd types - PrimitiveArray::new(buffer![11u8, 22, 33, 44, 55], Validity::NonNullable), - ] - .iter() - .map(|a| a.clone().into_array()) - .collect::>(); - let field_names: Vec = - vec!["f64_field".into(), "i32_field".into(), "u8_field".into()]; - - let n_rows = columns[0].len(); - let struct_array = StructArray::try_new( - field_names.into(), - columns.clone(), - n_rows, - Validity::NonNullable, - ) - .unwrap(); - - // Compress the struct array - let compressed = compressor.compress(struct_array.as_ref()).unwrap(); - - assert_arrays_eq!(struct_array, compressed) - } -} diff --git a/vortex-layout/src/layouts/compressed.rs b/vortex-layout/src/layouts/compressed.rs index 9f45b5b25df..de672715d9e 100644 --- a/vortex-layout/src/layouts/compressed.rs +++ b/vortex-layout/src/layouts/compressed.rs @@ -25,10 +25,7 @@ use crate::sequence::SequentialStreamExt; /// A boxed compressor function from arrays into compressed arrays. /// -/// Both the balanced `BtrBlocksCompressor` and the size-optimized `CompactCompressor` -/// meet this interface. -/// -/// API consumers are also free to implement this trait to provide new plugin compressors. +/// API consumers are free to implement this trait to provide new plugin compressors. pub trait CompressorPlugin: Send + Sync + 'static { fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult; } @@ -54,13 +51,6 @@ impl CompressorPlugin for BtrBlocksCompressor { } } -#[cfg(feature = "zstd")] -impl CompressorPlugin for crate::layouts::compact::CompactCompressor { - fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult { - self.compress(chunk) - } -} - /// A layout writer that compresses chunks. #[derive(Clone)] pub struct CompressingStrategy { @@ -87,21 +77,6 @@ impl CompressingStrategy { Self::new(child, Arc::new(compressor)) } - /// Create a new writer that compresses using a `CompactCompressor` to compress chunks. - /// - /// This may create smaller files than the BtrBlocks writer, in exchange for some penalty - /// to decoding performance. This is only recommended for datasets that make heavy use of - /// floating point numbers. - /// - /// [`CompactCompressor`]: crate::layouts::compact::CompactCompressor - #[cfg(feature = "zstd")] - pub fn new_compact( - child: S, - compressor: crate::layouts::compact::CompactCompressor, - ) -> Self { - Self::new(child, Arc::new(compressor)) - } - /// Create a new compressor from a plugin interface. pub fn new_opaque(child: S, compressor: C) -> Self { Self::new(child, Arc::new(compressor)) diff --git a/vortex-layout/src/layouts/mod.rs b/vortex-layout/src/layouts/mod.rs index 61193c54b6e..03b29a97a8f 100644 --- a/vortex-layout/src/layouts/mod.rs +++ b/vortex-layout/src/layouts/mod.rs @@ -11,8 +11,6 @@ use vortex_error::SharedVortexResult; pub mod buffered; pub mod chunked; pub mod collect; -#[cfg(feature = "zstd")] -pub mod compact; pub mod compressed; pub mod dict; pub mod file_stats; diff --git a/vortex-layout/src/layouts/table.rs b/vortex-layout/src/layouts/table.rs index faff6190191..1d444be33b2 100644 --- a/vortex-layout/src/layouts/table.rs +++ b/vortex-layout/src/layouts/table.rs @@ -85,27 +85,23 @@ impl TableStrategy { /// ```ignore /// # use std::sync::Arc; /// # use vortex_dtype::{field_path, Field, FieldPath}; - /// # use vortex_layout::layouts::compact::CompactCompressor; /// # use vortex_layout::layouts::compressed::CompressingStrategy; /// # use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; /// # use vortex_layout::layouts::table::TableStrategy; /// /// // A strategy for compressing data using the balanced BtrBlocks compressor. - /// let compress_btrblocks = CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), true); - /// - /// // A strategy that compresses data using ZSTD - /// let compress_compact = CompressingStrategy::new_compact(FlatLayoutStrategy::default(), CompactCompressor::default()); + /// let compress = CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), true); /// /// // Our combined strategy uses no compression for validity buffers, BtrBlocks compression - /// // for most columns, and will use ZSTD compression for a nested binary column that we know - /// // is never filtered in. + /// // for most columns, and stores a nested binary column uncompressed (flat) because it + /// // is pre-compressed or never filtered on. /// let strategy = TableStrategy::new( /// Arc::new(FlatLayoutStrategy::default()), - /// Arc::new(compress_btrblocks), + /// Arc::new(compress), /// ) /// .with_field_writer( /// field_path!(request.body.bytes), - /// Arc::new(compress_compact), + /// Arc::new(FlatLayoutStrategy::default()), /// ); /// ``` pub fn with_field_writer( diff --git a/vortex-python/src/io.rs b/vortex-python/src/io.rs index 22743e4828d..0f510db107c 100644 --- a/vortex-python/src/io.rs +++ b/vortex-python/src/io.rs @@ -15,7 +15,6 @@ use vortex::array::arrow::FromArrowArray; use vortex::array::iter::ArrayIterator; use vortex::array::iter::ArrayIteratorAdapter; use vortex::array::iter::ArrayIteratorExt; -use vortex::compressor::CompactCompressor; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; use vortex::error::VortexError; @@ -246,9 +245,7 @@ pub fn write( /// :func:`vortex.io.write`. #[pyclass(name = "VortexWriteOptions", module = "io", frozen)] pub(crate) struct PyVortexWriteOptions { - // TODO(DK): This might need to be an Arc if we actually have multiple - // compressors. - compressor: Option, + use_compact_encodings: bool, } #[pymethods] @@ -256,7 +253,9 @@ impl PyVortexWriteOptions { /// Balance size, read-throughput, and read-latency. #[staticmethod] pub fn default() -> Self { - Self { compressor: None } + Self { + use_compact_encodings: false, + } } /// Prioritize small size over read-throughput and read-latency. @@ -290,14 +289,14 @@ impl PyVortexWriteOptions { /// ```python /// >>> vx.io.VortexWriteOptions.compact().write(sprl, "tiny.vortex") /// >>> os.path.getsize('tiny.vortex') - /// 55116 + /// 55316 /// ``` /// /// Random numbers are not (usually) composed of random bytes! #[staticmethod] pub fn compact() -> Self { Self { - compressor: Some(CompactCompressor::default()), + use_compact_encodings: true, } } @@ -350,8 +349,8 @@ impl PyVortexWriteOptions { ) -> PyVortexResult<()> { py.detach(|| { let mut strategy = WriteStrategyBuilder::default(); - if let Some(compressor) = self.compressor.as_ref() { - strategy = strategy.with_compressor(compressor.clone()) + if self.use_compact_encodings { + strategy = strategy.with_compact_encodings(); } let strategy = strategy.build(); TOKIO_RUNTIME.block_on(async move { diff --git a/vortex-tui/src/convert.rs b/vortex-tui/src/convert.rs index ce6858053a5..858e908ec2e 100644 --- a/vortex-tui/src/convert.rs +++ b/vortex-tui/src/convert.rs @@ -15,7 +15,6 @@ use tokio::io::AsyncWriteExt; use vortex::array::ArrayRef; use vortex::array::arrow::FromArrowArray; use vortex::array::stream::ArrayStreamAdapter; -use vortex::compressor::CompactCompressor; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; use vortex::error::VortexError; @@ -95,7 +94,7 @@ pub async fn exec_convert(session: &VortexSession, flags: ConvertArgs) -> anyhow let strategy = WriteStrategyBuilder::default(); let strategy = match flags.strategy { Strategy::Btrblocks => strategy, - Strategy::Compact => strategy.with_compressor(CompactCompressor::default()), + Strategy::Compact => strategy.with_compact_encodings(), }; let mut file = File::create(output_path).await?; diff --git a/vortex/Cargo.toml b/vortex/Cargo.toml index f37cc7a791a..745fb59d0d8 100644 --- a/vortex/Cargo.toml +++ b/vortex/Cargo.toml @@ -77,7 +77,7 @@ files = ["dep:vortex-file"] memmap2 = ["vortex-buffer/memmap2"] object_store = ["vortex-file/object_store", "vortex-io/object_store"] tokio = ["vortex-file/tokio"] -zstd = ["dep:vortex-zstd", "vortex-file/zstd", "vortex-layout/zstd"] +zstd = ["dep:vortex-zstd", "vortex-file/zstd"] pretty = ["vortex-array/table-display"] serde = [ "vortex-array/serde", diff --git a/vortex/examples/tracing_vortex.rs b/vortex/examples/tracing_vortex.rs index 8fc428e8de9..aa456a44f5f 100644 --- a/vortex/examples/tracing_vortex.rs +++ b/vortex/examples/tracing_vortex.rs @@ -38,7 +38,6 @@ use vortex::array::arrays::PrimitiveArray; use vortex::array::arrays::StructArray; use vortex::array::arrays::VarBinArray; use vortex::array::validity::Validity; -use vortex::compressor::CompactCompressor; use vortex::dtype::DType; use vortex::dtype::Nullability; use vortex::file::WriteStrategyBuilder; @@ -386,10 +385,10 @@ async fn write_batch_to_vortex( let file_path = output_dir.join(format!("traces_{:04}.vortex", file_index)); let mut file = tokio::fs::File::create(&file_path).await?; - // Use the write-optimized CompactCompressor for the telemetry files. + // Use compact encodings (Pco + Zstd) for the telemetry files. let write_opts = session.write_options().with_strategy( WriteStrategyBuilder::default() - .with_compressor(CompactCompressor::default()) + .with_compact_encodings() .build(), ); diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index c7d304ed971..0da4a1a1b40 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -32,8 +32,6 @@ pub mod compute2 { pub mod compressor { pub use vortex_btrblocks::BtrBlocksCompressor; - #[cfg(feature = "zstd")] - pub use vortex_layout::layouts::compact::CompactCompressor; } pub mod dtype { @@ -201,7 +199,6 @@ mod test { use vortex_file::OpenOptionsSessionExt; use vortex_file::WriteOptionsSessionExt; use vortex_file::WriteStrategyBuilder; - use vortex_layout::layouts::compact::CompactCompressor; use vortex_session::VortexSession; use crate as vortex; @@ -244,7 +241,6 @@ mod test { fn compress() -> VortexResult<()> { // [compress] use vortex::compressor::BtrBlocksCompressor; - use vortex::compressor::CompactCompressor; let array = PrimitiveArray::new(buffer![42u64; 100_000], Validity::NonNullable); @@ -255,12 +251,6 @@ mod test { compressed.nbytes(), array.nbytes() ); - - // Or apply generally stronger compression with the compact compressor - let compressed = CompactCompressor::default() - .with_zstd_values_per_page(8192) - .compress(array.as_ref())?; - println!("Compact size: {} / {}", compressed.nbytes(), array.nbytes()); // [compress] Ok(()) @@ -319,7 +309,7 @@ mod test { .write_options() .with_strategy( WriteStrategyBuilder::default() - .with_compressor(CompactCompressor::default()) + .with_compact_encodings() .build(), ) .write(