From 69a9a1c3adc538d3bbf16811ff8c4d930f57b0fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 19 Jan 2021 07:16:09 +0900 Subject: [PATCH 01/59] ARROW-11303: [Release][C++] Enable mimalloc in the windows verification script MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #9247 from kszucs/mimalloc-windows-verification Authored-by: Krisztián Szűcs Signed-off-by: Sutou Kouhei --- dev/release/verify-release-candidate.bat | 47 +++++++++++------------- dev/tasks/verify-rc/github.win.yml | 2 +- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/dev/release/verify-release-candidate.bat b/dev/release/verify-release-candidate.bat index 2bc66c1e865..bef78fc920c 100644 --- a/dev/release/verify-release-candidate.bat +++ b/dev/release/verify-release-candidate.bat @@ -41,20 +41,15 @@ set PYTHON=3.6 @rem Using call with conda.bat seems necessary to avoid terminating the batch @rem script execution -call conda create -p %_VERIFICATION_CONDA_ENV% ^ - --no-shortcuts -f -q -y python=%PYTHON% ^ +call conda create --no-shortcuts -c conda-forge -f -q -y -p %_VERIFICATION_CONDA_ENV% ^ + --file=ci\conda_env_cpp.yml ^ + --file=ci\conda_env_python.yml ^ + git ^ + python=%PYTHON% ^ || exit /B 1 call activate %_VERIFICATION_CONDA_ENV% || exit /B 1 -call conda install -y ^ - --no-shortcuts ^ - python=3.7 ^ - git ^ - --file=ci\conda_env_cpp.yml ^ - --file=ci\conda_env_python.yml ^ - -c conda-forge || exit /B 1 - set GENERATOR=Visual Studio 15 2017 Win64 set CONFIGURATION=release @@ -75,24 +70,25 @@ call "C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\Common7\Tool @rem generator used cmake -G "%GENERATOR%" ^ - -DCMAKE_INSTALL_PREFIX=%ARROW_HOME% ^ - -DARROW_BUILD_STATIC=OFF ^ -DARROW_BOOST_USE_SHARED=ON ^ + -DARROW_BUILD_STATIC=OFF ^ -DARROW_BUILD_TESTS=ON ^ - -DGTest_SOURCE=BUNDLED ^ - -DCMAKE_BUILD_TYPE=%CONFIGURATION% ^ - -DCMAKE_UNITY_BUILD=ON ^ -DARROW_CXXFLAGS="/MP" ^ + -DARROW_DATASET=ON ^ + -DARROW_FLIGHT=ON ^ + -DARROW_MIMALLOC=ON ^ + -DARROW_PARQUET=ON ^ + -DARROW_PYTHON=ON ^ + -DARROW_WITH_BROTLI=ON ^ -DARROW_WITH_BZ2=ON ^ - -DARROW_WITH_ZLIB=ON ^ - -DARROW_WITH_ZSTD=ON ^ -DARROW_WITH_LZ4=ON ^ -DARROW_WITH_SNAPPY=ON ^ - -DARROW_WITH_BROTLI=ON ^ - -DARROW_FLIGHT=ON ^ - -DARROW_PYTHON=ON ^ - -DARROW_DATASET=ON ^ - -DARROW_PARQUET=ON ^ + -DARROW_WITH_ZLIB=ON ^ + -DARROW_WITH_ZSTD=ON ^ + -DCMAKE_BUILD_TYPE=%CONFIGURATION% ^ + -DCMAKE_INSTALL_PREFIX=%ARROW_HOME% ^ + -DCMAKE_UNITY_BUILD=ON ^ + -DGTest_SOURCE=BUNDLED ^ .. || exit /B cmake --build . 
--target INSTALL --config Release || exit /B 1 @@ -111,9 +107,10 @@ git clone https://github.com/apache/arrow-testing.git %_VERIFICATION_DIR%\arrow- set ARROW_TEST_DATA=%_VERIFICATION_DIR%\arrow-testing\data @rem Needed so python-test.exe works -set PYTHONPATH=%CONDA_PREFIX%\Lib;%CONDA_PREFIX%\Lib\site-packages;%CONDA_PREFIX%\python35.zip;%CONDA_PREFIX%\DLLs;%CONDA_PREFIX%;%PYTHONPATH% - +set PYTHONPATH_ORIGINAL=%PYTHONPATH% +set PYTHONPATH=%CONDA_PREFIX%\Lib;%CONDA_PREFIX%\Lib\site-packages;%CONDA_PREFIX%\DLLs;%CONDA_PREFIX%;%PYTHONPATH% ctest -VV || exit /B 1 +set PYTHONPATH=%PYTHONPATH_ORIGINAL% popd @rem Build and import pyarrow @@ -126,7 +123,7 @@ set PYARROW_WITH_FLIGHT=1 set PYARROW_WITH_PARQUET=1 set PYARROW_WITH_DATASET=1 python setup.py build_ext --inplace --bundle-arrow-cpp bdist_wheel || exit /B 1 -py.test pyarrow -v -s --enable-parquet || exit /B 1 +pytest pyarrow -v -s --enable-parquet || exit /B 1 popd diff --git a/dev/tasks/verify-rc/github.win.yml b/dev/tasks/verify-rc/github.win.yml index fbe0ee26812..7a96f89de51 100644 --- a/dev/tasks/verify-rc/github.win.yml +++ b/dev/tasks/verify-rc/github.win.yml @@ -44,7 +44,7 @@ jobs: - name: Fetch Submodules and Tags shell: bash run: cd arrow && ci/scripts/util_checkout.sh - - uses: s-weigand/setup-conda@v1 + - uses: conda-incubator/setup-miniconda@v2 - name: Install System Dependencies run: | choco install boost-msvc-14.1 From 903b41c2ad4acecec8fba3b9dbabec55eda23047 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Tue, 19 Jan 2021 10:59:52 +0900 Subject: [PATCH 02/59] ARROW-11309: [Release][C#] Use .NET 3.1 for verification Because we require .NET 3 or later since 3.0.0. Closes #9254 from kou/release-verify-macos-csharp Authored-by: Sutou Kouhei Signed-off-by: Sutou Kouhei --- dev/release/verify-release-candidate.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh index a965416a927..f113a89f206 100755 --- a/dev/release/verify-release-candidate.sh +++ b/dev/release/verify-release-candidate.sh @@ -313,7 +313,7 @@ test_csharp() { fi fi else - local dotnet_version=2.2.300 + local dotnet_version=3.1.405 local dotnet_platform= case "$(uname)" in Linux) From 19e955999e76f8e0ac48667cff6b44b6a9c85fcb Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Tue, 19 Jan 2021 14:06:10 +0100 Subject: [PATCH 03/59] ARROW-11315: [Packaging][APT][arm64] Add missing gir1.2 files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #9260 from kou/release-debian-buster-arm64-add-missing-gir-gandiva Authored-by: Sutou Kouhei Signed-off-by: Krisztián Szűcs --- dev/tasks/tasks.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/dev/tasks/tasks.yml b/dev/tasks/tasks.yml index 10b79db749f..42eeaab7b51 100644 --- a/dev/tasks/tasks.yml +++ b/dev/tasks/tasks.yml @@ -634,6 +634,8 @@ tasks: - apache-arrow_{no_rc_version}-1.dsc - apache-arrow_{no_rc_version}.orig.tar.gz - gir1.2-arrow-1.0_{no_rc_version}-1_[a-z0-9]+.deb + - gir1.2-arrow-dataset-1.0_{no_rc_version}-1_[a-z0-9]+.deb + - gir1.2-gandiva-1.0_{no_rc_version}-1_[a-z0-9]+.deb - gir1.2-parquet-1.0_{no_rc_version}-1_[a-z0-9]+.deb - libarrow-dev_{no_rc_version}-1_[a-z0-9]+.deb - libarrow-dataset-dev_{no_rc_version}-1_[a-z0-9]+.deb @@ -745,6 +747,8 @@ tasks: - apache-arrow_{no_rc_version}-1.dsc - apache-arrow_{no_rc_version}.orig.tar.gz - gir1.2-arrow-1.0_{no_rc_version}-1_[a-z0-9]+.deb + - gir1.2-arrow-dataset-1.0_{no_rc_version}-1_[a-z0-9]+.deb + - 
gir1.2-gandiva-1.0_{no_rc_version}-1_[a-z0-9]+.deb - gir1.2-parquet-1.0_{no_rc_version}-1_[a-z0-9]+.deb - libarrow-dev_{no_rc_version}-1_[a-z0-9]+.deb - libarrow-glib-dev_{no_rc_version}-1_[a-z0-9]+.deb @@ -847,6 +851,8 @@ tasks: - apache-arrow_{no_rc_version}-1.dsc - apache-arrow_{no_rc_version}.orig.tar.gz - gir1.2-arrow-1.0_{no_rc_version}-1_[a-z0-9]+.deb + - gir1.2-arrow-dataset-1.0_{no_rc_version}-1_[a-z0-9]+.deb + - gir1.2-gandiva-1.0_{no_rc_version}-1_[a-z0-9]+.deb - gir1.2-parquet-1.0_{no_rc_version}-1_[a-z0-9]+.deb - libarrow-dev_{no_rc_version}-1_[a-z0-9]+.deb - libarrow-glib-dev_{no_rc_version}-1_[a-z0-9]+.deb @@ -951,6 +957,8 @@ tasks: - apache-arrow_{no_rc_version}-1.dsc - apache-arrow_{no_rc_version}.orig.tar.gz - gir1.2-arrow-1.0_{no_rc_version}-1_[a-z0-9]+.deb + - gir1.2-arrow-dataset-1.0_{no_rc_version}-1_[a-z0-9]+.deb + - gir1.2-gandiva-1.0_{no_rc_version}-1_[a-z0-9]+.deb - gir1.2-parquet-1.0_{no_rc_version}-1_[a-z0-9]+.deb - libarrow-dev_{no_rc_version}-1_[a-z0-9]+.deb - libarrow-glib-dev_{no_rc_version}-1_[a-z0-9]+.deb @@ -1055,6 +1063,7 @@ tasks: - apache-arrow_{no_rc_version}-1.dsc - apache-arrow_{no_rc_version}.orig.tar.gz - gir1.2-arrow-1.0_{no_rc_version}-1_[a-z0-9]+.deb + - gir1.2-arrow-dataset-1.0_{no_rc_version}-1_[a-z0-9]+.deb - gir1.2-gandiva-1.0_{no_rc_version}-1_[a-z0-9]+.deb - gir1.2-parquet-1.0_{no_rc_version}-1_[a-z0-9]+.deb - libarrow-dataset-dev_{no_rc_version}-1_[a-z0-9]+.deb From 17a3fab1d919853a06f5684bcc9fbd51bf533370 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Tue, 19 Jan 2021 14:43:40 +0100 Subject: [PATCH 04/59] ARROW-11314: [Release][APT][Yum] Add support for verifying arm64 packages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #9259 from kou/release-verify-arm64 Authored-by: Sutou Kouhei Signed-off-by: Krisztián Szűcs --- dev/tasks/verify-rc/github.linux.yml | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/dev/tasks/verify-rc/github.linux.yml b/dev/tasks/verify-rc/github.linux.yml index 49d937ac6fa..4da78c80b04 100644 --- a/dev/tasks/verify-rc/github.linux.yml +++ b/dev/tasks/verify-rc/github.linux.yml @@ -27,7 +27,7 @@ on: jobs: verify: name: "Verify release candidate Ubuntu {{ artifact }}" - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 {%- if env is defined %} env: {%- for key, value in env.items() %} @@ -49,8 +49,17 @@ jobs: # TODO: don't require removing newer llvms sudo apt-get --purge remove -y llvm-9 clang-9 sudo apt-get install -y \ - wget curl libboost-all-dev jq \ - autoconf-archive gtk-doc-tools libgirepository1.0-dev flex bison + autoconf-archive \ + binfmt-support \ + bison \ + curl \ + flex \ + gtk-doc-tools \ + jq \ + libboost-all-dev \ + libgirepository1.0-dev \ + qemu-user-static \ + wget if [ "$TEST_JAVA" = "1" ]; then # Maven From 275fda1f59483839c9af88dc1770bdaccce63e06 Mon Sep 17 00:00:00 2001 From: Kenta Murata Date: Tue, 19 Jan 2021 15:54:57 +0100 Subject: [PATCH 05/59] ARROW-7633: [C++][CI] Create fuzz targets for tensors and sparse tensors Closes #6302 from mrkn/ARROW-7633 Lead-authored-by: Kenta Murata Co-authored-by: Antoine Pitrou Signed-off-by: Antoine Pitrou --- .../fuzzing/generate_corpuses.sh | 4 + cpp/src/arrow/ipc/CMakeLists.txt | 5 + .../arrow/ipc/generate_tensor_fuzz_corpus.cc | 134 ++++++++++++++++++ cpp/src/arrow/ipc/metadata_internal.cc | 11 +- cpp/src/arrow/ipc/reader.cc | 17 +++ cpp/src/arrow/ipc/reader.h | 2 + cpp/src/arrow/ipc/tensor_stream_fuzz.cc | 29 ++++ cpp/src/arrow/ipc/test_common.cc | 99 +++++++++++++ 
cpp/src/arrow/ipc/test_common.h | 6 + cpp/src/arrow/tensor.cc | 12 +- cpp/src/arrow/tensor.h | 9 ++ 11 files changed, 319 insertions(+), 9 deletions(-) create mode 100644 cpp/src/arrow/ipc/generate_tensor_fuzz_corpus.cc create mode 100644 cpp/src/arrow/ipc/tensor_stream_fuzz.cc diff --git a/cpp/build-support/fuzzing/generate_corpuses.sh b/cpp/build-support/fuzzing/generate_corpuses.sh index 11f6b90d410..f0d8e162375 100755 --- a/cpp/build-support/fuzzing/generate_corpuses.sh +++ b/cpp/build-support/fuzzing/generate_corpuses.sh @@ -42,6 +42,10 @@ rm -rf ${CORPUS_DIR} ${OUT}/arrow-ipc-generate-fuzz-corpus -file ${CORPUS_DIR} ${ARROW_CPP}/build-support/fuzzing/pack_corpus.py ${CORPUS_DIR} ${OUT}/arrow-ipc-file-fuzz_seed_corpus.zip +rm -rf ${CORPUS_DIR} +${OUT}/arrow-ipc-generate-tensor-fuzz-corpus -stream ${CORPUS_DIR} +${ARROW_CPP}/build-support/fuzzing/pack_corpus.py ${CORPUS_DIR} ${OUT}/arrow-ipc-tensor-stream-fuzz_seed_corpus.zip + rm -rf ${CORPUS_DIR} ${OUT}/parquet-arrow-generate-fuzz-corpus ${CORPUS_DIR} cp ${ARROW_CPP}/submodules/parquet-testing/data/*.parquet ${CORPUS_DIR} diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index 335a858dc35..495018ec096 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -76,7 +76,12 @@ if(ARROW_FUZZING) add_executable(arrow-ipc-generate-fuzz-corpus generate_fuzz_corpus.cc) target_link_libraries(arrow-ipc-generate-fuzz-corpus ${ARROW_UTIL_LIB} ${ARROW_TEST_LINK_LIBS}) + + add_executable(arrow-ipc-generate-tensor-fuzz-corpus generate_tensor_fuzz_corpus.cc) + target_link_libraries(arrow-ipc-generate-tensor-fuzz-corpus ${ARROW_UTIL_LIB} + ${ARROW_TEST_LINK_LIBS}) endif() add_arrow_fuzz_target(file_fuzz PREFIX "arrow-ipc") add_arrow_fuzz_target(stream_fuzz PREFIX "arrow-ipc") +add_arrow_fuzz_target(tensor_stream_fuzz PREFIX "arrow-ipc") diff --git a/cpp/src/arrow/ipc/generate_tensor_fuzz_corpus.cc b/cpp/src/arrow/ipc/generate_tensor_fuzz_corpus.cc new file mode 100644 index 00000000000..dd40ef0ab2f --- /dev/null +++ b/cpp/src/arrow/ipc/generate_tensor_fuzz_corpus.cc @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A command line executable that generates a bunch of valid IPC files +// containing example tensors. Those are used as fuzzing seeds to make +// fuzzing more efficient. 
+ +#include +#include +#include +#include +#include + +#include "arrow/io/file.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/test_common.h" +#include "arrow/ipc/writer.h" +#include "arrow/result.h" +#include "arrow/tensor.h" +#include "arrow/util/io_util.h" + +namespace arrow { +namespace ipc { + +using ::arrow::internal::PlatformFilename; + +Result PrepareDirectory(const std::string& dir) { + ARROW_ASSIGN_OR_RAISE(auto dir_fn, PlatformFilename::FromString(dir)); + RETURN_NOT_OK(::arrow::internal::CreateDir(dir_fn)); + return std::move(dir_fn); +} + +Result> MakeSerializedBuffer( + std::function&)> fn) { + ARROW_ASSIGN_OR_RAISE(auto sink, io::BufferOutputStream::Create(1024)); + RETURN_NOT_OK(fn(sink)); + return sink->Finish(); +} + +Result> SerializeTensor(const std::shared_ptr& tensor) { + return MakeSerializedBuffer( + [&](const std::shared_ptr& sink) -> Status { + int32_t metadata_length; + int64_t body_length; + return ipc::WriteTensor(*tensor, sink.get(), &metadata_length, &body_length); + }); +} + +Result>> Tensors() { + std::vector> tensors; + std::shared_ptr tensor; + std::vector shape = {5, 3, 7}; + std::shared_ptr types[] = {int8(), int16(), int32(), int64(), + uint8(), uint16(), uint32(), uint64()}; + uint32_t seed = 0; + for (auto type : types) { + RETURN_NOT_OK( + test::MakeRandomTensor(type, shape, /*row_major_p=*/true, &tensor, seed++)); + tensors.push_back(tensor); + RETURN_NOT_OK( + test::MakeRandomTensor(type, shape, /*row_major_p=*/false, &tensor, seed++)); + tensors.push_back(tensor); + } + return tensors; +} + +Status GenerateTensors(const PlatformFilename& dir_fn) { + int sample_num = 1; + auto sample_name = [&]() -> std::string { + return "tensor-" + std::to_string(sample_num++); + }; + + ARROW_ASSIGN_OR_RAISE(auto tensors, Tensors()); + + for (const auto& tensor : tensors) { + ARROW_ASSIGN_OR_RAISE(auto buf, SerializeTensor(tensor)); + ARROW_ASSIGN_OR_RAISE(auto sample_fn, dir_fn.Join(sample_name())); + std::cerr << sample_fn.ToString() << std::endl; + ARROW_ASSIGN_OR_RAISE(auto file, io::FileOutputStream::Open(sample_fn.ToString())); + RETURN_NOT_OK(file->Write(buf)); + RETURN_NOT_OK(file->Close()); + } + return Status::OK(); +} + +Status DoMain(const std::string& out_dir) { + ARROW_ASSIGN_OR_RAISE(auto dir_fn, PrepareDirectory(out_dir)); + return GenerateTensors(dir_fn); +} + +ARROW_NORETURN void Usage() { + std::cerr << "Usage: arrow-ipc-generate-tensor-fuzz-corpus " + << "-stream " << std::endl; + std::exit(2); +} + +int Main(int argc, char** argv) { + if (argc != 3) { + Usage(); + } + + auto opt = std::string(argv[1]); + if (opt != "-stream") { + Usage(); + } + + auto out_dir = std::string(argv[2]); + + Status st = DoMain(out_dir); + if (!st.ok()) { + std::cerr << st.ToString() << std::endl; + return 1; + } + return 0; +} + +} // namespace ipc +} // namespace arrow + +int main(int argc, char** argv) { return arrow::ipc::Main(argc, argv); } diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 250ac950208..f818aebab24 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -1349,9 +1349,9 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr* type return Status::IOError("Header-type of flatbuffer-encoded Message is not Tensor."); } - int ndim = static_cast(tensor->shape()->size()); + flatbuffers::uoffset_t ndim = tensor->shape()->size(); - for (int i = 0; i < ndim; ++i) { + for (flatbuffers::uoffset_t i = 0; i < ndim; ++i) { auto dim = 
tensor->shape()->Get(i); shape->push_back(dim->size()); @@ -1359,7 +1359,12 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr* type } if (tensor->strides() && tensor->strides()->size() > 0) { - for (int i = 0; i < ndim; ++i) { + if (tensor->strides()->size() != ndim) { + return Status::IOError( + "The sizes of shape and strides in a tensor are mismatched."); + } + + for (decltype(ndim) i = 0; i < ndim; ++i) { strides->push_back(tensor->strides()->Get(i)); } } diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index e20208a14dc..c4cc84ec09d 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1747,6 +1747,23 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) { return Status::OK(); } +Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) { + auto buffer = std::make_shared(data, size); + io::BufferReader buffer_reader(buffer); + + std::shared_ptr tensor; + + while (true) { + ARROW_ASSIGN_OR_RAISE(tensor, ReadTensor(&buffer_reader)); + if (tensor == nullptr) { + break; + } + RETURN_NOT_OK(tensor->Validate()); + } + + return Status::OK(); +} + } // namespace internal } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 84934f6ac82..fe9a3b72e16 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -485,6 +485,8 @@ Result> ReadSparseTensorPayload(const IpcPayload& ARROW_EXPORT Status FuzzIpcStream(const uint8_t* data, int64_t size); ARROW_EXPORT +Status FuzzIpcTensorStream(const uint8_t* data, int64_t size); +ARROW_EXPORT Status FuzzIpcFile(const uint8_t* data, int64_t size); } // namespace internal diff --git a/cpp/src/arrow/ipc/tensor_stream_fuzz.cc b/cpp/src/arrow/ipc/tensor_stream_fuzz.cc new file mode 100644 index 00000000000..7524940e17d --- /dev/null +++ b/cpp/src/arrow/ipc/tensor_stream_fuzz.cc @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include "arrow/ipc/reader.h" +#include "arrow/status.h" +#include "arrow/util/macros.h" + +extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { + auto status = + arrow::ipc::internal::FuzzIpcTensorStream(data, static_cast(size)); + ARROW_UNUSED(status); + return 0; +} diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc index 31b1f655cd4..a0f61ba9d94 100644 --- a/cpp/src/arrow/ipc/test_common.cc +++ b/cpp/src/arrow/ipc/test_common.cc @@ -17,9 +17,11 @@ #include #include +#include #include #include #include +#include #include #include "arrow/array.h" @@ -30,6 +32,7 @@ #include "arrow/pretty_print.h" #include "arrow/record_batch.h" #include "arrow/status.h" +#include "arrow/tensor.h" #include "arrow/testing/extension_type.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" @@ -1000,6 +1003,102 @@ Status MakeDictExtension(std::shared_ptr* out) { return Status::OK(); } +namespace { + +template +void FillRandomData(CValueType* data, size_t n, CValueType min, CValueType max, + SeedType seed) { + std::default_random_engine rng(seed); + DistributionType dist(min, max); + std::generate(data, data + n, + [&dist, &rng] { return static_cast(dist(rng)); }); +} + +template +enable_if_t::value && std::is_signed::value, + void> +FillRandomData(CValueType* data, size_t n, SeedType seed) { + FillRandomData>( + data, n, -1000, 1000, seed); +} + +template +enable_if_t::value && std::is_unsigned::value, + void> +FillRandomData(CValueType* data, size_t n, SeedType seed) { + FillRandomData>( + data, n, 0, 1000, seed); +} + +template +enable_if_t::value, void> FillRandomData( + CValueType* data, size_t n, SeedType seed) { + FillRandomData>( + data, n, -1000, 1000, seed); +} + +} // namespace + +Status MakeRandomTensor(const std::shared_ptr& type, + const std::vector& shape, bool row_major_p, + std::shared_ptr* out, uint32_t seed) { + const auto& element_type = internal::checked_cast(*type); + std::vector strides; + if (row_major_p) { + internal::ComputeRowMajorStrides(element_type, shape, &strides); + } else { + internal::ComputeColumnMajorStrides(element_type, shape, &strides); + } + + const int64_t element_size = element_type.bit_width() / CHAR_BIT; + const int64_t len = + std::accumulate(shape.begin(), shape.end(), int64_t(1), std::multiplies()); + + ARROW_ASSIGN_OR_RAISE(std::shared_ptr buf, AllocateBuffer(element_size * len)); + + switch (type->id()) { + case Type::INT8: + FillRandomData>( + reinterpret_cast(buf->mutable_data()), len, -128, 127, seed); + break; + case Type::UINT8: + FillRandomData>( + reinterpret_cast(buf->mutable_data()), len, 0, 255, seed); + break; + case Type::INT16: + FillRandomData(reinterpret_cast(buf->mutable_data()), len, seed); + break; + case Type::UINT16: + FillRandomData(reinterpret_cast(buf->mutable_data()), len, seed); + break; + case Type::INT32: + FillRandomData(reinterpret_cast(buf->mutable_data()), len, seed); + break; + case Type::UINT32: + FillRandomData(reinterpret_cast(buf->mutable_data()), len, seed); + break; + case Type::INT64: + FillRandomData(reinterpret_cast(buf->mutable_data()), len, seed); + break; + case Type::UINT64: + FillRandomData(reinterpret_cast(buf->mutable_data()), len, seed); + break; + case Type::HALF_FLOAT: + FillRandomData(reinterpret_cast(buf->mutable_data()), len, seed); + break; + case Type::FLOAT: + FillRandomData(reinterpret_cast(buf->mutable_data()), len, seed); + break; + case Type::DOUBLE: + FillRandomData(reinterpret_cast(buf->mutable_data()), len, 
seed); + break; + default: + return Status::Invalid(type->ToString(), " is not valid data type for a tensor"); + } + + return Tensor::Make(type, buf, shape, strides).Value(out); +} + } // namespace test } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/test_common.h b/cpp/src/arrow/ipc/test_common.h index 021d70258b8..2217bae39fc 100644 --- a/cpp/src/arrow/ipc/test_common.h +++ b/cpp/src/arrow/ipc/test_common.h @@ -19,6 +19,7 @@ #include #include +#include #include "arrow/array.h" #include "arrow/record_batch.h" @@ -161,6 +162,11 @@ Status MakeUuid(std::shared_ptr* out); ARROW_TESTING_EXPORT Status MakeDictExtension(std::shared_ptr* out); +ARROW_TESTING_EXPORT +Status MakeRandomTensor(const std::shared_ptr& type, + const std::vector& shape, bool row_major_p, + std::shared_ptr* out, uint32_t seed = 0); + } // namespace test } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/tensor.cc b/cpp/src/arrow/tensor.cc index 21ad01e31ca..894a94c40cf 100644 --- a/cpp/src/arrow/tensor.cc +++ b/cpp/src/arrow/tensor.cc @@ -59,11 +59,9 @@ void ComputeRowMajorStrides(const FixedWidthType& type, const std::vector& shape, - std::vector* strides) { +void ComputeColumnMajorStrides(const FixedWidthType& type, + const std::vector& shape, + std::vector* strides) { const int byte_width = internal::GetByteWidth(type); int64_t total = byte_width; for (int64_t dimsize : shape) { @@ -78,6 +76,8 @@ static void ComputeColumnMajorStrides(const FixedWidthType& type, } } +} // namespace internal + namespace { inline bool IsTensorStridesRowMajor(const std::shared_ptr& type, @@ -94,7 +94,7 @@ inline bool IsTensorStridesColumnMajor(const std::shared_ptr& type, const std::vector& strides) { std::vector f_strides; const auto& fw_type = checked_cast(*type); - ComputeColumnMajorStrides(fw_type, shape, &f_strides); + internal::ComputeColumnMajorStrides(fw_type, shape, &f_strides); return strides == f_strides; } diff --git a/cpp/src/arrow/tensor.h b/cpp/src/arrow/tensor.h index be24cc13fc8..22da07a16ed 100644 --- a/cpp/src/arrow/tensor.h +++ b/cpp/src/arrow/tensor.h @@ -59,6 +59,11 @@ ARROW_EXPORT void ComputeRowMajorStrides(const FixedWidthType& type, const std::vector& shape, std::vector* strides); +ARROW_EXPORT +void ComputeColumnMajorStrides(const FixedWidthType& type, + const std::vector& shape, + std::vector* strides); + ARROW_EXPORT bool IsTensorStridesContiguous(const std::shared_ptr& type, const std::vector& shape, @@ -174,6 +179,10 @@ class ARROW_EXPORT Tensor { return *ptr; } + Status Validate() const { + return internal::ValidateTensorParameters(type_, data_, shape_, strides_, dim_names_); + } + protected: Tensor() {} From 2d3e8f96a7c1aefd6eed5c69bdbcd34b1d1c2d3e Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Tue, 19 Jan 2021 10:43:23 -0500 Subject: [PATCH 06/59] ARROW-11246: [Rust] Add type to Unexpected accumulator state error I am getting a bug with an error: Unexpected accumulator state, but It's not possible to understand what value was passed when the exception is done on the user's side. I add type to the error message to make investigation of the bug more easy. 
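As a minimal, self-contained sketch of the change (using a simplified stand-in for DataFusion's `ScalarValue`, not the actual type), the fix amounts to embedding the unexpected value in the error message via `{:?}`:

```rust
// Hypothetical, simplified types for illustration only; not DataFusion's code.
#[derive(Debug)]
enum ScalarValue {
    Int64(Option<i64>),
    List(Option<Vec<i64>>),
}

fn merge_state(state: &ScalarValue) -> Result<&Vec<i64>, String> {
    match state {
        ScalarValue::List(Some(values)) => Ok(values),
        // Before: a bare "Unexpected accumulator state" with no context.
        // After: the offending value is part of the message.
        other => Err(format!("Unexpected accumulator state {:?}", other)),
    }
}

fn main() {
    let good = ScalarValue::List(Some(vec![1, 2, 3]));
    let bad = ScalarValue::Int64(Some(7));
    assert!(merge_state(&good).is_ok());
    println!("{}", merge_state(&bad).unwrap_err());
    // -> Unexpected accumulator state Int64(Some(7))
}
```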
Closes #9201 from ovr/unexpected-accumulator-state Authored-by: Dmitry Patsura Signed-off-by: Andrew Lamb --- rust/datafusion/src/physical_plan/distinct_expressions.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/physical_plan/distinct_expressions.rs b/rust/datafusion/src/physical_plan/distinct_expressions.rs index 9aa639a4314..33001d393b5 100644 --- a/rust/datafusion/src/physical_plan/distinct_expressions.rs +++ b/rust/datafusion/src/physical_plan/distinct_expressions.rs @@ -132,9 +132,10 @@ impl Accumulator for DistinctCountAccumulator { .iter() .map(|state| match state { ScalarValue::List(Some(values), _) => Ok(values), - _ => Err(DataFusionError::Internal( - "Unexpected accumulator state".to_string(), - )), + _ => Err(DataFusionError::Internal(format!( + "Unexpected accumulator state {:?}", + state + ))), }) .collect::>>()?; From e20f4392b78a622f16e20af953c3f29a98af1774 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 19 Jan 2021 10:49:28 -0500 Subject: [PATCH 07/59] ARROW-11254: [Rust][DataFusion] Add SIMD and snmalloc flags as options to benchmarks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I think it would be great to test some compilation options individually on the benchmarks to see the impact. Here are some examples of the impact it can have on the queries: --features "": ``` Query 5 iteration 0 took 655.3 ms Query 5 iteration 1 took 648.9 ms Query 5 iteration 2 took 640.3 ms Query 5 iteration 3 took 658.7 ms Query 5 iteration 4 took 646.3 ms Query 5 iteration 5 took 684.1 ms Query 5 iteration 6 took 642.8 ms Query 5 iteration 7 took 656.9 ms Query 5 iteration 8 took 646.0 ms Query 5 iteration 9 took 669.1 ms Query 5 avg time: 654.85 ms ``` --features "snmalloc" ``` Query 5 iteration 0 took 525.4 ms Query 5 iteration 1 took 478.8 ms Query 5 iteration 2 took 485.7 ms Query 5 iteration 3 took 486.6 ms Query 5 iteration 4 took 482.6 ms Query 5 iteration 5 took 473.1 ms Query 5 iteration 6 took 494.4 ms Query 5 iteration 7 took 483.5 ms Query 5 iteration 8 took 493.1 ms Query 5 iteration 9 took 479.4 ms Query 5 avg time: 488.26 ms ``` --features "" ``` Query 12 iteration 0 took 241.4 ms Query 12 iteration 1 took 234.8 ms Query 12 iteration 2 took 229.8 ms Query 12 iteration 3 took 229.5 ms Query 12 iteration 4 took 228.3 ms Query 12 iteration 5 took 230.0 ms Query 12 iteration 6 took 228.3 ms Query 12 iteration 7 took 229.3 ms Query 12 iteration 8 took 229.9 ms Query 12 iteration 9 took 230.1 ms Query 12 avg time: 231.13 ms ``` --features "simd" ``` Query 12 iteration 0 took 157.7 ms Query 12 iteration 1 took 159.3 ms Query 12 iteration 2 took 156.9 ms Query 12 iteration 3 took 163.0 ms Query 12 iteration 4 took 157.5 ms Query 12 iteration 5 took 157.6 ms Query 12 iteration 6 took 156.6 ms Query 12 iteration 7 took 157.4 ms Query 12 iteration 8 took 158.6 ms Query 12 iteration 9 took 157.0 ms Query 12 avg time: 158.16 ms ``` Closes #9206 from Dandandan/custom_alloc Lead-authored-by: Daniël Heres Co-authored-by: Heres, Daniel Signed-off-by: Andrew Lamb --- rust/benchmarks/Cargo.toml | 7 ++++++- rust/benchmarks/README.md | 6 ++++++ rust/benchmarks/src/bin/nyctaxi.rs | 4 ++++ rust/benchmarks/src/bin/tpch.rs | 4 ++++ 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/rust/benchmarks/Cargo.toml b/rust/benchmarks/Cargo.toml index 74b74ece35a..60675efb913 100644 --- a/rust/benchmarks/Cargo.toml +++ b/rust/benchmarks/Cargo.toml @@ -26,6 +26,10 @@ repository = 
"https://github.com/apache/arrow" license = "Apache-2.0" publish = false +[features] +simd = ["datafusion/simd"] +snmalloc = ["snmalloc-rs"] + [dependencies] arrow = { path = "../arrow" } parquet = { path = "../parquet" } @@ -33,4 +37,5 @@ datafusion = { path = "../datafusion" } structopt = { version = "0.3", default-features = false } tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] } futures = "0.3" -env_logger = "^0.8" \ No newline at end of file +env_logger = "^0.8" +snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] } diff --git a/rust/benchmarks/README.md b/rust/benchmarks/README.md index 2ae035b9fc4..bb56b75a1bb 100644 --- a/rust/benchmarks/README.md +++ b/rust/benchmarks/README.md @@ -53,6 +53,12 @@ The benchmark can then be run (assuming the data created from `dbgen` is in `/mn cargo run --release --bin tpch -- benchmark --iterations 3 --path /mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096 ``` +You can enable the features `simd` (to use SIMD instructions) and/or `snmalloc` (to use the snmalloc allocator) as features by passing them in as `--features`: + +``` +cargo run --release --features "simd snmalloc" --bin tpch -- benchmark --iterations 3 --path /mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096 +``` + The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl` (generated by the `dbgen` utility) to CSV and Parquet. diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs index a3cac44afc3..3391c38c8d3 100644 --- a/rust/benchmarks/src/bin/nyctaxi.rs +++ b/rust/benchmarks/src/bin/nyctaxi.rs @@ -31,6 +31,10 @@ use datafusion::physical_plan::collect; use datafusion::physical_plan::csv::CsvReadOptions; use structopt::StructOpt; +#[cfg(feature = "snmalloc")] +#[global_allocator] +static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; + #[derive(Debug, StructOpt)] #[structopt(name = "Benchmarks", about = "Apache Arrow Rust Benchmarks.")] struct Opt { diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 539b8d23d08..835c89396c1 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -33,6 +33,10 @@ use parquet::basic::Compression; use parquet::file::properties::WriterProperties; use structopt::StructOpt; +#[cfg(feature = "snmalloc")] +#[global_allocator] +static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; + #[derive(Debug, StructOpt)] struct BenchmarkOpt { /// Query number From 18dc62ce8d34914e5098f2fedfdf5fce9b7fd179 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Tue, 19 Jan 2021 10:50:44 -0500 Subject: [PATCH 08/59] ARROW-11074: [Rust][DataFusion] Implement predicate push-down for parquet tables While profiling a DataFusion query I found that the code spends a lot of time in reading data from parquet files. Predicate / filter push-down is a commonly used performance optimization, where statistics data stored in parquet files (such as min / max values for columns in a parquet row group) is evaluated against query filters to determine which row groups could contain data requested by a query. In this way, by pushing down query filters all the way to the parquet data source, entire row groups or even parquet files can be skipped often resulting in significant performance improvements. 
I have been working on an implementation for a few weeks and initial results look promising - with predicate push-down, DataFusion is now faster than Apache Spark (`140ms for DataFusion vs 200ms for Spark`) for the same query against the same parquet files. Without predicate push-down into parquet, DataFusion takes about 2 - 3s (depending on concurrency) for the same query, because the data is ordered and most files don't contain data that satisfies the query filters, but are still loaded and processed in vain. This work is based on the following key ideas: * predicate-push down is implemented by filtering row group metadata entries to only those which could contain data that could satisfy query filters * it's best to reuse the existing code for evaluating physical expressions already implemented in DataFusion * filter expressions pushed down to a parquet table are rewritten to use parquet statistics (instead of the actual column data), for example `(column / 2) = 4` becomes `(column_min / 2) <= 4 && 4 <= (column_max / 2)` - this is done once for all files in a parquet table * for each parquet file, a RecordBatch containing all required statistics columns ( [`column_min`, `column_max`] in the example above) is produced, and the predicate expression from the previous step is evaluated, producing a binary array which is finally used to filter the row groups in each parquet file This is still work in progress - more tests left to write; I am publishing this now to gather feedback. @andygrove let me know what you think Closes #9064 from yordan-pavlov/parquet_predicate_push_down Authored-by: Yordan Pavlov Signed-off-by: Andrew Lamb --- rust/arrow/src/array/null.rs | 15 + rust/datafusion/src/datasource/parquet.rs | 56 +- rust/datafusion/src/logical_plan/operators.rs | 2 +- rust/datafusion/src/optimizer/utils.rs | 2 +- rust/datafusion/src/physical_plan/parquet.rs | 1006 ++++++++++++++++- rust/datafusion/src/physical_plan/planner.rs | 2 +- rust/datafusion/src/sql/utils.rs | 2 +- rust/parquet/src/arrow/array_reader.rs | 42 +- rust/parquet/src/file/serialized_reader.rs | 35 + 9 files changed, 1137 insertions(+), 25 deletions(-) diff --git a/rust/arrow/src/array/null.rs b/rust/arrow/src/array/null.rs index 08c7cf1f21e..ed3a0ad7d68 100644 --- a/rust/arrow/src/array/null.rs +++ b/rust/arrow/src/array/null.rs @@ -52,6 +52,12 @@ impl NullArray { let array_data = ArrayData::builder(DataType::Null).len(length).build(); NullArray::from(array_data) } + + /// Create a new null array of the specified length and type + pub fn new_with_type(length: usize, data_type: DataType) -> Self { + let array_data = ArrayData::builder(data_type).len(length).build(); + NullArray::from(array_data) + } } impl Array for NullArray { @@ -147,6 +153,15 @@ mod tests { assert_eq!(array2.offset(), 8); } + #[test] + fn test_null_array_new_with_type() { + let length = 10; + let data_type = DataType::Int8; + let array = NullArray::new_with_type(length, data_type.clone()); + assert_eq!(array.len(), length); + assert_eq!(array.data_type(), &data_type); + } + #[test] fn test_debug_null_array() { let array = NullArray::new(1024 * 1024); diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index 3e60d03eb28..8630306170c 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -30,6 +30,8 @@ use crate::logical_plan::Expr; use crate::physical_plan::parquet::ParquetExec; use crate::physical_plan::ExecutionPlan; +use 
super::datasource::TableProviderFilterPushDown; + /// Table-based representation of a `ParquetFile`. pub struct ParquetTable { path: String, @@ -41,7 +43,7 @@ pub struct ParquetTable { impl ParquetTable { /// Attempt to initialize a new `ParquetTable` from a file path. pub fn try_new(path: &str, max_concurrency: usize) -> Result { - let parquet_exec = ParquetExec::try_from_path(path, None, 0, 1)?; + let parquet_exec = ParquetExec::try_from_path(path, None, None, 0, 1)?; let schema = parquet_exec.schema(); Ok(Self { path: path.to_string(), @@ -67,17 +69,26 @@ impl TableProvider for ParquetTable { self.schema.clone() } + fn supports_filter_pushdown( + &self, + _filter: &Expr, + ) -> Result { + Ok(TableProviderFilterPushDown::Inexact) + } + /// Scan the file(s), using the provided projection, and return one BatchIterator per /// partition. fn scan( &self, projection: &Option>, batch_size: usize, - _filters: &[Expr], + filters: &[Expr], ) -> Result> { + let predicate = combine_filters(filters); Ok(Arc::new(ParquetExec::try_from_path( &self.path, projection.clone(), + predicate, batch_size, self.max_concurrency, )?)) @@ -88,6 +99,22 @@ impl TableProvider for ParquetTable { } } +/// Combines an array of filter expressions into a single filter expression +/// consisting of the input filter expressions joined with logical AND. +/// Returns None if the filters array is empty. +fn combine_filters(filters: &[Expr]) -> Option { + if filters.is_empty() { + return None; + } + let combined_filter = filters + .iter() + .skip(1) + .fold(filters[0].clone(), |acc, filter| { + crate::logical_plan::and(acc, filter.clone()) + }); + Some(combined_filter) +} + #[cfg(test)] mod tests { use super::*; @@ -333,4 +360,29 @@ mod tests { .expect("should have received at least one batch") .map_err(|e| e.into()) } + + #[test] + fn combine_zero_filters() { + let result = combine_filters(&[]); + assert_eq!(result, None); + } + + #[test] + fn combine_one_filter() { + use crate::logical_plan::{binary_expr, col, lit, Operator}; + let filter = binary_expr(col("c1"), Operator::Lt, lit(1)); + let result = combine_filters(&[filter.clone()]); + assert_eq!(result, Some(filter)); + } + + #[test] + fn combine_multiple_filters() { + use crate::logical_plan::{and, binary_expr, col, lit, Operator}; + let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1)); + let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2)); + let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3)); + let result = + combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]); + assert_eq!(result, Some(and(and(filter1, filter2), filter3))); + } } diff --git a/rust/datafusion/src/logical_plan/operators.rs b/rust/datafusion/src/logical_plan/operators.rs index b59462f73df..dac48d40b48 100644 --- a/rust/datafusion/src/logical_plan/operators.rs +++ b/rust/datafusion/src/logical_plan/operators.rs @@ -20,7 +20,7 @@ use std::{fmt, ops}; use super::{binary_expr, Expr}; /// Operators applied to expressions -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum Operator { /// Expressions are equal Eq, diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index 75661c6f723..b9e67a43d7e 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -332,7 +332,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &Vec) -> Result match expr { Expr::BinaryExpr { op, .. 
} => Ok(Expr::BinaryExpr { left: Box::new(expressions[0].clone()), - op: op.clone(), + op: *op, right: Box::new(expressions[1].clone()), }), Expr::IsNull(_) => Ok(Expr::IsNull(Box::new(expressions[0].clone()))), diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index a92a35992a4..af821a54fc7 100644 --- a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -17,20 +17,40 @@ //! Execution plan for reading Parquet files -use std::any::Any; use std::fmt; use std::fs::File; use std::sync::Arc; use std::task::{Context, Poll}; +use std::{ + any::Any, + collections::{HashMap, HashSet}, +}; -use super::{RecordBatchStream, SendableRecordBatchStream}; -use crate::error::{DataFusionError, Result}; +use super::{ + planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream, + SendableRecordBatchStream, +}; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::{common, Partitioning}; -use arrow::datatypes::{Schema, SchemaRef}; +use crate::{ + error::{DataFusionError, Result}, + execution::context::ExecutionContextState, + logical_plan::{Expr, Operator}, + optimizer::utils, + prelude::ExecutionConfig, +}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; -use parquet::file::reader::SerializedFileReader; +use arrow::{ + array::{make_array, ArrayData, ArrayRef, BooleanArray, BooleanBufferBuilder}, + buffer::MutableBuffer, + datatypes::{DataType, Field, Schema, SchemaRef}, +}; +use parquet::file::{ + metadata::RowGroupMetaData, + reader::{FileReader, SerializedFileReader}, + statistics::Statistics as ParquetStatistics, +}; use crossbeam::channel::{bounded, Receiver, RecvError, Sender}; use fmt::Debug; @@ -54,6 +74,8 @@ pub struct ParquetExec { batch_size: usize, /// Statistics for the data set (sum of statistics for all partitions) statistics: Statistics, + /// Optional predicate builder + predicate_builder: Option, } /// Represents one partition of a Parquet data set and this currently means one Parquet file. 
@@ -79,6 +101,7 @@ impl ParquetExec { pub fn try_from_path( path: &str, projection: Option>, + predicate: Option, batch_size: usize, max_concurrency: usize, ) -> Result { @@ -96,7 +119,13 @@ impl ParquetExec { .iter() .map(|filename| filename.as_str()) .collect::>(); - Self::try_from_files(&filenames, projection, batch_size, max_concurrency) + Self::try_from_files( + &filenames, + projection, + predicate, + batch_size, + max_concurrency, + ) } } @@ -105,6 +134,7 @@ impl ParquetExec { pub fn try_from_files( filenames: &[&str], projection: Option>, + predicate: Option, batch_size: usize, max_concurrency: usize, ) -> Result { @@ -156,8 +186,17 @@ impl ParquetExec { ))); } let schema = schemas[0].clone(); + let predicate_builder = predicate.and_then(|predicate_expr| { + RowGroupPredicateBuilder::try_new(&predicate_expr, schema.clone()).ok() + }); - Ok(Self::new(partitions, schema, projection, batch_size)) + Ok(Self::new( + partitions, + schema, + projection, + predicate_builder, + batch_size, + )) } /// Create a new Parquet reader execution plan with provided partitions and schema @@ -165,6 +204,7 @@ impl ParquetExec { partitions: Vec, schema: Schema, projection: Option>, + predicate_builder: Option, batch_size: usize, ) -> Self { let projection = match projection { @@ -199,6 +239,7 @@ impl ParquetExec { partitions, schema: Arc::new(projected_schema), projection, + predicate_builder, batch_size, statistics, } @@ -237,6 +278,458 @@ impl ParquetPartition { } } +#[derive(Debug, Clone)] +/// Predicate builder used for generating of predicate functions, used to filter row group metadata +pub struct RowGroupPredicateBuilder { + parquet_schema: Schema, + predicate_expr: Arc, + stat_column_req: Vec<(String, StatisticsType, Field)>, +} + +impl RowGroupPredicateBuilder { + /// Try to create a new instance of PredicateExpressionBuilder. + /// This will translate the filter expression into a statistics predicate expression + /// (for example (column / 2) = 4 becomes (column_min / 2) <= 4 && 4 <= (column_max / 2)), + /// then convert it to a DataFusion PhysicalExpression and cache it for later use by build_row_group_predicate. + pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result { + // build predicate expression once + let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new(); + let logical_predicate_expr = + build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?; + // println!( + // "RowGroupPredicateBuilder::try_new, logical_predicate_expr: {:?}", + // logical_predicate_expr + // ); + // build physical predicate expression + let stat_fields = stat_column_req + .iter() + .map(|(_, _, f)| f.clone()) + .collect::>(); + let stat_schema = Schema::new(stat_fields); + let execution_context_state = ExecutionContextState { + datasources: HashMap::new(), + scalar_functions: HashMap::new(), + var_provider: HashMap::new(), + aggregate_functions: HashMap::new(), + config: ExecutionConfig::new(), + }; + let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr( + &logical_predicate_expr, + &stat_schema, + &execution_context_state, + )?; + // println!( + // "RowGroupPredicateBuilder::try_new, predicate_expr: {:?}", + // predicate_expr + // ); + Ok(Self { + parquet_schema, + predicate_expr, + stat_column_req, + }) + } + + /// Generate a predicate function used to filter row group metadata. 
+ /// This function takes a list of all row groups as parameter, + /// so that DataFusion's physical expressions can be re-used by + /// generating a RecordBatch, containing statistics arrays, + /// on which the physical predicate expression is executed to generate a row group filter array. + /// The generated filter array is then used in the returned closure to filter row groups. + pub fn build_row_group_predicate( + &self, + row_group_metadata: &[RowGroupMetaData], + ) -> Box bool> { + // build statistics record batch + let predicate_result = build_statistics_record_batch( + row_group_metadata, + &self.parquet_schema, + &self.stat_column_req, + ) + .and_then(|statistics_batch| { + // execute predicate expression + self.predicate_expr.evaluate(&statistics_batch) + }) + .and_then(|v| match v { + ColumnarValue::Array(array) => Ok(array), + ColumnarValue::Scalar(_) => Err(DataFusionError::Plan( + "predicate expression didn't return an array".to_string(), + )), + }); + + let predicate_array = match predicate_result { + Ok(array) => array, + // row group filter array could not be built + // return a closure which will not filter out any row groups + _ => return Box::new(|_r, _i| true), + }; + + let predicate_array = predicate_array.as_any().downcast_ref::(); + match predicate_array { + // return row group predicate function + Some(array) => { + // when the result of the predicate expression for a row group is null / undefined, + // e.g. due to missing statistics, this row group can't be filtered out, + // so replace with true + let predicate_values = + array.iter().map(|x| x.unwrap_or(true)).collect::>(); + Box::new(move |_, i| predicate_values[i]) + } + // predicate result is not a BooleanArray + // return a closure which will not filter out any row groups + _ => Box::new(|_r, _i| true), + } + } +} + +/// Build a RecordBatch from a list of RowGroupMetadata structs, +/// creating arrays, one for each statistics column, +/// as requested in the stat_column_req parameter. 
+fn build_statistics_record_batch( + row_groups: &[RowGroupMetaData], + parquet_schema: &Schema, + stat_column_req: &Vec<(String, StatisticsType, Field)>, +) -> Result { + let mut fields = Vec::::new(); + let mut arrays = Vec::::new(); + for (column_name, statistics_type, stat_field) in stat_column_req { + if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) { + let statistics = row_groups + .iter() + .map(|g| g.column(column_index).statistics()) + .collect::>(); + let array = build_statistics_array( + &statistics, + *statistics_type, + stat_field.data_type(), + ); + fields.push(stat_field.clone()); + arrays.push(array); + } + } + let schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(schema, arrays) + .map_err(|err| DataFusionError::Plan(err.to_string())) +} + +struct StatisticsExpressionBuilder<'a> { + column_name: String, + column_expr: &'a Expr, + scalar_expr: &'a Expr, + parquet_field: &'a Field, + stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>, + reverse_operator: bool, +} + +impl<'a> StatisticsExpressionBuilder<'a> { + fn try_new( + left: &'a Expr, + right: &'a Expr, + parquet_schema: &'a Schema, + stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>, + ) -> Result { + // find column name; input could be a more complicated expression + let mut left_columns = HashSet::::new(); + utils::expr_to_column_names(left, &mut left_columns)?; + let mut right_columns = HashSet::::new(); + utils::expr_to_column_names(right, &mut right_columns)?; + let (column_expr, scalar_expr, column_names, reverse_operator) = + match (left_columns.len(), right_columns.len()) { + (1, 0) => (left, right, left_columns, false), + (0, 1) => (right, left, right_columns, true), + _ => { + // if more than one column used in expression - not supported + return Err(DataFusionError::Plan( + "Multi-column expressions are not currently supported" + .to_string(), + )); + } + }; + let column_name = column_names.iter().next().unwrap().clone(); + let field = match parquet_schema.column_with_name(&column_name) { + Some((_, f)) => f, + _ => { + // field not found in parquet schema + return Err(DataFusionError::Plan( + "Field not found in parquet schema".to_string(), + )); + } + }; + + Ok(Self { + column_name, + column_expr, + scalar_expr, + parquet_field: field, + stat_column_req, + reverse_operator, + }) + } + + fn correct_operator(&self, op: Operator) -> Operator { + if !self.reverse_operator { + return op; + } + + match op { + Operator::Lt => Operator::Gt, + Operator::Gt => Operator::Lt, + Operator::LtEq => Operator::GtEq, + Operator::GtEq => Operator::LtEq, + _ => op, + } + } + + // fn column_expr(&self) -> &Expr { + // self.column_expr + // } + + fn scalar_expr(&self) -> &Expr { + self.scalar_expr + } + + // fn column_name(&self) -> &String { + // &self.column_name + // } + + fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool { + self.stat_column_req + .iter() + .filter(|(c, t, _f)| c == &self.column_name && t == &statistics_type) + .count() + == 0 + } + + fn stat_column_expr( + &mut self, + stat_type: StatisticsType, + suffix: &str, + ) -> Result { + let stat_column_name = format!("{}_{}", self.column_name, suffix); + let stat_field = Field::new( + stat_column_name.as_str(), + self.parquet_field.data_type().clone(), + self.parquet_field.is_nullable(), + ); + if self.is_stat_column_missing(stat_type) { + // only add statistics column if not previously added + self.stat_column_req + .push((self.column_name.clone(), stat_type, stat_field)); + 
} + rewrite_column_expr( + self.column_expr, + self.column_name.as_str(), + stat_column_name.as_str(), + ) + } + + fn min_column_expr(&mut self) -> Result { + self.stat_column_expr(StatisticsType::Min, "min") + } + + fn max_column_expr(&mut self) -> Result { + self.stat_column_expr(StatisticsType::Max, "max") + } +} + +/// replaces a column with an old name with a new name in an expression +fn rewrite_column_expr( + expr: &Expr, + column_old_name: &str, + column_new_name: &str, +) -> Result { + let expressions = utils::expr_sub_expressions(&expr)?; + let expressions = expressions + .iter() + .map(|e| rewrite_column_expr(e, column_old_name, column_new_name)) + .collect::>>()?; + + if let Expr::Column(name) = expr { + if name == column_old_name { + return Ok(Expr::Column(column_new_name.to_string())); + } + } + utils::rewrite_expression(&expr, &expressions) +} + +/// Translate logical filter expression into parquet statistics predicate expression +fn build_predicate_expression( + expr: &Expr, + parquet_schema: &Schema, + stat_column_req: &mut Vec<(String, StatisticsType, Field)>, +) -> Result { + use crate::logical_plan; + // predicate expression can only be a binary expression + let (left, op, right) = match expr { + Expr::BinaryExpr { left, op, right } => (left, *op, right), + _ => { + // unsupported expression - replace with TRUE + // this can still be useful when multiple conditions are joined using AND + // such as: column > 10 AND TRUE + return Ok(logical_plan::lit(true)); + } + }; + + if op == Operator::And || op == Operator::Or { + let left_expr = + build_predicate_expression(left, parquet_schema, stat_column_req)?; + let right_expr = + build_predicate_expression(right, parquet_schema, stat_column_req)?; + return Ok(logical_plan::binary_expr(left_expr, op, right_expr)); + } + + let expr_builder = StatisticsExpressionBuilder::try_new( + left, + right, + parquet_schema, + stat_column_req, + ); + let mut expr_builder = match expr_builder { + Ok(builder) => builder, + // allow partial failure in predicate expression generation + // this can still produce a useful predicate when multiple conditions are joined using AND + Err(_) => { + return Ok(logical_plan::lit(true)); + } + }; + let corrected_op = expr_builder.correct_operator(op); + let statistics_expr = match corrected_op { + Operator::Eq => { + // column = literal => (min, max) = literal => min <= literal && literal <= max + // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2) + let min_column_expr = expr_builder.min_column_expr()?; + let max_column_expr = expr_builder.max_column_expr()?; + min_column_expr + .lt_eq(expr_builder.scalar_expr().clone()) + .and(expr_builder.scalar_expr().lt_eq(max_column_expr)) + } + Operator::Gt => { + // column > literal => (min, max) > literal => max > literal + expr_builder + .max_column_expr()? + .gt(expr_builder.scalar_expr().clone()) + } + Operator::GtEq => { + // column >= literal => (min, max) >= literal => max >= literal + expr_builder + .max_column_expr()? + .gt_eq(expr_builder.scalar_expr().clone()) + } + Operator::Lt => { + // column < literal => (min, max) < literal => min < literal + expr_builder + .min_column_expr()? + .lt(expr_builder.scalar_expr().clone()) + } + Operator::LtEq => { + // column <= literal => (min, max) <= literal => min <= literal + expr_builder + .min_column_expr()? 
+ .lt_eq(expr_builder.scalar_expr().clone()) + } + // other expressions are not supported + _ => logical_plan::lit(true), + }; + Ok(statistics_expr) +} + +#[derive(Debug, Copy, Clone, PartialEq)] +enum StatisticsType { + Min, + Max, +} + +fn build_null_array(data_type: &DataType, length: usize) -> ArrayRef { + Arc::new(arrow::array::NullArray::new_with_type( + length, + data_type.clone(), + )) +} + +fn build_statistics_array( + statistics: &[Option<&ParquetStatistics>], + statistics_type: StatisticsType, + data_type: &DataType, +) -> ArrayRef { + let statistics_count = statistics.len(); + let first_group_stats = statistics.iter().find(|s| s.is_some()); + let first_group_stats = if let Some(Some(statistics)) = first_group_stats { + // found first row group with statistics defined + statistics + } else { + // no row group has statistics defined + return build_null_array(data_type, statistics_count); + }; + + let (data_size, arrow_type) = match first_group_stats { + ParquetStatistics::Int32(_) => (std::mem::size_of::(), DataType::Int32), + ParquetStatistics::Int64(_) => (std::mem::size_of::(), DataType::Int64), + ParquetStatistics::Float(_) => (std::mem::size_of::(), DataType::Float32), + ParquetStatistics::Double(_) => (std::mem::size_of::(), DataType::Float64), + ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => { + (0, DataType::Utf8) + } + _ => { + // type of statistics not supported + return build_null_array(data_type, statistics_count); + } + }; + + let statistics = statistics.iter().map(|s| { + s.filter(|s| s.has_min_max_set()) + .map(|s| match statistics_type { + StatisticsType::Min => s.min_bytes(), + StatisticsType::Max => s.max_bytes(), + }) + }); + + if arrow_type == DataType::Utf8 { + let data_size = statistics + .clone() + .map(|x| x.map(|b| b.len()).unwrap_or(0)) + .sum(); + let mut builder = + arrow::array::StringBuilder::with_capacity(statistics_count, data_size); + let string_statistics = + statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok())); + for maybe_string in string_statistics { + match maybe_string { + Some(string_value) => builder.append_value(string_value).unwrap(), + None => builder.append_null().unwrap(), + }; + } + return Arc::new(builder.finish()); + } + + let mut data_buffer = MutableBuffer::new(statistics_count * data_size); + let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count); + let mut null_count = 0; + for s in statistics { + if let Some(stat_data) = s { + bitmap_builder.append(true); + data_buffer.extend_from_slice(stat_data); + } else { + bitmap_builder.append(false); + data_buffer.resize(data_buffer.len() + data_size); + null_count += 1; + } + } + + let mut builder = ArrayData::builder(arrow_type) + .len(statistics_count) + .add_buffer(data_buffer.into()); + if null_count > 0 { + builder = builder.null_bit_buffer(bitmap_builder.finish()); + } + let array_data = builder.build(); + let statistics_array = make_array(array_data); + if statistics_array.data_type() == data_type { + return statistics_array; + } + // cast statistics array to required data type + arrow::compute::cast(&statistics_array, data_type) + .unwrap_or_else(|_| build_null_array(data_type, statistics_count)) +} + #[async_trait] impl ExecutionPlan for ParquetExec { /// Return a reference to Any that can be used for downcasting @@ -282,10 +775,17 @@ impl ExecutionPlan for ParquetExec { let filenames = self.partitions[partition].filenames.clone(); let projection = self.projection.clone(); + let predicate_builder = 
self.predicate_builder.clone(); let batch_size = self.batch_size; task::spawn_blocking(move || { - if let Err(e) = read_files(&filenames, &projection, batch_size, response_tx) { + if let Err(e) = read_files( + &filenames, + &projection, + &predicate_builder, + batch_size, + response_tx, + ) { println!("Parquet reader thread terminated due to error: {:?}", e); } }); @@ -310,13 +810,19 @@ fn send_result( fn read_files( filenames: &[String], projection: &[usize], + predicate_builder: &Option, batch_size: usize, response_tx: Sender>>, ) -> Result<()> { for filename in filenames { let file = File::open(&filename)?; - let file_reader = Arc::new(SerializedFileReader::new(file)?); - let mut arrow_reader = ParquetFileArrowReader::new(file_reader); + let mut file_reader = SerializedFileReader::new(file)?; + if let Some(predicate_builder) = predicate_builder { + let row_group_predicate = predicate_builder + .build_row_group_predicate(file_reader.metadata().row_groups()); + file_reader.filter_row_groups(&row_group_predicate); + } + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); let mut batch_reader = arrow_reader .get_record_reader_by_columns(projection.to_owned(), batch_size)?; loop { @@ -389,7 +895,10 @@ impl RecordBatchStream for ParquetStream { #[cfg(test)] mod tests { use super::*; + use arrow::array::{Int32Array, StringArray}; use futures::StreamExt; + use parquet::basic::Type as PhysicalType; + use parquet::schema::types::SchemaDescPtr; #[test] fn test_split_files() { @@ -432,7 +941,7 @@ mod tests { let testdata = arrow::util::test_util::parquet_test_data(); let filename = format!("{}/alltypes_plain.parquet", testdata); let parquet_exec = - ParquetExec::try_from_path(&filename, Some(vec![0, 1, 2]), 1024, 4)?; + ParquetExec::try_from_path(&filename, Some(vec![0, 1, 2]), None, 1024, 4)?; assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); let mut results = parquet_exec.execute(0).await?; @@ -457,4 +966,479 @@ mod tests { Ok(()) } + + #[test] + fn build_statistics_array_int32() { + // build row group metadata array + let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false); + let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false); + let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false); + let statistics = vec![Some(&s1), Some(&s2), Some(&s3)]; + + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32); + let int32_array = statistics_array + .as_any() + .downcast_ref::() + .unwrap(); + let int32_vec = int32_array.into_iter().collect::>(); + assert_eq!(int32_vec, vec![None, Some(2), Some(3)]); + + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32); + let int32_array = statistics_array + .as_any() + .downcast_ref::() + .unwrap(); + let int32_vec = int32_array.into_iter().collect::>(); + // here the first max value is None and not the Some(10) value which was actually set + // because the min value is None + assert_eq!(int32_vec, vec![None, Some(20), Some(30)]); + } + + #[test] + fn build_statistics_array_utf8() { + // build row group metadata array + let s1 = ParquetStatistics::byte_array(None, Some("10".into()), None, 0, false); + let s2 = ParquetStatistics::byte_array( + Some("2".into()), + Some("20".into()), + None, + 0, + false, + ); + let s3 = ParquetStatistics::byte_array( + Some("3".into()), + Some("30".into()), + None, + 0, + false, + ); + let statistics = vec![Some(&s1), Some(&s2), Some(&s3)]; + + let statistics_array 
= + build_statistics_array(&statistics, StatisticsType::Min, &DataType::Utf8); + let string_array = statistics_array + .as_any() + .downcast_ref::() + .unwrap(); + let string_vec = string_array.into_iter().collect::>(); + assert_eq!(string_vec, vec![None, Some("2"), Some("3")]); + + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Max, &DataType::Utf8); + let string_array = statistics_array + .as_any() + .downcast_ref::() + .unwrap(); + let string_vec = string_array.into_iter().collect::>(); + // here the first max value is None and not the Some("10") value which was actually set + // because the min value is None + assert_eq!(string_vec, vec![None, Some("20"), Some("30")]); + } + + #[test] + fn build_statistics_array_empty_stats() { + let data_type = DataType::Int32; + let statistics = vec![]; + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Min, &data_type); + assert_eq!(statistics_array.len(), 0); + + let statistics = vec![None, None]; + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Min, &data_type); + assert_eq!(statistics_array.len(), statistics.len()); + assert_eq!(statistics_array.data_type(), &data_type); + for i in 0..statistics_array.len() { + assert_eq!(statistics_array.is_null(i), true); + assert_eq!(statistics_array.is_valid(i), false); + } + } + + #[test] + fn build_statistics_array_unsupported_type() { + // boolean is not currently a supported type for statistics + let s1 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false); + let s2 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false); + let statistics = vec![Some(&s1), Some(&s2)]; + let data_type = DataType::Boolean; + let statistics_array = + build_statistics_array(&statistics, StatisticsType::Min, &data_type); + assert_eq!(statistics_array.len(), statistics.len()); + assert_eq!(statistics_array.data_type(), &data_type); + for i in 0..statistics_array.len() { + assert_eq!(statistics_array.is_null(i), true); + assert_eq!(statistics_array.is_valid(i), false); + } + } + + #[test] + fn row_group_predicate_eq() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "#c1_min LtEq Int32(1) And Int32(1) LtEq #c1_max"; + + // test column on the left + let expr = col("c1").eq(lit(1)); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + // test column on the right + let expr = lit(1).eq(col("c1")); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_gt() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "#c1_max Gt Int32(1)"; + + // test column on the left + let expr = col("c1").gt(lit(1)); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + // test column on the right + let expr = lit(1).lt(col("c1")); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_gt_eq() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = 
Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "#c1_max GtEq Int32(1)"; + + // test column on the left + let expr = col("c1").gt_eq(lit(1)); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + // test column on the right + let expr = lit(1).lt_eq(col("c1")); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_lt() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "#c1_min Lt Int32(1)"; + + // test column on the left + let expr = col("c1").lt(lit(1)); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + // test column on the right + let expr = lit(1).gt(col("c1")); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_lt_eq() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "#c1_min LtEq Int32(1)"; + + // test column on the left + let expr = col("c1").lt_eq(lit(1)); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + // test column on the right + let expr = lit(1).gt_eq(col("c1")); + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_and() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Int32, false), + Field::new("c3", DataType::Int32, false), + ]); + // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression + let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3"))); + let expected_expr = "#c1_min Lt Int32(1) And Boolean(true)"; + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_or() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Int32, false), + ]); + // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 expression + let expr = col("c1").lt(lit(1)).or(col("c2").modulus(lit(2))); + let expected_expr = "#c1_min Lt Int32(1) Or Boolean(true)"; + let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_stat_column_req() -> Result<()> { + use crate::logical_plan::{col, lit}; + let schema = Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Int32, false), + ]); + let mut stat_column_req = vec![]; + // c1 < 1 and (c2 = 2 or c2 = 3) + let expr = col("c1") + .lt(lit(1)) + .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3)))); + let expected_expr = "#c1_min Lt Int32(1) And 
#c2_min LtEq Int32(2) And Int32(2) LtEq #c2_max Or #c2_min LtEq Int32(3) And Int32(3) LtEq #c2_max"; + let predicate_expr = + build_predicate_expression(&expr, &schema, &mut stat_column_req)?; + assert_eq!(format!("{:?}", predicate_expr), expected_expr); + // c1 < 1 should add c1_min + let c1_min_field = Field::new("c1_min", DataType::Int32, false); + assert_eq!( + stat_column_req[0], + ("c1".to_owned(), StatisticsType::Min, c1_min_field) + ); + // c2 = 2 should add c2_min and c2_max + let c2_min_field = Field::new("c2_min", DataType::Int32, false); + assert_eq!( + stat_column_req[1], + ("c2".to_owned(), StatisticsType::Min, c2_min_field) + ); + let c2_max_field = Field::new("c2_max", DataType::Int32, false); + assert_eq!( + stat_column_req[2], + ("c2".to_owned(), StatisticsType::Max, c2_max_field) + ); + // c2 = 3 shouldn't add any new statistics fields + assert_eq!(stat_column_req.len(), 3); + + Ok(()) + } + + #[test] + fn row_group_predicate_builder_simple_expr() -> Result<()> { + use crate::logical_plan::{col, lit}; + // int > 1 => c1_max > 1 + let expr = col("c1").gt(lit(15)); + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; + + let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); + let rgm1 = get_row_group_meta_data( + &schema_descr, + vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)], + ); + let rgm2 = get_row_group_meta_data( + &schema_descr, + vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], + ); + let row_group_metadata = vec![rgm1, rgm2]; + let row_group_predicate = + predicate_builder.build_row_group_predicate(&row_group_metadata); + let row_group_filter = row_group_metadata + .iter() + .enumerate() + .map(|(i, g)| row_group_predicate(g, i)) + .collect::>(); + assert_eq!(row_group_filter, vec![false, true]); + + Ok(()) + } + + #[test] + fn row_group_predicate_builder_missing_stats() -> Result<()> { + use crate::logical_plan::{col, lit}; + // int > 1 => c1_max > 1 + let expr = col("c1").gt(lit(15)); + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; + + let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); + let rgm1 = get_row_group_meta_data( + &schema_descr, + vec![ParquetStatistics::int32(None, None, None, 0, false)], + ); + let rgm2 = get_row_group_meta_data( + &schema_descr, + vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], + ); + let row_group_metadata = vec![rgm1, rgm2]; + let row_group_predicate = + predicate_builder.build_row_group_predicate(&row_group_metadata); + let row_group_filter = row_group_metadata + .iter() + .enumerate() + .map(|(i, g)| row_group_predicate(g, i)) + .collect::>(); + // missing statistics for first row group mean that the result from the predicate expression + // is null / undefined so the first row group can't be filtered out + assert_eq!(row_group_filter, vec![true, true]); + + Ok(()) + } + + #[test] + fn row_group_predicate_builder_partial_expr() -> Result<()> { + use crate::logical_plan::{col, lit}; + // test row group predicate with partially supported expression + // int > 1 and int % 2 => c1_max > 1 and true + let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2))); + let schema = Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Int32, false), + ]); + let predicate_builder = 
RowGroupPredicateBuilder::try_new(&expr, schema.clone())?; + + let schema_descr = get_test_schema_descr(vec![ + ("c1", PhysicalType::INT32), + ("c2", PhysicalType::INT32), + ]); + let rgm1 = get_row_group_meta_data( + &schema_descr, + vec![ + ParquetStatistics::int32(Some(1), Some(10), None, 0, false), + ParquetStatistics::int32(Some(1), Some(10), None, 0, false), + ], + ); + let rgm2 = get_row_group_meta_data( + &schema_descr, + vec![ + ParquetStatistics::int32(Some(11), Some(20), None, 0, false), + ParquetStatistics::int32(Some(11), Some(20), None, 0, false), + ], + ); + let row_group_metadata = vec![rgm1, rgm2]; + let row_group_predicate = + predicate_builder.build_row_group_predicate(&row_group_metadata); + let row_group_filter = row_group_metadata + .iter() + .enumerate() + .map(|(i, g)| row_group_predicate(g, i)) + .collect::>(); + // the first row group is still filtered out because the predicate expression can be partially evaluated + // when conditions are joined using AND + assert_eq!(row_group_filter, vec![false, true]); + + // if conditions in predicate are joined with OR and an unsupported expression is used + // this bypasses the entire predicate expression and no row groups are filtered out + let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); + let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; + let row_group_predicate = + predicate_builder.build_row_group_predicate(&row_group_metadata); + let row_group_filter = row_group_metadata + .iter() + .enumerate() + .map(|(i, g)| row_group_predicate(g, i)) + .collect::>(); + assert_eq!(row_group_filter, vec![true, true]); + + Ok(()) + } + + #[test] + fn row_group_predicate_builder_unsupported_type() -> Result<()> { + use crate::logical_plan::{col, lit}; + // test row group predicate with unsupported statistics type (boolean) + // where a null array is generated for some statistics columns + // int > 1 and bool = true => c1_max > 1 and null + let expr = col("c1").gt(lit(15)).and(col("c2").eq(lit(true))); + let schema = Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Boolean, false), + ]); + let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; + + let schema_descr = get_test_schema_descr(vec![ + ("c1", PhysicalType::INT32), + ("c2", PhysicalType::BOOLEAN), + ]); + let rgm1 = get_row_group_meta_data( + &schema_descr, + vec![ + ParquetStatistics::int32(Some(1), Some(10), None, 0, false), + ParquetStatistics::boolean(Some(false), Some(true), None, 0, false), + ], + ); + let rgm2 = get_row_group_meta_data( + &schema_descr, + vec![ + ParquetStatistics::int32(Some(11), Some(20), None, 0, false), + ParquetStatistics::boolean(Some(false), Some(true), None, 0, false), + ], + ); + let row_group_metadata = vec![rgm1, rgm2]; + let row_group_predicate = + predicate_builder.build_row_group_predicate(&row_group_metadata); + let row_group_filter = row_group_metadata + .iter() + .enumerate() + .map(|(i, g)| row_group_predicate(g, i)) + .collect::>(); + // no row group is filtered out because the predicate expression can't be evaluated + // when a null array is generated for a statistics column, + // because the null values propagate to the end result, making the predicate result undefined + assert_eq!(row_group_filter, vec![true, true]); + + Ok(()) + } + + fn get_row_group_meta_data( + schema_descr: &SchemaDescPtr, + column_statistics: Vec, + ) -> RowGroupMetaData { + use parquet::file::metadata::ColumnChunkMetaData; + let mut columns = vec![]; + for 
(i, s) in column_statistics.iter().enumerate() { + let column = ColumnChunkMetaData::builder(schema_descr.column(i)) + .set_statistics(s.clone()) + .build() + .unwrap(); + columns.push(column); + } + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(1000) + .set_total_byte_size(2000) + .set_column_metadata(columns) + .build() + .unwrap() + } + + fn get_test_schema_descr(fields: Vec<(&str, PhysicalType)>) -> SchemaDescPtr { + use parquet::schema::types::{SchemaDescriptor, Type as SchemaType}; + let mut schema_fields = fields + .iter() + .map(|(n, t)| { + Arc::new(SchemaType::primitive_type_builder(n, *t).build().unwrap()) + }) + .collect::>(); + let schema = SchemaType::group_type_builder("schema") + .with_fields(&mut schema_fields) + .build() + .unwrap(); + + Arc::new(SchemaDescriptor::new(Arc::new(schema))) + } } diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 6af2c485b72..6ba0d2606a2 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -475,7 +475,7 @@ impl DefaultPhysicalPlanner { Expr::BinaryExpr { left, op, right } => { let lhs = self.create_physical_expr(left, input_schema, ctx_state)?; let rhs = self.create_physical_expr(right, input_schema, ctx_state)?; - binary(lhs, op.clone(), rhs, input_schema) + binary(lhs, *op, rhs, input_schema) } Expr::Case { expr, diff --git a/rust/datafusion/src/sql/utils.rs b/rust/datafusion/src/sql/utils.rs index ce8b4d1e01f..976e2c574d9 100644 --- a/rust/datafusion/src/sql/utils.rs +++ b/rust/datafusion/src/sql/utils.rs @@ -305,7 +305,7 @@ where }), Expr::BinaryExpr { left, right, op } => Ok(Expr::BinaryExpr { left: Box::new(clone_with_replacement(&**left, replacement_fn)?), - op: op.clone(), + op: *op, right: Box::new(clone_with_replacement(&**right, replacement_fn)?), }), Expr::Case { diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index f456e655a59..71b84cd981b 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -435,16 +435,11 @@ where } fn next_batch(&mut self, batch_size: usize) -> Result { - // Try to initialized column reader + // Try to initialize column reader if self.column_reader.is_none() { - let init_result = self.next_column_reader()?; - if !init_result { - return Err(general_err!("No page left!")); - } + self.next_column_reader()?; } - assert!(self.column_reader.is_some()); - let mut data_buffer: Vec = Vec::with_capacity(batch_size); data_buffer.resize_with(batch_size, T::T::default); @@ -466,7 +461,7 @@ where let mut num_read = 0; - while num_read < batch_size { + while self.column_reader.is_some() && num_read < batch_size { let num_to_read = batch_size - num_read; let cur_data_buf = &mut data_buffer[num_read..]; let cur_def_levels_buf = @@ -2127,6 +2122,37 @@ mod tests { } } + #[test] + fn test_complex_array_reader_no_pages() { + let message_type = " + message test_schema { + REPEATED Group test_mid { + OPTIONAL BYTE_ARRAY leaf (UTF8); + } + } + "; + let schema = parse_message_type(message_type) + .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) + .unwrap(); + let column_desc = schema.column(0); + let pages: Vec> = Vec::new(); + let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages); + + let converter = Utf8Converter::new(Utf8ArrayConverter {}); + let mut array_reader = + ComplexObjectArrayReader::::new( + Box::new(page_iterator), + column_desc, + converter, + None, + ) + 
.unwrap(); + + let values_per_page = 100; // this value is arbitrary in this test - the result should always be an array of 0 length + let array = array_reader.next_batch(values_per_page).unwrap(); + assert_eq!(array.len(), 0); + } + #[test] fn test_complex_array_reader_def_and_rep_levels() { // Construct column schema diff --git a/rust/parquet/src/file/serialized_reader.rs b/rust/parquet/src/file/serialized_reader.rs index 663412d4087..9fbff419186 100644 --- a/rust/parquet/src/file/serialized_reader.rs +++ b/rust/parquet/src/file/serialized_reader.rs @@ -137,6 +137,24 @@ impl SerializedFileReader { metadata, }) } + + /// Filters row group metadata to only those row groups, + /// for which the predicate function returns true + pub fn filter_row_groups( + &mut self, + predicate: &dyn Fn(&RowGroupMetaData, usize) -> bool, + ) { + let mut filtered_row_groups = Vec::::new(); + for (i, row_group_metadata) in self.metadata.row_groups().iter().enumerate() { + if predicate(row_group_metadata, i) { + filtered_row_groups.push(row_group_metadata.clone()); + } + } + self.metadata = ParquetMetaData::new( + self.metadata.file_metadata().clone(), + filtered_row_groups, + ); + } } impl FileReader for SerializedFileReader { @@ -737,4 +755,21 @@ mod tests { Some("foo.baz.Foobaz$Event".to_owned()) ); } + + #[test] + fn test_file_reader_filter_row_groups() -> Result<()> { + let test_file = get_test_file("alltypes_plain.parquet"); + let mut reader = SerializedFileReader::new(test_file)?; + + // test initial number of row groups + let metadata = reader.metadata(); + assert_eq!(metadata.num_row_groups(), 1); + + // test filtering out all row groups + reader.filter_row_groups(&|_, _| false); + let metadata = reader.metadata(); + assert_eq!(metadata.num_row_groups(), 0); + + Ok(()) + } } From 127961af4819c12e51bcedf7422207f157fd7cff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Mu=CC=88ller?= Date: Tue, 19 Jan 2021 18:31:23 +0100 Subject: [PATCH 09/59] ARROW-10489: [C++] Add Intel C++ compiler options for different warning levels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See [Intel Compiler warning flag documentation](https://software.intel.com/content/www/us/en/develop/documentation/cpp-compiler-developer-guide-and-reference/top/compiler-reference/error-handling-1/warnings-errors-and-remarks.html). 
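To exercise these new branches, the Arrow C++ build has to be configured with the Intel compiler and one of the warning levels handled in SetupCxxFlags.cmake. A minimal configure invocation might look like the following (a sketch only: it assumes the classic Intel drivers `icc`/`icpc` on Linux and that the `BUILD_WARNING_LEVEL` cache variable used in this file is set from the command line; adjust compiler names and paths for your environment):

    cmake -DCMAKE_C_COMPILER=icc -DCMAKE_CXX_COMPILER=icpc -DBUILD_WARNING_LEVEL=CHECKIN <path-to-arrow>/cpp

On Windows the same logic selects the MSVC-style `/Wall` spelling instead of `-Wall`.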
Closes #9266 from jcmuel/master Authored-by: Johannes Müller Signed-off-by: Antoine Pitrou --- cpp/cmake_modules/SetupCxxFlags.cmake | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake index 03d6a4ea34e..71443332d29 100644 --- a/cpp/cmake_modules/SetupCxxFlags.cmake +++ b/cpp/cmake_modules/SetupCxxFlags.cmake @@ -267,6 +267,16 @@ if("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-sign-conversion") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unused-variable") + elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Intel") + if(WIN32) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /Wall") + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /Wno-deprecated") + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /Wno-unused-variable") + else() + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall") + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated") + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unused-variable") + endif() else() message(FATAL_ERROR "${UNKNOWN_COMPILER_MESSAGE}") endif() @@ -289,6 +299,12 @@ elseif("${BUILD_WARNING_LEVEL}" STREQUAL "EVERYTHING") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wpedantic") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wextra") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unused-parameter") + elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Intel") + if(WIN32) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /Wall") + else() + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall") + endif() else() message(FATAL_ERROR "${UNKNOWN_COMPILER_MESSAGE}") endif() @@ -304,9 +320,14 @@ else() set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /W3") elseif(CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang" OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang" - OR CMAKE_CXX_COMPILER_ID STREQUAL "GNU" - OR CMAKE_CXX_COMPILER_ID STREQUAL "Intel") + OR CMAKE_CXX_COMPILER_ID STREQUAL "GNU") set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall") + elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Intel") + if(WIN32) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /Wall") + else() + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall") + endif() else() message(FATAL_ERROR "${UNKNOWN_COMPILER_MESSAGE}") endif() From 0e5d6463ae8165158ddc894b9266639783c114c5 Mon Sep 17 00:00:00 2001 From: "Maarten A. Breddels" Date: Tue, 19 Jan 2021 18:34:06 +0100 Subject: [PATCH 10/59] ARROW-9128: [C++] Implement string space trimming kernels: trim, ltrim, and rtrim There is one obvious loose end in this PR, which is where to generate the `std::set` based on the `TrimOptions` (now in the ctor of UTF8TrimBase). I'm not sure what the lifetime guarantees are for this object (TrimOptions), where it makes sense to initialize this set, and when an (utf8 decoding) error occurs, how/where to report this. Although this is not a costly operation, assuming people don't pass in a billion characters to trim, I do wonder what the best approach here is in general. It does not make much sense to create the `std::set` at each `Exec` call, but that is what happens now. (This also seems to happen in `TransformMatchSubstring` for creating the `prefix_table` btw.) Maybe a good place to put per-kernel pre-compute results are the `*Options` objects, but I'm not sure if that makes sense in the current architecture. Another idea is to explore alternatives to the `std::set`. 
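For reference, the flat lookup-table approach that the kernels below end up using instead of a `std::set` can be sketched on its own, outside the kernel machinery (a minimal standalone sketch restricted to ASCII trim characters; the function name and the use of `std::string` are illustrative only and not part of this patch):

    #include <cstddef>
    #include <string>
    #include <vector>

    // Build a 256-entry membership table once per TrimOptions instead of once per
    // value, then trim both ends of a value with plain index lookups.
    std::string TrimAsciiSketch(const std::string& input, const std::string& characters) {
      std::vector<bool> table(256, false);
      for (unsigned char c : characters) table[c] = true;
      std::size_t begin = 0, end = input.size();
      while (begin < end && table[static_cast<unsigned char>(input[begin])]) ++begin;
      while (end > begin && table[static_cast<unsigned char>(input[end - 1])]) --end;
      return input.substr(begin, end - begin);
    }

The ASCII and UTF-8 kernels in the diff below apply the same idea: `AsciiTrimBase` keeps a 256-entry `std::vector<bool>`, and `TrimStateUTF8` builds a `std::vector<bool>` indexed by codepoint.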
It seem that (based on the TrimManyAscii benchmark), `std::unordered_set` seemed a bit slower, and simply using a linear search: `std::find(options.characters.begin(), options.characters.end(), c) != options.characters.end()` in the predicate instead of the set doesn't seem to affect performance that much. In CPython, a bloom filter is used, I could explore to see if that makes sense, but the implementation in Arrow lives under the parquet namespace. Closes #8621 from maartenbreddels/ARROW-9128 Authored-by: Maarten A. Breddels Signed-off-by: Antoine Pitrou --- cpp/src/arrow/compute/api_scalar.h | 7 + .../arrow/compute/kernels/codegen_internal.h | 29 ++ .../arrow/compute/kernels/scalar_string.cc | 431 ++++++++++++++++-- .../kernels/scalar_string_benchmark.cc | 23 + .../compute/kernels/scalar_string_test.cc | 60 +++ cpp/src/arrow/util/utf8.h | 67 +++ cpp/src/arrow/util/utf8_util_test.cc | 58 +++ docs/source/cpp/compute.rst | 47 ++ python/pyarrow/_compute.pyx | 17 + python/pyarrow/compute.py | 1 + python/pyarrow/includes/libarrow.pxd | 5 + python/pyarrow/tests/test_compute.py | 18 + 12 files changed, 731 insertions(+), 32 deletions(-) diff --git a/cpp/src/arrow/compute/api_scalar.h b/cpp/src/arrow/compute/api_scalar.h index da219b88793..37f3077e4bd 100644 --- a/cpp/src/arrow/compute/api_scalar.h +++ b/cpp/src/arrow/compute/api_scalar.h @@ -92,6 +92,13 @@ struct ARROW_EXPORT StrptimeOptions : public FunctionOptions { TimeUnit::type unit; }; +struct ARROW_EXPORT TrimOptions : public FunctionOptions { + explicit TrimOptions(std::string characters) : characters(std::move(characters)) {} + + /// The individual characters that can be trimmed from the string. + std::string characters; +}; + enum CompareOperator : int8_t { EQUAL, NOT_EQUAL, diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h index f59f39234b7..c3a6b4b9772 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.h +++ b/cpp/src/arrow/compute/kernels/codegen_internal.h @@ -126,6 +126,35 @@ struct OptionsWrapper : public KernelState { OptionsType options; }; +/// KernelState adapter for when the state is an instance constructed with the +/// KernelContext and the FunctionOptions as argument +template +struct KernelStateFromFunctionOptions : public KernelState { + explicit KernelStateFromFunctionOptions(KernelContext* ctx, OptionsType state) + : state(StateType(ctx, std::move(state))) {} + + static std::unique_ptr Init(KernelContext* ctx, + const KernelInitArgs& args) { + if (auto options = static_cast(args.options)) { + return ::arrow::internal::make_unique(ctx, + *options); + } + + ctx->SetStatus( + Status::Invalid("Attempted to initialize KernelState from null FunctionOptions")); + return NULLPTR; + } + + static const StateType& Get(const KernelState& state) { + return ::arrow::internal::checked_cast(state) + .state; + } + + static const StateType& Get(KernelContext* ctx) { return Get(*ctx->state()); } + + StateType state; +}; + // ---------------------------------------------------------------------- // Input and output value type definitions diff --git a/cpp/src/arrow/compute/kernels/scalar_string.cc b/cpp/src/arrow/compute/kernels/scalar_string.cc index 2f4a0d45ed2..2eeac71c727 100644 --- a/cpp/src/arrow/compute/kernels/scalar_string.cc +++ b/cpp/src/arrow/compute/kernels/scalar_string.cc @@ -86,25 +86,19 @@ void EnsureLookupTablesFilled() { }); } +#endif // ARROW_WITH_UTF8PROC + +/// Transform string -> string with a reasonable guess on the maximum number of 
codepoints template -struct UTF8Transform { +struct StringTransform { using offset_type = typename Type::offset_type; using ArrayType = typename TypeTraits::ArrayType; - static bool Transform(const uint8_t* input, offset_type input_string_ncodeunits, - uint8_t* output, offset_type* output_written) { - uint8_t* output_start = output; - if (ARROW_PREDICT_FALSE( - !arrow::util::UTF8Transform(input, input + input_string_ncodeunits, &output, - Derived::TransformCodepoint))) { - return false; - } - *output_written = static_cast(output - output_start); - return true; - } - + static int64_t MaxCodeunits(offset_type input_ncodeunits) { return input_ncodeunits; } static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - EnsureLookupTablesFilled(); + Derived().Execute(ctx, batch, out); + } + void Execute(KernelContext* ctx, const ExecBatch& batch, Datum* out) { if (batch[0].kind() == Datum::ARRAY) { const ArrayData& input = *batch[0].array(); ArrayType input_boxed(batch[0].array()); @@ -113,14 +107,7 @@ struct UTF8Transform { offset_type input_ncodeunits = input_boxed.total_values_length(); offset_type input_nstrings = static_cast(input.length); - // Section 5.18 of the Unicode spec claim that the number of codepoints for case - // mapping can grow by a factor of 3. This means grow by a factor of 3 in bytes - // However, since we don't support all casings (SpecialCasing.txt) the growth - // is actually only at max 3/2 (as covered by the unittest). - // Note that rounding down the 3/2 is ok, since only codepoints encoded by - // two code units (even) can grow to 3 code units. - - int64_t output_ncodeunits_max = static_cast(input_ncodeunits) * 3 / 2; + int64_t output_ncodeunits_max = Derived::MaxCodeunits(input_ncodeunits); if (output_ncodeunits_max > std::numeric_limits::max()) { ctx->SetStatus(Status::CapacityError( "Result might not fit in a 32bit utf8 array, convert to large_utf8")); @@ -141,9 +128,9 @@ struct UTF8Transform { offset_type input_string_ncodeunits; const uint8_t* input_string = input_boxed.GetValue(i, &input_string_ncodeunits); offset_type encoded_nbytes = 0; - if (ARROW_PREDICT_FALSE(!Derived::Transform(input_string, input_string_ncodeunits, - output_str + output_ncodeunits, - &encoded_nbytes))) { + if (ARROW_PREDICT_FALSE(!static_cast(*this).Transform( + input_string, input_string_ncodeunits, output_str + output_ncodeunits, + &encoded_nbytes))) { ctx->SetStatus(Status::Invalid("Invalid UTF8 sequence in input")); return; } @@ -161,8 +148,7 @@ struct UTF8Transform { result->is_valid = true; offset_type data_nbytes = static_cast(input.value->size()); - // See note above in the Array version explaining the 3 / 2 - int64_t output_ncodeunits_max = static_cast(data_nbytes) * 3 / 2; + int64_t output_ncodeunits_max = Derived::MaxCodeunits(data_nbytes); if (output_ncodeunits_max > std::numeric_limits::max()) { ctx->SetStatus(Status::CapacityError( "Result might not fit in a 32bit utf8 array, convert to large_utf8")); @@ -172,9 +158,9 @@ struct UTF8Transform { ctx->Allocate(output_ncodeunits_max)); result->value = value_buffer; offset_type encoded_nbytes = 0; - if (ARROW_PREDICT_FALSE(!Derived::Transform(input.value->data(), data_nbytes, - value_buffer->mutable_data(), - &encoded_nbytes))) { + if (ARROW_PREDICT_FALSE(!static_cast(*this).Transform( + input.value->data(), data_nbytes, value_buffer->mutable_data(), + &encoded_nbytes))) { ctx->SetStatus(Status::Invalid("Invalid UTF8 sequence in input")); return; } @@ -186,8 +172,42 @@ struct UTF8Transform { } }; +#ifdef 
ARROW_WITH_UTF8PROC + +// transforms per codepoint +template +struct StringTransformCodepoint : StringTransform { + using Base = StringTransform; + using offset_type = typename Base::offset_type; + + bool Transform(const uint8_t* input, offset_type input_string_ncodeunits, + uint8_t* output, offset_type* output_written) { + uint8_t* output_start = output; + if (ARROW_PREDICT_FALSE( + !arrow::util::UTF8Transform(input, input + input_string_ncodeunits, &output, + Derived::TransformCodepoint))) { + return false; + } + *output_written = static_cast(output - output_start); + return true; + } + static int64_t MaxCodeunits(offset_type input_ncodeunits) { + // Section 5.18 of the Unicode spec claim that the number of codepoints for case + // mapping can grow by a factor of 3. This means grow by a factor of 3 in bytes + // However, since we don't support all casings (SpecialCasing.txt) the growth + // in bytes iss actually only at max 3/2 (as covered by the unittest). + // Note that rounding down the 3/2 is ok, since only codepoints encoded by + // two code units (even) can grow to 3 code units. + return static_cast(input_ncodeunits) * 3 / 2; + } + void Execute(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + EnsureLookupTablesFilled(); + Base::Execute(ctx, batch, out); + } +}; + template -struct UTF8Upper : UTF8Transform> { +struct UTF8Upper : StringTransformCodepoint> { inline static uint32_t TransformCodepoint(uint32_t codepoint) { return codepoint <= kMaxCodepointLookup ? lut_upper_codepoint[codepoint] : utf8proc_toupper(codepoint); @@ -195,7 +215,7 @@ struct UTF8Upper : UTF8Transform> { }; template -struct UTF8Lower : UTF8Transform> { +struct UTF8Lower : StringTransformCodepoint> { inline static uint32_t TransformCodepoint(uint32_t codepoint) { return codepoint <= kMaxCodepointLookup ? 
lut_lower_codepoint[codepoint] : utf8proc_tolower(codepoint); @@ -1233,6 +1253,312 @@ Result StrptimeResolve(KernelContext* ctx, const std::vector +struct UTF8TrimWhitespaceBase : StringTransform { + using Base = StringTransform; + using offset_type = typename Base::offset_type; + bool Transform(const uint8_t* input, offset_type input_string_ncodeunits, + uint8_t* output, offset_type* output_written) { + const uint8_t* begin = input; + const uint8_t* end = input + input_string_ncodeunits; + const uint8_t* end_trimmed = end; + const uint8_t* begin_trimmed = begin; + + auto predicate = [](uint32_t c) { return !IsSpaceCharacterUnicode(c); }; + if (left && !ARROW_PREDICT_TRUE( + arrow::util::UTF8FindIf(begin, end, predicate, &begin_trimmed))) { + return false; + } + if (right && (begin_trimmed < end)) { + if (!ARROW_PREDICT_TRUE(arrow::util::UTF8FindIfReverse(begin_trimmed, end, + predicate, &end_trimmed))) { + return false; + } + } + std::copy(begin_trimmed, end_trimmed, output); + *output_written = static_cast(end_trimmed - begin_trimmed); + return true; + } + void Execute(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + EnsureLookupTablesFilled(); + Base::Execute(ctx, batch, out); + } +}; + +template +struct UTF8TrimWhitespace + : UTF8TrimWhitespaceBase> {}; + +template +struct UTF8LTrimWhitespace + : UTF8TrimWhitespaceBase> {}; + +template +struct UTF8RTrimWhitespace + : UTF8TrimWhitespaceBase> {}; + +struct TrimStateUTF8 { + TrimOptions options_; + std::vector codepoints_; + explicit TrimStateUTF8(KernelContext* ctx, TrimOptions options) + : options_(std::move(options)) { + if (!ARROW_PREDICT_TRUE( + arrow::util::UTF8ForEach(options_.characters, [&](uint32_t c) { + codepoints_.resize( + std::max(c + 1, static_cast(codepoints_.size()))); + codepoints_.at(c) = true; + }))) { + ctx->SetStatus(Status::Invalid("Invalid UTF8 sequence in input")); + } + } +}; + +template +struct UTF8TrimBase : StringTransform { + using Base = StringTransform; + using offset_type = typename Base::offset_type; + using State = KernelStateFromFunctionOptions; + TrimStateUTF8 state_; + + explicit UTF8TrimBase(TrimStateUTF8 state) : state_(std::move(state)) {} + + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + TrimStateUTF8 state = State::Get(ctx); + Derived(state).Execute(ctx, batch, out); + } + + void Execute(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + EnsureLookupTablesFilled(); + Base::Execute(ctx, batch, out); + } + + bool Transform(const uint8_t* input, offset_type input_string_ncodeunits, + uint8_t* output, offset_type* output_written) { + const uint8_t* begin = input; + const uint8_t* end = input + input_string_ncodeunits; + const uint8_t* end_trimmed = end; + const uint8_t* begin_trimmed = begin; + + auto predicate = [&](uint32_t c) { + bool contains = state_.codepoints_[c]; + return !contains; + }; + if (left && !ARROW_PREDICT_TRUE( + arrow::util::UTF8FindIf(begin, end, predicate, &begin_trimmed))) { + return false; + } + if (right && (begin_trimmed < end)) { + if (!ARROW_PREDICT_TRUE(arrow::util::UTF8FindIfReverse(begin_trimmed, end, + predicate, &end_trimmed))) { + return false; + } + } + std::copy(begin_trimmed, end_trimmed, output); + *output_written = static_cast(end_trimmed - begin_trimmed); + return true; + } +}; +template +struct UTF8Trim : UTF8TrimBase> { + using Base = UTF8TrimBase>; + using Base::Base; +}; + +template +struct UTF8LTrim : UTF8TrimBase> { + using Base = UTF8TrimBase>; + using Base::Base; +}; + +template +struct UTF8RTrim : 
UTF8TrimBase> { + using Base = UTF8TrimBase>; + using Base::Base; +}; + +#endif + +template +struct AsciiTrimWhitespaceBase : StringTransform { + using offset_type = typename Type::offset_type; + bool Transform(const uint8_t* input, offset_type input_string_ncodeunits, + uint8_t* output, offset_type* output_written) { + const uint8_t* begin = input; + const uint8_t* end = input + input_string_ncodeunits; + const uint8_t* end_trimmed = end; + + auto predicate = [](unsigned char c) { return !IsSpaceCharacterAscii(c); }; + const uint8_t* begin_trimmed = left ? std::find_if(begin, end, predicate) : begin; + if (right & (begin_trimmed < end)) { + std::reverse_iterator rbegin(end); + std::reverse_iterator rend(begin_trimmed); + end_trimmed = std::find_if(rbegin, rend, predicate).base(); + } + std::copy(begin_trimmed, end_trimmed, output); + *output_written = static_cast(end_trimmed - begin_trimmed); + return true; + } +}; + +template +struct AsciiTrimWhitespace + : AsciiTrimWhitespaceBase> {}; + +template +struct AsciiLTrimWhitespace + : AsciiTrimWhitespaceBase> {}; + +template +struct AsciiRTrimWhitespace + : AsciiTrimWhitespaceBase> {}; + +template +struct AsciiTrimBase : StringTransform { + using Base = StringTransform; + using offset_type = typename Base::offset_type; + using State = OptionsWrapper; + TrimOptions options_; + std::vector characters_; + + explicit AsciiTrimBase(TrimOptions options) + : options_(std::move(options)), characters_(256) { + std::for_each(options_.characters.begin(), options_.characters.end(), + [&](char c) { characters_[static_cast(c)] = true; }); + } + + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + TrimOptions options = State::Get(ctx); + Derived(options).Execute(ctx, batch, out); + } + + bool Transform(const uint8_t* input, offset_type input_string_ncodeunits, + uint8_t* output, offset_type* output_written) { + const uint8_t* begin = input; + const uint8_t* end = input + input_string_ncodeunits; + const uint8_t* end_trimmed = end; + const uint8_t* begin_trimmed; + + auto predicate = [&](unsigned char c) { + bool contains = characters_[c]; + return !contains; + }; + + begin_trimmed = left ? std::find_if(begin, end, predicate) : begin; + if (right & (begin_trimmed < end)) { + std::reverse_iterator rbegin(end); + std::reverse_iterator rend(begin_trimmed); + end_trimmed = std::find_if(rbegin, rend, predicate).base(); + } + std::copy(begin_trimmed, end_trimmed, output); + *output_written = static_cast(end_trimmed - begin_trimmed); + return true; + } +}; + +template +struct AsciiTrim : AsciiTrimBase> { + using Base = AsciiTrimBase>; + using Base::Base; +}; + +template +struct AsciiLTrim : AsciiTrimBase> { + using Base = AsciiTrimBase>; + using Base::Base; +}; + +template +struct AsciiRTrim : AsciiTrimBase> { + using Base = AsciiTrimBase>; + using Base::Base; +}; + +const FunctionDoc utf8_trim_whitespace_doc( + "Trim leading and trailing whitespace characters", + ("For each string in `strings`, emit a string with leading and trailing whitespace\n" + "characters removed, where whitespace characters are defined by the Unicode\n" + "standard. Null values emit null."), + {"strings"}); + +const FunctionDoc utf8_ltrim_whitespace_doc( + "Trim leading whitespace characters", + ("For each string in `strings`, emit a string with leading whitespace\n" + "characters removed, where whitespace characters are defined by the Unicode\n" + "standard. 
Null values emit null."), + {"strings"}); + +const FunctionDoc utf8_rtrim_whitespace_doc( + "Trim trailing whitespace characters", + ("For each string in `strings`, emit a string with trailing whitespace\n" + "characters removed, where whitespace characters are defined by the Unicode\n" + "standard. Null values emit null."), + {"strings"}); + +const FunctionDoc ascii_trim_whitespace_doc( + "Trim leading and trailing ASCII whitespace characters", + ("For each string in `strings`, emit a string with leading and trailing ASCII\n" + "whitespace characters removed. Use `utf8_trim_whitespace` to trim Unicode\n" + "whitespace characters. Null values emit null."), + {"strings"}); + +const FunctionDoc ascii_ltrim_whitespace_doc( + "Trim leading ASCII whitespace characters", + ("For each string in `strings`, emit a string with leading ASCII whitespace\n" + "characters removed. Use `utf8_ltrim_whitespace` to trim leading Unicode\n" + "whitespace characters. Null values emit null."), + {"strings"}); + +const FunctionDoc ascii_rtrim_whitespace_doc( + "Trim trailing ASCII whitespace characters", + ("For each string in `strings`, emit a string with trailing ASCII whitespace\n" + "characters removed. Use `utf8_rtrim_whitespace` to trim trailing Unicode\n" + "whitespace characters. Null values emit null."), + {"strings"}); + +const FunctionDoc utf8_trim_doc( + "Trim leading and trailing characters present in the `characters` arguments", + ("For each string in `strings`, emit a string with leading and trailing\n" + "characters removed that are present in the `characters` argument. Null values\n" + "emit null."), + {"strings"}, "TrimOptions"); + +const FunctionDoc utf8_ltrim_doc( + "Trim leading characters present in the `characters` arguments", + ("For each string in `strings`, emit a string with leading\n" + "characters removed that are present in the `characters` argument. Null values\n" + "emit null."), + {"strings"}, "TrimOptions"); + +const FunctionDoc utf8_rtrim_doc( + "Trim trailing characters present in the `characters` arguments", + ("For each string in `strings`, emit a string with leading " + "characters removed that are present in the `characters` argument. Null values\n" + "emit null."), + {"strings"}, "TrimOptions"); + +const FunctionDoc ascii_trim_doc( + utf8_trim_doc.summary + "", + utf8_trim_doc.description + + ("\nBoth the input string as the `characters` argument are interepreted as\n" + "ASCII characters, to trim non-ASCII characters, use `utf8_trim`."), + {"strings"}, "TrimOptions"); + +const FunctionDoc ascii_ltrim_doc( + utf8_ltrim_doc.summary + "", + utf8_ltrim_doc.description + + ("\nBoth the input string as the `characters` argument are interepreted as\n" + "ASCII characters, to trim non-ASCII characters, use `utf8_trim`."), + {"strings"}, "TrimOptions"); + +const FunctionDoc ascii_rtrim_doc( + utf8_rtrim_doc.summary + "", + utf8_rtrim_doc.description + + ("\nBoth the input string as the `characters` argument are interepreted as\n" + "ASCII characters, to trim non-ASCII characters, use `utf8_trim`."), + {"strings"}, "TrimOptions"); + const FunctionDoc strptime_doc( "Parse timestamps", ("For each string in `strings`, parse it as a timestamp.\n" @@ -1291,6 +1617,26 @@ void MakeUnaryStringBatchKernel( DCHECK_OK(registry->AddFunction(std::move(func))); } +template