From bbe817df283944641ef18930853aee205e2a116a Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 23 Dec 2020 07:58:53 +0000 Subject: [PATCH 1/6] Aligned naming with Rust's std --- rust/arrow/src/array/array_binary.rs | 8 +++--- rust/arrow/src/array/array_boolean.rs | 2 +- rust/arrow/src/array/array_list.rs | 2 +- rust/arrow/src/array/array_primitive.rs | 2 +- rust/arrow/src/array/array_string.rs | 4 +-- rust/arrow/src/bitmap.rs | 2 +- rust/arrow/src/buffer.rs | 27 ++++++++++---------- rust/arrow/src/bytes.rs | 22 ++++++++-------- rust/arrow/src/compute/kernels/arithmetic.rs | 2 +- rust/arrow/src/ffi.rs | 2 +- 10 files changed, 38 insertions(+), 35 deletions(-) diff --git a/rust/arrow/src/array/array_binary.rs b/rust/arrow/src/array/array_binary.rs index db4097aee69..244532d7092 100644 --- a/rust/arrow/src/array/array_binary.rs +++ b/rust/arrow/src/array/array_binary.rs @@ -205,8 +205,8 @@ impl From 2, "BinaryArray data should contain 2 buffers only (offsets and values)" ); - let raw_value_offsets = data.buffers()[0].raw_data(); - let value_data = data.buffers()[1].raw_data(); + let raw_value_offsets = data.buffers()[0].ptr(); + let value_data = data.buffers()[1].ptr(); Self { data, value_offsets: RawPtrBox::new(as_aligned_pointer::( @@ -421,7 +421,7 @@ impl From for FixedSizeBinaryArray { 1, "FixedSizeBinaryArray data should contain 1 buffer only (values)" ); - let value_data = data.buffers()[0].raw_data(); + let value_data = data.buffers()[0].ptr(); let length = match data.data_type() { DataType::FixedSizeBinary(len) => *len, _ => panic!("Expected data type to be FixedSizeBinary"), @@ -589,7 +589,7 @@ impl From for DecimalArray { 1, "DecimalArray data should contain 1 buffer only (values)" ); - let value_data = data.buffers()[0].raw_data(); + let value_data = data.buffers()[0].ptr(); let (precision, scale) = match data.data_type() { DataType::Decimal(precision, scale) => (*precision, *scale), _ => panic!("Expected data type to be Decimal"), diff --git a/rust/arrow/src/array/array_boolean.rs b/rust/arrow/src/array/array_boolean.rs index 82deca9bd9f..2e24170c392 100644 --- a/rust/arrow/src/array/array_boolean.rs +++ b/rust/arrow/src/array/array_boolean.rs @@ -147,7 +147,7 @@ impl From for BooleanArray { 1, "BooleanArray data should contain a single buffer only (values buffer)" ); - let raw_values = data.buffers()[0].raw_data(); + let raw_values = data.buffers()[0].ptr(); assert!( memory::is_aligned::(raw_values, mem::align_of::()), "memory is not aligned" diff --git a/rust/arrow/src/array/array_list.rs b/rust/arrow/src/array/array_list.rs index 4eb8dc56640..e29b0ef0d4d 100644 --- a/rust/arrow/src/array/array_list.rs +++ b/rust/arrow/src/array/array_list.rs @@ -117,7 +117,7 @@ impl From for GenericListArray From for PrimitiveArray { 1, "PrimitiveArray data should contain a single buffer only (values buffer)" ); - let raw_values = data.buffers()[0].raw_data(); + let raw_values = data.buffers()[0].ptr(); assert!( memory::is_aligned::(raw_values, mem::align_of::()), "memory is not aligned" diff --git a/rust/arrow/src/array/array_string.rs b/rust/arrow/src/array/array_string.rs index 5f871b8f595..a0cae920bbe 100644 --- a/rust/arrow/src/array/array_string.rs +++ b/rust/arrow/src/array/array_string.rs @@ -254,8 +254,8 @@ impl From 2, "StringArray data should contain 2 buffers only (offsets and values)" ); - let raw_value_offsets = data.buffers()[0].raw_data(); - let value_data = data.buffers()[1].raw_data(); + let raw_value_offsets = data.buffers()[0].ptr(); + let value_data = 
data.buffers()[1].ptr(); Self { data, value_offsets: RawPtrBox::new(as_aligned_pointer::( diff --git a/rust/arrow/src/bitmap.rs b/rust/arrow/src/bitmap.rs index 7df609fb88b..8acbe8f290f 100644 --- a/rust/arrow/src/bitmap.rs +++ b/rust/arrow/src/bitmap.rs @@ -54,7 +54,7 @@ impl Bitmap { pub fn is_set(&self, i: usize) -> bool { assert!(i < (self.bits.len() << 3)); - unsafe { bit_util::get_bit_raw(self.bits.raw_data(), i) } + unsafe { bit_util::get_bit_raw(self.bits.ptr(), i) } } pub fn buffer_ref(&self) -> &Buffer { diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 58b75a7d3fa..a951b48cca8 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -71,6 +71,7 @@ impl Buffer { /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. pub unsafe fn from_raw_parts(ptr: *const u8, len: usize, capacity: usize) -> Self { + assert!(len <= capacity); Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity)) } @@ -126,7 +127,7 @@ impl Buffer { /// Returns the byte slice stored in this buffer pub fn data(&self) -> &[u8] { - &self.data.as_slice()[self.offset..] + &self.data[self.offset..] } /// Returns a slice of this buffer, starting from `offset`. @@ -141,12 +142,12 @@ impl Buffer { } } - /// Returns a raw pointer for this buffer. + /// Returns a pointer to the start of this buffer. /// /// Note that this should be used cautiously, and the returned pointer should not be /// stored anywhere, to avoid dangling pointers. - pub fn raw_data(&self) -> *const u8 { - unsafe { self.data.raw_data().add(self.offset) } + pub fn ptr(&self) -> *const u8 { + unsafe { self.data.ptr().add(self.offset) } } /// View buffer as typed slice. @@ -161,9 +162,9 @@ impl Buffer { /// `bool` in Rust. However, `bool` arrays in Arrow are bit-packed which breaks this condition. pub unsafe fn typed_data(&self) -> &[T] { assert_eq!(self.len() % mem::size_of::(), 0); - assert!(memory::is_ptr_aligned::(self.raw_data() as *const T)); + assert!(memory::is_ptr_aligned::(self.data.ptr() as *const T)); from_raw_parts( - self.raw_data() as *const T, + self.ptr() as *const T, self.len() / mem::size_of::(), ) } @@ -183,7 +184,7 @@ impl Buffer { /// in larger chunks and starting at arbitrary bit offsets. /// Note that both `offset` and `length` are measured in bits. pub fn bit_chunks(&self, offset: usize, len: usize) -> BitChunks { - BitChunks::new(&self.data.as_slice()[self.offset..], offset, len) + BitChunks::new(&self.data(), offset, len) } /// Returns the number of 1-bits in this buffer. 
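For downstream code the rename in this first patch is mechanical; a minimal sketch of a hypothetical call site (illustration only, not part of this diff):

    use arrow::buffer::Buffer;

    let buffer = Buffer::from(&[0u8, 1, 2, 3]);
    // before this patch: let p = buffer.raw_data();
    let p: *const u8 = buffer.ptr(); // patch 2 below renames this once more, to `as_ptr`
    assert_eq!(unsafe { *p }, 0);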
@@ -923,11 +924,11 @@ mod tests { assert_eq!(0, buf.len()); assert_eq!(0, buf.data().len()); assert_eq!(0, buf.capacity()); - assert!(buf.raw_data().is_null()); + assert!(buf.ptr().is_null()); let buf = Buffer::from(&[0, 1, 2, 3, 4]); assert_eq!(5, buf.len()); - assert!(!buf.raw_data().is_null()); + assert!(!buf.ptr().is_null()); assert_eq!([0, 1, 2, 3, 4], buf.data()); } @@ -935,7 +936,7 @@ mod tests { fn test_from_vec() { let buf = Buffer::from(&[0, 1, 2, 3, 4]); assert_eq!(5, buf.len()); - assert!(!buf.raw_data().is_null()); + assert!(!buf.ptr().is_null()); assert_eq!([0, 1, 2, 3, 4], buf.data()); } @@ -945,7 +946,7 @@ mod tests { let buf2 = buf; assert_eq!(5, buf2.len()); assert_eq!(64, buf2.capacity()); - assert!(!buf2.raw_data().is_null()); + assert!(!buf2.ptr().is_null()); assert_eq!([0, 1, 2, 3, 4], buf2.data()); } @@ -956,12 +957,12 @@ mod tests { assert_eq!([6, 8, 10], buf2.data()); assert_eq!(3, buf2.len()); - assert_eq!(unsafe { buf.raw_data().offset(2) }, buf2.raw_data()); + assert_eq!(unsafe { buf.ptr().offset(2) }, buf2.ptr()); let buf3 = buf2.slice(1); assert_eq!([8, 10], buf3.data()); assert_eq!(2, buf3.len()); - assert_eq!(unsafe { buf.raw_data().offset(3) }, buf3.raw_data()); + assert_eq!(unsafe { buf.ptr().offset(3) }, buf3.ptr()); let buf4 = buf.slice(5); let empty_slice: [u8; 0] = []; diff --git a/rust/arrow/src/bytes.rs b/rust/arrow/src/bytes.rs index 0363d8735a5..a1e9c91a00e 100644 --- a/rust/arrow/src/bytes.rs +++ b/rust/arrow/src/bytes.rs @@ -86,9 +86,8 @@ impl Bytes { } } - #[inline] - pub fn as_slice(&self) -> &[u8] { - unsafe { slice::from_raw_parts(self.ptr, self.len) } + fn as_slice(&self) -> &[u8] { + self } #[inline] @@ -102,15 +101,10 @@ impl Bytes { } #[inline] - pub fn raw_data(&self) -> *const u8 { + pub fn ptr(&self) -> *const u8 { self.ptr } - #[inline] - pub fn raw_data_mut(&mut self) -> *mut u8 { - self.ptr as *mut u8 - } - pub fn capacity(&self) -> usize { match self.deallocation { Deallocation::Native(capacity) => capacity, @@ -136,6 +130,14 @@ impl Drop for Bytes { } } +impl std::ops::Deref for Bytes { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.ptr, self.len) } + } +} + impl PartialEq for Bytes { fn eq(&self, other: &Bytes) -> bool { self.as_slice() == other.as_slice() @@ -146,7 +148,7 @@ impl Debug for Bytes { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len,)?; - f.debug_list().entries(self.as_slice().iter()).finish()?; + f.debug_list().entries(self.iter()).finish()?; write!(f, " }}") } diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 59128d320b7..18092b6335b 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -190,7 +190,7 @@ where if let Some(b) = &null_bit_buffer { // some value is null for i in 0..left.len() { - let is_valid = unsafe { bit_util::get_bit_raw(b.raw_data(), i) }; + let is_valid = unsafe { bit_util::get_bit_raw(b.ptr(), i) }; values.push(if is_valid { let right_value = right.value(i); if right_value.is_zero() { diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 1d8d36da6d9..f66440ba2b1 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -329,7 +329,7 @@ impl FFI_ArrowArray { .iter() .map(|maybe_buffer| match maybe_buffer { // note that `raw_data` takes into account the buffer's offset - Some(b) => b.raw_data() as *const std::os::raw::c_void, + Some(b) => 
b.ptr() as *const std::os::raw::c_void, None => std::ptr::null(), }) .collect::>(); From a00b9fd3e44ff49e7ad3750f9fef91a9b14f17a2 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 23 Dec 2020 13:41:36 +0000 Subject: [PATCH 2/6] Removed some unsafes, added some more. --- rust/arrow/benches/buffer_bit_ops.rs | 2 +- rust/arrow/src/array/array_binary.rs | 36 +++-- rust/arrow/src/array/array_boolean.rs | 21 +-- rust/arrow/src/array/array_list.rs | 15 +- rust/arrow/src/array/array_primitive.rs | 20 +-- rust/arrow/src/array/array_string.rs | 21 ++- rust/arrow/src/array/builder.rs | 6 +- rust/arrow/src/array/raw_pointer.rs | 35 +++-- rust/arrow/src/array/transform/boolean.rs | 2 +- rust/arrow/src/array/transform/mod.rs | 4 +- rust/arrow/src/array/transform/utils.rs | 2 +- rust/arrow/src/bitmap.rs | 2 +- rust/arrow/src/buffer.rs | 150 +++++++++++-------- rust/arrow/src/bytes.rs | 17 ++- rust/arrow/src/compute/kernels/arithmetic.rs | 2 +- rust/arrow/src/compute/kernels/comparison.rs | 8 +- rust/arrow/src/compute/kernels/take.rs | 14 +- rust/arrow/src/compute/util.rs | 4 +- rust/arrow/src/ffi.rs | 17 ++- rust/arrow/src/json/reader.rs | 19 ++- rust/arrow/src/memory.rs | 79 +++++----- 21 files changed, 264 insertions(+), 212 deletions(-) diff --git a/rust/arrow/benches/buffer_bit_ops.rs b/rust/arrow/benches/buffer_bit_ops.rs index 572f1f26519..8a5fbaf1e45 100644 --- a/rust/arrow/benches/buffer_bit_ops.rs +++ b/rust/arrow/benches/buffer_bit_ops.rs @@ -28,7 +28,7 @@ fn create_buffer(size: usize) -> Buffer { let mut result = MutableBuffer::new(size).with_bitset(size, false); for i in 0..size { - result.data_mut()[i] = 0b01010101 << i << (i % 4); + result.as_slice_mut()[i] = 0b01010101 << i << (i % 4); } result.freeze() diff --git a/rust/arrow/src/array/array_binary.rs b/rust/arrow/src/array/array_binary.rs index 244532d7092..d359c55cdbf 100644 --- a/rust/arrow/src/array/array_binary.rs +++ b/rust/arrow/src/array/array_binary.rs @@ -24,9 +24,9 @@ use std::{ }; use super::{ - array::print_long_array, raw_pointer::as_aligned_pointer, raw_pointer::RawPtrBox, - Array, ArrayData, ArrayDataRef, FixedSizeListArray, GenericBinaryIter, - GenericListArray, LargeListArray, ListArray, OffsetSizeTrait, + array::print_long_array, raw_pointer::RawPtrBox, Array, ArrayData, ArrayDataRef, + FixedSizeListArray, GenericBinaryIter, GenericListArray, LargeListArray, ListArray, + OffsetSizeTrait, }; use crate::util::bit_util; use crate::{buffer::Buffer, datatypes::ToByteSlice}; @@ -82,7 +82,7 @@ impl GenericBinaryArray { #[inline] fn value_offset_at(&self, i: usize) -> OffsetSize { - unsafe { *self.value_offsets.get().add(i) } + unsafe { *self.value_offsets.as_ptr().add(i) } } /// Returns the element at index `i` as a byte slice. 
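The offsets and values buffers of a binary array relate as in this sketch (a hypothetical array holding ["ab", "c"]; comments only, for illustration):

    // offsets (i32 for BinaryArray, i64 for LargeBinaryArray): [0, 2, 3]
    // values:  [b'a', b'b', b'c']
    // value(i) spans values[offsets[i] .. offsets[i + 1]]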
@@ -92,7 +92,7 @@ impl GenericBinaryArray { unsafe { let pos = self.value_offset_at(offset); std::slice::from_raw_parts( - self.value_data.get().offset(pos.to_isize()), + self.value_data.as_ptr().offset(pos.to_isize()), (self.value_offset_at(offset + 1) - pos).to_usize().unwrap(), ) } @@ -205,14 +205,12 @@ impl From 2, "BinaryArray data should contain 2 buffers only (offsets and values)" ); - let raw_value_offsets = data.buffers()[0].ptr(); - let value_data = data.buffers()[1].ptr(); + let offsets = data.buffers()[0].as_ptr(); + let values = data.buffers()[1].as_ptr(); Self { data, - value_offsets: RawPtrBox::new(as_aligned_pointer::( - raw_value_offsets, - )), - value_data: RawPtrBox::new(value_data), + value_offsets: unsafe { RawPtrBox::new(offsets) }, + value_data: unsafe { RawPtrBox::new(values) }, } } } @@ -234,7 +232,7 @@ where offsets.push(length_so_far); { - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); for (i, s) in iter.enumerate() { if let Some(s) = s { @@ -328,7 +326,7 @@ impl FixedSizeBinaryArray { unsafe { let pos = self.value_offset_at(offset); std::slice::from_raw_parts( - self.value_data.get().offset(pos as isize), + self.value_data.as_ptr().offset(pos as isize), (self.value_offset_at(offset + 1) - pos) as usize, ) } @@ -389,7 +387,7 @@ impl From>>> for FixedSizeBinaryArray { let num_bytes = bit_util::ceil(len, 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); data.iter().enumerate().for_each(|(i, entry)| { if entry.is_some() { @@ -421,14 +419,14 @@ impl From for FixedSizeBinaryArray { 1, "FixedSizeBinaryArray data should contain 1 buffer only (values)" ); - let value_data = data.buffers()[0].ptr(); + let value_data = data.buffers()[0].as_ptr(); let length = match data.data_type() { DataType::FixedSizeBinary(len) => *len, _ => panic!("Expected data type to be FixedSizeBinary"), }; Self { data, - value_data: RawPtrBox::new(value_data), + value_data: unsafe { RawPtrBox::new(value_data) }, length, } } @@ -514,7 +512,7 @@ impl DecimalArray { let raw_val = unsafe { let pos = self.value_offset_at(offset); std::slice::from_raw_parts( - self.value_data.get().offset(pos as isize), + self.value_data.as_ptr().offset(pos as isize), (self.value_offset_at(offset + 1) - pos) as usize, ) }; @@ -589,7 +587,7 @@ impl From for DecimalArray { 1, "DecimalArray data should contain 1 buffer only (values)" ); - let value_data = data.buffers()[0].ptr(); + let values = data.buffers()[0].as_ptr(); let (precision, scale) = match data.data_type() { DataType::Decimal(precision, scale) => (*precision, *scale), _ => panic!("Expected data type to be Decimal"), @@ -597,7 +595,7 @@ impl From for DecimalArray { let length = 16; Self { data, - value_data: RawPtrBox::new(value_data), + value_data: unsafe { RawPtrBox::new(values) }, precision, scale, length, diff --git a/rust/arrow/src/array/array_boolean.rs b/rust/arrow/src/array/array_boolean.rs index 2e24170c392..ae39dd151dd 100644 --- a/rust/arrow/src/array/array_boolean.rs +++ b/rust/arrow/src/array/array_boolean.rs @@ -24,7 +24,6 @@ use std::{convert::From, sync::Arc}; use super::*; use super::{array::print_long_array, raw_pointer::RawPtrBox}; use crate::buffer::{Buffer, MutableBuffer}; -use crate::memory; use crate::util::bit_util; /// Array of bools @@ -58,7 +57,7 @@ impl BooleanArray { /// Returns a raw pointer to the values of this array. 
pub fn raw_values(&self) -> *const u8 { - unsafe { self.raw_values.get().add(self.data.offset()) } + unsafe { self.raw_values.as_ptr().add(self.data.offset()) } } /// Returns a slice for the given offset and length @@ -87,7 +86,7 @@ impl BooleanArray { /// Note this doesn't do any bound checking, for performance reason. pub fn value(&self, i: usize) -> bool { let offset = i + self.offset(); - unsafe { bit_util::get_bit_raw(self.raw_values.get() as *const u8, offset) } + unsafe { bit_util::get_bit_raw(self.raw_values.as_ptr(), offset) } } } @@ -119,7 +118,7 @@ impl From> for BooleanArray { fn from(data: Vec) -> Self { let mut mut_buf = MutableBuffer::new_null(data.len()); { - let mut_slice = mut_buf.data_mut(); + let mut_slice = mut_buf.as_slice_mut(); for (i, b) in data.iter().enumerate() { if *b { bit_util::set_bit(mut_slice, i); @@ -147,14 +146,10 @@ impl From for BooleanArray { 1, "BooleanArray data should contain a single buffer only (values buffer)" ); - let raw_values = data.buffers()[0].ptr(); - assert!( - memory::is_aligned::(raw_values, mem::align_of::()), - "memory is not aligned" - ); + let ptr = data.buffers()[0].as_ptr(); Self { data, - raw_values: RawPtrBox::new(raw_values as *const u8), + raw_values: unsafe { RawPtrBox::new(ptr) }, } } } @@ -185,11 +180,9 @@ impl>> FromIterator for BooleanArray { let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); let mut val_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - let data = unsafe { - std::slice::from_raw_parts_mut(val_buf.raw_data_mut(), val_buf.capacity()) - }; + let data = val_buf.as_slice_mut(); - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); iter.enumerate().for_each(|(i, item)| { if let Some(a) = item.borrow() { bit_util::set_bit(null_slice, i); diff --git a/rust/arrow/src/array/array_list.rs b/rust/arrow/src/array/array_list.rs index e29b0ef0d4d..5af27769bfa 100644 --- a/rust/arrow/src/array/array_list.rs +++ b/rust/arrow/src/array/array_list.rs @@ -23,8 +23,8 @@ use std::mem; use num::Num; use super::{ - array::print_long_array, make_array, raw_pointer::as_aligned_pointer, - raw_pointer::RawPtrBox, Array, ArrayDataRef, ArrayRef, + array::print_long_array, make_array, raw_pointer::RawPtrBox, Array, ArrayDataRef, + ArrayRef, }; use crate::datatypes::ArrowNativeType; use crate::datatypes::DataType; @@ -100,7 +100,7 @@ impl GenericListArray { #[inline] fn value_offset_at(&self, i: usize) -> OffsetSize { - unsafe { *self.value_offsets.get().add(i) } + unsafe { *self.value_offsets.as_ptr().add(i) } } } @@ -117,18 +117,19 @@ impl From for GenericListArray::new(value_offsets) }; unsafe { assert!( - (*value_offsets.offset(0)).is_zero(), + (*value_offsets.as_ptr().offset(0)).is_zero(), "offsets do not start at zero" ); } Self { data, values, - value_offsets: RawPtrBox::new(value_offsets), + value_offsets, } } } diff --git a/rust/arrow/src/array/array_primitive.rs b/rust/arrow/src/array/array_primitive.rs index 472da1528ef..03001fe6b4c 100644 --- a/rust/arrow/src/array/array_primitive.rs +++ b/rust/arrow/src/array/array_primitive.rs @@ -29,7 +29,6 @@ use super::array::print_long_array; use super::raw_pointer::RawPtrBox; use super::*; use crate::buffer::{Buffer, MutableBuffer}; -use crate::memory; use crate::util::bit_util; /// Number of seconds in a day @@ -75,7 +74,7 @@ impl PrimitiveArray { #[deprecated(note = "Please use values() instead")] pub unsafe fn value_slice(&self, offset: usize, len: usize) -> &[T::Native] { std::slice::from_raw_parts( - 
self.raw_values.get().add(self.data.offset()).add(offset), + self.raw_values.as_ptr().add(self.data.offset()).add(offset), len, ) } @@ -88,7 +87,7 @@ impl PrimitiveArray { // buffer bounds/offset is ensured by the ArrayData instance. unsafe { std::slice::from_raw_parts( - self.raw_values.get().add(self.data.offset()), + self.raw_values.as_ptr().add(self.data.offset()), self.len(), ) } @@ -106,7 +105,7 @@ impl PrimitiveArray { /// caller must ensure that the passed in offset is less than the array len() pub fn value(&self, i: usize) -> T::Native { let offset = i + self.offset(); - unsafe { *self.raw_values.get().add(offset) } + unsafe { *self.raw_values.as_ptr().add(offset) } } } @@ -316,7 +315,7 @@ impl::Native let null = vec![0; mem::size_of::<::Native>()]; - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); iter.enumerate().for_each(|(i, item)| { if let Some(a) = item.borrow() { bit_util::set_bit(null_slice, i); @@ -413,7 +412,7 @@ impl PrimitiveArray { { let null = vec![0; mem::size_of::()]; - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); for (i, v) in data.iter().enumerate() { if let Some(n) = v { bit_util::set_bit(null_slice, i); @@ -442,14 +441,11 @@ impl From for PrimitiveArray { 1, "PrimitiveArray data should contain a single buffer only (values buffer)" ); - let raw_values = data.buffers()[0].ptr(); - assert!( - memory::is_aligned::(raw_values, mem::align_of::()), - "memory is not aligned" - ); + + let ptr = data.buffers()[0].as_ptr(); Self { data, - raw_values: RawPtrBox::new(raw_values as *const T::Native), + raw_values: unsafe { RawPtrBox::new(ptr) }, } } } diff --git a/rust/arrow/src/array/array_string.rs b/rust/arrow/src/array/array_string.rs index a0cae920bbe..95273f03c03 100644 --- a/rust/arrow/src/array/array_string.rs +++ b/rust/arrow/src/array/array_string.rs @@ -21,9 +21,8 @@ use std::mem; use std::{any::Any, iter::FromIterator}; use super::{ - array::print_long_array, raw_pointer::as_aligned_pointer, raw_pointer::RawPtrBox, - Array, ArrayData, ArrayDataRef, GenericListArray, GenericStringIter, LargeListArray, - ListArray, OffsetSizeTrait, + array::print_long_array, raw_pointer::RawPtrBox, Array, ArrayData, ArrayDataRef, + GenericListArray, GenericStringIter, LargeListArray, ListArray, OffsetSizeTrait, }; use crate::util::bit_util; use crate::{buffer::Buffer, datatypes::ToByteSlice}; @@ -80,7 +79,7 @@ impl GenericStringArray { #[inline] fn value_offset_at(&self, i: usize) -> OffsetSize { - unsafe { *self.value_offsets.get().add(i) } + unsafe { *self.value_offsets.as_ptr().add(i) } } /// Returns the element at index `i` as &str @@ -90,7 +89,7 @@ impl GenericStringArray { unsafe { let pos = self.value_offset_at(offset); let slice = std::slice::from_raw_parts( - self.value_data.get().offset(pos.to_isize()), + self.value_data.as_ptr().offset(pos.to_isize()), (self.value_offset_at(offset + 1) - pos).to_usize().unwrap(), ); @@ -168,7 +167,7 @@ where if let Some(s) = s { let s = s.as_ref(); // set null bit - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); bit_util::set_bit(null_slice, i); length_so_far = length_so_far + OffsetSize::from_usize(s.len()).unwrap(); @@ -254,14 +253,12 @@ impl From 2, "StringArray data should contain 2 buffers only (offsets and values)" ); - let raw_value_offsets = data.buffers()[0].ptr(); - let value_data = data.buffers()[1].ptr(); + let offsets = data.buffers()[0].as_ptr(); + let values = data.buffers()[1].as_ptr(); Self { data, - value_offsets: 
RawPtrBox::new(as_aligned_pointer::<OffsetSize>( - raw_value_offsets, - )), - value_data: RawPtrBox::new(value_data), + value_offsets: unsafe { RawPtrBox::new(offsets) }, + value_data: unsafe { RawPtrBox::new(values) }, } } } diff --git a/rust/arrow/src/array/builder.rs index 3dff38be846..4aa35c638b6 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -344,7 +344,7 @@ impl BooleanBufferBuilder { if v { let data = unsafe { std::slice::from_raw_parts_mut( - self.buffer.raw_data_mut(), + self.buffer.as_mut_ptr(), self.buffer.capacity(), ) }; @@ -359,7 +359,7 @@ impl BooleanBufferBuilder { if n != 0 && v { let data = unsafe { std::slice::from_raw_parts_mut( - self.buffer.raw_data_mut(), + self.buffer.as_mut_ptr(), self.buffer.capacity(), ) }; @@ -379,7 +379,7 @@ impl BooleanBufferBuilder { // updated on each append but is updated in the // `freeze` method instead. unsafe { - bit_util::set_bit_raw(self.buffer.raw_data_mut(), self.len); + bit_util::set_bit_raw(self.buffer.as_mut_ptr(), self.len); } } self.len += 1; diff --git a/rust/arrow/src/array/raw_pointer.rs index 8eeadfe9390..d18ba4b29a3 100644 --- a/rust/arrow/src/array/raw_pointer.rs +++ b/rust/arrow/src/array/raw_pointer.rs @@ -16,28 +16,37 @@ // under the License. use crate::memory; +use std::ptr::NonNull; +/// This struct is highly `unsafe` and offers the possibility to self-reference a [arrow::Buffer] from [arrow::array::ArrayData] +/// as a pointer to the beginning of its contents. pub(super) struct RawPtrBox<T> { - inner: *const T, + ptr: NonNull<T>, } impl<T> RawPtrBox<T> { - pub(super) fn new(inner: *const T) -> Self { - Self { inner } + /// # Safety
 + /// The user must guarantee that: + /// * the contents that `ptr` points to are never moved. This is guaranteed when they are Pinned. + /// * the lifetime of this struct does not outlive the lifetime of `ptr`. + /// Failure to fulfill any of the above conditions results in undefined behavior. + /// # Panics + /// This function panics if: + /// * `ptr` is null + /// * `ptr` is not aligned to a slice of type `T`. This is guaranteed if it was built from a slice of type `T`. 
+ pub(super) unsafe fn new(ptr: *const u8) -> Self { let ptr = NonNull::new(ptr as *mut u8).expect("Pointer cannot be null"); assert!( memory::is_aligned(ptr, std::mem::align_of::<T>()), "memory is not aligned" ); Self { ptr: ptr.cast() } } - pub(super) fn get(&self) -> *const T { - self.inner + pub(super) fn as_ptr(&self) -> *const T { + self.ptr.as_ptr() } } unsafe impl<T> Send for RawPtrBox<T> {} unsafe impl<T> Sync for RawPtrBox<T> {} - -pub(super) fn as_aligned_pointer<T>(p: *const u8) -> *const T { - assert!( - memory::is_aligned(p, std::mem::align_of::<T>()), - "memory is not aligned" - ); - p as *const T -} diff --git a/rust/arrow/src/array/transform/boolean.rs index cfe485b7b70..2aa918b763a 100644 --- a/rust/arrow/src/array/transform/boolean.rs +++ b/rust/arrow/src/array/transform/boolean.rs @@ -29,7 +29,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { let buffer = &mut mutable.buffer1; reserve_for_bits(buffer, mutable.len + len); set_bits( - &mut buffer.data_mut(), + &mut buffer.as_slice_mut(), values, mutable.len, array.offset() + start, diff --git a/rust/arrow/src/array/transform/mod.rs index 28be14eee7b..dc16ed65a48 100644 --- a/rust/arrow/src/array/transform/mod.rs +++ b/rust/arrow/src/array/transform/mod.rs @@ -96,7 +96,7 @@ fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits Box::new(move |mutable, start, len| { utils::reserve_for_bits(&mut mutable.null_buffer, mutable.len + len); mutable.null_count += utils::set_bits( - mutable.null_buffer.data_mut(), + mutable.null_buffer.as_slice_mut(), bytes, mutable.len, array.offset() + start, @@ -106,7 +106,7 @@ fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits } else if use_nulls { Box::new(|mutable, _, len| { utils::reserve_for_bits(&mut mutable.null_buffer, mutable.len + len); - let write_data = mutable.null_buffer.data_mut(); + let write_data = mutable.null_buffer.as_slice_mut(); let offset = mutable.len; (0..len).for_each(|i| { bit_util::set_bit(write_data, offset + i); diff --git a/rust/arrow/src/array/transform/utils.rs index 933ec0da1c6..c95912996f2 100644 --- a/rust/arrow/src/array/transform/utils.rs +++ b/rust/arrow/src/array/transform/utils.rs @@ -72,7 +72,7 @@ pub(super) unsafe fn get_last_offset<T: ArrowNativeType>( // Soundness // * offset buffer is always extended in slices of T and aligned accordingly. // * Buffer[0] is initialized with one element, 0, and thus `mutable_offsets.len() - 1` is always valid. 
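    // For illustration: if the offset buffer currently holds the i32 values
    // [0, 2, 5], `align_to` yields an empty prefix and suffix and the last
    // offset returned is 5, the position at which the next value will be written.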
- let (prefix, offsets, suffix) = offset_buffer.data().align_to::(); + let (prefix, offsets, suffix) = offset_buffer.as_slice().align_to::(); debug_assert!(prefix.is_empty() && suffix.is_empty()); *offsets.get_unchecked(offsets.len() - 1) } diff --git a/rust/arrow/src/bitmap.rs b/rust/arrow/src/bitmap.rs index 8acbe8f290f..e76cb51c2d1 100644 --- a/rust/arrow/src/bitmap.rs +++ b/rust/arrow/src/bitmap.rs @@ -54,7 +54,7 @@ impl Bitmap { pub fn is_set(&self, i: usize) -> bool { assert!(i < (self.bits.len() << 3)); - unsafe { bit_util::get_bit_raw(self.bits.ptr(), i) } + unsafe { bit_util::get_bit_raw(self.bits.as_ptr(), i) } } pub fn buffer_ref(&self) -> &Buffer { diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index a951b48cca8..ae5a43f92ef 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -26,13 +26,12 @@ use crate::{ ffi, }; -use std::cmp; use std::convert::AsRef; use std::fmt::Debug; use std::mem; use std::ops::{BitAnd, BitOr, Not}; -use std::slice::{from_raw_parts, from_raw_parts_mut}; use std::sync::Arc; +use std::{cmp, ptr::NonNull}; #[cfg(feature = "avx512")] use crate::arch::avx512::*; @@ -70,7 +69,7 @@ impl Buffer { /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. - pub unsafe fn from_raw_parts(ptr: *const u8, len: usize, capacity: usize) -> Self { + pub unsafe fn from_raw_parts(ptr: NonNull, len: usize, capacity: usize) -> Self { assert!(len <= capacity); Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity)) } @@ -89,7 +88,7 @@ impl Buffer { /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes and that the foreign deallocator frees the region. pub unsafe fn from_unowned( - ptr: *const u8, + ptr: NonNull, len: usize, data: Arc, ) -> Self { @@ -98,7 +97,7 @@ impl Buffer { /// Auxiliary method to create a new Buffer unsafe fn build_with_arguments( - ptr: *const u8, + ptr: NonNull, len: usize, deallocation: Deallocation, ) -> Self { @@ -130,7 +129,10 @@ impl Buffer { &self.data[self.offset..] } - /// Returns a slice of this buffer, starting from `offset`. + /// Returns a new [Buffer] that is a slice of this buffer starting at `offset`. + /// Doing so allows the same memory region to be shared between buffers. + /// # Panics + /// Panics iff `offset` is larger than `len`. pub fn slice(&self, offset: usize) -> Self { assert!( offset <= self.len(), @@ -146,8 +148,8 @@ impl Buffer { /// /// Note that this should be used cautiously, and the returned pointer should not be /// stored anywhere, to avoid dangling pointers. - pub fn ptr(&self) -> *const u8 { - unsafe { self.data.ptr().add(self.offset) } + pub fn as_ptr(&self) -> *const u8 { + unsafe { self.data.ptr().as_ptr().add(self.offset) } } /// View buffer as typed slice. @@ -162,9 +164,15 @@ impl Buffer { /// `bool` in Rust. However, `bool` arrays in Arrow are bit-packed which breaks this condition. pub unsafe fn typed_data(&self) -> &[T] { assert_eq!(self.len() % mem::size_of::(), 0); - assert!(memory::is_ptr_aligned::(self.data.ptr() as *const T)); - from_raw_parts( - self.ptr() as *const T, + assert!(memory::is_ptr_aligned::(self.data.ptr().cast())); + // JUSTIFICATION + // Benefit + // Many of the buffers represent specific types, and consumers of `Buffer` often need to re-interpret them. 
+ // Soundness + // * The pointer is non-null by construction + // * alignment asserted above + std::slice::from_raw_parts( + self.as_ptr() as *const T, self.len() / mem::size_of::<T>(), ) } @@ -214,13 +222,33 @@ impl<T: AsRef<[u8]>> From<T> for Buffer { let len = slice.len() * mem::size_of::<u8>(); let capacity = bit_util::round_upto_multiple_of_64(len); let buffer = memory::allocate_aligned(capacity); + // JUSTIFICATION + // Benefit + // It is often useful to create a buffer from bytes, typically when they are allocated by external sources + // Soundness + // * The pointers are non-null by construction + // * alignment is guaranteed by `allocate_aligned` above + // Unsoundness + // * There is no guarantee that the memory regions are non-overlapping, but `memcpy` requires this. unsafe { - memory::memcpy(buffer, slice.as_ptr(), len); + memory::memcpy( + buffer, + NonNull::new_unchecked(slice.as_ptr() as *mut u8), + len, + ); Buffer::build_with_arguments(buffer, len, Deallocation::Native(capacity)) } } } +impl std::ops::Deref for Buffer { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len()) } + } +} + /// Apply a bitwise operation `simd_op` / `scalar_op` to two inputs using simd instructions and return the result as a Buffer. /// The `simd_op` function gets applied on chunks of 64 bytes (512 bits) at a time /// and the `scalar_op` gets applied to remaining bytes. @@ -668,7 +696,8 @@ unsafe impl Send for Buffer {} /// converted into an immutable buffer via the `freeze` method. #[derive(Debug)] pub struct MutableBuffer { - data: *mut u8, + // dangling iff capacity = 0 + data: NonNull<u8>, len: usize, capacity: usize, } @@ -701,7 +730,7 @@ impl MutableBuffer { assert!(end <= self.capacity); let v = if val { 255 } else { 0 }; unsafe { - std::ptr::write_bytes(self.data, v, end); + std::ptr::write_bytes(self.data.as_ptr(), v, end); self.len = end; } self } @@ -715,7 +744,7 @@ pub fn set_null_bits(&mut self, start: usize, count: usize) { assert!(start + count <= self.capacity); unsafe { - std::ptr::write_bytes(self.data.add(start), 0, count); + std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count); } } @@ -727,9 +756,8 @@ if capacity > self.capacity { let new_capacity = bit_util::round_upto_multiple_of_64(capacity); let new_capacity = cmp::max(new_capacity, self.capacity * 2); - let new_data = + self.data = unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; - self.data = new_data as *mut u8; self.capacity = new_capacity; } self.capacity } @@ -748,9 +776,8 @@ } else { let new_capacity = bit_util::round_upto_multiple_of_64(new_len); if new_capacity < self.capacity { - let new_data = + self.data = unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; - self.data = new_data as *mut u8; self.capacity = new_capacity; } } @@ -781,21 +808,13 @@ } /// Returns the data stored in this buffer as a slice. - pub fn data(&self) -> &[u8] { - if self.data.is_null() { - &[] - } else { - unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) } - } + pub fn as_slice(&self) -> &[u8] { + self } /// Returns the data stored in this buffer as a mutable slice. - pub fn data_mut(&mut self) -> &mut [u8] { - if self.data.is_null() { - &mut [] - } else { - unsafe { std::slice::from_raw_parts_mut(self.raw_data_mut(), self.len()) } - } + pub fn as_slice_mut(&mut self) -> &mut [u8] { + self } /// Returns a raw pointer for this buffer. 
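With the `Deref` impls added below, `as_slice` can simply return `self`, and callers gain the whole slice API; a minimal sketch of hypothetical usage (not part of this diff):

    use arrow::buffer::MutableBuffer;

    let mut buf = MutableBuffer::new(64);
    buf.extend_from_slice(b"abc");
    assert_eq!(&buf[..2], b"ab"); // indexing through Deref<Target = [u8]>
    assert_eq!(buf.iter().count(), 3); // slice iterators through Deref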
@@ -803,13 +822,13 @@ impl MutableBuffer { /// Note that this should be used cautiously, and the returned pointer should not be /// stored anywhere, to avoid dangling pointers. #[inline] - pub const fn raw_data(&self) -> *const u8 { - self.data + pub const fn as_ptr(&self) -> *const u8 { + self.data.as_ptr() } #[inline] - pub fn raw_data_mut(&mut self) -> *mut u8 { - self.data + pub fn as_mut_ptr(&mut self) -> *mut u8 { + self.data.as_ptr() } /// Freezes this buffer and return an immutable version of it. @@ -827,10 +846,16 @@ impl MutableBuffer { /// View buffer as typed slice. pub fn typed_data_mut(&mut self) -> &mut [T] { assert_eq!(self.len() % mem::size_of::(), 0); - assert!(memory::is_ptr_aligned::(self.raw_data() as *const T)); + assert!(memory::is_ptr_aligned::(self.data.cast())); + // JUSTIFICATION + // Benefit + // Many of the buffers represent specific types, and consumers of `Buffer` often need to re-interpret them. + // Soundness + // * The pointer is non-null by construction + // * alignment asserted above unsafe { - from_raw_parts_mut( - self.raw_data() as *mut T, + std::slice::from_raw_parts_mut( + self.as_ptr() as *mut T, self.len() / mem::size_of::(), ) } @@ -844,7 +869,9 @@ impl MutableBuffer { self.reserve(new_len); } unsafe { - memory::memcpy(self.data.add(self.len), bytes.as_ptr(), bytes.len()); + let dst = NonNull::new_unchecked(self.data.as_ptr().add(self.len)); + let src = NonNull::new_unchecked(bytes.as_ptr() as *mut u8); + memory::memcpy(dst, src, bytes.len()); } self.len = new_len; } @@ -859,11 +886,23 @@ impl MutableBuffer { } } +impl std::ops::Deref for MutableBuffer { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) } + } +} + +impl std::ops::DerefMut for MutableBuffer { + fn deref_mut(&mut self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) } + } +} + impl Drop for MutableBuffer { fn drop(&mut self) { - if !self.data.is_null() { - unsafe { memory::free_aligned(self.data, self.capacity) }; - } + unsafe { memory::free_aligned(self.data, self.capacity) }; } } @@ -875,7 +914,7 @@ impl PartialEq for MutableBuffer { if self.capacity != other.capacity { return false; } - unsafe { memory::memcmp(self.data, other.data, self.len) == 0 } + self.as_slice() == other.as_slice() } } @@ -884,7 +923,6 @@ unsafe impl Send for MutableBuffer {} #[cfg(test)] mod tests { - use std::ptr::null_mut; use std::thread; use super::*; @@ -920,15 +958,9 @@ mod tests { #[test] fn test_from_raw_parts() { - let buf = unsafe { Buffer::from_raw_parts(null_mut(), 0, 0) }; - assert_eq!(0, buf.len()); - assert_eq!(0, buf.data().len()); - assert_eq!(0, buf.capacity()); - assert!(buf.ptr().is_null()); - let buf = Buffer::from(&[0, 1, 2, 3, 4]); assert_eq!(5, buf.len()); - assert!(!buf.ptr().is_null()); + assert!(!buf.as_ptr().is_null()); assert_eq!([0, 1, 2, 3, 4], buf.data()); } @@ -936,7 +968,7 @@ mod tests { fn test_from_vec() { let buf = Buffer::from(&[0, 1, 2, 3, 4]); assert_eq!(5, buf.len()); - assert!(!buf.ptr().is_null()); + assert!(!buf.as_ptr().is_null()); assert_eq!([0, 1, 2, 3, 4], buf.data()); } @@ -946,7 +978,7 @@ mod tests { let buf2 = buf; assert_eq!(5, buf2.len()); assert_eq!(64, buf2.capacity()); - assert!(!buf2.ptr().is_null()); + assert!(!buf2.as_ptr().is_null()); assert_eq!([0, 1, 2, 3, 4], buf2.data()); } @@ -957,12 +989,12 @@ mod tests { assert_eq!([6, 8, 10], buf2.data()); assert_eq!(3, buf2.len()); - assert_eq!(unsafe { buf.ptr().offset(2) }, buf2.ptr()); + 
assert_eq!(unsafe { buf.as_ptr().offset(2) }, buf2.as_ptr()); let buf3 = buf2.slice(1); assert_eq!([8, 10], buf3.data()); assert_eq!(2, buf3.len()); - assert_eq!(unsafe { buf.ptr().offset(3) }, buf3.ptr()); + assert_eq!(unsafe { buf.as_ptr().offset(3) }, buf3.as_ptr()); let buf4 = buf.slice(5); let empty_slice: [u8; 0] = []; diff --git a/rust/arrow/src/bytes.rs index a1e9c91a00e..331011687da 100644 --- a/rust/arrow/src/bytes.rs +++ b/rust/arrow/src/bytes.rs @@ -20,6 +20,7 @@ //! Note that this is a low-level functionality of this crate. use core::slice; +use std::ptr::NonNull; use std::sync::Arc; use std::{fmt::Debug, fmt::Formatter}; @@ -56,7 +57,7 @@ impl Debug for Deallocation { /// foreign deallocator to deallocate the region when it is no longer needed. pub struct Bytes { /// The raw pointer to the beginning of the region - ptr: *const u8, + ptr: NonNull<u8>, /// The number of bytes visible to this region. This is always smaller than its capacity (when available). len: usize, @@ -78,7 +79,11 @@ impl Bytes { /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. - pub unsafe fn new(ptr: *const u8, len: usize, deallocation: Deallocation) -> Bytes { + pub unsafe fn new( + ptr: std::ptr::NonNull<u8>, + len: usize, + deallocation: Deallocation, + ) -> Bytes { Bytes { ptr, len, @@ -101,7 +106,7 @@ impl Bytes { } #[inline] - pub fn ptr(&self) -> *const u8 { + pub fn ptr(&self) -> NonNull<u8> { self.ptr } @@ -120,9 +125,7 @@ impl Drop for Bytes { fn drop(&mut self) { match &self.deallocation { Deallocation::Native(capacity) => { - if !self.ptr.is_null() { - unsafe { memory::free_aligned(self.ptr as *mut u8, *capacity) }; - } + unsafe { memory::free_aligned(self.ptr, *capacity) }; } // foreign interface knows how to deallocate itself. 
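    // (for example a region imported through the C data interface; dropping the
    // `Arc` held in `Deallocation::Foreign` is what eventually releases it)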
Deallocation::Foreign(_) => (), @@ -134,7 +137,7 @@ impl std::ops::Deref for Bytes { type Target = [u8]; fn deref(&self) -> &[u8] { - unsafe { slice::from_raw_parts(self.ptr, self.len) } + unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) } } } diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 18092b6335b..0616628e2d3 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -190,7 +190,7 @@ where if let Some(b) = &null_bit_buffer { // some value is null for i in 0..left.len() { - let is_valid = unsafe { bit_util::get_bit_raw(b.ptr(), i) }; + let is_valid = unsafe { bit_util::get_bit_raw(b.as_ptr(), i) }; values.push(if is_valid { let right_value = right.value(i); if right_value.is_zero() { diff --git a/rust/arrow/src/compute/kernels/comparison.rs b/rust/arrow/src/compute/kernels/comparison.rs index f77ce57f76c..47e4e6f31cc 100644 --- a/rust/arrow/src/compute/kernels/comparison.rs +++ b/rust/arrow/src/compute/kernels/comparison.rs @@ -51,7 +51,7 @@ macro_rules! compare_op { let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); let mut buffer = MutableBuffer::new(actual_capacity); buffer.resize(byte_capacity); - let data = buffer.raw_data_mut(); + let data = buffer.as_mut_ptr(); for i in 0..$left.len() { if $op($left.value(i), $right.value(i)) { @@ -84,7 +84,7 @@ macro_rules! compare_op_scalar { let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); let mut buffer = MutableBuffer::new(actual_capacity); buffer.resize(byte_capacity); - let data = buffer.raw_data_mut(); + let data = buffer.as_mut_ptr(); for i in 0..$left.len() { if $op($left.value(i), $right) { @@ -656,7 +656,7 @@ where let not_both_null_bitmap = not_both_null_bit_buffer.data(); let mut bool_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - let bool_slice = bool_buf.data_mut(); + let bool_slice = bool_buf.as_slice_mut(); // if both array slots are valid, check if list contains primitive for i in 0..left_len { @@ -711,7 +711,7 @@ where let not_both_null_bitmap = not_both_null_bit_buffer.data(); let mut bool_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - let bool_slice = bool_buf.data_mut(); + let bool_slice = &mut bool_buf; for i in 0..left_len { // contains(null, null) = false diff --git a/rust/arrow/src/compute/kernels/take.rs b/rust/arrow/src/compute/kernels/take.rs index c1d31f928ff..ebb3022525f 100644 --- a/rust/arrow/src/compute/kernels/take.rs +++ b/rust/arrow/src/compute/kernels/take.rs @@ -291,7 +291,7 @@ where let num_bytes = bit_util::ceil(data_len, 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); for (i, elem) in data.iter_mut().enumerate() { let index = ToPrimitive::to_usize(&indices.value(i)).ok_or_else(|| { @@ -342,7 +342,7 @@ where let num_byte = bit_util::ceil(data_len, 8); let mut val_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, false); - let val_slice = val_buf.data_mut(); + let val_slice = val_buf.as_slice_mut(); let null_count = values.null_count(); @@ -363,7 +363,7 @@ where nulls = indices.data_ref().null_buffer().cloned(); } else { let mut null_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, true); - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); (0..data_len).try_for_each::<_, Result<()>>(|i| { let index = 
ToPrimitive::to_usize(&indices.value(i)).ok_or_else(|| { @@ -442,7 +442,7 @@ where let num_bytes = bit_util::ceil(data_len, 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); for (i, offset) in offsets.iter_mut().skip(1).enumerate() { let index = ToPrimitive::to_usize(&indices.value(i)).ok_or_else(|| { @@ -480,7 +480,7 @@ where let num_bytes = bit_util::ceil(data_len, 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); for (i, offset) in offsets.iter_mut().skip(1).enumerate() { let index = ToPrimitive::to_usize(&indices.value(i)).ok_or_else(|| { @@ -544,7 +544,7 @@ where let num_bytes = bit_util::ceil(indices.len(), 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); { - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); offsets[..].windows(2).enumerate().for_each( |(i, window): (usize, &[OffsetType::Native])| { if window[0] == window[1] { @@ -589,7 +589,7 @@ where let mut null_count = 0; let num_bytes = bit_util::ceil(indices.len(), 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); for i in 0..indices.len() { let index = ToPrimitive::to_usize(&indices.value(i)).ok_or_else(|| { diff --git a/rust/arrow/src/compute/util.rs b/rust/arrow/src/compute/util.rs index 043b5a36533..603a70dd3a8 100644 --- a/rust/arrow/src/compute/util.rs +++ b/rust/arrow/src/compute/util.rs @@ -297,7 +297,7 @@ pub(super) mod tests { values.append(&mut array); } else { list_null_count += 1; - bit_util::unset_bit(&mut list_bitmap.data_mut(), idx); + bit_util::unset_bit(&mut list_bitmap.as_slice_mut(), idx); } offset.push(values.len() as i64); } @@ -387,7 +387,7 @@ pub(super) mod tests { values.extend(items.into_iter()); } else { list_null_count += 1; - bit_util::unset_bit(&mut list_bitmap.data_mut(), idx); + bit_util::unset_bit(&mut list_bitmap.as_slice_mut(), idx); values.extend(vec![None; length as usize].into_iter()); } } diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index f66440ba2b1..d494843aafa 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -76,7 +76,14 @@ To import an array, unsafely create an `ArrowArray` from two pointers using [Arr To export an array, create an `ArrowArray` using [ArrowArray::try_new]. 
*/ -use std::{ffi::CStr, ffi::CString, iter, mem::size_of, ptr, sync::Arc}; +use std::{ + ffi::CStr, + ffi::CString, + iter, + mem::size_of, + ptr::{self, NonNull}, + sync::Arc, +}; use crate::buffer::Buffer; use crate::datatypes::DataType; @@ -329,7 +336,7 @@ impl FFI_ArrowArray { .iter() .map(|maybe_buffer| match maybe_buffer { // note that `raw_data` takes into account the buffer's offset - Some(b) => b.ptr() as *const std::os::raw::c_void, + Some(b) => b.as_ptr() as *const std::os::raw::c_void, None => std::ptr::null(), }) .collect::>(); @@ -393,11 +400,7 @@ unsafe fn create_buffer( assert!(index < array.n_buffers as usize); let ptr = *buffers.add(index); - if ptr.is_null() { - None - } else { - Some(Buffer::from_unowned(ptr, len, array)) - } + NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_unowned(ptr, len, array)) } impl Drop for FFI_ArrowArray { diff --git a/rust/arrow/src/json/reader.rs b/rust/arrow/src/json/reader.rs index d8bbd86691d..0c687212f11 100644 --- a/rust/arrow/src/json/reader.rs +++ b/rust/arrow/src/json/reader.rs @@ -872,7 +872,7 @@ impl Decoder { rows.iter().enumerate().for_each(|(i, v)| { if let Value::Array(a) = v { cur_offset = cur_offset + OffsetSize::from_usize(a.len()).unwrap(); - bit_util::set_bit(list_nulls.data_mut(), i); + bit_util::set_bit(list_nulls.as_slice_mut(), i); } else if let Value::Null = v { // value is null, not incremented } else { @@ -896,11 +896,17 @@ impl Decoder { if let Value::Bool(child) = value { // if valid boolean, append value if *child { - bit_util::set_bit(bool_values.data_mut(), curr_index); + bit_util::set_bit( + bool_values.as_slice_mut(), + curr_index, + ); } } else { // null slot - bit_util::unset_bit(bool_nulls.data_mut(), curr_index); + bit_util::unset_bit( + bool_nulls.as_slice_mut(), + curr_index, + ); } curr_index += 1; }); @@ -964,7 +970,10 @@ impl Decoder { .flat_map(|row| { if let Value::Array(values) = row { values.iter().for_each(|_| { - bit_util::set_bit(null_buffer.data_mut(), struct_index); + bit_util::set_bit( + null_buffer.as_slice_mut(), + struct_index, + ); struct_index += 1; }); values.clone() @@ -1178,7 +1187,7 @@ impl Decoder { .map(|(i, v)| match v { // we want the field as an object, if it's not, we treat as null Some(Value::Object(value)) => { - bit_util::set_bit(null_buffer.data_mut(), i); + bit_util::set_bit(null_buffer.as_slice_mut(), i); Value::Object(value.clone()) } _ => Value::Object(Default::default()), diff --git a/rust/arrow/src/memory.rs b/rust/arrow/src/memory.rs index 8bd334469ee..ad103b06280 100644 --- a/rust/arrow/src/memory.rs +++ b/rust/arrow/src/memory.rs @@ -20,7 +20,10 @@ use std::mem::align_of; use std::ptr::NonNull; -use std::{alloc::Layout, sync::atomic::AtomicIsize}; +use std::{ + alloc::{handle_alloc_error, Layout}, + sync::atomic::AtomicIsize, +}; // NOTE: Below code is written for spatial/temporal prefetcher optimizations. Memory allocation // should align well with usage pattern of cache access and block sizes on layers of storage levels from @@ -138,18 +141,19 @@ const BYPASS_PTR: NonNull = unsafe { NonNull::new_unchecked(ALIGNMENT as *mu // If this number is not zero after all objects have been `drop`, there is a memory leak pub static mut ALLOCATIONS: AtomicIsize = AtomicIsize::new(0); -pub fn allocate_aligned(size: usize) -> *mut u8 { +pub fn allocate_aligned(size: usize) -> NonNull { unsafe { if size == 0 { // In a perfect world, there is no need to request zero size allocation. // Currently, passing zero sized layout to alloc is UB. 
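    // Instead we hand out `BYPASS_PTR`, a well-aligned dangling sentinel that is
    // never given to the allocator; `free_aligned` and `reallocate` check for it.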
// This will dodge allocator api for any type. - BYPASS_PTR.as_ptr() + BYPASS_PTR } else { ALLOCATIONS.fetch_add(size as isize, std::sync::atomic::Ordering::SeqCst); let layout = Layout::from_size_align_unchecked(size, ALIGNMENT); - std::alloc::alloc_zeroed(layout) + let raw_ptr = std::alloc::alloc_zeroed(layout); + NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)) } } } @@ -162,10 +166,13 @@ pub fn allocate_aligned(size: usize) -> *mut u8 { /// * ptr must denote a block of memory currently allocated via this allocator, /// /// * size must be the same size that was used to allocate that block of memory, -pub unsafe fn free_aligned(ptr: *mut u8, size: usize) { - if ptr != BYPASS_PTR.as_ptr() { +pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) { + if ptr != BYPASS_PTR { ALLOCATIONS.fetch_sub(size as isize, std::sync::atomic::Ordering::SeqCst); - std::alloc::dealloc(ptr, Layout::from_size_align_unchecked(size, ALIGNMENT)); + std::alloc::dealloc( + ptr.as_ptr(), + Layout::from_size_align_unchecked(size, ALIGNMENT), + ); } } @@ -180,65 +187,69 @@ /// /// * new_size, when rounded up to the nearest multiple of [ALIGNMENT], must not overflow (i.e., /// the rounded value must be less than usize::MAX). -pub unsafe fn reallocate(ptr: *mut u8, old_size: usize, new_size: usize) -> *mut u8 { - if ptr == BYPASS_PTR.as_ptr() { +pub unsafe fn reallocate( + ptr: NonNull<u8>, + old_size: usize, + new_size: usize, +) -> NonNull<u8> { + if ptr == BYPASS_PTR { return allocate_aligned(new_size); } if new_size == 0 { free_aligned(ptr, old_size); - return BYPASS_PTR.as_ptr(); + return BYPASS_PTR; } ALLOCATIONS.fetch_add( new_size as isize - old_size as isize, std::sync::atomic::Ordering::SeqCst, ); - let new_ptr = std::alloc::realloc( - ptr, + let raw_ptr = std::alloc::realloc( + ptr.as_ptr(), Layout::from_size_align_unchecked(old_size, ALIGNMENT), new_size, ); - - if !new_ptr.is_null() && new_size > old_size { - new_ptr.add(old_size).write_bytes(0, new_size - old_size); + let ptr = NonNull::new(raw_ptr).unwrap_or_else(|| { + handle_alloc_error(Layout::from_size_align_unchecked(new_size, ALIGNMENT)) + }); + + if new_size > old_size { + ptr.as_ptr() + .add(old_size) + .write_bytes(0, new_size - old_size); } - - new_ptr + ptr } /// # Safety /// /// Behavior is undefined if any of the following conditions are violated: /// -/// * `src` must be valid for reads of `len * size_of::<T>()` bytes. +/// * `src` must be valid for reads of `count * size_of::<T>()` bytes. /// -/// * `dst` must be valid for writes of `len * size_of::<T>()` bytes. +/// * `dst` must be valid for writes of `count * size_of::<T>()` bytes. /// /// * Both `src` and `dst` must be properly aligned. /// /// `memcpy` creates a bitwise copy of `T`, regardless of whether `T` is [`Copy`]. If `T` is not /// [`Copy`], using both the values in the region beginning at `*src` and the region beginning at /// `*dst` can [violate memory safety][read-ownership]. -pub unsafe fn memcpy(dst: *mut u8, src: *const u8, len: usize) { - if len != 0x00 && src != BYPASS_PTR.as_ptr() { - std::ptr::copy_nonoverlapping(src, dst, len) +pub unsafe fn memcpy(dst: NonNull<u8>, src: NonNull<u8>, count: usize) { + if src != BYPASS_PTR { + std::ptr::copy_nonoverlapping(src.as_ptr(), dst.as_ptr(), count) } } -extern "C" { - pub fn memcmp(p1: *const u8, p2: *const u8, len: usize) -> i32; -} /// Check if the pointer `p` is aligned to offset `a`. 
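/// (The bitmask test below is only meaningful when `a` is a power of two.)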
-pub fn is_aligned(p: *const T, a: usize) -> bool { +pub fn is_aligned(p: NonNull, a: usize) -> bool { let a_minus_one = a.wrapping_sub(1); - let pmoda = p as usize & a_minus_one; + let pmoda = p.as_ptr() as usize & a_minus_one; pmoda == 0 } -pub fn is_ptr_aligned(p: *const T) -> bool { - p.align_offset(align_of::()) == 0 +pub fn is_ptr_aligned(p: NonNull) -> bool { + p.as_ptr().align_offset(align_of::()) == 0 } #[cfg(test)] @@ -250,7 +261,7 @@ mod tests { for _ in 0..10 { let p = allocate_aligned(1024); // make sure this is 64-byte aligned - assert_eq!(0, (p as usize) % 64); + assert_eq!(0, (p.as_ptr() as usize) % 64); unsafe { free_aligned(p, 1024) }; } } @@ -258,16 +269,16 @@ mod tests { #[test] fn test_is_aligned() { // allocate memory aligned to 64-byte - let mut ptr = allocate_aligned(10); + let ptr = allocate_aligned(10); assert_eq!(true, is_aligned::(ptr, 1)); assert_eq!(true, is_aligned::(ptr, 2)); assert_eq!(true, is_aligned::(ptr, 4)); // now make the memory aligned to 63-byte - ptr = unsafe { ptr.offset(1) }; + let ptr = unsafe { NonNull::new_unchecked(ptr.as_ptr().offset(1)) }; assert_eq!(true, is_aligned::(ptr, 1)); assert_eq!(false, is_aligned::(ptr, 2)); assert_eq!(false, is_aligned::(ptr, 4)); - unsafe { free_aligned(ptr.offset(-1), 10) }; + unsafe { free_aligned(NonNull::new_unchecked(ptr.as_ptr().offset(-1)), 10) }; } } From 8093d17cabfc5fe6d40ee4c369835bb198e7c6d0 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 23 Dec 2020 15:24:30 +0000 Subject: [PATCH 3/6] Migrated parquet. --- .../src/bin/arrow-json-integration-test.rs | 2 +- rust/parquet/src/arrow/array_reader.rs | 2 +- rust/parquet/src/arrow/record_reader.rs | 24 ++++++++++--------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index b1bec677cf1..05b30caed8c 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -599,7 +599,7 @@ fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer { .iter() .enumerate() .for_each(|(i, v)| { - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); if *v != 0 { bit_util::set_bit(null_slice, i); } diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index eb9548a8622..c8363c6a557 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -920,7 +920,7 @@ impl ArrayReader for ListArrayReader { let num_bytes = bit_util::ceil(offsets.len(), 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - let null_slice = null_buf.data_mut(); + let null_slice = null_buf.as_slice_mut(); let mut list_index = 0; for i in 0..rep_levels.len() { if rep_levels[i] == 0 && def_levels[i] != 0 { diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 0a7c17fdcfb..d2f77cd3f9a 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -163,8 +163,8 @@ impl RecordReader { new_buffer.resize(num_bytes); - let new_def_levels = new_buffer.data_mut(); - let left_def_levels = &def_levels_buf.data_mut()[new_len..]; + let new_def_levels = new_buffer.as_slice_mut(); + let left_def_levels = &def_levels_buf.as_slice_mut()[new_len..]; new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]); @@ -190,8 +190,8 @@ impl 
RecordReader { new_buffer.resize(num_bytes); - let new_rep_levels = new_buffer.data_mut(); - let left_rep_levels = &rep_levels_buf.data_mut()[new_len..]; + let new_rep_levels = new_buffer.as_slice_mut(); + let left_rep_levels = &rep_levels_buf.as_slice_mut()[new_len..]; new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]); @@ -217,8 +217,8 @@ impl RecordReader { new_buffer.resize(num_bytes); - let new_records = new_buffer.data_mut(); - let left_records = &mut self.records.data_mut()[new_len..]; + let new_records = new_buffer.as_slice_mut(); + let left_records = &mut self.records.as_slice_mut()[new_len..]; new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]); @@ -291,20 +291,20 @@ impl RecordReader { // Convert mutable buffer spaces to mutable slices let (prefix, values, suffix) = - unsafe { self.records.data_mut().align_to_mut::() }; + unsafe { self.records.as_slice_mut().align_to_mut::() }; assert!(prefix.is_empty() && suffix.is_empty()); let values = &mut values[values_written..]; let def_levels = self.def_levels.as_mut().map(|buf| { let (prefix, def_levels, suffix) = - unsafe { buf.data_mut().align_to_mut::() }; + unsafe { buf.as_slice_mut().align_to_mut::() }; assert!(prefix.is_empty() && suffix.is_empty()); &mut def_levels[values_written..] }); let rep_levels = self.rep_levels.as_mut().map(|buf| { let (prefix, rep_levels, suffix) = - unsafe { buf.data_mut().align_to_mut::() }; + unsafe { buf.as_slice_mut().align_to_mut::() }; assert!(prefix.is_empty() && suffix.is_empty()); &mut rep_levels[values_written..] }); @@ -317,7 +317,8 @@ impl RecordReader { // get new references for the def levels. let def_levels = self.def_levels.as_ref().map(|buf| { - let (prefix, def_levels, suffix) = unsafe { buf.data().align_to::() }; + let (prefix, def_levels, suffix) = + unsafe { buf.as_slice().align_to::() }; assert!(prefix.is_empty() && suffix.is_empty()); &def_levels[values_written..] }); @@ -370,7 +371,8 @@ impl RecordReader { /// records read. fn split_records(&mut self, records_to_read: usize) -> Result { let rep_levels = self.rep_levels.as_ref().map(|buf| { - let (prefix, rep_levels, suffix) = unsafe { buf.data().align_to::() }; + let (prefix, rep_levels, suffix) = + unsafe { buf.as_slice().align_to::() }; assert!(prefix.is_empty() && suffix.is_empty()); rep_levels }); From b5bb0b964e82aed47ec4b45442bab5a52097cef2 Mon Sep 17 00:00:00 2001 From: "Jorge C. 
Leitao" Date: Wed, 23 Dec 2020 16:13:10 +0000 Subject: [PATCH 4/6] Rest data -> as_slice --- rust/arrow/src/array/array_struct.rs | 4 +- rust/arrow/src/array/array_union.rs | 4 +- rust/arrow/src/array/builder.rs | 6 +-- rust/arrow/src/array/data.rs | 6 +-- rust/arrow/src/array/equal/boolean.rs | 4 +- rust/arrow/src/array/equal/decimal.rs | 4 +- rust/arrow/src/array/equal/fixed_binary.rs | 4 +- rust/arrow/src/array/equal/primitive.rs | 4 +- rust/arrow/src/array/equal/structure.rs | 4 +- rust/arrow/src/array/equal/utils.rs | 4 +- rust/arrow/src/array/equal/variable_size.rs | 8 ++-- rust/arrow/src/array/transform/boolean.rs | 2 +- .../arrow/src/array/transform/fixed_binary.rs | 2 +- rust/arrow/src/array/transform/mod.rs | 2 +- rust/arrow/src/array/transform/primitive.rs | 2 +- .../src/array/transform/variable_size.rs | 2 +- rust/arrow/src/bitmap.rs | 2 +- rust/arrow/src/buffer.rs | 48 ++++++++++--------- rust/arrow/src/compute/kernels/comparison.rs | 4 +- rust/arrow/src/compute/kernels/substring.rs | 2 +- rust/arrow/src/ipc/writer.rs | 2 +- rust/parquet/src/arrow/array_reader.rs | 2 +- 22 files changed, 63 insertions(+), 59 deletions(-) diff --git a/rust/arrow/src/array/array_struct.rs b/rust/arrow/src/array/array_struct.rs index a447837aa82..60e9b2fd062 100644 --- a/rust/arrow/src/array/array_struct.rs +++ b/rust/arrow/src/array/array_struct.rs @@ -388,8 +388,8 @@ mod tests { for i in 0..expected_int_data.len() { if !expected_int_data.is_null(i) { assert_eq!( - expected_value_buf.data()[i * 4..(i + 1) * 4], - actual_value_buf.data()[i * 4..(i + 1) * 4] + expected_value_buf.as_slice()[i * 4..(i + 1) * 4], + actual_value_buf.as_slice()[i * 4..(i + 1) * 4] ); } } diff --git a/rust/arrow/src/array/array_union.rs b/rust/arrow/src/array/array_union.rs index ea42843589f..9d1d217c9a5 100644 --- a/rust/arrow/src/array/array_union.rs +++ b/rust/arrow/src/array/array_union.rs @@ -219,7 +219,7 @@ impl UnionArray { /// Panics if `index` is greater than the length of the array. pub fn type_id(&self, index: usize) -> i8 { assert!(index - self.offset() < self.len()); - self.data().buffers()[0].data()[index] as i8 + self.data().buffers()[0].as_slice()[index] as i8 } /// Returns the offset into the underlying values array for the array slot at `index`. 
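Throughout this patch, `Buffer::data()`/`MutableBuffer::data_mut()` become `as_slice()`/`as_slice_mut()`, matching `Vec::as_slice` and `Vec::as_mut_slice` in Rust's std. A minimal sketch of the renamed accessors, written against the API exactly as it appears in these hunks (the function name is hypothetical):

    use arrow::buffer::{Buffer, MutableBuffer};

    fn renamed_accessors() {
        // Immutable view of the bytes: formerly `buffer.data()`.
        let buffer = Buffer::from(&[0u8, 1, 2, 3]);
        assert_eq!(buffer.as_slice(), &[0, 1, 2, 3]);

        // Mutable view: formerly `buffer.data_mut()`.
        // `with_bitset(4, false)` sets len = 4 and zeroes those bytes.
        let mut mutable = MutableBuffer::new(64).with_bitset(4, false);
        mutable.as_slice_mut()[0] = 0xFF;
        assert_eq!(mutable.as_slice()[0], 0xFF);
    }
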
@@ -236,7 +236,7 @@ impl UnionArray { Some(b) => b.count_set_bits_offset(0, index), None => index, }; - self.data().buffers()[1].data()[valid_slots * size_of::()] as i32 + self.data().buffers()[1].as_slice()[valid_slots * size_of::()] as i32 } else { index as i32 } diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index 4aa35c638b6..abab9e66744 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -2504,7 +2504,7 @@ mod tests { let buf2 = builder.finish(); assert_eq!(buf.len(), buf2.len()); - assert_eq!(buf.data(), buf2.data()); + assert_eq!(buf.as_slice(), buf2.as_slice()); } #[test] @@ -3166,8 +3166,8 @@ mod tests { for i in 0..expected_int_data.len() { if !expected_int_data.is_null(i) { assert_eq!( - expected_value_buf.data()[i * 4..(i + 1) * 4], - actual_value_buf.data()[i * 4..(i + 1) * 4] + expected_value_buf.as_slice()[i * 4..(i + 1) * 4], + actual_value_buf.as_slice()[i * 4..(i + 1) * 4] ); } } diff --git a/rust/arrow/src/array/data.rs b/rust/arrow/src/array/data.rs index d634ed18d5c..10b7770ac42 100644 --- a/rust/arrow/src/array/data.rs +++ b/rust/arrow/src/array/data.rs @@ -239,7 +239,7 @@ impl ArrayData { /// * the datatype is `Boolean` (it corresponds to a bit-packed buffer where the offset is not applicable) #[inline] pub(super) fn buffer(&self, buffer: usize) -> &[T] { - let values = unsafe { self.buffers[buffer].data().align_to::() }; + let values = unsafe { self.buffers[buffer].as_slice().align_to::() }; if !values.0.is_empty() || !values.2.is_empty() { panic!("The buffer is not byte-aligned with its interpretation") }; @@ -383,7 +383,7 @@ mod tests { assert_eq!(10, arr_data.null_count()); assert_eq!(5, arr_data.offset()); assert_eq!(1, arr_data.buffers().len()); - assert_eq!(&[0, 1, 2, 3], arr_data.buffers()[0].data()); + assert_eq!(&[0, 1, 2, 3], arr_data.buffers()[0].as_slice()); assert_eq!(1, arr_data.child_data().len()); assert_eq!(child_arr_data, arr_data.child_data()[0]); } @@ -424,7 +424,7 @@ mod tests { .null_bit_buffer(Buffer::from(bit_v)) .build(); assert!(arr_data.null_buffer().is_some()); - assert_eq!(&bit_v, arr_data.null_buffer().unwrap().data()); + assert_eq!(&bit_v, arr_data.null_buffer().unwrap().as_slice()); } #[test] diff --git a/rust/arrow/src/array/equal/boolean.rs b/rust/arrow/src/array/equal/boolean.rs index 4158080b81d..88bd080ba53 100644 --- a/rust/arrow/src/array/equal/boolean.rs +++ b/rust/arrow/src/array/equal/boolean.rs @@ -26,8 +26,8 @@ pub(super) fn boolean_equal( rhs_start: usize, len: usize, ) -> bool { - let lhs_values = lhs.buffers()[0].data(); - let rhs_values = rhs.buffers()[0].data(); + let lhs_values = lhs.buffers()[0].as_slice(); + let rhs_values = rhs.buffers()[0].as_slice(); // TODO: we can do this more efficiently if all values are not-null (0..len).all(|i| { diff --git a/rust/arrow/src/array/equal/decimal.rs b/rust/arrow/src/array/equal/decimal.rs index 715b308c4b8..a8fdded2fa7 100644 --- a/rust/arrow/src/array/equal/decimal.rs +++ b/rust/arrow/src/array/equal/decimal.rs @@ -31,8 +31,8 @@ pub(super) fn decimal_equal( _ => unreachable!(), }; - let lhs_values = &lhs.buffers()[0].data()[lhs.offset() * size..]; - let rhs_values = &rhs.buffers()[0].data()[rhs.offset() * size..]; + let lhs_values = &lhs.buffers()[0].as_slice()[lhs.offset() * size..]; + let rhs_values = &rhs.buffers()[0].as_slice()[rhs.offset() * size..]; if lhs.null_count() == 0 && rhs.null_count() == 0 { equal_len( diff --git a/rust/arrow/src/array/equal/fixed_binary.rs 
b/rust/arrow/src/array/equal/fixed_binary.rs index f14b097fbb0..c6889ba4b43 100644 --- a/rust/arrow/src/array/equal/fixed_binary.rs +++ b/rust/arrow/src/array/equal/fixed_binary.rs @@ -31,8 +31,8 @@ pub(super) fn fixed_binary_equal( _ => unreachable!(), }; - let lhs_values = &lhs.buffers()[0].data()[lhs.offset() * size..]; - let rhs_values = &rhs.buffers()[0].data()[rhs.offset() * size..]; + let lhs_values = &lhs.buffers()[0].as_slice()[lhs.offset() * size..]; + let rhs_values = &rhs.buffers()[0].as_slice()[rhs.offset() * size..]; if lhs.null_count() == 0 && rhs.null_count() == 0 { equal_len( diff --git a/rust/arrow/src/array/equal/primitive.rs b/rust/arrow/src/array/equal/primitive.rs index 19602e46488..4bb256643ca 100644 --- a/rust/arrow/src/array/equal/primitive.rs +++ b/rust/arrow/src/array/equal/primitive.rs @@ -29,8 +29,8 @@ pub(super) fn primitive_equal( len: usize, ) -> bool { let byte_width = size_of::(); - let lhs_values = &lhs.buffers()[0].data()[lhs.offset() * byte_width..]; - let rhs_values = &rhs.buffers()[0].data()[rhs.offset() * byte_width..]; + let lhs_values = &lhs.buffers()[0].as_slice()[lhs.offset() * byte_width..]; + let rhs_values = &rhs.buffers()[0].as_slice()[rhs.offset() * byte_width..]; if lhs.null_count() == 0 && rhs.null_count() == 0 { // without nulls, we just need to compare slices diff --git a/rust/arrow/src/array/equal/structure.rs b/rust/arrow/src/array/equal/structure.rs index 5b12ae776d9..31ccbc870d0 100644 --- a/rust/arrow/src/array/equal/structure.rs +++ b/rust/arrow/src/array/equal/structure.rs @@ -93,8 +93,8 @@ pub(super) fn struct_equal( equal_values(lhs, rhs, lhs_nulls, rhs_nulls, lhs_start, rhs_start, len) } else { // get a ref of the null buffer bytes, to use in testing for nullness - let lhs_null_bytes = lhs_nulls.as_ref().unwrap().data(); - let rhs_null_bytes = rhs_nulls.as_ref().unwrap().data(); + let lhs_null_bytes = lhs_nulls.as_ref().unwrap().as_slice(); + let rhs_null_bytes = rhs_nulls.as_ref().unwrap().as_slice(); // with nulls, we need to compare item by item whenever it is not null (0..len).all(|i| { let lhs_pos = lhs_start + i; diff --git a/rust/arrow/src/array/equal/utils.rs b/rust/arrow/src/array/equal/utils.rs index 3bb4c0be653..3ccc2450852 100644 --- a/rust/arrow/src/array/equal/utils.rs +++ b/rust/arrow/src/array/equal/utils.rs @@ -46,8 +46,8 @@ pub(super) fn equal_nulls( let lhs_null_count = count_nulls(lhs_nulls, lhs_start, len); let rhs_null_count = count_nulls(rhs_nulls, rhs_start, len); if lhs_null_count > 0 || rhs_null_count > 0 { - let lhs_values = lhs_nulls.unwrap().data(); - let rhs_values = rhs_nulls.unwrap().data(); + let lhs_values = lhs_nulls.unwrap().as_slice(); + let rhs_values = rhs_nulls.unwrap().as_slice(); equal_bits( lhs_values, rhs_values, diff --git a/rust/arrow/src/array/equal/variable_size.rs b/rust/arrow/src/array/equal/variable_size.rs index caf8a0c1eae..94fdf6b2980 100644 --- a/rust/arrow/src/array/equal/variable_size.rs +++ b/rust/arrow/src/array/equal/variable_size.rs @@ -61,8 +61,8 @@ pub(super) fn variable_sized_equal( let rhs_offsets = rhs.buffer::(0); // these are bytes, and thus the offset does not need to be multiplied - let lhs_values = &lhs.buffers()[1].data()[lhs.offset()..]; - let rhs_values = &rhs.buffers()[1].data()[rhs.offset()..]; + let lhs_values = &lhs.buffers()[1].as_slice()[lhs.offset()..]; + let rhs_values = &rhs.buffers()[1].as_slice()[rhs.offset()..]; let lhs_null_count = count_nulls(lhs_nulls, lhs_start, len); let rhs_null_count = count_nulls(rhs_nulls, rhs_start, len); @@ -88,10 
+88,10 @@ pub(super) fn variable_sized_equal( // the null bits can still be `None`, so we don't unwrap let lhs_is_null = !lhs_nulls - .map(|v| get_bit(v.data(), lhs_pos)) + .map(|v| get_bit(v.as_slice(), lhs_pos)) .unwrap_or(false); let rhs_is_null = !rhs_nulls - .map(|v| get_bit(v.data(), rhs_pos)) + .map(|v| get_bit(v.as_slice(), rhs_pos)) .unwrap_or(false); lhs_is_null diff --git a/rust/arrow/src/array/transform/boolean.rs b/rust/arrow/src/array/transform/boolean.rs index 2aa918b763a..23be955b975 100644 --- a/rust/arrow/src/array/transform/boolean.rs +++ b/rust/arrow/src/array/transform/boolean.rs @@ -23,7 +23,7 @@ use super::{ }; pub(super) fn build_extend(array: &ArrayData) -> Extend { - let values = array.buffers()[0].data(); + let values = array.buffers()[0].as_slice(); Box::new( move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| { let buffer = &mut mutable.buffer1; diff --git a/rust/arrow/src/array/transform/fixed_binary.rs b/rust/arrow/src/array/transform/fixed_binary.rs index 8899113ede7..477d2ad277c 100644 --- a/rust/arrow/src/array/transform/fixed_binary.rs +++ b/rust/arrow/src/array/transform/fixed_binary.rs @@ -25,7 +25,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { _ => unreachable!(), }; - let values = &array.buffers()[0].data()[array.offset() * size..]; + let values = &array.buffers()[0].as_slice()[array.offset() * size..]; if array.null_count() == 0 { // fast case where we can copy regions without null issues Box::new( diff --git a/rust/arrow/src/array/transform/mod.rs b/rust/arrow/src/array/transform/mod.rs index dc16ed65a48..c32c7876a4b 100644 --- a/rust/arrow/src/array/transform/mod.rs +++ b/rust/arrow/src/array/transform/mod.rs @@ -92,7 +92,7 @@ impl<'a> _MutableArrayData<'a> { fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits { if let Some(bitmap) = array.null_bitmap() { - let bytes = bitmap.bits.data(); + let bytes = bitmap.bits.as_slice(); Box::new(move |mutable, start, len| { utils::reserve_for_bits(&mut mutable.null_buffer, mutable.len + len); mutable.null_count += utils::set_bits( diff --git a/rust/arrow/src/array/transform/primitive.rs b/rust/arrow/src/array/transform/primitive.rs index 01bbd1a4788..86c76941af3 100644 --- a/rust/arrow/src/array/transform/primitive.rs +++ b/rust/arrow/src/array/transform/primitive.rs @@ -22,7 +22,7 @@ use crate::{array::ArrayData, datatypes::ArrowNativeType}; use super::{Extend, _MutableArrayData}; pub(super) fn build_extend(array: &ArrayData) -> Extend { - let values = &array.buffers()[0].data()[array.offset() * size_of::()..]; + let values = &array.buffers()[0].as_slice()[array.offset() * size_of::()..]; Box::new( move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| { let start = start * size_of::(); diff --git a/rust/arrow/src/array/transform/variable_size.rs b/rust/arrow/src/array/transform/variable_size.rs index 3a18b6fe5ee..dcd0ed6147f 100644 --- a/rust/arrow/src/array/transform/variable_size.rs +++ b/rust/arrow/src/array/transform/variable_size.rs @@ -42,7 +42,7 @@ fn extend_offset_values( pub(super) fn build_extend(array: &ArrayData) -> Extend { let offsets = array.buffer::(0); - let values = &array.buffers()[1].data()[array.offset()..]; + let values = &array.buffers()[1].as_slice()[array.offset()..]; if array.null_count() == 0 { // fast case where we can copy regions without null issues Box::new( diff --git a/rust/arrow/src/bitmap.rs b/rust/arrow/src/bitmap.rs index e76cb51c2d1..b977f550999 100644 --- a/rust/arrow/src/bitmap.rs +++ 
b/rust/arrow/src/bitmap.rs @@ -107,7 +107,7 @@ impl PartialEq for Bitmap { if self_len != other_len { return false; } - self.bits.data()[..self_len] == other.bits.data()[..self_len] + self.bits.as_slice()[..self_len] == other.bits.as_slice()[..self_len] } } diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index ae5a43f92ef..82708180e84 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -125,7 +125,7 @@ impl Buffer { } /// Returns the byte slice stored in this buffer - pub fn data(&self) -> &[u8] { + pub fn as_slice(&self) -> &[u8] { &self.data[self.offset..] } @@ -192,7 +192,7 @@ impl Buffer { /// in larger chunks and starting at arbitrary bit offsets. /// Note that both `offset` and `length` are measured in bits. pub fn bit_chunks(&self, offset: usize, len: usize) -> BitChunks { - BitChunks::new(&self.data(), offset, len) + BitChunks::new(&self.as_slice(), offset, len) } /// Returns the number of 1-bits in this buffer. @@ -271,9 +271,9 @@ where let mut result = MutableBuffer::new(len).with_bitset(len, false); let lanes = u8x64::lanes(); - let mut left_chunks = left.data()[left_offset..].chunks_exact(lanes); - let mut right_chunks = right.data()[right_offset..].chunks_exact(lanes); - let mut result_chunks = result.data_mut().chunks_exact_mut(lanes); + let mut left_chunks = left.as_slice()[left_offset..].chunks_exact(lanes); + let mut right_chunks = right.as_slice()[right_offset..].chunks_exact(lanes); + let mut result_chunks = result.as_slice_mut().chunks_exact_mut(lanes); result_chunks .borrow_mut() @@ -318,8 +318,8 @@ where let mut result = MutableBuffer::new(len).with_bitset(len, false); let lanes = u8x64::lanes(); - let mut left_chunks = left.data()[left_offset..].chunks_exact(lanes); - let mut result_chunks = result.data_mut().chunks_exact_mut(lanes); + let mut left_chunks = left.as_slice()[left_offset..].chunks_exact(lanes); + let mut result_chunks = result.as_slice_mut().chunks_exact_mut(lanes); result_chunks .borrow_mut() @@ -428,10 +428,12 @@ pub(super) fn buffer_bin_and( let mut result = MutableBuffer::new(len).with_bitset(len, false); - let mut left_chunks = left.data()[left_offset..].chunks_exact(AVX512_U8X64_LANES); + let mut left_chunks = + left.as_slice()[left_offset..].chunks_exact(AVX512_U8X64_LANES); let mut right_chunks = - right.data()[right_offset..].chunks_exact(AVX512_U8X64_LANES); - let mut result_chunks = result.data_mut().chunks_exact_mut(AVX512_U8X64_LANES); + right.as_slice()[right_offset..].chunks_exact(AVX512_U8X64_LANES); + let mut result_chunks = + result.as_slice_mut().chunks_exact_mut(AVX512_U8X64_LANES); result_chunks .borrow_mut() @@ -537,10 +539,12 @@ pub(super) fn buffer_bin_or( let mut result = MutableBuffer::new(len).with_bitset(len, false); - let mut left_chunks = left.data()[left_offset..].chunks_exact(AVX512_U8X64_LANES); + let mut left_chunks = + left.as_slice()[left_offset..].chunks_exact(AVX512_U8X64_LANES); let mut right_chunks = - right.data()[right_offset..].chunks_exact(AVX512_U8X64_LANES); - let mut result_chunks = result.data_mut().chunks_exact_mut(AVX512_U8X64_LANES); + right.as_slice()[right_offset..].chunks_exact(AVX512_U8X64_LANES); + let mut result_chunks = + result.as_slice_mut().chunks_exact_mut(AVX512_U8X64_LANES); result_chunks .borrow_mut() @@ -961,7 +965,7 @@ mod tests { let buf = Buffer::from(&[0, 1, 2, 3, 4]); assert_eq!(5, buf.len()); assert!(!buf.as_ptr().is_null()); - assert_eq!([0, 1, 2, 3, 4], buf.data()); + assert_eq!([0, 1, 2, 3, 4], buf.as_slice()); } #[test] @@ -969,7 +973,7 @@ 
mod tests { let buf = Buffer::from(&[0, 1, 2, 3, 4]); assert_eq!(5, buf.len()); assert!(!buf.as_ptr().is_null()); - assert_eq!([0, 1, 2, 3, 4], buf.data()); + assert_eq!([0, 1, 2, 3, 4], buf.as_slice()); } #[test] @@ -979,7 +983,7 @@ mod tests { assert_eq!(5, buf2.len()); assert_eq!(64, buf2.capacity()); assert!(!buf2.as_ptr().is_null()); - assert_eq!([0, 1, 2, 3, 4], buf2.data()); + assert_eq!([0, 1, 2, 3, 4], buf2.as_slice()); } #[test] @@ -987,21 +991,21 @@ mod tests { let buf = Buffer::from(&[2, 4, 6, 8, 10]); let buf2 = buf.slice(2); - assert_eq!([6, 8, 10], buf2.data()); + assert_eq!([6, 8, 10], buf2.as_slice()); assert_eq!(3, buf2.len()); assert_eq!(unsafe { buf.as_ptr().offset(2) }, buf2.as_ptr()); let buf3 = buf2.slice(1); - assert_eq!([8, 10], buf3.data()); + assert_eq!([8, 10], buf3.as_slice()); assert_eq!(2, buf3.len()); assert_eq!(unsafe { buf.as_ptr().offset(3) }, buf3.as_ptr()); let buf4 = buf.slice(5); let empty_slice: [u8; 0] = []; - assert_eq!(empty_slice, buf4.data()); + assert_eq!(empty_slice, buf4.as_slice()); assert_eq!(0, buf4.len()); assert!(buf4.is_empty()); - assert_eq!(buf2.slice(2).data(), &[10]); + assert_eq!(buf2.slice(2).as_slice(), &[10]); } #[test] @@ -1144,7 +1148,7 @@ mod tests { let immutable_buf = buf.freeze(); assert_eq!(19, immutable_buf.len()); assert_eq!(64, immutable_buf.capacity()); - assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.data()); + assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice()); } #[test] @@ -1169,7 +1173,7 @@ mod tests { fn test_access_concurrently() { let buffer = Buffer::from(vec![1, 2, 3, 4, 5]); let buffer2 = buffer.clone(); - assert_eq!([1, 2, 3, 4, 5], buffer.data()); + assert_eq!([1, 2, 3, 4, 5], buffer.as_slice()); let buffer_copy = thread::spawn(move || { // access buffer in another thread. diff --git a/rust/arrow/src/compute/kernels/comparison.rs b/rust/arrow/src/compute/kernels/comparison.rs index 47e4e6f31cc..92c808a7fee 100644 --- a/rust/arrow/src/compute/kernels/comparison.rs +++ b/rust/arrow/src/compute/kernels/comparison.rs @@ -653,7 +653,7 @@ where Some(buff) => buff, None => new_all_set_buffer(num_bytes), }; - let not_both_null_bitmap = not_both_null_bit_buffer.data(); + let not_both_null_bitmap = not_both_null_bit_buffer.as_slice(); let mut bool_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); let bool_slice = bool_buf.as_slice_mut(); @@ -708,7 +708,7 @@ where Some(buff) => buff, None => new_all_set_buffer(num_bytes), }; - let not_both_null_bitmap = not_both_null_bit_buffer.data(); + let not_both_null_bitmap = not_both_null_bit_buffer.as_slice(); let mut bool_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); let bool_slice = &mut bool_buf; diff --git a/rust/arrow/src/compute/kernels/substring.rs b/rust/arrow/src/compute/kernels/substring.rs index a329f45c46e..66298e67722 100644 --- a/rust/arrow/src/compute/kernels/substring.rs +++ b/rust/arrow/src/compute/kernels/substring.rs @@ -38,7 +38,7 @@ fn generic_substring( // compute values let values = &array.data_ref().buffers()[1]; - let data = values.data(); + let data = values.as_slice(); let mut new_values = Vec::new(); // we have no way to estimate how much this will be. 
let mut new_offsets: Vec = Vec::with_capacity(array.len() + 1); diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 5161d548ca6..c515f062852 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -718,7 +718,7 @@ fn write_buffer( let total_len: i64 = (len + pad_len) as i64; // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes"); buffers.push(ipc::Buffer::new(offset, total_len)); - arrow_data.extend_from_slice(buffer.data()); + arrow_data.extend_from_slice(buffer.as_slice()); arrow_data.extend_from_slice(&vec![0u8; pad_len][..]); offset + total_len } diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index c8363c6a557..91d6579a947 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -291,7 +291,7 @@ impl ArrayReader for PrimitiveArrayReader { if T::get_physical_type() == PhysicalType::BOOLEAN { let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len()); - for e in record_data.data() { + for e in record_data.as_slice() { boolean_buffer.append(*e > 0); } record_data = boolean_buffer.finish(); From ab0ae2a89c7a6ed2da8d930f9eb37d41d4332f96 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 28 Dec 2020 07:02:29 +0000 Subject: [PATCH 5/6] Added new bench. --- rust/arrow/Cargo.toml | 4 ++ rust/arrow/benches/buffer_create.rs | 71 +++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 rust/arrow/benches/buffer_create.rs diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index 4aa1c673f45..028444bd113 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -141,3 +141,7 @@ harness = false [[bench]] name = "mutable_array" harness = false + +[[bench]] +name = "buffer_create" +harness = false diff --git a/rust/arrow/benches/buffer_create.rs b/rust/arrow/benches/buffer_create.rs new file mode 100644 index 00000000000..6cfbab0deeb --- /dev/null +++ b/rust/arrow/benches/buffer_create.rs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#[macro_use] +extern crate criterion; +use criterion::Criterion; + +extern crate arrow; + +use arrow::{ + buffer::{Buffer, MutableBuffer}, + datatypes::ToByteSlice, +}; + +fn mutable_buffer(bytes: &[u32], size: usize, capacity: usize) -> Buffer { + criterion::black_box({ + let mut result = MutableBuffer::new(capacity); + + for _ in 0..size { + result.extend_from_slice(bytes.to_byte_slice()) + } + + result.freeze() + }) +} + +fn from_slice(bytes: &[u32], size: usize, capacity: usize) -> Buffer { + criterion::black_box({ + let mut a = Vec::::with_capacity(capacity); + + for _ in 0..size { + a.extend_from_slice(bytes) + } + + Buffer::from(a.to_byte_slice()) + }) +} + +fn benchmark(c: &mut Criterion) { + let bytes = &[128u32; 1025]; + let size = 2usize.pow(10); + + c.bench_function("mutable", |b| b.iter(|| mutable_buffer(bytes, size, 0))); + + c.bench_function("mutable prepared", |b| { + b.iter(|| mutable_buffer(bytes, size, size * bytes.len() * std::mem::size_of::())) + }); + + c.bench_function("from_slice", |b| b.iter(|| from_slice(bytes, size, 0))); + + c.bench_function("from_slice prepared", |b| { + b.iter(|| from_slice(bytes, size, size * bytes.len())) + }); +} + +criterion_group!(benches, benchmark); +criterion_main!(benches); From dff2789fbd23f5ca0a433babc93833e37fab02a1 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 28 Dec 2020 07:01:30 +0000 Subject: [PATCH 6/6] New alloc --- rust/arrow/src/alloc.rs | 159 ++++++++++ rust/arrow/src/array/array_list.rs | 32 -- rust/arrow/src/array/array_primitive.rs | 7 +- rust/arrow/src/array/builder.rs | 115 +++---- rust/arrow/src/array/raw_pointer.rs | 17 +- rust/arrow/src/array/transform/list.rs | 3 +- rust/arrow/src/array/transform/utils.rs | 2 +- .../src/array/transform/variable_size.rs | 3 +- rust/arrow/src/buffer.rs | 206 ++++++------- rust/arrow/src/bytes.rs | 15 +- rust/arrow/src/compute/kernels/comparison.rs | 10 +- rust/arrow/src/compute/kernels/sort.rs | 4 +- rust/arrow/src/compute/kernels/take.rs | 4 +- rust/arrow/src/datatypes.rs | 2 + rust/arrow/src/lib.rs | 5 +- rust/arrow/src/memory.rs | 288 +++++++++--------- rust/arrow/src/zz_memory_check.rs | 2 +- 17 files changed, 479 insertions(+), 395 deletions(-) create mode 100644 rust/arrow/src/alloc.rs diff --git a/rust/arrow/src/alloc.rs b/rust/arrow/src/alloc.rs new file mode 100644 index 00000000000..8e08be257e8 --- /dev/null +++ b/rust/arrow/src/alloc.rs @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// This module is largely copied (with minor simplifications) from the implementation of Rust's std alloc. +/// Rust's current allocator's API is `unstable`, but it solves our main use-case: a container with a custom alignment. 
+/// The problem that this module solves is to offer an API to allocate, reallocate and deallocate +/// Memory blocks. Main responsibilities: +/// * safeguards against null pointers +/// * panic on Out of memory (OOM) +/// * global thread-safe tracking of allocations +/// * only allocate initialized regions (zero). +/// This module makes no assumptions about type alignments or buffer sizes. Consumers use [std::alloc::Layout] to +/// share this information with this module. +use std::alloc::handle_alloc_error; +use std::ptr::NonNull; +use std::{alloc::Layout, sync::atomic::AtomicIsize}; + +// If this number is not zero after all objects have been `drop`, there is a memory leak +pub static mut ALLOCATIONS: AtomicIsize = AtomicIsize::new(0); + +/// A memory region +#[derive(Debug, Copy, Clone)] +pub struct MemoryBlock { + pub ptr: NonNull, + pub size: usize, +} + +/// Returns a dangling pointer aligned with `layout.align()`. +#[inline] +pub fn dangling(layout: Layout) -> NonNull { + // SAFETY: align is guaranteed to be non-zero + unsafe { NonNull::new_unchecked(layout.align() as *mut u8) } +} + +/// Allocates a new memory region. Returns a dangling pointer iff `layout.size() == 0`. +/// # Panic +/// This function panics whenever it is impossible to allocate a new region (e.g. OOM) +#[inline] +pub fn alloc(layout: Layout) -> MemoryBlock { + debug_assert!(layout.align() > 0); + unsafe { + let size = layout.size(); + if size == 0 { + MemoryBlock { + ptr: dangling(layout), + size: 0, + } + } else { + let raw_ptr = std::alloc::alloc_zeroed(layout); + let ptr = NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)); + ALLOCATIONS + .fetch_add(layout.size() as isize, std::sync::atomic::Ordering::SeqCst); + MemoryBlock { ptr, size } + } + } +} + +/// Deallocates a previously allocated region. This can be safely called with a dangling pointer iff `layout.size() == 0`. +/// # Safety +/// This function requires the region to be allocated according to the `layout`. +#[inline] +pub unsafe fn dealloc(ptr: NonNull, layout: Layout) { + if layout.size() != 0 { + std::alloc::dealloc(ptr.as_ptr(), layout); + ALLOCATIONS + .fetch_sub(layout.size() as isize, std::sync::atomic::Ordering::SeqCst); + } +} + +/// Initializes a [MemoryBlock] with zeros starting at `offset`. +#[inline] +unsafe fn init_zero(memory: &mut MemoryBlock, offset: usize) { + memory + .ptr + .as_ptr() + .add(offset) + .write_bytes(0, memory.size - offset); +} + +/// Grows a memory region, potentially reallocating it. +// This is similar to AllocRef::grow, but without placement, since it is a no-op, and init, since we always +// allocate initialized to 0. 
+#[inline] +pub unsafe fn grow(ptr: NonNull, layout: Layout, new_size: usize) -> MemoryBlock { + let size = layout.size(); + debug_assert!( + new_size >= size, + "`new_size` must be greater than or equal to `memory.size()`" + ); + + if size == new_size { + return MemoryBlock { ptr, size }; + } + + if layout.size() == 0 { + let new_layout = Layout::from_size_align_unchecked(new_size, layout.align()); + alloc(new_layout) + } else { + let ptr = std::alloc::realloc(ptr.as_ptr(), layout, new_size); + let mut memory = MemoryBlock { + ptr: NonNull::new(ptr).unwrap_or_else(|| handle_alloc_error(layout)), + size: new_size, + }; + ALLOCATIONS.fetch_add( + (new_size - size) as isize, + std::sync::atomic::Ordering::SeqCst, + ); + init_zero(&mut memory, size); + memory + } +} + +// similar to AllocRef::shrink, but without placement, since it is a no-op +#[inline] +pub unsafe fn shrink(ptr: NonNull, layout: Layout, new_size: usize) -> MemoryBlock { + let size = layout.size(); + debug_assert!( + new_size <= size, + "`new_size` must be smaller than or equal to `memory.size()`" + ); + + if size == new_size { + return MemoryBlock { ptr, size }; + } + + if new_size == 0 { + dealloc(ptr, layout); + MemoryBlock { + ptr: dangling(layout), + size: 0, + } + } else { + // `realloc` probably checks for `new_size < size` or something similar. + let ptr = std::alloc::realloc(ptr.as_ptr(), layout, new_size); + let ptr = NonNull::new(ptr).unwrap_or_else(|| handle_alloc_error(layout)); + ALLOCATIONS.fetch_sub( + (size - new_size) as isize, + std::sync::atomic::Ordering::SeqCst, + ); + MemoryBlock { + ptr, + size: new_size, + } + } +} diff --git a/rust/arrow/src/array/array_list.rs b/rust/arrow/src/array/array_list.rs index 5af27769bfa..01b24aa7c4f 100644 --- a/rust/arrow/src/array/array_list.rs +++ b/rust/arrow/src/array/array_list.rs @@ -302,7 +302,6 @@ mod tests { array::Int32Array, buffer::Buffer, datatypes::{Field, ToByteSlice}, - memory, util::bit_util, }; @@ -782,35 +781,4 @@ mod tests { .build(); ListArray::from(list_data); } - - #[test] - #[should_panic(expected = "memory is not aligned")] - fn test_primitive_array_alignment() { - let ptr = memory::allocate_aligned(8); - let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) }; - let buf2 = buf.slice(1); - let array_data = ArrayData::builder(DataType::Int32).add_buffer(buf2).build(); - Int32Array::from(array_data); - } - - #[test] - #[should_panic(expected = "memory is not aligned")] - fn test_list_array_alignment() { - let ptr = memory::allocate_aligned(8); - let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) }; - let buf2 = buf.slice(1); - - let values: [i32; 8] = [0; 8]; - let value_data = ArrayData::builder(DataType::Int32) - .add_buffer(Buffer::from(values.to_byte_slice())) - .build(); - - let list_data_type = - DataType::List(Box::new(Field::new("item", DataType::Int32, false))); - let list_data = ArrayData::builder(list_data_type) - .add_buffer(buf2) - .add_child_data(value_data) - .build(); - ListArray::from(list_data); - } } diff --git a/rust/arrow/src/array/array_primitive.rs b/rust/arrow/src/array/array_primitive.rs index 03001fe6b4c..0c2f93c341f 100644 --- a/rust/arrow/src/array/array_primitive.rs +++ b/rust/arrow/src/array/array_primitive.rs @@ -313,15 +313,13 @@ impl::Native data_len * mem::size_of::<::Native>(), ); - let null = vec![0; mem::size_of::<::Native>()]; - let null_slice = null_buf.as_slice_mut(); iter.enumerate().for_each(|(i, item)| { if let Some(a) = item.borrow() { bit_util::set_bit(null_slice, i); 
val_buf.extend_from_slice(a.to_byte_slice()); } else { - val_buf.extend_from_slice(&null); + val_buf.extend(mem::size_of::<::Native>()); } }); @@ -411,14 +409,13 @@ impl PrimitiveArray { let mut val_buf = MutableBuffer::new(data_len * mem::size_of::()); { - let null = vec![0; mem::size_of::()]; let null_slice = null_buf.as_slice_mut(); for (i, v) in data.iter().enumerate() { if let Some(n) = v { bit_util::set_bit(null_slice, i); val_buf.extend_from_slice(&n.to_byte_slice()); } else { - val_buf.extend_from_slice(&null); + val_buf.extend(mem::size_of::()); } } } diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index abab9e66744..9f124118ea4 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -178,9 +178,9 @@ impl BufferBuilder { /// assert_eq!(builder.len(), 2); /// ``` #[inline] - pub fn advance(&mut self, i: usize) { + fn advance(&mut self, i: usize) { let new_buffer_len = (self.len + i) * mem::size_of::(); - self.buffer.resize(new_buffer_len); + self.buffer.resize(new_buffer_len, 0); self.len += i; } @@ -198,9 +198,7 @@ impl BufferBuilder { /// ``` #[inline] pub fn reserve(&mut self, n: usize) { - let new_capacity = self.len + n; - let byte_capacity = mem::size_of::() * new_capacity; - self.buffer.reserve(byte_capacity); + self.buffer.reserve(n * mem::size_of::()); } /// Appends a value of type `T` into the builder, @@ -300,10 +298,7 @@ impl BooleanBufferBuilder { #[inline] pub fn new(capacity: usize) -> Self { let byte_capacity = bit_util::ceil(capacity, 8); - let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); - let mut buffer = MutableBuffer::new(actual_capacity); - buffer.set_null_bits(0, actual_capacity); - + let buffer = MutableBuffer::new(byte_capacity); Self { buffer, len: 0 } } @@ -320,21 +315,26 @@ impl BooleanBufferBuilder { } #[inline] - pub fn advance(&mut self, i: usize) { - let new_buffer_len = bit_util::ceil(self.len + i, 8); - self.buffer.resize(new_buffer_len); - self.len += i; + fn set_len(&mut self, new: usize) { + self.len = new; + self.buffer.set_len(bit_util::ceil(self.len, 8)); } #[inline] - pub fn reserve(&mut self, n: usize) { - let new_capacity = self.len + n; - if new_capacity > self.capacity() { - let new_byte_capacity = bit_util::ceil(new_capacity, 8); - let existing_capacity = self.buffer.capacity(); - let new_capacity = self.buffer.reserve(new_byte_capacity); - self.buffer - .set_null_bits(existing_capacity, new_capacity - existing_capacity); + pub fn advance(&mut self, additional: usize) { + self.reserve(additional); + self.set_len(self.len + additional); + } + + /// Reserve space to at least `additional` new bits. + /// Capacity will be `>= self.len() + additional`. 
+ #[inline] + pub fn reserve(&mut self, additional: usize) { + let capacity = self.len + additional; + if capacity > self.capacity() { + // convert differential to bytes + let additional = bit_util::ceil(capacity, 8) - self.buffer.len(); + self.buffer.reserve(additional); } } @@ -342,58 +342,39 @@ impl BooleanBufferBuilder { pub fn append(&mut self, v: bool) { self.reserve(1); if v { - let data = unsafe { - std::slice::from_raw_parts_mut( - self.buffer.as_mut_ptr(), - self.buffer.capacity(), - ) - }; - bit_util::set_bit(data, self.len); + unsafe { bit_util::set_bit_raw(self.buffer.as_mut_ptr(), self.len) }; } - self.len += 1; + self.set_len(self.len + 1); } #[inline] - pub fn append_n(&mut self, n: usize, v: bool) { - self.reserve(n); - if n != 0 && v { - let data = unsafe { - std::slice::from_raw_parts_mut( - self.buffer.as_mut_ptr(), - self.buffer.capacity(), - ) - }; - (self.len..self.len + n).for_each(|i| bit_util::set_bit(data, i)) + pub fn append_n(&mut self, additional: usize, v: bool) { + self.reserve(additional); + if additional > 0 && v { + (self.len..self.len + additional).for_each(|i| unsafe { + bit_util::set_bit_raw(self.buffer.as_mut_ptr(), i) + }) } - self.len += n; + self.set_len(self.len + additional); } #[inline] pub fn append_slice(&mut self, slice: &[bool]) { - let array_slots = slice.len(); - self.reserve(array_slots); + let additional = slice.len(); + self.reserve(additional); - for v in slice { + for (i, v) in slice.iter().enumerate() { if *v { - // For performance the `len` of the buffer is not - // updated on each append but is updated in the - // `freeze` method instead. - unsafe { - bit_util::set_bit_raw(self.buffer.as_mut_ptr(), self.len); - } + unsafe { bit_util::set_bit_raw(self.buffer.as_mut_ptr(), self.len + i) } } - self.len += 1; } + self.set_len(self.len + additional); } #[inline] pub fn finish(&mut self) -> Buffer { - // `append` does not update the buffer's `len` so do it before `freeze` is called. - let new_buffer_len = bit_util::ceil(self.len, 8); - debug_assert!(new_buffer_len >= self.buffer.len()); - let mut buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0)); - self.len = 0; - buf.resize(new_buffer_len); + let buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0)); + self.set_len(0); buf.freeze() } } @@ -2476,16 +2457,18 @@ mod tests { builder.append_value(false).unwrap(); let arr2 = builder.finish(); - assert_eq!(arr1.len(), arr2.len()); - assert_eq!(arr1.offset(), arr2.offset()); - assert_eq!(arr1.null_count(), arr2.null_count()); - for i in 0..5 { - assert_eq!(arr1.is_null(i), arr2.is_null(i)); - assert_eq!(arr1.is_valid(i), arr2.is_valid(i)); - if arr1.is_valid(i) { - assert_eq!(arr1.value(i), arr2.value(i)); - } - } + assert_eq!(arr1, arr2); + } + + #[test] + fn test_boolean_array_builder_append_slice_large() { + let arr1 = BooleanArray::from(vec![true; 513]); + + let mut builder = BooleanArray::builder(512); + builder.append_slice(&[true; 513]).unwrap(); + let arr2 = builder.finish(); + + assert_eq!(arr1, arr2); } #[test] diff --git a/rust/arrow/src/array/raw_pointer.rs b/rust/arrow/src/array/raw_pointer.rs index d18ba4b29a3..897dc5b591c 100644 --- a/rust/arrow/src/array/raw_pointer.rs +++ b/rust/arrow/src/array/raw_pointer.rs @@ -36,10 +36,7 @@ impl RawPtrBox { /// * `ptr` is not aligned to a slice of type `T`. This is guaranteed if it was built from a slice of type `T`. 
pub(super) unsafe fn new(ptr: *const u8) -> Self { let ptr = NonNull::new(ptr as *mut u8).expect("Pointer cannot be null"); - assert!( - memory::is_aligned(ptr, std::mem::align_of::()), - "memory is not aligned" - ); + assert!(memory::is_ptr_aligned::(ptr), "memory is not aligned"); Self { ptr: ptr.cast() } } @@ -50,3 +47,15 @@ impl RawPtrBox { unsafe impl Send for RawPtrBox {} unsafe impl Sync for RawPtrBox {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + #[should_panic(expected = "memory is not aligned")] + fn test_primitive_array_alignment() { + let bytes = vec![0u8, 1u8]; + unsafe { RawPtrBox::::new(bytes.as_ptr().offset(1)) }; + } +} diff --git a/rust/arrow/src/array/transform/list.rs b/rust/arrow/src/array/transform/list.rs index 300afa94bf4..6d38ff56a0c 100644 --- a/rust/arrow/src/array/transform/list.rs +++ b/rust/arrow/src/array/transform/list.rs @@ -66,8 +66,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { let mut last_offset: T = unsafe { get_last_offset(offset_buffer) }; let delta_len = array.len() - array.null_count(); - offset_buffer - .reserve(offset_buffer.len() + delta_len * std::mem::size_of::()); + offset_buffer.reserve(delta_len * std::mem::size_of::()); let child = &mut mutable.child_data[0]; (start..start + len).for_each(|i| { diff --git a/rust/arrow/src/array/transform/utils.rs b/rust/arrow/src/array/transform/utils.rs index c95912996f2..61acb32981e 100644 --- a/rust/arrow/src/array/transform/utils.rs +++ b/rust/arrow/src/array/transform/utils.rs @@ -53,7 +53,7 @@ pub(super) fn extend_offsets( mut last_offset: T, offsets: &[T], ) { - buffer.reserve(buffer.len() + offsets.len() * std::mem::size_of::()); + buffer.reserve(offsets.len() * std::mem::size_of::()); offsets.windows(2).for_each(|offsets| { // compute the new offset let length = offsets[1] - offsets[0]; diff --git a/rust/arrow/src/array/transform/variable_size.rs b/rust/arrow/src/array/transform/variable_size.rs index dcd0ed6147f..d788ef65795 100644 --- a/rust/arrow/src/array/transform/variable_size.rs +++ b/rust/arrow/src/array/transform/variable_size.rs @@ -72,8 +72,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { let mut last_offset: T = unsafe { get_last_offset(offset_buffer) }; // nulls present: append item by item, ignoring null entries - offset_buffer - .reserve(offset_buffer.len() + len * std::mem::size_of::()); + offset_buffer.reserve(len * std::mem::size_of::()); (start..start + len).for_each(|i| { if array.is_valid(i) { diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 82708180e84..d1eff93b3fa 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -//! The main type in the module is `Buffer`, a contiguous immutable memory region of -//! fixed size aligned at a 64-byte boundary. `MutableBuffer` is like `Buffer`, but it can -//! be mutated and grown. +//! This module contains two main structs: [Buffer] and [MutableBuffer]. A buffer represents +//! a contiguous memory region that can be shared via `offsets`. 
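The new module doc reflects the construction path used across this patch series: fill a `MutableBuffer`, then `freeze` it into an immutable `Buffer` that can be shared (the rewritten `From` impl below routes through exactly this path instead of a raw `memcpy`). A short sketch, assuming only APIs visible in these hunks; `i32` stands in for any Arrow native type:

    use arrow::buffer::MutableBuffer;
    use arrow::datatypes::ToByteSlice;

    fn freeze_and_reinterpret() {
        let values: [i32; 4] = [1, 2, 3, 4];

        // Fill a mutable buffer with the native byte representation of `values`.
        let mut mutable = MutableBuffer::new(values.len() * std::mem::size_of::<i32>());
        mutable.extend_from_slice(values.to_byte_slice());

        // Freeze into an immutable, shareable Buffer.
        let buffer = mutable.freeze();

        // `typed_data` is unsafe because the caller vouches for the element type;
        // it now checks alignment via `slice::align_to` rather than a manual assert.
        let ints: &[i32] = unsafe { buffer.typed_data::<i32>() };
        assert_eq!(ints, &values);
    }
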
+ #[cfg(feature = "simd")] use packed_simd::u8x64; @@ -28,10 +28,9 @@ use crate::{ use std::convert::AsRef; use std::fmt::Debug; -use std::mem; use std::ops::{BitAnd, BitOr, Not}; +use std::ptr::NonNull; use std::sync::Arc; -use std::{cmp, ptr::NonNull}; #[cfg(feature = "avx512")] use crate::arch::avx512::*; @@ -44,11 +43,11 @@ use crate::util::bit_util::ceil; #[cfg(any(feature = "simd", feature = "avx512"))] use std::borrow::BorrowMut; -/// Buffer is a contiguous memory region of fixed size and is aligned at a 64-byte -/// boundary. Buffer is immutable. +/// Buffer represents a contiguous memory region that can be shared with other buffers and across +/// thread boundaries. #[derive(Clone, PartialEq, Debug)] pub struct Buffer { - /// Reference-counted pointer to the internal byte buffer. + /// the internal byte buffer. data: Arc, /// The offset into the buffer. @@ -162,19 +161,17 @@ impl Buffer { /// /// Also `typed_data::` is unsafe as `0x00` and `0x01` are the only valid values for /// `bool` in Rust. However, `bool` arrays in Arrow are bit-packed which breaks this condition. + /// View buffer as typed slice. pub unsafe fn typed_data(&self) -> &[T] { - assert_eq!(self.len() % mem::size_of::(), 0); - assert!(memory::is_ptr_aligned::(self.data.ptr().cast())); // JUSTIFICATION // Benefit // Many of the buffers represent specific types, and consumers of `Buffer` often need to re-interpret them. // Soundness // * The pointer is non-null by construction - // * alignment asserted above - std::slice::from_raw_parts( - self.as_ptr() as *const T, - self.len() / mem::size_of::(), - ) + // * alignment asserted below. + let (prefix, offsets, suffix) = self.as_slice().align_to::(); + assert!(prefix.is_empty() && suffix.is_empty()); + offsets } /// Returns a slice of this buffer starting at a certain bit offset. @@ -219,25 +216,10 @@ impl> From for Buffer { fn from(p: T) -> Self { // allocate aligned memory buffer let slice = p.as_ref(); - let len = slice.len() * mem::size_of::(); - let capacity = bit_util::round_upto_multiple_of_64(len); - let buffer = memory::allocate_aligned(capacity); - // JUSTIFICATION - // Benefit - // It is often useful to create a buffer from bytes, typically when they are allocated by external sources - // Soundness - // * The pointers are non-null by construction - // * alignment asserted above - // Unsoundness - // * There is no guarantee that the memory regions do are non-overalling, but `memcpy` requires this. - unsafe { - memory::memcpy( - buffer, - NonNull::new_unchecked(slice.as_ptr() as *mut u8), - len, - ); - Buffer::build_with_arguments(buffer, len, Deallocation::Native(capacity)) - } + let len = slice.len(); + let mut buffer = MutableBuffer::new(len); + buffer.extend_from_slice(slice); + buffer.freeze() } } @@ -700,21 +682,16 @@ unsafe impl Send for Buffer {} /// converted into a immutable buffer via the `freeze` method. #[derive(Debug)] pub struct MutableBuffer { - // dangling iff capacity = 0 - data: NonNull, + data: memory::RawBytes, len: usize, - capacity: usize, } impl MutableBuffer { /// Allocate a new mutable buffer with initial capacity to be `capacity`. 
pub fn new(capacity: usize) -> Self { - let new_capacity = bit_util::round_upto_multiple_of_64(capacity); - let ptr = memory::allocate_aligned(new_capacity); Self { - data: ptr, + data: memory::RawBytes::with_capacity(capacity), len: 0, - capacity: new_capacity, } } @@ -731,10 +708,10 @@ impl MutableBuffer { /// the buffer directly (e.g., modifying the buffer by holding a mutable reference /// from `data_mut()`). pub fn with_bitset(mut self, end: usize, val: bool) -> Self { - assert!(end <= self.capacity); + assert!(end <= self.data.capacity()); let v = if val { 255 } else { 0 }; unsafe { - std::ptr::write_bytes(self.data.as_ptr(), v, end); + std::ptr::write_bytes(self.data.ptr().as_ptr(), v, end); self.len = end; } self @@ -746,25 +723,15 @@ impl MutableBuffer { /// `len` of the buffer and so can be used to initialize the memory region from /// `len` to `capacity`. pub fn set_null_bits(&mut self, start: usize, count: usize) { - assert!(start + count <= self.capacity); + assert!(start + count <= self.capacity()); unsafe { - std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count); + std::ptr::write_bytes(self.data.ptr().as_ptr().add(start), 0, count); } } - /// Ensures that this buffer has at least `capacity` slots in this buffer. This will - /// also ensure the new capacity will be a multiple of 64 bytes. - /// - /// Returns the new capacity for this buffer. - pub fn reserve(&mut self, capacity: usize) -> usize { - if capacity > self.capacity { - let new_capacity = bit_util::round_upto_multiple_of_64(capacity); - let new_capacity = cmp::max(new_capacity, self.capacity * 2); - self.data = - unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; - self.capacity = new_capacity; - } - self.capacity + /// Ensures that this buffer has at least `self.len + additional` bytes. + pub fn reserve(&mut self, additional: usize) { + self.data.reserve(self.len, additional) } /// Resizes the buffer so that the `len` will equal to the `new_len`. @@ -774,18 +741,28 @@ impl MutableBuffer { /// `new_len` will be zeroed out. /// /// If `new_len` is less than `len`, the buffer will be truncated. - pub fn resize(&mut self, new_len: usize) { - if new_len > self.len { - self.reserve(new_len); - } else { - let new_capacity = bit_util::round_upto_multiple_of_64(new_len); - if new_capacity < self.capacity { - self.data = - unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; - self.capacity = new_capacity; + pub fn resize(&mut self, new_len: usize, value: u8) { + if new_len > self.len() { + let additional = new_len - self.len(); + self.reserve(additional); + unsafe { + let mut ptr = self.as_mut_ptr().add(self.len()); + for _ in 0..additional { + std::ptr::write(ptr, value); + ptr = ptr.offset(1); + } + self.len += additional; } + } else { + self.truncate(new_len); + } + } + + pub fn truncate(&mut self, len: usize) { + if len > self.len { + return; } - self.len = new_len; + self.len = len; } /// Returns whether this buffer is empty or not. @@ -803,7 +780,7 @@ impl MutableBuffer { /// Returns the total capacity in this buffer. #[inline] pub const fn capacity(&self) -> usize { - self.capacity + self.data.capacity() } /// Clear all existing data from this buffer. @@ -826,19 +803,23 @@ impl MutableBuffer { /// Note that this should be used cautiously, and the returned pointer should not be /// stored anywhere, to avoid dangling pointers. 
#[inline] - pub const fn as_ptr(&self) -> *const u8 { - self.data.as_ptr() + pub fn as_ptr(&self) -> *const u8 { + self.data.ptr().as_ptr() } #[inline] pub fn as_mut_ptr(&mut self) -> *mut u8 { - self.data.as_ptr() + self.data.ptr().as_ptr() } /// Freezes this buffer and return an immutable version of it. pub fn freeze(self) -> Buffer { let buffer_data = unsafe { - Bytes::new(self.data, self.len, Deallocation::Native(self.capacity)) + Bytes::new( + self.data.ptr(), + self.len, + Deallocation::Native(self.capacity()), + ) }; std::mem::forget(self); Buffer { @@ -849,44 +830,34 @@ impl MutableBuffer { /// View buffer as typed slice. pub fn typed_data_mut(&mut self) -> &mut [T] { - assert_eq!(self.len() % mem::size_of::(), 0); - assert!(memory::is_ptr_aligned::(self.data.cast())); - // JUSTIFICATION - // Benefit - // Many of the buffers represent specific types, and consumers of `Buffer` often need to re-interpret them. - // Soundness - // * The pointer is non-null by construction - // * alignment asserted above unsafe { - std::slice::from_raw_parts_mut( - self.as_ptr() as *mut T, - self.len() / mem::size_of::(), - ) + let (prefix, offsets, suffix) = self.as_slice_mut().align_to_mut::(); + assert!(prefix.is_empty() && suffix.is_empty()); + offsets } } /// Extends the buffer from a byte slice, incrementing its capacity if needed. #[inline] pub fn extend_from_slice(&mut self, bytes: &[u8]) { - let new_len = self.len + bytes.len(); - if new_len > self.capacity { - self.reserve(new_len); - } - unsafe { - let dst = NonNull::new_unchecked(self.data.as_ptr().add(self.len)); - let src = NonNull::new_unchecked(bytes.as_ptr() as *mut u8); - memory::memcpy(dst, src, bytes.len()); - } - self.len = new_len; + let additional = bytes.len(); + let len = self.len(); + self.extend(additional); + + let slice = &mut self.as_slice_mut()[len..]; + slice.copy_from_slice(bytes); } - /// Extends the buffer by `len` with all bytes equal to `0u8`, incrementing its capacity if needed. - pub fn extend(&mut self, len: usize) { - let remaining_capacity = self.capacity - self.len; - if len > remaining_capacity { - self.reserve(self.len + len); - } - self.len += len; + /// Extends the buffer by `additional` with all bytes equal to `0u8`, incrementing its capacity if needed. + #[inline] + pub fn extend(&mut self, additional: usize) { + self.reserve(additional); + self.len += additional; + } + + pub fn set_len(&mut self, len: usize) { + assert!(len <= self.capacity()); + self.len = len; } } @@ -904,18 +875,12 @@ impl std::ops::DerefMut for MutableBuffer { } } -impl Drop for MutableBuffer { - fn drop(&mut self) { - unsafe { memory::free_aligned(self.data, self.capacity) }; - } -} - impl PartialEq for MutableBuffer { fn eq(&self, other: &MutableBuffer) -> bool { if self.len != other.len { return false; } - if self.capacity != other.capacity { + if self.capacity() != other.capacity() { return false; } self.as_slice() == other.as_slice() @@ -1101,13 +1066,14 @@ mod tests { assert_eq!(64, buf.capacity()); // Reserving a smaller capacity should have no effect. 
- let mut new_cap = buf.reserve(10); - assert_eq!(64, new_cap); + buf.reserve(10); assert_eq!(64, buf.capacity()); - new_cap = buf.reserve(100); - assert_eq!(128, new_cap); + buf.reserve(80); assert_eq!(128, buf.capacity()); + + buf.reserve(129); + assert_eq!(256, buf.capacity()); } #[test] @@ -1116,24 +1082,24 @@ mod tests { assert_eq!(64, buf.capacity()); assert_eq!(0, buf.len()); - buf.resize(20); + buf.resize(20, 0); assert_eq!(64, buf.capacity()); assert_eq!(20, buf.len()); - buf.resize(10); + buf.resize(10, 0); assert_eq!(64, buf.capacity()); assert_eq!(10, buf.len()); - buf.resize(100); + buf.resize(100, 0); assert_eq!(128, buf.capacity()); assert_eq!(100, buf.len()); - buf.resize(30); - assert_eq!(64, buf.capacity()); + buf.resize(30, 0); + assert_eq!(128, buf.capacity()); assert_eq!(30, buf.len()); - buf.resize(0); - assert_eq!(0, buf.capacity()); + buf.resize(0, 0); + assert_eq!(128, buf.capacity()); assert_eq!(0, buf.len()); } diff --git a/rust/arrow/src/bytes.rs b/rust/arrow/src/bytes.rs index 331011687da..6a565809586 100644 --- a/rust/arrow/src/bytes.rs +++ b/rust/arrow/src/bytes.rs @@ -125,7 +125,7 @@ impl Drop for Bytes { fn drop(&mut self) { match &self.deallocation { Deallocation::Native(capacity) => { - unsafe { memory::free_aligned(self.ptr, *capacity) }; + unsafe { memory::dealloc(self.ptr, *capacity) }; } // foreign interface knows how to deallocate itself. Deallocation::Foreign(_) => (), @@ -156,16 +156,3 @@ impl Debug for Bytes { write!(f, " }}") } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_dealloc_native() { - let capacity = 5; - let a = memory::allocate_aligned(capacity); - // create Bytes and release it. This will make `a` be an invalid pointer, but it is defined behavior - unsafe { Bytes::new(a, 3, Deallocation::Native(capacity)) }; - } -} diff --git a/rust/arrow/src/compute/kernels/comparison.rs b/rust/arrow/src/compute/kernels/comparison.rs index 92c808a7fee..92fe3a4ec9c 100644 --- a/rust/arrow/src/compute/kernels/comparison.rs +++ b/rust/arrow/src/compute/kernels/comparison.rs @@ -48,9 +48,8 @@ macro_rules! compare_op { combine_option_bitmap($left.data_ref(), $right.data_ref(), $left.len())?; let byte_capacity = bit_util::ceil($left.len(), 8); - let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); - let mut buffer = MutableBuffer::new(actual_capacity); - buffer.resize(byte_capacity); + let mut buffer = MutableBuffer::new(byte_capacity); + buffer.resize(byte_capacity, 0); let data = buffer.as_mut_ptr(); for i in 0..$left.len() { @@ -81,9 +80,8 @@ macro_rules! 
compare_op_scalar { let null_bit_buffer = $left.data().null_buffer().cloned(); let byte_capacity = bit_util::ceil($left.len(), 8); - let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); - let mut buffer = MutableBuffer::new(actual_capacity); - buffer.resize(byte_capacity); + let mut buffer = MutableBuffer::new(byte_capacity); + buffer.resize(byte_capacity, 0); let data = buffer.as_mut_ptr(); for i in 0..$left.len() { diff --git a/rust/arrow/src/compute/kernels/sort.rs b/rust/arrow/src/compute/kernels/sort.rs index f42d1a8d5d1..6252c743972 100644 --- a/rust/arrow/src/compute/kernels/sort.rs +++ b/rust/arrow/src/compute/kernels/sort.rs @@ -297,7 +297,7 @@ fn sort_boolean( // collect results directly into a buffer instead of a vec to avoid another aligned allocation let mut result = MutableBuffer::new(values.len() * std::mem::size_of::()); // sets len to capacity so we can access the whole buffer as a typed slice - result.resize(values.len() * std::mem::size_of::()); + result.resize(values.len() * std::mem::size_of::(), 0); let result_slice: &mut [u32] = result.typed_data_mut(); debug_assert_eq!(result_slice.len(), nulls_len + valids_len); @@ -364,7 +364,7 @@ where // collect results directly into a buffer instead of a vec to avoid another aligned allocation let mut result = MutableBuffer::new(values.len() * std::mem::size_of::()); // sets len to capacity so we can access the whole buffer as a typed slice - result.resize(values.len() * std::mem::size_of::()); + result.resize(values.len() * std::mem::size_of::(), 0); let result_slice: &mut [u32] = result.typed_data_mut(); debug_assert_eq!(result_slice.len(), nulls_len + nans_len + valids_len); diff --git a/rust/arrow/src/compute/kernels/take.rs b/rust/arrow/src/compute/kernels/take.rs index ebb3022525f..5192e29bb45 100644 --- a/rust/arrow/src/compute/kernels/take.rs +++ b/rust/arrow/src/compute/kernels/take.rs @@ -272,7 +272,7 @@ where let data_len = indices.len(); let mut buffer = MutableBuffer::new(data_len * std::mem::size_of::()); - buffer.resize(data_len * std::mem::size_of::()); + buffer.resize(data_len * std::mem::size_of::(), 0); let data = buffer.typed_data_mut(); let nulls; @@ -417,7 +417,7 @@ where let bytes_offset = (data_len + 1) * std::mem::size_of::(); let mut offsets_buffer = MutableBuffer::new(bytes_offset); - offsets_buffer.resize(bytes_offset); + offsets_buffer.resize(bytes_offset, 0); let offsets = offsets_buffer.typed_data_mut(); let mut values = Vec::with_capacity(bytes_offset); diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index d2cf47e80a1..35f7080145b 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -942,6 +942,7 @@ pub trait ToByteSlice { } impl ToByteSlice for [T] { + #[inline] fn to_byte_slice(&self) -> &[u8] { let raw_ptr = self.as_ptr() as *const T as *const u8; unsafe { from_raw_parts(raw_ptr, self.len() * size_of::()) } @@ -949,6 +950,7 @@ impl ToByteSlice for [T] { } impl ToByteSlice for T { + #[inline] fn to_byte_slice(&self) -> &[u8] { let raw_ptr = self as *const T as *const u8; unsafe { from_raw_parts(raw_ptr, size_of::()) } diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs index 9c91d38566f..929abd6c33b 100644 --- a/rust/arrow/src/lib.rs +++ b/rust/arrow/src/lib.rs @@ -135,11 +135,12 @@ // introduced to ignore lint errors when upgrading from 2020-04-22 to 2020-11-14 #![allow(clippy::float_equality_without_abs, clippy::type_complexity)] +pub(crate) mod alloc; mod arch; pub mod array; pub mod bitmap; pub mod buffer; -pub mod 
diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs
index 9c91d38566f..929abd6c33b 100644
--- a/rust/arrow/src/lib.rs
+++ b/rust/arrow/src/lib.rs
@@ -135,11 +135,12 @@
 // introduced to ignore lint errors when upgrading from 2020-04-22 to 2020-11-14
 #![allow(clippy::float_equality_without_abs, clippy::type_complexity)]
 
+pub(crate) mod alloc;
 mod arch;
 pub mod array;
 pub mod bitmap;
 pub mod buffer;
-pub mod bytes;
+mod bytes;
 pub mod compute;
 pub mod csv;
 pub mod datatypes;
@@ -147,7 +148,7 @@ pub mod error;
 pub mod ffi;
 pub mod ipc;
 pub mod json;
-pub mod memory;
+mod memory;
 pub mod record_batch;
 pub mod tensor;
 pub mod util;
diff --git a/rust/arrow/src/memory.rs b/rust/arrow/src/memory.rs
index ad103b06280..bf9e202bf5e 100644
--- a/rust/arrow/src/memory.rs
+++ b/rust/arrow/src/memory.rs
@@ -15,17 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines memory-related functions, such as allocate/deallocate/reallocate memory
-//! regions, cache and allocation alignments.
+//! This module is built on top of [crate::alloc] and its main responsibility is to offer an API
+//! to allocate, reallocate and deallocate memory aligned to cache lines.
 
-use std::mem::align_of;
+use std::alloc::Layout;
 use std::ptr::NonNull;
-use std::{
-    alloc::{handle_alloc_error, Layout},
-    sync::atomic::AtomicIsize,
-};
 
-// NOTE: Below code is written for spatial/temporal prefetcher optimizations. Memory allocation
+use crate::{alloc, util::bit_util};
+
+// NOTE: The code below (`ALIGNMENT`) is written for spatial/temporal prefetcher optimizations. Memory allocation
 // should align well with usage pattern of cache access and block sizes on layers of storage levels from
 // registers to non-volatile memory. These alignments are all cache aware alignments incorporated
 // from the [cuneiform](https://crates.io/crates/cuneiform) crate. This approach mimics Intel TBB's
@@ -132,153 +130,171 @@ pub const ALIGNMENT: usize = 1 << 6;
 /// Fallback cache and allocation multiple alignment size
 const FALLBACK_ALIGNMENT: usize = 1 << 6;
 
-///
-/// As you can see this is global and lives as long as the program lives.
-/// Be careful to not write anything to this pointer in any scenario.
-/// If you use allocation methods shown here you won't have any problems.
-const BYPASS_PTR: NonNull<u8> = unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) };
-
-// If this number is not zero after all objects have been `drop`, there is a memory leak
-pub static mut ALLOCATIONS: AtomicIsize = AtomicIsize::new(0);
-
-pub fn allocate_aligned(size: usize) -> NonNull<u8> {
-    unsafe {
-        if size == 0 {
-            // In a perfect world, there is no need to request zero size allocation.
-            // Currently, passing zero sized layout to alloc is UB.
-            // This will dodge allocator api for any type.
-            BYPASS_PTR
-        } else {
-            ALLOCATIONS.fetch_add(size as isize, std::sync::atomic::Ordering::SeqCst);
+/// A struct to keep track of cache-aligned contiguous memory regions.
+/// Similar to std's internal `RawVec`, with the following differences:
+/// * (re)allocates along (arch-specific) cache lines
+/// * (re)allocates in multiples of 64 bytes
+/// * (re)allocates initialized with zeros
+#[derive(Debug)]
+pub struct RawBytes {
+    // pointer is dangling iff cap == 0
+    ptr: NonNull<u8>,
+    cap: usize,
+}
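`RawBytes` owns a cache-line-aligned, zero-initialized region. A std-only sketch of the kind of allocation it manages (the real code delegates to the new `alloc` module; only `ALIGNMENT = 1 << 6` is taken from the patch):

use std::alloc::{alloc_zeroed, dealloc, handle_alloc_error, Layout};

fn main() {
    const ALIGNMENT: usize = 1 << 6; // one cache line on most current CPUs
    // 128 bytes, 64-byte aligned and zero-initialized
    let layout = Layout::from_size_align(128, ALIGNMENT).unwrap();
    let ptr = unsafe { alloc_zeroed(layout) };
    if ptr.is_null() {
        handle_alloc_error(layout);
    }
    // the returned pointer is aligned to a cache line
    assert_eq!(ptr as usize % ALIGNMENT, 0);
    unsafe { dealloc(ptr, layout) };
}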
-            let layout = Layout::from_size_align_unchecked(size, ALIGNMENT);
-            let raw_ptr = std::alloc::alloc_zeroed(layout);
-            NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
+impl RawBytes {
+    /// Creates a [RawBytes] without allocating.
+    pub fn new() -> Self {
+        Self {
+            // safe: `ALIGNMENT` is a non-zero power of two
+            ptr: alloc::dangling(unsafe {
+                Layout::from_size_align_unchecked(0, ALIGNMENT)
+            }),
+            cap: 0,
         }
     }
-}
 
-/// # Safety
-///
-/// This function is unsafe because undefined behavior can result if the caller does not ensure all
-/// of the following:
-///
-/// * ptr must denote a block of memory currently allocated via this allocator,
-///
-/// * size must be the same size that was used to allocate that block of memory,
-pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
-    if ptr != BYPASS_PTR {
-        ALLOCATIONS.fetch_sub(size as isize, std::sync::atomic::Ordering::SeqCst);
-        std::alloc::dealloc(
-            ptr.as_ptr(),
-            Layout::from_size_align_unchecked(size, ALIGNMENT),
-        );
+    /// Creates a [RawBytes] with at least `capacity` bytes initialized to zero.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the requested capacity exceeds `isize::MAX` bytes.
+    ///
+    /// # Aborts
+    ///
+    /// Aborts on OOM.
+    #[inline]
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self::allocate(capacity)
     }
-}
 
-/// # Safety
-///
-/// This function is unsafe because undefined behavior can result if the caller does not ensure all
-/// of the following:
-///
-/// * ptr must be currently allocated via this allocator,
-///
-/// * new_size must be greater than zero.
-///
-/// * new_size, when rounded up to the nearest multiple of [ALIGNMENT], must not overflow (i.e.,
-/// the rounded value must be less than usize::MAX).
-pub unsafe fn reallocate(
-    ptr: NonNull<u8>,
-    old_size: usize,
-    new_size: usize,
-) -> NonNull<u8> {
-    if ptr == BYPASS_PTR {
-        return allocate_aligned(new_size);
+    /// Gets the capacity of the allocation.
+    #[inline(always)]
+    pub const fn capacity(&self) -> usize {
+        self.cap
     }
 
-    if new_size == 0 {
-        free_aligned(ptr, old_size);
-        return BYPASS_PTR;
+    #[inline]
+    fn allocate(capacity: usize) -> Self {
+        let capacity = bit_util::round_upto_multiple_of_64(capacity);
+        let layout = Layout::from_size_align(capacity, ALIGNMENT)
+            .unwrap_or_else(|_| capacity_overflow());
+        alloc_guard(layout.size());
+
+        let memory = alloc::alloc(layout);
+        Self {
+            ptr: memory.ptr,
+            cap: memory.size,
+        }
     }
-    ALLOCATIONS.fetch_add(
-        new_size as isize - old_size as isize,
-        std::sync::atomic::Ordering::SeqCst,
-    );
-    let raw_ptr = std::alloc::realloc(
-        ptr.as_ptr(),
-        Layout::from_size_align_unchecked(old_size, ALIGNMENT),
-        new_size,
-    );
-    let ptr = NonNull::new(raw_ptr).unwrap_or_else(|| {
-        handle_alloc_error(Layout::from_size_align_unchecked(new_size, ALIGNMENT))
-    });
-
-    if new_size > old_size {
-        ptr.as_ptr()
-            .add(old_size)
-            .write_bytes(0, new_size - old_size);
+    /// Returns `true` if the buffer needs to grow to fulfill the requested additional capacity.
+    #[inline]
+    fn needs_to_grow(&self, len: usize, additional: usize) -> bool {
+        additional > self.capacity().wrapping_sub(len)
     }
-    ptr
-}
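`allocate` rounds every requested capacity up to the next multiple of 64 before building the `Layout`. Assuming `bit_util::round_upto_multiple_of_64` has the usual bit-twiddling semantics, it behaves like this self-contained helper:

// assumed semantics of arrow's `bit_util::round_upto_multiple_of_64`;
// note that `n + 63` can overflow for capacities near `usize::MAX`, which the
// real code rejects earlier via `capacity_overflow`/`alloc_guard`
fn round_upto_multiple_of_64(n: usize) -> usize {
    (n + 63) & !63
}

fn main() {
    assert_eq!(round_upto_multiple_of_64(0), 0);
    assert_eq!(round_upto_multiple_of_64(1), 64);
    assert_eq!(round_upto_multiple_of_64(64), 64);
    assert_eq!(round_upto_multiple_of_64(65), 128);
}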
 
-/// # Safety
-///
-/// Behavior is undefined if any of the following conditions are violated:
-///
-/// * `src` must be valid for reads of `len * size_of::<T>()` bytes.
-///
-/// * `dst` must be valid for writes of `len * size_of::<T>()` bytes.
-///
-/// * Both `src` and `dst` must be properly aligned.
-///
-/// `memcpy` creates a bitwise copy of `T`, regardless of whether `T` is [`Copy`]. If `T` is not
-/// [`Copy`], using both the values in the region beginning at `*src` and the region beginning at
-/// `*dst` can [violate memory safety][read-ownership].
-pub unsafe fn memcpy(dst: NonNull<u8>, src: NonNull<u8>, count: usize) {
-    if src != BYPASS_PTR {
-        std::ptr::copy_nonoverlapping(src.as_ptr(), dst.as_ptr(), count)
+    /// Reserves `additional` bytes, assuming that the underlying container has length `len`,
+    /// guaranteeing that [RawBytes] holds at least `len + additional` bytes.
+    #[inline]
+    pub fn reserve(&mut self, len: usize, additional: usize) {
+        if self.needs_to_grow(len, additional) {
+            self.grow(len, additional)
+        }
     }
-}
 
-/// Check if the pointer `p` is aligned to offset `a`.
-pub fn is_aligned<T>(p: NonNull<T>, a: usize) -> bool {
-    let a_minus_one = a.wrapping_sub(1);
-    let pmoda = p.as_ptr() as usize & a_minus_one;
-    pmoda == 0
-}
+    #[inline]
+    fn set_memory(&mut self, memory: alloc::MemoryBlock) {
+        self.ptr = memory.ptr;
+        self.cap = memory.size;
+    }
+
+    // equivalent to `RawVec::grow_amortized(); RawVec::finish()` with the following differences:
+    // * panics instead of returning a `Result` (i.e. an overflow panics)
+    // * capacity changes are always multiples of 64
+    // * alignment is constant and equal to `ALIGNMENT`
+    #[inline]
+    fn grow(&mut self, len: usize, additional: usize) {
+        let capacity = len
+            .checked_add(additional)
+            .unwrap_or_else(|| capacity_overflow());
+        let capacity = bit_util::round_upto_multiple_of_64(capacity);
+        let capacity = std::cmp::max(self.cap * 2, capacity);
+
+        let new_layout = Layout::from_size_align(capacity, ALIGNMENT)
+            .unwrap_or_else(|_| capacity_overflow());
+
+        alloc_guard(new_layout.size());
+
+        let memory = if let Some((ptr, old_capacity)) = self.current_memory() {
+            let old_layout =
+                unsafe { Layout::from_size_align_unchecked(old_capacity, ALIGNMENT) };
+            debug_assert_eq!(old_layout.align(), new_layout.align());
+            unsafe { alloc::grow(ptr, old_layout, new_layout.size()) }
+        } else {
+            alloc::alloc(new_layout)
+        };
+
+        self.set_memory(memory);
+    }
 
-pub fn is_ptr_aligned<T>(p: NonNull<T>) -> bool {
-    p.as_ptr().align_offset(align_of::<T>()) == 0
+    #[inline]
+    fn current_memory(&self) -> Option<(NonNull<u8>, usize)> {
+        if self.cap == 0 {
+            // the pointer is dangling, so we do not expose it
+            None
+        } else {
+            Some((self.ptr, self.cap))
+        }
+    }
+
+    /// Returns the pointer to the start of the allocation. Note that this is
+    /// a dangling pointer iff `capacity == 0`.
+    #[inline(always)]
+    pub fn ptr(&self) -> NonNull<u8> {
+        self.ptr
+    }
 }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_allocate() {
-        for _ in 0..10 {
-            let p = allocate_aligned(1024);
-            // make sure this is 64-byte aligned
-            assert_eq!(0, (p.as_ptr() as usize) % 64);
-            unsafe { free_aligned(p, 1024) };
+impl Drop for RawBytes {
+    fn drop(&mut self) {
+        if let Some((ptr, capacity)) = self.current_memory() {
+            // safe: we own `ptr`, and `current_memory` only returns `Some`
+            // for allocated regions (`ptr` is valid iff `cap > 0`)
+            unsafe { dealloc(ptr, capacity) }
         }
     }
+}
 
-    #[test]
-    fn test_is_aligned() {
-        // allocate memory aligned to 64-byte
-        let ptr = allocate_aligned(10);
-        assert_eq!(true, is_aligned::<u8>(ptr, 1));
-        assert_eq!(true, is_aligned::<u8>(ptr, 2));
-        assert_eq!(true, is_aligned::<u8>(ptr, 4));
-
-        // now make the memory aligned to 63-byte
-        let ptr = unsafe { NonNull::new_unchecked(ptr.as_ptr().offset(1)) };
-        assert_eq!(true, is_aligned::<u8>(ptr, 1));
-        assert_eq!(false, is_aligned::<u8>(ptr, 2));
-        assert_eq!(false, is_aligned::<u8>(ptr, 4));
-        unsafe { free_aligned(NonNull::new_unchecked(ptr.as_ptr().offset(-1)), 10) };
-    }
-}
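`reserve`/`grow` pick the new capacity as the larger of double the current capacity and the rounded-up requirement, which gives the same amortized O(1) growth as std's `RawVec::grow_amortized`. The policy, modeled as a pure function (a sketch, not the arrow API):

// new capacity: `len + additional` rounded up to a multiple of 64,
// but never less than double the current capacity
fn grown_capacity(cap: usize, len: usize, additional: usize) -> usize {
    let required = len.checked_add(additional).expect("capacity overflow");
    let required = (required + 63) & !63;
    std::cmp::max(cap * 2, required)
}

fn main() {
    assert_eq!(grown_capacity(0, 0, 1), 64); // first allocation: one cache line
    assert_eq!(grown_capacity(64, 64, 1), 128); // doubling wins over rounding
    assert_eq!(grown_capacity(64, 64, 1000), 1088); // a large reserve wins over doubling
}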
+/// Deallocates a memory region previously allocated by [RawBytes].
+///
+/// # Safety
+///
+/// `ptr` must have been allocated via [RawBytes], with `capacity` as its capacity.
+pub(super) unsafe fn dealloc(ptr: NonNull<u8>, capacity: usize) {
+    alloc::dealloc(ptr, Layout::from_size_align_unchecked(capacity, ALIGNMENT))
+}
+
+// We need to guarantee the following:
+// * We don't ever allocate `> isize::MAX` byte-size objects.
+// * We don't overflow `usize::MAX` and actually allocate too little.
+//
+// On 64-bit we just need to check for overflow since trying to allocate
+// `> isize::MAX` bytes will surely fail. On 32-bit and 16-bit we need to add
+// an extra guard for this in case we're running on a platform which can use
+// all 4GB in user-space, e.g., PAE or x32.
+#[inline]
+fn alloc_guard(alloc_size: usize) {
+    if std::mem::size_of::<usize>() < 8 && alloc_size > isize::MAX as usize {
+        panic!("capacity overflow");
+    }
+}
+
+// One central function responsible for reporting capacity overflows. This ensures
+// that the code generation related to these panics is minimal, as there is only
+// one location that panics rather than a bunch throughout the module.
+fn capacity_overflow() -> ! {
+    panic!("capacity overflow");
+}
+
+/// Checks if the pointer `p` is aligned to `align_of::<T>()`.
+pub fn is_ptr_aligned<T>(p: NonNull<T>) -> bool {
+    p.as_ptr().align_offset(std::mem::align_of::<T>()) == 0
+}
diff --git a/rust/arrow/src/zz_memory_check.rs b/rust/arrow/src/zz_memory_check.rs
index 70ec8ebdbdd..cf92887bd39 100644
--- a/rust/arrow/src/zz_memory_check.rs
+++ b/rust/arrow/src/zz_memory_check.rs
@@ -21,7 +21,7 @@
 #[cfg(feature = "memory-check")]
 mod tests {
-    use crate::memory::ALLOCATIONS;
+    use crate::alloc::ALLOCATIONS;
 
     // verify that no allocation is left un-deallocated
    #[test]
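For reference, the `align_offset`-based check can be exercised stand-alone; the sketch below restates `is_ptr_aligned` so it compiles on its own. Note that `align_offset` returns `usize::MAX` when the requested alignment can never be reached, which the `== 0` comparison also treats as misaligned:

use std::ptr::NonNull;

// restated from the patch so the example is self-contained
fn is_ptr_aligned<T>(p: NonNull<T>) -> bool {
    p.as_ptr().align_offset(std::mem::align_of::<T>()) == 0
}

fn main() {
    let x: u64 = 0;
    let p = NonNull::from(&x); // stack variables are aligned for their type
    assert!(is_ptr_aligned::<u64>(p));

    // a pointer one byte into a u64 is aligned for u8 but not for u64
    let q = unsafe { NonNull::new_unchecked((p.as_ptr() as *mut u8).add(1)) };
    assert!(is_ptr_aligned::<u8>(q));
    assert!(!is_ptr_aligned::<u64>(q.cast::<u64>()));
}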