From c8eba3686d13ed5b976b9768aadd36049244ac73 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 2 Jan 2021 07:50:45 +0000 Subject: [PATCH 01/11] Fixed bench --- rust/arrow/benches/length_kernel.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/rust/arrow/benches/length_kernel.rs b/rust/arrow/benches/length_kernel.rs index cdc338acee4..b70f6374f8f 100644 --- a/rust/arrow/benches/length_kernel.rs +++ b/rust/arrow/benches/length_kernel.rs @@ -24,25 +24,23 @@ extern crate arrow; use arrow::array::*; use arrow::compute::kernels::length::length; -fn bench_length() { +fn bench_length(array: &StringArray) { + criterion::black_box(length(array).unwrap()); +} + +fn add_benchmark(c: &mut Criterion) { fn double_vec(v: Vec) -> Vec { [&v[..], &v[..]].concat() } // double ["hello", " ", "world", "!"] 10 times let mut values = vec!["one", "on", "o", ""]; - let mut expected = vec![3, 2, 1, 0]; for _ in 0..10 { values = double_vec(values); - expected = double_vec(expected); } let array = StringArray::from(values); - criterion::black_box(length(&array).unwrap()); -} - -fn add_benchmark(c: &mut Criterion) { - c.bench_function("length", |b| b.iter(bench_length)); + c.bench_function("length", |b| b.iter(|| bench_length(&array))); } criterion_group!(benches, add_benchmark); From 4a3800639a9c88a8de9d613cd3afd5af4bd74a41 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 17 Jan 2021 18:54:22 +0100 Subject: [PATCH 02/11] Added methods to extend MutableBuffer from iterators. 
--- .../arrow/src/array/transform/fixed_binary.rs | 4 +- rust/arrow/src/array/transform/primitive.rs | 2 +- rust/arrow/src/buffer.rs | 178 ++++++++++++++++-- 3 files changed, 170 insertions(+), 14 deletions(-) diff --git a/rust/arrow/src/array/transform/fixed_binary.rs b/rust/arrow/src/array/transform/fixed_binary.rs index 477d2ad277c..36952d46a4d 100644 --- a/rust/arrow/src/array/transform/fixed_binary.rs +++ b/rust/arrow/src/array/transform/fixed_binary.rs @@ -46,7 +46,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { let bytes = &values[i * size..(i + 1) * size]; values_buffer.extend_from_slice(bytes); } else { - values_buffer.extend(size); + values_buffer.extend_zeros(size); } }) }, @@ -61,5 +61,5 @@ pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) { }; let values_buffer = &mut mutable.buffer1; - values_buffer.extend(len * size); + values_buffer.extend_zeros(len * size); } diff --git a/rust/arrow/src/array/transform/primitive.rs b/rust/arrow/src/array/transform/primitive.rs index 77038432459..032bb4a8779 100644 --- a/rust/arrow/src/array/transform/primitive.rs +++ b/rust/arrow/src/array/transform/primitive.rs @@ -36,5 +36,5 @@ pub(super) fn extend_nulls( mutable: &mut _MutableArrayData, len: usize, ) { - mutable.buffer1.extend(len * size_of::()); + mutable.buffer1.extend_zeros(len * size_of::()); } diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index cf300fe3fd3..2e27ab706c2 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -23,19 +23,19 @@ use packed_simd::u8x64; use crate::{ bytes::{Bytes, Deallocation}, - datatypes::ToByteSlice, + datatypes::{ArrowNativeType, ToByteSlice}, ffi, }; -use std::convert::AsRef; use std::fmt::Debug; +use std::iter::FromIterator; use std::ops::{BitAnd, BitOr, Not}; use std::ptr::NonNull; use std::sync::Arc; +use std::{convert::AsRef, usize}; #[cfg(feature = "avx512")] use crate::arch::avx512::*; -use crate::datatypes::ArrowNativeType; use 
crate::error::{ArrowError, Result}; use crate::memory; use crate::util::bit_chunk_iterator::BitChunks; @@ -697,6 +697,7 @@ unsafe impl Sync for Buffer {} unsafe impl Send for Buffer {} impl From for Buffer { + #[inline] fn from(buffer: MutableBuffer) -> Self { buffer.into_buffer() } @@ -728,12 +729,12 @@ pub struct MutableBuffer { impl MutableBuffer { /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`. pub fn new(capacity: usize) -> Self { - let new_capacity = bit_util::round_upto_multiple_of_64(capacity); - let ptr = memory::allocate_aligned(new_capacity); + let capacity = bit_util::round_upto_multiple_of_64(capacity); + let ptr = memory::allocate_aligned(capacity); Self { data: ptr, len: 0, - capacity: new_capacity, + capacity, } } @@ -810,10 +811,14 @@ impl MutableBuffer { pub fn reserve(&mut self, additional: usize) { let required_cap = self.len + additional; if required_cap > self.capacity { - let new_capacity = bit_util::round_upto_multiple_of_64(required_cap); - let new_capacity = std::cmp::max(new_capacity, self.capacity * 2); - self.data = - unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; + // JUSTIFICATION + // Benefit + // necessity + // Soundness + // `self.data` is valid for `self.capacity`. + let (ptr, new_capacity) = + unsafe { reallocate(self.data, self.capacity, required_cap) }; + self.data = ptr; self.capacity = new_capacity; } } @@ -963,11 +968,130 @@ impl MutableBuffer { /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed. #[inline] - pub fn extend(&mut self, additional: usize) { + pub fn extend_zeros(&mut self, additional: usize) { self.resize(self.len + additional, 0); } } +/// # Safety +/// `ptr` must be allocated for `old_capacity`. 
+#[inline] +unsafe fn reallocate( + ptr: NonNull, + old_capacity: usize, + new_capacity: usize, +) -> (NonNull, usize) { + let new_capacity = bit_util::round_upto_multiple_of_64(new_capacity); + let new_capacity = std::cmp::max(new_capacity, old_capacity * 2); + let ptr = memory::reallocate(ptr, old_capacity, new_capacity); + (ptr, new_capacity) +} + +impl Extend for MutableBuffer { + #[inline] + fn extend>(&mut self, iter: T) { + let iterator = iter.into_iter(); + self.extend_from_iter(iterator) + } +} + +impl MutableBuffer { + #[inline] + fn extend_from_iter>( + &mut self, + mut iterator: I, + ) { + let size = std::mem::size_of::(); + + // this is necessary because of https://github.com/rust-lang/rust/issues/32155 + let (mut ptr, mut capacity, mut len) = (self.data, self.capacity, self.len); + let mut dst = unsafe { ptr.as_ptr().add(len) as *mut T }; + + while let Some(item) = iterator.next() { + if len >= capacity { + let (lower, _) = iterator.size_hint(); + let additional = (lower + 1) * size; + let (new_ptr, new_capacity) = + unsafe { reallocate(ptr, capacity, len + additional) }; + ptr = new_ptr; + capacity = new_capacity; + // pointer may have moved. Update it. + dst = unsafe { ptr.as_ptr().add(len) as *mut T }; + } + unsafe { + std::ptr::write(dst, item); + dst = dst.add(1); + } + len += size; + } + + self.data = ptr; + self.capacity = capacity; + self.len = len; + } + + /// Creates a [`MutableBuffer`] from an [`ExactSizeIterator`] with a trusted len. + /// Prefer this to `collect` whenever possible, as it is faster. 
+ /// # Example + /// ``` + /// # use arrow::buffer::MutableBuffer; + /// let iter = vec![1u32].iter().map(|x| x * 2); + /// let mut buffer = MutableBuffer::new(0); + /// unsafe { buffer.extend_from_trusted_len_iter(iter) }; + /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes + /// ``` + /// # Safety + /// This method assumes that the iterator's size is correct and is undefined behavior + /// to use it on an iterator that reports an incorrect length. + // This implementation is required for two reasons: + // 1. there is no trait `TrustedLen` in stable rust and therefore + // we can't specialize `extend` for `TrustedLen` like `Vec` does. + // 2. `extend_from_trusted_len_iter` is faster. + pub unsafe fn extend_from_trusted_len_iter< + T: ArrowNativeType, + I: ExactSizeIterator, + >( + &mut self, + iterator: I, + ) { + let len = iterator.len() * std::mem::size_of::(); + + self.reserve(len); + + let mut dst = self.data.as_ptr().add(self.len) as *mut T; + for item in iterator { + // note how there is not reserve here (compared with `extend_from_iter`) + std::ptr::write(dst, item); + dst = dst.add(1); + } + self.len += len; + } +} + +impl FromIterator for MutableBuffer { + fn from_iter>(iter: I) -> Self { + let mut iterator = iter.into_iter(); + let size = std::mem::size_of::(); + + // first iteration, which will likely reserve sufficient space for the buffer. 
+ let mut buffer = match iterator.next() { + None => return Self::new(0), + Some(element) => { + let (lower, _) = iterator.size_hint(); + let mut buffer = Self::new(lower.saturating_add(1) * size); + unsafe { + std::ptr::write(buffer.as_mut_ptr() as *mut T, element); + buffer.len = size; + } + buffer + } + }; + + buffer.extend_from_iter(iterator); + buffer + } +} + impl std::ops::Deref for MutableBuffer { type Target = [u8]; @@ -1172,6 +1296,38 @@ mod tests { assert_eq!(b"hello arrow", buf.as_slice()); } + #[test] + fn mutable_extend_from_iter() { + let mut buf = MutableBuffer::new(0); + buf.extend(vec![1u32, 2]); + assert_eq!(8, buf.len()); + assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice()); + + buf.extend(vec![3u32, 4]); + assert_eq!(16, buf.len()); + assert_eq!( + &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0], + buf.as_slice() + ); + } + + #[test] + fn mutable_extend_from_trusted_len_iter() { + let mut buf = MutableBuffer::new(0); + let iter = vec![1u32, 2].into_iter(); + unsafe { buf.extend_from_trusted_len_iter(iter) }; + assert_eq!(8, buf.len()); + assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice()); + + let iter = vec![3u32, 4].into_iter(); + unsafe { buf.extend_from_trusted_len_iter(iter) }; + assert_eq!(16, buf.len()); + assert_eq!( + &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0], + buf.as_slice() + ); + } + #[test] fn test_mutable_reserve() { let mut buf = MutableBuffer::new(1); From cba4b64b8ff419982dadf0e7dc5f2ff2b1b53e2e Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 17 Jan 2021 18:57:25 +0100 Subject: [PATCH 03/11] Improved performance of length. --- rust/arrow/src/compute/kernels/length.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/compute/kernels/length.rs b/rust/arrow/src/compute/kernels/length.rs index a0107c35f91..74a90ccc5cf 100644 --- a/rust/arrow/src/compute/kernels/length.rs +++ b/rust/arrow/src/compute/kernels/length.rs @@ -17,7 +17,7 @@ //! 
Defines kernel for length of a string array -use crate::{array::*, buffer::Buffer}; +use crate::{array::*, buffer::MutableBuffer}; use crate::{ datatypes::DataType, error::{ArrowError, Result}, @@ -29,16 +29,21 @@ where OffsetSize: OffsetSizeTrait, { // note: offsets are stored as u8, but they can be interpreted as OffsetSize - let offsets = array.data_ref().clone().buffers()[0].clone(); + let offsets = &array.data_ref().buffers()[0]; // this is a 30% improvement over iterating over u8s and building OffsetSize, which // justifies the usage of `unsafe`. let slice: &[OffsetSize] = &unsafe { offsets.typed_data::() }[array.offset()..]; - let lengths: Vec = slice - .windows(2) - .map(|offset| offset[1] - offset[0]) - .collect(); + let lengths = slice.windows(2).map(|offset| offset[1] - offset[0]); + + let mut buffer = MutableBuffer::new(0); + // JUSTIFICATION + // Benefit + // ~30% speedup + // Soundness + // `windows` is an iterator with a known size. + unsafe { buffer.extend_from_trusted_len_iter(lengths) }; let null_bit_buffer = array .data_ref() @@ -52,7 +57,7 @@ where None, null_bit_buffer, 0, - vec![Buffer::from_slice_ref(&lengths)], + vec![buffer.into()], vec![], ); Ok(make_array(Arc::new(data))) From aa5f5039515c1598bf44dafe5204873bdff7d2f4 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 17 Jan 2021 18:57:58 +0100 Subject: [PATCH 04/11] Improved performance of arithmetic ops. 
--- rust/arrow/src/compute/kernels/arithmetic.rs | 26 +++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 98c0660ba2b..1bac981bf32 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -27,9 +27,7 @@ use std::sync::Arc; use num::{One, Zero}; -use crate::buffer::Buffer; -#[cfg(feature = "simd")] -use crate::buffer::MutableBuffer; +use crate::buffer::{Buffer, MutableBuffer}; use crate::compute::util::combine_option_bitmap; use crate::datatypes; use crate::datatypes::ArrowNumericType; @@ -51,11 +49,14 @@ where T::Native: Neg, F: Fn(T::Native) -> T::Native, { - let values = array - .values() - .iter() - .map(|v| op(*v)) - .collect::>(); + let values = array.values().iter().map(|v| op(*v)); + let mut buffer = MutableBuffer::new(0); + // JUSTIFICATION + // Benefit + // ~30% speedup + // Soundness + // `windows` is an iterator with a known size. + unsafe { buffer.extend_from_trusted_len_iter(values) }; let data = ArrayData::new( T::DATA_TYPE, @@ -63,7 +64,7 @@ where None, array.data_ref().null_buffer().cloned(), 0, - vec![Buffer::from_slice_ref(&values)], + vec![buffer.into()], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) @@ -147,8 +148,9 @@ where .values() .iter() .zip(right.values().iter()) - .map(|(l, r)| op(*l, *r)) - .collect::>(); + .map(|(l, r)| op(*l, *r)); + let mut buffer = MutableBuffer::new(0); + unsafe { buffer.extend_from_trusted_len_iter(values) }; let data = ArrayData::new( T::DATA_TYPE, @@ -156,7 +158,7 @@ where None, null_bit_buffer, 0, - vec![Buffer::from_slice_ref(&values)], + vec![buffer.into()], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) From 681cf45c613ac71d82adaf4c66e8c5bef608261a Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 20 Jan 2021 17:46:28 +0100 Subject: [PATCH 05/11] Fixed error. 
--- rust/arrow/src/buffer.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 2e27ab706c2..336d3c622c3 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -1008,7 +1008,7 @@ impl MutableBuffer { let mut dst = unsafe { ptr.as_ptr().add(len) as *mut T }; while let Some(item) = iterator.next() { - if len >= capacity { + if len + size >= capacity { let (lower, _) = iterator.size_hint(); let additional = (lower + 1) * size; let (new_ptr, new_capacity) = @@ -1035,7 +1035,8 @@ impl MutableBuffer { /// # Example /// ``` /// # use arrow::buffer::MutableBuffer; - /// let iter = vec![1u32].iter().map(|x| x * 2); + /// let v = vec![1u32]; + /// let iter = v.iter().map(|x| x * 2); /// let mut buffer = MutableBuffer::new(0); /// unsafe { buffer.extend_from_trusted_len_iter(iter) }; /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes From fb0346415d005906236a9da6afbcbb76e0e27678 Mon Sep 17 00:00:00 2001 From: Matt Brubeck Date: Wed, 20 Jan 2021 09:10:20 -0800 Subject: [PATCH 06/11] Benchmark extend vs extend_from_trusted_len_iter --- rust/arrow/benches/buffer_create.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/rust/arrow/benches/buffer_create.rs b/rust/arrow/benches/buffer_create.rs index c5b9a4e69a9..61fc3764950 100644 --- a/rust/arrow/benches/buffer_create.rs +++ b/rust/arrow/benches/buffer_create.rs @@ -39,6 +39,28 @@ fn mutable_buffer(data: &[Vec], capacity: usize) -> Buffer { }) } +fn mutable_buffer_extend(data: &[Vec], capacity: usize) -> Buffer { + criterion::black_box({ + let mut result = MutableBuffer::new(capacity); + + data.iter().for_each(|vec| result.extend(vec.iter().copied())); + + result.into() + }) +} + +fn mutable_buffer_extend_trusted(data: &[Vec], capacity: usize) -> Buffer { + criterion::black_box({ + let mut result = MutableBuffer::new(capacity); + + data.iter().for_each(|vec| unsafe { + 
result.extend_from_trusted_len_iter(vec.iter().copied()) + }); + + result.into() + }) +} + fn from_slice(data: &[Vec], capacity: usize) -> Buffer { criterion::black_box({ let mut a = Vec::::with_capacity(capacity); @@ -72,6 +94,10 @@ fn benchmark(c: &mut Criterion) { c.bench_function("mutable", |b| b.iter(|| mutable_buffer(&data, 0))); + c.bench_function("mutable extend", |b| b.iter(|| mutable_buffer_extend(&data, 0))); + + c.bench_function("mutable extend trusted", |b| b.iter(|| mutable_buffer_extend_trusted(&data, 0))); + c.bench_function("mutable prepared", |b| { b.iter(|| mutable_buffer(&data, byte_cap)) }); From b96559e0ac4a5bc670a1c0545b4cee70689e3c31 Mon Sep 17 00:00:00 2001 From: Matt Brubeck Date: Wed, 20 Jan 2021 09:45:39 -0800 Subject: [PATCH 07/11] Optimize extend_from_iter --- rust/arrow/src/buffer.rs | 56 ++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 336d3c622c3..139beb3e3de 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -1002,32 +1002,29 @@ impl MutableBuffer { mut iterator: I, ) { let size = std::mem::size_of::(); + let (lower, _) = iterator.size_hint(); + let additional = lower * size; + self.reserve(additional); // this is necessary because of https://github.com/rust-lang/rust/issues/32155 - let (mut ptr, mut capacity, mut len) = (self.data, self.capacity, self.len); - let mut dst = unsafe { ptr.as_ptr().add(len) as *mut T }; + let mut len = SetLenOnDrop::new(&mut self.len); + let mut dst = unsafe { self.data.as_ptr().add(len.local_len) as *mut T }; + let capacity = self.capacity; - while let Some(item) = iterator.next() { - if len + size >= capacity { - let (lower, _) = iterator.size_hint(); - let additional = (lower + 1) * size; - let (new_ptr, new_capacity) = - unsafe { reallocate(ptr, capacity, len + additional) }; - ptr = new_ptr; - capacity = new_capacity; - // pointer may have moved. Update it. 
- dst = unsafe { ptr.as_ptr().add(len) as *mut T }; - } - unsafe { - std::ptr::write(dst, item); - dst = dst.add(1); + while len.local_len + size <= capacity { + if let Some(item) = iterator.next() { + unsafe { + std::ptr::write(dst, item); + dst = dst.add(1); + } + len.local_len += size; + } else { + break; } - len += size; } + drop(len); - self.data = ptr; - self.capacity = capacity; - self.len = len; + iterator.for_each(|item| self.push(item)); } /// Creates a [`MutableBuffer`] from an [`ExactSizeIterator`] with a trusted len. @@ -1128,6 +1125,25 @@ impl PartialEq for MutableBuffer { unsafe impl Sync for MutableBuffer {} unsafe impl Send for MutableBuffer {} +struct SetLenOnDrop<'a> { + len: &'a mut usize, + local_len: usize, +} + +impl<'a> SetLenOnDrop<'a> { + #[inline] + fn new(len: &'a mut usize) -> Self { + SetLenOnDrop { local_len: *len, len } + } +} + +impl Drop for SetLenOnDrop<'_> { + #[inline] + fn drop(&mut self) { + *self.len = self.local_len; + } +} + #[cfg(test)] mod tests { use std::thread; From ff89f4a8d4c71fac4fb7ecb9c7eb198066fb98a8 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Thu, 21 Jan 2021 05:23:14 +0100 Subject: [PATCH 08/11] Improved comment. 
--- rust/arrow/src/buffer.rs | 2 +- rust/arrow/src/compute/kernels/arithmetic.rs | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 139beb3e3de..60fb8f38ca4 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -1058,7 +1058,7 @@ impl MutableBuffer { let mut dst = self.data.as_ptr().add(self.len) as *mut T; for item in iterator { - // note how there is not reserve here (compared with `extend_from_iter`) + // note how there is no reserve here (compared with `extend_from_iter`) std::ptr::write(dst, item); dst = dst.add(1); } diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 1bac981bf32..eb1c5f95931 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -55,7 +55,7 @@ where // Benefit // ~30% speedup // Soundness - // `windows` is an iterator with a known size. + // `values` is an iterator with a known size. unsafe { buffer.extend_from_trusted_len_iter(values) }; let data = ArrayData::new( @@ -150,6 +150,11 @@ where .zip(right.values().iter()) .map(|(l, r)| op(*l, *r)); let mut buffer = MutableBuffer::new(0); + // JUSTIFICATION + // Benefit + // ~60% speedup + // Soundness + // `values` is an iterator with a known size. unsafe { buffer.extend_from_trusted_len_iter(values) }; let data = ArrayData::new( From 579b8db55b40893632671a4d295759e5f93826d5 Mon Sep 17 00:00:00 2001 From: "Jorge C. 
Leitao" Date: Thu, 21 Jan 2021 19:27:33 +0100 Subject: [PATCH 09/11] fmt --- rust/arrow/benches/buffer_create.rs | 11 ++++++++--- rust/arrow/src/buffer.rs | 5 ++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/rust/arrow/benches/buffer_create.rs b/rust/arrow/benches/buffer_create.rs index 61fc3764950..a79f6e71303 100644 --- a/rust/arrow/benches/buffer_create.rs +++ b/rust/arrow/benches/buffer_create.rs @@ -43,7 +43,8 @@ fn mutable_buffer_extend(data: &[Vec], capacity: usize) -> Buffer { criterion::black_box({ let mut result = MutableBuffer::new(capacity); - data.iter().for_each(|vec| result.extend(vec.iter().copied())); + data.iter() + .for_each(|vec| result.extend(vec.iter().copied())); result.into() }) @@ -94,9 +95,13 @@ fn benchmark(c: &mut Criterion) { c.bench_function("mutable", |b| b.iter(|| mutable_buffer(&data, 0))); - c.bench_function("mutable extend", |b| b.iter(|| mutable_buffer_extend(&data, 0))); + c.bench_function("mutable extend", |b| { + b.iter(|| mutable_buffer_extend(&data, 0)) + }); - c.bench_function("mutable extend trusted", |b| b.iter(|| mutable_buffer_extend_trusted(&data, 0))); + c.bench_function("mutable extend trusted", |b| { + b.iter(|| mutable_buffer_extend_trusted(&data, 0)) + }); c.bench_function("mutable prepared", |b| { b.iter(|| mutable_buffer(&data, byte_cap)) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 60fb8f38ca4..4fb1695eaba 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -1133,7 +1133,10 @@ struct SetLenOnDrop<'a> { impl<'a> SetLenOnDrop<'a> { #[inline] fn new(len: &'a mut usize) -> Self { - SetLenOnDrop { local_len: *len, len } + SetLenOnDrop { + local_len: *len, + len, + } } } From 844e20ae9575c1df5fcd52af446c0bfb7f5f7ad1 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Thu, 21 Jan 2021 21:55:45 +0100 Subject: [PATCH 10/11] Improved API. 
--- rust/arrow/benches/buffer_create.rs | 16 ---- rust/arrow/src/buffer.rs | 82 +++++++++++++------- rust/arrow/src/bytes.rs | 1 + rust/arrow/src/compute/kernels/arithmetic.rs | 66 ++++++++-------- rust/arrow/src/compute/kernels/length.rs | 11 ++- 5 files changed, 93 insertions(+), 83 deletions(-) diff --git a/rust/arrow/benches/buffer_create.rs b/rust/arrow/benches/buffer_create.rs index a79f6e71303..17c3423948e 100644 --- a/rust/arrow/benches/buffer_create.rs +++ b/rust/arrow/benches/buffer_create.rs @@ -50,18 +50,6 @@ fn mutable_buffer_extend(data: &[Vec], capacity: usize) -> Buffer { }) } -fn mutable_buffer_extend_trusted(data: &[Vec], capacity: usize) -> Buffer { - criterion::black_box({ - let mut result = MutableBuffer::new(capacity); - - data.iter().for_each(|vec| unsafe { - result.extend_from_trusted_len_iter(vec.iter().copied()) - }); - - result.into() - }) -} - fn from_slice(data: &[Vec], capacity: usize) -> Buffer { criterion::black_box({ let mut a = Vec::::with_capacity(capacity); @@ -99,10 +87,6 @@ fn benchmark(c: &mut Criterion) { b.iter(|| mutable_buffer_extend(&data, 0)) }); - c.bench_function("mutable extend trusted", |b| { - b.iter(|| mutable_buffer_extend_trusted(&data, 0)) - }); - c.bench_function("mutable prepared", |b| { b.iter(|| mutable_buffer(&data, byte_cap)) }); diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 4fb1695eaba..ba710acafa9 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -728,6 +728,7 @@ pub struct MutableBuffer { impl MutableBuffer { /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`. 
+ #[inline] pub fn new(capacity: usize) -> Self { let capacity = bit_util::round_upto_multiple_of_64(capacity); let ptr = memory::allocate_aligned(capacity); @@ -904,6 +905,7 @@ impl MutableBuffer { self.into_buffer() } + #[inline] fn into_buffer(self) -> Buffer { let buffer_data = unsafe { Bytes::new(self.data, self.len, Deallocation::Native(self.capacity)) @@ -1026,16 +1028,17 @@ impl MutableBuffer { iterator.for_each(|item| self.push(item)); } +} - /// Creates a [`MutableBuffer`] from an [`ExactSizeIterator`] with a trusted len. - /// Prefer this to `collect` whenever possible, as it is faster. +impl Buffer { + /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length. + /// Prefer this to `collect` whenever possible, as it is ~60% faster. /// # Example /// ``` - /// # use arrow::buffer::MutableBuffer; + /// # use arrow::buffer::Buffer; /// let v = vec![1u32]; /// let iter = v.iter().map(|x| x * 2); - /// let mut buffer = MutableBuffer::new(0); - /// unsafe { buffer.extend_from_trusted_len_iter(iter) }; + /// let buffer = unsafe { Buffer::from_trusted_len_iter(iter) }; /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes /// ``` /// # Safety @@ -1044,39 +1047,67 @@ impl MutableBuffer { // This implementation is required for two reasons: // 1. there is no trait `TrustedLen` in stable rust and therefore // we can't specialize `extend` for `TrustedLen` like `Vec` does. - // 2. `extend_from_trusted_len_iter` is faster. + // 2. `from_trusted_len_iter` is faster. 
+ pub unsafe fn from_trusted_len_iter>( + iterator: I, + ) -> Self { + let (_, upper) = iterator.size_hint(); + let upper = upper.expect("from_trusted_len_iter requires an upper limit"); + let len = upper * std::mem::size_of::(); + + let mut buffer = MutableBuffer::new(len); + + let mut dst = buffer.data.as_ptr() as *mut T; + for item in iterator { + // note how there is no reserve here (compared with `extend_from_iter`) + std::ptr::write(dst, item); + dst = dst.add(1); + } + buffer.len = len; + buffer.into() + } + + /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length or errors + /// if any of the items of the iterator is an error. + /// Prefer this to `collect` whenever possible, as it is ~60% faster. + /// # Safety + /// This method assumes that the iterator's size is correct and is undefined behavior + /// to use it on an iterator that reports an incorrect length. + pub unsafe fn try_from_trusted_len_iter< + E, T: ArrowNativeType, - I: ExactSizeIterator, + I: Iterator>, >( - &mut self, iterator: I, - ) { - let len = iterator.len() * std::mem::size_of::(); + ) -> std::result::Result { + let (_, upper) = iterator.size_hint(); + let upper = upper.expect("try_from_trusted_len_iter requires an upper limit"); + let len = upper * std::mem::size_of::(); - self.reserve(len); + let mut buffer = MutableBuffer::new(len); - let mut dst = self.data.as_ptr().add(self.len) as *mut T; + let mut dst = buffer.data.as_ptr() as *mut T; for item in iterator { // note how there is no reserve here (compared with `extend_from_iter`) - std::ptr::write(dst, item); + std::ptr::write(dst, item?); dst = dst.add(1); } - self.len += len; + buffer.len = len; + Ok(buffer.into()) } } -impl FromIterator for MutableBuffer { +impl FromIterator for Buffer { fn from_iter>(iter: I) -> Self { let mut iterator = iter.into_iter(); let size = std::mem::size_of::(); // first iteration, which will likely reserve sufficient space for the buffer. 
let mut buffer = match iterator.next() { - None => return Self::new(0), + None => MutableBuffer::new(0), Some(element) => { let (lower, _) = iterator.size_hint(); - let mut buffer = Self::new(lower.saturating_add(1) * size); + let mut buffer = MutableBuffer::new(lower.saturating_add(1) * size); unsafe { std::ptr::write(buffer.as_mut_ptr() as *mut T, element); buffer.len = size; @@ -1086,7 +1117,7 @@ impl FromIterator for MutableBuffer { }; buffer.extend_from_iter(iterator); - buffer + buffer.into() } } @@ -1332,20 +1363,11 @@ mod tests { } #[test] - fn mutable_extend_from_trusted_len_iter() { - let mut buf = MutableBuffer::new(0); + fn test_from_trusted_len_iter() { let iter = vec![1u32, 2].into_iter(); - unsafe { buf.extend_from_trusted_len_iter(iter) }; + let buf = unsafe { Buffer::from_trusted_len_iter(iter) }; assert_eq!(8, buf.len()); assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice()); - - let iter = vec![3u32, 4].into_iter(); - unsafe { buf.extend_from_trusted_len_iter(iter) }; - assert_eq!(16, buf.len()); - assert_eq!( - &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0], - buf.as_slice() - ); } #[test] diff --git a/rust/arrow/src/bytes.rs b/rust/arrow/src/bytes.rs index 61d52ef572f..323654954f8 100644 --- a/rust/arrow/src/bytes.rs +++ b/rust/arrow/src/bytes.rs @@ -79,6 +79,7 @@ impl Bytes { /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. 
+ #[inline] pub unsafe fn new( ptr: std::ptr::NonNull, len: usize, diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index eb1c5f95931..06d7c90f6a5 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -27,7 +27,9 @@ use std::sync::Arc; use num::{One, Zero}; -use crate::buffer::{Buffer, MutableBuffer}; +use crate::buffer::Buffer; +#[cfg(simd)] +use crate::buffer::MutableBuffer; use crate::compute::util::combine_option_bitmap; use crate::datatypes; use crate::datatypes::ArrowNumericType; @@ -50,13 +52,12 @@ where F: Fn(T::Native) -> T::Native, { let values = array.values().iter().map(|v| op(*v)); - let mut buffer = MutableBuffer::new(0); // JUSTIFICATION // Benefit - // ~30% speedup + // ~60% speedup // Soundness // `values` is an iterator with a known size. - unsafe { buffer.extend_from_trusted_len_iter(values) }; + let buffer = unsafe { Buffer::from_trusted_len_iter(values) }; let data = ArrayData::new( T::DATA_TYPE, @@ -64,7 +65,7 @@ where None, array.data_ref().null_buffer().cloned(), 0, - vec![buffer.into()], + vec![buffer], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) @@ -149,13 +150,12 @@ where .iter() .zip(right.values().iter()) .map(|(l, r)| op(*l, *r)); - let mut buffer = MutableBuffer::new(0); // JUSTIFICATION // Benefit // ~60% speedup // Soundness // `values` is an iterator with a known size. 
- unsafe { buffer.extend_from_trusted_len_iter(values) }; + let buffer = unsafe { Buffer::from_trusted_len_iter(values) }; let data = ArrayData::new( T::DATA_TYPE, @@ -163,7 +163,7 @@ where None, null_bit_buffer, 0, - vec![buffer.into()], + vec![buffer], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) @@ -193,33 +193,37 @@ where let null_bit_buffer = combine_option_bitmap(left.data_ref(), right.data_ref(), left.len())?; - let mut values = Vec::with_capacity(left.len()); - if let Some(b) = &null_bit_buffer { - // some value is null - for i in 0..left.len() { - let is_valid = unsafe { bit_util::get_bit_raw(b.as_ptr(), i) }; - values.push(if is_valid { - let right_value = right.value(i); - if right_value.is_zero() { - return Err(ArrowError::DivideByZero); + let buffer = if let Some(b) = &null_bit_buffer { + let values = left.values().iter().zip(right.values()).enumerate().map( + |(i, (left, right))| { + let is_valid = unsafe { bit_util::get_bit_raw(b.as_ptr(), i) }; + if is_valid { + if right.is_zero() { + Err(ArrowError::DivideByZero) + } else { + Ok(*left / *right) + } } else { - left.value(i) / right_value + Ok(T::default_value()) } - } else { - T::default_value() - }); - } + }, + ); + unsafe { Buffer::try_from_trusted_len_iter(values) } } else { // no value is null - for i in 0..left.len() { - let right_value = right.value(i); - values.push(if right_value.is_zero() { - return Err(ArrowError::DivideByZero); - } else { - left.value(i) / right_value + let values = left + .values() + .iter() + .zip(right.values()) + .map(|(left, right)| { + if right.is_zero() { + Err(ArrowError::DivideByZero) + } else { + Ok(*left / *right) + } }); - } - }; + unsafe { Buffer::try_from_trusted_len_iter(values) } + }?; let data = ArrayData::new( T::DATA_TYPE, @@ -227,7 +231,7 @@ where None, null_bit_buffer, 0, - vec![Buffer::from_slice_ref(&values)], + vec![buffer], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) diff --git a/rust/arrow/src/compute/kernels/length.rs 
b/rust/arrow/src/compute/kernels/length.rs index 74a90ccc5cf..5f2ed59e262 100644 --- a/rust/arrow/src/compute/kernels/length.rs +++ b/rust/arrow/src/compute/kernels/length.rs @@ -17,7 +17,7 @@ //! Defines kernel for length of a string array -use crate::{array::*, buffer::MutableBuffer}; +use crate::{array::*, buffer::Buffer}; use crate::{ datatypes::DataType, error::{ArrowError, Result}, @@ -37,13 +37,12 @@ where let lengths = slice.windows(2).map(|offset| offset[1] - offset[0]); - let mut buffer = MutableBuffer::new(0); // JUSTIFICATION // Benefit - // ~30% speedup + // ~60% speedup // Soundness - // `windows` is an iterator with a known size. - unsafe { buffer.extend_from_trusted_len_iter(lengths) }; + // `lengths` is an iterator with a known size. + let buffer = unsafe { Buffer::from_trusted_len_iter(lengths) }; let null_bit_buffer = array .data_ref() @@ -57,7 +56,7 @@ where None, null_bit_buffer, 0, - vec![buffer.into()], + vec![buffer], vec![], ); Ok(make_array(Arc::new(data))) From 018a400a7e68d2cb3705d20f2c9265e3b9cb9d38 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 22 Jan 2021 19:45:51 +0100 Subject: [PATCH 11/11] Added extra safeguard. --- rust/arrow/src/buffer.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index ba710acafa9..97f9b1f9c8d 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -1063,6 +1063,11 @@ impl Buffer { std::ptr::write(dst, item); dst = dst.add(1); } + assert_eq!( + dst.offset_from(buffer.data.as_ptr() as *mut T) as usize, + upper, + "Trusted iterator length was not accurately reported" + ); buffer.len = len; buffer.into() } @@ -1092,6 +1097,11 @@ impl Buffer { std::ptr::write(dst, item?); dst = dst.add(1); } + assert_eq!( + dst.offset_from(buffer.data.as_ptr() as *mut T) as usize, + upper, + "Trusted iterator length was not accurately reported" + ); buffer.len = len; Ok(buffer.into()) }