Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions encodings/fsst/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ _test-harness = ["dep:rand", "vortex-array/_test-harness"]
divan = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }
test-with = { workspace = true }
vortex-array = { workspace = true, features = ["_test-harness"] }

[[bench]]
Expand Down
6 changes: 5 additions & 1 deletion encodings/fsst/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ where
I: Iterator<Item = Option<&'a [u8]>>,
{
let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN);
let mut builder = VarBinBuilder::<i32>::with_capacity(len);

// Offsets are widened to i64 because the cumulative compressed bytes can exceed i32::MAX for
// large inputs (see issue #7833). Per-string sizes still fit in i32.
let mut builder = VarBinBuilder::<i64>::with_capacity(len);
let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
Comment thread
robert3005 marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should at least justify this being i32?


for string in iter {
match string {
None => {
Expand Down
61 changes: 61 additions & 0 deletions encodings/fsst/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use rand::SeedableRng;
use rand::rngs::StdRng;
use rand::seq::IndexedRandom;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
Expand Down Expand Up @@ -107,3 +110,61 @@ fn test_fsst_array_ops() {

assert_arrays_eq!(fsst_array, canonical_array);
}

// TODO(someone): CI should ideally also run this test in release mode; in a debug build the
// allocation and compression loop are substantially slower.
/// Regression test for #7833: [`fsst_compress`] has to handle inputs whose cumulative
/// compressed size is larger than [`i32::MAX`] bytes.
///
/// Prior to the fix, [`fsst_compress_iter`] always used [`VarBinBuilder<i32>`] for the FSST
/// output, and [`VarBinBuilder::append_value`] panicked as soon as the running total of
/// compressed bytes crossed that boundary.
///
/// The input array is assembled with [`VarBinBuilder<i64>`], so building the input cannot
/// be the source of the panic; any overflow therefore comes from the FSST output side. With
/// the fix in place the compression must complete and keep the row count unchanged.
///
/// Memory footprint: ~2.5 GiB for the input plus ~2.5 GiB for the FSST output (~5 GiB in
/// total). For that reason the test only runs when `CI` is set, and is skipped whenever
/// `VORTEX_SKIP_SLOW_TESTS` is set. Local invocation:
///
/// ```text
/// CI=1 cargo test --release -p vortex-fsst fsst_compress_offsets
/// ```
///
/// [`fsst_compress_iter`]: crate::compress::fsst_compress_iter
#[test_with::env(CI)]
#[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
fn fsst_compress_offsets_overflow_i32() {
    // Each row is a window into a pool of random printable characters. FSST compresses via
    // a symbol table, so pseudo-random bytes without recurring sequences compress poorly:
    // the output stays near the input size and ends up past the i32 boundary.
    const STRING_LEN: usize = 64 * 1024;
    const TOTAL_BYTES: usize = (1usize << 31) + (512 << 20); // ~2.5 GiB
    const N: usize = TOTAL_BYTES / STRING_LEN;
    const POOL_LEN: usize = 64 * 1024 * 1024;

    // Restricting the pool to printable ASCII keeps every slice valid UTF-8.
    const ALPHABET: &[u8; 95] =
        b" !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~";

    // Fixed seed so the generated pool is the same on every run.
    let mut rng = StdRng::seed_from_u64(0xC0DE_C011_B711);
    let pool: Vec<u8> = std::iter::repeat_with(|| *ALPHABET.choose(&mut rng).unwrap())
        .take(POOL_LEN)
        .collect();

    println!("building large VarBinArray");
    let mut input = VarBinBuilder::<i64>::with_capacity(N);
    let window_starts = (0..N).map(|row| row.wrapping_mul(31337) % (POOL_LEN - STRING_LEN));
    for start in window_starts {
        let end = start + STRING_LEN;
        input.append_value(&pool[start..end]);
    }
    let array = input.finish(DType::Utf8(Nullability::NonNullable));

    println!("training FSST compressor");
    let compressor = fsst_train_compressor(&array);
    let row_count = array.len();
    let dtype = array.dtype().clone();
    let mut ctx = LEGACY_SESSION.create_execution_ctx();

    println!("compressing to FSST");
    let compressed = fsst_compress(array, row_count, &dtype, &compressor, &mut ctx);
    assert_eq!(compressed.len(), row_count);
}
80 changes: 75 additions & 5 deletions vortex-array/src/arrays/varbin/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use arrow_array::BinaryArray;
use arrow_array::LargeBinaryArray;
use arrow_array::LargeStringArray;
use arrow_array::StringArray;
use arrow_ord::cmp;
use arrow_schema::DataType;
use vortex_buffer::BitBuffer;
use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
Expand Down Expand Up @@ -82,15 +85,26 @@ impl CompareKernel for VarBin {

let lhs = Datum::try_new(lhs.array(), ctx)?;

// Use StringViewArray/BinaryViewArray to match the Utf8View/BinaryView types
// produced by Datum::try_new (which uses execute_arrow(None, ctx))
let arrow_rhs: &dyn arrow_array::Datum = match rhs_const.dtype() {
DType::Utf8(_) => &rhs_const
// The RHS scalar must match the LHS Arrow data type. VarBin with i64 offsets is
// converted to LargeBinary/LargeUtf8 (see `preferred_arrow_type`), and Arrow refuses to
// compare LargeBinary with Binary (or LargeUtf8 with Utf8).
let arrow_rhs: &dyn arrow_array::Datum = match (rhs_const.dtype(), lhs.data_type()) {
(DType::Utf8(_), DataType::LargeUtf8) => &rhs_const
.as_utf8()
.value()
.map(LargeStringArray::new_scalar)
.unwrap_or_else(|| arrow_array::Scalar::new(LargeStringArray::new_null(1))),
(DType::Utf8(_), _) => &rhs_const
.as_utf8()
.value()
.map(StringArray::new_scalar)
.unwrap_or_else(|| arrow_array::Scalar::new(StringArray::new_null(1))),
DType::Binary(_) => &rhs_const
(DType::Binary(_), DataType::LargeBinary) => &rhs_const
.as_binary()
.value()
.map(LargeBinaryArray::new_scalar)
.unwrap_or_else(|| arrow_array::Scalar::new(LargeBinaryArray::new_null(1))),
(DType::Binary(_), _) => &rhs_const
.as_binary()
.value()
.map(BinaryArray::new_scalar)
Expand Down Expand Up @@ -237,9 +251,14 @@ mod test {

#[cfg(test)]
mod tests {
use vortex_buffer::ByteBuffer;

use crate::IntoArray;
use crate::arrays::BoolArray;
use crate::arrays::ConstantArray;
use crate::arrays::VarBinArray;
use crate::arrays::varbin::builder::VarBinBuilder;
use crate::assert_arrays_eq;
use crate::builtins::ArrayBuiltins;
use crate::dtype::DType;
use crate::dtype::Nullability;
Expand All @@ -260,4 +279,55 @@ mod tests {
&DType::Bool(Nullability::Nullable)
);
}

/// Regression test for comparing an `i64`-offset [`VarBinArray`] against a constant.
///
/// `preferred_arrow_type` canonicalises such an array to Arrow `LargeUtf8` /
/// `LargeBinary`. If [`CompareKernel`] has no dedicated branch for that case, the constant
/// RHS gets wrapped in a `StringArray` / `BinaryArray`, and Arrow rejects the resulting
/// `LargeUtf8 == Utf8` mismatch. The offset width alone triggers the problem, so three
/// tiny values are enough; no large data is required.
///
/// [`CompareKernel`]: super::CompareKernel
#[test]
fn varbin_i64_offsets_compare_constant() {
    // LHS: three short UTF-8 values stored behind i64 offsets.
    let mut wide_offsets = VarBinBuilder::<i64>::with_capacity(3);
    for value in [b"abc", b"xyz", b"abc"] {
        wide_offsets.append_value(value);
    }
    let lhs = wide_offsets
        .finish(DType::Utf8(Nullability::NonNullable))
        .into_array();

    // RHS: the constant "abc" repeated once per LHS row.
    let needle = Scalar::utf8("abc", Nullability::NonNullable);
    let rhs = ConstantArray::new(needle, 3).into_array();

    let result = lhs.binary(rhs, Operator::Eq).unwrap();

    let expected = BoolArray::from_iter([true, false, true]);
    assert_arrays_eq!(result, expected);
}

/// Binary-typed variant of the `i64`-offset comparison check: builds a `DType::Binary`
/// array with `i64` offsets and compares it for equality against a constant binary scalar
/// `abc`, expecting a match on the first and third rows only.
#[test]
fn varbin_i64_offsets_compare_constant_binary() {
    // LHS: three short binary values stored behind i64 offsets.
    let mut wide_offsets = VarBinBuilder::<i64>::with_capacity(3);
    for value in [b"abc", b"xyz", b"abc"] {
        wide_offsets.append_value(value);
    }
    let lhs = wide_offsets
        .finish(DType::Binary(Nullability::NonNullable))
        .into_array();

    // RHS: the constant bytes "abc" repeated once per LHS row.
    let needle = Scalar::binary(ByteBuffer::copy_from(b"abc"), Nullability::NonNullable);
    let rhs = ConstantArray::new(needle, 3).into_array();

    let result = lhs.binary(rhs, Operator::Eq).unwrap();

    let expected = BoolArray::from_iter([true, false, true]);
    assert_arrays_eq!(result, expected);
}
}
Loading